Spaces:
Paused
Paused
import os | |
import requests | |
import gzip | |
import shutil | |
import logging | |
from flask import Flask, send_file, jsonify | |
import xml.etree.ElementTree as ET | |
from datetime import datetime | |
import pytz # New import for timezone handling | |
# Root logging is configured at INFO; `logger` is this module's own logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The Flask application object used by app.run() at the bottom of the file.
app = Flask(__name__)

# Directory that holds the decompressed EPG XML. It is created at import
# time when the path does not exist yet, so later reads and writes can
# assume it is there.
extract_to_dir = 'extracted'
if not os.path.exists(extract_to_dir):
    os.makedirs(extract_to_dir, exist_ok=True)
def download_file(url, output_path, timeout=30):
    """Download ``url`` and write the response body to ``output_path``.

    The body is written only when the server answers with HTTP 200. Every
    failure is logged and swallowed, so this function never raises.

    Args:
        url: Address to fetch with an HTTP GET.
        output_path: Destination file; overwritten if it already exists.
        timeout: Seconds to wait on the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would hang both
            startup and the update handler on a stalled server.

    Returns:
        True if the file was written, False otherwise.
    """
    logger.info(f"Downloading from {url}...")
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            logger.error(f"Failed to download file. Status code: {response.status_code}")
            return False
        with open(output_path, 'wb') as f:
            f.write(response.content)
        logger.info(f"Downloaded file to {output_path} with size {len(response.content)} bytes")
        return True
    except PermissionError as e:
        # Reported separately from other errors: an unwritable destination
        # is a deployment problem, not a network one.
        logger.error(f"Permission error occurred while downloading the file: {e}")
    except Exception as e:
        # Top-level boundary: a failed download must not take the app down.
        logger.error(f"An error occurred while downloading the file: {e}")
    return False
def extract_gz_file(gz_path, extract_to_dir):
    """Decompress ``gz_path`` into ``extract_to_dir`` as ``it_wltv_full.xml``.

    The archive is first decompressed into a sibling ``.part`` file and only
    moved over the final name once decompression has finished. A corrupt or
    truncated archive therefore never replaces a previously extracted file
    with partial data. Every failure is logged and swallowed.

    Args:
        gz_path: Path of the gzip archive to read.
        extract_to_dir: Directory that receives the XML file; created if it
            does not exist.

    Returns:
        True if the XML file was (re)written, False otherwise.
    """
    logger.info(f"Extracting {gz_path} to {extract_to_dir}...")
    if not os.path.exists(gz_path):
        logger.error(f"GZ file does not exist: {gz_path}")
        return False

    partial_path = None
    try:
        extracted_file_path = os.path.join(extract_to_dir, 'it_wltv_full.xml')
        partial_path = extracted_file_path + '.part'
        os.makedirs(extract_to_dir, exist_ok=True)
        with gzip.open(gz_path, 'rb') as f_in, open(partial_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        # Both paths are in the same directory, so this is a plain rename
        # that swaps the complete file into place in one step.
        os.replace(partial_path, extracted_file_path)
        logger.info(f"Extracted GZ file to {extracted_file_path}")
        return True
    except Exception as e:
        logger.error(f"An error occurred while extracting the file: {e}")
        if partial_path is not None:
            try:
                os.remove(partial_path)
            except OSError:
                # Best-effort cleanup: the partial file may never have been
                # created, and the real error is already logged above.
                pass
        return False
def _parse_epg_time(raw, default_tz):
    """Turn an EPG timestamp attribute into an aware datetime.

    The first 14 characters are read as ``YYYYMMDDHHMMSS``. If a numeric UTC
    offset such as ``+0100`` follows, it is honoured; otherwise the wall-clock
    time is interpreted in ``default_tz``.

    Returns None when ``raw`` is missing or not in the expected format.
    """
    if not raw:
        return None
    raw = raw.strip()
    try:
        naive = datetime.strptime(raw[:14], "%Y%m%d%H%M%S")
    except ValueError:
        return None
    suffix = raw[14:].strip()
    if suffix:
        try:
            # Parsing only "%z" yields a throwaway datetime whose tzinfo is
            # the fixed offset we need.
            return naive.replace(tzinfo=datetime.strptime(suffix, "%z").tzinfo)
        except ValueError:
            # Non-numeric suffix (e.g. a zone abbreviation): fall back to
            # the default zone below.
            pass
    return default_tz.localize(naive)


def _child_text(element, tag, fallback):
    """Return the text of ``element``'s first ``tag`` child, or ``fallback`` if absent."""
    child = element.find(tag)
    return child.text if child is not None else fallback


def get_current_program(channel_id):
    """Look up the programme airing right now on ``channel_id``.

    Reads ``it_wltv_full.xml`` from the extraction directory and scans its
    ``programme`` elements for the given channel, comparing each start/stop
    window against the current time in Europe/Rome.

    Programmes whose ``start`` or ``stop`` attribute is missing or malformed
    are skipped, so one bad entry cannot hide the rest of the schedule. A
    programme without a ``title`` element is reported as "No title".

    Args:
        channel_id: Value matched against each programme's ``channel``
            attribute.

    Returns:
        ``{"title": ..., "description": ...}`` for the first matching
        programme, or None when nothing matches, the XML file is missing, or
        it cannot be read.
    """
    xml_file = os.path.join(extract_to_dir, 'it_wltv_full.xml')
    if not os.path.exists(xml_file):
        logger.error("XML file does not exist for program data")
        return None

    try:
        root = ET.parse(xml_file).getroot()

        # "Now" is taken in Italy's timezone; timestamps without an explicit
        # offset are interpreted in the same zone.
        italy_tz = pytz.timezone('Europe/Rome')
        current_time = datetime.now(italy_tz)
        logger.info(f"Current Italy time: {current_time}")

        for programme in root.findall('programme'):
            if programme.get('channel') != channel_id:
                continue

            start_time = _parse_epg_time(programme.get('start'), italy_tz)
            stop_time = _parse_epg_time(programme.get('stop'), italy_tz)
            if start_time is None or stop_time is None:
                logger.warning(
                    f"Skipping programme with unusable times on {channel_id}: "
                    f"start={programme.get('start')!r} stop={programme.get('stop')!r}"
                )
                continue

            # Resolve the title before logging so an untitled programme
            # cannot raise here.
            title = _child_text(programme, 'title', "No title")
            logger.info(f"Checking program: {title} ({start_time} - {stop_time})")

            # Half-open window: a programme ending exactly now is over.
            if start_time <= current_time < stop_time:
                desc = _child_text(programme, 'desc', "No description")
                return {"title": title, "description": desc}

        logger.info(f"No current program found for {channel_id}.")
        return None
    except Exception as e:
        # Top-level boundary: any parse or I/O problem becomes "no data".
        logger.error(f"Failed to parse XML file: {e}")
        return None
# NOTE(review): no @app.route decorator is visible on this function in this
# view of the file; confirm it is registered with `app` somewhere.
def serve_extracted_file():
    """Return the extracted ``it_wltv_full.xml`` as a plain-text response.

    Before serving, the contents of the extraction directory are logged
    (name and extension of every entry) as a diagnostic aid.

    Returns:
        The file via ``send_file`` (inline, ``text/plain``) when it exists;
        ``("File not found", 404)`` when it does not;
        ``("Error listing files", 500)`` when the directory cannot be listed.
    """
    # Diagnostic listing. A failure here ends the request with a 500.
    try:
        extracted_files = os.listdir(extract_to_dir)
        logger.info(f"Files in 'extracted' directory: {extracted_files}")
        for file_name in extracted_files:
            logger.info(f"File: {file_name}, Extension: {os.path.splitext(file_name)[1]}")
    except Exception as e:
        logger.error(f"Error listing files in 'extracted' directory: {e}")
        return "Error listing files", 500

    file_path = os.path.join(extract_to_dir, 'it_wltv_full.xml')
    if not os.path.exists(file_path):
        logger.error("File not found")
        return "File not found", 404

    logger.info(f"Serving file from {file_path}")
    return send_file(file_path, as_attachment=False, mimetype='text/plain')
# NOTE(review): no @app.route decorator is visible on this function in this
# view of the file; confirm it is registered with `app` somewhere.
def get_logs():
    """Return the full contents of ``app.log`` from the working directory.

    NOTE(review): the only logging setup visible in this file is
    ``logging.basicConfig(level=logging.INFO)``, which names no log file.
    Confirm that something actually writes ``app.log``, otherwise this
    always takes the error path.

    Returns:
        ``(text, 200)`` with the raw log text on success, or a JSON error
        body with status 500 when the file cannot be read.
    """
    try:
        with open('app.log', 'r') as log_file:
            logs = log_file.read()
    except Exception as e:
        logger.error(f"Failed to read logs: {e}")
        return jsonify({"error": "Failed to read logs"}), 500
    return logs, 200
# NOTE(review): no @app.route decorator is visible on this function in this
# view of the file; confirm it is registered with `app` somewhere.
def update_files():
    """Fetch the EPG archive again and unpack it over the current data.

    The archive is downloaded to ``/tmp`` and then decompressed into the
    module's extraction directory.

    Returns:
        ``("Files updated", 200)`` in every case; the outcome of the
        download and extraction steps is not inspected here.
    """
    source_url = 'http://epg-guide.com/wltv.gz'
    archive_path = '/tmp/it_wltv_full.gz'

    # Order matters: the extraction reads what the download just wrote.
    download_file(source_url, archive_path)
    extract_gz_file(archive_path, extract_to_dir)

    return "Files updated", 200
# NOTE(review): no @app.route decorator is visible on this function in this
# view of the file; confirm it is registered with `app` somewhere.
def current_program(channel_id):
    """Respond with the programme currently airing on ``channel_id``.

    Args:
        channel_id: Channel identifier passed through to
            ``get_current_program``.

    Returns:
        A JSON response with the programme's title and description, or a
        plain-text message with status 404 when the lookup yields nothing.
    """
    program_info = get_current_program(channel_id)
    if not program_info:
        return "No current program found for this channel.", 404
    return jsonify(program_info)
if __name__ == '__main__':
    # Populate the EPG data once before accepting requests. update_files()
    # performs the same download-then-extract sequence with the same URL
    # and archive path; its (message, status) return value is not needed here.
    update_files()

    # Start the Flask web server on all interfaces, port 8080.
    app.run(host='0.0.0.0', port=8080)