# daft-dev
m
Hi all, awesome library you're working on! I've been prototyping data filtering and dedupe pipelines over the last few days for curating LLM pretraining data. I frequently run into errors that I believe come from interrupted connections to our R2 bucket. Specifically, I'm observing the following:
• In the first use case, I'm reading ~7k parquet files with a total size of 1.3TB (on disk), applying filters on different columns and a simple UDF to compute new fields, and then writing the result back to R2 as parquet. I'm on a single node with 176 cores, using the ray runner. During writing, I get the following error (which interrupts the workflow):
OSError: When completing multiple part upload for key 'path/to/output_file.parquet' in bucket 'XXXX': AWS Error UNKNOWN (HTTP status 400) during CompleteMultipartUpload operation: Unable to parse ExceptionName: InvalidPart Message: One or more of the specified parts could not be found.
• In the second use case, I run MinhashLSH on 10k parquet files (~1.4TB on disk). The pipeline consists of (1) computing minhash signatures, (2) a groupby on the signatures, and (3) distributed connected components. Here I'm using a ray cluster of 5 nodes, each with 200 cores. In this case, warnings and then errors are thrown during reading. At first, I get what looks like a warning:
Encountered error while streaming bytes into parquet reader. This may show up as a Thrift Error Downstream: Cached error: Unable to read data from file <s3://bucket/datasets/input_file.parquet>: error reading a body from connection: end of file before message length reached
and then, some time later, the following is thrown, again interrupting the workflow:
daft.exceptions.DaftCoreException: DaftError::ArrowError External format error: File out of specification: underlying IO error: Cached error: Unable to read data from file <s3://bucket/datasets/input_file.parquet>: error reading a body from connection: end of file before message length reached
Do you have any hints on how I can deal with such errors?
j
cc @Sammy Sidhu any thoughts? I was wondering if these are R2-specific, and whether we need to tune our configurations to be less aggressive for R2.
s
Hi @Maurice Weber, thanks for raising these points! For workload 1:
• We actually use pyarrow's parquet writer to write out each partition, and the error does look like it is coming from there.
• I think we need to tune some of the s3 concurrency parameters there so we don't overwhelm the R2 connections.
• What version of pyarrow are you using here?
For the second use case:
• Are you reading data from R2 again or from S3?
• You may want to try a cluster with more but smaller nodes (maybe 16 nodes of 64 cores) to get a better IO bandwidth / compute ratio.
• It may be that R2 is much more flakey than S3. If that is the case, we can add more retry mechanisms around stream interruption.
• We already have pretty good retry semantics, but only for connections.
You could try bumping the num_tries number from 5 to something like 15 or 20 in the S3Config to combat the heavy load. https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/io_configs/daft.io.S3Config.html#daft-io-s3config
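As a rough sketch of that suggestion, assuming the bucket is reached through R2's S3-compatible endpoint and that credentials are picked up from the environment: num_tries and max_connections are the S3Config parameters discussed in this thread, while the endpoint URL, the bucket path, and the specific values are placeholders for illustration, not recommendations.

```python
import daft
from daft.io import IOConfig, S3Config

# Placeholder endpoint: substitute your own R2 account ID.
io_config = IOConfig(
    s3=S3Config(
        endpoint_url="https://<account-id>.r2.cloudflarestorage.com",
        num_tries=15,        # raised from the 5 mentioned above
        max_connections=8,   # illustrative value; tune to what R2 tolerates
    )
)

# Pass the config explicitly to the read (the path is a placeholder).
df = daft.read_parquet("s3://bucket/datasets/*.parquet", io_config=io_config)
```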
m
Hi Sammy, thanks for your responses! I haven't gotten around to further tweaking the number of retries and max connections yet. I had these set to 10 retries and 32 connections in my previous runs, but I'll rerun with less aggressive values. Quick question re the max_connections param: does this refer to the overall number of connections to R2, or is it per node? Regarding your questions:
For workload 1:
• We actually use pyarrow's parquet writer to write out each partition, and it does look like it is coming from there.
• I think we need to tune some of the s3 concurrency parameters there to not overwhelm the r2 connections.
Are you referring to parameters other than the num_tries and max connections set in the S3Config?
• What version of pyarrow are you using here?
I'm on version 14.0.2
for the second use case:
• are you reading data from R2 again or from S3?
I'm reading from R2 (we don't use S3 at all)
• It may be that R2 is much more flakey than S3. And if that is the case, we can add more retry mechanisms around stream interruption.
How is stream interruption handled currently? Is there any way we can retry or drop the reading task if a stream got interrupted?
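For illustration only, a generic user-side version of "retry when a stream gets interrupted" could look like the sketch below. It says nothing about how Daft handles interruptions internally: retry_with_backoff and flaky_read are made-up names, flaky_read merely simulates a read that is cut off twice, and retry_on would have to be set to whatever exception the real read raises.

```python
import random
import time


def retry_with_backoff(fn, num_tries=5, initial_backoff_s=1.0,
                       retry_on=(OSError,), sleep=time.sleep):
    """Call fn(); on a retryable error, wait and try again, up to num_tries attempts."""
    for attempt in range(1, num_tries + 1):
        try:
            return fn()
        except retry_on:
            if attempt == num_tries:
                raise  # out of attempts: surface the last error
            # Exponential backoff with jitter so workers do not retry in lockstep.
            delay = initial_backoff_s * 2 ** (attempt - 1)
            sleep(delay * random.uniform(0.5, 1.5))


# Hypothetical stand-in for a read that is interrupted twice before succeeding.
calls = {"n": 0}


def flaky_read():
    calls["n"] += 1
    if calls["n"] < 3:
        raise OSError("end of file before message length reached")
    return b"parquet bytes"


# sleep is stubbed out here so the demo does not actually wait.
result = retry_with_backoff(flaky_read, num_tries=5, sleep=lambda s: None)
```

Wrapping a whole pipeline this way would rerun everything on failure, so a wrapper like this only makes sense around a small unit of work, such as a single file read.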