# general
n
I have a question on running a pipeline. I am using Daft for data filtering / processing. At the moment I have one main script, which calls different endpoints for AI functionality (these were added over time as the pipeline grew in complexity). I am thinking about splitting it up and running it more like a waterfall: splitting it into steps, where each step takes data as input, does its operations plus the AI part, and produces data as output. The data is fairly heavy (multiple long texts and multiple embeddings for each row). What is your choice of orchestration? Do you use something like Airflow / Dagster / Prefect, or do you manually trigger the steps in an orchestration script? What is your choice of data transfer? Do you use a database (in my opinion not necessary for most pipelines, since I have to take the data out of memory anyhow), file storage (buckets with triggers for the next script), queues, or just direct API calls? Thanks for any strong opinions / pointers!
j
Hi @Nicolay Gerold! We do have some integrations with orchestration engines, but honestly most of them should do the job just fine. Cc @Colin Ho as well for his work on Prefect. Technically, a Daft dataframe is already a DAG, so defining your entire workflow via a dataframe does give you that “waterfall” approach. If you want to visualize your DAG, call
df.explain()
and it will show you each step! That being said, there are some arguments to be made for running Daft in an orchestration engine:
1. Intermediate materializations: if your Daft workflow performs steps 1, 2, and 3, but you have consumers that need data from both step 2 AND step 3, then it makes sense to introduce an intermediate materialization point.
2. Data versioning: you can tie your dataset versions to the ID of your run on the orchestration engine.
My recommendations if you decide to use an orchestration engine:
• Try to minimize data transfer between steps and do heavy filtering. A lot of your execution runtime will likely come from reading/writing data from persistent storage.
• I would store data as files (preferably Parquet), and you can just create one “folder” per run of your workflow:
/<workflow_execution_id>/step_<n>/<output_name>
This makes it very easy to inspect the outputs of your workflow and perform any necessary debugging. This way, instead of passing the actual data between steps, you can just pass a folder URL and recreate the dataframe by reading from storage in the subsequent step. Generally you’ll want to be very intentional in deciding data materialization boundaries. If you do need to materialize data, make a strong case as to why you need to do it! Note also that breaking workloads up into steps with intermediate materializations can prevent optimizations that Daft might otherwise be able to perform for you. Having more steps doesn’t always mean a better pipeline 😉
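A rough sketch of the folder-per-run idea, in case it helps. The bucket root, run ID, output names (`raw`, `filtered`) and the `text_length` column are all made up for illustration; only `daft.read_parquet`, `where` and `write_parquet` are actual Daft calls. Each step receives only the run's location, rebuilds its dataframe from storage, and returns the folder URL of what it wrote.

```python
import posixpath


def step_output_uri(root: str, run_id: str, step: int, output_name: str) -> str:
    """Build <root>/<workflow_execution_id>/step_<n>/<output_name>."""
    return posixpath.join(root, run_id, f"step_{step}", output_name)


def run_filter_step(root: str, run_id: str) -> str:
    """Hypothetical step 2: read step 1's output, filter it, write the result."""
    # Imported here so the path helper above can be used without Daft installed.
    import daft

    in_uri = step_output_uri(root, run_id, 1, "raw")
    out_uri = step_output_uri(root, run_id, 2, "filtered")

    # Recreate the dataframe from the Parquet files the previous step wrote.
    df = daft.read_parquet(posixpath.join(in_uri, "*.parquet"))

    # Filter heavily before materializing; "text_length" is an assumed column.
    df = df.where(daft.col("text_length") > 100)

    # Materialize this step's output and hand only the folder URL downstream.
    df.write_parquet(out_uri)
    return out_uri
```

The orchestrator (Prefect, Airflow, a plain script, ...) then only has to pass small strings such as `run_filter_step("s3://my-bucket/runs", "run-42")` between tasks, and every intermediate output stays inspectable under the run's folder.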
n
Yeah, so the thing is that I have to take the data out of Daft a few times (e.g. to do some graph stuff, topic clustering, …). That is why I am thinking about storing it: most of the time I am working on a few columns to create a new one, which is used in the next step, and I want to keep memory free of unused data.
j
Yup! That makes sense. I would just also note that if you do something like:
df = df.select(col("x") + col("y"))
The original columns x and y will be dropped from the dataframe once the new column is calculated!
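To make the difference concrete, here is a small sketch with toy data (the values and the `x_plus_y` name are made up): `select` keeps only the expressions you list, while `with_column` appends the new column and keeps the existing ones.

```python
def compare_select_and_with_column():
    """Return the column names produced by select vs. with_column."""
    # Imported inside the function so this snippet can be loaded without Daft.
    import daft
    from daft import col

    df = daft.from_pydict({"x": [1, 2], "y": [10, 20]})

    # select: the result contains only the listed expression, so x and y are gone.
    only_sum = df.select((col("x") + col("y")).alias("x_plus_y"))

    # with_column: x and y are kept, and x_plus_y is added alongside them.
    with_sum = df.with_column("x_plus_y", col("x") + col("y"))

    return only_sum.column_names, with_sum.column_names
```

So if the goal is to keep memory free of columns the next step does not need, `select` with just the inputs and outputs you care about is the more direct tool.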