Igor Perchersky
09/11/2024, 5:32 PM
df.write_parquet(ROOT_PATH, io_config=io_config, partition_cols=[daft.col(SOME_COL)])
In this case, SOME_COL has 256 distinct values. The error message is as follows:
When completing multiple part upload for AWS
Error SLOW_DOWN during CompleteMultipartUpload operation:
Please reduce your request rate. /arrow/cpp/src/arrow/io/interfaces.cc:288: Error ignored when destroying file of type N5arrow2fs12_GLOBAL__N_118 ObjectOutputStream: IO Error: When uploading part for key XXXXX
The likely cause is that each worker is writing multiple small Parquet files under the same Hive partition (i.e. with the same SOME_COL value). It seems that after many retries the write operation eventually succeeds. Increasing retry_initial_backoff_ms might help, but could you suggest a more effective solution? For example, is there a way to configure Daft workers so that they avoid writing many small Parquet files under the same partition and instead produce larger ones? For comparison, DuckDB seems to export partitioned Parquet files that way.
Thank you in advance!
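For reference, a sketch of the retry/backoff tuning mentioned above, assuming io_config wraps daft.io.S3Config and that num_tries is the companion retry-count knob (values are illustrative, not recommendations):

from daft.io import IOConfig, S3Config

# Illustrative values only: start with a longer backoff and allow more retries
# when S3 answers CompleteMultipartUpload with SLOW_DOWN.
io_config = IOConfig(
    s3=S3Config(
        retry_initial_backoff_ms=1000,
        num_tries=25,
    )
)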
jay
09/11/2024, 5:42 PM
jay
09/11/2024, 5:44 PM
Igor Perchersky
09/11/2024, 6:01 PM
Igor Perchersky
09/11/2024, 6:59 PM
Is it possible to mimic pyarrow.write_dataset and avoid excessive fragmentation with appropriate settings (parquet_target_filesize, parquet_inflation_factor, parquet_target_row_group_size, ...) in daft.set_execution_config?
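A minimal sketch of what that tuning could look like, assuming daft.set_execution_config accepts these as keyword arguments (values are illustrative):

import daft

# Illustrative values only: larger target file and row group sizes mean each
# worker packs more rows into every Parquet file it writes per Hive partition.
daft.set_execution_config(
    parquet_target_filesize=512 * 1024 * 1024,        # bytes
    parquet_target_row_group_size=128 * 1024 * 1024,  # bytes
    parquet_inflation_factor=3.0,
)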
jay
09/11/2024, 7:04 PM
Yes, we do have parquet_target_filesize (defaults to 512MB).
The issue is that in distributed execution, each "independent worker" (think of it as an independent duckdb) receives one partition at a time and performs a write on that partition. That partition is likely not large enough to saturate parquet_target_filesize (e.g. if the Daft partition is 1GB in size and you're writing into 10 logical partitions on S3, then you'll get files of roughly 100MB each).
There is a solution we have in the works here, which is to perform a streaming write: essentially, instead of writing per Daft partition, we would perform a write across Daft partitions.
For now, a potential workaround is to use .into_partitions(N) to reduce the number of partitions (hence increasing the number of rows available in each partition to be piped into compacted files). You can try that and see if it fixes some of the issues you're facing!
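A sketch of that workaround applied to the write from the top of the thread; the choice of N (here 8) is purely illustrative:

# Coalesce into fewer, larger Daft partitions before the partitioned write,
# so each output Parquet file receives more rows.
df.into_partitions(8).write_parquet(
    ROOT_PATH,
    io_config=io_config,
    partition_cols=[daft.col(SOME_COL)],
)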
jay
09/12/2024, 1:55 AM
Actually, an even better option here is df.repartition(N, *my_partition_columns)!
What this would do under the hood:
1. Daft will hash your partition keys
2. Daft will move all data with the same partition keys to the same location
3. Now when you perform a write, we can write bigger files because they’re all colocated and will go through the same writer
LMK if this helps!
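A sketch of that suggestion applied to this thread's write, assuming SOME_COL is the only partition column and N is chosen to balance file size against parallelism:

# Hash-repartition on the write partition column so rows sharing a SOME_COL value
# are colocated on one worker; each Hive partition then gets fewer, larger files.
df.repartition(N, daft.col(SOME_COL)).write_parquet(
    ROOT_PATH,
    io_config=io_config,
    partition_cols=[daft.col(SOME_COL)],
)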
Igor Perchersky
09/12/2024, 6:17 AM
df.with_column("id_bucket", df["id"].partitioning.iceberg_bucket(N)).write_parquet(..., partition_cols=[daft.col("id_bucket")])
is good enough
jay
09/12/2024, 2:53 PM