# general
j
Hello, Apache Beam is an open-source, unified programming model for batch and streaming data processing pipelines that simplifies large-scale data processing. I'm developing Apache Beam Python I/O connectors that use your SQL and open table format sources (and possibly sinks). Following the I/O connector development guide, I'll be building `Splittable DoFn` objects, and they require a complete set of SQL queries derived from partition conditions (e.g. `partition_col`, `num_partitions`, ...). Those queries are then executed in multiple `DoFn` instances, and the results are combined before being passed to subsequent tasks. For SQL sources, the physical plan scheduler prints the SQL queries, but they don't seem to be available in Python. I might be able to obtain the queries via the SQL scan operator, but that looks incomplete because the pushdowns object is not directly obtainable in Python. Can you please guide me on whether there is a way to obtain the complete set of queries by specifying a SQL query and partition conditions? Thank you! Jaehyeon
j
Hi there! This seems like it might require some extensive discussion. Do you mind opening an issue? cc @Colin Ho for SQL sources
c
Currently we don't expose a dedicated method to view the complete set of executed SQL queries. As you mentioned, we do print a snippet of them if you do a `.explain(show_all=True)`; for example, you would see something like:
```
== Physical Plan ==

* TabularScan:
|   Num Scan Tasks = 3
|   Estimated Scan Bytes = 0
|   Clustering spec = { Num partitions = 3 }
|   SQL Queries = [SELECT * FROM (SELECT * FROM test_table) AS subquery WHERE id >= 1 AND id < 4.666666666666666,..]
|   Schema: {id#Int64, name#Utf8, age#Int64, email#Utf8}
|   Scan Tasks: [
|   {Database {sqlite:///test.db}}
|   {Database {sqlite:///test.db}}
|   {Database {sqlite:///test.db}}
|   ]
```
However, the full set of queries is hidden for brevity. We could expose a parameter such as `print_full_sql_queries_in_explain` which would disable truncation, in which case you could in theory capture the output of `.explain` and use regex to find the queries. Some other solutions I can think of:
• Include a `sql_query_column` in the result dataframe that contains the SQL query string that was executed.
• Allow users to pass in some object that exposes an `.append` method (like a Python list) to `read_sql`, which would get appended to with the SQL queries as they are generated.
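As a rough, untested sketch of the capture-and-regex idea (it assumes `.explain(show_all=True)` writes the plan to stdout, and it only makes sense if the query list were not truncated, e.g. via the hypothetical parameter above):
```python
import io
import re
from contextlib import redirect_stdout


def scrape_sql_queries(df) -> str:
    """Scrape the 'SQL Queries = [...]' section out of the printed physical plan.

    Brittle by design: it parses human-readable explain output, and today that
    list is truncated, so this would only work if truncation were disabled.
    """
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain(show_all=True)  # assumed to print the physical plan to stdout
    plan_text = buf.getvalue()
    match = re.search(r"SQL Queries = \[(.*?)\]", plan_text, flags=re.DOTALL)
    return match.group(1) if match else ""
```
The dedicated options (a `sql_query_column`, or an appendable object passed to `read_sql`) would of course be more robust than scraping text.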
j
@Jaehyeon Kim to clarify — are you having Daft read from Beam, or Beam read from Daft? If it’s the former, you’ll likely be implementing a new ScanOperator, possibly extending our current ScanOperator for reading from SQL database sources! We might not need SQL either :)
j
Hi @jay and @Colin Ho, thanks for your replies. I created an issue: https://github.com/Eventual-Inc/Daft/issues/2977. My proposal is to add a `to_dict()` method (or similar) to the physical plan scheduler; I guess it would be available for both SQL and open table format sources. Here is an example.
```python
import daft

USER_STMT = "SELECT id, first_name, last_name, email FROM staging.users"

# not fully evaluated yet; create_connection is a user-defined connection factory
df = daft.read_sql(
    sql=USER_STMT, conn=create_connection, partition_col="id", num_partitions=9
)

# a physical plan scheduler should exist for both SQL and open table format sources
physical_plan_scheduler = df._builder.to_physical_plan_scheduler(
    daft.context.get_context().daft_execution_config
)

# can we have a to_dict() method (or similar) that returns the complete set of SQL queries?
physical_plan_scheduler
# * TabularScan:
# |   Num Scan Tasks = 9
# |   Estimated Scan Bytes = 685008
# |   Clustering spec = { Num partitions = 9 }
# |   SQL Queries = [SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1 AND id < 1112,..]
# |   Schema: {id#Int64, first_name#Utf8, last_name#Utf8, email#Utf8}
# |   Scan Tasks: [
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   ...
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   ]
```
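Just to illustrate the shape I have in mind (entirely hypothetical; none of these keys exist today), the method could return something like:
```python
# Hypothetical return value of a to_dict()-style method on the physical plan scheduler,
# mirroring the printed plan above; this is only the shape I am imagining.
{
    "op": "TabularScan",
    "num_scan_tasks": 9,
    "clustering_spec": {"num_partitions": 9},
    "scan_tasks": [
        {
            "source": "postgresql+psycopg2://devuser:***@localhost/devdb",
            "sql_query": (
                "SELECT * FROM (SELECT id, first_name, last_name, email "
                "FROM staging.users) AS subquery WHERE id >= 1 AND id < 1112"
            ),
        },
        # ... one entry per scan task, each carrying its complete SQL query
    ],
}
```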
For embedding the Daft features in Apache Beam, there are three steps:
1. Create a query config using `beam.Create()`.
2. Generate the SQL queries in `GenerateQueriesFn()` (this is where I need a way to generate the complete set of queries!).
3. Execute the actual queries in `ProcessQueriesFn()`.
Here is a sketch.
```python
import typing

import apache_beam as beam
import daft
from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider

# MyConfig and create_connection are assumed to be defined elsewhere.


class GenerateQueriesFn(beam.DoFn):
    def process(self, config: MyConfig) -> typing.Iterable[str]:
        for query in self.generate_sql_queries(config):
            yield query

    def generate_sql_queries(self, config):
        # generate sql queries with given config
        # this is what I look into
        return ["qry1", "qry2"]

class ProcessQueriesFn(beam.DoFn, RestrictionProvider):
    # RestrictionProvider methods (initial_restriction, create_tracker,
    # restriction_size) are omitted from this sketch.
    def process(
        self,
        element: str,  # sql query from GenerateQueriesFn
        tracker: OffsetRestrictionTracker = beam.DoFn.RestrictionParam(),
    ):
        restriction = tracker.current_restriction()
        for current_position in range(restriction.start, restriction.stop + 1):
            if tracker.try_claim(current_position):
                # yield daft dataframe
                #   or convert it to a list of dictionaries if not supported
                yield daft.read_sql(sql=element, conn=create_connection)
            else:
                return

with beam.Pipeline() as p:
    (
        p
        | beam.Create([my_config])  # my_config: an instance of MyConfig
        | beam.ParDo(GenerateQueriesFn())
        | beam.ParDo(ProcessQueriesFn())
    )
```
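With a helper like the explain-scraping sketch above (or, better, a `to_dict()`-style method), `generate_sql_queries` could be filled in roughly as follows (hypothetical; the `config` attribute names and `extract_queries_from_plan` are assumed names, and the extraction itself is exactly the missing piece this thread is about):
```python
class GenerateQueriesFn(beam.DoFn):
    def generate_sql_queries(self, config):
        # Hypothetical: build the Daft dataframe only to obtain its physical plan,
        # then pull the complete per-partition SQL strings out of it.
        df = daft.read_sql(
            sql=config.sql,
            conn=config.create_connection,
            partition_col=config.partition_col,
            num_partitions=config.num_partitions,
        )
        return extract_queries_from_plan(df)  # placeholder for the missing piece
```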
j
Thanks for the issue! @Colin Ho do you mind taking a look and triaging? We could discuss it in person maybe Thursday too 🙂
c
Yup! will do
j
Thank you all! 👍