peace2024's picture
saving changes
992bd88
import boto3, os
from dotenv import load_dotenv
s3 = boto3.client(
"s3",
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
region_name=os.getenv("AWS_REGION"),
)
bucket = os.getenv("S3_BUCKET")
def upload_to_s3(file, key):
s3.upload_fileobj(file.file, bucket, key)
return f"https://{bucket}.s3.amazonaws.com/{key}"
def upload_pdf_bytes(data: bytes, key: str):
s3.put_object(Bucket=bucket, Key=key, Body=data, ContentType="application/pdf")
return f"https://{bucket}.s3.amazonaws.com/{key}"
def delete_s3_key(key: str):
if not key:
return
s3.delete_object(Bucket=bucket, Key=key)
def key_from_url(url: str) -> str | None:
if not url:
return None
prefix = f"https://{bucket}.s3.amazonaws.com/"
if url.startswith(prefix):
return url[len(prefix):]
# Try generic split for virtual-hosted–style URLs
try:
parts = url.split('.s3.amazonaws.com/', 1)
if len(parts) == 2:
return parts[1]
except Exception:
pass
return None