import pandas as pd
import numpy as np
from pathlib import Path
import logging
import re
import sys
from typing import Optional, Tuple, Dict
from sklearn.model_selection import train_test_split
import hashlib
import json
from datetime import datetime
from data.data_validator import DataValidationPipeline
from data.validation_schemas import ValidationLevel, DataSource

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/tmp/data_preparation.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class DatasetPreparer:
    """Robust dataset preparation with comprehensive validation and error handling"""

    def __init__(self, base_dir: Optional[Path] = None):
        self.base_dir = base_dir or Path(__file__).resolve().parent
        self.setup_paths()

    def setup_paths(self):
        """Set up all input and output paths"""
        self.kaggle_fake = self.base_dir / "kaggle" / "Fake.csv"
        self.kaggle_real = self.base_dir / "kaggle" / "True.csv"
        self.liar_paths = [
            self.base_dir / "liar" / "train.tsv",
            self.base_dir / "liar" / "test.tsv",
            self.base_dir / "liar" / "valid.tsv"
        ]

        self.output_dir = Path("/tmp/data")
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.output_path = self.output_dir / "combined_dataset.csv"
        self.metadata_path = self.output_dir / "dataset_metadata.json"

    def validate_text_quality(self, text: str) -> bool:
        """Validate text quality with comprehensive checks"""
        if not isinstance(text, str):
            return False

        text = text.strip()

        # Reject very short texts
        if len(text) < 10:
            return False

        # Require at least one alphabetic character
        if not any(c.isalpha() for c in text):
            return False

        # Require at least one sentence-ending punctuation mark
        if not any(punct in text for punct in '.!?'):
            return False

        # Reject texts dominated by a single repeated word
        words = text.lower().split()
        if len(words) > 0:
            most_common_word_count = max(words.count(word) for word in set(words))
            if most_common_word_count > len(words) * 0.5:
                return False

        # Reject texts with an excessive ratio of special characters
        special_char_ratio = sum(1 for c in text if not c.isalnum() and not c.isspace()) / len(text)
        if special_char_ratio > 0.3:
            return False

        return True

    def clean_text(self, text: str) -> str:
        """Clean and normalize text"""
        if not isinstance(text, str):
            return ""

        # Collapse all whitespace runs into single spaces
        text = re.sub(r'\s+', ' ', text)

        # Remove URLs
        text = re.sub(r'http\S+|www\S+|https\S+', '', text)

        # Normalize repeated punctuation
        text = re.sub(r'[!]{2,}', '!', text)
        text = re.sub(r'[?]{2,}', '?', text)
        text = re.sub(r'[.]{2,}', '...', text)

        # Drop non-printable control characters
        text = ''.join(char for char in text if ord(char) >= 32)

        return text.strip()

    def load_kaggle_dataset(self) -> Optional[Tuple[pd.DataFrame, Dict]]:
        """Load and process Kaggle dataset with error handling"""
        try:
            logger.info("Loading Kaggle dataset...")

            if not self.kaggle_fake.exists() or not self.kaggle_real.exists():
                logger.warning("Kaggle dataset files not found")
                return None

            df_fake = pd.read_csv(self.kaggle_fake)
            df_real = pd.read_csv(self.kaggle_real)

            logger.info(f"Loaded {len(df_fake)} fake and {len(df_real)} real articles from Kaggle")

            # Fake articles: label 1, combine title and body into a single text field
            df_fake['label'] = 1
            df_fake['text'] = df_fake['title'].fillna('') + ". " + df_fake['text'].fillna('')
            df_fake['source'] = 'kaggle_fake'

            # Real articles: label 0
            df_real['label'] = 0
            df_real['text'] = df_real['title'].fillna('') + ". " + df_real['text'].fillna('')
            df_real['source'] = 'kaggle_real'

            df_combined = pd.concat([
                df_fake[['text', 'label', 'source']],
                df_real[['text', 'label', 'source']]
            ], ignore_index=True)

            logger.info(f"Combined Kaggle dataset: {len(df_combined)} samples")
            return self.validate_dataset_with_schemas(df_combined, 'kaggle_combined')

        except Exception as e:
            logger.error(f"Error loading Kaggle dataset: {e}")
            return None

    def load_liar_dataset(self) -> Optional[Tuple[pd.DataFrame, Dict]]:
        """Load and process LIAR dataset with robust error handling"""
        try:
            logger.info("Loading LIAR dataset...")

            liar_dfs = []
            total_processed = 0

            for path in self.liar_paths:
                if not path.exists():
                    logger.warning(f"LIAR file not found: {path}")
                    continue

                try:
                    df = pd.read_csv(
                        path,
                        sep='\t',
                        header=None,
                        quoting=3,
                        on_bad_lines='skip',
                        low_memory=False
                    )

                    expected_columns = [
                        'id', 'label_text', 'statement', 'subject', 'speaker', 'job',
                        'state', 'party', 'barely_true', 'false', 'half_true',
                        'mostly_true', 'pants_on_fire', 'context'
                    ]

                    if len(df.columns) >= 3:
                        df.columns = expected_columns[:len(df.columns)]

                    # Map LIAR truthfulness labels to binary: deceptive classes -> 1, others -> 0
                    if 'label_text' in df.columns:
                        df['label'] = df['label_text'].apply(
                            lambda x: 1 if str(x).lower() in ['false', 'pants-fire', 'barely-true'] else 0
                        )
                    else:
                        continue

                    if 'statement' in df.columns:
                        df['text'] = df['statement'].astype(str)
                    else:
                        continue

                    df['source'] = f'liar_{path.stem}'

                    processed_df = df[['text', 'label', 'source']].copy()
                    liar_dfs.append(processed_df)
                    total_processed += len(processed_df)

                    logger.info(f"Processed {len(processed_df)} samples from {path.name}")

                except Exception as e:
                    logger.error(f"Error processing LIAR file {path}: {e}")
                    continue

            if liar_dfs:
                combined_liar = pd.concat(liar_dfs, ignore_index=True)
                logger.info(f"Combined LIAR dataset: {len(combined_liar)} samples")
                return self.validate_dataset_with_schemas(combined_liar, 'liar_combined')
            else:
                logger.warning("No LIAR data could be processed")
                return None

        except Exception as e:
            logger.error(f"Error loading LIAR dataset: {e}")
            return None

    def validate_dataset(self, df: pd.DataFrame) -> pd.DataFrame:
        """Comprehensive dataset validation and cleaning"""
        logger.info("Starting dataset validation...")

        initial_count = len(df)

        # Drop rows with missing text
        df = df.dropna(subset=['text'])
        logger.info(f"Removed {initial_count - len(df)} null text entries")

        # Clean and normalize text
        df['text'] = df['text'].apply(self.clean_text)

        # Filter out low-quality texts
        count_before_quality = len(df)
        valid_mask = df['text'].apply(self.validate_text_quality)
        df = df[valid_mask]
        logger.info(f"Removed {count_before_quality - len(df)} low-quality texts")

        # Drop duplicate texts
        before_dedup = len(df)
        df = df.drop_duplicates(subset=['text'])
        logger.info(f"Removed {before_dedup - len(df)} duplicate texts")

        # Report label distribution
        label_counts = df['label'].value_counts()
        logger.info(f"Label distribution: {label_counts.to_dict()}")

        # Warn on class imbalance
        if len(label_counts) > 1:
            balance_ratio = label_counts.min() / label_counts.max()
            if balance_ratio < 0.3:
                logger.warning(f"Dataset is imbalanced (ratio: {balance_ratio:.2f})")

        # Add derived columns
        df['text_length'] = df['text'].str.len()
        df['word_count'] = df['text'].str.split().str.len()
        df['processed_timestamp'] = datetime.now().isoformat()

        return df

    def generate_dataset_metadata(self, df: pd.DataFrame) -> dict:
        """Generate comprehensive dataset metadata"""
        metadata = {
            'total_samples': len(df),
            'label_distribution': df['label'].value_counts().to_dict(),
            'source_distribution': df['source'].value_counts().to_dict(),
            'text_length_stats': {
                'mean': float(df['text_length'].mean()),
                'std': float(df['text_length'].std()),
                'min': int(df['text_length'].min()),
                'max': int(df['text_length'].max()),
                'median': float(df['text_length'].median())
            },
            'word_count_stats': {
                'mean': float(df['word_count'].mean()),
                'std': float(df['word_count'].std()),
                'min': int(df['word_count'].min()),
                'max': int(df['word_count'].max()),
                'median': float(df['word_count'].median())
            },
            'data_hash': hashlib.md5(df['text'].str.cat().encode()).hexdigest(),
            'creation_timestamp': datetime.now().isoformat(),
            'quality_score': self.calculate_quality_score(df)
        }

        return metadata

    def calculate_quality_score(self, df: pd.DataFrame) -> float:
        """Calculate overall dataset quality score"""
        scores = []

        # Class balance score (minority / majority class counts)
        label_counts = df['label'].value_counts()
        if len(label_counts) > 1:
            balance_score = label_counts.min() / label_counts.max()
            scores.append(balance_score)

        # Text diversity score (unique texts / total texts)
        diversity_score = df['text'].nunique() / len(df)
        scores.append(diversity_score)

        # Length consistency score (penalize a high coefficient of variation)
        text_lengths = df['text_length']
        length_cv = text_lengths.std() / text_lengths.mean()
        length_score = max(0, 1 - length_cv / 2)
        scores.append(length_score)

        return float(np.mean(scores))

    def prepare_datasets(self) -> Tuple[bool, str]:
        """Main method to prepare all datasets with validation"""
        logger.info("Starting dataset preparation with validation...")

        try:
            # Load and validate each source independently
            kaggle_result = self.load_kaggle_dataset()
            liar_result = self.load_liar_dataset()

            # Fall back to an empty dataset and a stub validation summary when a source fails
            if kaggle_result is None:
                logger.warning("Kaggle dataset loading failed")
                kaggle_df, kaggle_validation = pd.DataFrame(), {
                    'source': 'kaggle_combined', 'original_count': 0, 'valid_count': 0,
                    'success_rate': 0, 'overall_quality_score': 0, 'validation_timestamp': datetime.now().isoformat()
                }
            else:
                kaggle_df, kaggle_validation = kaggle_result

            if liar_result is None:
                logger.warning("LIAR dataset loading failed")
                liar_df, liar_validation = pd.DataFrame(), {
                    'source': 'liar_combined', 'original_count': 0, 'valid_count': 0,
                    'success_rate': 0, 'overall_quality_score': 0, 'validation_timestamp': datetime.now().isoformat()
                }
            else:
                liar_df, liar_validation = liar_result

            # Combine whatever was successfully loaded and validated
            datasets_to_combine = [df for df in [kaggle_df, liar_df] if not df.empty]

            if not datasets_to_combine:
                return False, "No datasets could be loaded and validated"

            combined_df = pd.concat(datasets_to_combine, ignore_index=True)

            # Persist the combined dataset
            combined_df.to_csv(self.output_path, index=False)

            # Build and save the validation report
            total_original = kaggle_validation['original_count'] + liar_validation['original_count']
            validation_report = {
                'datasets': {
                    'kaggle': kaggle_validation,
                    'liar': liar_validation
                },
                'combined_stats': {
                    'total_articles': len(combined_df),
                    'total_original': total_original,
                    'overall_success_rate': len(combined_df) / max(1, total_original),
                    'validation_timestamp': datetime.now().isoformat()
                }
            }

            validation_report_path = self.output_dir / "dataset_validation_report.json"
            with open(validation_report_path, 'w') as f:
                json.dump(validation_report, f, indent=2)

            logger.info(f"Dataset preparation complete. Validation report saved to {validation_report_path}")
            return True, f"Successfully prepared {len(combined_df)} validated articles"

        except Exception as e:
            logger.error(f"Dataset preparation failed: {e}")
            return False, f"Dataset preparation failed: {str(e)}"

    def validate_dataset_with_schemas(self, df: pd.DataFrame, source_name: str) -> Tuple[pd.DataFrame, Dict]:
        """Validate dataset using comprehensive schemas"""
        logger.info(f"Starting schema validation for {source_name}...")

        validator = DataValidationPipeline()

        # Convert rows to plain dicts in the format expected by the validation pipeline
        articles_data = []
        for _, row in df.iterrows():
            article_data = {
                'text': str(row.get('text', '')),
                'label': int(row.get('label', 0)),
                'source': source_name
            }

            if 'title' in row and pd.notna(row['title']):
                article_data['title'] = str(row['title'])
            if 'url' in row and pd.notna(row['url']):
                article_data['url'] = str(row['url'])

            articles_data.append(article_data)

        validation_result = validator.validate_batch(
            articles_data,
            batch_id=f"{source_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            validation_level=ValidationLevel.MODERATE
        )

        # Keep only the rows that passed validation (positions align with articles_data order)
        valid_indices = [i for i, result in enumerate(validation_result.validation_results) if result.is_valid]

        if valid_indices:
            valid_df = df.iloc[valid_indices].copy()
            quality_scores = [validation_result.validation_results[i].quality_metrics.get('overall_quality_score', 0.0)
                              for i in valid_indices]
            valid_df['validation_quality_score'] = quality_scores
            valid_df['validation_timestamp'] = datetime.now().isoformat()
        else:
            valid_df = pd.DataFrame(columns=df.columns)

        validation_summary = {
            'source': source_name,
            'original_count': len(df),
            'valid_count': len(valid_df),
            'success_rate': validation_result.success_rate,
            'overall_quality_score': validation_result.overall_quality_score,
            'validation_timestamp': datetime.now().isoformat()
        }

        return valid_df, validation_summary


def main():
    """Main execution function"""
    preparer = DatasetPreparer()
    success, message = preparer.prepare_datasets()

    if success:
        print(f"✅ {message}")
    else:
        print(f"❌ {message}")
        sys.exit(1)


if __name__ == "__main__":
    main()