# general
p
Hi, I ran into a strange MS SQL Server query runtime error when I use daft.read_sql to read data from the database and write Parquet files to S3. This doesn't happen all the time; so far it has happened a couple of times, usually for large tables. I can see that some of the Parquet files are already written to S3 before it fails. Not sure if this is some kind of memory issue--I don't see an OOM kill for the Ray worker. From the error, it looks SQLAlchemy-related rather than a Daft issue, but I am wondering if anybody has encountered this before, and whether there is any way Daft could retry the failed partition (especially if this is a transient issue). It is a little hard for the application to include retry logic because we don't want to retry the entire ETL job. Much appreciated for any suggestions. The stack trace shows the failure happened at micropartition.py#80 in _from_scan_task:

```
return MicroPartition._from_pymicropartition(PyMicroPartition.from_scan_task(scan_task))
daft.exceptions.DaftCoreException: DaftError: External PyIOError: RuntimeError: Failed to execute sql:
SELECT * FROM (SELECT * FROM (SELECT * FROM mytable) AS subquery WHERE id >= 14988954.29 AND id < 14988960.2) AS subquery
from connection: create_connection, error: pyodbc.OperationalError ('08S01', '[08S01] [Microsoft][ODBC Driver 17 for SQL Server]TCP Provider: Error code 0x274C (10060) (SQLGetData)')
(Background on this error at: https://sqlalche.me/e/20/e3q8)
```
The failure is in the create_connection factory. Did it hit a connection limit, or was there a memory allocation failure while creating the connection?
BTW, does the generated query look correct? Why do we have 2 nested subqueries? In addition, id is the index and its type is int. Why are the range limits in the generated query's WHERE clause floats?
Currently, I am passing a factory method to read_sql. In the factory method, I create a connection URL object, use that to create an engine object, and finally return a connection object by calling connect() on the engine. I am wondering if any of those objects could be cached (each Ray worker only needs to create one engine and one connection for the same connection_url).
Does the read_sql close the connection created by the factory?
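For reference, the factory setup described above might look roughly like this (server names and credentials are illustrative placeholders; `URL.create` is SQLAlchemy's URL builder):

```python
def get_connection():
    # URL object -> engine -> connection, as described above.
    # Import is local so only the Ray worker needs SQLAlchemy/pyodbc.
    from sqlalchemy import URL, create_engine
    url = URL.create(
        "mssql+pyodbc",
        username="user",
        password="secret",        # illustrative placeholder
        host="myserver",
        database="mydb",
        query={"driver": "ODBC Driver 17 for SQL Server"},
    )
    return create_engine(url).connect()

# df = daft.read_sql("SELECT * FROM mytable", get_connection, partition_col="id")
```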
c
> BTW, does the generated query look correct? Why we have 2 nested subqueries?

Ah, looks like there is an additional subquery, which should be unnecessary. This is something we can fix on our end, but it shouldn't affect the results of the query.
> In addition, id is the index and its type is int. Why the generated query where clause range limit is in float?
In order to have partitioned reads, Daft will first calculate percentiles from the partition column via `PERCENTILE_DISC`. These percentiles can be floats. It will then use these percentiles as bounds for each read. Here's a more in-depth explanation: https://www.getdaft.io/projects/docs/en/stable/user_guide/integrations/sql.html#parallel-distributed-reads
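As a rough illustration of why an integer column can yield fractional bounds: interpolated percentiles can fall between the actual values. Python's `statistics.quantiles` interpolates much like SQL's `PERCENTILE_CONT`; this is a sketch of the idea, not Daft's actual code path:

```python
import statistics

ids = list(range(1, 101))                # an integer partition column
bounds = statistics.quantiles(ids, n=4)  # 3 cut points -> 4 partitions
print(bounds)  # -> [25.25, 50.5, 75.75]: floats, even though ids are ints
# Daft then uses such bounds as WHERE-clause ranges, e.g.
# WHERE id >= 25.25 AND id < 50.5
```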
> I am wonder if any one of those objects could be cached (each ray worker only need to create one engine and one connection for the same connection_url).
This will be quite tricky to implement as Daft currently uses Ray tasks, which are stateless. We also can't reuse the same connection across tasks as they are not serializable.
> Does the read_sql close the connection created by the factory?
Yes, the connection will be closed once the read is complete
Given that the error you are facing is a transient error that happens usually for large tables, we can implement a configurable retry mechanism that might help. I'll scope it out and get back to you
In the meantime, it may be worth trying retry logic on the connection factory itself! For example:
```python
from sqlalchemy import create_engine
from tenacity import retry, stop_after_attempt, wait_fixed

@retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
def get_connection():
    engine = create_engine(connection_url)
    return engine.connect()
```
You can use https://tenacity.readthedocs.io/en/latest/ to implement the retry logic
p
Thanks @Colin Ho for the answers and suggestions, I will retry get_connection!
@Colin Ho quick feedback: I was not able to get the retry logic to work. The decorator uses a thread-local object which is not serializable. I will try hand-coding the retry logic next and hope that will work.
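One way to hand-code the retry without the decorator's thread-local state is a plain loop, so the factory stays picklable for Ray. This is a sketch; `connection_url` is assumed to be defined at module scope, and in practice you would narrow the except clause to pyodbc/SQLAlchemy `OperationalError`:

```python
import time

def retry_call(fn, attempts=3, delay=5):
    # Plain retry helper with no decorator state, so a factory built on
    # top of it stays picklable for Ray.
    last_err = None
    for _ in range(attempts):
        try:
            return fn()
        except Exception as err:  # narrow to OperationalError in practice
            last_err = err
            time.sleep(delay)
    raise last_err

def get_connection():
    # Hypothetical factory for daft.read_sql; assumes `connection_url`
    # exists at module scope. The SQLAlchemy import is local so the
    # driver process doesn't need SQLAlchemy installed to pickle this.
    from sqlalchemy import create_engine
    return retry_call(lambda: create_engine(connection_url).connect())
```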
Now I am getting the connection error every time. This only happens when I set Ray as the runner; with the local default runner it works fine. I'm not sure what has changed. I upgraded sqlglot and getdaft. Not sure what is causing this issue. I was able to run without the issue yesterday--after the upgrade of both sqlglot and getdaft. Any clue why I am now hitting this issue 100% of the time?
c
Interesting, given that it works on the local runner and not the Ray runner, it could be that your MS SQL Server is not accepting the connection from the Ray workers. I found these two links online that describe the error:
1. https://stackoverflow.com/questions/64876580/sql-server-odbc-tcp-provider-error-code-0x274c
2. https://learn.microsoft.com/en-us/troubleshoot/sql/database-engine/connect/network-rel[…]tance-specific-error-occurred-while-establishing-connection
Maybe these can help.
p
It is really strange. I just tried one of the earlier commits in the git repo and it works fine (with the Ray runner). So there is something in my new code that upsets the Daft Ray runner; I just don't know what exactly is causing the issue. I might have to walk backwards to find the root cause.
Found the issue. The problem is that when I added the connection retry logic, I moved the call to the function that constructs the connection string into the factory method. Somehow that caused trouble, because the function was decorated with functools.cache. Once I moved the connection string call outside of the factory method again, it worked. Now, instead of calling the connection string function inside the factory method, the factory method just captures the connection string as a closure. This solved the mystery. So the bottom line is that the connection factory method cannot call any other methods that may be decorated with a cache.
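A minimal sketch of the fix described above (`build_connection_string` is an illustrative stand-in for the cache-decorated helper): resolve the connection string once, up front, and let the factory close over the resulting plain string instead of calling the cached helper inside the factory.

```python
def make_factory(connection_string):
    # The factory closes over a plain str, not over a functools.cache
    # decorated helper; Ray serializes the closure (via cloudpickle)
    # without dragging the cache's internal state along.
    def get_connection():
        from sqlalchemy import create_engine  # resolved on the worker
        return create_engine(connection_string).connect()
    return get_connection

# connection_string = build_connection_string(...)  # cached helper, called once, up front
# factory = make_factory(connection_string)
# df = daft.read_sql("SELECT * FROM mytable", factory, partition_col="id")
```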
I need to figure out how to get ConnectorX work, so we don't have to pass a factory method.
👍 1
BTW, ConnectorX + Daft works fine on Windows, but doesn't work on Linux. I started a discussion here. Should we create a new issue? I think supporting the Linux platform is very important. The users of MSSQL servers are not always Windows users. https://github.com/sfu-db/connector-x/discussions/684
c
Yeah, I believe they have many ongoing issues regarding MSSQL, see: https://github.com/sfu-db/connector-x/issues/560 as well. It also looks like someone has a draft PR out: https://github.com/sfu-db/connector-x/pull/680, hopefully it gets merged soon!
🙏 1