Kyle
09/30/2024, 9:05 AM

Sammy Sidhu
10/01/2024, 3:52 AM
# in UDF
table = Table.from_pydict({"a": a_series, "b": b_series})
result = table.agg(to_agg=[col("b").sum()], group_by=[col("a")])

Kyle
10/01/2024, 3:55 AM

Kyle
10/01/2024, 6:46 AM

Kyle
10/01/2024, 6:47 AM

Kyle
10/01/2024, 6:47 AM

Sammy Sidhu
10/01/2024, 7:01 AM
from_pydict
https://github.com/Eventual-Inc/Daft/blob/a2be8a362907853e5ffc0f9f7c49326c99df9fda/daft/table/micropartition.py#L106
and the agg
https://github.com/Eventual-Inc/Daft/blob/a2be8a362907853e5ffc0f9f7c49326c99df9fda/daft/table/micropartition.py#L214

Sammy Sidhu
10/01/2024, 7:03 AM
Struct type series that has all the fields you want. We should be able to add some syntactic sugar to make multi-return series cleaner in a UDF.
@Kevin Wang, could you spec out what that would look like?

Kyle
10/01/2024, 7:14 AM
new_df which has two columns, group_name and all_the_rest_of_the_data, in 8 rows.
To apply the UDF over the partitions:
new_df = df.iter_partitions(grouped_agg_from_partition(col("all_my_data_in_one_col")))
And the UDF is:
@daft.udf(return_type=some_predefined_struct)
def grouped_agg_from_partition(some_micropartition):
    some_micropartition = some_micropartition.with_column("group_name", some_micropartition["all_my_data_in_one_col"].struct.get("group_name"))
    return some_micropartition.agg(to_agg=[col("all_my_data_in_one_col").agg_list().alias("all_the_rest_of_the_data")], group_by=[col("group_name")])

Kevin Wang
10/01/2024, 6:58 PM
import daft
import pyarrow as pa
@daft.udf(return_type=daft.DataType.struct({"field1": ...}))
def local_agg_udf(key: daft.Series, value1: daft.Series, value2: daft.Series):
    table = daft.table.MicroPartition.from_pydict({"key": key, "value1": value1, "value2": value2})
    agged_data = table.agg(your_agg_expr, your_group_by)  # if your entire partition is one group you may not need the group_by
    # now, construct a struct series to return, since you cannot return a MicroPartition
    # we first convert each column into a pyarrow array to construct a pyarrow StructArray
    names = agged_data.column_names()
    arrays = [agged_data.get_column(n).to_arrow() for n in names]
    struct_arr = pa.StructArray.from_arrays(arrays, names)
    series = daft.Series.from_arrow(struct_arr)
    return series
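A minimal pure-Python model of the data flow in local_agg_udf, assuming the aggregation is a per-key sum of value1 and value2 (the message leaves your_agg_expr open). It calls no Daft or PyArrow APIs: plain lists stand in for Series, dicts stand in for struct values, and the helper names local_group_sum, columns_to_struct_rows and struct_rows_to_columns are hypothetical. It only shows the three shapes involved: the grouped columns, the struct rows built from them (conceptually what pa.StructArray.from_arrays produces), and the columns recovered by expanding the struct (conceptually what selecting "agged_data.*" does).

```python
# Pure-Python model of local_agg_udf's data flow (no Daft or PyArrow).
# Hypothetical helpers; plain lists stand in for Series, dicts for structs.


def local_group_sum(key, value1, value2):
    """Sum value1 and value2 per distinct key (a stand-in for table.agg).

    Returns equal-length columns with one row per distinct key, in order
    of each key's first appearance.
    """
    sums = {}  # key -> [running sum of value1, running sum of value2]
    for k, v1, v2 in zip(key, value1, value2):
        acc = sums.setdefault(k, [0, 0])
        acc[0] += v1
        acc[1] += v2
    return {
        "key": list(sums),
        "value1": [acc[0] for acc in sums.values()],
        "value2": [acc[1] for acc in sums.values()],
    }


def columns_to_struct_rows(columns):
    """Pack equal-length columns into one struct (dict) per row.

    Conceptually what pa.StructArray.from_arrays(arrays, names) builds:
    field `name` of row j is columns[name][j].
    """
    names = list(columns)
    if len({len(columns[n]) for n in names}) > 1:
        raise ValueError("all columns must have the same length")
    return [dict(zip(names, values)) for values in zip(*(columns[n] for n in names))]


def struct_rows_to_columns(rows):
    """Expand struct rows back into columns, like selecting "agged_data.*"."""
    if not rows:
        return {}
    names = list(rows[0])
    return {n: [row[n] for row in rows] for n in names}


agged = local_group_sum(["a", "b", "a"], [1, 2, 3], [10, 20, 30])
# agged == {"key": ["a", "b"], "value1": [4, 2], "value2": [40, 20]}
rows = columns_to_struct_rows(agged)
# rows == [{"key": "a", "value1": 4, "value2": 40}, {"key": "b", "value1": 2, "value2": 20}]
assert struct_rows_to_columns(rows) == agged  # the round trip is lossless
```

In this model the aggregated output has one row per distinct key, so it is generally shorter than the partition that went in.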
You can then use it in a select statement:
df = df.select(local_agg_udf(df["key"], df["value1"], df["value2"]).alias("agged_data"))
# expand the struct array back into columns
df = df.select("agged_data.*")

Kevin Wang
10/01/2024, 6:58 PM

Kyle
10/03/2024, 7:41 AM
data is a series and it doesn't have the attribute 'agg'.
If I do not select a column like df['foo'], it instead says that UDFs must be run on expressions.

Kevin Wang
10/03/2024, 5:01 PM