|
|
"""
|
|
|
β‘ Performance Optimization & Dataset Integration β‘
|
|
|
|
|
|
1. Website Loading:
|
|
|
- Telegram and Gmail email sending are commented out to avoid
|
|
|
restrictions on Hugging Face free trial spaces.
|
|
|
- Focus on fast page load and smooth API responses.
|
|
|
- Images are served from optimized folders for faster rendering.
|
|
|
|
|
|
2. Booking Data:
|
|
|
- All booking data is now saved directly to Hugging Face Dataset
|
|
|
using save_output_to_dataset().
|
|
|
- Local JSON file writes are disabled to reduce disk I/O and latency.
|
|
|
- Each booking is stored with a timestamp and full user/product info.
|
|
|
|
|
|
3. API Updates:
|
|
|
- search_image and search_text APIs are fully functional.
|
|
|
- Frontend can use these APIs without delay.
|
|
|
- Optimized static files are served via /optimized/ route.
|
|
|
|
|
|
Overall, these changes improve website performance, reduce dependency on
|
|
|
external services, and securely store all booking data in the cloud dataset.
|
|
|
"""
|
|
|
|
|
|
from huggingface_hub import HfApi
|
|
|
from datetime import datetime
|
|
|
from flask import Flask, request, jsonify, render_template, send_from_directory
|
|
|
from siglip_search_2 import search_image, search_text
|
|
|
import os
|
|
|
import json
|
|
|
import requests
|
|
|
import base64
|
|
|
import io
|
|
|
import mimetypes
|
|
|
import smtplib
|
|
|
from email.mime.multipart import MIMEMultipart
|
|
|
from email.mime.text import MIMEText
|
|
|
from email.mime.image import MIMEImage
|
|
|
from dotenv import load_dotenv
|
|
|
load_dotenv()
|
|
|
|
|
|
app = Flask(__name__)
|
|
|
|
|
|
env_blob = os.environ.get("find_it_or_create", "")
|
|
|
secrets = {}
|
|
|
|
|
|
for line in env_blob.splitlines():
|
|
|
if "=" in line:
|
|
|
k, v = line.split("=", 1)
|
|
|
secrets[k.strip()] = v.strip().strip('"')
|
|
|
|
|
|
|
|
|
def get_secret(key: str, default=None):
|
|
|
return os.environ.get(key) or secrets.get(key, default)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
GMAIL_USER = get_secret("GMAIL_USER")
|
|
|
GMAIL_PASSWORD = get_secret("GMAIL_PASSWORD")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bot_token = get_secret("TELEGRAM_BOT_TOKEN")
|
|
|
chat_id = get_secret("TELEGRAM_CHAT_ID")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
BOOKING_FILE = "bookings.json"
|
|
|
UPLOAD_FOLDER = "uploads"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
env_blob = os.environ.get("find_it_or_create", "")
|
|
|
secrets = {}
|
|
|
|
|
|
for line in env_blob.splitlines():
|
|
|
if "=" in line:
|
|
|
k, v = line.split("=", 1)
|
|
|
secrets[k.strip()] = v.strip().strip('"')
|
|
|
|
|
|
|
|
|
|
|
|
hf_token = get_secret("HF_TOKEN")
|
|
|
dataset_repo = "mlbhanuprakash/data"
|
|
|
|
|
|
api = HfApi()
|
|
|
|
|
|
def save_output_to_dataset(output):
|
|
|
|
|
|
timestamp = datetime.now().isoformat().replace(":", "-")
|
|
|
filename = f"data_{timestamp}.json"
|
|
|
local_file = f"/tmp/{filename}"
|
|
|
|
|
|
|
|
|
entry = {
|
|
|
"timestamp": datetime.now().isoformat(),
|
|
|
"output": output
|
|
|
}
|
|
|
|
|
|
|
|
|
with open(local_file, "w") as f:
|
|
|
json.dump(entry, f, indent=2)
|
|
|
|
|
|
|
|
|
api.upload_file(
|
|
|
path_or_fileobj=local_file,
|
|
|
path_in_repo="JSON_data/"+filename,
|
|
|
repo_id=dataset_repo,
|
|
|
token=hf_token,
|
|
|
repo_type="dataset",
|
|
|
commit_message=f"Add new output file {filename}"
|
|
|
)
|
|
|
|
|
|
print(f"β
Output saved as {filename} in Hugging Face Dataset!")
|
|
|
|
|
|
|
|
|
UPLOAD_FOLDER = "/tmp"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""""
|
|
|
def send_email(to_email, subject, body, product_image_str=None):
|
|
|
msg = MIMEMultipart()
|
|
|
msg['From'] = GMAIL_USER
|
|
|
msg['To'] = to_email
|
|
|
msg['Subject'] = subject
|
|
|
|
|
|
# Attach text body
|
|
|
msg.attach(MIMEText(body, 'plain'))
|
|
|
|
|
|
# If product image is available, try attaching
|
|
|
if product_image_str:
|
|
|
try:
|
|
|
img_bytes = None
|
|
|
|
|
|
# Case 1: Data URI (base64)
|
|
|
if product_image_str.startswith("data:"):
|
|
|
header, b64data = product_image_str.split(",", 1)
|
|
|
img_bytes = base64.b64decode(b64data)
|
|
|
|
|
|
# Case 2: HTTP(S) URL β download the image
|
|
|
elif product_image_str.startswith("http://") or product_image_str.startswith("https://"):
|
|
|
resp = requests.get(product_image_str, timeout=10)
|
|
|
if resp.status_code == 200:
|
|
|
img_bytes = resp.content
|
|
|
|
|
|
# Case 3: Local file path
|
|
|
elif os.path.exists(product_image_str):
|
|
|
with open(product_image_str, "rb") as f:
|
|
|
img_bytes = f.read()
|
|
|
|
|
|
# If we got image bytes, attach them
|
|
|
if img_bytes:
|
|
|
image = MIMEImage(img_bytes, name="product_image.jpg")
|
|
|
msg.attach(image)
|
|
|
|
|
|
except Exception as e:
|
|
|
app.logger.error(f"Failed to attach product image: {e}")
|
|
|
|
|
|
# Send email via Gmail SMTP
|
|
|
try:
|
|
|
with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
|
|
|
server.login(GMAIL_USER, GMAIL_PASSWORD)
|
|
|
server.sendmail(GMAIL_USER, to_email, msg.as_string())
|
|
|
app.logger.info(f"Email sent to {to_email}")
|
|
|
except Exception as e:
|
|
|
app.logger.error(f"Email sending failed: {e}")
|
|
|
return False
|
|
|
return True"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/")
|
|
|
def index():
|
|
|
return render_template("index.html")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/search_image", methods=["POST"])
|
|
|
|
|
|
def search_image_api():
|
|
|
"""Handle image upload and run SigLIP image search"""
|
|
|
if "file" not in request.files:
|
|
|
return jsonify({"error": "No file uploaded"}), 400
|
|
|
|
|
|
file = request.files["file"]
|
|
|
if file.filename == "":
|
|
|
return jsonify({"error": "Empty filename"}), 400
|
|
|
|
|
|
filepath = os.path.join(UPLOAD_FOLDER, file.filename)
|
|
|
file.save(filepath)
|
|
|
app.logger.info(f"Received image: {filepath}")
|
|
|
try:
|
|
|
results = search_image(filepath, topk=1)
|
|
|
app.logger.info(f"Returning {len(results)} image results")
|
|
|
except Exception as e:
|
|
|
app.logger.error(f"{e}")
|
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
return jsonify(results)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/search_text", methods=["POST"])
|
|
|
def search_text_api():
|
|
|
"""Handle text query and run SigLIP text search"""
|
|
|
data = request.get_json()
|
|
|
query = data.get("query") if data else None
|
|
|
|
|
|
if not query:
|
|
|
return jsonify({"error": "No search query provided"}), 400
|
|
|
|
|
|
app.logger.info(f"Received text query: {query}")
|
|
|
try:
|
|
|
results = search_text(query, topk=5)
|
|
|
app.logger.info(f"Returning {len(results)} text results")
|
|
|
except Exception as e:
|
|
|
app.logger.error(f"{e}")
|
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
return jsonify(results)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/save_booking", methods=["POST"])
|
|
|
def save_booking():
|
|
|
data = request.get_json()
|
|
|
if not data:
|
|
|
return jsonify({"error": "No data received"}), 400
|
|
|
|
|
|
|
|
|
product = data.get("productResult", {})
|
|
|
|
|
|
|
|
|
product_image_str = (data.get("productImage")
|
|
|
or product.get("image_url")
|
|
|
or product.get("uploaded_image")
|
|
|
or "")
|
|
|
|
|
|
saved_data = {
|
|
|
"name": data.get("name"),
|
|
|
"email": data.get("email"),
|
|
|
"phone": data.get("phone"),
|
|
|
"location": data.get("location"),
|
|
|
"chest": data.get("chest"),
|
|
|
"waist": data.get("waist"),
|
|
|
"shoulder": data.get("shoulder"),
|
|
|
"sleeve": data.get("sleeve"),
|
|
|
"selected Size": data.get("selectedSize"),
|
|
|
"product Image": product_image_str,
|
|
|
"search Query": (product.get("search_query") or data.get("searchQuery") or ""),
|
|
|
"designer ID/Employee ID ": data.get("designerID"),
|
|
|
"designer Name": data.get("designerName")
|
|
|
}
|
|
|
|
|
|
if os.path.exists(BOOKING_FILE):
|
|
|
with open(BOOKING_FILE, "r", encoding="utf-8") as f:
|
|
|
try:
|
|
|
bookings = json.load(f)
|
|
|
except json.JSONDecodeError:
|
|
|
bookings = []
|
|
|
else:
|
|
|
bookings = []
|
|
|
|
|
|
save_output_to_dataset(saved_data)
|
|
|
"""
|
|
|
bookings.append(saved_data)
|
|
|
# with open(BOOKING_FILE, "w", encoding="utf-8") as f:
|
|
|
# json.dump(bookings, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
print("sssssssssssssssssssssssssss",saved_data)
|
|
|
subject = f"New Task Assigned: {saved_data.get('name')}"
|
|
|
|
|
|
# -----------------------------
|
|
|
# Telegram message & photo sending
|
|
|
# -----------------------------
|
|
|
global bot_token, chat_id
|
|
|
message = (
|
|
|
f"π’ *New Task Assignment* π’\n\n"
|
|
|
f"Hello {saved_data.get('designer Name')},\n\n"
|
|
|
f"Assigned Designer: {saved_data.get('designer Name')} (Employee ID: {saved_data.get('designer ID/Employee ID ')})\n\n"
|
|
|
f"A new customer request has been assigned to you automatically by our system.\n\n"
|
|
|
f"--- π€ Customer Details ---\n"
|
|
|
f"Name: {saved_data.get('name')}\n"
|
|
|
f"Email: {saved_data.get('email')}\n"
|
|
|
f"Phone: {saved_data.get('phone')}\n"
|
|
|
f"Location: {saved_data.get('location')}\n\n"
|
|
|
f"--- π Measurements ---\n"
|
|
|
f"Chest: {saved_data.get('chest')} in\n"
|
|
|
f"Waist: {saved_data.get('waist')} in\n"
|
|
|
f"Shoulder: {saved_data.get('shoulder')} in\n"
|
|
|
f"Sleeve: {saved_data.get('sleeve')} in\n"
|
|
|
f"Selected Size: {saved_data.get('selected Size')}\n\n"
|
|
|
f"--- ποΈ Product Details ---\n"
|
|
|
f"Search Query: {saved_data.get('search Query')}\n\n"
|
|
|
f"--- π Assignment Info ---\n"
|
|
|
f"Task generated automatically by system bot.\n\n"
|
|
|
f"If you have any queries, please reach out to Krish (Team Lead).\n\n"
|
|
|
f"β‘ Please proceed with this task at your earliest convenience.\n\n"
|
|
|
f"Thank you! π"
|
|
|
)
|
|
|
body = message"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
if product_image_str:
|
|
|
try:
|
|
|
# Data URI
|
|
|
if product_image_str.startswith("data:"):
|
|
|
header, b64data = product_image_str.split(",", 1)
|
|
|
img_bytes = base64.b64decode(b64data)
|
|
|
mime = "image/jpeg"
|
|
|
try:
|
|
|
mime = header.split(";")[0].split(":")[1]
|
|
|
except Exception:
|
|
|
pass
|
|
|
ext = mimetypes.guess_extension(mime) or ".jpg"
|
|
|
filename = f"photo{ext}"
|
|
|
bio = io.BytesIO(img_bytes)
|
|
|
bio.name = filename
|
|
|
files = {'photo': (filename, bio, mime)}
|
|
|
resp = requests.post(
|
|
|
f'https://api.telegram.org/bot{bot_token}/sendPhoto',
|
|
|
data={'chat_id': chat_id, 'caption': f"Product image for {message}"},
|
|
|
files=files,
|
|
|
timeout=30
|
|
|
)
|
|
|
if resp.status_code != 200:
|
|
|
app.logger.error("Telegram sendPhoto (data URI) failed: %s %s", resp.status_code, resp.text)
|
|
|
# HTTP(S) URL
|
|
|
elif product_image_str.startswith("http://") or product_image_str.startswith("https://"):
|
|
|
resp = requests.post(
|
|
|
f'https://api.telegram.org/bot{bot_token}/sendPhoto',
|
|
|
data={'chat_id': chat_id, 'photo': product_image_str, 'caption': f"Product image for {saved_data.get('name')}"},
|
|
|
timeout=15
|
|
|
)
|
|
|
if resp.status_code != 200:
|
|
|
app.logger.error("Telegram sendPhoto (URL) returned %s: %s", resp.status_code, resp.text)
|
|
|
requests.get(
|
|
|
f'https://api.telegram.org/bot{bot_token}/sendMessage',
|
|
|
params={'chat_id': chat_id, 'text': message},
|
|
|
timeout=10
|
|
|
)
|
|
|
# Local file or path
|
|
|
else:
|
|
|
candidates = [
|
|
|
product_image_str,
|
|
|
os.path.join(os.getcwd(), product_image_str),
|
|
|
os.path.join(os.getcwd(), UPLOAD_FOLDER, os.path.basename(product_image_str)),
|
|
|
os.path.join(os.getcwd(), 'images', os.path.basename(product_image_str))
|
|
|
]
|
|
|
found = None
|
|
|
for p in candidates:
|
|
|
if p and os.path.exists(p):
|
|
|
found = p
|
|
|
break
|
|
|
if found:
|
|
|
with open(found, 'rb') as photo_f:
|
|
|
filename = os.path.basename(found)
|
|
|
mime = mimetypes.guess_type(found)[0] or 'application/octet-stream'
|
|
|
files = {'photo': (filename, photo_f, mime)}
|
|
|
resp = requests.post(
|
|
|
f'https://api.telegram.org/bot{bot_token}/sendPhoto',
|
|
|
data={'chat_id': chat_id, 'caption': f"Product image for {saved_data.get('name')}"},
|
|
|
files=files,
|
|
|
timeout=30
|
|
|
)
|
|
|
if resp.status_code != 200:
|
|
|
app.logger.error("Telegram sendPhoto (local) returned %s: %s", resp.status_code, resp.text)
|
|
|
else:
|
|
|
try:
|
|
|
base = request.host_url.rstrip('/')
|
|
|
photo_url = base + '/' + product_image_str.lstrip('/')
|
|
|
resp = requests.post(
|
|
|
f'https://api.telegram.org/bot{bot_token}/sendPhoto',
|
|
|
data={'chat_id': chat_id, 'photo': photo_url, 'caption': f"Product image for {saved_data.get('name')}"},
|
|
|
timeout=15
|
|
|
)
|
|
|
if resp.status_code != 200:
|
|
|
app.logger.error("Telegram sendPhoto (fallback URL) returned %s: %s", resp.status_code, resp.text)
|
|
|
except Exception as e:
|
|
|
app.logger.error("Telegram fallback URL attempt failed: %s", e)
|
|
|
except Exception as e:
|
|
|
app.logger.exception("Telegram photo send failed: %s", e)
|
|
|
else:
|
|
|
requests.get(
|
|
|
f'https://api.telegram.org/bot{bot_token}/sendMessage',
|
|
|
params={'chat_id': chat_id, 'text': message},
|
|
|
timeout=10
|
|
|
)
|
|
|
# Send email
|
|
|
email_sent = send_email("bbhanu410@gmail.com", subject, body, product_image_str)
|
|
|
if not email_sent:
|
|
|
return jsonify({"error": "Booking saved but email failed"}), 500
|
|
|
"""
|
|
|
|
|
|
return jsonify({"message": "Booking successfully saved"})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/optimized/<path:filename>")
|
|
|
def serve_images(filename):
|
|
|
return send_from_directory("images", filename)
|
|
|
|
|
|
@app.route("/uploads/<path:filename>")
|
|
|
def serve_uploads(filename):
|
|
|
return send_from_directory(UPLOAD_FOLDER, filename)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 5000)), debug=True) |