# general
i
Hello everyone, I am running Daft/Ray on AWS with two r7g.16xlarge workers. I have encountered a "reduce your request rate" S3 error while executing the following command:
```python
df.write_parquet(ROOT_PATH, io_config=io_config, partition_cols=[daft.col(SOME_COL)])
```
In this case, `SOME_COL` has 256 distinct values. The error message is as follows:
```
When completing multiple part upload for AWS
Error SLOW_DOWN during CompleteMultipartUpload operation:
Please reduce your request rate. /arrow/cpp/src/arrow/io/interfaces.cc:288: Error ignored when destroying file of type N5arrow2fs12_GLOBAL__N_118ObjectOutputStream: IO Error: When uploading part for key XXXXX
```
The likely cause is that each worker is writing multiple small Parquet files under the same Hive partition (with the same `SOME_COL` value). It seems that after many retries, the write operation eventually succeeds. Increasing `retry_initial_backoff_ms` might help, but could you suggest a more effective solution? For example, is there a way to configure Daft workers to avoid spilling small Parquet files under the same partitions and instead keep them larger? As a comparison, it seems that DuckDB exports partitioned Parquet files in this manner. Thank you in advance!
j
Hey @Igor Perchersky! Yeah we currently leverage Pyarrow’s writer for writing files and it seems that some folks have been encountering issues with the way they interact with S3. @Sammy Sidhu @Colin Ho for thoughts?
DuckDB has a much easier time with this because they are single-node only and can control how many writers are writing to S3 in parallel. We are currently finishing up our streaming engine and this will likely be our behavior as well when running on single-node. However in a distributed world, you can think of us as running “many independent duckdbs”. This leads to much more fragmentation of the data.
i
Hi @jay, in this specific case it would be jolly good if each Daft worker acted as an "independent duckdb", doing its best to reduce the fragmentation of its own Parquet files under any given partition...
@jay - in the case described above, could it be helpful to "force" `pyarrow.write_dataset` to avoid excessive fragmentation via appropriate settings (`parquet_target_filesize`, `parquet_inflation_factor`, `parquet_target_row_group_size`, ...) in `daft.set_execution_config`?
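To make the question concrete, something like this is what I had in mind (a sketch; the parameter names are the ones listed above, and their availability and defaults may differ between Daft versions):
```python
import daft

# Sketch: nudge the writer towards fewer, larger files (values are illustrative).
daft.set_execution_config(
    parquet_target_filesize=512 * 1024 * 1024,        # target ~512MB output files
    parquet_target_row_group_size=128 * 1024 * 1024,  # target ~128MB row groups
    parquet_inflation_factor=3.0,                     # assumed on-disk -> in-memory inflation
)
```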
j
Yes, we already default to a good `parquet_target_filesize` (defaults to 512MB). The issue is that in distributed execution, each "independent worker" (think of it as an independent duckdb) receives one partition at a time and performs a write on that partition. That partition is likely not large enough to saturate the `parquet_target_filesize` (e.g. if the Daft partition is 1GB in size, and you're writing into 10 logical partitions on S3, then you'll get files of roughly 100MB in size each). There is a solution we have in the works here, which is to perform a streaming write: essentially we would want to not just write per-Daft-partition, but to perform a write across Daft partitions. For now, a potential workaround is to use `.into_partitions(N)` to reduce the number of partitions (hence increasing the number of rows available in each partition to be piped into compacted files). You can try that and see if it fixes some of the issues you're facing!
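For example (just a sketch; the right `N` depends on your data size and cluster):
```python
# Sketch: coalesce into fewer, larger Daft partitions before the partitioned write.
# Fewer partitions -> more rows per writer -> larger Parquet files per Hive partition.
N = 8  # illustrative value
df.into_partitions(N).write_parquet(
    ROOT_PATH, io_config=io_config, partition_cols=[daft.col(SOME_COL)]
)
```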
Just chatted with the team: similar to Spark, the best way of doing this right now is actually `df.repartition(N, *my_partition_columns)`! What this does under the hood:
1. Daft will hash your partition keys
2. Daft will move all data with the same partition keys to the same location
3. Now when you perform a write, we can write bigger files because they're all colocated and will go through the same writer (rough sketch below)
LMK if this helps!
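Rough sketch of that (N and the value 64 are placeholders; I'm assuming `repartition` accepts the same column expressions as `partition_cols`):
```python
# Sketch: colocate rows by the physical partition key before writing, so each
# Hive partition on S3 is written by a single writer as larger files.
N = 64  # illustrative; tune to your cluster and data volume
(
    df.repartition(N, daft.col(SOME_COL))
      .write_parquet(ROOT_PATH, io_config=io_config, partition_cols=[daft.col(SOME_COL)])
)
```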
i
Thank you @jay! It seems that repartition is overkill. I have found that in my specific case something like
```python
df.with_column("id_bucket", df["id"].partitioning.iceberg_bucket(N)).write_parquet(..., partition_cols=[daft.col("id_bucket")])
```
is good enough
j
Great to hear!