import io import os import mimetypes from uuid import uuid4 from typing import BinaryIO, Optional from .config import settings if settings.STORAGE_PROVIDER != "local": import boto3 import botocore s3 = boto3.client( "s3", endpoint_url=settings.S3_ENDPOINT, aws_access_key_id=settings.S3_ACCESS_KEY, aws_secret_access_key=settings.S3_SECRET_KEY, region_name=getattr(settings, "S3_REGION", None), ) else: class DummyS3Client: def __init__(self): pass def get_object(self, **kwargs): raise RuntimeError("S3 client not available in local storage mode") def generate_presigned_url(self, **kwargs): raise RuntimeError("S3 client not available in local storage mode") s3 = DummyS3Client() def _ensure_bucket() -> None: """Ensure bucket exists. Safe to call on every upload.""" if settings.STORAGE_PROVIDER == "local": os.makedirs(settings.STORAGE_DIR, exist_ok=True) return try: s3.head_bucket(Bucket=settings.S3_BUCKET) except botocore.exceptions.ClientError as e: create_kwargs = {"Bucket": settings.S3_BUCKET} region = getattr(settings, "S3_REGION", None) if region and (settings.S3_ENDPOINT is None or "amazonaws.com" in str(settings.S3_ENDPOINT).lower()): create_kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region} s3.create_bucket(**create_kwargs) def get_object_url(key: str, *, expires_in: int = 3600) -> str: """Return browser-usable URL for object.""" if settings.STORAGE_PROVIDER == "local": return f"/uploads/{key}" public_base = getattr(settings, "S3_PUBLIC_URL_BASE", None) if public_base: return f"{public_base.rstrip('/')}/{key}" return generate_presigned_url(key, expires_in=expires_in) def generate_presigned_url(key: str, expires_in: int = 3600) -> str: """Generate presigned URL for GETting object.""" if settings.STORAGE_PROVIDER == "local": return f"/uploads/{key}" return s3.generate_presigned_url( ClientMethod="get_object", Params={"Bucket": settings.S3_BUCKET, "Key": key}, ExpiresIn=expires_in, ) def upload_fileobj( fileobj: BinaryIO, filename: str, *, content_type: Optional[str] = None, cache_control: Optional[str] = "public, max-age=31536000, immutable", ) -> str: """Upload file-like object to configured storage. Returns object key.""" if settings.STORAGE_PROVIDER == "local": return _upload_local(fileobj, filename, content_type) else: return _upload_s3(fileobj, filename, content_type, cache_control) def _upload_local( fileobj: BinaryIO, filename: str, content_type: Optional[str] = None, ) -> str: """Upload to local filesystem""" os.makedirs(settings.STORAGE_DIR, exist_ok=True) safe_name = filename or "upload.bin" key = f"maps/{uuid4()}_{safe_name}" filepath = os.path.join(settings.STORAGE_DIR, key) os.makedirs(os.path.dirname(filepath), exist_ok=True) with open(filepath, 'wb') as f: fileobj.seek(0) f.write(fileobj.read()) return key def _upload_s3( fileobj: BinaryIO, filename: str, content_type: Optional[str] = None, cache_control: Optional[str] = "public, max-age=31536000, immutable", ) -> str: """Upload to S3/MinIO""" _ensure_bucket() safe_name = filename or "upload.bin" key = f"maps/{uuid4()}_{safe_name}" ct = content_type or (mimetypes.guess_type(safe_name)[0] or "application/octet-stream") try: fileobj.seek(0) except Exception: pass extra_args = {"ContentType": ct} if cache_control: extra_args["CacheControl"] = cache_control if getattr(settings, "S3_PUBLIC_READ", False): extra_args["ACL"] = "public-read" s3.upload_fileobj(fileobj, settings.S3_BUCKET, key, ExtraArgs=extra_args) return key def upload_bytes( data: bytes, filename: str, *, content_type: Optional[str] = None, cache_control: Optional[str] = "public, max-age=31536000, immutable", ) -> str: """Upload raw bytes. Returns object key.""" buf = io.BytesIO(data) return upload_fileobj(buf, filename, content_type=content_type, cache_control=cache_control) def copy_object( src_key: str, *, new_filename: Optional[str] = None, cache_control: Optional[str] = "public, max-age=31536000, immutable", ) -> str: """Server-side copy within same bucket. Returns new object key.""" if settings.STORAGE_PROVIDER == "local": return _copy_local(src_key, new_filename) else: return _copy_s3(src_key, new_filename, cache_control) def _copy_local(src_key: str, new_filename: Optional[str] = None) -> str: """Copy file locally""" src_path = os.path.join(settings.STORAGE_DIR, src_key) if not os.path.exists(src_path): raise FileNotFoundError(f"Source file not found: {src_key}") tail = new_filename or src_key.split("/")[-1] dest_key = f"maps/{uuid4()}_{tail}" dest_path = os.path.join(settings.STORAGE_DIR, dest_key) os.makedirs(os.path.dirname(dest_path), exist_ok=True) with open(src_path, 'rb') as src, open(dest_path, 'wb') as dest: dest.write(src.read()) return dest_key def _copy_s3(src_key: str, new_filename: Optional[str] = None, cache_control: Optional[str] = None) -> str: """Copy file in S3""" _ensure_bucket() tail = new_filename or src_key.split("/")[-1] dest_key = f"maps/{uuid4()}_{tail}" extra_args = {} if cache_control: extra_args["CacheControl"] = cache_control if getattr(settings, "S3_PUBLIC_READ", False): extra_args["ACL"] = "public-read" s3.copy( {"Bucket": settings.S3_BUCKET, "Key": src_key}, settings.S3_BUCKET, dest_key, ExtraArgs=extra_args or None, ) return dest_key def delete_object(key: str) -> None: """Delete object (best-effort).""" if settings.STORAGE_PROVIDER == "local": _delete_local(key) else: _delete_s3(key) def _delete_local(key: str) -> None: """Delete file locally""" try: file_path = os.path.join(settings.STORAGE_DIR, key) if os.path.exists(file_path): os.remove(file_path) except (OSError, FileNotFoundError): pass def _delete_s3(key: str) -> None: """Delete file in S3""" try: s3.delete_object(Bucket=settings.S3_BUCKET, Key=key) except botocore.exceptions.ClientError: pass