# daft-dev
a
Hi team, I couldn't find a solution for this with Polars. Any idea if I can solve it with Daft? (I HAVE ALSO ATTACHED THE CODE IN A COMMENT.) I have a use case and want to understand how to solve it; could you please guide me?
The dataset contains 5 billion rows and is stored in Parquet format, partitioned by the groupid column. So inside the main S3 folder there are around 460 subfolders, one per groupid. Each folder has around 8 million rows, which can be loaded on a single machine. I want to train a PyTorch Lightning model in the cases below.
Case 1: I have one GPU machine and one CPU machine. I want to load the data of one groupid folder, around 7M rows, onto the CPU machine (since I cannot load the whole 5B rows) and then train on that data on my GPU. While that training is running on the GPU, I want to load the second groupid's data (subfolder) onto the CPU machine, so that as soon as the first groupid's data is finished, the second is handed to the GPU for training and my GPU doesn't stay idle for long.
Case 2: I have one GPU machine with a CPU. While training is running on the first groupid partition folder, the second groupid partition folder should be loaded. I have enough CPU memory to hold the data of both partition folders.
Case 3: Same as case 1, but now with multiple GPU machines and multiple CPU machines.
My main reason to use Daft with Ray is to keep my GPU from going idle by fetching the next partition folder's data while training is going on. Thanks 😀
from torch.utils.data import Dataset, IterableDataset, DataLoader
import pandas as pd
import numpy as np
import time
import boto3
import torch
import torch.nn as nn
import pytorch_lightning as pl
from pytorch_lightning import Trainer, seed_everything
import polars


def count_partition_folders(s3_bucket, s3_prefix):
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    folders = set()
    for result in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix, Delimiter='/'):
        if 'CommonPrefixes' in result:
            for prefix in result['CommonPrefixes']:
                folders.add(prefix['Prefix'])
    return len(folders)


class PowConDataSetIterable(IterableDataset):
    def __init__(self, s3_path, seq_len=360, batch_size=64):
        self.s3_path = s3_path
        self.seq_len = seq_len
        self.batch_size = batch_size
        # Determine the number of partitions by counting the folders in the S3 path
        self.num_partitions = count_partition_folders("d-data-lake", "abcd/")

    def read_partition(self, partition_idx):
        partition_path = f"{self.s3_path}/groupid={partition_idx}/"
        # Here you would implement your code to read the parquet files from S3
        # and return the data for the given partition
        print(partition_path, "start")
        df_pd = polars.read_parquet(f"{self.s3_path}/groupid={partition_idx}/*.parquet")
        print(partition_path, "ended")
        return df_pd

    def __iter__(self):
        for partition_idx in range(1, self.num_partitions + 1):
            df_pd = self.read_partition(partition_idx)
            individual_user_data = df_pd.group_by(["userid"])
            batch_features = []
            batch_labels = []
            for userid, group in individual_user_data:
                group = group.sort('date_trans')
                features = group.select(polars.col("*").exclude("date_trans", "userid", "groupid"))
                features = features.to_numpy().astype('float32')
                labels = group.select(polars.col("cea")).to_numpy().astype('float32')
                num_sequences = len(group) - self.seq_len
                for i in range(0, num_sequences):
                    X_sequence = torch.tensor(features[i:i+self.seq_len])
                    # labels is 2-D (n_rows, 1), so this row already has shape (1,);
                    # stacking it gives (batch, 1), which matches the model output
                    y_sequence = torch.tensor(labels[i+self.seq_len])
                    batch_features.append(X_sequence)
                    batch_labels.append(y_sequence)
                    if len(batch_features) == self.batch_size:
                        yield torch.stack(batch_features), torch.stack(batch_labels)
                        batch_features = []
                        batch_labels = []
            # Emit the last, possibly smaller, batch of this partition
            if batch_features:
                yield torch.stack(batch_features), torch.stack(batch_labels)
            df_pd = None


# Define your PyTorch Lightning data module
class PowConDataModule(pl.LightningDataModule):
    def __init__(self, s3_path, seq_len, batch_size, num_workers=1):
        super().__init__()
        self.s3_path = s3_path
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.num_workers = num_workers

    def setup(self, stage=None):
        # No setup required as data loading is done in the iterator
        pass

    def train_dataloader(self):
        return DataLoader(PowConDataSetIterable(self.s3_path, self.seq_len, self.batch_size),
                          batch_size=None,  # Batch size is controlled by the IterableDataset
                          num_workers=self.num_workers,
                          prefetch_factor=2)


# Define your model and other necessary components
class PowConModel(pl.LightningModule):
    def __init__(self, n_features, hidden_size, seq_len, batch_size, num_layers, dropout, learning_rate, criterion):
        super().__init__()
        self.n_features = n_features
        self.hidden_size = hidden_size
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.learning_rate = learning_rate
        self.criterion = criterion
        self.lstm = nn.LSTM(input_size=n_features,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            dropout=dropout,
                            batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        # lstm_out = (batch_size, seq_len, hidden_size)
        x = self.fc(lstm_out[:, -1])
        return x

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer

    def training_step(self, train_batch, batch_idx):
        x, y = train_batch
        y_hat = self.forward(x)
        loss = self.criterion(y_hat, y)
        #result = pl.TrainResult(minimize=loss)
        self.log('train_loss', loss, prog_bar=True)
        return loss

    def validation_step(self, val_batch, batch_idx):
        x, y = val_batch
        y_hat = self.forward(x)
        loss = self.criterion(y_hat, y)
        #result = pl.EvalResult(checkpoint_on=loss)
        self.log('val_loss', loss)
        return loss

    def test_step(self, test_batch, batch_idx):
        x, y = test_batch
        y_hat = self.forward(x)
        loss = self.criterion(y_hat, y)
        #result = pl.EvalResult()
        self.log('test_loss', loss)
        return loss


# Define your parameters
p = {
    'seq_len': 360,
    'batch_size': 64,
    'criterion': nn.MSELoss(),
    'max_epochs': 5,
    'n_features': 132,
    'hidden_size': 10,
    'num_layers': 1,
    'dropout': 0.2,
    'learning_rate': 0.01,
    'gpus': 1,
    's3_path': "s3://abcd",
    'n_workers': 1
}

# Set random seed for reproducibility
seed_everything(42)

# Create your data module and model
data_module = PowConDataModule(p['s3_path'], p['seq_len'], p['batch_size'], p['n_workers'])
model = PowConModel(n_features=p['n_features'],
                    hidden_size=p['hidden_size'],
                    seq_len=p['seq_len'],
                    batch_size=p['batch_size'],
                    criterion=p['criterion'],
                    num_layers=p['num_layers'],
                    dropout=p['dropout'],
                    learning_rate=p['learning_rate'])

# Create your trainer and fit your model
trainer = pl.Trainer(max_epochs=p['max_epochs'], accelerator="gpu", devices=1)
trainer.fit(model, data_module)
for partition_idx in range(1, self.num_partitions + 1):
Here `partition_idx` is iterated serially. In my use case, I want the data for the next `partition_idx` to be ready while training is still going on, so that my GPU doesn't sit idle.
j
cc @Clark Zinzow @Sammy Sidhu for any thoughts!
We just had a discussion in the team @Akshat Suwalka! Here are our thoughts: Daft currently exposes a `df.iter_partitions()` API, which can be used like so:
df = daft.read_parquet(...)
for partition in df.iter_partitions():
    # partition is a Ray object reference if running in Ray mode
    my_training_function.remote(partition)
You can then define a Ray remote function that takes as input a Daft micropartition, and performs training:
import daft
import ray

@ray.remote
def my_training_function(data: daft.table.MicroPartition):
    # Convert Daft MicroPartition to PyArrow table
    arrow_table = data.to_arrow()
    
    # Run your pytorch lightning code here, just fitting
    # on a normal PyArrow table
    ...
    trainer.fit(model, arrow_table)
• Case 1: We are adding a way to limit the buffer size for `df.iter_partitions()` to 1, which should give you the pipelining effect you’re looking for here!
• Case 2: This should be similar to case 1 as well.
• Case 3: Distributed training across multiple GPU nodes is hard! Our team has a lot of experience with it from Lyft/Tesla/Anyscale though, so we can hop on a call to chat about it if you really do need to go to distributed training 🙂
Here is a PR to enable `df.iter_partitions(buffer_size=1)`: https://github.com/Eventual-Inc/Daft/pull/2265
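To make the "buffer of 1" pipelining idea concrete, here is a minimal, library-agnostic sketch using only the Python standard library. It is not Daft's implementation and does not use any Daft or Ray API: `prefetch`, `load_partition` and `train_on` are hypothetical names invented for this illustration. A background thread loads the next item while the consumer is still working on the current one, and a bounded queue keeps the loader from running too far ahead.

```python
import queue
import threading
import time


def prefetch(items, buffer_size=1):
    """Yield from `items`, loading ahead in a background thread.

    Hypothetical helper for illustration only. At most `buffer_size`
    loaded items wait in the queue; the producer may additionally hold
    one more loaded item while it is blocked on `put`.
    """
    buf = queue.Queue(maxsize=buffer_size)

    def producer():
        try:
            for item in items:
                buf.put(("item", item))  # blocks while the buffer is full
            buf.put(("done", None))
        except Exception as exc:  # forward loader errors to the consumer
            buf.put(("error", exc))

    threading.Thread(target=producer, daemon=True).start()

    while True:
        kind, value = buf.get()
        if kind == "done":
            return
        if kind == "error":
            raise value
        yield value


def load_partition(idx):
    """Stand-in for reading one groupid folder (assumption: slow I/O)."""
    time.sleep(0.05)
    return [idx] * 3


def train_on(partition):
    """Stand-in for one training pass over a partition (assumption)."""
    time.sleep(0.05)
    return sum(partition)


if __name__ == "__main__":
    # The generator expression is lazy, so loading happens in the producer thread.
    partitions = (load_partition(i) for i in range(1, 5))
    for part in prefetch(partitions, buffer_size=1):
        train_on(part)  # the next partition is being loaded meanwhile
```

One thing to keep in mind with this sketch: with `buffer_size=1`, up to three partitions can be in memory at once (the one being trained on, the one waiting in the queue, and the one the producer has just loaded), so memory should be sized accordingly. The sketch also does not stop the producer thread if the consumer exits early.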
I’m working on an example notebook, will share when ready 🙂
a
Thank you very much team for your insight and quick response 🙌
Let me know @jay once you are able to set up the notebook. Thanks!
j
Yes! Will definitely do
Apologies, I’ve been busy preparing for the Iceberg Summit tomorrow 😛
a
No problem, please take your time @jay. Good luck at the Summit!
Jay, eagerly waiting to hear if there are any updates 😀
j
😭 got it, cc @Sammy Sidhu let’s check in again on the iter_partitions PR? I need that to go through before I can make a tutorial
a
😇