# general
k
Is there a way to do local aggregations? I would like to do a groupby and I know that each group should only contain rows from a single partition, so I don't want it to do a massive shuffle across partitions just to collect each group.
s
Hmm, we don't have a way exposed to the user to do this, but there's a workaround. Daft UDFs currently run on the entire partition, so you could run a UDF over all the columns that you need for your group_by + agg. Inside the UDF, you can use the Table API (which is an internal API):
# in UDF
from daft import col
from daft.table import Table  # internal API; the import path may vary across daft versions

table = Table.from_pydict({"a": a_series, "b": b_series})
result = table.agg(to_agg=[col("b").sum()], group_by=[col("a")])
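Putting that together, a full UDF might look something like this (a minimal sketch; the function name, column names, return type, and the exact internal APIs are assumptions that may differ by daft version):

import daft
from daft import col
from daft.table import Table  # internal API

@daft.udf(return_type=daft.DataType.int64())
def local_sum_per_group(a: daft.Series, b: daft.Series):
    # build an internal Table from this partition's columns
    table = Table.from_pydict({"a": a, "b": b})
    # aggregate locally, within this partition only -- no shuffle
    result = table.agg(to_agg=[col("b").sum()], group_by=[col("a")])
    # return one of the result columns as a Series
    return result.get_column("b")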
k
Okay thanks!!
Can I construct a Table from a micropartition?
Oh every micropartition is already a Table?
Can the return type for the UDF be a table as well?
Right now, we don't support returning a Table from a UDF, but you can return a Struct-type series that has all the fields you want. We should be able to add some syntactic sugar to make multi-return series cleaner in a UDF. @Kevin Wang, could you spec out what that would look like?
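In the meantime, building that Struct series by hand would look something like this (a minimal sketch; the field names and values are made up):

import daft
import pyarrow as pa

# hypothetical aggregated columns to pack into a single Struct-type series
names = ["group_name", "b_sum"]
arrays = [pa.array(["A", "B"]), pa.array([10, 20])]

# zip the columns into a pyarrow StructArray, then convert to a daft Series
struct_arr = pa.StructArray.from_arrays(arrays, names)
series = daft.Series.from_arrow(struct_arr)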
k
Does that mean something like this? For example, say I have one partition with group_names A, B, C, D and another partition with group_names 1, 2, 3, 4, and I would like to end up with new_df, which has two columns, group_name and all_the_rest_of_the_data, in 8 rows. To apply the UDF over the partitions:
new_df = df.iter_partitions(grouped_agg_from_partition(col("all_my_data_in_one_col")))
And the UDF is:
@daft.udf(return_type=some_predefined_struct)
def grouped_agg_from_partition(some_micropartition):
    some_micropartition = some_micropartition.with_column("group_name", some_micropartition["all_my_data_in_one_col"].struct.get("group_name"))
    return some_micropartition.agg(to_agg=[col("all_my_data_in_one_col").agg_list().alias("all_the_rest_of_the_data")], group_by=[col("group_name")])
k
Hi @Kyle, to do what you’re asking, you would create a UDF like this:
import daft
import pyarrow as pa

@daft.udf(return_type=daft.DataType.struct({"field1": ...}))
def local_agg_udf(key: daft.Series, value1: daft.Series, value2: daft.Series):
    table = daft.table.MicroPartition.from_pydict({"key": key, "value1": value1, "value2": value2})  # internal API
    agged_data = table.agg(your_agg_expr, your_group_by) # if your entire partition is one group you may not need the group_by
    
    # now, construct a struct series to return since you cannot return a micropartition
    # we first convert into pyarrow arrays to construct a pyarrow structarray
    names = agged_data.column_names()
    arrays = [agged_data.get_column(n).to_arrow() for n in names]
    struct_arr = pa.StructArray.from_arrays(arrays, names)
    series = daft.Series.from_arrow(struct_arr)

    return series
You can then use it with a select statement:
df = df.select(local_agg_udf(df["key"], df["value1"], df["value2"]).alias("agged_data"))

# expand the struct array back into columns
df = df.select("agged_data.*")
This is pretty complicated though; I'll look into how we can simplify it in the future. For now, try this and let me know how it goes!
k
@Kevin Wang, when I run the UDF it says that data is a Series and it doesn't have the attribute 'agg'. If I do not select a column like df['foo'], it then says that UDFs must be run on expressions.
k
Oh sorry, that was not entirely correct. I've modified the script to use the correct types. What you want to do is pass in all of the columns that you need in your groupby, then construct a micropartition from that data. In my example, there's one groupby key, "key", and two values, "value1" and "value2".
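Concretely, each column you pass at the call site arrives inside the UDF as a daft.Series, which is what from_pydict expects here:

# call site: each df[...] column becomes one daft.Series argument to the UDF
df = df.select(local_agg_udf(df["key"], df["value1"], df["value2"]).alias("agged_data"))

# inside the UDF, the arguments are daft.Series objects, not DataFrames or Expressions:
# def local_agg_udf(key: daft.Series, value1: daft.Series, value2: daft.Series): ...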