Spaces:
Running
Running
import boto3, os | |
from dotenv import load_dotenv | |
# Load variables from a local .env file (if one exists) before the
# configuration below is read. With its default arguments load_dotenv() does
# not override variables that are already set in the process environment, so
# settings supplied by the deployment keep precedence.
load_dotenv()

# Shared S3 client used by every helper in this module. Any credential or
# region variable that is unset is passed to boto3 as None.
s3 = boto3.client(
    "s3",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    region_name=os.getenv("AWS_REGION"),
)

# Name of the bucket all helpers below operate on (None when S3_BUCKET is unset).
bucket = os.getenv("S3_BUCKET")
def upload_to_s3(file, key):
    """Upload the stream held in ``file.file`` to the module's bucket.

    ``file`` is any object exposing a file-like object as its ``file``
    attribute (presumably a web-framework upload wrapper -- confirm against
    callers). ``key`` is the object key to store it under.

    Returns the virtual-hosted-style URL built from the bucket name and key.
    """
    stream = file.file
    s3.upload_fileobj(stream, bucket, key)
    object_url = f"https://{bucket}.s3.amazonaws.com/{key}"
    return object_url
def upload_pdf_bytes(data: bytes, key: str):
    """Store raw PDF bytes in the module's bucket under ``key``.

    The object is written with the content type ``application/pdf``.

    Returns the virtual-hosted-style URL built from the bucket name and key.
    """
    put_args = {
        "Bucket": bucket,
        "Key": key,
        "Body": data,
        "ContentType": "application/pdf",
    }
    s3.put_object(**put_args)
    return f"https://{bucket}.s3.amazonaws.com/{key}"
def delete_s3_key(key: str):
    """Delete the object stored under ``key`` from the module's bucket.

    A falsy key (``None`` or an empty string) is ignored: no request is sent.
    Always returns ``None``.
    """
    if key:
        s3.delete_object(Bucket=bucket, Key=key)
def key_from_url(url: str) -> str | None: | |
if not url: | |
return None | |
prefix = f"https://{bucket}.s3.amazonaws.com/" | |
if url.startswith(prefix): | |
return url[len(prefix):] | |
# Try generic split for virtual-hosted–style URLs | |
try: | |
parts = url.split('.s3.amazonaws.com/', 1) | |
if len(parts) == 2: | |
return parts[1] | |
except Exception: | |
pass | |
return None |