# daft-dev
c
I found an issue when reading many parquet files from AWS. The logical-to-physical plan translator goes incredibly slow when running this code:
```python
import daft
from daft import col
from daft.io import IOConfig

io_config = IOConfig(
    s3=daft.io.S3Config.from_env()
)
dedup = daft.read_parquet("s3://eventual-dev-benchmarking-fixtures/redpajama-parquet/dedupe1", io_config=io_config)

cc_sizes = dedup.groupby("original_url", "original_date_download").agg(col("url").count().alias("component_size"))
cc_sizes.sort(by="component_size", desc=True).explain(True)
```
It seems to be doing a lot of sequential requests to AWS. (More info in thread)
@Cory Grinstead Running a `git bisect` for this issue traces it back to this commit: https://github.com/Eventual-Inc/Daft/commit/c5f2d4aac4ce86effeb3f1efe6199ad26a91f8f3
The s3 bucket that I’m reading from is a folder containing 195 parquet files, totaling only about 70MB of data.
Before this commit, the physical plan translation ran basically instantly.
Running with the Ray runner fixes this, which basically confirms that this is the problematic code.
c
won't this revert the perf benefits for local reading though?
c
That’s why I want a better solution. @Sammy Sidhu had some ideas
s
Currently on main, we're fetching the parquet metadata for each file sequentially. In the case of Conor's query, that's a few hundred small parquet files, which ends up taking about a minute during the translation phase (since we exhaust the glob scan operator iterator). Since it's a bunch of small files, there's no benefit to fetching the parquet metadata upfront, because the files won't be split across scan tasks. In the PR that Conor has, the idea is to cache the parquet metadata if we are in the process of splitting a ScanTask. The rationale is that:
1. We already have the metadata, so we might as well cache it.
2. If it's a large file, we can cache the metadata across multiple ScanTasks.
3. We only perform this step if the total number of files is less than some threshold, bounding the time spent in planning. Beyond that, we get plenty of parallelism anyway.
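To make that concrete, here is a minimal sketch of the caching idea. It is not Daft's actual planner code; the `ScanTask` type, helper callables, and threshold value are all made up for illustration.
```python
# Illustrative sketch only -- not Daft's real scan-planning code. The names,
# types, and the threshold value are made up to show the shape of the idea.
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Callable, List, Optional

EAGER_METADATA_FILE_LIMIT = 100  # hypothetical threshold that bounds planning time


@dataclass
class ScanTask:
    path: str
    row_groups: List[int]                  # row groups this task will read
    cached_metadata: Optional[Any] = None  # footer metadata, if we already fetched it


def plan_scan_tasks(
    paths: List[str],
    fetch_metadata: Callable[[str], Any],
    split_into_row_groups: Callable[[Any], List[List[int]]],
) -> List[ScanTask]:
    """Fetch parquet footers at planning time only when the file count is small,
    and reuse (cache) the fetched metadata on the ScanTasks produced by a split."""
    if len(paths) > EAGER_METADATA_FILE_LIMIT:
        # Many small files: skip per-file footer fetches during planning entirely;
        # the executor can read footers in parallel at runtime instead.
        return [ScanTask(path=p, row_groups=[]) for p in paths]

    tasks: List[ScanTask] = []
    for p in paths:
        metadata = fetch_metadata(p)  # needed anyway to decide how to split the file
        for rg_indices in split_into_row_groups(metadata):
            # We already paid for the footer fetch, so attach it to every task
            # produced by the split rather than re-fetching it later.
            tasks.append(ScanTask(path=p, row_groups=rg_indices, cached_metadata=metadata))
    return tasks
```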
c
I changed the way that the metadata is stored. Now if we split the file across scan tasks, each scan task only stores the metadata relevant to it. This makes reading the adversarial parquet file really fast, suspiciously fast even: it went from taking ~13s to read and `.collect` to only 1.9s. Hopefully this doesn't mean I broke anything!
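Roughly, the per-task slicing looks like the sketch below, again with made-up types rather than the real Daft structs:
```python
# Illustrative sketch of storing only the relevant row-group metadata per scan
# task; the types here are invented and don't mirror Daft's actual structs.
from __future__ import annotations
from dataclasses import dataclass
from typing import List


@dataclass
class RowGroupMeta:
    index: int       # row-group index within the original file
    num_rows: int
    size_bytes: int


@dataclass
class TaskMetadata:
    path: str
    row_groups: List[RowGroupMeta]  # only the row groups this scan task will read


def metadata_for_task(path: str, all_row_groups: List[RowGroupMeta], wanted: List[int]) -> TaskMetadata:
    """Instead of attaching the whole file footer to every task, keep just the
    entries for the row groups assigned to this task."""
    wanted_set = set(wanted)
    return TaskMetadata(path=path, row_groups=[rg for rg in all_row_groups if rg.index in wanted_set])
```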
Nope, I definitely broke something.
Seems like we depend on all the row groups being there in the metadata for indexing purposes.
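For intuition on the failure mode, here is a toy example (purely illustrative) of how absolute row-group indices stop lining up once the metadata is sliced:
```python
# Toy illustration of the breakage: the read path indexes row groups by their
# absolute position in the original file, but a sliced metadata list only keeps
# the subset assigned to one scan task.
all_row_groups = [f"rg{i}" for i in range(8)]          # stand-in for the full footer
task_row_groups = [all_row_groups[i] for i in (5, 6)]  # metadata kept by one task

wanted = 5                      # the reader asks for row group 5 of the *file*
print(all_row_groups[wanted])   # "rg5" -- works against the full footer
try:
    print(task_row_groups[wanted])  # the sliced list only has 2 entries
except IndexError:
    print("absolute index is no longer valid; it needs remapping per task")
```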
Might require a bit of a refactor to fix this; @Sammy Sidhu can we discuss this later?
@Sammy Sidhu Benchmarking results!
Script used to benchmark:
```python
import daft
from daft.io import IOConfig, S3Config
import time

io_config = IOConfig(s3=S3Config.from_env())

# FILE = "s3://daft-public-data/testing_data/bad-polars-lineitem.parquet"
FILE = "s3://eventual-dev-benchmarking-fixtures/redpajama-parquet/dedupe1"

total = 0
for i in range(10):
    start = time.perf_counter()
    df = daft.read_parquet(FILE, io_config=io_config).count("*")
    df.collect()

    dur = time.perf_counter() - start
    print("Time for iteration %i: %.3fs" % (i + 1, dur))
    total += dur

print("Total time: %.3fs" % total)
print("Avg time: %.3fs" % (total / 10))
```
Just does a simple count + collect
3rd column was how I originally noticed the issue!
s
Amazing!!
Is the PR ready to review?
c
It is