import boto3
import numpy as np
import pandas as pd
import polars
import torch
import torch.nn as nn
import pytorch_lightning as pl
from pytorch_lightning import seed_everything
from torch.utils.data import DataLoader, IterableDataset
def count_partition_folders(s3_bucket, s3_prefix):
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    folders = set()
    for result in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix, Delimiter='/'):
        if 'CommonPrefixes' in result:
            for prefix in result['CommonPrefixes']:
                folders.add(prefix['Prefix'])
    return len(folders)
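# Hedged sketch: a quick sanity check of the partition layout. The bucket "d-data-lake"
# and prefix "abcd/" simply mirror the hard-coded values used in the dataset below;
# adjust them to your own account before uncommenting.
# print(count_partition_folders("d-data-lake", "abcd/"))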
class PowConDataSetIterable(IterableDataset):
    def __init__(self, s3_path, seq_len=360, batch_size=64):
        self.s3_path = s3_path
        self.seq_len = seq_len
        self.batch_size = batch_size
        # Determine the number of partitions by counting the folders under the S3 prefix
        # (bucket and prefix are hard-coded here rather than derived from s3_path)
        self.num_partitions = count_partition_folders("d-data-lake", "abcd/")

    def read_partition(self, partition_idx):
        # Read all parquet files for one groupid partition from S3 into a polars DataFrame
        partition_path = f"{self.s3_path}/groupid={partition_idx}/"
        print(partition_path, "start")
        df = polars.read_parquet(f"{partition_path}*.parquet")
        print(partition_path, "ended")
        return df

    def __iter__(self):
        for partition_idx in range(1, self.num_partitions + 1):
            df = self.read_partition(partition_idx)
            individual_user_data = df.group_by(["userid"])
            batch_features = []
            batch_labels = []
            for userid, group in individual_user_data:
                # Sort each user's rows chronologically before building sequences
                group = group.sort('date_trans')
                features = group.select(polars.col("*").exclude("date_trans", "userid", "groupid"))
                features = features.to_numpy().astype('float32')
                labels = group.select(polars.col("cea")).to_numpy().astype('float32')
                num_sequences = len(group) - self.seq_len
                for i in range(num_sequences):
                    # A window of seq_len past rows predicts the target at the next step
                    X_sequence = torch.tensor(features[i:i + self.seq_len])
                    y_sequence = torch.tensor(labels[i + self.seq_len])  # shape (1,)
                    batch_features.append(X_sequence)
                    batch_labels.append(y_sequence)
                    if len(batch_features) == self.batch_size:
                        # Stacked shapes: (batch_size, seq_len, n_features) and (batch_size, 1)
                        yield torch.stack(batch_features), torch.stack(batch_labels)
                        batch_features = []
                        batch_labels = []
            # Yield the remaining, smaller-than-batch_size batch for this partition
            if batch_features:
                yield torch.stack(batch_features), torch.stack(batch_labels)
            # Drop the reference so the partition can be garbage-collected
            df = None
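# Hedged sketch: a local smoke test of the iterable dataset. The S3 path below is a
# placeholder; the check pulls one batch and prints its shapes, which should come out as
# (batch_size, seq_len, n_features) for X and (batch_size, 1) for y.
# ds = PowConDataSetIterable("s3://your-bucket/your-prefix", seq_len=360, batch_size=64)
# X, y = next(iter(ds))
# print(X.shape, y.shape)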
# Define your PyTorch Lightning data module
class PowConDataModule(pl.LightningDataModule):
    def __init__(self, s3_path, seq_len, batch_size, num_workers=1):
        super().__init__()
        self.s3_path = s3_path
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.num_workers = num_workers

    def setup(self, stage=None):
        # No setup required as data loading is done in the iterator
        pass

    def train_dataloader(self):
        # Note: with num_workers > 1 each worker iterates the full IterableDataset,
        # which duplicates data unless worker sharding is added in __iter__
        return DataLoader(PowConDataSetIterable(self.s3_path, self.seq_len, self.batch_size),
                          batch_size=None,  # batching is handled inside the IterableDataset
                          num_workers=self.num_workers, prefetch_factor=2)
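# Hedged sketch: validation_step in the model below expects a val_dataloader. One option,
# assuming a separate S3 prefix (placeholder name) holds held-out partitions, would be to
# mirror train_dataloader inside the data module, e.g.:
#     def val_dataloader(self):
#         return DataLoader(PowConDataSetIterable("s3://abcd-val", self.seq_len, self.batch_size),
#                           batch_size=None, num_workers=self.num_workers)
# Without it, Lightning simply skips validation during fit.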
# Define your model and other necessary components
class PowConModel(pl.LightningModule):
    def __init__(self, n_features, hidden_size, seq_len,
                 batch_size, num_layers, dropout,
                 learning_rate, criterion):
        super().__init__()
        self.n_features = n_features
        self.hidden_size = hidden_size
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.learning_rate = learning_rate
        self.criterion = criterion
        # Note: LSTM dropout is applied between stacked layers, so it has no effect
        # when num_layers == 1 (PyTorch emits a warning in that case)
        self.lstm = nn.LSTM(input_size=n_features,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            dropout=dropout,
                            batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        lstm_out, _ = self.lstm(x)  # lstm_out: (batch_size, seq_len, hidden_size)
        # Use the hidden state of the last time step to predict a single value
        x = self.fc(lstm_out[:, -1])
        return x

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer

    def training_step(self, train_batch, batch_idx):
        x, y = train_batch
        y_hat = self.forward(x)
        loss = self.criterion(y_hat, y)
        self.log('train_loss', loss, prog_bar=True)
        return loss

    def validation_step(self, val_batch, batch_idx):
        x, y = val_batch
        y_hat = self.forward(x)
        loss = self.criterion(y_hat, y)
        self.log('val_loss', loss)
        return loss

    def test_step(self, test_batch, batch_idx):
        x, y = test_batch
        y_hat = self.forward(x)
        loss = self.criterion(y_hat, y)
        self.log('test_loss', loss)
        return loss
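# Hedged sketch: a cheap shape check on random data, using the parameter values defined
# below. The expected output shape is (batch, 1).
# m = PowConModel(n_features=132, hidden_size=10, seq_len=360, batch_size=64,
#                 num_layers=1, dropout=0.2, learning_rate=0.01, criterion=nn.MSELoss())
# dummy = torch.randn(4, 360, 132)
# print(m(dummy).shape)  # torch.Size([4, 1])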
# Define your parameters
p = {
    'seq_len': 360,
    'batch_size': 64,
    'criterion': nn.MSELoss(),
    'max_epochs': 5,
    'n_features': 132,
    'hidden_size': 10,
    'num_layers': 1,
    'dropout': 0.2,
    'learning_rate': 0.01,
    'gpus': 1,
    's3_path': "s3://abcd",
    'n_workers': 1
}
# Set random seed for reproducibility
seed_everything(42)
# Create your data module and model
data_module = PowConDataModule(p['s3_path'], p['seq_len'], p['batch_size'], p['n_workers'])
model = PowConModel(n_features=p['n_features'], hidden_size=p['hidden_size'], seq_len=p['seq_len'],
                    batch_size=p['batch_size'], criterion=p['criterion'], num_layers=p['num_layers'],
                    dropout=p['dropout'], learning_rate=p['learning_rate'])
# Create your trainer and fit your model
trainer = pl.Trainer(max_epochs=p['max_epochs'], accelerator="gpu", devices=1)
trainer.fit(model, data_module)
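# Hedged sketch: once fit completes, the trained weights can be checkpointed for later
# reuse; running trainer.test would additionally require a test_dataloader on the data
# module, which is not defined above. The checkpoint filename is a placeholder.
# trainer.save_checkpoint("powcon_lstm.ckpt")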