from flask import Flask, render_template, jsonify, request, send_file import torch import os import time import threading from datetime import datetime, timedelta import cv2 from werkzeug.utils import secure_filename import uuid import mimetypes import numpy as np from PIL import Image import schedule # Configuration UPLOAD_FOLDER = '/data/uploads' OUTPUT_FOLDER = '/data/outputs' CLEANUP_INTERVAL_MINUTES = 10 FILE_MAX_AGE_HOURS = 1 # Global application state app_state = { "cuda_available": torch.cuda.is_available(), "processing_active": False, "logs": [], "processed_files": [], "cleanup_stats": { "last_cleanup": None, "files_deleted": 0, "space_freed_mb": 0 } } def ensure_directories(): """Create necessary directories""" directories = [UPLOAD_FOLDER, OUTPUT_FOLDER] for directory in directories: try: os.makedirs(directory, exist_ok=True) print(f"โœ… Directory verified: {directory}") except Exception as e: print(f"โš ๏ธ Error creating directory {directory}: {e}") def allowed_file(filename): """Check if file has allowed extension""" return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ['png', 'jpg', 'jpeg', 'gif', 'mp4', 'avi', 'mov', 'mkv'] def get_file_mimetype(filename): """Get correct mimetype for file""" mimetype, _ = mimetypes.guess_type(filename) if mimetype is None: ext = filename.lower().rsplit('.', 1)[1] if '.' in filename else '' if ext in ['mp4', 'avi', 'mov', 'mkv']: mimetype = f'video/{ext}' elif ext in ['png', 'jpg', 'jpeg', 'gif']: mimetype = f'image/{ext}' else: mimetype = 'application/octet-stream' return mimetype def log_message(message): """Add message to log with timestamp""" timestamp = datetime.now().strftime("%H:%M:%S") app_state["logs"].append(f"[{timestamp}] {message}") if len(app_state["logs"]) > 100: app_state["logs"] = app_state["logs"][-100:] print(f"[{timestamp}] {message}") def cleanup_old_files(): """Delete files older than FILE_MAX_AGE_HOURS""" try: current_time = datetime.now() cutoff_time = current_time - timedelta(hours=FILE_MAX_AGE_HOURS) files_deleted = 0 space_freed = 0 # Clean upload folder for folder_path in [UPLOAD_FOLDER, OUTPUT_FOLDER]: if not os.path.exists(folder_path): continue for filename in os.listdir(folder_path): file_path = os.path.join(folder_path, filename) if os.path.isfile(file_path): try: # Get file modification time file_time = datetime.fromtimestamp(os.path.getmtime(file_path)) if file_time < cutoff_time: # Get file size before deletion file_size = os.path.getsize(file_path) # Delete the file os.remove(file_path) files_deleted += 1 space_freed += file_size log_message(f"๐Ÿ—‘๏ธ Deleted old file: {filename} ({file_size / (1024*1024):.1f}MB)") except Exception as e: log_message(f"โš ๏ธ Error deleting {filename}: {str(e)}") # Update cleanup stats app_state["cleanup_stats"]["last_cleanup"] = current_time.strftime("%Y-%m-%d %H:%M:%S") app_state["cleanup_stats"]["files_deleted"] += files_deleted app_state["cleanup_stats"]["space_freed_mb"] += space_freed / (1024*1024) if files_deleted > 0: log_message(f"๐Ÿงน Cleanup completed: {files_deleted} files deleted, {space_freed / (1024*1024):.1f}MB freed") else: log_message(f"๐Ÿงน Cleanup completed: No old files to delete") # Clean up processed files list to remove references to deleted files valid_processed_files = [] for file_info in app_state["processed_files"]: output_path = os.path.join(OUTPUT_FOLDER, file_info["output_file"]) if os.path.exists(output_path): valid_processed_files.append(file_info) app_state["processed_files"] = valid_processed_files except Exception as e: log_message(f"โŒ Error during cleanup: {str(e)}") def run_scheduler(): """Run the file cleanup scheduler in background""" def scheduler_worker(): while True: try: schedule.run_pending() time.sleep(60) # Check every minute except Exception as e: log_message(f"โŒ Scheduler error: {str(e)}") time.sleep(300) # Wait 5 minutes before retrying thread = threading.Thread(target=scheduler_worker, daemon=True) thread.start() log_message(f"๐Ÿ•’ File cleanup scheduler started (every {CLEANUP_INTERVAL_MINUTES} minutes)") def optimize_gpu(): """Optimize GPU configuration for 4K upscaling""" try: if torch.cuda.is_available(): torch.backends.cudnn.benchmark = True torch.backends.cudnn.allow_tf32 = True torch.backends.cuda.matmul.allow_tf32 = True torch.cuda.empty_cache() # Test GPU test_tensor = torch.randn(100, 100, device='cuda') _ = torch.mm(test_tensor, test_tensor) log_message("โœ… GPU optimized for 4K upscaling") return True else: log_message("โš ๏ธ CUDA not available") return False except Exception as e: log_message(f"โŒ Error optimizing GPU: {str(e)}") return False def upscale_image_4k(input_path, output_path): """Upscale image to 4K using neural methods""" def process_worker(): try: log_message(f"๐ŸŽจ Starting 4K upscaling: {os.path.basename(input_path)}") app_state["processing_active"] = True # Read original image image = cv2.imread(input_path) if image is None: log_message("โŒ Error: Could not read image") return h, w = image.shape[:2] log_message(f"๐Ÿ“ Original resolution: {w}x{h}") # Define target dimensions first target_h, target_w = h * 4, w * 4 # Check GPU memory availability if torch.cuda.is_available(): device = torch.device('cuda') available_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated() required_memory = w * h * 4 * 4 * 3 * 4 # Conservative estimation if required_memory > available_memory * 0.8: log_message(f"โš ๏ธ Image too large for available GPU memory, using CPU") device = torch.device('cpu') else: log_message(f"๐Ÿš€ Using GPU: {torch.cuda.get_device_name()}") if device.type == 'cuda': # Convert image to normalized tensor image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image_tensor = torch.from_numpy(image_rgb).float().to(device) / 255.0 image_tensor = image_tensor.permute(2, 0, 1).unsqueeze(0) # BCHW format log_message("๐Ÿง  Applying neural upscaling...") with torch.no_grad(): # Step 1: 2x upscaling with bicubic intermediate = torch.nn.functional.interpolate( image_tensor, size=(h * 2, w * 2), mode='bicubic', align_corners=False, antialias=True ) # Step 2: Final 2x upscaling with smoothing upscaled = torch.nn.functional.interpolate( intermediate, size=(target_h, target_w), mode='bicubic', align_corners=False, antialias=True ) # Enhanced sharpening filters kernel_size = 3 sigma = 0.5 kernel = torch.zeros((kernel_size, kernel_size), device=device) center = kernel_size // 2 # Create inverted Gaussian kernel for sharpening for i in range(kernel_size): for j in range(kernel_size): dist = ((i - center) ** 2 + (j - center) ** 2) ** 0.5 kernel[i, j] = torch.exp(-0.5 * (dist / sigma) ** 2) kernel = kernel / kernel.sum() sharpen_kernel = torch.zeros_like(kernel) sharpen_kernel[center, center] = 2.0 sharpen_kernel = sharpen_kernel - kernel sharpen_kernel = sharpen_kernel.unsqueeze(0).unsqueeze(0) # Apply sharpening to each channel enhanced_channels = [] for i in range(3): channel = upscaled[:, i:i+1, :, :] padded = torch.nn.functional.pad(channel, (1, 1, 1, 1), mode='reflect') enhanced = torch.nn.functional.conv2d(padded, sharpen_kernel) enhanced_channels.append(enhanced) enhanced = torch.cat(enhanced_channels, dim=1) # Light smoothing to reduce noise gaussian_kernel = torch.tensor([ [1, 4, 6, 4, 1], [4, 16, 24, 16, 4], [6, 24, 36, 24, 6], [4, 16, 24, 16, 4], [1, 4, 6, 4, 1] ], dtype=torch.float32, device=device).unsqueeze(0).unsqueeze(0) / 256.0 smoothed_channels = [] for i in range(3): channel = enhanced[:, i:i+1, :, :] padded = torch.nn.functional.pad(channel, (2, 2, 2, 2), mode='reflect') smoothed = torch.nn.functional.conv2d(padded, gaussian_kernel) smoothed_channels.append(smoothed) smoothed = torch.cat(smoothed_channels, dim=1) # Blend: 70% enhanced + 30% smoothed for quality/smoothness balance final_result = 0.7 * enhanced + 0.3 * smoothed # Clamp values and optimize contrast final_result = torch.clamp(final_result, 0, 1) # Adaptive contrast optimization for i in range(3): channel = final_result[:, i, :, :] min_val = channel.min() max_val = channel.max() if max_val > min_val: final_result[:, i, :, :] = (channel - min_val) / (max_val - min_val) # Convert back to image result_cpu = final_result.squeeze(0).permute(1, 2, 0).cpu().numpy() result_image = (result_cpu * 255).astype(np.uint8) result_bgr = cv2.cvtColor(result_image, cv2.COLOR_RGB2BGR) # Save result cv2.imwrite(output_path, result_bgr) final_h, final_w = result_bgr.shape[:2] log_message(f"โœ… Upscaling completed: {final_w}x{final_h}") log_message(f"๐Ÿ“ˆ Scale factor: {final_w/w:.1f}x") # Memory cleanup del image_tensor, upscaled, enhanced, final_result torch.cuda.empty_cache() else: # CPU fallback log_message("โš ๏ธ Using CPU - optimized processing") # Progressive upscaling on CPU intermediate = cv2.resize(image, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC) upscaled = cv2.resize(intermediate, (target_w, target_h), interpolation=cv2.INTER_CUBIC) # Apply sharpening on CPU kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]]) sharpened = cv2.filter2D(upscaled, -1, kernel) # Blend for smoothing final_result = cv2.addWeighted(upscaled, 0.7, sharpened, 0.3, 0) cv2.imwrite(output_path, final_result) log_message(f"โœ… CPU upscaling completed: {target_w}x{target_h}") else: # CPU only fallback (no CUDA available) log_message("๐Ÿ’ป Using CPU processing (CUDA not available)") # Progressive upscaling on CPU intermediate = cv2.resize(image, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC) upscaled = cv2.resize(intermediate, (target_w, target_h), interpolation=cv2.INTER_CUBIC) # Apply sharpening on CPU kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]]) sharpened = cv2.filter2D(upscaled, -1, kernel) # Blend for smoothing final_result = cv2.addWeighted(upscaled, 0.7, sharpened, 0.3, 0) cv2.imwrite(output_path, final_result) log_message(f"โœ… CPU upscaling completed: {target_w}x{target_h}") # Add to processed files list app_state["processed_files"].append({ "input_file": os.path.basename(input_path), "output_file": os.path.basename(output_path), "original_size": f"{w}x{h}", "upscaled_size": f"{target_w}x{target_h}", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) except Exception as e: log_message(f"โŒ Error in processing: {str(e)}") finally: app_state["processing_active"] = False if torch.cuda.is_available(): torch.cuda.empty_cache() thread = threading.Thread(target=process_worker) thread.daemon = True thread.start() def upscale_video_4k(input_path, output_path): """Upscale video to 4K frame by frame""" def process_worker(): try: log_message(f"๐ŸŽฌ Starting 4K video upscaling: {os.path.basename(input_path)}") app_state["processing_active"] = True # Open video cap = cv2.VideoCapture(input_path) if not cap.isOpened(): log_message("โŒ Error: Could not open video") return # Get video properties fps = int(cap.get(cv2.CAP_PROP_FPS)) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) log_message(f"๐Ÿ“น Video: {w}x{h}, {fps}FPS, {frame_count} frames") # Configure 4K output target_w, target_h = w * 4, h * 4 fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, fps, (target_w, target_h)) if torch.cuda.is_available(): device = torch.device('cuda') log_message(f"๐Ÿš€ Processing with GPU: {torch.cuda.get_device_name()}") process_frames_gpu(cap, out, device, target_h, target_w, frame_count) else: log_message("๐Ÿ’ป Processing with CPU (may be slower)") process_frames_cpu(cap, out, target_h, target_w, frame_count) cap.release() out.release() # Verify the output file was created and has content if os.path.exists(output_path): file_size = os.path.getsize(output_path) if file_size > 0: log_message(f"โœ… 4K video completed: {target_w}x{target_h}") log_message(f"๐Ÿ“ Output file size: {file_size / (1024**2):.1f}MB") else: log_message(f"โŒ Output file is empty: {output_path}") raise Exception("Output video file is empty") else: log_message(f"โŒ Output file not created: {output_path}") raise Exception("Output video file was not created") # Add to processed files list app_state["processed_files"].append({ "input_file": os.path.basename(input_path), "output_file": os.path.basename(output_path), "original_size": f"{w}x{h}", "upscaled_size": f"{target_w}x{target_h}", "frame_count": frame_count, "fps": fps, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) except Exception as e: log_message(f"โŒ Error processing video: {str(e)}") finally: app_state["processing_active"] = False if torch.cuda.is_available(): torch.cuda.empty_cache() thread = threading.Thread(target=process_worker) thread.daemon = True thread.start() def process_frames_cpu(cap, out, target_h, target_w, frame_count): """Process video frames using CPU""" frame_num = 0 while True: ret, frame = cap.read() if not ret: break frame_num += 1 # Simple CPU upscaling upscaled_frame = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_CUBIC) out.write(upscaled_frame) # Progress logging if frame_num % 30 == 0: progress = (frame_num / frame_count) * 100 log_message(f"๐ŸŽž๏ธ Processing frame {frame_num}/{frame_count} ({progress:.1f}%)") def process_frames_gpu(cap, out, device, target_h, target_w, frame_count): """Process video frames using GPU with PyTorch""" frame_num = 0 torch.backends.cudnn.benchmark = True while True: ret, frame = cap.read() if not ret: break frame_num += 1 try: # Convert to tensor frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame_tensor = torch.from_numpy(frame_rgb).float().to(device) / 255.0 frame_tensor = frame_tensor.permute(2, 0, 1).unsqueeze(0) with torch.no_grad(): upscaled = torch.nn.functional.interpolate( frame_tensor, size=(target_h, target_w), mode='bicubic', align_corners=False ) # Convert back result_cpu = upscaled.squeeze(0).permute(1, 2, 0).cpu().numpy() result_frame = (result_cpu * 255).astype(np.uint8) result_bgr = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR) out.write(result_bgr) except Exception as e: log_message(f"โš ๏ธ GPU processing failed for frame {frame_num}, using CPU fallback") # CPU fallback upscaled_frame = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_CUBIC) out.write(upscaled_frame) # Progress logging if frame_num % 30 == 0: progress = (frame_num / frame_count) * 100 log_message(f"๐ŸŽž๏ธ Processing frame {frame_num}/{frame_count} ({progress:.1f}%)") # Periodic memory cleanup if frame_num % 60 == 0 and torch.cuda.is_available(): torch.cuda.empty_cache() def process_frame_batch(frame_batch, out, device, target_h, target_w): """Process batch of frames on GPU for efficiency""" try: with torch.no_grad(): # Convert batch to tensor batch_tensors = [] for frame in frame_batch: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame_tensor = torch.from_numpy(frame_rgb).float().to(device) / 255.0 frame_tensor = frame_tensor.permute(2, 0, 1) # CHW batch_tensors.append(frame_tensor) # Stack in batch batch_tensor = torch.stack(batch_tensors, dim=0) # BCHW # Upscale entire batch upscaled_batch = torch.nn.functional.interpolate( batch_tensor, size=(target_h, target_w), mode='bicubic', align_corners=False, antialias=True ) # Convert each frame back for i in range(upscaled_batch.shape[0]): result_cpu = upscaled_batch[i].permute(1, 2, 0).cpu().numpy() result_frame = (result_cpu * 255).astype(np.uint8) result_bgr = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR) out.write(result_bgr) except Exception as e: log_message(f"โŒ Error in batch processing: {str(e)}") # Fallback: process frames individually for frame in frame_batch: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame_tensor = torch.from_numpy(frame_rgb).float().to(device) / 255.0 frame_tensor = frame_tensor.permute(2, 0, 1).unsqueeze(0) upscaled = torch.nn.functional.interpolate( frame_tensor, size=(target_h, target_w), mode='bicubic', align_corners=False ) result_cpu = upscaled.squeeze(0).permute(1, 2, 0).cpu().numpy() result_frame = (result_cpu * 255).astype(np.uint8) result_bgr = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR) out.write(result_bgr) # Initialize directories ensure_directories() # Set up file cleanup scheduler schedule.every(CLEANUP_INTERVAL_MINUTES).minutes.do(cleanup_old_files) app = Flask(__name__) @app.route('/') def index(): return render_template('index.html') @app.route('/api/system') def api_system(): """Get system information""" try: info = {} # GPU Info if torch.cuda.is_available(): info["gpu_available"] = True info["gpu_name"] = torch.cuda.get_device_name() total_memory = torch.cuda.get_device_properties(0).total_memory allocated_memory = torch.cuda.memory_allocated() info["gpu_memory"] = f"{total_memory / (1024**3):.1f}GB" info["gpu_memory_used"] = f"{allocated_memory / (1024**3):.1f}GB" info["gpu_memory_free"] = f"{(total_memory - allocated_memory) / (1024**3):.1f}GB" info["cuda_version"] = torch.version.cuda info["pytorch_version"] = torch.__version__ else: info["gpu_available"] = False info["gpu_name"] = "CPU Only (No GPU detected)" info["gpu_memory"] = "N/A" info["gpu_memory_used"] = "N/A" info["gpu_memory_free"] = "N/A" info["cuda_version"] = "Not available" info["pytorch_version"] = torch.__version__ # Storage info if os.path.exists("/data"): info["persistent_storage"] = True try: upload_files = os.listdir(UPLOAD_FOLDER) if os.path.exists(UPLOAD_FOLDER) else [] output_files = os.listdir(OUTPUT_FOLDER) if os.path.exists(OUTPUT_FOLDER) else [] upload_size = sum(os.path.getsize(os.path.join(UPLOAD_FOLDER, f)) for f in upload_files if os.path.isfile(os.path.join(UPLOAD_FOLDER, f))) output_size = sum(os.path.getsize(os.path.join(OUTPUT_FOLDER, f)) for f in output_files if os.path.isfile(os.path.join(OUTPUT_FOLDER, f))) info["storage_uploads"] = f"{upload_size / (1024**2):.1f}MB" info["storage_outputs"] = f"{output_size / (1024**2):.1f}MB" info["upload_files_count"] = len(upload_files) info["output_files_count"] = len(output_files) # Add cleanup info info["cleanup_stats"] = app_state["cleanup_stats"] info["cleanup_interval"] = f"{CLEANUP_INTERVAL_MINUTES} minutes" info["file_max_age"] = f"{FILE_MAX_AGE_HOURS} hour(s)" except Exception as e: info["storage_uploads"] = f"Error: {str(e)}" info["storage_outputs"] = "N/A" info["upload_files_count"] = 0 info["output_files_count"] = 0 else: info["persistent_storage"] = False return jsonify({"success": True, "data": info}) except Exception as e: return jsonify({"success": False, "error": str(e)}) @app.route('/api/upload', methods=['POST']) def api_upload(): """Upload and process file for 4K upscaling""" try: if 'file' not in request.files: return jsonify({"success": False, "error": "No file provided"}) file = request.files['file'] if file.filename == '': return jsonify({"success": False, "error": "No file selected"}) if file and allowed_file(file.filename): file_id = str(uuid.uuid4()) filename = secure_filename(file.filename) file_ext = filename.rsplit('.', 1)[1].lower() input_filename = f"{file_id}_input.{file_ext}" input_path = os.path.join(UPLOAD_FOLDER, input_filename) file.save(input_path) output_filename = f"{file_id}_4k.{file_ext}" output_path = os.path.join(OUTPUT_FOLDER, output_filename) if file_ext in ['png', 'jpg', 'jpeg', 'gif']: upscale_image_4k(input_path, output_path) media_type = "image" elif file_ext in ['mp4', 'avi', 'mov', 'mkv']: upscale_video_4k(input_path, output_path) media_type = "video" log_message(f"๐Ÿ“ค File uploaded: {filename}") log_message(f"๐ŸŽฏ Starting 4K transformation...") return jsonify({ "success": True, "file_id": file_id, "filename": filename, "output_filename": output_filename, "media_type": media_type, "message": "Upload successful, processing started" }) else: return jsonify({"success": False, "error": "File type not allowed"}) except Exception as e: return jsonify({"success": False, "error": str(e)}) @app.route('/api/processing-status') def api_processing_status(): """Get processing status""" return jsonify({ "success": True, "processing": app_state["processing_active"], "processed_files": app_state["processed_files"] }) @app.route('/api/download/') def api_download(filename): """Download processed file""" try: file_path = os.path.join(OUTPUT_FOLDER, filename) if os.path.exists(file_path): mimetype = get_file_mimetype(filename) file_ext = filename.lower().rsplit('.', 1)[1] if '.' in filename else '' if file_ext in ['mp4', 'avi', 'mov', 'mkv']: return send_file( file_path, as_attachment=True, download_name=f"4k_upscaled_{filename}", mimetype=mimetype ) else: return send_file( file_path, as_attachment=True, download_name=f"4k_upscaled_{filename}", mimetype=mimetype ) else: return jsonify({"error": "File not found"}), 404 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/preview/') def api_preview(filename): """Preview processed file""" try: file_path = os.path.join(OUTPUT_FOLDER, filename) if os.path.exists(file_path): mimetype = get_file_mimetype(filename) return send_file(file_path, mimetype=mimetype) else: return jsonify({"error": "File not found"}), 404 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/logs') def api_logs(): """Get application logs""" return jsonify({ "success": True, "logs": app_state["logs"] }) @app.route('/api/clear-logs', methods=['POST']) def api_clear_logs(): """Clear application logs""" app_state["logs"] = [] log_message("๐Ÿงน Logs cleared") return jsonify({"success": True, "message": "Logs cleared"}) @app.route('/api/optimize-gpu', methods=['POST']) def api_optimize_gpu(): """Optimize GPU for processing""" try: success = optimize_gpu() if success: return jsonify({"success": True, "message": "GPU optimized"}) else: return jsonify({"success": False, "message": "GPU optimization failed"}) except Exception as e: return jsonify({"success": False, "error": str(e)}) @app.route('/api/clear-cache', methods=['POST']) def api_clear_cache(): """Clear GPU cache and processed files""" try: if torch.cuda.is_available(): torch.cuda.empty_cache() app_state["processed_files"] = [] log_message("๐Ÿงน Cache and history cleared") return jsonify({"success": True, "message": "Cache cleared"}) except Exception as e: return jsonify({"success": False, "error": str(e)}) @app.route('/api/cleanup-now', methods=['POST']) def api_cleanup_now(): """Manually trigger file cleanup""" try: cleanup_old_files() return jsonify({"success": True, "message": "Manual cleanup completed"}) except Exception as e: return jsonify({"success": False, "error": str(e)}) @app.route('/api/storage-stats') def api_storage_stats(): """Get detailed storage statistics""" try: stats = { "cleanup_stats": app_state["cleanup_stats"], "current_files": {}, "total_storage_mb": 0 } for folder_name, folder_path in [("uploads", UPLOAD_FOLDER), ("outputs", OUTPUT_FOLDER)]: if os.path.exists(folder_path): files = [] total_size = 0 for filename in os.listdir(folder_path): file_path = os.path.join(folder_path, filename) if os.path.isfile(file_path): file_size = os.path.getsize(file_path) file_time = datetime.fromtimestamp(os.path.getmtime(file_path)) files.append({ "name": filename, "size_mb": file_size / (1024*1024), "created": file_time.strftime("%Y-%m-%d %H:%M:%S"), "age_hours": (datetime.now() - file_time).total_seconds() / 3600 }) total_size += file_size stats["current_files"][folder_name] = { "files": files, "count": len(files), "total_size_mb": total_size / (1024*1024) } stats["total_storage_mb"] += total_size / (1024*1024) return jsonify({"success": True, "data": stats}) except Exception as e: return jsonify({"success": False, "error": str(e)}) if __name__ == '__main__': # Initialize system log_message("๐Ÿš€ 4K Upscaler starting...") try: # Start file cleanup scheduler run_scheduler() # Optimize GPU if available if optimize_gpu(): log_message("โœ… GPU optimized for 4K upscaling") else: log_message("โš ๏ธ GPU optimization failed, using CPU fallback") # Run initial cleanup log_message("๐Ÿงน Running initial file cleanup...") cleanup_old_files() log_message("โœ… 4K Upscaler ready") log_message("๐Ÿ“ค Upload images or videos to upscale to 4K resolution") log_message(f"๐Ÿ—‘๏ธ Files will be automatically deleted after {FILE_MAX_AGE_HOURS} hour(s)") except Exception as e: log_message(f"โŒ Initialization error: {str(e)}") log_message("โš ๏ธ Starting in fallback mode...") # Run application try: app.run(host='0.0.0.0', port=7860, debug=False, threaded=True) except Exception as e: log_message(f"โŒ Server startup error: {str(e)}") print(f"Critical error: {str(e)}")