# general
p
When Daft starts a local Ray cluster, does it start with the dashboard? On which port does the dashboard listen? By default, Ray should have the dashboard started at port 8265, but I am not seeing that port open on my system. In addition, I am getting an OOM issue when I load a large database table with many partitions. Is there any way I can avoid the issue by limiting the partition key range? Can I load a subset of the table by specifying the partition key range?
j
I don’t believe it starts with a dashboard by default. You can actually manually call
ray.init(include_dashboard=True)
first before running Daft! This will spin up Ray first (with a dashboard) and then Daft should actually also automatically pick it up when you do run it
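For anyone following along, a minimal sketch of that flow (the dashboard_port value just makes Ray's default explicit, and the tiny dataframe is only there to exercise the runner):

```python
import ray
import daft

# Start Ray yourself so the dashboard comes up (default port 8265).
ray.init(include_dashboard=True, dashboard_port=8265)

# Select the Ray runner; as noted above, Daft should then pick up the
# already-running cluster instead of starting its own.
daft.context.set_runner_ray()

df = daft.from_pydict({"x": [1, 2, 3]})
df.show()
```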
c
Yes, you can limit the partition key range. The easiest way would be to add a
where
clause in your query, i.e.
WHERE partition_key >= lower_bound AND partition_key < upper_bound
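For example, a rough sketch with a hypothetical connection string, table name, and key bounds (since the bounds live in the SQL string itself, the database applies them before Daft reads anything):

```python
import daft

# Hypothetical connection string and table; substitute your own.
conn = "mssql+pyodbc://user:password@host:1433/mydb"
query = """
    SELECT *
    FROM my_large_table
    WHERE partition_key >= 1000 AND partition_key < 2000
"""

# Only the bounded key range is loaded.
df = daft.read_sql(query, conn)
```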
p
Great, thank you Jay and Collin.
I'm pulling a large database table to migrate our data to the cloud. It looks like when a Ray worker node gets OOM killed, it is retried and eventually the Daft job completes. However, I am not sure if there is any guarantee that the output is duplicate-free or has no missing data. In other words, does the write process ensure exactly-once semantics across failure recovery? For this large table, I set the number of partitions to 20, but when I look at the S3 bucket folder, I see 34 parquet files. I would think we should have exactly 20 files, one for each partition?
c
Our writes guarantee that there will be no missing data; however, the absence of duplicate files is not guaranteed, i.e. you may have two files with different names but exactly the same data. One way you can check for this is by inspecting the output of the write operation, e.g.
print(df.write_parquet(..))
, which will show you the number of files and the file paths that Daft wrote to. These files are guaranteed to contain the results of your query.
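A small sketch of that inspection, assuming df is the dataframe read above and that the written file paths are exposed in the result's "path" column (as shown further down in this thread); the bucket name is hypothetical:

```python
# write_parquet returns a small DataFrame with one row per written file.
result = df.write_parquet("s3://my-bucket/my-table/")
result.show()

# Pull the written file paths out as a plain Python list.
written_paths = result.to_pydict()["path"]
print(len(written_paths), "files written")
```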
j
Going off that point, some other suggestions:
• You can look at the list of final results (df.write_parquet(…) produces a dataframe that has the written filepaths) and save that somewhere as a manifest of the complete dataset written
• You may also consider using something like DeltaLake or Iceberg to get transactional guarantees around writing data
Ah, this is essentially what Databricks’ Spark does: 😛 https://kb.databricks.com/jobs/uncommitted-files-causing-data-duplication It writes the final manifest (filepaths committed) as a _committed_timestamp JSON file… and expects future clients to ignore any files that are not in the JSON, because those are “duplicated data” 🤣 🤣 🤣 We just discussed it within the team and will implement something similar. More updates to come soon.
As a workaround for now @Phil Chen, you can just use the final manifest and delete any resultant files that are not in the manifest. That should give you the intended effect!
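A rough sketch of that workaround, assuming s3fs for listing and deleting, a hypothetical destination prefix, and that the manifest paths and listed paths can be normalized to a comparable form:

```python
import s3fs  # any S3 client that can list and delete objects would do

OUTPUT_PREFIX = "s3://my-bucket/my-table/"  # hypothetical destination

# 1. Write, and keep the manifest of files Daft reports as committed
#    (df here is the dataframe read from the source table).
result = df.write_parquet(OUTPUT_PREFIX)
manifest = set(result.to_pydict()["path"])

# 2. List what is actually sitting under the prefix.
fs = s3fs.S3FileSystem()
listed = {"s3://" + p for p in fs.ls(OUTPUT_PREFIX[len("s3://"):])}

# 3. Delete anything not in the manifest (orphans left by retried tasks).
#    Path formats may need normalizing so the two sets are comparable.
for extra in listed - manifest:
    fs.rm(extra)
```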
p
Thanks Jay and Collin. This is very helpful. So, basically, the proposed workaround is to list the files in the destination folder and compare that list with the output of df.write_parquet, then delete any extra files that are not in the results list. Do I get this right?
👍 2
Some observations from my test:
1. Reducing Ray parallelism cut down on OOM kills.
2. The vacuum scheme proposed above doesn't seem to work, because write_parquet returns all the files written, which includes the duplicated files. For example, I have a table partitioned into 20; after an OOM kill/recovery (handled automatically by Ray), the total number of files written is 40, and that is the list of files returned by write_parquet. Hence we cannot rely on it to detect duplicate files.
To detect duplicates, I think we will need to compare the num_partitions we pass as an argument to read_sql with the total number of files written? If the number of files written is more than the number of partitions, then we have duplicate data. But this doesn't tell us which files are duplicates, and hence we don't know how to deduplicate.
To avoid OOM kills from happening at all, in addition to tuning the Ray parallelism, we can also tune the number of partitions. The more partitions, the less likely an OOM. I will try that and see how it goes. In the meantime, any advice on how to deduplicate the data would be appreciated. One thing I can think of: if we detect data duplication, we clean up all the files and retry with a larger number of partitions. This is a little inefficient, but that is the only workaround I can come up with for now.
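For reference, a speculative sketch of the two tuning knobs mentioned above, with made-up values, connection string, and table name (capping num_cpus at ray.init is one way to bound parallelism; a larger num_partitions makes each read_sql partition smaller):

```python
import ray
import daft

# Cap concurrency so fewer partitions are materialized at once
# (num_cpus=4 is a hypothetical value; tune for your machine).
ray.init(num_cpus=4)
daft.context.set_runner_ray()

# More, smaller partitions reduce per-task memory pressure.
df = daft.read_sql(
    "SELECT * FROM my_large_table",            # hypothetical table
    "mssql+pyodbc://user:password@host/mydb",  # hypothetical connection
    partition_col="partition_key",
    num_partitions=100,
)
df.write_parquet("s3://my-bucket/my-table/")
```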
c
Ok, just to confirm:
1. Are you including the partition_cols argument in write_parquet? This will subdivide a partition further based on the given columns.
2. Are you performing any additional operations between read_sql and write_parquet? This could affect the number of partitions.
3. In the list of files returned from write_parquet, do you notice any files with the same uuid prefix, but a different suffix? For example:
"path"
"c2f241b6-a54e-46bd-8453-fb7fb9aff4eb-1.parquet"
"c2f241b6-a54e-46bd-8453-fb7fb9aff4eb-0.parquet"
"c2f241b6-a54e-46bd-8453-fb7fb9aff4eb-2.parquet"
Notice that these files have the same uuid prefix, but are suffixed differently with '-0', '-1', or '-2'. This can happen if the partition is too large: Daft will create multiple files per partition. (You can actually configure the parquet_target_filesize, see: https://getdaft.io/projects/docs/en/stable/api_docs/doc_gen/configuration_functions/daft.set_execution_config.html). Apologies, I only just realised that we have this behavior 😅.
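A sketch of turning that knob (the option name comes from the linked docs; the 1 GB value is arbitrary):

```python
import daft

# Raise the target parquet file size so each partition is split into
# fewer, larger files (the default mentioned below is 512 MB).
daft.set_execution_config(parquet_target_filesize=1024 * 1024 * 1024)
```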
p
I am trying to migrate data from SQL Server to S3, so I am not doing any transformation; reading the table and writing it out to S3 is all I am doing. I do set partition_cols and num_partitions based on the table size. For large tables the num_partitions is large, but currently limited to 100. I do see file names with the same hash prefix and different "-0", "-1" at the end. So, if I understand correctly, the final number of files may not be the same as the num_partitions value passed to read_sql?
c
So, if I understand correctly, the final number of files may not be the same as the num_partitions value passed to read_sql?
Yes, this is correct; the number of files can be greater. By default, Daft's target parquet file size is 512 MB, and write_parquet will split partitions accordingly to match this target size.
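As a back-of-the-envelope illustration (the per-partition size is a made-up number), this is how 20 read_sql partitions can turn into many more files:

```python
import math

partition_bytes = int(1.5 * 1024**3)  # hypothetical: ~1.5 GB per partition
target_bytes = 512 * 1024**2          # Daft's default target file size

files_per_partition = math.ceil(partition_bytes / target_bytes)  # -> 3
print(20 * files_per_partition)       # -> 60 files, not 20
```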
p
Thanks Colin for confirming and clarifying. This is very helpful. So, basically, the list of files returned by the writer are all successfully written files. We can still use this as a heuristic for vacuuming any partially written files left behind by a Ray system failure (an OOM kill, for example), i.e. to remove any duplicated data.
👍 2