Spaces:
Running
Running
import logging | |
from fastapi import FastAPI, UploadFile, File, HTTPException, Form, Request | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.responses import FileResponse, JSONResponse | |
import base64 | |
import os | |
import time | |
import jwt | |
from pathlib import Path | |
from typing import List | |
import io | |
import razorpay | |
from razorpay.errors import SignatureVerificationError | |
from supabase import create_client, Client | |
from pydantic import BaseModel | |
from typing import Optional | |
from PIL import Image | |
from io import BytesIO | |
import google.generativeai as genai | |
import tempfile | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(title="Gemini Image Editing API with Razorpay")

# Enable CORS for frontend.
# CORSMiddleware compares `allow_origins` entries by exact string match (only
# a lone "*" is treated as a wildcard), so glob-style entries such as
# "https://*.lovable.dev" can never match a real Origin header. Wildcard
# sub-domains are therefore expressed only through `allow_origin_regex`.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://hivili.web.app",
        "http://localhost:3000",
    ],
    # Matches any sub-domain of lovable.dev; this already includes every
    # *.sandbox.lovable.dev origin, so no second alternative is needed.
    allow_origin_regex=r"https://.*\.lovable\.dev",
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ===== API CONFIGURATION =====
# The key is read from the environment only. A literal key used to be embedded
# here as the fallback default; secrets must not live in source control, and
# that key should be treated as leaked and rotated.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
GEMINI_MODEL = "gemini-2.5-flash-image-preview"
if not GEMINI_API_KEY:
    logging.getLogger(__name__).warning(
        "GEMINI_API_KEY is not set; Gemini requests will fail until it is configured"
    )
# Configure Gemini API
genai.configure(api_key=GEMINI_API_KEY)
# ===== RAZORPAY CONFIGURATION =====
# Both credentials come from the environment; the client is only built when
# both are present, otherwise it stays None and callers must check for that.
RAZORPAY_KEY_ID = os.getenv("RAZORPAY_KEY_ID")
RAZORPAY_KEY_SECRET = os.getenv("RAZORPAY_KEY_SECRET")
if RAZORPAY_KEY_ID and RAZORPAY_KEY_SECRET:
    razorpay_client = razorpay.Client(auth=(RAZORPAY_KEY_ID, RAZORPAY_KEY_SECRET))
else:
    razorpay_client = None

# ===== SUPABASE CONFIGURATION =====
# Same pattern as above: no URL or no key means no client (None).
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
supabase: Optional[Client]
if SUPABASE_URL and SUPABASE_KEY:
    supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
else:
    supabase = None
# Pydantic models for JSON input validation | |
class CreateOrderRequest(BaseModel):
    """JSON payload accepted when creating a Razorpay order."""

    # Order amount; create_razorpay_order multiplies it by 100 (INR -> paise).
    amount: int
class VerifyPaymentRequest(BaseModel):
    """JSON payload accepted when verifying a Razorpay payment signature."""

    # The three values passed to Razorpay's signature verification.
    razorpay_order_id: str
    razorpay_payment_id: str
    razorpay_signature: str
    # Optional user whose `is_premium` flag is set after a valid signature.
    user_id: Optional[str] = None
class GenerateImageRequest(BaseModel):
    """JSON shape for an image-generation request (prompt plus optional user)."""

    # Text instruction sent to the model.
    prompt: str
    # Optional user identifier.
    user_id: Optional[str] = None
# ===== AUTHENTICATION ===== | |
def generate_jwt_token():
    """Build a short-lived HS256 JWT signed with ``GEMINI_API_KEY``.

    The claims are a fixed issuer, an expiry 30 minutes from now and a
    not-before 5 seconds in the past.

    Returns:
        The encoded token as produced by ``jwt.encode``.
    """
    expires_at = int(time.time()) + 1800  # 30 minutes expiration
    not_before = int(time.time()) - 5  # Not before 5 seconds ago
    claims = {
        "iss": "gemini-image-editor",
        "exp": expires_at,
        "nbf": not_before,
    }
    token = jwt.encode(claims, GEMINI_API_KEY, algorithm="HS256")
    return token
# ===== IMAGE PROCESSING ===== | |
def prepare_image_base64(image_content: bytes) -> str:
    """Return *image_content* base64-encoded as text, without any prefix.

    Args:
        image_content: Raw image bytes.

    Returns:
        The base64 representation decoded as UTF-8.

    Raises:
        HTTPException: 500 if encoding fails for any reason.
    """
    try:
        encoded = base64.b64encode(image_content)
        return encoded.decode('utf-8')
    except Exception as e:
        failure = f"Image processing failed: {str(e)}"
        logger.error(failure)
        raise HTTPException(status_code=500, detail=failure)
def validate_image(image_content: bytes):
    """Validate that an uploaded image meets the API requirements.

    The image must be at most 10 MB and must be recognised by PIL as PNG,
    JPG/JPEG or WEBP.

    Args:
        image_content: Raw bytes of the uploaded image.

    Returns:
        The tuple ``(True, "")`` when the image is acceptable.

    Raises:
        HTTPException: 400 when the image is too large, has an unsupported
            format, or cannot be read at all.
    """
    try:
        size_mb = len(image_content) / (1024 * 1024)
        if size_mb > 10:
            raise HTTPException(status_code=400, detail="Image too large (max 10MB)")
        img = Image.open(BytesIO(image_content))
        # `format` can be None; treat that as an unsupported format instead of
        # failing on `.lower()`.
        image_format = (img.format or "").lower()
        if image_format not in ['png', 'jpg', 'jpeg', 'webp']:
            raise HTTPException(status_code=400, detail="Unsupported image format. Use PNG, JPG, JPEG, or WEBP")
        return True, ""
    except HTTPException:
        # Our own validation errors already carry the right detail; without
        # this clause the generic handler below would re-wrap them.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Image validation error: {str(e)}") from e
def _save_inline_image(data: bytes) -> str:
    """Decode *data* with PIL and write it to a new ``.png`` temp file; return its path."""
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
        temp_path = tmp.name
    try:
        Image.open(BytesIO(data)).save(temp_path)
    except Exception:
        # The file was created with delete=False; remove it so a failed decode
        # does not leave an orphan behind.
        _discard_temp_image(temp_path)
        raise
    return temp_path


def _discard_temp_image(path: str) -> None:
    """Delete a temp image, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except OSError:
        pass


def generate_gemini_image(images: List[Image.Image], prompt: str):
    """Generate or edit an image using the Gemini API.

    Sends the input images followed by the prompt to ``GEMINI_MODEL`` and
    walks the parts of the first candidate, collecting text and saving inline
    image data to a temporary PNG file.

    Args:
        images: PIL images passed to the model ahead of the prompt.
        prompt: Text instruction appended after the images.

    Returns:
        ``(image_path, "")`` when the response contained image data (the path
        of the last image part is returned), otherwise ``(None, text)`` where
        ``text`` is the stripped text of the response or a short reason.
        The caller owns the returned temp file.

    Raises:
        HTTPException: 500 when the request or the response handling fails.
    """
    try:
        contents = images + [prompt]
        response = genai.GenerativeModel(GEMINI_MODEL).generate_content(contents)
        if not response.candidates:
            # Report an empty candidate list as a readable reason rather than
            # an IndexError message from the indexing below.
            return None, "Gemini returned no candidates"
        text_chunks = []
        image_path = None
        for part in response.candidates[0].content.parts:
            if part.text:
                text_chunks.append(part.text)
            elif getattr(part, 'inline_data', None):
                temp_path = _save_inline_image(part.inline_data.data)
                if image_path is not None:
                    # Only the last image is returned; drop the superseded one.
                    _discard_temp_image(image_path)
                image_path = temp_path
                logger.info(f"Generated image saved to: {temp_path} with prompt: {prompt}")
        if image_path:
            return image_path, ""
        return None, "\n".join(text_chunks).strip()
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Gemini API error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Gemini API error: {str(e)}") from e
# ===== RAZORPAY FUNCTIONS ===== | |
def create_razorpay_order(amount: int):
    """Create a Razorpay order with auto-capture enabled.

    Args:
        amount: Order amount in INR; it is multiplied by 100 to get paise
            before being sent to Razorpay.

    Returns:
        The order object returned by ``razorpay_client.order.create``.

    Raises:
        HTTPException: 500 when Razorpay is not configured or the API call
            fails; 400 when ``amount`` is not positive.
    """
    try:
        if not razorpay_client:
            raise HTTPException(status_code=500, detail="Razorpay configuration missing")
        if amount <= 0:
            # A bad amount is a client error, not a server failure.
            raise HTTPException(status_code=400, detail="Amount must be a positive integer")
        order_data = {
            "amount": amount * 100,  # Convert INR to paise
            "currency": "INR",
            "payment_capture": 1  # Auto-capture payment
        }
        order = razorpay_client.order.create(data=order_data)
        logger.info(f"Razorpay order created successfully: {order['id']}")
        return order
    except HTTPException:
        # Keep the status/detail chosen above instead of re-wrapping it.
        raise
    except Exception as e:
        logger.error(f"Failed to create Razorpay order: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to create order: {str(e)}") from e
def verify_payment_signature(order_id: str, payment_id: str, signature: str):
    """Verify a Razorpay payment signature.

    Args:
        order_id: Razorpay order id.
        payment_id: Razorpay payment id.
        signature: Signature to check against the order and payment ids.

    Returns:
        True when Razorpay accepts the signature, False when it raises
        ``SignatureVerificationError``.

    Raises:
        HTTPException: 500 when Razorpay is not configured or verification
            fails for any other reason.
    """
    try:
        if not razorpay_client:
            raise HTTPException(status_code=500, detail="Razorpay configuration missing")
        params_dict = {
            "razorpay_order_id": order_id,
            "razorpay_payment_id": payment_id,
            "razorpay_signature": signature
        }
        razorpay_client.utility.verify_payment_signature(params_dict)
        logger.info(f"Payment signature verified successfully for order: {order_id}")
        return True
    except SignatureVerificationError as e:
        logger.error(f"Payment signature verification failed: {str(e)}")
        return False
    except HTTPException:
        # Pass the configuration error through unchanged rather than letting
        # the generic handler prefix it with "Verification error: 500: ...".
        raise
    except Exception as e:
        logger.error(f"Error verifying payment signature: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Verification error: {str(e)}") from e
# ===== MAIN PROCESSING ===== | |
async def generate_image(images: List[bytes], prompt: str, user_id: Optional[str] = None):
    """Handle the complete image generation workflow.

    Validates the uploads, opens them as PIL images, optionally checks the
    user's premium flag in Supabase, then calls Gemini.

    Args:
        images: Raw bytes of each uploaded image.
        prompt: Text instruction for the model.
        user_id: When given (and Supabase is configured) the user must have
            ``is_premium`` set.

    Returns:
        Path of the generated image file.

    Raises:
        HTTPException: 400 for invalid or missing images, 403 when the user
            is not premium, 500 when the Supabase lookup or generation fails.
    """
    # Validate images
    for img_content in images:
        if img_content:
            validate_image(img_content)
    # Convert bytes to PIL Images
    pil_images = []
    for img_content in images:
        try:
            img = Image.open(BytesIO(img_content))
            # NOTE(review): converting an RGBA image to RGBA does not change
            # its mode; kept as-is because the intended target is unclear.
            if img.mode == "RGBA":
                img = img.convert("RGBA")
            pil_images.append(img)
        except Exception as e:
            logger.error(f"Image conversion failed: {str(e)}")
            raise HTTPException(status_code=400, detail=f"Image conversion failed: {str(e)}") from e
    if not pil_images:
        raise HTTPException(status_code=400, detail="At least one image required")
    # Check user premium status if Supabase is configured.
    # NOTE(review): when user_id is omitted this check is skipped entirely --
    # confirm that anonymous generation is intended.
    if user_id and supabase:
        try:
            user_data = supabase.table("users").select("is_premium").eq("user_id", user_id).execute()
        except Exception as e:
            logger.error(f"Supabase user check failed: {str(e)}")
            raise HTTPException(status_code=500, detail=f"User verification failed: {str(e)}") from e
        # Raised outside the try so the 403 reaches the client instead of
        # being caught above and reported as a 500.
        if not user_data.data or not user_data.data[0].get("is_premium"):
            raise HTTPException(status_code=403, detail="Premium subscription required")
    # Generate image using Gemini API
    image_path, text_response = generate_gemini_image(pil_images, prompt)
    if image_path:
        return image_path
    raise HTTPException(status_code=500, detail=f"Image generation failed: {text_response or 'Unknown error'}")
# ===== API ENDPOINTS ===== | |
def _remove_generated_file(path: str) -> None:
    """Delete a generated temp file after it has been served; log if that fails."""
    try:
        os.remove(path)
    except OSError as e:
        logger.warning(f"Could not remove temporary file {path}: {str(e)}")


# Registered under the path that index() advertises ("POST /generate").
@app.post("/generate")
async def generate_image_endpoint(
    prompt: str = Form(...),
    images: List[UploadFile] = File(...),
    user_id: Optional[str] = Form(None)
):
    """Endpoint to generate or edit an image using Gemini API.

    Accepts 1 to 4 uploaded images plus a prompt and returns the generated
    PNG as a file download.

    Raises:
        HTTPException: 400 for a bad image count, otherwise whatever
            ``generate_image`` raises; unexpected errors become 500.
    """
    # Local import: FileResponse accepts a background task that runs after
    # the response has been sent, used here to delete the temp file.
    from fastapi import BackgroundTasks

    try:
        if len(images) < 1:
            raise HTTPException(status_code=400, detail="At least one image required")
        if len(images) > 4:
            raise HTTPException(status_code=400, detail="Maximum 4 images allowed")
        image_contents = [await image.read() for image in images]
        output_path = await generate_image(image_contents, prompt, user_id)
        # The generator writes to a temp file created with delete=False;
        # without this cleanup every request leaves a file behind.
        cleanup = BackgroundTasks()
        cleanup.add_task(_remove_generated_file, output_path)
        return FileResponse(
            path=output_path,
            media_type="image/png",
            filename=f"gemini_output_{Path(output_path).stem}.png",
            background=cleanup,
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in /generate: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# Registered under the path that index() advertises
# ("POST /create-razorpay-order").
@app.post("/create-razorpay-order")
async def create_order_endpoint(
    request: Request,
    amount: Optional[int] = Form(None),
    body: Optional[CreateOrderRequest] = None
):
    """Create a Razorpay order (supports form-data and JSON).

    The amount is taken from ``body`` when present, else from the form field,
    else from a raw JSON body read off the request.

    Returns:
        JSONResponse with the order ``id``, ``amount``, ``currency`` and the
        public ``key_id``.

    Raises:
        HTTPException: 500 when Razorpay is not configured or order creation
            fails; 422 when no positive integer amount was supplied.
    """
    logger.info("Received create order request")
    try:
        if not razorpay_client:
            raise HTTPException(status_code=500, detail="Razorpay configuration missing")
        # Handle JSON body if provided
        if body and body.amount:
            amount = body.amount
        elif not amount:
            try:
                json_body = await request.json()
            except Exception as e:
                # Best effort: the body may be absent or not JSON. Catching
                # Exception (not a bare except) lets task cancellation and
                # other BaseExceptions propagate.
                logger.debug(f"No usable JSON body on create order request: {str(e)}")
                json_body = None
            if isinstance(json_body, dict):
                amount = json_body.get('amount')
        # A raw JSON value may be a string, float or bool; reject anything
        # that is not a positive int here instead of failing later on `<=`.
        if isinstance(amount, bool) or not isinstance(amount, int) or amount <= 0:
            raise HTTPException(status_code=422, detail="Missing or invalid 'amount' parameter")
        logger.info(f"Creating order with amount: {amount}")
        order = create_razorpay_order(amount)
        response_data = {
            "id": order["id"],
            "amount": order["amount"],
            "currency": order["currency"],
            "key_id": RAZORPAY_KEY_ID
        }
        logger.info(f"Order created successfully: {order['id']}")
        return JSONResponse(content=response_data)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating order: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to create order: {str(e)}") from e
# Registered under the path that index() advertises
# ("POST /verify-razorpay-payment").
@app.post("/verify-razorpay-payment")
async def verify_payment_endpoint(
    request: Request,
    razorpay_order_id: Optional[str] = Form(None),
    razorpay_payment_id: Optional[str] = Form(None),
    razorpay_signature: Optional[str] = Form(None),
    user_id: Optional[str] = Form(None),
    body: Optional[VerifyPaymentRequest] = None
):
    """Verify Razorpay payment signature (supports form-data and JSON).

    Form fields take precedence; missing values are filled from ``body`` or,
    failing that, from a raw JSON body. On a valid signature the user's
    ``is_premium`` flag is set in Supabase when both a ``user_id`` and a
    Supabase client are available.

    Returns:
        JSONResponse ``{"success": True, ...}`` on a valid signature, or
        ``{"success": False, ...}`` with status 400 otherwise.

    Raises:
        HTTPException: 422 when a required field is missing; 500 on
            unexpected verification errors.
    """
    logger.info("Received payment verification request")
    try:
        # Handle JSON body if provided
        if body:
            razorpay_order_id = razorpay_order_id or body.razorpay_order_id
            razorpay_payment_id = razorpay_payment_id or body.razorpay_payment_id
            razorpay_signature = razorpay_signature or body.razorpay_signature
            user_id = user_id or body.user_id
        else:
            try:
                json_body = await request.json()
            except Exception as e:
                # Best effort: the body may be absent or not JSON. Catching
                # Exception (not a bare except) lets task cancellation and
                # other BaseExceptions propagate.
                logger.debug(f"No usable JSON body on verification request: {str(e)}")
                json_body = None
            if isinstance(json_body, dict):
                razorpay_order_id = razorpay_order_id or json_body.get('razorpay_order_id')
                razorpay_payment_id = razorpay_payment_id or json_body.get('razorpay_payment_id')
                razorpay_signature = razorpay_signature or json_body.get('razorpay_signature')
                user_id = user_id or json_body.get('user_id')
        # Validate required fields
        required = {
            "razorpay_order_id": razorpay_order_id,
            "razorpay_payment_id": razorpay_payment_id,
            "razorpay_signature": razorpay_signature,
        }
        missing_fields = [name for name, value in required.items() if not value]
        if missing_fields:
            logger.error(f"Missing required fields: {missing_fields}")
            raise HTTPException(
                status_code=422,
                detail=f"Missing required fields: {', '.join(missing_fields)}"
            )
        logger.info(f"Verifying payment for order_id: {razorpay_order_id}")
        is_valid = verify_payment_signature(razorpay_order_id, razorpay_payment_id, razorpay_signature)
        if not is_valid:
            logger.warning(f"Payment verification failed for order: {razorpay_order_id}")
            return JSONResponse(content={"success": False, "message": "Payment verification failed"}, status_code=400)
        # NOTE(review): user_id comes straight from the request and nothing
        # visible here ties it to the order being verified -- confirm that
        # this is checked elsewhere.
        if user_id and supabase:
            logger.info(f"Updating Supabase for user_id: {user_id}")
            try:
                supabase.table("users").update({"is_premium": True}).eq("user_id", user_id).execute()
                logger.info(f"Successfully updated premium status for user: {user_id}")
            except Exception as e:
                # Deliberately best effort: the payment itself is valid, so
                # the client still gets a success response.
                logger.error(f"Failed to update Supabase: {str(e)}")
        return JSONResponse(content={"success": True, "message": "Payment verified successfully"})
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error verifying payment: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Verification error: {str(e)}") from e
# NOTE(review): no route decorator is visible for this handler -- confirm
# where it is registered on `app`.
async def index():
    """Return a static status message and the list of advertised endpoints."""
    advertised = {
        "generate": "POST /generate",
        "create_order": "POST /create-razorpay-order",
        "verify_payment": "POST /verify-razorpay-payment",
    }
    banner = "Gemini Image Editing API with Razorpay is running"
    return {"status": banner, "endpoints": advertised}
# NOTE(review): no route decorator is visible for this handler -- confirm
# where it is registered on `app`.
async def health_check():
    """Report liveness and which integrations have their settings present.

    Each ``*_configured`` flag only reflects whether the corresponding
    module-level settings are non-empty; no remote service is contacted.
    """
    razorpay_ready = bool(RAZORPAY_KEY_ID and RAZORPAY_KEY_SECRET)
    supabase_ready = bool(SUPABASE_URL and SUPABASE_KEY)
    gemini_ready = bool(GEMINI_API_KEY)
    return {
        "status": "healthy",
        "razorpay_configured": razorpay_ready,
        "supabase_configured": supabase_ready,
        "gemini_configured": gemini_ready,
    }
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 7860. uvicorn is imported here so importing this module does not
    # require it.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)