Rishabh Agarwal Jain
03/27/2024, 9:05 PM
Introduction
ETL (Extract, Transform, Load) is a crucial step in the data preprocessing pipeline: it transforms raw data into meaningful features that improve the performance of machine learning models. When dealing with large datasets, performing this feature engineering at scale is essential for processing data efficiently and extracting valuable insights.
Components of Feature Engineering:
Feature engineering at scale involves three main types of computations:
• Reads and Writes: Operations for reading data from various sources, such as files, databases, or distributed file systems, and writing the transformed data to the desired destination.
• SQL Queries (super important): Utilizing SQL queries for efficient data manipulation, allowing complex transformations and aggregations for generating informative features.
• Dataframe Operations: Working with tabular data structures, such as dataframes, to efficiently manipulate structured data. This includes operations like filtering, grouping, merging, and other transformations, enabling feature engineering at scale (see the sketch after this list).
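A minimal sketch of these three computation types, using PySpark purely as an illustration (the paths and column names are made up):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Reads and writes: load raw data, write transformed features back out.
events = spark.read.parquet("s3://bucket/raw/events/")

# SQL queries: complex transformations and aggregations expressed declaratively.
events.createOrReplaceTempView("events")
daily = spark.sql(
    "SELECT user_id, DATE(event_ts) AS day, COUNT(*) AS n_events "
    "FROM events GROUP BY user_id, DATE(event_ts)"
)

# Dataframe operations: filtering, grouping, and aggregating.
features = (
    daily.filter(F.col("n_events") > 0)
    .groupBy("user_id")
    .agg(F.avg("n_events").alias("avg_daily_events"))
)
features.write.mode("overwrite").parquet("s3://bucket/features/")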
Requirements
Core Capabilities
• Read and write data in Parquet format across different sources, including Iceberg, Delta, and Redshift (see the sketch after this list).
• Support for executing SQL queries for data manipulation.
• Support for dataframe operations, including:
◦ Statistical computations.
◦ SQL operations such as joins, group by, map, and sort.
◦ Calculating correlations and covariance among columns.
◦ Custom UDFs such as map_with_pandas.
• Fault tolerance to handle failures (including node failures) and ensure robustness.
• GPU support for accelerating computations on compatible hardware.
• Integration with external catalogues such as Glue to access and manage metadata.
• Ability to operate on larger-than-memory datasets by employing techniques like:
◦ Pipelining of operations for efficient processing.
◦ Disk spilling to manage data that exceeds available memory.
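A hedged sketch of how these capabilities might look from user code, again using PySpark as a stand-in (the catalog, table, and column names are hypothetical, and reading Iceberg/Delta through Glue assumes the matching catalog configuration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read a catalog-managed table (e.g. registered in Glue).
trips = spark.table("glue_catalog.analytics.trips")

# SQL for data manipulation.
trips.createOrReplaceTempView("trips")
by_zone = spark.sql("SELECT zone, AVG(fare) AS avg_fare FROM trips GROUP BY zone")

# Statistical computations: correlation and covariance between columns.
fare_distance_corr = trips.stat.corr("distance", "fare")
fare_distance_cov = trips.stat.cov("distance", "fare")

# A custom UDF in the spirit of map_with_pandas: transform pandas batches.
def add_fare_per_km(batches):
    for pdf in batches:
        pdf["fare_per_km"] = pdf["fare"] / pdf["distance"].clip(lower=0.1)
        yield pdf

enriched = trips.mapInPandas(add_fare_per_km, schema=trips.schema.add("fare_per_km", "double"))
enriched.write.mode("overwrite").parquet("s3://bucket/features/trips/")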
Usability
• Provide a familiar, PySpark-like API, enabling a seamless transition for existing PySpark users.
• Support remote execution, allowing connection to a remote cluster from any location.
• Offer full autoscaling support, eliminating the need for users to specify the number or size of workers manually.
• Enable conversion to Ray dataframes and integration with other libraries such as Dask and Modin (see the sketch after this list).
• Ensure compatibility with notebooks, including features like type hints, schema inference, and autocomplete.
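A rough sketch of the remote-execution and interoperability requirements, assuming a Ray-backed engine (the cluster address and paths are illustrative):

import ray
import ray.data

# Connect to a remote, autoscaling cluster; no worker counts are specified here.
ray.init(address="ray://head-node:10001")

ds = ray.data.read_parquet("s3://bucket/features/")

# Hand off to other dataframe libraries when needed.
dask_df = ds.to_dask()      # Dask DataFrame
modin_df = ds.to_modin()    # Modin DataFrame
pandas_df = ds.to_pandas()  # local pandas, for small results only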
Extensibility
• Provide a Python API that allows users to add new features and customisations to the framework.
• Built on top of Ray, leveraging its integrations with other libraries like TensorFlow and PyTorch (sketched after this list).
• Designed with extensibility in mind, allowing for future extensions to support streaming use cases utilising core streaming primitives.
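A small sketch of the Ray integration point, assuming features are exposed as a Ray Dataset (the path is illustrative):

import ray.data

ds = ray.data.read_parquet("s3://bucket/features/")

# Stream feature batches straight into a PyTorch training loop
# without materialising the full dataset in memory.
for batch in ds.iter_torch_batches(batch_size=1024):
    pass  # feed `batch` (a dict of column name -> torch tensor) to the model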
Performance
• Operate in distributed mode, efficiently utilising full resources across a cluster.
• Implement lazy execution, deferring computation until an action is called and optimising resource utilisation (see the sketch after this list).
• Utilise push-based execution to enable parallel execution of tasks, improving overall performance.
• Implement pipelining of operations to optimise data processing workflows.
• Incorporate adaptive query execution, dynamically re-optimising the query plan for the next retry when a failure occurs, ensuring efficient execution even in the presence of errors.
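A hedged sketch of lazy execution, again with PySpark as an illustration: the transformations below only build a plan, and nothing runs until the action at the end.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# No computation happens here - only a logical plan is constructed.
plan = (
    spark.read.parquet("s3://bucket/raw/events/")
    .filter(F.col("event_type") == "purchase")
    .groupBy("user_id")
    .count()
)

# The action triggers execution; the optimiser can pipeline the scan,
# filter, and aggregation together before running them.
result = plan.collect()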
Debuggability
• Provide Python-friendly error messages to facilitate easier debugging and troubleshooting.
• Generate explainable query plans that help users understand how the data processing steps will be executed (see the sketch after this list).
• Offer actionable errors that guide users on resolving issues encountered during feature engineering tasks.
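A minimal sketch of an explainable query plan, using PySpark's explain() as an illustration of the idea (paths and columns are made up):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

features = (
    spark.read.parquet("s3://bucket/raw/events/")
    .filter(F.col("event_type") == "purchase")
    .groupBy("user_id")
    .agg(F.count("*").alias("purchase_count"))
)

# Prints the parsed, optimised, and physical plans so users can see
# exactly how the feature pipeline will execute.
features.explain(mode="formatted")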
Kiril Aleksovski
04/04/2024, 8:53 AM

Kiril Aleksovski
04/04/2024, 10:19 AM

avril
04/04/2024, 3:20 PM
df["dogs"] = [Dog("ruffles"), Dog("waffles"), Dog("doofus")]
to add col dogs to df.
• AFAICT df.where only accepts Expressions and so is not suitable for this (?)
• the pandas syntax of direct column assignment is not supported (probably for good reason, I've always found it messy/risky) 😅
• so the 'easiest' way I came up with was to do a join - this works but seems a bit overkill in terms of complexity
Am I overlooking something here or is this the recommended approach atm?
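For reference, a rough sketch of the join workaround being described, assuming df has a key column (called "id" here, purely hypothetical) and that Daft stores the Dog objects as a Python-object column:

import daft

dogs_df = daft.from_pydict({
    "id": [1, 2, 3],  # hypothetical join key, must match df
    "dogs": [Dog("ruffles"), Dog("waffles"), Dog("doofus")],
})
df = df.join(dogs_df, on="id")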
Kiril Aleksovski
04/04/2024, 8:51 PM

Slackbot
04/09/2024, 8:54 PM

jay
04/09/2024, 10:21 PM
daft.read_delta_lake support.
Really excited that we now have support for Iceberg, Delta Lake and Hudi!
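A minimal usage sketch (the table path is made up):

import daft

df = daft.read_delta_lake("s3://bucket/path/to/delta-table/")
df.show()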
Tony Wang
04/10/2024, 10:33 PM

jay
04/17/2024, 3:42 AM
S3Config.from_env to automatically discover credentials from the environment (cc @Tony Wang!)
Really exciting community contributions from @MeepoWin, @murex971, @pangwu 👏 👏 👏
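A hedged usage sketch, assuming the usual IOConfig plumbing and a made-up bucket path:

import daft
from daft.io import IOConfig, S3Config

# Pick up AWS credentials from the environment instead of passing them explicitly.
io_config = IOConfig(s3=S3Config.from_env())
df = daft.read_parquet("s3://bucket/dataset/", io_config=io_config)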
Akshay Verma
04/22/2024, 10:43 AM

jay
04/24/2024, 7:31 PM

Joshua Pedrick
04/26/2024, 7:18 PM
timestamps = pl.datetime_range(
    start=date.date + time_range[0],
    end=date.date + time_range[1],
    interval=interval,
).alias("datetime")
Joshua Pedrick
04/26/2024, 7:19 PM

jay
04/26/2024, 8:25 PM
Daft doesn't have .datetime_range (seems like it converts 2 timestamp columns into a list[timestamp] column according to the provided interval).
However, this is actually pretty easy to do via a UDF as an escape hatch:
import daft
import polars as pl
import datetime

@daft.udf(return_dtype=daft.DataType.list(daft.DataType.timestamp("us")))
def my_datetime_range(start: daft.Series, end: daft.Series, interval: str) -> daft.Series:
    polars_df = pl.DataFrame({"start": pl.Series(start.to_arrow()), "end": pl.Series(end.to_arrow())})
    polars_results = polars_df.select(range=pl.datetime_ranges(start="start", end="end", interval=interval))
    return daft.Series.from_arrow(polars_results["range"].to_arrow())

df = daft.from_pydict({
    "start": [datetime.date(2023, 1, 1), datetime.date(2023, 1, 2)],
    "end": [datetime.date(2023, 1, 5), datetime.date(2023, 1, 7)],
})
df = df.with_column("range", my_datetime_range(df["start"], df["end"], "1d"))
df.show()
We can make this a little cleaner too by supporting easier conversions between Daft Series and Polars Series. Let me know if that's something you might want!
Kiril Aleksovski
04/28/2024, 9:33 PM
daft_df = daft.read_parquet(trip_data).sort(col("pickup_datetime"))
daft_df.write_parquet(output + "tripdata_sort_daft.parquet")
Trip data parquet file in question:
https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2023-01.parquet
num_columns: 24
num_rows: 18479031
num_row_groups: 9
format_version: 2.6
serialized_size: 34549
Here is a plot with other libraries' timings:
Sammy Sidhu
05/02/2024, 1:15 AM

jay
05/02/2024, 6:32 PM
.sqrt(), .log*()
⚡ Add a .with_columns API to the dataframe, and optimizing .with_column calls (cc @Peter) #2167
🔍 Read from the Hudi table format (thanks to the folks from the OneHouse team!! cc @Dipankar Mazumdar)
Big thanks to all of our contributors 🙏 and for everyone else with the feedback in #daft-dev. Keep them coming!
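A rough usage sketch of the new APIs, assuming .with_columns accepts a dict of column name to expression and that .log2() is one of the new .log*() methods (column names are made up):

import daft
from daft import col

df = daft.from_pydict({"x": [1.0, 4.0, 9.0]})
df = df.with_columns({
    "sqrt_x": col("x").sqrt(),
    "log2_x": col("x").log2(),
})
df.show()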
jay
05/02/2024, 8:05 PM

Sammy Sidhu
05/02/2024, 8:39 PM

jay
05/02/2024, 8:40 PM

jay
05/02/2024, 8:50 PM

jay
05/02/2024, 10:23 PM

jay
05/03/2024, 6:47 AM

Sammy Sidhu
05/03/2024, 7:00 AM

jay
05/03/2024, 7:01 AM

Jake Waller
05/07/2024, 9:49 AM
1. Numpy Arrays (np.ndarray)
2. PyArrow Arrays (pa.Array)
3. Python lists (list)
However the actual code (below) only seems to work for Series, lists and numpy arrays - not native pyarrow arrays (I found this out when attempting to return a pyarrow large string array) https://github.com/Eventual-Inc/Daft/blob/3e9dcd45945c75e3c8c3a30d721fc0d2727eb8b7/daft/udf.py#L108
# Post-processing of results into a Series of the appropriate dtype
if isinstance(result, Series):
    return result.rename(name).cast(self.udf.return_dtype)._series
elif isinstance(result, list):
    if self.udf.return_dtype == DataType.python():
        return Series.from_pylist(result, name=name, pyobj="force")._series
    else:
        return Series.from_pylist(result, name=name, pyobj="allow").cast(self.udf.return_dtype)._series
elif _NUMPY_AVAILABLE and isinstance(result, np.ndarray):
    return Series.from_numpy(result, name=name).cast(self.udf.return_dtype)._series
else:
    raise NotImplementedError(f"Return type not supported for UDF: {type(result)}")
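In the meantime, one workaround (just a sketch, building on the Series.from_arrow conversion jay used earlier in this thread) is to wrap the pyarrow result in a Daft Series inside the UDF, so it hits the isinstance(result, Series) branch above:

import daft
import pyarrow.compute as pc

@daft.udf(return_dtype=daft.DataType.string())
def to_upper(s: daft.Series):
    arr = pc.utf8_upper(s.to_arrow())   # a pyarrow array
    return daft.Series.from_arrow(arr)  # wrap it so post-processing accepts it

df = daft.from_pydict({"name": ["ruffles", "waffles"]})
df = df.with_column("upper", to_upper(df["name"]))
df.show()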
Nicolay Gerold
05/08/2024, 3:37 AM

Jake Waller
05/08/2024, 8:58 AM

jay
05/15/2024, 6:20 PM
Register Now button.
See y'all there!
jay
05/16/2024, 7:52 PM