Spaces:
Sleeping
Sleeping
from flask import Blueprint, render_template, request, session, jsonify, redirect, url_for, Flask | |
from flask_cors import CORS | |
from salesforce import get_salesforce_connection | |
import re | |
# Initialize Salesforce connection | |
sf = get_salesforce_connection() | |
# Create the cart blueprint | |
cart_blueprint = Blueprint('cart', __name__) | |
# Utility function to sanitize SOQL inputs | |
def sanitize_input(value): | |
if value: | |
value = re.sub(r'[;\'"\\]', '', value) | |
return value | |
def cart(): | |
email = session.get('user_email') | |
if not email: | |
return redirect(url_for("cart.login")) | |
try: | |
sanitized_email = sanitize_input(email) | |
result = sf.query(f""" | |
SELECT Name, Price__c, Quantity__c, Add_Ons__c, Add_Ons_Price__c, Image1__c, Instructions__c, Category__c, Section__c | |
FROM Cart_Item__c | |
WHERE Customer_Email__c = '{sanitized_email}' | |
""") | |
cart_items = result.get("records", []) | |
subtotal = sum(item['Price__c'] for item in cart_items) | |
return render_template( | |
"cart.html", | |
cart_items=cart_items, | |
subtotal=subtotal, | |
customer_email=email | |
) | |
except Exception as e: | |
print(f"Error fetching cart items: {e}") | |
return render_template("cart.html", cart_items=[], subtotal=0, customer_email=email) | |
def bill(): | |
email = session.get('user_email') | |
if not email: | |
return redirect(url_for("cart.login")) | |
try: | |
sanitized_email = sanitize_input(email) | |
result = sf.query(f""" | |
SELECT Name, Price__c, Quantity__c, Add_Ons__c, Add_Ons_Price__c, Image1__c, Instructions__c | |
FROM Cart_Item__c | |
WHERE Customer_Email__c = '{sanitized_email}' | |
""") | |
cart_items = result.get("records", []) | |
if not cart_items: | |
return render_template( | |
"bill.html", | |
cart_items=[], | |
subtotal=0, | |
customer_email=email, | |
error_message="Your cart is empty. Please add items to proceed." | |
) | |
subtotal = sum(item['Price__c'] for item in cart_items) | |
return render_template( | |
"bill.html", | |
cart_items=cart_items, | |
subtotal=subtotal, | |
customer_email=email, | |
error_message=None | |
) | |
except Exception as e: | |
print(f"Error fetching bill items: {e}") | |
return render_template( | |
"bill.html", | |
cart_items=[], | |
subtotal=0, | |
customer_email=email, | |
error_message=f"Error fetching cart items: {str(e)}" | |
), 500 | |
def fetch_cart(): | |
email = session.get('user_email') | |
if not email: | |
return jsonify({"success": False, "error": "User not logged in"}), 401 | |
try: | |
sanitized_email = sanitize_input(email) | |
result = sf.query(f""" | |
SELECT Name, Price__c, Quantity__c, Add_Ons__c, Add_Ons_Price__c, Image1__c, Instructions__c | |
FROM Cart_Item__c | |
WHERE Customer_Email__c = '{sanitized_email}' | |
""") | |
cart_items = result.get("records", []) | |
subtotal = sum(item['Price__c'] for item in cart_items) | |
return jsonify({ | |
"success": True, | |
"cart_items": cart_items, | |
"subtotal": subtotal | |
}) | |
except Exception as e: | |
print(f"Error fetching cart items: {e}") | |
return jsonify({"success": False, "error": str(e)}), 500 | |
def fetch_menu_items(): | |
try: | |
category = request.args.get('category', 'All') | |
section = request.args.get('section', '') | |
category = sanitize_input(category) | |
section = sanitize_input(section) | |
query = """ | |
SELECT Name, Price__c, Image1__c, Veg_NonVeg__c, Section__c | |
FROM Menu_Item__c | |
""" | |
if category != 'All': | |
query += f" AND Veg_NonVeg__c = '{category}'" | |
if section: | |
query += f" AND Section__c = '{section}'" | |
query += " LIMIT 20" | |
result = sf.query(query) | |
menu_items = result.get("records", []) | |
return jsonify({"success": True, "menu_items": menu_items}) | |
except Exception as e: | |
print(f"Error fetching menu items: {str(e)}") | |
return jsonify({"success": False, "error": str(e)}), 500 | |
def fetch_add_ons(): | |
try: | |
query = """ | |
SELECT Name, Price__c | |
FROM Add_On__c | |
WHERE Active__c = true | |
LIMIT 10 | |
""" | |
result = sf.query(query) | |
add_ons = [ | |
{"name": f"{addon['Name']} (${addon['Price__c']:.2f})", "price": addon['Price__c']} | |
for addon in result.get("records", []) | |
] | |
return jsonify({"success": True, "add_ons": add_ons}) | |
except Exception as e: | |
print(f"Error fetching add-ons: {str(e)}") | |
return jsonify({"success": False, "error": str(e)}), 500 | |
def add_suggestion_to_cart(): | |
try: | |
data = request.get_json() | |
item_name = sanitize_input(data.get('item_name').strip()) | |
item_price = data.get('item_price') | |
item_image = data.get('item_image') | |
customer_email = sanitize_input(data.get('customer_email')) | |
addons = data.get('addons', []) | |
instructions = sanitize_input(data.get('instructions', "")) | |
quantity = data.get('quantity', 1) | |
try: | |
quantity = int(quantity) | |
if quantity < 1: | |
raise ValueError("Quantity must be at least 1") | |
item_price = float(item_price) | |
except (ValueError, TypeError): | |
return jsonify({"success": False, "error": "Invalid quantity or price"}), 400 | |
if not all([item_name, item_price, customer_email]): | |
return jsonify({"success": False, "error": "Missing required fields"}), 400 | |
addons_price = 0 | |
addons_string = ", ".join(addons) if addons else "None" | |
if addons: | |
addons_price = sum( | |
float(addon.split("($")[1][:-1]) for addon in addons if "($" in addon | |
) | |
query = f""" | |
SELECT Id, Quantity__c, Add_Ons__c, Add_Ons_Price__c, Instructions__c, Base_Price__c | |
FROM Cart_Item__c | |
WHERE Customer_Email__c = '{customer_email}' AND Name = '{item_name}' | |
""" | |
result = sf.query(query) | |
cart_items = result.get("records", []) | |
if cart_items: | |
cart_item_id = cart_items[0]['Id'] | |
existing_quantity = cart_items[0]['Quantity__c'] | |
existing_addons = cart_items[0].get('Add_Ons__c', "None") | |
existing_addons_price = cart_items[0].get('Add_Ons_Price__c', 0) | |
existing_instructions = cart_items[0].get('Instructions__c', "") | |
base_price = cart_items[0].get('Base_Price__c', item_price) | |
combined_addons = existing_addons if existing_addons != "None" else "" | |
if addons: | |
combined_addons = f"{combined_addons}, {addons_string}".strip(", ") | |
combined_instructions = existing_instructions | |
if instructions: | |
combined_instructions = f"{combined_instructions} | {instructions}".strip(" | ") | |
new_quantity = existing_quantity + quantity | |
new_addons_price = existing_addons_price + (addons_price * quantity) | |
new_total_price = (base_price * new_quantity) + new_addons_price | |
sf.Cart_Item__c.update(cart_item_id, { | |
"Quantity__c": new_quantity, | |
"Add_Ons__c": combined_addons if combined_addons else "None", | |
"Add_Ons_Price__c": new_addons_price, | |
"Instructions__c": combined_instructions, | |
"Price__c": new_total_price | |
}) | |
else: | |
total_price = (item_price * quantity) + (addons_price * quantity) | |
sf.Cart_Item__c.create({ | |
"Name": item_name, | |
"Price__c": total_price, | |
"Base_Price__c": item_price, | |
"Quantity__c": quantity, | |
"Add_Ons_Price__c": addons_price * quantity, | |
"Add_Ons__c": addons_string, | |
"Image1__c": item_image, | |
"Customer_Email__c": customer_email, | |
"Instructions__c": instructions | |
}) | |
return jsonify({"success": True, "message": "Item added to cart successfully."}) | |
except Exception as e: | |
print(f"Error adding item to cart: {str(e)}") | |
return jsonify({"success": False, "error": str(e)}), 500 | |
def remove_cart_item(item_name): | |
try: | |
customer_email = session.get('user_email') | |
if not customer_email: | |
return jsonify({'success': False, 'message': 'User email not found.'}), 400 | |
sanitized_email = sanitize_input(customer_email) | |
sanitized_item_name = sanitize_input(item_name) | |
query = f""" | |
SELECT Id FROM Cart_Item__c | |
WHERE Customer_Email__c = '{sanitized_email}' AND Name = '{sanitized_item_name}' | |
""" | |
result = sf.query(query) | |
if result['totalSize'] == 0: | |
return jsonify({'success': False, 'message': 'Item not found in cart.'}), 400 | |
cart_item_id = result['records'][0]['Id'] | |
sf.Cart_Item__c.delete(cart_item_id) | |
return jsonify({'success': True, 'message': f"'{item_name}' removed successfully!"}), 200 | |
except Exception as e: | |
print(f"Error: {str(e)}") | |
return jsonify({'success': False, 'message': f"An error occurred: {str(e)}"}), 500 | |
def update_quantity(): | |
data = request.json | |
email = sanitize_input(data.get('email')) | |
item_name = sanitize_input(data.get('item_name')) | |
try: | |
quantity = int(data.get('quantity')) | |
except (ValueError, TypeError): | |
return jsonify({"success": False, "error": "Invalid quantity provided."}), 400 | |
if not email or not item_name or quantity is None: | |
return jsonify({"success": False, "error": "Email, item name, and quantity are required."}), 400 | |
try: | |
cart_items = sf.query( | |
f"SELECT Id, Quantity__c, Price__c, Base_Price__c, Add_Ons_Price__c FROM Cart_Item__c " | |
f"WHERE Customer_Email__c = '{email}' AND Name = '{item_name}'" | |
)['records'] | |
if not cart_items: | |
return jsonify({"success": False, "error": "Cart item not found."}), 404 | |
cart_item_id = cart_items[0]['Id'] | |
base_price = cart_items[0]['Base_Price__c'] | |
addons_price = cart_items[0].get('Add_Ons_Price__c', 0) | |
new_item_price = (base_price * quantity) + addons_price | |
sf.Cart_Item__c.update(cart_item_id, { | |
"Quantity__c": quantity, | |
"Price__c": new_item_price, | |
}) | |
cart_items = sf.query(f""" | |
SELECT Price__c, Add_Ons_Price__c | |
FROM Cart_Item__c | |
WHERE Customer_Email__c = '{email}' | |
""")['records'] | |
new_subtotal = sum(item['Price__c'] for item in cart_items) | |
return jsonify({"success": True, "new_item_price": new_item_price, "subtotal": new_subtotal}) | |
except Exception as e: | |
print(f"Error updating quantity: {str(e)}") | |
return jsonify({"success": False, "error": str(e)}), 500 | |
def checkout(): | |
email = session.get('user_email') | |
if not email: | |
return jsonify({"success": False, "message": "User not logged in"}), 401 | |
try: | |
sanitized_email = sanitize_input(email) | |
result = sf.query(f""" | |
SELECT Id, Name, Price__c, Add_Ons_Price__c, Quantity__c, Add_Ons__c, Instructions__c, Image1__c | |
FROM Cart_Item__c | |
WHERE Customer_Email__c = '{sanitized_email}' | |
""") | |
cart_items = result.get("records", []) | |
if not cart_items: | |
return jsonify({"success": False, "message": "Cart is empty"}), 400 | |
subtotal = sum(item['Price__c'] for item in cart_items) | |
total = subtotal # No discount for simplicity | |
bill_items = [ | |
{ | |
"name": item['Name'], | |
"quantity": item['Quantity__c'], | |
"price": item['Price__c'], | |
"addons": item.get('Add_Ons__c', 'None'), | |
"instructions": item.get('Instructions__c', 'None'), | |
"image": item['Image1__c'] | |
} for item in cart_items | |
] | |
menu_query = """ | |
SELECT Name, Price__c, Image1__c, Veg_NonVeg__c, Section__c | |
FROM Menu_Item__c | |
LIMIT 20 | |
""" | |
menu_result = sf.query(menu_query) | |
menu_items = menu_result.get("records", []) | |
return jsonify({ | |
"success": True, | |
"cart": { | |
"items": bill_items, | |
"subtotal": subtotal, | |
"total": total | |
}, | |
"menu_items": menu_items | |
}) | |
except Exception as e: | |
print(f"Error during checkout: {str(e)}") | |
return jsonify({"success": False, "error": str(e)}), 500 | |
def submit_order(): | |
email = session.get('user_email') | |
user_id = session.get('user_name') | |
table_number = session.get('table_number') | |
if not email or not user_id: | |
return jsonify({"success": False, "message": "User not logged in"}), 401 | |
try: | |
sanitized_email = sanitize_input(email) | |
data = request.json | |
cart = data.get("cart") | |
if not cart: | |
return jsonify({"success": False, "message": "Cart data is required"}), 400 | |
result = sf.query(f""" | |
SELECT Id, Name, Price__c, Add_Ons_Price__c, Quantity__c, Add_Ons__c, Instructions__c, Image1__c | |
FROM Cart_Item__c | |
WHERE Customer_Email__c = '{sanitized_email}' | |
""") | |
cart_items = result.get("records", []) | |
if not cart_items: | |
return jsonify({"success": False, "message": "Cart is empty"}), 400 | |
subtotal = cart.get("subtotal") | |
total = cart.get("total") | |
order_details = "\n".join( | |
f"{item['Name']} x{item['Quantity__c']} | Add-Ons: {item.get('Add_Ons__c', 'None')} | " | |
f"Instructions: {item.get('Instructions__c', 'None')} | " | |
f"Price: ${item['Price__c']} | Image: {item['Image1__c']}" | |
for item in cart_items | |
) | |
customer_query = sf.query(f""" | |
SELECT Id FROM Customer_Login__c | |
WHERE Email__c = '{sanitized_email}' | |
""") | |
customer_id = customer_query["records"][0]["Id"] if customer_query["records"] else None | |
if not customer_id: | |
return jsonify({"success": False, "message": "Customer record not found"}), 404 | |
table_number = table_number if table_number != 'null' else None | |
order_data = { | |
"Customer_Name__c": user_id, | |
"Customer_Email__c": email, | |
"Total_Amount__c": subtotal, | |
"Total_Bill__c": total, | |
"Order_Status__c": "Pending", | |
"Customer2__c": customer_id, | |
"Order_Details__c": order_details, | |
"Table_Number__c": table_number | |
} | |
order_response = sf.Order__c.create(order_data) | |
if order_response: | |
for item in cart_items: | |
sf.Cart_Item__c.delete(item["Id"]) | |
return jsonify({"success": True, "message": "Order placed successfully!"}) | |
except Exception as e: | |
print(f"Error during order submission: {str(e)}") | |
return jsonify({"success": False, "error": str(e)}), 500 | |
def order(): | |
email = session.get('user_email') | |
if not email: | |
return redirect(url_for("cart.login")) | |
try: | |
sanitized_email = sanitize_input(email) | |
result = sf.query(f""" | |
SELECT Id, Order_Details__c, Total_Bill__c, CreatedDate | |
FROM Order__c | |
WHERE Customer_Email__c = '{sanitized_email}' | |
ORDER BY CreatedDate DESC | |
LIMIT 1 | |
""") | |
orders = result.get("records", []) | |
if not orders: | |
return render_template("order.html", order=None, message="No orders found.") | |
order = orders[0] | |
order_id = order['Id'] | |
total = order['Total_Bill__c'] | |
order_details_raw = order['Order_Details__c'] or "" | |
order_items = [] | |
for line in order_details_raw.split("\n"): | |
if not line.strip(): | |
continue | |
try: | |
parts = line.split(" | ") | |
name_quantity = parts[0].split(" x") | |
name = name_quantity[0] | |
quantity = int(name_quantity[1]) | |
addons = parts[1].replace("Add-Ons: ", "") if len(parts) > 1 else "None" | |
instructions = parts[2].replace("Instructions: ", "") if len(parts) > 2 else "None" | |
price = float(parts[3].replace("Price: $", "")) if len(parts) > 3 else 0 | |
image = parts[4].replace("Image: ", "") if len(parts) > 4 else "/static/placeholder.jpg" | |
order_items.append({ | |
"name": name, | |
"quantity": quantity, | |
"addons": addons, | |
"instructions": instructions, | |
"price": price, | |
"image": image | |
}) | |
except Exception as e: | |
print(f"Error parsing order line '{line}': {e}") | |
continue | |
return render_template( | |
"order.html", | |
order={"id": order_id, "items": order_items, "total": total}, | |
message="Thank you for your order! It has been placed successfully." | |
) | |
except Exception as e: | |
print(f"Error fetching order: {e}") | |
return render_template("order.html", order=None, message="Error fetching your order. Please contact support.") | |
def validate_redirect(): | |
try: | |
email = session.get('user_email') | |
if not email: | |
return jsonify({ | |
"success": False, | |
"message": "Please log in to proceed to the bill page.", | |
"redirect": url_for("cart.login") | |
}), 401 | |
sanitized_email = sanitize_input(email) | |
result = sf.query(f""" | |
SELECT Id | |
FROM Cart_Item__c | |
WHERE Customer_Email__c = '{sanitized_email}' | |
""") | |
cart_items = result.get("records", []) | |
if not cart_items: | |
return jsonify({ | |
"success": False, | |
"message": "Your cart is empty. Add items before proceeding to the bill page.", | |
"redirect": "/menu" | |
}), 400 | |
return jsonify({"success": True, "message": "Redirect allowed."}) | |
except Exception as e: | |
print(f"Error validating redirect: {e}") | |
return jsonify({ | |
"success": False, | |
"message": "An error occurred while validating the redirect. Please try again." | |
}), 500 | |
def login(): | |
if request.method == "POST": | |
email = request.form.get("email") | |
if not email: | |
return render_template("login.html", error="Email is required.") | |
session['user_email'] = email | |
session['user_name'] = "Test User" | |
session['table_number'] = "1" | |
return redirect(url_for("cart.cart")) | |
return render_template("login.html") | |
# Create the Flask app and register the blueprint | |
app = Flask(__name__) | |
app.secret_key = "your-secret-key-12345" # Replace with a secure key | |
CORS(app) # Enable CORS for Hugging Face Spaces | |
app.register_blueprint(cart_blueprint) | |
if __name__ == "__main__": | |
app.run(host="0.0.0.0", port=7860) |