|
import os |
|
import sys |
|
from pathlib import Path |
|
from typing import Dict, Optional |
|
import logging |
|
|
|
|
|
# Module-level logger.  A stream handler (and the INFO level) is installed
# only when the logger has no handler yet, so importing the module more than
# once does not duplicate log output.
try:
    logger = logging.getLogger(__name__)
    if not logger.handlers:
        logger.addHandler(logging.StreamHandler())
        logger.setLevel(logging.INFO)
except Exception:
    # Best-effort fallback: logging problems must not prevent the module from
    # importing.  ``Exception`` (rather than a bare ``except``) lets
    # KeyboardInterrupt and SystemExit propagate normally.
    logger = logging.getLogger(__name__)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
|
|
|
class EnvironmentPathManager:
    """Dynamic path management for different deployment environments.

    On construction the manager detects where the code is running
    (``'huggingface_spaces'``, ``'docker'``, ``'container'`` or ``'local'``),
    builds the matching directory layout and tries to create every directory.

    Attributes:
        environment: Name of the detected environment.
        base_paths: Mapping of the logical names ``base``, ``data``,
            ``model``, ``logs``, ``cache`` and ``temp`` to directories.
            Entries for ``cache``, ``temp`` and ``logs`` may be replaced by
            fallback locations when the preferred directory cannot be
            created.
    """

    # Root directory shared by every containerised layout.
    _APP_ROOT = Path('/app')
    # File whose presence marks the project root for local runs.
    _PROJECT_MARKER = 'requirements.txt'

    def __init__(self):
        """Detect the environment, configure the layout and create directories."""
        self.environment = self._detect_environment()
        self.base_paths = self._configure_paths()
        self._ensure_directories()

    # ------------------------------------------------------------------
    # Environment detection and layout
    # ------------------------------------------------------------------

    @staticmethod
    def _is_inside(path: Path, root: Path) -> bool:
        """Return True when ``path`` is ``root`` itself or located below it.

        The comparison is made on path components, so ``/application`` is
        not considered to be inside ``/app``.
        """
        return path == root or root in path.parents

    @staticmethod
    def _find_project_root(start: Path, marker: str = _PROJECT_MARKER) -> Path:
        """Return the nearest directory at or above ``start`` containing ``marker``.

        Args:
            start: Directory where the upward search begins.
            marker: File or directory name that identifies the project root.

        Returns:
            The first matching directory, or ``start`` when no ancestor
            contains the marker (so callers never end up at the filesystem
            root by accident).
        """
        for candidate in (start, *start.parents):
            if (candidate / marker).exists():
                return candidate
        return start

    @staticmethod
    def _build_layout(
        base_dir: Path,
        cache_dir: Optional[Path] = None,
        temp_dir: Optional[Path] = None,
    ) -> Dict[str, Path]:
        """Build the standard directory mapping rooted at ``base_dir``.

        ``cache_dir`` and ``temp_dir`` override the default
        ``base_dir / 'cache'`` and ``base_dir / 'temp'`` locations.
        """
        return {
            'base': base_dir,
            'data': base_dir / 'data',
            'model': base_dir / 'model',
            'logs': base_dir / 'logs',
            'cache': cache_dir if cache_dir is not None else base_dir / 'cache',
            'temp': temp_dir if temp_dir is not None else base_dir / 'temp',
        }

    def _detect_environment(self) -> str:
        """Detect the current deployment environment.

        Checks run from most to least specific and the first match wins.

        Returns:
            ``'huggingface_spaces'`` when ``SPACE_ID`` is set or an app entry
            point exists under ``/app``; ``'docker'`` when ``/.dockerenv``
            exists or ``DOCKER_CONTAINER`` is set; ``'container'`` when the
            working directory is ``/app`` or below it; otherwise ``'local'``.
        """
        if (
            os.environ.get('SPACE_ID')
            or os.path.exists('/app/app.py')
            or os.path.exists('/app/streamlit_app.py')
        ):
            return 'huggingface_spaces'

        if os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER'):
            return 'docker'

        # Component-wise test: a plain string prefix check would also match
        # unrelated directories such as /application or /apps.
        if self._is_inside(Path.cwd(), self._APP_ROOT):
            return 'container'

        return 'local'

    def _configure_paths(self) -> Dict[str, Path]:
        """Configure paths based on ``self.environment``.

        Returns:
            Mapping with the keys ``base``, ``data``, ``model``, ``logs``,
            ``cache`` and ``temp``.
        """
        if self.environment == 'huggingface_spaces':
            return self._build_layout(self._APP_ROOT)

        if self.environment in ('docker', 'container'):
            # Cache and temp data live under /tmp in these environments.
            return self._build_layout(
                self._APP_ROOT,
                cache_dir=Path('/tmp/cache'),
                temp_dir=Path('/tmp/temp'),
            )

        # Local run: anchor the layout at the project root, identified by the
        # nearest requirements.txt at or above this module's directory.
        module_dir = Path(__file__).resolve().parent
        project_root = self._find_project_root(module_dir, self._PROJECT_MARKER)
        return self._build_layout(project_root)

    # ------------------------------------------------------------------
    # Directory creation
    # ------------------------------------------------------------------

    def _check_write_access(self, path: Path) -> None:
        """Probe ``path`` for write access by creating and removing a file.

        Only reports the outcome on stdout; the configured path is left
        unchanged either way.
        """
        probe = path / '.write_test'
        try:
            probe.touch()
            probe.unlink()
            if self.environment == 'huggingface_spaces':
                print(f"Directory {path} created with write access")
        except OSError:  # includes PermissionError
            print(f"Directory exists but no write access: {path}")

    def _use_fallback(self, path_name: str) -> None:
        """Swap in a fallback directory for ``path_name`` where one is defined.

        ``cache`` and ``temp`` fall back to ``~/.fake_news_detector/<name>``;
        ``logs`` falls back to a fresh temporary directory.  Other names
        have no fallback and are left untouched.  Failures are reported on
        stdout and never raised.
        """
        if path_name in ('cache', 'temp'):
            try:
                # Path.home() is resolved inside the try so that an
                # undeterminable home directory is reported, not raised.
                fallback_path = Path.home() / f'.fake_news_detector/{path_name}'
                fallback_path.mkdir(parents=True, exist_ok=True)
                self.base_paths[path_name] = fallback_path
                print(f"Using fallback directory: {fallback_path}")
            except Exception as e:
                print(f"Fallback directory creation failed: {e}")
        elif path_name == 'logs':
            try:
                import tempfile

                temp_logs = Path(tempfile.mkdtemp(prefix='logs_'))
                self.base_paths[path_name] = temp_logs
                print(f"Using temporary logs directory: {temp_logs}")
            except Exception as e:
                print(f"Cannot create temporary logs directory: {e}")

    def _ensure_directories(self) -> None:
        """Ensure all necessary directories exist with proper error handling.

        Each configured directory is created (including parents) and probed
        for write access.  A ``PermissionError`` during creation triggers the
        fallback logic; any other error is reported and skipped so that one
        bad directory never aborts start-up.
        """
        # Iterate over a snapshot: fallbacks reassign entries of base_paths.
        for path_name, path in list(self.base_paths.items()):
            try:
                path.mkdir(parents=True, exist_ok=True)
                self._check_write_access(path)
            except PermissionError:
                print(f"Cannot create directory {path}, using fallback")
                self._use_fallback(path_name)
            except Exception as e:
                print(f"Failed to create directory {path}: {e}")

    # ------------------------------------------------------------------
    # Directory accessors
    # ------------------------------------------------------------------

    def _resolve(self, key: str, filename: str = '') -> Path:
        """Return the directory stored under ``key``, or ``filename`` inside it."""
        directory = self.base_paths[key]
        return directory / filename if filename else directory

    def get_data_path(self, filename: str = '') -> Path:
        """Get data directory path, or the path of ``filename`` inside it."""
        return self._resolve('data', filename)

    def get_model_path(self, filename: str = '') -> Path:
        """Get model directory path, or the path of ``filename`` inside it."""
        return self._resolve('model', filename)

    def get_logs_path(self, filename: str = '') -> Path:
        """Get logs directory path, or the path of ``filename`` inside it."""
        return self._resolve('logs', filename)

    def get_cache_path(self, filename: str = '') -> Path:
        """Get cache directory path, or the path of ``filename`` inside it."""
        return self._resolve('cache', filename)

    def get_temp_path(self, filename: str = '') -> Path:
        """Get temporary directory path, or the path of ``filename`` inside it."""
        return self._resolve('temp', filename)

    # ------------------------------------------------------------------
    # Well-known files
    # ------------------------------------------------------------------

    def get_activity_log_path(self) -> Path:
        """Get activity log file path (``activity_log.json`` in the logs directory)."""
        return self.get_logs_path('activity_log.json')

    def get_metadata_path(self) -> Path:
        """Get model metadata file path (``metadata.json`` in the model directory)."""
        return self.get_model_path('metadata.json')

    def get_combined_dataset_path(self) -> Path:
        """Get combined dataset path (``combined_dataset.csv``)."""
        return self.get_data_path('combined_dataset.csv')

    def get_scraped_data_path(self) -> Path:
        """Get scraped data path (``scraped_real.csv``)."""
        return self.get_data_path('scraped_real.csv')

    def get_generated_data_path(self) -> Path:
        """Get generated fake data path (``generated_fake.csv``)."""
        return self.get_data_path('generated_fake.csv')

    def get_model_file_path(self) -> Path:
        """Get main model file path (``model.pkl``)."""
        return self.get_model_path('model.pkl')

    def get_vectorizer_path(self) -> Path:
        """Get vectorizer file path (``vectorizer.pkl``)."""
        return self.get_model_path('vectorizer.pkl')

    def get_pipeline_path(self) -> Path:
        """Get pipeline file path (``pipeline.pkl``)."""
        return self.get_model_path('pipeline.pkl')

    def get_candidate_model_path(self) -> Path:
        """Get candidate model file path (``model_candidate.pkl``)."""
        return self.get_model_path('model_candidate.pkl')

    def get_candidate_vectorizer_path(self) -> Path:
        """Get candidate vectorizer file path (``vectorizer_candidate.pkl``)."""
        return self.get_model_path('vectorizer_candidate.pkl')

    def get_candidate_pipeline_path(self) -> Path:
        """Get candidate pipeline file path (``pipeline_candidate.pkl``)."""
        return self.get_model_path('pipeline_candidate.pkl')

    # ------------------------------------------------------------------
    # Inventory and reporting
    # ------------------------------------------------------------------

    def list_available_datasets(self) -> Dict[str, bool]:
        """List available datasets and their existence status.

        Returns:
            Mapping of dataset name (relative to the data directory) to
            whether that file currently exists.
        """
        kaggle_dir = self.get_data_path() / 'kaggle'
        return {
            'combined_dataset.csv': self.get_combined_dataset_path().exists(),
            'scraped_real.csv': self.get_scraped_data_path().exists(),
            'generated_fake.csv': self.get_generated_data_path().exists(),
            'kaggle/Fake.csv': (kaggle_dir / 'Fake.csv').exists(),
            'kaggle/True.csv': (kaggle_dir / 'True.csv').exists(),
        }

    def list_available_models(self) -> Dict[str, bool]:
        """List available models and their existence status.

        Returns:
            Mapping of artifact file name to whether it exists in the model
            directory.
        """
        return {
            'model.pkl': self.get_model_file_path().exists(),
            'vectorizer.pkl': self.get_vectorizer_path().exists(),
            'pipeline.pkl': self.get_pipeline_path().exists(),
            'model_candidate.pkl': self.get_candidate_model_path().exists(),
            'vectorizer_candidate.pkl': self.get_candidate_vectorizer_path().exists(),
            'pipeline_candidate.pkl': self.get_candidate_pipeline_path().exists(),
            'metadata.json': self.get_metadata_path().exists(),
        }

    def get_environment_info(self) -> Dict:
        """Get comprehensive environment information.

        Returns:
            Dictionary with the detected environment, the configured
            directories (as strings), dataset/model availability, the
            current working directory, the first ``sys.path`` entry, the
            ``SPACE_ID`` variable and whether ``/.dockerenv`` exists.
        """
        return {
            'environment': self.environment,
            'base_dir': str(self.base_paths['base']),
            'data_dir': str(self.base_paths['data']),
            'model_dir': str(self.base_paths['model']),
            'logs_dir': str(self.base_paths['logs']),
            'available_datasets': self.list_available_datasets(),
            'available_models': self.list_available_models(),
            'current_working_directory': str(Path.cwd()),
            # sys.path can be empty in embedded interpreters.
            'python_path': sys.path[0] if sys.path else '',
            'space_id': os.environ.get('SPACE_ID', 'Not HF Spaces'),
            'docker_env': os.path.exists('/.dockerenv'),
        }

    def log_environment_info(self):
        """Log detailed environment information.

        Writes the environment name, the main directories and a per-file
        availability list for datasets and models to the module logger at
        INFO level.
        """
        info = self.get_environment_info()
        logger.info(f"🌍 Environment: {info['environment']}")
        logger.info(f"📁 Base directory: {info['base_dir']}")
        logger.info(f"📊 Data directory: {info['data_dir']}")
        logger.info(f"🤖 Model directory: {info['model_dir']}")
        logger.info(f"📝 Logs directory: {info['logs_dir']}")

        sections = (
            ("📊 Available datasets", info['available_datasets']),
            ("🎯 Available models", info['available_models']),
        )
        for title, inventory in sections:
            # Summary line "<found>/<total>", then one line per file.
            logger.info(f"{title}: {sum(inventory.values())}/{len(inventory)}")
            for name, exists in inventory.items():
                status = "✅" if exists else "❌"
                logger.info(f" {status} {name}")
|
|
|
|
|
# Shared module-level manager used by the convenience functions below.
# Constructing it runs environment detection and directory creation as an
# import-time side effect.
path_manager: EnvironmentPathManager = EnvironmentPathManager()
|
|
|
|
|
def get_data_path(filename: str = '') -> Path:
    """Return the data directory of the shared manager, or ``filename`` inside it."""
    resolve = path_manager.get_data_path
    return resolve(filename)
|
|
|
def get_model_path(filename: str = '') -> Path:
    """Return the model directory of the shared manager, or ``filename`` inside it."""
    resolve = path_manager.get_model_path
    return resolve(filename)
|
|
|
def get_logs_path(filename: str = '') -> Path:
    """Return the logs directory of the shared manager, or ``filename`` inside it."""
    resolve = path_manager.get_logs_path
    return resolve(filename)
|
|
|
def get_environment_info() -> Dict:
    """Return the environment report produced by the shared manager."""
    report = path_manager.get_environment_info()
    return report
|
|
|
def log_environment_info():
    """Log the environment report of the shared manager."""
    emit_report = path_manager.log_environment_info
    emit_report()
|
|
|
|
|
# Script entry point: configure root logging at INFO level, then log the
# environment report.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    log_environment_info()