# daft-dev
m
Hello everyone, I've written a Python function that filters a DataFrame based on a column, performs some calculations on the filtered DataFrame, and returns a DataFrame to store the results. Currently, I'm using the pool.map function to execute my function on multiple threads. Now, I'm considering rewriting this function using Daft and attempting to run my code using multiprocessing. What best practices would you suggest?
j
Hi! Daft already runs in a way that takes advantage of all the CPUs on your machine, so running it in a multithreading environment might actually hurt performance! You could instead run it in a for-loop.
m
If I use a for loop, does my function run multiprocessed? I feel like I need to use a method like "udf" or "apply" for parallel execution.
j
Let me know if I’m understanding your question correctly!
Copy code
df = daft.read_parquet(...)

df = df.where(...)  # run filters
df = df.with_column(...)  # run calculations

df.write_parquet()  # execute the dataframe**
** The execution of the dataframe will automatically take advantage of all the available resources on your machine already (there should be no need to run this in a multiprocessing pool!)
If you needed to do this over multiple dataframes, you could run this in a for-loop:
Copy code
for df in dataframes:
    df = df.where(...)
    df = df.with_column(...) 
    df.write_parquet(...)  # each execution here will block and use all available resources on your machine
Alternatively, if the dataframes have the same schema you can also just concat them together and run everything as one dataframe:
Copy code
df = df.concat(df2).concat(df3)...
df = df.where(...)
df = df.with_column(...) 
df.write_parquet(...)
m
Hi Jay, my function is a little more complex than this. Let me explain a bit:
• Firstly, a pandas dataframe is filtered based on two columns:
link_data_directed_df = unique_nodes_df[
    (unique_nodes_df["from_column"].isin(nodes_id_list))
    & (unique_nodes_df["to_column"].isin(nodes_id_list))
]
• Then, I create a simple graph object (using the igraph library) with the filtered dataframe (link_data_directed_df), let's call it "graph_directed_obj".
• Lastly, I perform some calculations with the graph object and store these calculations in the "centrality_matrix_df" dataframe before returning it.
I use the pool.map method in order to distribute my function across threads. I want to use Daft to decrease runtime, but I need to understand Daft a little bit more. I'm running my code on an OCP pod. I can't create a Ray cluster environment, but my environment's CPU and RAM resources are really high.
j
Makes sense, do you know where most of your bottleneck is happening right now? There are a few potential locations I think, and Daft can only help with some:
1. Reading your dataframe
2. Filtering your dataframe (I doubt this is a bottleneck though!)
3. Creating and running calculations on your igraph data
4. Creating your centrality_matrix_df
If (1, 2) are your bottleneck, you will see Daft providing substantial speedups in terms of reading your data from Parquet/CSV/JSON etc and filtering it. Unfortunately, Daft probably won’t be able to help you run steps (3, 4) faster, since those are operations being run by the igraph library.
m
My biggest bottleneck is reading data. Each iteration takes about 20 seconds in total, with data reading taking approximately 15 seconds and data filtering taking around 5.5 seconds, and there are about 1.2 million iterations in total. I used the pool.map method. However, since pandas doesn't support distributing data over cores, the function needs to read all the data in every thread, and this is unfortunately the most time-consuming part.
j
That’s great to know! What does your data currently look like?
I’m wondering whether you’d see speedups already just by substituting the read call from pd.read_csv to daft.read_csv or something 😛 Daft should be fine to run in a multiprocessing environment as well. Let us know if that isn’t the case though!
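For instance, a minimal sketch of what that substitution could look like (the file path and column name here are illustrative, not taken from your code; nodes_id_list is reused from your earlier snippet):
Copy code
import daft

# Swap the read call and push the filter into Daft, then hand off to pandas/igraph
df = daft.read_csv("s3://my-bucket/nodes.csv")                 # instead of pd.read_csv(...)
df = df.where(daft.col("from_column").is_in(nodes_id_list))    # filter happens in Daft
pandas_df = df.to_pandas()                                     # only the (much smaller) result goes to pandas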
m
My purpose in using Daft is to have Daft distribute the data and the operations across the processors, instead of reading the data in every iteration.
j
Ohhhh 😓 I think I understand now. Let me know if this sounds right:
1. Read all data into a dataframe
2. For each iteration:
   a. Run a filter (unique to the current iteration)
   b. Run some graph operations over this subset of your data
   c. Return a new dataframe
And you are running (2) over a multiprocessing pool
m
Yes, exactly.
My purpose in using Daft is to run the function in a distributed, parallel way with Daft instead of the pool.map method.
I've transformed most of my function into Daft syntax, but I'm not sure how to do this part.
j
You can try this way:
Copy code
# Create a dataframe of group_id to node_id
groups_to_nodes_df = daft.from_pydict({"group_id": [...], "node_id": [...]})

# Read your data
nodes_df = daft.read_parquet(...)

# Join the dataframes, so that you now have a schema:
# | node_id | group_id | node_data... |
df = nodes_df.join(groups_to_nodes_df, on="node_id")

# Group by the group_id and run your function per-group
df.groupby("group_id").map_groups(my_per_group_udf_function(df["node_id"], df["data"]))
Basically I use a join between a dataframe containing the nodes, and a dataframe containing the group->node mapping to get a dataframe of:
Copy code
| group_id | node_id | node_data...|
(Note that this does duplicate the node’s data N_GROUPS times.) Then after that, you can use a groupby + .map_groups to apply your function per-group!
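To make the groupby + .map_groups step concrete, here is a rough skeleton of what my_per_group_udf_function might look like; the column names and the placeholder computation are assumptions, not your actual logic:
Copy code
import daft
from daft import DataType

@daft.udf(return_dtype=DataType.float64())
def my_per_group_udf_function(node_id, data):
    # Each invocation receives the Series belonging to a single group
    nodes = node_id.to_pylist()
    values = data.to_pylist()
    # ... build the igraph object from `nodes`/`values` and compute metrics here ...
    return [float(v) for v in values]  # placeholder: one value per input row

df.groupby("group_id").map_groups(my_per_group_udf_function(df["node_id"], df["data"]))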
The above approach will definitely be very memory-hungry because of the big join. If you find that your program is going OOM, you can consider another approach which uses Daft for the reading/filtering, but parallelizes your function outside of Daft using a bounded queue:
Copy code
# Use daft to read data
df = daft.read_parquet(...)
df.collect()

# Create a bounded queue and a pool of workers that continuously pull from the queue
queue = Queue(...)
workers = ...

# For every iteration, put work into the queue
for iteration in iterations:
    pandas_df = df.where(...).to_pandas()
    queue.put(pandas_df)  # block if the queue is full!
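A slightly fuller sketch of that bounded-queue pattern, in case it helps (the worker count, queue size, filter column, and the run_graph_calculations helper are all illustrative assumptions):
Copy code
import daft
from queue import Queue
from threading import Thread

df = daft.read_parquet(...)   # read once with Daft
df.collect()

queue = Queue(maxsize=8)      # bound the queue so results aren't all materialized at once

def worker():
    while True:
        pandas_df = queue.get()
        if pandas_df is None:              # sentinel: no more work
            break
        run_graph_calculations(pandas_df)  # your existing igraph logic (placeholder name)
        queue.task_done()

workers = [Thread(target=worker) for _ in range(4)]
for w in workers:
    w.start()

for iteration in iterations:
    # Filtering stays in Daft; only the small filtered subset is converted to pandas
    pandas_df = df.where(daft.col("community_id") == iteration).to_pandas()
    queue.put(pandas_df)                   # blocks if the queue is full

for _ in workers:
    queue.put(None)
for w in workers:
    w.join()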
Let me know how it goes! 🙂
m
Actually, there are two dataframes. The first has a "node_id" column and a "community_id" column that shows which community each node belongs to. The other dataframe represents the relationships between nodes; it has three columns: "from_node_id", "to_node_id", and an "expression" column indicating the relationship level. The function first filters the first dataframe by community and keeps the "node_id" values of that community's members in a list. Using the elements of this list, the second dataframe is filtered on both its "from_node_id" and "to_node_id" columns to obtain the filtered dataset. In other words, the second dataframe is filtered based on the community's nodes from the first dataframe. Then a graph object is created from the filtered data, the results obtained using the igraph library's functions are written to a new dataframe, and this dataframe is returned.
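In pandas terms, the per-community filtering described above looks roughly like this (the variable names nodes_df, edges_df and current_community_id are illustrative):
Copy code
# First dataframe: node_id -> community_id; second dataframe: from_node_id, to_node_id, expression
community_nodes_list = nodes_df.loc[
    nodes_df["community_id"] == current_community_id, "node_id"
].tolist()

link_data_directed_df = edges_df[
    edges_df["from_node_id"].isin(community_nodes_list)
    & edges_df["to_node_id"].isin(community_nodes_list)
]
# link_data_directed_df is then turned into an igraph graph and the centrality metrics are computed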
I am examining the method in your first answer; I will design a new UDF function to see if I can use this method.
🙌 1
Hi Jay, I'm trying to filter a Daft dataframe using another Daft dataframe, but when I run the df.with_column().collect() code it gets stuck at some point. Here is my code:
@daft.udf(return_dtype=daft.DataType.string())
def get_node_id(from_node_id):
    nodes = []
    for node in from_node_id.to_pylist():
        nodes.append(
            community_daft_df.where(community_daft_df["NODE"] == node)
            .collect()
            .to_pydict()["COMMUNITY"][0]
        )
    return nodes

df = edge_100_sample_df.with_column("community", get_node_id(edge_100_sample_df["FROM_HESAP_NO_MBB_NO"]))
df.collect()
Both edge_100_sample_df and community_daft_df are Daft dataframes, and I'm trying to filter community_daft_df and get one of the matching values. I'm trying to run the UDF function on sample data with 100 rows, but when I run the cell with "df.collect()" it gets stuck. I've attempted to run the 'df.show()' code multiple times. Occasionally it returns part of the result dataframe, but it also gets stuck at times.
Sometimes my UDF functions get stuck, I don't know why.
j
Yeah it’s not a good idea to run dataframes within UDFs 😓 Also not a great idea to do so many filters! It’s very expensive 😄 I see that you are trying to annotate edge_100_sample_df with a new "community" column. Here’s a quick way of doing that, using a join:
Copy code
# Get a dataframe of unique (| node | community|)
node_to_community_df = community_daft_df.groupby("NODE").agg([daft.col("COMMUNITY").any_value()]).collect()

# Now, just join!
edge_100_sample_df.join(node_to_community_df, left_on="FROM_HESAP_NO_MBB_NO", right_on="NODE")
The join will be muuuuuch more performant than iteratively filtering 😄
🎉 1
m
"NODE" column in community_daft_df dataframe is already unique, so i can skip that part i guess
i will try join function now thanks for the tip
join works perfectly thanks again, I will come with more concrete examples like this :)
j
Nice!!
Yes much much faster than the filters
m
Hi Jay, I finally got my function into good shape. Here is my solution based on your guidance:
Copy code
from igraph import *

# @daft.udf(return_dtype=daft.DataType.float64())
@daft.udf(return_dtype=daft.DataType.python())
def centrality_matrix_calculation(from_column, to_column, expression, community_id):

    edges_directed_list = list(
        zip(
            from_column.to_pylist(),
            to_column.to_pylist(),
            expression.to_pylist()
        ))
    G_directed_obj = Graph.TupleList(edges_directed_list, directed=True, weights=True)

    community_member_count_int = len(G_directed_obj.vs["name"])
    community_id_int = community_id.to_pylist()[0]

    centrality_matrix_dict = {
        "node_id": G_directed_obj.vs["name"],
        "community_id": [community_id_int] * community_member_count_int,
        "authority_val": G_directed_obj.authority_score(weights=G_directed_obj.es["weight"]),
        "between_val": G_directed_obj.betweenness(),
        "close_val": G_directed_obj.closeness(),
        "closein_val": G_directed_obj.closeness(mode="in"),
        "closeout_val": G_directed_obj.closeness(mode="out"),
        "degree_val": G_directed_obj.degree(),
        "degreein_val": G_directed_obj.degree(mode="in"),
        "degreeout_val": G_directed_obj.degree(mode="out"),
        "hub_val": G_directed_obj.hub_score(weights=G_directed_obj.es["weight"])
    }

    centrality_matrix_df = daft.from_pydict(centrality_matrix_dict)

    return centrality_matrix_df


edge_filtered_df.groupby("COMMUNITY_ID").map_groups(
    centrality_matrix_calculation(
        edge_filtered_df["FROM_HESAP_NO_MBB_NO"],
        edge_filtered_df["TO_HESAP_NO_MBB_NO"],
        edge_filtered_df["EXPRESSION"],
        edge_filtered_df["COMMUNITY_ID"],
    )
).collect()
The content of the centrality_matrix_calculation function is relatively irrelevant. My final (I hope :)) problem is that I need to return a dataframe-like data type from the centrality_matrix_calculation function, but I can't do that with the UDF. Do you have any advice on how I can return centrality_matrix_df somehow?
j
Hi @mesut yildiz! What is your final goal here - are you trying to write this data somewhere? I ask this because depending on the final goal, you might choose a few approaches:
• You can write data directly from UDFs (just write to S3 or something), as sketched below
• You can return data as a new Struct column, and then write to Parquet etc
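As a rough illustration of the first option (writing directly from inside the UDF), one way could be to dump each group's result to its own file; the pandas usage, the output path, and the small per-group return value are assumptions for the sketch, not a prescribed Daft pattern:
Copy code
import daft
import pandas as pd

@daft.udf(return_dtype=daft.DataType.int64())
def write_group_results(from_column, to_column, expression, community_id):
    community = community_id.to_pylist()[0]
    # Stand-in for the real centrality calculations
    result_df = pd.DataFrame({
        "from_node_id": from_column.to_pylist(),
        "to_node_id": to_column.to_pylist(),
        "expression": expression.to_pylist(),
    })
    # Write each community's results to its own (hypothetical) location
    result_df.to_parquet(f"output/centrality_{community}.parquet")
    # Return one small value per group so the UDF still yields a column
    return [len(result_df)]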
m
I need to collect all the dataframes returned from the UDF function and concatenate them into one dataframe. Then I will write the concatenated dataframe to a Parquet file. The second approach seems more suitable for me. I actually see the struct data type in the documentation, but there is no example of how I can use it.
j
Here’s an example of how to use it!
Copy code
DataType.struct({"name": DataType.string(), "age": DataType.int64()})
Adding it to our docs as well: https://github.com/Eventual-Inc/Daft/pull/2256 🙂
m
Would you give an example of how I can store 2 lists of integers in a struct data type? I can't find anything on the struct data type.
j
Is this what you’re thinking of?
Copy code
DataType.struct({"foo": DataType.list(DataType.int64()), "bar": DataType.list(DataType.int64())})
A dataframe containing a column called "my_column_name" with the above datatype would look something like this:
Copy code
| my_column_name: Struct[foo: list[int64], bar: list[int64]] |
| {"foo": [1, 2, 3], "bar": [4, 5, 6]}                       |
| {"foo": [7, 8, 9], "bar": []}                              |
m
More specifically: let's say I create a dataframe from a dict, like in my UDF function:
Copy code
temp_dict = {
    "column_1": [1, 2, 3],
    "column_2": [3, 4, 5]
}

temp_df = daft.from_pydict(temp_dict)
How can I use a struct instead of "temp_df" in this scenario?
j
I see! I think you’re trying to return a struct series. Here’s an example of how you could do that!
In your case, you can do:
Copy code
return Series.from_pylist([
    {
        "node_id": G_directed_obj.vs["name"][i],
        "authority": G_directed_obj.authority_score(weights=G_directed_obj.es["weight"])[i],
        ...
    } for i in range(community_member_count_int)
])
m
Thank you for all your guidance, I finally return all the data with the Series data type.
j
🎉 !! You should share a screenshot of it working with the rest of #C052CA6Q9N1 😄
m
I just noticed a strange behaviour: if I use a for loop in the UDF function (which my function has), when I try to run the UDF function like this:
df.groupby("group_id").map_groups(my_per_group_udf_function(df["node_id"], df["data"]))
the UDF function execution gets stuck in the for loop
I don't know why, but I can't solve this problem.
j
There shouldn’t be anything special about a for loop 😝 are you running anything expensive in that for loop?
m
Actually no; some iterations run fast and return results immediately, but others run too slowly. I will share my code, maybe you can catch the problem.
Here is my UDF function (the for-loop part):
Copy code
@daft.udf(return_dtype=daft.DataType.struct(fields=fields_dict))
def centrality_matrix_calculation(influence_df):
    inf1_list = []
    inf2_list = []
    node_id_list = G_directed_obj.vs["name"]  # G_directed_obj is built in the earlier part of the UDF (not shown)

    print("influence calculations")

    for node in node_id_list:
        print(node)

        cur_inf_out_data_daft_df = influence_df.where(influence_df["FROM_HESAP_NO_MBB_NO"] == node).collect()
        cur_inf_to_data_daft_df = influence_df.where(influence_df["TO_HESAP_NO_MBB_NO"] == node).collect()

        print("get influence subdfs")

        sum_outdegree_weights_float = cur_inf_out_data_daft_df.sum("EXPRESSION").collect().to_pydict()["EXPRESSION"][0]
        sum_indegree_weights_float = cur_inf_to_data_daft_df.sum("EXPRESSION").collect().to_pydict()["EXPRESSION"][0]

        print("sum")

        try:
            inf1_list.append(sum_outdegree_weights_float / community_member_count_int)
        except TypeError:
            inf1_list.append(0)

        print("append1")

        try:
            inf2_list.append(sum_indegree_weights_float / community_member_count_int)
        except TypeError:
            inf2_list.append(0)

        print("append2")

    centrality_matrix_dict = {}

    centrality_matrix_dict["influence1_val"] = inf1_list
    centrality_matrix_dict["influence2_val"] = inf2_list
    centrality_matrix_dict["influence3_val"] = list(map(sum, zip(inf1_list, inf2_list)))

    return daft.Series.from_pylist([centrality_matrix_dict])


edge_sample10_df.groupby("COMMUNITY_ID").map_groups(centrality_matrix_calculation(
    edge_sample10_df[["FROM_HESAP_NO_MBB_NO", "TO_HESAP_NO_MBB_NO", "EXPRESSION"]]
)).collect()
When I run "edge_sample10_df.groupby("COMMUNITY_ID").map_groups(centrality_matrix_calculation(...))", it gets stuck at a random part of the for loop. There will be a maximum of 50-60 iterations in the for loop.
j
Hmm yes we do not typically recommend running Daft dataframes inside of UDFs.
It’s probably deadlocking in there when you call the filter and collect on the dataframe
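One way around the deadlock is to keep Daft out of the inner loop entirely and pre-aggregate the per-node sums with plain Python over the pylists. A rough sketch of such a helper (the function name is hypothetical; it assumes the UDF receives the FROM/TO/EXPRESSION values as lists, and that node_id_list and community_member_count_int come from the earlier part of the UDF):
Copy code
from collections import defaultdict

def influence_sums(from_nodes, to_nodes, weights, node_id_list, community_member_count_int):
    # Plain-Python replacement for the per-node Daft filter + sum inside the loop
    sum_out = defaultdict(float)  # total outgoing weight per node
    sum_in = defaultdict(float)   # total incoming weight per node
    for f, t, w in zip(from_nodes, to_nodes, weights):
        if w is not None:
            sum_out[f] += w
            sum_in[t] += w

    inf1_list = [sum_out[n] / community_member_count_int for n in node_id_list]
    inf2_list = [sum_in[n] / community_member_count_int for n in node_id_list]
    return inf1_list, inf2_list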
m
Hmmm, I can't get past the deadlock with any workaround solution, I guess. I'll probably remove the for loop and try to calculate the 3 columns with another approach. Maybe I need to write another UDF for the for-loop part.
By the way, you said "share a screenshot" in #C052CA6Q9N1 before; do I need to screenshot my code, or what? I didn't understand 😅
j
Oh! Yeah I was thinking you could share some code, or a screenshot to show your code successfully working end-to-end 🙂 No pressure to do it though!
m
I can share a performance comparison between the pandas version and the Daft version. The entire code is too large to capture in a screenshot. Additionally, I am considering writing an article for our company's Medium blog to share the performance improvements achieved with the Daft library. Perhaps you can also share this article on the Daft blog page.
j
I can share a performance comparison between the pandas version and the Daft version.
Perfect! And yes! Would love to share it on the Daft blog page as well 🙂
Happy to provide any inputs there
m
I need to write the article in Turkish, then I'll probably translate it into English for your blog, right? 😅
j
Heh 😛 that’s fine! We can also do copywriting once you have a draft, so there's no need to get the translation exactly right.