|
""" |
|
File handling utilities for different input types. |
|
""" |
|
|
|
import os |
|
import tempfile |
|
import logging |
|
from typing import Optional, Union, Tuple |
|
from pathlib import Path |
|
|
|
from ..config.settings import ( |
|
SUPPORTED_IMAGE_FORMATS, |
|
SUPPORTED_AUDIO_FORMATS, |
|
SUPPORTED_VIDEO_FORMATS, |
|
) |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def validate_file_format(filename: str, supported_formats: list) -> bool: |
|
""" |
|
Validate if a file has a supported format. |
|
|
|
Args: |
|
filename: Name of the file to validate |
|
supported_formats: List of supported file extensions |
|
|
|
Returns: |
|
True if file format is supported, False otherwise |
|
""" |
|
if not filename: |
|
return False |
|
|
|
file_extension = Path(filename).suffix.lower().lstrip(".") |
|
return file_extension in supported_formats |
|
|
|
|
|
def validate_image_file(filename: str) -> bool: |
|
"""Validate if a file is a supported image format.""" |
|
return validate_file_format(filename, SUPPORTED_IMAGE_FORMATS) |
|
|
|
|
|
def validate_audio_file(filename: str) -> bool: |
|
"""Validate if a file is a supported audio format.""" |
|
return validate_file_format(filename, SUPPORTED_AUDIO_FORMATS) |
|
|
|
|
|
def validate_video_file(filename: str) -> bool: |
|
"""Validate if a file is a supported video format.""" |
|
return validate_file_format(filename, SUPPORTED_VIDEO_FORMATS) |
|
|
|
|
|
def get_file_info(file_object) -> dict: |
|
""" |
|
Extract file information from a file object. |
|
|
|
Args: |
|
file_object: File object (e.g., StreamlitUploadedFile) |
|
|
|
Returns: |
|
Dictionary containing file information |
|
""" |
|
try: |
|
if hasattr(file_object, "getvalue"): |
|
file_size = len(file_object.getvalue()) |
|
file_name = getattr(file_object, "name", "Unknown") |
|
else: |
|
file_size = len(file_object) |
|
file_name = "Unknown" |
|
|
|
file_extension = ( |
|
Path(file_name).suffix.lower().lstrip(".") |
|
if file_name != "Unknown" |
|
else "Unknown" |
|
) |
|
|
|
return { |
|
"name": file_name, |
|
"size_bytes": file_size, |
|
"size_kb": file_size / 1024, |
|
"size_mb": file_size / (1024 * 1024), |
|
"extension": file_extension, |
|
"is_valid_image": ( |
|
validate_image_file(file_name) if file_extension != "Unknown" else False |
|
), |
|
"is_valid_audio": ( |
|
validate_audio_file(file_name) if file_extension != "Unknown" else False |
|
), |
|
"is_valid_video": ( |
|
validate_video_file(file_name) if file_extension != "Unknown" else False |
|
), |
|
} |
|
except Exception as e: |
|
logger.error(f"Error getting file info: {str(e)}") |
|
return { |
|
"name": "Unknown", |
|
"size_bytes": 0, |
|
"size_kb": 0, |
|
"size_mb": 0, |
|
"extension": "Unknown", |
|
"is_valid_image": False, |
|
"is_valid_audio": False, |
|
"is_valid_video": False, |
|
} |
|
|
|
|
|
def create_temp_file( |
|
suffix: str = "", prefix: str = "temp_" |
|
) -> Tuple[str, tempfile.NamedTemporaryFile]: |
|
""" |
|
Create a temporary file with proper cleanup handling. |
|
|
|
Args: |
|
suffix: File extension suffix |
|
prefix: File name prefix |
|
|
|
Returns: |
|
Tuple of (file_path, temp_file_object) |
|
""" |
|
temp_file = tempfile.NamedTemporaryFile(suffix=suffix, prefix=prefix, delete=False) |
|
return temp_file.name, temp_file |
|
|
|
|
|
def cleanup_temp_file(file_path: str) -> bool: |
|
""" |
|
Safely cleanup a temporary file. |
|
|
|
Args: |
|
file_path: Path to the temporary file |
|
|
|
Returns: |
|
True if cleanup was successful, False otherwise |
|
""" |
|
try: |
|
if os.path.exists(file_path): |
|
os.unlink(file_path) |
|
return True |
|
return True |
|
except (OSError, PermissionError) as e: |
|
logger.warning(f"Could not delete temporary file {file_path}: {e}") |
|
return False |
|
|
|
|
|
def format_file_size(size_bytes: int) -> str: |
|
""" |
|
Format file size in human-readable format. |
|
|
|
Args: |
|
size_bytes: File size in bytes |
|
|
|
Returns: |
|
Formatted file size string |
|
""" |
|
if size_bytes < 1024: |
|
return f"{size_bytes} B" |
|
elif size_bytes < 1024 * 1024: |
|
return f"{size_bytes / 1024:.1f} KB" |
|
elif size_bytes < 1024 * 1024 * 1024: |
|
return f"{size_bytes / (1024 * 1024):.1f} MB" |
|
else: |
|
return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB" |
|
|
|
|
|
def safe_file_operation(operation_func, *args, **kwargs): |
|
""" |
|
Safely execute a file operation with proper error handling. |
|
|
|
Args: |
|
operation_func: Function to execute |
|
*args: Positional arguments for the function |
|
**kwargs: Keyword arguments for the function |
|
|
|
Returns: |
|
Result of the operation or None if it fails |
|
""" |
|
try: |
|
return operation_func(*args, **kwargs) |
|
except FileNotFoundError as e: |
|
logger.error(f"File not found: {e}") |
|
return None |
|
except PermissionError as e: |
|
logger.error(f"Permission denied: {e}") |
|
return None |
|
except OSError as e: |
|
logger.error(f"OS error: {e}") |
|
return None |
|
except Exception as e: |
|
logger.error(f"Unexpected error in file operation: {e}") |
|
return None |
|
|