# general
k
I have a table of selected IDs I want to keep (which is the df on the right) and a table with those IDs plus a bunch of other columns (which is the df on the left). I've been trying to do a semi-join, but the join is bloating my object store (and spilling) beyond belief, to something like 5x the overall data size, until it eventually either slows down with low memory and CPU usage or simply ends in an error. Sometimes a few chunks do succeed in getting written, but most of the join never gets done. Is there any data or join problem that could cause such a scenario?
j
Could you provide more details?
df.explain(True)
would give us more details to go off of! My guess is that there is some repartitioning happening here, but it’s hard to tell without looking at the physical plan
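For example, if your pipeline looks something like this rough sketch (names and paths here are placeholders, swap in your own), calling explain on the final dataframe right before you kick off the write will print that physical plan:

import daft

big_df = daft.read_parquet("path/to/big_table/*.parquet")      # IDs + all the other columns (the left df)
ids_df = daft.read_parquet("path/to/selected_ids/*.parquet")   # just the IDs to keep (the right df)

kept = big_df.join(ids_df, on="id", how="semi")
kept.explain(True)   # prints the plan, including the physical plan, without executing the query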
k
== Physical Plan ==
* HashJoin: Type = Semi
|   On = col(contents_hash)
|\
| * Aggregation:
| |   Group by = col(contents_hash)
| |
| * ReduceMerge
| |
| * FanoutByHash: 413
| |   Partition by = col(contents_hash)
| |
| * Aggregation:
| |   Group by = col(contents_hash)
| |
| * TabularScan:
| |   Num Scan Tasks = 413
| |   Estimated Scan Bytes = 15757540664
| |   Clustering spec = { Num partitions = 413 }
| |   Schema: {contents_hash#Utf8}
| |   Scan Tasks: [
| |     {File {file:///tmp/ray/proc/run_001/deduplicated_ids/0055391d-defc-4263-bdfe-fa533f3466c6-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/0067321b-5b48-4039-8466-fdda300792eb-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/0077ef59-eb3e-4f55-b9a7-5b4d13b50c94-0.parquet}, ..., File {file:///tmp/ray/proc/run_001/deduplicated_ids/00a1dc15-a00b-4fda-95dc-bad019438772-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/00edf424-0a1d-4f35-a930-ab5928344ffc-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/0122c8c9-4172-4684-b7e6-10f93447c0f2-0.parquet}}
| |     {File {file:///tmp/ray/proc/run_001/deduplicated_ids/0126d66d-03c8-41e7-8e3f-f230b5666e2e-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/012bce38-854f-4e06-b326-3370adedb15e-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/0159927f-1270-49a0-9804-c7e0ad3d8a4a-0.parquet}, ..., File {file:///tmp/ray/proc/run_001/deduplicated_ids/0196cbe3-5208-4d3a-80e2-21fd84973957-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/019ba0bb-4e2d-467f-b99d-857ba029de5d-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/01a350a6-f063-4c36-9ba3-6cdd53fe745e-0.parquet}}
| |     {File {file:///tmp/ray/proc/run_001/deduplicated_ids/01a56832-d7e5-4b76-87f0-6c3ed6601c1a-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/01cd14bc-b22d-488b-93dd-ec9bc04e51f4-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/01df8e1b-27ee-4345-b9fc-e8bdcc447a90-0.parquet}, ..., File {file:///tmp/ray/proc/run_001/deduplicated_ids/02087e4b-b73b-4619-8d2b-95d9675fa965-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/0270cdd3-9fbb-42fa-983b-2f4dcf752455-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/0271cf92-7d8d-44c8-bd9a-da091550d807-0.parquet}}
| |     ...
| |     {File {file:///tmp/ray/proc/run_001/deduplicated_ids/fe527e40-a3a5-4ec8-b2b8-c16d27cd6d55-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/fe7b96bd-f737-42be-8b45-92d54f0ea81a-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/feb979bc-cb59-4b89-8de0-dd227300fc62-0.parquet}, ..., File {file:///tmp/ray/llm_algo/full_run/cp_scd_s1_s2fid/run_001/deduplicated_ids/fedd41e2-e35b-4b92-8453-78469b17b397-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/fee20ee8-ced5-4a2f-aa05-27a76fa976b5-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/fee91892-9058-4663-aae6-18ccc00b9d87-0.parquet}}
| |     {File {file:///tmp/ray/proc/run_001/deduplicated_ids/fef0fb5a-4667-4248-a183-a8adf6f566b5-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/fef9ded4-9d86-4dca-9824-b7b0984383ff-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/ff1aed2d-cdc4-45c2-9f02-af0eec22c246-0.parquet}, ..., File {file:///tmp/ray/llm_algo/full_run/cp_scd_s1_s2fid/run_001/deduplicated_ids/ffad9786-b2ca-49ef-8d4c-8f355c412a07-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/ffb4e756-dcc0-4c19-be15-6f9e9c558016-0.parquet}, File {file:///tmp/ray/proc/run_001/deduplicated_ids/ffc3116c-886e-42ea-8221-a4f6ae500070-0.parquet}}
| |     {File {file:///tmp/ray/proc/run_001/deduplicated_ids/ffe914cb-ff12-43b0-b2cb-0b2566ea4b3b-0.parquet}}
| |   ]
|
* ReduceMerge
|
* FanoutByHash: 413
|   Partition by = col(contents_hash)
|
* Project: col(contents_hash),
|   col(col1.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS col1,
|   col(col2.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS col2,
|   col(col3.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS col3,
|   col(col4.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS col4,
|   col(col5.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS col5,
|   col(col6.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS col6,
|   col(date.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS date,
|   col(col7.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS col7,
|   col(col8.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS col8,
|   col(col9.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS col9,
|   col(col10.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS col10,
|   col(hash.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)) AS hash
|   Clustering spec = { Num partitions = 200, By = col(contents_hash) }
|
* Aggregation:
|   any_value(col(col1.local_any_value(ignore_nulls=false)) AS col1.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col4.local_any_value(ignore_nulls=false)) AS col4.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(date.local_any_value(ignore_nulls=false)) AS date.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col7.local_any_value(ignore_nulls=false)) AS col7.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col8.local_any_value(ignore_nulls=false)) AS col8.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col6.local_any_value(ignore_nulls=false)) AS col6.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col5.local_any_value(ignore_nulls=false)) AS col5.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col9.local_any_value(ignore_nulls=false)) AS col9.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col10.local_any_value(ignore_nulls=false)) AS col10.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col3.local_any_value(ignore_nulls=false)) AS col3.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(hash.local_any_value(ignore_nulls=false)) AS hash.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col2.local_any_value(ignore_nulls=false)) AS col2.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false), ignore_nulls=false)
|   Group by = col(contents_hash)
|
* ReduceMerge
|
* FanoutByHash: 200
|   Partition by = col(contents_hash)
|
* Aggregation:
|   any_value(col(col7) AS col7.local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col5) AS col5.local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col2) AS col2.local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col3) AS col3.local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(date) AS date.local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col9) AS col9.local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col1) AS col1.local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col4) AS col4.local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col10) AS col10.local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col8) AS col8.local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(hash) AS hash.local_any_value(ignore_nulls=false), ignore_nulls=false),
|   any_value(col(col6) AS col6.local_any_value(ignore_nulls=false), ignore_nulls=false)
|   Group by = col(contents_hash)
|
* TabularScan:
|   Num Scan Tasks = 9968
|   Estimated Scan Bytes = 2621535856976
|   Clustering spec = { Num partitions = 9968 }
|   Schema: {col1#Utf8, col2#Utf8, col3#Utf8, col4#Utf8, col5#Utf8, col6#Utf8, date#Utf8, col7#Utf8, col8#UInt64, col9#UInt64, col10#Float64, hash#UInt64, contents_hash#Utf8}
|   Scan Tasks: [
|     {File {file:///tmp/ray/proc/run_011/initial_filtering/000246e7-d1b8-49cf-85c3-8b12cead4d6c-0.parquet}}
|     {File {file:///tmp/ray/proc/run_011/initial_filtering/0003b7fe-033d-42dc-b1ce-eede3ffb27ec-0.parquet}}
|     {File {file:///tmp/ray/proc/run_011/initial_filtering/000e117e-7587-4d1a-ace7-8bebf1ebaeb4-0.parquet}}
|     ...
|     {File {file:///tmp/ray/proc/run_011/initial_filtering/ffeb98ae-ec24-4f30-b8ec-22a7a0ce29bc-0.parquet}}
|     {File {file:///tmp/ray/proc/run_011/initial_filtering/ffecf651-9485-442a-b65b-05131417e841-0.parquet}}
|     {File {file:///tmp/ray/proc/run_011/initial_filtering/fff68eb6-539c-41f7-8756-397d67327511-0.parquet}}
|   ]

It usually goes haywire at the ReduceMerge-Aggregate-Project-FanoutHash stage...
j
Super useful! Already spotted an issue we can fix to give you a better speedup: looks like there's an unneeded repartitioning of the left table here, and we can actually merge it with the repartitioning that happens after your aggregation. I'm guessing the excessive spilling you're seeing is due to these repartitions.
k
Oh great! Does the repartitioning take place for all of the joins? How about for a broadcast join?
j
Broadcast join doesn’t repartition. The plan you presented does a HashJoin, which involves:
1. Repartition the left side
2. Repartition the right side
3. Perform the join
But the fun thing about your plan is that step (1) actually can be “pushed” to the Aggregation step… That aggregation step also did a repartition by
col(contents_hash)
but to 200 partitions instead of 413. Daft should instead just make the aggregation step repartition to 413 partitions directly, and this will let us skip the second repartitioning step 😎
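To make the mechanics concrete, here's a toy single-process illustration of what the distributed hash semi-join is doing (plain Python, nothing Daft-specific):

from collections import defaultdict

def hash_partition(rows, key, num_partitions):
    # Steps (1) and (2): route every row to a partition based on the hash of the join key
    parts = defaultdict(list)
    for row in rows:
        parts[hash(row[key]) % num_partitions].append(row)
    return parts

def hash_semi_join(left_rows, right_rows, key, num_partitions=4):
    left_parts = hash_partition(left_rows, key, num_partitions)    # repartition left side
    right_parts = hash_partition(right_rows, key, num_partitions)  # repartition right side
    out = []
    for p in range(num_partitions):
        # Step (3): within each partition, keep left rows whose key also appears on the right
        right_keys = {row[key] for row in right_parts.get(p, [])}
        out.extend(row for row in left_parts.get(p, []) if row[key] in right_keys)
    return out

Matching keys always land in the same partition number, which is why both sides have to agree on the partition count, and why shuffling your left side to 200 partitions inside the aggregation and then again to 413 for the join is wasted work.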
k
Oh nice! I tried doing a broadcast join as well, but it also failed... let me see if I can get that physical plan too. How much memory would I need for a broadcast join to work (like 1x of the right table on each worker?), and would it go into the object store memory?
j
I think I see a bug in our optimizer that is supposed to do this optimization. Let me write a little fix.
----
Your RHS table looks fairly large (15 GB compressed), so I'm guessing a broadcast join might OOM. Would be great to take a look at that plan too though. cc @Kevin Wang for thoughts here.
It should go into object store memory, but might also result in quite a bit of spilling.
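For a very rough sizing intuition on your memory question (back-of-envelope only; the 3x factor is just an assumption, not measured, but parquet with Utf8 columns often blows up by a few x once decompressed into memory):

rhs_on_disk_gb = 15757540664 / 1e9   # "Estimated Scan Bytes" for your RHS scan, ~15.76 GB of compressed parquet
expansion = 3                        # assumed in-memory blow-up factor
per_node_gb = rhs_on_disk_gb * expansion
print(f"~{per_node_gb:.0f} GB of broadcasted data resident per node")   # ~47 GB

So think "several x of the right table on every node doing join work" rather than 1x, which is why I'd expect OOM or heavy spilling here.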
k
No matter which join I pick, the object store goes crazy, even for the hash join. It actually goes slightly less crazy for the broadcast join.
j
Yup. Spilling is completely normal and expected, actually. Here’s a guide we wrote on memory: https://www.getdaft.io/projects/docs/en/stable/user_guide/poweruser/memory.html
It actually goes slightly less crazy for the broadcast join.
This is expected as well — the broadcast join in this case would avoid repartitions on both sides. Repartitions are usually very expensive to run! Broadcast joins are just dangerous if the machines can’t handle the broadcasted data (OOM).
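For contrast with the hash-join sketch above, this is roughly what the broadcast semi-join does: no shuffle of either side, just the small table copied next to every left partition (again a toy plain-Python illustration):

def broadcast_semi_join(left_partitions, right_rows, key):
    # The entire right side is materialized once and shipped to every worker...
    broadcast_keys = {row[key] for row in right_rows}
    # ...then each left partition is filtered in place, with no repartitioning at all.
    return [
        [row for row in part if row[key] in broadcast_keys]
        for part in left_partitions
    ]

No FanoutByHash on either side, which is why the object store churns less, but every worker has to hold all of broadcast_keys, which is where the OOM risk comes from.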