== Physical Plan ==
* HashJoin: Type = Semi
| On = col(contents_hash)
|\
| * Aggregation:
| | Group by = col(contents_hash)
| |
| * ReduceMerge
| |
| * FanoutByHash: 413
| | Partition by = col(contents_hash)
| |
| * Aggregation:
| | Group by = col(contents_hash)
| |
| * TabularScan:
| | Num Scan Tasks = 413
| | Estimated Scan Bytes = 15757540664
| | Clustering spec = { Num partitions = 413 }
| | Schema: {contents_hash#Utf8}
| | Scan Tasks: [
| | {File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/0055391d-defc-4263-bdfe-fa533f3466c6-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/0067321b-5b48-4039-8466-fdda300792eb-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/0077ef59-eb3e-4f55-b9a7-5b4d13b50c94-0.parquet}, ...,
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/00a1dc15-a00b-4fda-95dc-bad019438772-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/00edf424-0a1d-4f35-a930-ab5928344ffc-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/0122c8c9-4172-4684-b7e6-10f93447c0f2-0.parquet}}
| | {File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/0126d66d-03c8-41e7-8e3f-f230b5666e2e-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/012bce38-854f-4e06-b326-3370adedb15e-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/0159927f-1270-49a0-9804-c7e0ad3d8a4a-0.parquet}, ...,
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/0196cbe3-5208-4d3a-80e2-21fd84973957-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/019ba0bb-4e2d-467f-b99d-857ba029de5d-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/01a350a6-f063-4c36-9ba3-6cdd53fe745e-0.parquet}}
| | {File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/01a56832-d7e5-4b76-87f0-6c3ed6601c1a-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/01cd14bc-b22d-488b-93dd-ec9bc04e51f4-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/01df8e1b-27ee-4345-b9fc-e8bdcc447a90-0.parquet}, ...,
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/02087e4b-b73b-4619-8d2b-95d9675fa965-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/0270cdd3-9fbb-42fa-983b-2f4dcf752455-0.parquet},
| | File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/0271cf92-7d8d-44c8-bd9a-da091550d807-0.parquet}}
| | ...
| | {File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/fe527e40-a3a5-4ec8-b2b8-c16d27cd6d55-0.parquet}, File
| | {file:///tmp/ray/proc/run_001/deduplicated_ids/
| | fe7b96bd-f737-42be-8b45-92d54f0ea81a-0.parquet}, File {file:///tmp/ray/
| | proc/run_001/deduplicated_ids/feb979bc-cb59-4b89-
| | 8de0-dd227300fc62-0.parquet}, ..., File {file:///tmp/ray/llm_algo/full_run/
| | cp_scd_s1_s2fid/run_001/deduplicated_ids/fedd41e2-e35b-4b92-8453-78469b17b397-
| | 0.parquet}, File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/fee20ee8-ced5-4a2f-aa05-27a76fa976b5-0.parquet}, File
| | {file:///tmp/ray/proc/run_001/deduplicated_ids/
| | fee91892-9058-4663-aae6-18ccc00b9d87-0.parquet}}
| | {File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/fef0fb5a-4667-4248-a183-a8adf6f566b5-0.parquet}, File
| | {file:///tmp/ray/proc/run_001/deduplicated_ids/
| | fef9ded4-9d86-4dca-9824-b7b0984383ff-0.parquet}, File {file:///tmp/ray/
| | proc/run_001/deduplicated_ids/ff1aed2d-cdc4-45c2-
| | 9f02-af0eec22c246-0.parquet}, ..., File {file:///tmp/ray/llm_algo/full_run/
| | cp_scd_s1_s2fid/run_001/deduplicated_ids/ffad9786-b2ca-49ef-8d4c-8f355c412a07-
| | 0.parquet}, File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/ffb4e756-dcc0-4c19-be15-6f9e9c558016-0.parquet}, File
| | {file:///tmp/ray/proc/run_001/deduplicated_ids/
| | ffc3116c-886e-42ea-8221-a4f6ae500070-0.parquet}}
| | {File {file:///tmp/ray/proc/run_001/
| | deduplicated_ids/ffe914cb-ff12-43b0-b2cb-0b2566ea4b3b-0.parquet}}
| | ]
|
* ReduceMerge
|
* FanoutByHash: 413
| Partition by = col(contents_hash)
|
* Project: col(contents_hash),
| col(col1.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=fals
| e)) AS col1,
| col(col2.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=fa
| lse)) AS col2,
| col(col3.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=fa
| lse)) AS col3,
| col(col4.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false
| )) AS col4,
| col(col5.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=fa
| lse)) AS col5,
| col(col6.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)
| ) AS col6,
| col(date.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=
| false)) AS date,
| col(col7.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=fal
| se)) AS col7,
| col(col8.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=f
| alse)) AS col8,
| col(col9.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=f
| alse)) AS col9,
| col(col10.local_any_value(ignore_nulls=false).local_any_value(ignor
| e_nulls=false)) AS col10,
| col(hash.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)
| ) AS hash
| Clustering spec = { Num partitions = 200, By = col(contents_hash) }
|
* Aggregation: any_value(col(col1.local_any_value(ignore_nulls=false)) AS
| col1.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false),
| ignore_nulls=false), any_value(col(col4.local_any_value(ignore_nulls=false)) AS
| col4.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false),
| ignore_nulls=false),
| any_value(col(date.local_any_value(ignore_nulls=false)) AS
| date.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=fals
| e), ignore_nulls=false),
| any_value(col(col7.local_any_value(ignore_nulls=false)) AS
| col7.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false),
| ignore_nulls=false),
| any_value(col(col8.local_any_value(ignore_nulls=false)) AS
| col8.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false
| ), ignore_nulls=false), any_value(col(col6.local_any_value(ignore_nulls=false))
| AS col6.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false),
| ignore_nulls=false),
| any_value(col(col5.local_any_value(ignore_nulls=false)) AS
| col5.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)
| , ignore_nulls=false),
| any_value(col(col9.local_any_value(ignore_nulls=false)) AS
| col9.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false
| ), ignore_nulls=false),
| any_value(col(col10.local_any_value(ignore_nulls=false)) AS
| col10.local_any_value(ignore_nulls=false).local_any_value(ignore_nu
| lls=false), ignore_nulls=false),
| any_value(col(col3.local_any_value(ignore_nulls=false)) AS
| col3.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)
| , ignore_nulls=false), any_value(col(hash.local_any_value(ignore_nulls=false))
| AS hash.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false),
| ignore_nulls=false),
| any_value(col(col2.local_any_value(ignore_nulls=false)) AS
| col2.local_any_value(ignore_nulls=false).local_any_value(ignore_nulls=false)
| , ignore_nulls=false)
| Group by = col(contents_hash)
|
* ReduceMerge
|
* FanoutByHash: 200
| Partition by = col(contents_hash)
|
* Aggregation: any_value(col(col7) AS
| col7.local_any_value(ignore_nulls=false), ignore_nulls=false),
| any_value(col(col5) AS col5.local_any_value(ignore_nulls=false),
| ignore_nulls=false), any_value(col(col2) AS
| col2.local_any_value(ignore_nulls=false), ignore_nulls=false),
| any_value(col(col3) AS col3.local_any_value(ignore_nulls=false),
| ignore_nulls=false), any_value(col(date) AS
| date.local_any_value(ignore_nulls=false), ignore_nulls=false),
| any_value(col(col9) AS col9.local_any_value(ignore_nulls=false),
| ignore_nulls=false), any_value(col(col1) AS
| col1.local_any_value(ignore_nulls=false), ignore_nulls=false),
| any_value(col(col4) AS col4.local_any_value(ignore_nulls=false),
| ignore_nulls=false), any_value(col(col10) AS
| col10.local_any_value(ignore_nulls=false), ignore_nulls=false),
| any_value(col(col8) AS col8.local_any_value(ignore_nulls=false),
| ignore_nulls=false), any_value(col(hash) AS
| hash.local_any_value(ignore_nulls=false), ignore_nulls=false),
| any_value(col(col6) AS col6.local_any_value(ignore_nulls=false),
| ignore_nulls=false)
| Group by = col(contents_hash)
|
* TabularScan:
| Num Scan Tasks = 9968
| Estimated Scan Bytes = 2621535856976
| Clustering spec = { Num partitions = 9968 }
| Schema: {col1#Utf8, col2#Utf8, col3#Utf8, col4#Utf8, col5#Utf8,
| col6#Utf8, date#Utf8, col7#Utf8, col8#UInt64, col9#UInt64,
| col10#Float64, hash#UInt64, contents_hash#Utf8}
| Scan Tasks: [
| {File {file:///tmp/ray/proc/run_011/
| initial_filtering/000246e7-d1b8-49cf-85c3-8b12cead4d6c-0.parquet}}
| {File {file:///tmp/ray/proc/run_011/
| initial_filtering/0003b7fe-033d-42dc-b1ce-eede3ffb27ec-0.parquet}}
| {File {file:///tmp/ray/proc/run_011/
| initial_filtering/000e117e-7587-4d1a-ace7-8bebf1ebaeb4-0.parquet}}
| ...
| {File {file:///tmp/ray/proc/run_011/
| initial_filtering/ffeb98ae-ec24-4f30-b8ec-22a7a0ce29bc-0.parquet}}
| {File {file:///tmp/ray/proc/run_011/
| initial_filtering/ffecf651-9485-442a-b65b-05131417e841-0.parquet}}
| {File {file:///tmp/ray/proc/run_011/
| initial_filtering/fff68eb6-539c-41f7-8756-397d67327511-0.parquet}}
| ]
It usually goes haywire at the ReduceMerge-Aggregate-Project-FanoutHash stage...