"""Concatenate parquet files from a directory and split them into train/validation/test sets."""

from pathlib import Path

import pandas as pd
from sklearn.model_selection import train_test_split

def concatenate_and_split_parquet(
    input_dir: str,
    output_dir: str,
    val_size: int = 10000,
    test_size: int = 5000,
    random_state: int = 42
):
    """
    Concatenate all parquet files in a directory and split them into train/val/test sets.

    Args:
        input_dir: Path to the directory containing parquet files
        output_dir: Path to the directory where the split files will be saved
        val_size: Number of samples for the validation set (default: 10000)
        test_size: Number of samples for the test set (default: 5000)
        random_state: Random seed for reproducibility

    Returns:
        Tuple of (train_df, val_df, test_df) pandas DataFrames.
    """
    # Make sure the output directory exists.
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Collect all parquet files in the input directory.
    input_path = Path(input_dir)
    parquet_files = list(input_path.glob("*.parquet"))

    if not parquet_files:
        raise ValueError(f"No parquet files found in {input_dir}")

    print(f"Found {len(parquet_files)} parquet files")

    # Read each file and concatenate everything into a single DataFrame.
    print("Reading and concatenating parquet files...")
    dataframes = []
    for file_path in parquet_files:
        print(f"Reading {file_path.name}...")
        df = pd.read_parquet(file_path)
        dataframes.append(df)

    combined_df = pd.concat(dataframes, ignore_index=True)
    print(f"Combined dataset shape: {combined_df.shape}")

    # Sanity check: there must be at least val_size + test_size rows.
    total_samples = len(combined_df)
    required_samples = val_size + test_size
    if total_samples < required_samples:
        raise ValueError(
            f"Not enough samples. Required: {required_samples}, Available: {total_samples}"
        )

    # Shuffle the combined data; train_test_split shuffles again, but this also
    # leaves the saved files in a fully shuffled row order.
    combined_df = combined_df.sample(frac=1, random_state=random_state).reset_index(drop=True)

    print("Splitting data...")

    # Carve off the test set first, then split the remainder into train/val.
    # An integer test_size is treated as an absolute number of rows.
    temp_df, test_df = train_test_split(
        combined_df,
        test_size=test_size,
        random_state=random_state
    )

    train_df, val_df = train_test_split(
        temp_df,
        test_size=val_size,
        random_state=random_state
    )

    print(f"Training set shape: {train_df.shape}")
    print(f"Validation set shape: {val_df.shape}")
    print(f"Test set shape: {test_df.shape}")

    # Write the three splits as parquet files.
    output_path = Path(output_dir)

    train_path = output_path / "train.parquet"
    val_path = output_path / "validation.parquet"
    test_path = output_path / "test.parquet"

    print("Saving split datasets...")
    train_df.to_parquet(train_path, index=False)
    val_df.to_parquet(val_path, index=False)
    test_df.to_parquet(test_path, index=False)

    print("Files saved to:")
    print(f"  Training: {train_path}")
    print(f"  Validation: {val_path}")
    print(f"  Test: {test_path}")

    return train_df, val_df, test_df


def concatenate_and_split_parquet_arrow(
    input_dir: str,
    output_dir: str,
    val_size: int = 10000,
    test_size: int = 5000,
    random_state: int = 42
):
    """
    Same functionality as concatenate_and_split_parquet, but reads and writes
    the parquet files with PyArrow, which can be faster for large datasets.
    """
    # Local import so pyarrow is only required when this variant is used.
    import pyarrow as pa
    import pyarrow.parquet as pq

    # Make sure the output directory exists.
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Collect all parquet files in the input directory.
    input_path = Path(input_dir)
    parquet_files = list(input_path.glob("*.parquet"))

    if not parquet_files:
        raise ValueError(f"No parquet files found in {input_dir}")

    print(f"Found {len(parquet_files)} parquet files")

    # Read the files as Arrow tables, concatenate, then convert to pandas
    # for the split step.
    print("Reading and concatenating parquet files...")
    tables = []
    for file_path in parquet_files:
        print(f"Reading {file_path.name}...")
        table = pq.read_table(file_path)
        tables.append(table)

    combined_table = pa.concat_tables(tables)
    combined_df = combined_table.to_pandas()

    print(f"Combined dataset shape: {combined_df.shape}")

    # Sanity check: there must be at least val_size + test_size rows.
    total_samples = len(combined_df)
    required_samples = val_size + test_size
    if total_samples < required_samples:
        raise ValueError(
            f"Not enough samples. Required: {required_samples}, Available: {total_samples}"
        )

    # Shuffle, then split off the test set and the validation set.
    combined_df = combined_df.sample(frac=1, random_state=random_state).reset_index(drop=True)

    temp_df, test_df = train_test_split(
        combined_df, test_size=test_size, random_state=random_state
    )

    train_df, val_df = train_test_split(
        temp_df, test_size=val_size, random_state=random_state
    )

    print(f"Training set shape: {train_df.shape}")
    print(f"Validation set shape: {val_df.shape}")
    print(f"Test set shape: {test_df.shape}")

    # Write the splits with PyArrow; preserve_index=False mirrors the
    # index=False used by the pandas version above.
    output_path = Path(output_dir)

    pq.write_table(
        pa.Table.from_pandas(train_df, preserve_index=False),
        output_path / "train.parquet",
    )
    pq.write_table(
        pa.Table.from_pandas(val_df, preserve_index=False),
        output_path / "validation.parquet",
    )
    pq.write_table(
        pa.Table.from_pandas(test_df, preserve_index=False),
        output_path / "test.parquet",
    )

    print(f"Files saved to {output_dir}")

    return train_df, val_df, test_df


if __name__ == "__main__":
    # Example: combine every parquet file under ./data and write the
    # train/validation/test splits to ./data/polymer_splits.
    input_directory = "data"
    output_directory = "data/polymer_splits"

    train_df, val_df, test_df = concatenate_and_split_parquet(
        input_dir=input_directory,
        output_dir=output_directory,
        val_size=10000,
        test_size=5000,
        random_state=42
    )
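
    # The PyArrow-based variant exposes the same interface; uncomment the call
    # below to use it instead (it would overwrite the files written above).
    # train_df, val_df, test_df = concatenate_and_split_parquet_arrow(
    #     input_dir=input_directory,
    #     output_dir=output_directory,
    #     val_size=10000,
    #     test_size=5000,
    #     random_state=42
    # )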