Has anybody used daft read_sql to load data from ...
# general
p
Has anybody used daft read_sql to load data from SQL Server? I am getting a RuntimeError with "SELECT * FROM (SELECT * FROM mydatabase) AS subquery LIMIT 1". It looks like this is not a valid SQL statement generated by Daft. T-SQL doesn't support LIMIT; instead it uses TOP 1. So the correct statement should have been: "SELECT TOP 1 * FROM (SELECT * FROM mydatabase) AS subquery". What am I missing here? In addition, how do I run this on a Ray cluster? By default, I think Daft is using multiple threads rather than Ray nodes.
j
Hey @Phil Chen! I’m guessing there is a problem with generating SQL for this specific SQL dialect. @Colin Ho can probably advise here. For your Ray cluster, you can take a look at our guide for Ray: https://www.getdaft.io/projects/docs/en/latest/user_guide/integrations/ray.html TLDR: The easiest way to get started (recommended for interactive jobs):
daft.context.set_runner_ray(address="<ray://address-to-ray-head-with-port-usually-at:10001>")
Running as a Ray job (recommended for production jobs): you can write a script (e.g. my_script.py) and then run your script as a Ray job:
ray job submit \
    --working-dir wd \
    --address "http://<head_node_host>:8265" \
    --runtime-env-json '{"pip": ["getdaft"]}' \
    -- python my_script.py
# From your script, it should automatically detect the Ray cluster since it is being run from inside the job
daft.context.set_runner_ray()
c
Hey @Phil Chen! Just wanted to check, are you running read_sql using the URL to your database or a SQLAlchemy connection?
p
Thanks Jay and Colin for the quick response!!! Colin, I'm using a connection_str like this: "Driver={ODBC Driver 17 for SQL Server};Server=DBServer;Database=mydatabase;Trusted_Connection=yes;"
I can try SQLAlchemy, but I don't think it will help, because the generated SQL statement with LIMIT 1 is not a valid T-SQL statement.
c
Actually could you try it with SQLAlchemy? We have different logic for generating SQL based on the connection type, so it might work!
p
@Colin Ho yes, using SQLAlchemy works. However, the data seems to be incorrect. The table has 5 columns; I am only showing the first 5 rows. I used 3 methods to query the data. For the first 4 columns the values are exactly the same across all 3 methods, but for the 5th column (value), Daft returns the wrong values.
1. Using Microsoft SQL Server Management Studio, value: NULL, 7616100000, 7569977539, 7569977539, 0
2. Using pandas.read_sql, value: NaN, 7.616100e+09, 7.569978e+09, 0.000000e+00
3. Using daft.read_sql, value: None, None, None, None, None
As you can see, pandas.read_sql returns the value column in scientific notation, which matches SQL Server Management Studio, but Daft returns None for all 5 rows.
Other observation: it looks like Daft automatically runs in distributed mode using a local Ray cluster as long as Ray is installed. We don't need to explicitly call daft.context.set_runner_ray(), so it does indeed automatically detect Ray.
Question: by specifying SQLAlchemy, does that mean it won't be using ConnectorX? In addition, when I create the SQLAlchemy engine, I have to provide a URL. Here is how I construct the URL from the connection_string I showed earlier. Note that here I specify pyodbc as the driver. Will we still be using Arrow at the driver layer if we specify pyodbc as the driver?
url = sqlalchemy.engine.URL.create(drivername="mssql+pyodbc", query={"odbc_connect": connection_string})
sqlalchemy.create_engine(url).connect()
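For completeness, here is roughly how the full read looks on my end (a sketch only; I'm assuming read_sql accepts a zero-argument connection factory as its second argument, and mydatabase is the table from my earlier example):

import daft
import sqlalchemy

connection_string = (
    "Driver={ODBC Driver 17 for SQL Server};"
    "Server=DBServer;Database=mydatabase;Trusted_Connection=yes;"
)

# Wrap the raw ODBC connection string in a SQLAlchemy URL that uses the pyodbc driver.
url = sqlalchemy.engine.URL.create(
    drivername="mssql+pyodbc",
    query={"odbc_connect": connection_string},
)

def create_conn():
    # Zero-argument factory returning a live SQLAlchemy connection.
    return sqlalchemy.create_engine(url).connect()

# Assumption: read_sql accepts a connection factory as its second argument.
df = daft.read_sql("SELECT * FROM mydatabase", create_conn)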
In summary, we seem to have a number of issues here:
1. The incorrect values, which seems to be a serious one.
2. Followed by not supporting a connection_string directly (resulting in a wrong SQL query statement and a RuntimeError exception).
3. Followed by having to use SQLAlchemy explicitly, and it being unclear whether ConnectorX is really used under the hood.
4. Followed by having to explicitly specify the pyodbc driver.
👍 1
👀 1
j
Yes! We made that change a few months ago to auto detect Ray :)
👍 1
p
BTW, the schema type for the 'value' column is "float" in SQL Server, if that provides any hint for troubleshooting.
👍 2
j
Yeah, the way we do schema inference right now is we read the first row (LIMIT 1). The issue is that supporting schema/type conversions from every possible backend is really hard 😅
We can maybe expose a parameter for how many rows we read for inference… and also for specific backends (the more common ones such as Postgres and SQL Server) we can define type mappings
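Conceptually, a per-backend mapping could look something like this (just a sketch of the idea, not actual Daft internals; the daft.DataType constructors are from the public API):

import daft

# Hypothetical mapping from SQL Server column types to Daft types.
# A sketch of the idea only; this is not how Daft is implemented today.
MSSQL_TYPE_MAP = {
    "float": daft.DataType.float64(),   # T-SQL float defaults to float(53), i.e. a double
    "real": daft.DataType.float32(),
    "bigint": daft.DataType.int64(),
    "int": daft.DataType.int32(),
    "bit": daft.DataType.bool(),
    "nvarchar": daft.DataType.string(),
}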
p
Allowing the user to specify a schema and the number of rows to read for schema inference sounds good. Something similar to the Polars read_database API: https://docs.pola.rs/api/python/stable/reference/api/polars.read_database.html
A quick fix would be to increase the limit from 1 to something like 10 or even 100 to improve schema inference accuracy. In the case of a column with NULLs, try to fetch more data for that particular column; if all rows are null for the column in question, issue a warning. Or, in the case of a database, would it be simpler to just query INFORMATION_SCHEMA to find out the data types rather than inferring them from the data?
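For a plain table, the INFORMATION_SCHEMA lookup I have in mind would be something like this (a rough sketch; mytable is just a placeholder and url is the SQLAlchemy URL from my earlier example):

import sqlalchemy

# Declared column types for a plain table; derived columns from arbitrary
# queries would not show up here.
type_query = sqlalchemy.text(
    "SELECT COLUMN_NAME, DATA_TYPE "
    "FROM INFORMATION_SCHEMA.COLUMNS "
    "WHERE TABLE_NAME = 'mytable'"
)

with sqlalchemy.create_engine(url).connect() as conn:
    for column_name, data_type in conn.execute(type_query):
        print(column_name, data_type)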
A quick question: what is the default data type if Daft cannot infer the data type correctly? Will it be the string type? In that case, we should still have a string representation of the data rather than None, I'd assume?
Answering my own question: I see why Daft cannot rely solely on INFORMATION_SCHEMA to learn the schema. For example, a query could use any SQL function to transform table columns into new values with different types. In that case, we can only rely on inference or a user-provided schema.
👍 1
j
Exactly 🫡
Today, if we receive all None, we infer it as a null column, which is why you’re seeing all null data. We should allow you to hint Daft, though. That mechanism already exists for our other reads and should be easy to add.
🙏 1
p
Jay, do you know when we may have schema hint support for read_sql? Do we have a GitHub issue that I can track, or do you need me to create the issue? In addition, will Daft support append/overwrite semantics for write operations such as write_parquet? Actually, it looks like schema_hints is deprecated; instead we should be using the schema and infer_schema parameters.
c
I'm working on providing schema and infer_schema right now! Should have a PR out soon.
🙌 1
❤️ 1
🙏 1
Additionally, to provide some clarification:
• If a user provides a connection string, we will parse the string to determine the dialect and generate SQL appropriately. I believe the reason the connection string did not work for you is that it did not match a URL format like mssql://host:port. You could try reformatting your connection string. That being said, I'm not 100% confident that the read will execute, because ConnectorX does have some issues with MSSQL. See: https://github.com/sfu-db/connector-x/issues/233 for instance.
• Regarding whether ConnectorX or SQLAlchemy is used under the hood, there are essentially 2 rules. 1: if the user passes in a connection string and the parsed dialect is not supported, use SQLAlchemy. 2: if the user passes in a SQLAlchemy connection, use SQLAlchemy.
• Unfortunately, SQLAlchemy is not an Arrow-backed driver. This means that when Daft executes a query using SQLAlchemy, the results are first returned into a Python dictionary, which is subsequently converted into Arrow format.
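In pseudocode, the dispatch is roughly this (a simplified sketch, not the actual source; the supported-dialect set here is illustrative):

# Simplified sketch of the two rules above; not Daft's actual implementation.
SUPPORTED_CONNECTORX_DIALECTS = {"postgres", "postgresql", "mysql", "sqlite"}  # illustrative

def pick_execution_backend(conn) -> str:
    if callable(conn):
        # Rule 2: a SQLAlchemy connection (factory) always goes through SQLAlchemy.
        return "sqlalchemy"
    # Rule 1: parse the dialect out of the connection string / URL.
    dialect = conn.split("://", 1)[0].split("+", 1)[0]
    if dialect in SUPPORTED_CONNECTORX_DIALECTS:
        return "connectorx"  # Arrow-backed path
    return "sqlalchemy"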
Regarding append/overwrite for writes: there is an outstanding issue for this: https://github.com/Eventual-Inc/Daft/issues/1768, and it's definitely something we'd like to get done! The biggest challenge is figuring out how to do it in a distributed fashion.
p
Wow, this is great! Thanks for the very quick turnaround. Looking forward to seeing the PR approved and merged, and hopefully a new release will be cut soon! Also, thank you very much for providing answers to my other questions. I just tried reformatting my connection string; it apparently doesn't work. Someone got it working with "mssql://{server_name}/{database_name}?trusted_connection=true", but I tried that and it doesn't work for me either. Hopefully ConnectorX will fix the issue someday, but SQLAlchemy is acceptable for now. I will keep an eye on the support for the overwrite feature; in the meantime, I guess we could work around it. Again, thank you both, Colin and Jay. I will take another ride when this is ready to try.
🙏 1
c
Hey @Phil Chen, we just released Daft 0.3.0, so the new infer_schema and schema features should be ready for you to try. Let me know how it goes!
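For example, something along these lines (a sketch; the create_conn factory and the column name are placeholders from earlier in this thread, and the exact parameter semantics are in the read_sql API docs):

import daft
from daft import DataType

# Keep inference on, but explicitly override the type of the problematic column.
df = daft.read_sql(
    "SELECT * FROM mydatabase",
    create_conn,
    infer_schema=True,
    schema={"value": DataType.float64()},
)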
p
That’s awesome, @Colin Ho! I will give it a try. Thank you!
@Colin Ho, quick feedback: I upgraded to 0.3.0 and read_sql works fine now! I haven't tried explicitly providing a schema yet, but it works out of the box with the default parameters. I guess the default of 10 samples made the difference for schema inference. Thank you so much for fixing the issue so quickly. I had a quick look at the release notes for 0.3.0; awesome progress. Keep up the great work.
👏 2
🔥 1
@Colin Ho I tried read_sql with schema, and it works like a charm. Great work. I am able to read a lot of tables, large and small. Everything went smoothly until I hit a very large table. It crashes on this statement: SELECT count(*) FROM (....) AS Subquery, with "Arithmetic overflow error converting expression to data type int". I believe that if we change the generated SQL to use count_big rather than count, it will fix the issue: SELECT count_big(*) FROM (...) AS Subquery. Of course, any return variable should also be changed to int64 if it is currently int32.
I guess this is also dialect dependent, as in some databases, for example PostgreSQL, count() always returns bigint.
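As a sanity check on my side, running COUNT_BIG directly over the same table confirms the workaround (a sketch; the table name is illustrative):

import pyodbc

conn = pyodbc.connect(connection_string)
# T-SQL COUNT(*) returns a 32-bit int and overflows past ~2.1 billion rows;
# COUNT_BIG(*) returns a bigint.
row_count = conn.cursor().execute(
    "SELECT COUNT_BIG(*) FROM dbo.mytable"  # illustrative table name
).fetchone()[0]
print(row_count)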
c
Awesome, thanks for stress testing read_sql! Glad to know it works. I can make a fix to use count_big instead.
p
That will be great if we can fix the count overflow issue, Colin. Much appreciated!
c
Just a quick update: we use an external library called SQLGlot to help us generate SQL, so I've made a PR on their end to enable the count_big semantics: https://github.com/tobymao/sqlglot/pull/3996 Will update once everything goes through.
p
Thanks @Colin Ho for your quick action. Really appreciate it. Looking forward to the PR merge.
c
Hey @Phil Chen! Looks like they just cut a release, and you can upgrade with pip install sqlglot==25.19.0; this should allow Daft to use count_big now.
p
This is great. Thank you so much @Colin Ho!
Hi @Colin Ho, I just got back to running my ETL job (was on vacation) and the arithmetic overflow issue is no longer happening. Thanks again for the quick fix!
☺️ 1
c
That's awesome, great to hear!