# general
b
Hey everyone - I'm looking to create a shuffle buffer, any way to do this in Daft? For example, I want to read 10_000 rows from a parquet dataset of 1_000_000 rows, shuffle just those 10_000 rows, and then split them into further partitions, for example of 1_000 rows, to complete the rest of the "pre-processing". Thnx!
đź‘‹ 1
j
Hey! You can try:
1. Run a `.limit(10_000)`
2. Then do a `.repartition(10)`, which should randomly shuffle your data into partitions of roughly 1_000 rows each.
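Put together, that recipe would look something like this (a minimal sketch; the parquet path and the 10_000/10 sizes are placeholders for your own):
```python
import daft

# Hypothetical dataset path; substitute your own parquet files.
df = daft.read_parquet("data/*.parquet")

# Take the first 10_000 rows as the shuffle buffer.
buffer = df.limit(10_000)

# Scatter the buffer randomly into 10 partitions of roughly 1_000 rows each.
buffer = buffer.repartition(10)
```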
b
Thnx! This won't allow me to use `into_partitions`, right?
j
`.into_partitions` will try to avoid shuffling your data, and instead performs “in-place” splitting and coalescing!
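For example (a small sketch, assuming `into_partitions` behaves as described above):
```python
import daft

df = daft.from_pydict({"x": list(range(10_000))})

# Split into 10 partitions of ~1_000 rows each without shuffling;
# row order is preserved.
split = df.into_partitions(10)

# By contrast, repartition(10) scatters data randomly across partitions.
scattered = df.repartition(10)
```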
s
After using `.repartition`, the resulting df is sometimes far from being randomly shuffled:
```python
import daft

df_a = daft.from_pydict({"char": ["a"] * 10})
df_b = daft.from_pydict({"char": ["b"] * 10})

df = df_a.concat(df_b)
df.repartition(2).show()
```
Can you shed some light on how `.repartition` is implemented?
j
It does perform a “random” shuffle of partitions (see screenshots) but not rows. I think the part that isn’t matching your expectations is that there isn’t a “final” shuffling within-each-resultant-partition. Each resultant partition still has data grouped by the origin partition, and ordered the same way that it was in the original partition.
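You can see this by inspecting the resulting partitions directly (a quick sketch, reusing `df` from the example above):
```python
# Each output partition still holds contiguous runs of "a"s and "b"s,
# grouped by origin partition, rather than a row-level interleaving.
for partition in df.repartition(2).iter_partitions():
    print(partition.to_pylist())
```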
s
Thanks!
j
Is your intention perhaps a true global shuffle instead? We could expose functionality for a per-partition random shuffle. That would let you first do a random shuffle between partitions, and then a shuffle within each partition.
s
Yes, that's actually what I do now (iterating over the partitions and shuffling the rows within each partition). If I were able to do that with native Daft operations, that would be great 🙂
j
Are you using a UDF?
Feel free to open an issue? Probably something like a local per-partition sort.
s
Not using a UDF. For a global shuffle I roughly tried the following approach (as well as adding a random column of integers and sorting the dataframe by that column):
```python
import daft
import numpy as np
import torch

class MyDataset(torch.utils.data.IterableDataset):
    df: daft.DataFrame
    ...

    def __iter__(self):
        # Shuffle data between partitions, keeping the current partition count.
        df = self.df.repartition(None)
        # Then shuffle rows locally within each partition.
        for partition in df.iter_partitions():
            samples = partition.to_pylist()
            np.random.shuffle(samples)
            yield from samples
```
and sure 🙂 just opened an issue here: https://github.com/Eventual-Inc/Daft/issues/2612 thanks
🙌 1
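For reference, one way the random-column approach mentioned above could be sketched. This version uses a small UDF to generate the sort key; `rand_key`, the `"rand"` column name, and the parquet path are all made up for illustration:
```python
import daft
import numpy as np
from daft import col

# Hypothetical UDF that produces a random integer key per row.
@daft.udf(return_dtype=daft.DataType.int64())
def rand_key(column):
    return np.random.randint(0, 2**62, size=len(column))

df = daft.read_parquet("data/*.parquet")  # hypothetical path

# Sorting by the random key yields a global, row-level shuffle.
shuffled = (
    df.with_column("rand", rand_key(col("char")))
    .sort("rand")
    .exclude("rand")
)
```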