Jaehyeon Kim
09/30/2024, 9:47 AM
Splittable DoFn objects, and they require a complete set of SQL queries based on the partition conditions (e.g. partition_col, num_partitions, ...). Those queries are then executed in multiple DoFn instances, and the results are combined before being passed to subsequent tasks.
For SQL sources, the physical plan scheduler prints the SQL queries, but they don't seem to be accessible from Python. I might be able to obtain the queries via the SQL scan operator, but that looks incomplete because the pushdowns object is not directly obtainable in Python.
Could you please let me know if there is a way to obtain the complete set of queries by specifying a SQL query and partition conditions?
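For illustration, this is the shape of the query set I mean. A rough sketch only, assuming a numeric partition column with known min/max bounds (Daft's actual splitting logic may differ; partition_queries is just a hypothetical helper):
def partition_queries(stmt: str, partition_col: str, num_partitions: int, lower: float, upper: float) -> list[str]:
    # naive even-width ranges over [lower, upper]; illustration only
    step = (upper - lower) / num_partitions
    queries = []
    for i in range(num_partitions):
        lo = lower + i * step
        last = i == num_partitions - 1
        hi = upper if last else lo + step
        op = "<=" if last else "<"
        queries.append(
            f"SELECT * FROM ({stmt}) AS subquery "
            f"WHERE {partition_col} >= {lo} AND {partition_col} {op} {hi}"
        )
    return queries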
Thank you!
Jaehyeon

jay
09/30/2024, 4:01 PM

Colin Ho
09/30/2024, 4:06 PM
With .explain(show_all=True), for example, you would see something like:
== Physical Plan ==
* TabularScan:
| Num Scan Tasks = 3
| Estimated Scan Bytes = 0
| Clustering spec = { Num partitions = 3 }
| SQL Queries = [SELECT * FROM (SELECT * FROM test_table) AS subquery WHERE id >= 1 AND id < 4.666666666666666,..]
| Schema: {id#Int64, name#Utf8, age#Int64, email#Utf8}
| Scan Tasks: [
| {Database {sqlite:///test.db}}
| {Database {sqlite:///test.db}}
| {Database {sqlite:///test.db}}
| ]
However, the full set of queries is hidden for brevity.
We could expose a parameter such as print_full_sql_queries_in_explain which would disable truncation, in which case you could in theory capture the output of .explain and use regex to find the queries (see the sketch after the list below).
Some other solutions I can think of:
• Include a sql_query_column in the result dataframe that contains the SQL query string that was executed
• Allow users to pass in some object that exposes an .append method (like a Python list) to read_sql, which would get appended to with the SQL queries as they are generated.
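A minimal sketch of the capture-and-regex idea, assuming .explain(show_all=True) writes its plan to stdout and (hypothetically) printed the untruncated query list:
import io
import re
from contextlib import redirect_stdout

def capture_sql_queries(df) -> list[str]:
    # capture whatever explain() prints to stdout
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain(show_all=True)
    plan_text = buf.getvalue()
    # pull out the contents of the "SQL Queries = [...]" line
    match = re.search(r"SQL Queries = \[(.*?)\]", plan_text, re.DOTALL)
    if not match:
        return []
    # split on ", SELECT" boundaries so commas inside a query are preserved
    return [q.strip() for q in re.split(r",\s*(?=SELECT\b)", match.group(1))]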
jay
09/30/2024, 4:10 PM

Jaehyeon Kim
09/30/2024, 10:41 PM
Another option might be adding a to_dict() method or similar to the physical plan scheduler. I guess it is available for both SQL and open table format sources. Here is an example.
import daft

USER_STMT = "SELECT id, first_name, last_name, email FROM staging.users"

## not fully evaluated; create_connection is a user-defined connection factory
df = daft.read_sql(
    sql=USER_STMT, conn=create_connection, partition_col="id", num_partitions=9
)

## a physical plan scheduler should exist for both SQL and open table format sources
physical_plan_scheduler = df._builder.to_physical_plan_scheduler(
    daft.context.get_context().daft_execution_config
)

## can we have a to_dict() method (or similar) that returns the complete set of SQL queries?
physical_plan_scheduler
# * TabularScan:
# | Num Scan Tasks = 9
# | Estimated Scan Bytes = 685008
# | Clustering spec = { Num partitions = 9 }
# | SQL Queries = [SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1 AND id < 1112,..]
# | Schema: {id#Int64, first_name#Utf8, last_name#Utf8, email#Utf8}
# | Scan Tasks: [
# | {Database {<postgresql+psycopg2://devuser>:***@localhost/devdb}}
# | {Database {<postgresql+psycopg2://devuser>:***@localhost/devdb}}
# | {Database {<postgresql+psycopg2://devuser>:***@localhost/devdb}}
# | ...
# | {Database {<postgresql+psycopg2://devuser>:***@localhost/devdb}}
# | {Database {<postgresql+psycopg2://devuser>:***@localhost/devdb}}
# | {Database {<postgresql+psycopg2://devuser>:***@localhost/devdb}}
# | ]
For embedding Daft in Apache Beam, there are three steps.
1. Create a query config using beam.Create()
2. SQL queries are generated in GenerateQueriesFn() - this is where I need a way to generate the complete set of queries!
3. Actual queries are executed in ProcessQueriesFn()
Here is a sketch.
import typing

import apache_beam as beam
import daft
from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider

# MyConfig, create_connection, and my_config are assumed to be defined elsewhere


class GenerateQueriesFn(beam.DoFn):
    def process(self, config: MyConfig) -> typing.Iterable[str]:
        for query in self.generate_sql_queries(config):
            yield query

    def generate_sql_queries(self, config):
        # generate sql queries with the given config
        # this is the part I am looking into
        return ["qry1", "qry2"]


class ProcessQueriesFn(beam.DoFn, RestrictionProvider):
    def process(
        self,
        element: str,  # sql query from GenerateQueriesFn
        tracker: OffsetRestrictionTracker = beam.DoFn.RestrictionParam(),
    ):
        restriction = tracker.current_restriction()
        for current_position in range(restriction.start, restriction.stop):
            if tracker.try_claim(current_position):
                # yield a daft dataframe
                # or convert it to a list of dictionaries if not supported
                yield daft.read_sql(sql=element, conn=create_connection)
            else:
                return


with beam.Pipeline() as p:
    (
        p
        | beam.Create([my_config])
        | beam.ParDo(GenerateQueriesFn())
        | beam.ParDo(ProcessQueriesFn())
    )
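For completeness, ProcessQueriesFn would also need the RestrictionProvider methods. A rough sketch, assuming a single-offset restriction per query:
from apache_beam.io.restriction_trackers import OffsetRange


class ProcessQueriesFn(beam.DoFn, RestrictionProvider):
    # ... process() as above ...

    def create_initial_restriction(self, element: str) -> OffsetRange:
        # one offset per SQL query string (assumption for this sketch)
        return OffsetRange(start=0, stop=1)

    def create_tracker(self, restriction: OffsetRange) -> OffsetRestrictionTracker:
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element: str, restriction: OffsetRange) -> int:
        return restriction.size()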
jay
09/30/2024, 11:58 PM

Colin Ho
10/01/2024, 12:03 AM

Jaehyeon Kim
10/01/2024, 5:13 AM