Spaces:
Sleeping
Sleeping
from flask import Flask, jsonify, Response, request | |
import requests | |
import json | |
import time | |
import threading | |
import queue | |
import signal | |
import sys | |
from datetime import datetime | |
app = Flask(__name__) | |
# Global variables | |
previous_stock_data = None | |
clients = set() | |
client_queues = {} | |
def fetch_stock_data(): | |
"""Function to fetch data from the source""" | |
try: | |
headers = { | |
'Accept': '*/*', | |
'Accept-Encoding': 'gzip, deflate', | |
'Accept-Language': 'en-GB,en-US;q=0.9,en;q=0.8', | |
'Connection': 'keep-alive', | |
'Host': '88.99.61.159:4000', | |
'If-None-Match': 'W/"3b80-5FY+gUMGy2CzPm8HBz2ejQ"', | |
'Origin': 'http://88.99.61.159:5000', | |
'Referer': 'http://88.99.61.159:5000/', | |
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36' | |
} | |
response = requests.get( | |
'http://88.99.61.159:4000/getdata', | |
headers=headers, | |
timeout=5 | |
) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
return None | |
except Exception as error: | |
print(f'Error fetching stock data: {error}') | |
return None | |
def ping_render_backend(): | |
"""Internal ping function to keep Render backend alive""" | |
try: | |
response = requests.get( | |
'https://backend-52f8.onrender.com', | |
timeout=5 | |
) | |
if response.status_code == 200: | |
print(f'Render backend ping successful: {datetime.now().isoformat()}') | |
return True | |
else: | |
return False | |
except Exception as error: | |
print(f'Render backend ping error: {error}') | |
return False | |
def get_rate_changed_stocks(current_data, previous_data): | |
"""Function to compare stock data and find ONLY rate changes""" | |
if not previous_data or not current_data: | |
# Store initial data but don't return it (no broadcast) | |
return None | |
changed_stocks = {} | |
# Handle the actual API response structure - data is in array format | |
current_array = current_data.get('data', current_data) if isinstance(current_data, dict) else current_data | |
previous_array = previous_data.get('data', previous_data) if isinstance(previous_data, dict) else previous_data | |
# Convert arrays to objects keyed by Symbol for comparison | |
current_stocks = {} | |
previous_stocks = {} | |
if isinstance(current_array, list): | |
for stock in current_array: | |
if stock.get('Symbol'): | |
current_stocks[stock['Symbol']] = stock | |
if isinstance(previous_array, list): | |
for stock in previous_array: | |
if stock.get('Symbol'): | |
previous_stocks[stock['Symbol']] = stock | |
# Only look for rate changes in existing stocks | |
for symbol, current_stock in current_stocks.items(): | |
previous_stock = previous_stocks.get(symbol) | |
# Use "Last Traded Price" field as the rate | |
try: | |
current_rate = float(current_stock.get("Last Traded Price", 0)) | |
previous_rate = float(previous_stock.get("Last Traded Price", 0)) if previous_stock else None | |
except (ValueError, TypeError): | |
continue | |
# Only include if rate has changed (ignore new stocks) | |
if previous_stock and current_rate != previous_rate: | |
# Include the full stock data from API response with change info | |
changed_stocks[symbol] = { | |
**current_stock, # Full original API data | |
'previousRate': previous_rate, | |
'changeType': 'increase' if current_rate > previous_rate else 'decrease', | |
'changeAmount': current_rate - previous_rate | |
} | |
return changed_stocks if changed_stocks else None | |
def broadcast_to_clients(data, event_type='rateUpdate'): | |
"""Function to send data to all SSE clients""" | |
if not client_queues: | |
return | |
message = { | |
'event': event_type, | |
'data': data | |
} | |
# Add message to all client queues | |
for client_id, client_queue in list(client_queues.items()): | |
try: | |
client_queue.put(message, block=False) | |
except queue.Full: | |
# Remove client if queue is full | |
client_queues.pop(client_id, None) | |
print(f'Removed client {client_id} due to full queue') | |
def data_polling_worker(): | |
"""Background worker for data polling""" | |
global previous_stock_data | |
while True: | |
try: | |
current_data = fetch_stock_data() | |
if current_data: | |
rate_changed_stocks = get_rate_changed_stocks(current_data, previous_stock_data) | |
if rate_changed_stocks: | |
print(f'Broadcasting rate changes for {len(rate_changed_stocks)} stocks') | |
broadcast_to_clients(rate_changed_stocks, 'rateUpdate') | |
# Always update previous data (even if no changes to broadcast) | |
previous_stock_data = current_data | |
except Exception as error: | |
print(f'Data fetch error: {error}') | |
broadcast_to_clients({'error': 'Failed to fetch stock data'}, 'error') | |
time.sleep(1) # Check every second | |
def ping_worker(): | |
"""Background worker for pinging Render backend""" | |
while True: | |
try: | |
ping_render_backend() | |
except Exception as error: | |
print(f'Ping error: {error}') | |
time.sleep(30) # Ping every 30 seconds | |
def events(): | |
"""SSE endpoint - NO INITIAL DATA""" | |
def event_stream(): | |
client_id = id(request) | |
client_queue = queue.Queue(maxsize=100) | |
client_queues[client_id] = client_queue | |
print(f'Client {client_id} connected. Total clients: {len(client_queues)}') | |
# Send connection confirmation | |
yield f"event: connected\ndata: {json.dumps({'message': 'Connected - waiting for rate updates'})}\n\n" | |
try: | |
while True: | |
try: | |
# Get message from queue with timeout | |
message = client_queue.get(timeout=30) | |
yield f"event: {message['event']}\ndata: {json.dumps(message['data'])}\n\n" | |
except queue.Empty: | |
# Send keepalive ping | |
yield f"event: ping\ndata: {json.dumps({'timestamp': int(time.time() * 1000)})}\n\n" | |
except GeneratorExit: | |
# Client disconnected | |
client_queues.pop(client_id, None) | |
print(f'Client {client_id} disconnected. Total clients: {len(client_queues)}') | |
return Response( | |
event_stream(), | |
mimetype='text/event-stream', | |
headers={ | |
'Cache-Control': 'no-cache', | |
'Connection': 'keep-alive', | |
'Access-Control-Allow-Origin': '*', | |
'Access-Control-Allow-Headers': 'Cache-Control' | |
} | |
) | |
def rate(): | |
"""Endpoint that returns exact API response""" | |
try: | |
data = fetch_stock_data() | |
if not data: | |
raise Exception('Failed to fetch data') | |
# Return exact response from scraped API | |
return jsonify(data) | |
except Exception as error: | |
print(f'Error fetching rates: {error}') | |
return jsonify({ | |
'error': 'Failed to fetch rates', | |
'details': str(error) | |
}), 500 | |
def health(): | |
"""Health check endpoint""" | |
return jsonify({ | |
'status': 'healthy', | |
'clients': len(client_queues), | |
'hasInitialData': previous_stock_data is not None, | |
'timestamp': datetime.now().isoformat() | |
}) | |
def signal_handler(sig, frame): | |
"""Handle shutdown signals""" | |
print('Shutting down gracefully...') | |
# Close all client connections | |
for client_queue in client_queues.values(): | |
try: | |
client_queue.put({'event': 'shutdown', 'data': {'message': 'Server shutting down'}}) | |
except: | |
pass | |
client_queues.clear() | |
sys.exit(0) | |
if __name__ == '__main__': | |
# Set up signal handlers | |
signal.signal(signal.SIGTERM, signal_handler) | |
signal.signal(signal.SIGINT, signal_handler) | |
# Start background workers | |
data_thread = threading.Thread(target=data_polling_worker, daemon=True) | |
ping_thread = threading.Thread(target=ping_worker, daemon=True) | |
data_thread.start() | |
ping_thread.start() | |
print('Starting Flask app on 0.0.0.0:7860') | |
# Run the Flask app | |
app.run(host='0.0.0.0', port=7860, debug=False, threaded=True) |