Is there any way to specify the amount of resource...
# general
k
Is there any way to specify the amount of resources I should reserve for the join like we have for UDFs?
j
Only UDFs support resource requests — how are you intending to provide requests to joins?
k
I was thinking of maybe the planning config, the execution config, or the join function itself. I thought it might help with some of the spilling happening in the joins right now, because requesting more resources seems to have helped the UDFs handle my data, which has high variance in memory size across rows.
j
Yeah, we don’t really have resource requests available for joins… It’s especially tricky because there are multiple kinds of join algorithms, each of which can be optimized differently and has very different memory characteristics. If you are struggling with data skew, you might be able to try a sort_merge join!
.join(strategy="sort_merge")
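For readers unfamiliar with the idea, here is a minimal pure-Python sketch of what a sort-merge inner join does. It is an illustration only, not Daft's implementation; the function name and the `(key, value)` row shape are made up for the example. The point is that after sorting, the merge step only has to look at the current run of equal keys on each side.

```python
from typing import Any, List, Tuple

Pair = Tuple[Any, Any]  # hypothetical row shape: (key, value)


def sort_merge_inner_join(left: List[Pair], right: List[Pair]) -> List[Tuple[Any, Any, Any]]:
    """Inner-join two lists of (key, value) pairs by sorting, then merging."""
    left = sorted(left, key=lambda kv: kv[0])
    right = sorted(right, key=lambda kv: kv[0])
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1  # left key has no match yet, advance left
        elif lk > rk:
            j += 1  # right key has no match yet, advance right
        else:
            # Find the run of right rows that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # Pair every left row in the matching run with that right run.
            while i < len(left) and left[i][0] == lk:
                for _, rv in right[j:j_end]:
                    out.append((lk, left[i][1], rv))
                i += 1
            j = j_end
    return out


joined = sort_merge_inner_join(
    [(2, "b"), (1, "a"), (2, "c")],
    [(2, "x"), (3, "y"), (2, "z")],
)
```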
k
Okay!! Let me try that! Thanks!
Oh, it only works for inner joins. How about antijoins? 😅 I have a list of IDs of the duplicated rows which I want to drop from my dataframe.
j
Hmm cc @Kevin Wang who’s been working on antijoins
k
We have two strategies for anti joins: hash and broadcast. If your right side is small enough we automatically use a broadcast join, but the limit is pretty low, so if your right-side table can fit in a single machine's memory, feel free to try
strategy="broadcast"
and see if that helps!
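As a rough picture of what a broadcast anti join does, here is a pure-Python sketch, assuming rows are dicts and the left side is already split into partitions. The function name and data shapes are hypothetical and this is not Daft's code. In Daft itself the call would presumably look like `df.join(dupes, on="id", how="anti", strategy="broadcast")`; treat the exact keyword values as an assumption to check against your version.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]  # hypothetical row shape


def broadcast_anti_join(
    left_partitions: List[List[Row]], right_rows: List[Row], key: str
) -> List[List[Row]]:
    """Keep only left rows whose key does NOT appear on the right side."""
    # The "broadcast" part: every left partition probes a full copy of the
    # right side's keys, so no shuffle of the left side is needed.
    right_keys = {row[key] for row in right_rows}
    return [
        [row for row in partition if row[key] not in right_keys]
        for partition in left_partitions
    ]


partitions = [[{"id": 1}, {"id": 2}], [{"id": 3}, {"id": 2}]]
duplicate_ids = [{"id": 2}]
deduped = broadcast_anti_join(partitions, duplicate_ids, key="id")
```

The trade-off is that each task working on a left partition needs its own view of the whole right side, which is cheap for a small table and expensive when many tasks run at once.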
k
I've tried the broadcast strategy but it consistently spills, so I think it is replicating the data too much. The right-side table is fairly manageable for my machine (16 GB on a 200 GB machine), but I still get tons of spills and OOMs. I'm guessing that the single-core Ray resource requests mean the memory is being split over 20 cores, which leaves less than 10 GB per core. After trying out resource requests on the UDFs, I realised that it reduced this problem a lot, with huge speedups on the runs with UDFs. Not sure if something similar would work for the joins (or maybe at least the joins which don't require shuffles) 🤔
j
“I’m guessing that the single core ray resource requests means that the memory is being split over 20 cores and that leaves less than 10gb per core.”
I see. And I’m guessing if you’re running the broadcast antijoin it’s likely running 20 broadcast joins in parallel, each taking up >16GB of memory.
We do have some heuristics around how much memory to request from Ray for a broadcast join, but they could be “too naive” for this case.
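The numbers in this thread make the problem concrete. A back-of-the-envelope sketch, assuming one task per core and that each task holds a full 16 GB copy of the right side (the thread says more than 16 GB, so this is a lower bound):

```python
machine_memory_gb = 200  # total memory on the machine, from the thread
cores = 20               # one single-core task per core (assumption)
right_table_gb = 16      # size of the broadcast (right) side, from the thread

# Even split of memory across single-core tasks.
per_core_gb = machine_memory_gb / cores

# Lower bound on peak memory if every core runs a broadcast join at once.
peak_broadcast_gb = cores * right_table_gb

# How many such joins could run concurrently without exceeding memory.
max_parallel_joins = machine_memory_gb // right_table_gb

print(per_core_gb, peak_broadcast_gb, max_parallel_joins)
```

So 20 parallel copies need at least 320 GB against 200 GB available, and at most 12 copies fit at once, before counting the left-side partitions and the join output.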