"""Flask storefront: public catalog, order invoices and an admin panel.

All state (products, categories, orders) lives in a local JSON file that is
mirrored to a Hugging Face *dataset* repository: it is downloaded on start-up,
uploaded after every save, and uploaded again periodically by a daemon thread.
"""
import base64
import json
import logging
import math
import os
import threading
import time
from datetime import datetime
from uuid import uuid4

import requests
from dotenv import load_dotenv
from flask import Flask, render_template_string, request, redirect, url_for, flash, jsonify
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import RepositoryNotFoundError, HfHubHTTPError
from werkzeug.utils import secure_filename

load_dotenv()

logger = logging.getLogger(__name__)

app = Flask(__name__)
# Prefer a key supplied through the environment; the literal fallback keeps
# existing deployments working but should not be relied on in production.
app.secret_key = os.getenv('SECRET_KEY', 'super_secret_key_store_app_123')

DATA_FILE = 'data.json'
SYNC_FILES = [DATA_FILE]
REPO_ID = os.getenv("REPO_ID", "Kgshop/tronberg")
HF_TOKEN_WRITE = os.getenv("HF_TOKEN")
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ")
WHATSAPP_NUMBER = "+77470623684"
CURRENCY_CODE = 'T'
LOGO_URL = "https://huggingface.co/spaces/Metapp/Tech/resolve/main/file_00000000916c71fa968384de18bef8ef.png"

# Limits and locations used by the admin photo upload.
ALLOWED_PHOTO_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.webp', '.gif')
MAX_PHOTOS_PER_PRODUCT = 10
UPLOADS_DIR = 'uploads_temp'

# Inline image stored on a cart line when the ordered product has no photo.
PLACEHOLDER_PHOTO_URL = "data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSIxMDAiIGhlaWdodD0iMTAwIj48cmVjdCB3aWR0aD0iMTAwJSIgaGVpZ2h0PSIxMDAlIiBmaWxsPSIjZjBmMHcwIi8+PC9zdmc+"


def _default_data():
    """Return a fresh, empty database structure."""
    return {'products': [], 'categories': [], 'orders': {}}


def _ensure_schema(data):
    """Add any missing top-level keys to *data* in place and return it."""
    data.setdefault('products', [])
    data.setdefault('categories', [])
    data.setdefault('orders', {})
    return data


def _to_float(value, default=0.0):
    """Convert *value* to a finite float, or return *default* if impossible."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    # NaN / infinity would poison totals and cannot be written as valid JSON.
    return number if math.isfinite(number) else default


def _to_int(value, default=0):
    """Convert *value* to an int, or return *default* if impossible."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def download_db_from_hf(specific_file=None, retries=3, delay=5):
    """Download the synced files from the Hugging Face dataset repo.

    Args:
        specific_file: Download only this file; when falsy, every file in
            ``SYNC_FILES`` is downloaded.
        retries: Number of additional attempts per file after the first one.
        delay: Seconds to sleep between attempts.

    Returns:
        True if every requested file was downloaded, False otherwise. Returns
        False immediately if the repository itself is not found.
    """
    token_to_use = HF_TOKEN_READ if HF_TOKEN_READ else HF_TOKEN_WRITE
    files_to_download = [specific_file] if specific_file else SYNC_FILES
    all_successful = True
    for file_name in files_to_download:
        success = False
        for attempt in range(retries + 1):
            try:
                hf_hub_download(
                    repo_id=REPO_ID,
                    filename=file_name,
                    repo_type="dataset",
                    token=token_to_use,
                    local_dir=".",
                    local_dir_use_symlinks=False,
                    force_download=True,
                    resume_download=False
                )
                success = True
                break
            except RepositoryNotFoundError:
                # Must be caught before HfHubHTTPError; retrying cannot help.
                logger.error("Repository %s not found", REPO_ID)
                return False
            except HfHubHTTPError as e:
                status = getattr(getattr(e, 'response', None), 'status_code', None)
                if status == 404:
                    # The file does not exist remotely yet: seed an empty
                    # local database (first attempt only) and stop retrying.
                    if attempt == 0 and file_name == DATA_FILE and not os.path.exists(file_name):
                        try:
                            with open(file_name, 'w', encoding='utf-8') as f:
                                json.dump(_default_data(), f)
                        except OSError:
                            logger.exception("Could not create empty %s", file_name)
                    success = False
                    break
                logger.warning("HTTP error downloading %s (attempt %d): %s", file_name, attempt + 1, e)
            except requests.exceptions.RequestException as e:
                logger.warning("Network error downloading %s (attempt %d): %s", file_name, attempt + 1, e)
            except Exception:
                logger.exception("Unexpected error downloading %s (attempt %d)", file_name, attempt + 1)
            if attempt < retries:
                time.sleep(delay)
        if not success:
            all_successful = False
    return all_successful


def upload_db_to_hf(specific_file=None):
    """Upload the synced files to the Hugging Face dataset repo (best effort).

    Does nothing when no write token is configured. Failures are logged and
    never raised, so callers can treat this as fire-and-forget.

    Args:
        specific_file: Upload only this file; when falsy, every file in
            ``SYNC_FILES`` is uploaded. Files missing on disk are skipped.
    """
    if not HF_TOKEN_WRITE:
        return
    try:
        api = HfApi()
        files_to_upload = [specific_file] if specific_file else SYNC_FILES
        for file_name in files_to_upload:
            if not os.path.exists(file_name):
                continue
            try:
                api.upload_file(
                    path_or_fileobj=file_name,
                    path_in_repo=file_name,
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN_WRITE,
                    commit_message=f"Sync {file_name} {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                )
            except Exception:
                logger.exception("Failed to upload %s", file_name)
    except Exception:
        logger.exception("Failed to initialise upload to %s", REPO_ID)


def periodic_backup():
    """Upload all synced files every 30 minutes, forever."""
    while True:
        time.sleep(1800)
        upload_db_to_hf()


def _read_data_file():
    """Parse ``DATA_FILE``; return the dict, or None if it is not a JSON object.

    Raises:
        FileNotFoundError, json.JSONDecodeError, OSError: propagated from
            opening / parsing the file.
    """
    with open(DATA_FILE, 'r', encoding='utf-8') as file:
        data = json.load(file)
    if not isinstance(data, dict):
        return None
    return _ensure_schema(data)


def load_data():
    """Load the database from disk, falling back to the remote copy.

    If the local file is missing, is not valid JSON, or does not contain a
    JSON object, the file is re-downloaded and read again. Any other failure
    yields an empty database. Products lacking ``product_id`` or
    ``pieces_per_box`` get them filled in (in memory only), and an empty
    database file is created when none exists.

    Returns:
        dict with at least the keys ``products``, ``categories``, ``orders``.
    """
    data = None
    try:
        data = _read_data_file()
        needs_download = data is None
    except (FileNotFoundError, json.JSONDecodeError):
        needs_download = True
    except Exception:
        logger.exception("Could not read %s", DATA_FILE)
        needs_download = False

    if needs_download and download_db_from_hf(specific_file=DATA_FILE):
        try:
            data = _read_data_file()
        except Exception:
            logger.exception("Downloaded %s is unreadable", DATA_FILE)
            data = None

    if data is None:
        data = _default_data()

    for product in data['products']:
        if not isinstance(product, dict):
            # A malformed entry must not take the whole site down.
            continue
        if 'product_id' not in product:
            product['product_id'] = uuid4().hex
        if 'pieces_per_box' not in product:
            product['pieces_per_box'] = 1

    if not os.path.exists(DATA_FILE):
        try:
            with open(DATA_FILE, 'w', encoding='utf-8') as f:
                json.dump(_default_data(), f)
        except OSError:
            logger.exception("Could not create %s", DATA_FILE)
    return data


def save_data(data):
    """Persist *data* to ``DATA_FILE`` and push it to the remote repo.

    The file is written to a temporary sibling and moved into place with
    ``os.replace`` so that concurrent readers (other requests, the backup
    thread) never observe a half-written file, and a serialisation error
    leaves the previous file intact. Errors are logged, not raised.

    Args:
        data: The database dict; non-dict values are ignored. Missing
            top-level keys are added in place.
    """
    if not isinstance(data, dict):
        return
    try:
        _ensure_schema(data)
        tmp_path = f"{DATA_FILE}.{uuid4().hex}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as file:
                json.dump(data, file, ensure_ascii=False, indent=4)
            os.replace(tmp_path, DATA_FILE)
        finally:
            # Only still present when writing or replacing failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        upload_db_to_hf(specific_file=DATA_FILE)
    except Exception:
        logger.exception("Failed to save %s", DATA_FILE)


# NOTE(review): the three template bodies below are reproduced verbatim. In
# this copy of the file they appear to have lost their HTML markup (for
# example ORDER_TEMPLATE references `loop` and `item` outside its for-block);
# confirm against the original source before relying on them.
CATALOG_TEMPLATE = ''' 
Магазин

Каталог

Сумма заказа: 0 {{ currency_code }}
'''

ORDER_TEMPLATE = ''' Накладная №{{ order.id }}

Накладная

№ {{ order.id }}
{{ order.created_at.split(' ')[0] }}
Покупатель: {{ order.customer_name }}
Телефон: {{ order.customer_phone }}
Город: {{ order.customer_city }}
Статус: Новый
{% set raw_total = 0 %} {% for item in order.cart %} {% set ppb = item.pieces_per_box|default(1)|int %} {% set boxes = item.quantity // ppb %} {% set remainder = item.quantity % ppb %} {% set item_sum = item.price * item.quantity %} {% set raw_total = raw_total + item_sum %} {% endfor %} {% set discount = order.discount|default(0)|float %} {% if discount > 0 %} {% endif %}
Наименование Фото Кол-во Цена Сумма
{{ loop.index }} {{ item.name }} img
{% if ppb > 1 and boxes > 0 %} {{ boxes }} кор.{% if remainder > 0 %} {{ remainder }} шт.{% endif %} {% else %} {{ item.quantity }} шт. {% endif %}
{{ item.price }} {{ item_sum }}
Сумма: {{ raw_total }} {{ currency_code }}
Скидка: -{{ discount }} {{ currency_code }}
К оплате: {{ order.total_price }} {{ currency_code }}
'''

ADMIN_TEMPLATE = ''' Админ-панель

Админ-панель

В каталог
История накладных
{% for order in orders.values()|sort(attribute='created_at', reverse=True) %} {% set raw_total = 0 %} {% for item in order.cart %} {% set raw_total = raw_total + (item.price|float * item.quantity|int) %} {% endfor %} {% endfor %}
ID / Дата Клиент Сумма Скидка К оплате Действия
{{ order.id }}
{{ order.created_at }}
{{ order.customer_name }}
{{ order.customer_phone }}
{{ order.customer_city }}
{{ raw_total }} {{ currency_code }}
{{ order.total_price }} {{ currency_code }}

Управление категориями

{% for category in categories %}
{{ category }}
Добавить товар
Новый товар в категории "{{ category }}"
Можно выбрать до 10 фото
{% for product in products %} {% if product.category == category %}
{% if product.photos and product.photos|length > 0 %} {% else %}
{% endif %}
{{ product.name }} {% if product.description %} {{ product.description[:50] }}{{ '...' if product.description|length > 50 else '' }} {% endif %} {{ product.price }} {{ currency_code }} • В коробке: {{ product.pieces_per_box|default(1) }} шт • Фото: {{ product.photos|length if product.photos else 0 }}/10
Редактирование товара
Оставьте пустым, чтобы не менять фото
{% endif %} {% endfor %}
{% endfor %}
'''


def _order_total(order):
    """Return the amount due for *order*: cart sum minus discount, floored at 0."""
    cart_total = sum(float(i['price']) * int(i['quantity']) for i in order.get('cart', []))
    discount = _to_float(order.get('discount', 0), 0.0)
    return max(0, cart_total - discount)


def _build_cart_entry(item):
    """Validate one client-supplied cart item and return the stored cart line.

    Returns None when the item is malformed (not an object, no name,
    non-numeric or negative price, non-positive quantity).
    """
    if not isinstance(item, dict) or 'name' not in item:
        return None
    price = _to_float(item.get('price'), None)
    quantity = _to_int(item.get('quantity'), None)
    if price is None or quantity is None or price < 0 or quantity <= 0:
        return None
    photos = item.get('photos')
    if isinstance(photos, list) and photos:
        photo_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/photos/{photos[0]}"
    else:
        photo_url = PLACEHOLDER_PHOTO_URL
    return {
        "product_id": item.get('product_id'),
        "name": item['name'],
        "price": price,
        "quantity": quantity,
        # The invoice template divides by this value, so it must be >= 1.
        "pieces_per_box": max(1, _to_int(item.get('pieces_per_box', 1), 1)),
        "photo_url": photo_url,
    }


@app.route('/')
def catalog():
    """Render the public catalog page."""
    data = load_data()
    all_products = data.get('products', [])
    categories = data.get('categories', [])
    return render_template_string(
        CATALOG_TEMPLATE,
        products_json=json.dumps(all_products),
        categories_json=json.dumps(categories),
        repo_id=REPO_ID,
        currency_code=CURRENCY_CODE,
        logo_url=LOGO_URL
    )


@app.route('/create_order', methods=['POST'])
def create_order():
    """Create an order from a JSON body ``{"cart": [...], "customer_*": ...}``.

    Returns:
        201 with ``{"order_id": ...}`` on success, 400 with
        ``{"error": "Bad request"}`` when the body or any cart item is invalid.
    """
    order_data = request.get_json(silent=True)
    if not isinstance(order_data, dict) or 'cart' not in order_data:
        return jsonify({"error": "Bad request"}), 400
    cart_items = order_data['cart']
    if not isinstance(cart_items, list):
        return jsonify({"error": "Bad request"}), 400

    # NOTE(review): prices come from the client and are stored as sent; they
    # are not checked against the catalog.
    processed_cart = []
    for item in cart_items:
        entry = _build_cart_entry(item)
        if entry is None:
            return jsonify({"error": "Bad request"}), 400
        processed_cart.append(entry)
    total_price = sum(entry['price'] * entry['quantity'] for entry in processed_cart)

    data = load_data()
    orders = data['orders']
    now = datetime.now()
    date_part = now.strftime('%Y%m%d')
    # Start from the order count, then probe upwards so an existing order is
    # never overwritten.
    sequence = len(orders) + 1
    order_id = f"SA-{date_part}-{str(sequence).zfill(3)}"
    while order_id in orders:
        sequence += 1
        order_id = f"SA-{date_part}-{str(sequence).zfill(3)}"

    orders[order_id] = {
        "id": order_id,
        "created_at": now.strftime('%Y-%m-%d %H:%M:%S'),
        "cart": processed_cart,
        "discount": 0,
        "total_price": total_price,
        "customer_name": order_data.get('customer_name', 'Не указано'),
        "customer_phone": order_data.get('customer_phone', 'Не указано'),
        "customer_city": order_data.get('customer_city', 'Не указано')
    }
    save_data(data)
    return jsonify({"order_id": order_id}), 201


# The URL variable is required: the view takes `order_id` as an argument.
@app.route('/order/<order_id>')
def view_order(order_id):
    """Render the invoice page for *order_id*, or 404 if it does not exist."""
    data = load_data()
    order = data.get('orders', {}).get(order_id)
    if not order:
        return "Order not found", 404
    return render_template_string(
        ORDER_TEMPLATE,
        order=order,
        whatsapp_number=WHATSAPP_NUMBER,
        currency_code=CURRENCY_CODE,
        logo_url=LOGO_URL
    )


# NOTE(review): no authentication guards this route in the code visible here.
@app.route('/edit_order/<order_id>', methods=['POST'])
def edit_order(order_id):
    """Change the quantity of, or remove, one cart line of an order.

    JSON body fields: ``product_id`` (line to edit), and one of ``remove``
    (truthy: drop the line), ``exact_qty`` (new quantity) or ``change``
    (delta added to the quantity). A resulting quantity <= 0 removes the line.

    Returns:
        JSON ``{"success": True, "total_price": ...}``; 404 if the order is
        unknown; 400 if the body or the quantity is not usable.
    """
    data = load_data()
    order = data.get('orders', {}).get(order_id)
    if not order:
        return jsonify({"success": False, "error": "Order not found"}), 404

    req_data = request.get_json(silent=True)
    if not isinstance(req_data, dict):
        return jsonify({"success": False, "error": "Bad request"}), 400
    product_id = req_data.get('product_id')
    exact_qty = req_data.get('exact_qty')

    cart = order.setdefault('cart', [])
    for index, item in enumerate(cart):
        if item.get('product_id') != product_id:
            continue
        if req_data.get('remove', False):
            del cart[index]
        else:
            if exact_qty is not None:
                quantity = _to_int(exact_qty, None)
            else:
                delta = _to_int(req_data.get('change', 0), None)
                quantity = None if delta is None else _to_int(item.get('quantity'), 0) + delta
            if quantity is None:
                return jsonify({"success": False, "error": "Bad request"}), 400
            if quantity <= 0:
                del cart[index]
            else:
                item['quantity'] = quantity
        # Only the first matching line is edited; stop before iterating a
        # list that may just have been modified.
        break

    order['total_price'] = _order_total(order)
    save_data(data)
    return jsonify({"success": True, "total_price": order['total_price']})


def _upload_photos(uploaded_photos):
    """Upload product photos to ``photos/`` in the repo; return stored names.

    Files without a name or with an extension outside
    ``ALLOWED_PHOTO_EXTENSIONS`` are skipped, as are files whose upload
    fails. Nothing is uploaded when no write token is configured.
    """
    candidates = [photo for photo in uploaded_photos if photo and photo.filename]
    if not candidates or not HF_TOKEN_WRITE:
        return []
    os.makedirs(UPLOADS_DIR, exist_ok=True)
    api = HfApi()
    stored = []
    for photo in candidates:
        ext = os.path.splitext(photo.filename)[1].lower()
        if ext not in ALLOWED_PHOTO_EXTENSIONS:
            continue
        # A random name avoids collisions and never trusts the client name.
        photo_filename = f"{uuid4().hex}{ext}"
        temp_path = os.path.join(UPLOADS_DIR, photo_filename)
        try:
            photo.save(temp_path)
            api.upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=f"photos/{photo_filename}",
                repo_id=REPO_ID,
                repo_type="dataset",
                token=HF_TOKEN_WRITE
            )
            stored.append(photo_filename)
        except Exception:
            logger.exception("Failed to upload photo %s", photo_filename)
        finally:
            if os.path.exists(temp_path):
                os.remove(temp_path)
    return stored


def _product_form_fields():
    """Read the product fields shared by the add and edit forms."""
    return {
        'name': request.form.get('name', '').strip(),
        'price': max(0.0, _to_float(request.form.get('price'), 0.0)),
        # The invoice template divides by this value, so it must be >= 1.
        'pieces_per_box': max(1, _to_int(request.form.get('pieces_per_box'), 1)),
        'description': request.form.get('description', '').strip(),
    }


def _admin_apply_discount(data):
    """Set an order's discount and recompute its total. Returns True if changed."""
    order_id = request.form.get('order_id')
    orders = data['orders']
    if order_id not in orders:
        return False
    order = orders[order_id]
    # A negative discount would silently raise the amount due.
    order['discount'] = max(0.0, _to_float(request.form.get('discount_amount'), 0.0))
    order['total_price'] = _order_total(order)
    return True


def _admin_add_category(data):
    """Append a new, non-empty, not yet existing category. Returns True if added."""
    cat_name = request.form.get('category_name', '').strip()
    if not cat_name or cat_name in data['categories']:
        return False
    data['categories'].append(cat_name)
    return True


def _admin_delete_category(data):
    """Remove a category together with all of its products."""
    cat_name = request.form.get('category_name')
    if cat_name not in data['categories']:
        return False
    data['categories'].remove(cat_name)
    data['products'] = [p for p in data['products'] if p.get('category') != cat_name]
    return True


def _admin_add_product(data):
    """Create a product in an existing category. Returns True if added."""
    fields = _product_form_fields()
    category = request.form.get('category')
    # A nameless product, or one in an unknown category, would never be
    # listed on the admin page and so could not be edited or deleted.
    if not fields['name'] or category not in data['categories']:
        return False
    photos = _upload_photos(request.files.getlist('photos')[:MAX_PHOTOS_PER_PRODUCT])
    data['products'].append({
        'product_id': uuid4().hex,
        'name': fields['name'],
        'price': fields['price'],
        'pieces_per_box': fields['pieces_per_box'],
        'description': fields['description'],
        'category': category,
        'photos': photos
    })
    return True


def _admin_edit_product(data):
    """Update an existing product; photos are replaced only if new ones upload."""
    pid = request.form.get('product_id')
    product = next((p for p in data['products'] if p.get('product_id') == pid), None)
    if product is None:
        # Checked before uploading so unknown ids do not leave orphan photos.
        return False
    fields = _product_form_fields()
    photos = _upload_photos(request.files.getlist('photos')[:MAX_PHOTOS_PER_PRODUCT])
    if fields['name']:
        # A blank name keeps the existing one rather than erasing it.
        product['name'] = fields['name']
    product['price'] = fields['price']
    product['pieces_per_box'] = fields['pieces_per_box']
    product['description'] = fields['description']
    if photos:
        product['photos'] = photos
    return True


def _admin_delete_product(data):
    """Delete the product with the submitted id. Returns True if one was removed."""
    pid = request.form.get('product_id')
    remaining = [p for p in data['products'] if p.get('product_id') != pid]
    if len(remaining) == len(data['products']):
        return False
    data['products'] = remaining
    return True


# Maps the form's `action` field to its handler; each handler mutates the
# loaded database in place and reports whether it needs to be saved.
_ADMIN_ACTIONS = {
    'apply_discount': _admin_apply_discount,
    'add_category': _admin_add_category,
    'delete_category': _admin_delete_category,
    'add_product': _admin_add_product,
    'edit_product': _admin_edit_product,
    'delete_product': _admin_delete_product,
}


# NOTE(review): no authentication guards this route in the code visible here.
@app.route('/admin', methods=['GET', 'POST'])
def admin():
    """Admin panel: GET renders it, POST runs the form's ``action`` and redirects."""
    data = load_data()
    if request.method == 'POST':
        handler = _ADMIN_ACTIONS.get(request.form.get('action'))
        if handler is not None and handler(data):
            save_data(data)
        return redirect(url_for('admin'))
    return render_template_string(
        ADMIN_TEMPLATE,
        products=data.get('products', []),
        categories=data.get('categories', []),
        orders=data.get('orders', {}),
        repo_id=REPO_ID,
        currency_code=CURRENCY_CODE
    )


# NOTE(review): no authentication guards this route in the code visible here.
@app.route('/force_upload', methods=['POST'])
def force_upload():
    """Upload all synced files now, then return to the admin panel."""
    upload_db_to_hf()
    return redirect(url_for('admin'))


# NOTE(review): no authentication guards this route in the code visible here;
# it overwrites the local database with the remote copy.
@app.route('/force_download', methods=['POST'])
def force_download():
    """Re-download all synced files now, then return to the admin panel."""
    download_db_from_hf()
    return redirect(url_for('admin'))


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    download_db_from_hf()
    load_data()
    if HF_TOKEN_WRITE:
        threading.Thread(target=periodic_backup, daemon=True).start()
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port)