# daft-dev
s
Hello everyone, I've encountered a strange issue when iterating over Daft dataframes with versions 0.2.32 and later. When profiling the process, I noticed that the `ThreadPoolExecutor` only dispatches a single worker and waits for it to complete before dispatching another (essentially achieving a concurrency of 1). This problem does not occur with Daft version 0.2.31. A significant amount of time seems to be spent in this section of the code: link to code. Does anyone have any ideas why this might be happening? Here is a snippet that reproduces the issue:
```python
import os
import tempfile

import daft
import numpy as np
import pyarrow.parquet as pq


def write_dummy_dataset(base_path: str) -> None:
    n_rows = 1000
    tensor_shape = (500, 500)
    groups = list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")

    df = daft.from_pydict(
        {
            "features": np.random.random((n_rows, *tensor_shape)),
            "group": np.random.choice(groups, n_rows),
        }
    )

    # Write a Hive-style partitioned Parquet dataset (one subdirectory per group value)
    table = df.to_arrow()
    pq.write_to_dataset(table, base_path, partition_cols=["group"])


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tempdir:
        write_dummy_dataset(tempdir)

        print("Reading data from parquet files")
        path_pattern = os.path.join(tempdir, "*/*.parquet")
        df = daft.read_parquet(path_pattern)
        for row in df:
            pass
```
j
Hmm, that code is there to try to make sure that some work is being completed (across dataframes) before we keep dispatching. Let me take a closer look today!
Off the top of my head, the other change we made was adding a buffer to your dataframes (by default, each dataframe now keeps only one partition buffered). In the past, every dataframe would schedule as many partitions as it could, potentially starving other dataframes' executions. This was added to avoid having "too many" tasks in flight when you run something like 80 dataframes in parallel, e.g. dataframe 1 might be running 8 tasks in parallel on an 8-CPU machine, starving all the other dataframes. By default, `iter` on a dataframe will set a buffer size of 1, which is likely what you're seeing here. You can increase this arbitrarily with `df.iter_partitions(results_buffer_size=NUM_CPUS)` to raise the concurrency for a given dataframe iterator. We can also expose a configuration flag to make this configurable for the default `__iter__`. But let me take a closer look at your workload to confirm that this is the issue you're facing.
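Something like this, roughly (the path is just a placeholder, and `os.cpu_count()` is only a stand-in for however much concurrency you want per iterator):

```python
import os

import daft

# Rough sketch: raise the per-iterator buffer so more partition tasks can be
# in flight at once, instead of the default buffer of 1.
df = daft.read_parquet("data/*/*.parquet")  # placeholder path

for partition in df.iter_partitions(results_buffer_size=os.cpu_count()):
    ...  # consume each partition as it is materialized
```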
s
Solved by adjusting the results buffer size, thanks!
j
Ok awesome, thanks for confirming! Is your primary API just `for row in df:`? I feel like we should either add an `iter_row` API or make this behavior configurable for the dataframe's `__iter__`. Any thoughts?
s
It depends. When I want to shuffle each partition after calling `.repartition`, I use `iter_partitions`. In our benchmarks, we simply used `__iter__`. I think a configurable `iter_rows` is great, and it conforms with most other similar APIs 🙂
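For reference, the shuffle-per-partition pattern looks roughly like this (a sketch only; it assumes the local runner, where each yielded partition can be converted with `to_pydict()`, and the path and column name are placeholders based on the example above):

```python
import daft
import numpy as np

# Sketch: repartition by a key, then shuffle rows locally within each partition.
# Assumes the local runner, where iter_partitions() yields objects exposing
# to_pydict(); the exact partition type can differ by runner and Daft version.
df = daft.read_parquet("data/*/*.parquet").repartition(8, "group")  # placeholder path

for partition in df.iter_partitions():
    columns = partition.to_pydict()             # column name -> list of values
    n_rows = len(next(iter(columns.values())))
    order = np.random.permutation(n_rows)       # shuffled row order for this partition
    shuffled = {name: [col[i] for i in order] for name, col in columns.items()}
    # ...feed the shuffled rows into the benchmark/training loop...
```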
👍 1
j
Ok great, I think my proposed solution is:
1. Change the defaults on our iterators to be `None`, so the single-iterator case (which is likely the most common, like you've done here) is optimal and will use all available resources.
2. Expose an `iter_rows` API with a configurable `buffer_size`, so that you can configure the multi-iterator case and tune your workload appropriately. The more iterators you run, the lower the buffer size you'll want, to reduce memory usage and avoid starvation (rough sketch below).

The new streaming execution engine we're working on should help alleviate much more of this problem! Once we have that, we can build a custom "weighted sampling concat" operation that concats all your dataframes, and we won't need to specify any buffer sizes since the entire system will just be one concatted iterator. Cc @Desmond Cheong too, who actually pointed out that hardcoding `result_buffer_size=1` in our iter was a bad idea in the PR review 😭
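In other words, usage would look something like this (sketch only; the `buffer_size` keyword on `iter_rows` is part of the proposal and hasn't shipped yet):

```python
import daft

df = daft.read_parquet("data/*/*.parquet")  # placeholder path

# 1. Single-iterator case: the default (no buffer cap) uses all available resources.
for row in df:
    ...

# 2. Multi-iterator case: cap each iterator's buffer so many concurrent
#    dataframes don't starve one another (proposed API, name may change).
for row in df.iter_rows(buffer_size=2):
    ...
```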
❤️ 1
🙌 1