mesut yildiz
05/06/2024, 7:46 AM
I use the pool.map function to execute my function on multiple threads. Now, I'm considering rewriting this function using daft and attempting to run my code using multiprocessing. What best practices would you suggest?
jay
05/06/2024, 3:52 PM
mesut yildiz
05/06/2024, 6:26 PM
jay
05/06/2024, 6:43 PM
df = daft.read_parquet(...)
df = df.where(...) # run filters
df = df.with_column(...) # run calculations
df.write_parquet() # execute the dataframe**
** The execution of the dataframe will automatically take advantage of all the available resources on your machine already (there should be no need to run this in a multiprocessing pool!)
jay
05/06/2024, 6:45 PM
for df in dataframes:
    df = df.where(...)
    df = df.with_column(...)
    df.write_parquet(...)  # each execution here will block and use all available resources on your machine
Alternatively, if the dataframes have the same schema you can also just concat them together and run everything as one dataframe:
df = df.concat(df2).concat(df3)...
df = df.where(...)
df = df.with_column(...)
df.write_parquet(...)
mesut yildiz
05/06/2024, 9:33 PM
mesut yildiz
05/06/2024, 9:55 PM
link_data_directed_df = unique_nodes_df[(unique_nodes_df["from_column"].isin(nodes_id_list))
                                        & (unique_nodes_df["to_column"].isin(nodes_id_list))]
• Then, I create a simple graph object (using the igraph library) with the filtered dataframe (link_data_directed_df), let's call it "graph_directed_obj."
• Lastly, I perform some calculations with the graph object and store these calculations in the "centrality_matrix_df" dataframe before returning it.
I use the pool.map method in order to distribute my function across threads. I want to use Daft to decrease runtime, but I need to understand Daft a little bit more.
I'm running my code on an OCP pod. I can't create a Ray cluster environment, but my environment's CPU and RAM resources are really high.
jay
05/06/2024, 9:59 PM
centrality_matrix_df
If (1, 2) are your bottleneck, you will see Daft providing substantial speedups in terms of reading your data from Parquet/CSV/JSON etc and filtering it.
Unfortunately, Daft probably won't be able to help you run steps (3, 4) faster, since those are operations being run by the igraph library.
mesut yildiz
05/06/2024, 10:10 PM
jay
05/06/2024, 10:10 PM
jay
05/06/2024, 10:14 PM
You could try just swapping your pd.read_csv to daft.read_csv or something, if you'd see speedups there already 😛
Daft should be fine to run in a multiprocessing environment as well. Let us know if that isn't the case though!
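A minimal sketch of the swap described above (the file path is a placeholder, and nodes_id_list is assumed to be a plain Python list as in the earlier pandas snippet):
import daft

# Swap pd.read_csv for daft.read_csv; reading and filtering then run in parallel across local cores
unique_nodes_df = daft.read_csv("nodes.csv")  # placeholder path
link_data_directed_df = unique_nodes_df.where(
    daft.col("from_column").is_in(nodes_id_list)
    & daft.col("to_column").is_in(nodes_id_list)
)

# Drop back to pandas only at the boundary where igraph takes over
link_data_directed_pd = link_data_directed_df.to_pandas()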
mesut yildiz
05/06/2024, 10:17 PM
jay
05/06/2024, 10:18 PM
mesut yildiz
05/06/2024, 10:20 PM
mesut yildiz
05/06/2024, 10:22 PM
mesut yildiz
05/06/2024, 10:23 PM
jay
05/06/2024, 10:31 PM
# Create a dataframe of group_id to node_id
groups_to_nodes_df = daft.from_pydict({"group_id": [...], "node_id": [...]})
# Read your data
nodes_df = daft.read_parquet(...)
# Join the dataframes, so that you now have a schema:
# | node_id | group_id | node_data... |
df = nodes_df.join(groups_to_nodes_df, on="node_id")
# Group by the group_id and run your function per-group
df.groupby("group_id").map_groups(my_per_group_udf_function(df["node_id"], df["data"]))
jay
05/06/2024, 10:33 PM
| group_id | node_id | node_data... |
(Note that this does duplicate the node’s data N_GROUPS times)
Then after that, you can use a groupby + .map_groups to apply your function per-group!
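For reference, a rough skeleton of what such a per-group UDF could look like (the return dtype and the body are placeholders, not the actual centrality logic):
import daft

@daft.udf(return_dtype=daft.DataType.float64())
def my_per_group_udf_function(node_id, data):
    # Inside map_groups, each call receives one group's rows as daft.Series
    node_ids = node_id.to_pylist()
    values = data.to_pylist()
    # ... per-group computation goes here (e.g. build an igraph graph) ...
    # Return the per-group results as a Python list
    return [float(v) for v in values]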
jay
05/06/2024, 10:50 PM
# Use daft to read data
df = daft.read_parquet(...)
df.collect()
# Create a bounded queue and a pool of workers that continuously pull from the queue
queue = Queue(...)
workers = ...
# For every iteration, put work into the queue
for iteration in iterations:
    pandas_df = df.where(...).to_pandas()
    queue.put(pandas_df)  # block if the queue is full!
Let me know how it goes! 🙂
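For completeness, a more filled-in version of the sketch above (the worker count, queue size, file path and per-chunk function are all illustrative assumptions, not from the thread):
import threading
from queue import Queue

import daft

# Use daft to read and materialize the data once
df = daft.read_parquet("edges.parquet")  # placeholder path
df.collect()

# Bounded queue: the producer blocks instead of exhausting memory
work_queue = Queue(maxsize=4)

def worker():
    while True:
        pandas_df = work_queue.get()
        if pandas_df is None:  # sentinel value: no more work
            break
        run_igraph_calculations(pandas_df)  # placeholder for the per-chunk igraph work
        work_queue.task_done()

workers = [threading.Thread(target=worker) for _ in range(8)]
for w in workers:
    w.start()

# For every iteration, filter with daft and hand a pandas chunk to the workers
for community_id in community_ids:  # placeholder iterable of filter values
    pandas_df = df.where(daft.col("COMMUNITY_ID") == community_id).to_pandas()
    work_queue.put(pandas_df)  # blocks if the queue is full

for _ in workers:
    work_queue.put(None)
for w in workers:
    w.join()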
mesut yildiz
05/06/2024, 10:53 PM
mesut yildiz
05/06/2024, 10:56 PM
mesut yildiz
05/08/2024, 12:25 AM
@daft.udf(return_dtype=daft.DataType.string())
def get_node_id(from_node_id):
    nodes = []
    for node in from_node_id.to_pylist():
        nodes.append(community_daft_df.where(community_daft_df["NODE"] == node).collect().to_pydict()["COMMUNITY"][0])
    return nodes

df = edge_100_sample_df.with_column("community", get_node_id(edge_100_sample_df["FROM_HESAP_NO_MBB_NO"]))
df.collect()
Both edge_100_sample_df and community_daft_df are Daft dataframes, and I try to filter community_daft_df and get one of the matching values. I tried to run the UDF function on sample data with 100 rows, but when I run the cell with "df.collect()" it gets stuck. I've attempted to run the 'df.show()' code multiple times. Occasionally, it returns part of the result dataframe, but it also gets stuck at times.
mesut yildiz
05/08/2024, 12:26 AM
jay
05/08/2024, 12:32 AM
It looks like you want to annotate edge_100_sample_df with a new "community" column. Here's a quick way of doing that, using a join:
# Get a dataframe of unique (| node | community|)
node_to_community_df = community_daft_df.groupby("NODE").agg([daft.col("COMMUNITY").any_value()]).collect()
# Now, just join!
edge_100_sample_df.join(node_to_community_df, left_on="FROM_HESAP_NO_MBB_NO", right_on="NODE")
jay
05/08/2024, 12:32 AM
mesut yildiz
05/08/2024, 12:35 AM
mesut yildiz
05/08/2024, 12:35 AM
mesut yildiz
05/08/2024, 12:38 AM
jay
05/08/2024, 12:44 AM
jay
05/08/2024, 12:44 AM
mesut yildiz
05/08/2024, 7:33 AM
from igraph import *
# @daft.udf(return_dtype=daft.DataType.float64())
@daft.udf(return_dtype=daft.DataType.python())
def centrality_matrix_calculation(from_column, to_column, expression, community_id):
    edges_directed_list = list(
        zip(
            from_column.to_pylist(),
            to_column.to_pylist(),
            expression.to_pylist()
        ))
    G_directed_obj = Graph.TupleList(edges_directed_list, directed=True, weights=True)
    community_member_count_int = len(G_directed_obj.vs["name"])
    community_id_int = community_id.to_pylist()[0]
    centrality_matrix_dict = {
        "node_id": G_directed_obj.vs['name'],
        "community_id": [community_id] * community_member_count_int,
        "authority_val": G_directed_obj.authority_score(weights=G_directed_obj.es['weight']),
        "between_val": G_directed_obj.betweenness(),
        "close_val": G_directed_obj.closeness(),
        "closein_val": G_directed_obj.closeness(mode='in'),
        "closeout_val": G_directed_obj.closeness(mode='out'),
        "degree_val": G_directed_obj.degree(),
        "degreein_val": G_directed_obj.degree(mode='in'),
        "degreeout_val": G_directed_obj.degree(mode='out'),
        "hub_val": G_directed_obj.hub_score(weights=G_directed_obj.es['weight'])
    }
    centrality_matrix_df = daft.from_pydict(centrality_matrix_dict)
    return centrality_matrix_df

edge_filtered_df.groupby("COMMUNITY_ID").map_groups(
    centrality_matrix_calculation(
        edge_filtered_df["FROM_HESAP_NO_MBB_NO"],
        edge_filtered_df["TO_HESAP_NO_MBB_NO"],
        edge_filtered_df["EXPRESSION"],
        edge_filtered_df["COMMUNITY_ID"],
    )).collect()
The content of the centrality_matrix_calculation function is relatively irrelevant. My final (I hope :)) problem is that I need to return a dataframe-like data type from the centrality_matrix_calculation function, but I can't do that with a UDF. Do you have any advice on how I can return centrality_matrix_df somehow?
jay
05/08/2024, 5:05 PM
You could return the results as a Struct column, and then write to Parquet etc.
mesut yildiz
05/08/2024, 6:41 PM
jay
05/08/2024, 6:49 PM
DataType.struct({"name": DataType.string(), "age": DataType.int64()})
jay
05/08/2024, 6:50 PM
mesut yildiz
05/08/2024, 9:15 PM
jay
05/08/2024, 9:17 PM
DataType.struct({"foo": DataType.list(DataType.int64()), "bar": DataType.list(DataType.int64())})
jay
05/08/2024, 9:19 PM
A column "my_column_name" with the above datatype would look something like this:
| my_column_name: Struct[foo: list[int64], bar: list[int64]] |
| {"foo": [1, 2, 3], "bar": [4, 5, 6]} |
| {"foo": [7, 8, 9], "bar": []} |
mesut yildiz
05/08/2024, 9:42 PM
temp_dict = {
    "column_1": [1, 2, 3],
    "column_2": [3, 4, 5]
}
temp_df = daft.from_pydict(temp_dict)
How can I use struct instead of "temp_df" in this scenario?
jay
05/08/2024, 11:05 PM
jay
05/08/2024, 11:07 PM
return Series.from_pylist([
    {
        "node_id": G_directed_obj.vs['name'][i],
        "authority": G_directed_obj.authority_score(weights=G_directed_obj.es['weight'])[i],
        ...
    } for i in range(community_member_count_int)
])
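Putting the two pieces above together, a rough sketch of a per-group UDF that returns a Struct column and writes the result to Parquet (the field names, score values and output path are placeholders, not the real centrality metrics):
import daft

result_dtype = daft.DataType.struct({
    "node_id": daft.DataType.string(),
    "score": daft.DataType.float64(),
})

@daft.udf(return_dtype=result_dtype)
def per_group_metrics(node_id):
    nodes = node_id.to_pylist()
    # One dict per row; each dict becomes one Struct value in the output column
    return daft.Series.from_pylist([
        {"node_id": n, "score": float(i)} for i, n in enumerate(nodes)
    ])

result_df = edge_filtered_df.groupby("COMMUNITY_ID").map_groups(
    per_group_metrics(edge_filtered_df["FROM_HESAP_NO_MBB_NO"])
)
result_df.write_parquet("centrality_output/")  # placeholder output path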
mesut yildiz
05/09/2024, 2:17 AM
jay
05/09/2024, 2:17 AM
mesut yildiz
05/09/2024, 2:23 AM
df.groupby("group_id").map_groups(my_per_group_udf_function(df["node_id"], df["data"]))
The UDF function execution gets stuck in the for loop.
mesut yildiz
05/09/2024, 2:25 AM
jay
05/09/2024, 2:36 AM
mesut yildiz
05/09/2024, 2:39 AM
mesut yildiz
05/09/2024, 2:57 AM
@daft.udf(return_dtype=daft.DataType.struct(fields=fields_dict))
def centrality_matrix_calculation(influence_df):
    inf1_list = []
    inf2_list = []
    node_id_list = G_directed_obj.vs["name"]
    print("influence calculations")
    for node in node_id_list:
        print(node)
        cur_inf_out_data_daft_df = influence_df.where(influence_df["FROM_HESAP_NO_MBB_NO"] == node).collect()
        cur_inf_to_data_daft_df = influence_df.where(influence_df["TO_HESAP_NO_MBB_NO"] == node).collect()
        print("get influence subdfs")
        sum_outdegree_weights_float = cur_inf_out_data_daft_df.sum("EXPRESSION").collect().to_pydict()["EXPRESSION"][0]
        sum_indegree_weights_float = cur_inf_to_data_daft_df.sum("EXPRESSION").collect().to_pydict()["EXPRESSION"][0]
        print("sum")
        try:
            inf1_list.append(sum_outdegree_weights_float / community_member_count_int)
        except TypeError:
            inf1_list.append(0)
        print("append1")
        try:
            inf2_list.append(sum_indegree_weights_float / community_member_count_int)
        except TypeError:
            inf2_list.append(0)
        print("append2")
    centrality_matrix_dict = {}
    centrality_matrix_dict["influence1_val"] = inf1_list
    centrality_matrix_dict["influence2_val"] = inf2_list
    centrality_matrix_dict["influence3_val"] = list(map(sum, zip(inf1_list, inf2_list)))
    return daft.Series.from_pylist([centrality_matrix_dict])

edge_sample10_df.groupby("COMMUNITY_ID").map_groups(centrality_matrix_calculation(
    edge_sample10_df[["FROM_HESAP_NO_MBB_NO", "TO_HESAP_NO_MBB_NO", "EXPRESSION"]]
)).collect()
When I run "edge_sample10_df.groupby("COMMUNITY_ID").map_groups(centrality_matrix_calculation", it gets stuck at a random part of the for loop. There will be a maximum of 50-60 iterations in the for loop.
jay
05/09/2024, 4:59 AM
jay
05/09/2024, 4:59 AM
mesut yildiz
05/09/2024, 6:33 AM
mesut yildiz
05/09/2024, 6:36 AM
jay
05/09/2024, 5:17 PM
mesut yildiz
05/09/2024, 6:40 PM
jay
05/09/2024, 6:43 PM
> I can share performance results comparison between the pandas version and the daft version.
Perfect! And yes! Would love to share it on the Daft blog page as well 🙂
jay
05/09/2024, 6:43 PM
mesut yildiz
05/09/2024, 6:49 PM
jay
05/09/2024, 7:45 PM