Sagi
08/08/2024, 1:52 PM
ThreadPoolExecutor only dispatches a single worker and waits for it to complete before dispatching another (essentially achieving a concurrency of 1). This problem does not occur with Daft version 0.2.31. A significant amount of time seems to be spent in this section of the code: link to code.
Does anyone have any ideas why this might be happening?
Here is a snippet that reproduces the issue:
import os
import tempfile

import daft
import numpy as np
import pyarrow.parquet as pq


def write_dummy_dataset(base_path: str) -> None:
    n_rows = 1000
    tensor_shape = (500, 500)
    groups = list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
    df = daft.from_pydict(
        {
            "features": np.random.random((n_rows, *tensor_shape)),
            "group": np.random.choice(groups, n_rows),
        }
    )
    table = df.to_arrow()
    pq.write_to_dataset(table, base_path, partition_cols=["group"])


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tempdir:
        write_dummy_dataset(tempdir)
        print("Reading data from parquet files")
        path_pattern = os.path.join(tempdir, "*/*.parquet")
        df = daft.read_parquet(path_pattern)
        for row in df:
            pass
jay
08/08/2024, 2:43 PM
jay
08/08/2024, 2:52 PM
Calling iter on a dataframe will set a buffer size of 1, which is likely what you're seeing here. You can increase this arbitrarily with df.iter_partitions(results_buffer_size=NUM_CPUS) to increase the concurrency for a given dataframe iterator. We can also expose a configuration flag to make this configurable for the default __iter__.
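For illustration, a minimal sketch of that workaround; the read path and the NUM_CPUS value are placeholders, not from the thread:

import multiprocessing

import daft

NUM_CPUS = multiprocessing.cpu_count()

df = daft.read_parquet("path/to/dataset/*/*.parquet")
# Buffer up to NUM_CPUS partitions at a time instead of the default of 1,
# so partitions can be materialized concurrently while we consume them.
for partition in df.iter_partitions(results_buffer_size=NUM_CPUS):
    # Process each materialized partition as it becomes available.
    pass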
jay
08/08/2024, 2:52 PM
Sagi
08/08/2024, 3:04 PM
jay
08/08/2024, 3:06 PM
for row in df:?
I feel like we should either make an iter_rows API, or make this behavior configurable for the dataframe __iter__
jay
08/08/2024, 3:06 PM
Sagi
08/08/2024, 3:21 PM
After .repartition, I use iter_partitions. In our benchmarks, we simply used __iter__. I think a configurable iter_rows is great, and it conforms with most other similar APIs 🙂
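A rough sketch of that pattern; the partition count and buffer size are placeholder values, not taken from the thread:

import daft

df = daft.read_parquet("path/to/dataset/*/*.parquet")
# Repartition first, then pull partitions with a small explicit buffer so each
# concurrent iterator only keeps a few partitions in flight.
df = df.repartition(16)
for partition in df.iter_partitions(results_buffer_size=4):
    pass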
jay
08/08/2024, 3:54 PM
1. Default the buffer size to None, so that the single-iterator case (which is likely to be the most common, like you've done here) is optimal and will use all resources available.
2. Expose an iter_rows API with a configurable buffer_size, so that you can configure your multi-iterator case and tune your workload appropriately. The more iterators you run, the lower the buffer size you'll want, to reduce memory usage and avoid starvation.
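As a sketch of how the proposed usage could look from the caller's side (hypothetical: the API did not yet exist at this point in the thread, and the keyword name is assumed to mirror iter_partitions):

import daft

df = daft.read_parquet("path/to/dataset/*/*.parquet")

# Single-iterator case: leave the buffer unset (None by default) so the runner
# can use all available resources for this one iterator.
for row in df.iter_rows():
    pass

# Multi-iterator case: give each iterator a small explicit buffer to cap memory
# usage and avoid starving the other iterators.
for row in df.iter_rows(results_buffer_size=2):
    pass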
The new streaming execution engine we’re working on should help alleviate much of this problem! Once we have that, we can build a custom “weighted sampling concat” operation that concats all your dataframes, and we won’t need to specify any buffer sizes or anything, since the entire system will just be one concatted iterator.
Cc @Desmond Cheong too, who actually pointed out in the PR review that hardcoding result_buffer_size=1 in our iter was a bad idea 😭
jay
08/19/2024, 6:16 PM