# general
d
@jay thanks! I just acked on GitHub. Your description of Daft's procedure for pivoting on partitioned dataframes was exactly what I was hoping for.

To expand on my use case: for my narrow purposes, Daft's procedure (specifically steps 2 and 3 from your GH post) is overkill. No harm, but I don't believe my use case benefits significantly from the shuffling and regrouping. If I find I'm performance limited, then in place of my last transformation step (pivoting) I may instead convert the DataFrame to a Ray Dataset (which is the format it will end up in anyway), partition by truncated timestamp, and then run the pivot per batch with Dataset.map_batches(), which gives me a local groupby, pivot, and aggregate.

Today I stressed the system a bit more, and everything was fine up until I tried filling gaps in my timestamps using (admittedly half-baked) fill procedures. Daft did not handle the filling gracefully, and I got lots of dying Ray workers and OOM errors. The procedure I used involved creating a new dataframe consisting of sender_id, earliest message timestamp, and latest message timestamp; generating Python lists of all possible truncated timestamps between earliest and latest; and then joining the new dataframe to the primary dataframe containing the signals. From there my intention was to explode the complete lists of truncated timestamps, but the join wouldn't complete with the resources available to me. (The stress dataset is 2 months long, 2000 senders, 10 signals, with timestamps truncated to 1 second. Resources are limited to ~20 CPUs / 80 GB.) On paper the dataset should have fit into memory easily, and it did fit up until I tried that join and explode. I could further process the lists of timestamps to include just those that are missing, to minimize the memory required for the join, but that's not a great tradeoff.

So … perhaps your team would be interested in adding some optimized timeseries ffill()/bfill()/mean_fill()/smoothing() methods?
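Roughly what I have in mind for the map_batches() route — untested, column names made up, and the exact Daft read API may differ, so treat it as a sketch:
```python
import daft
import pandas as pd

# Tiny in-memory stand-in for the real signals dataframe (made-up columns).
df = daft.from_pydict({
    "ts_truncated": ["2024-01-01T00:00:00", "2024-01-01T00:00:00", "2024-01-01T00:00:01"],
    "signal_name": ["a", "b", "a"],
    "value": [1.0, 2.0, 3.0],
})

ds = df.to_ray_dataset()  # hand off to Ray, its final destination anyway

def pivot_batch(batch: pd.DataFrame) -> pd.DataFrame:
    # Local groupby/pivot/aggregate within one batch; assumes each batch already
    # holds all rows for its truncated timestamps.
    return batch.pivot_table(
        index="ts_truncated", columns="signal_name", values="value", aggfunc="mean"
    ).reset_index()

pivoted = ds.map_batches(pivot_batch, batch_format="pandas")
```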
j
No harm, but I don’t believe my use case benefits significantly from the shuffling and regrouping.
Is this because your dataframe is already partitioned by the timestamp?
Do Daft UDFs support sliding windows?
This is tricky because Daft is distributed and has partitions. It would be easy to implement per-partition sliding windows (in fact, today when you run a UDF you get access to the entire partition of data, so you can access it however you want), but doing this in a global way is going to be quite tricky. We will probably need to “pad” each partition with WINDOW_SIZE/2 rows of data from the partitions that come before and after it, and then run the operation, I think.
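Concretely, something like this — plain pandas rather than the Daft API, just to illustrate the padding trick:
```python
import pandas as pd

WINDOW_SIZE = 5
HALF = WINDOW_SIZE // 2

def rolling_mean_across_partitions(partitions: list[pd.DataFrame], column: str) -> list[pd.DataFrame]:
    # Pad each partition with HALF rows from its neighbours so a per-partition
    # centred rolling mean matches what a single global rolling mean would give.
    out = []
    for i, part in enumerate(partitions):
        before = partitions[i - 1].tail(HALF) if i > 0 else part.iloc[0:0]
        after = partitions[i + 1].head(HALF) if i < len(partitions) - 1 else part.iloc[0:0]
        padded = pd.concat([before, part, after], ignore_index=True)
        rolled = padded[column].rolling(WINDOW_SIZE, center=True, min_periods=1).mean()
        # Keep only the results that belong to this partition's own rows.
        own = rolled.iloc[len(before): len(before) + len(part)].reset_index(drop=True)
        out.append(part.reset_index(drop=True).assign(rolling_mean=own))
    return out
```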
So … perhaps your team would be interested in adding some optimized timeseries ffill()/bfill()/mean_fill()/smoothing() methods?
Could you elaborate or provide examples here of what that looks like? I think per-partition operations would be quite simple to add, but anything that might span across partitions will also be tricky.
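For example, is this roughly the shape of thing you mean, applied per partition? (Pandas for illustration only, column names made up; assumes unique truncated timestamps within a partition.)
```python
import pandas as pd

def fill_gaps(part: pd.DataFrame, ts_col: str = "ts") -> pd.DataFrame:
    # Reindex one partition onto a dense 1-second grid, then forward-fill the gaps.
    part = part.set_index(ts_col).sort_index()
    full_index = pd.date_range(part.index.min(), part.index.max(), freq="1s")
    return (
        part.reindex(full_index)   # insert rows for the missing truncated timestamps
            .ffill()               # .bfill() / .interpolate() would be the other variants
            .rename_axis(ts_col)
            .reset_index()
    )

# Usage on a toy partition:
toy = pd.DataFrame({
    "ts": pd.to_datetime(["2024-01-01 00:00:00", "2024-01-01 00:00:03"]),
    "value": [1.0, 4.0],
})
print(fill_gaps(toy))  # rows for 00:00:01 and 00:00:02 appear, value forward-filled to 1.0
```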