4K_Upscaler / app.py
Malaji71's picture
Update app.py
141e3f6 verified
from flask import Flask, render_template, jsonify, request, send_file
import torch
import os
import time
import threading
from datetime import datetime, timedelta
import cv2
from werkzeug.utils import secure_filename
import uuid
import mimetypes
import numpy as np
from PIL import Image
import schedule
# Configuration
UPLOAD_FOLDER = '/data/uploads'
OUTPUT_FOLDER = '/data/outputs'
CLEANUP_INTERVAL_MINUTES = 10
FILE_MAX_AGE_HOURS = 1
# Global application state
app_state = {
"cuda_available": torch.cuda.is_available(),
"processing_active": False,
"logs": [],
"processed_files": [],
"cleanup_stats": {
"last_cleanup": None,
"files_deleted": 0,
"space_freed_mb": 0
}
}
def ensure_directories():
"""Create necessary directories"""
directories = [UPLOAD_FOLDER, OUTPUT_FOLDER]
for directory in directories:
try:
os.makedirs(directory, exist_ok=True)
print(f"βœ… Directory verified: {directory}")
except Exception as e:
print(f"⚠️ Error creating directory {directory}: {e}")
def allowed_file(filename):
"""Check if file has allowed extension"""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ['png', 'jpg', 'jpeg', 'gif', 'mp4', 'avi', 'mov', 'mkv']
def get_file_mimetype(filename):
"""Get correct mimetype for file"""
mimetype, _ = mimetypes.guess_type(filename)
if mimetype is None:
ext = filename.lower().rsplit('.', 1)[1] if '.' in filename else ''
if ext in ['mp4', 'avi', 'mov', 'mkv']:
mimetype = f'video/{ext}'
elif ext in ['png', 'jpg', 'jpeg', 'gif']:
mimetype = f'image/{ext}'
else:
mimetype = 'application/octet-stream'
return mimetype
def log_message(message):
"""Add message to log with timestamp"""
timestamp = datetime.now().strftime("%H:%M:%S")
app_state["logs"].append(f"[{timestamp}] {message}")
if len(app_state["logs"]) > 100:
app_state["logs"] = app_state["logs"][-100:]
print(f"[{timestamp}] {message}")
def cleanup_old_files():
"""Delete files older than FILE_MAX_AGE_HOURS"""
try:
current_time = datetime.now()
cutoff_time = current_time - timedelta(hours=FILE_MAX_AGE_HOURS)
files_deleted = 0
space_freed = 0
# Clean upload folder
for folder_path in [UPLOAD_FOLDER, OUTPUT_FOLDER]:
if not os.path.exists(folder_path):
continue
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
if os.path.isfile(file_path):
try:
# Get file modification time
file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_time < cutoff_time:
# Get file size before deletion
file_size = os.path.getsize(file_path)
# Delete the file
os.remove(file_path)
files_deleted += 1
space_freed += file_size
log_message(f"πŸ—‘οΈ Deleted old file: {filename} ({file_size / (1024*1024):.1f}MB)")
except Exception as e:
log_message(f"⚠️ Error deleting {filename}: {str(e)}")
# Update cleanup stats
app_state["cleanup_stats"]["last_cleanup"] = current_time.strftime("%Y-%m-%d %H:%M:%S")
app_state["cleanup_stats"]["files_deleted"] += files_deleted
app_state["cleanup_stats"]["space_freed_mb"] += space_freed / (1024*1024)
if files_deleted > 0:
log_message(f"🧹 Cleanup completed: {files_deleted} files deleted, {space_freed / (1024*1024):.1f}MB freed")
else:
log_message(f"🧹 Cleanup completed: No old files to delete")
# Clean up processed files list to remove references to deleted files
valid_processed_files = []
for file_info in app_state["processed_files"]:
output_path = os.path.join(OUTPUT_FOLDER, file_info["output_file"])
if os.path.exists(output_path):
valid_processed_files.append(file_info)
app_state["processed_files"] = valid_processed_files
except Exception as e:
log_message(f"❌ Error during cleanup: {str(e)}")
def run_scheduler():
"""Run the file cleanup scheduler in background"""
def scheduler_worker():
while True:
try:
schedule.run_pending()
time.sleep(60) # Check every minute
except Exception as e:
log_message(f"❌ Scheduler error: {str(e)}")
time.sleep(300) # Wait 5 minutes before retrying
thread = threading.Thread(target=scheduler_worker, daemon=True)
thread.start()
log_message(f"πŸ•’ File cleanup scheduler started (every {CLEANUP_INTERVAL_MINUTES} minutes)")
def optimize_gpu():
"""Optimize GPU configuration for 4K upscaling"""
try:
if torch.cuda.is_available():
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.allow_tf32 = True
torch.backends.cuda.matmul.allow_tf32 = True
torch.cuda.empty_cache()
# Test GPU
test_tensor = torch.randn(100, 100, device='cuda')
_ = torch.mm(test_tensor, test_tensor)
log_message("βœ… GPU optimized for 4K upscaling")
return True
else:
log_message("⚠️ CUDA not available")
return False
except Exception as e:
log_message(f"❌ Error optimizing GPU: {str(e)}")
return False
def upscale_image_4k(input_path, output_path):
"""Upscale image to 4K using neural methods"""
def process_worker():
try:
log_message(f"🎨 Starting 4K upscaling: {os.path.basename(input_path)}")
app_state["processing_active"] = True
# Read original image
image = cv2.imread(input_path)
if image is None:
log_message("❌ Error: Could not read image")
return
h, w = image.shape[:2]
log_message(f"πŸ“ Original resolution: {w}x{h}")
# Define target dimensions first
target_h, target_w = h * 4, w * 4
# Check GPU memory availability
if torch.cuda.is_available():
device = torch.device('cuda')
available_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated()
required_memory = w * h * 4 * 4 * 3 * 4 # Conservative estimation
if required_memory > available_memory * 0.8:
log_message(f"⚠️ Image too large for available GPU memory, using CPU")
device = torch.device('cpu')
else:
log_message(f"πŸš€ Using GPU: {torch.cuda.get_device_name()}")
if device.type == 'cuda':
# Convert image to normalized tensor
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image_tensor = torch.from_numpy(image_rgb).float().to(device) / 255.0
image_tensor = image_tensor.permute(2, 0, 1).unsqueeze(0) # BCHW format
log_message("🧠 Applying neural upscaling...")
with torch.no_grad():
# Step 1: 2x upscaling with bicubic
intermediate = torch.nn.functional.interpolate(
image_tensor,
size=(h * 2, w * 2),
mode='bicubic',
align_corners=False,
antialias=True
)
# Step 2: Final 2x upscaling with smoothing
upscaled = torch.nn.functional.interpolate(
intermediate,
size=(target_h, target_w),
mode='bicubic',
align_corners=False,
antialias=True
)
# Enhanced sharpening filters
kernel_size = 3
sigma = 0.5
kernel = torch.zeros((kernel_size, kernel_size), device=device)
center = kernel_size // 2
# Create inverted Gaussian kernel for sharpening
for i in range(kernel_size):
for j in range(kernel_size):
dist = ((i - center) ** 2 + (j - center) ** 2) ** 0.5
kernel[i, j] = torch.exp(-0.5 * (dist / sigma) ** 2)
kernel = kernel / kernel.sum()
sharpen_kernel = torch.zeros_like(kernel)
sharpen_kernel[center, center] = 2.0
sharpen_kernel = sharpen_kernel - kernel
sharpen_kernel = sharpen_kernel.unsqueeze(0).unsqueeze(0)
# Apply sharpening to each channel
enhanced_channels = []
for i in range(3):
channel = upscaled[:, i:i+1, :, :]
padded = torch.nn.functional.pad(channel, (1, 1, 1, 1), mode='reflect')
enhanced = torch.nn.functional.conv2d(padded, sharpen_kernel)
enhanced_channels.append(enhanced)
enhanced = torch.cat(enhanced_channels, dim=1)
# Light smoothing to reduce noise
gaussian_kernel = torch.tensor([
[1, 4, 6, 4, 1],
[4, 16, 24, 16, 4],
[6, 24, 36, 24, 6],
[4, 16, 24, 16, 4],
[1, 4, 6, 4, 1]
], dtype=torch.float32, device=device).unsqueeze(0).unsqueeze(0) / 256.0
smoothed_channels = []
for i in range(3):
channel = enhanced[:, i:i+1, :, :]
padded = torch.nn.functional.pad(channel, (2, 2, 2, 2), mode='reflect')
smoothed = torch.nn.functional.conv2d(padded, gaussian_kernel)
smoothed_channels.append(smoothed)
smoothed = torch.cat(smoothed_channels, dim=1)
# Blend: 70% enhanced + 30% smoothed for quality/smoothness balance
final_result = 0.7 * enhanced + 0.3 * smoothed
# Clamp values and optimize contrast
final_result = torch.clamp(final_result, 0, 1)
# Adaptive contrast optimization
for i in range(3):
channel = final_result[:, i, :, :]
min_val = channel.min()
max_val = channel.max()
if max_val > min_val:
final_result[:, i, :, :] = (channel - min_val) / (max_val - min_val)
# Convert back to image
result_cpu = final_result.squeeze(0).permute(1, 2, 0).cpu().numpy()
result_image = (result_cpu * 255).astype(np.uint8)
result_bgr = cv2.cvtColor(result_image, cv2.COLOR_RGB2BGR)
# Save result
cv2.imwrite(output_path, result_bgr)
final_h, final_w = result_bgr.shape[:2]
log_message(f"βœ… Upscaling completed: {final_w}x{final_h}")
log_message(f"πŸ“ˆ Scale factor: {final_w/w:.1f}x")
# Memory cleanup
del image_tensor, upscaled, enhanced, final_result
torch.cuda.empty_cache()
else:
# CPU fallback
log_message("⚠️ Using CPU - optimized processing")
# Progressive upscaling on CPU
intermediate = cv2.resize(image, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC)
upscaled = cv2.resize(intermediate, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
# Apply sharpening on CPU
kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
sharpened = cv2.filter2D(upscaled, -1, kernel)
# Blend for smoothing
final_result = cv2.addWeighted(upscaled, 0.7, sharpened, 0.3, 0)
cv2.imwrite(output_path, final_result)
log_message(f"βœ… CPU upscaling completed: {target_w}x{target_h}")
else:
# CPU only fallback (no CUDA available)
log_message("πŸ’» Using CPU processing (CUDA not available)")
# Progressive upscaling on CPU
intermediate = cv2.resize(image, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC)
upscaled = cv2.resize(intermediate, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
# Apply sharpening on CPU
kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
sharpened = cv2.filter2D(upscaled, -1, kernel)
# Blend for smoothing
final_result = cv2.addWeighted(upscaled, 0.7, sharpened, 0.3, 0)
cv2.imwrite(output_path, final_result)
log_message(f"βœ… CPU upscaling completed: {target_w}x{target_h}")
# Add to processed files list
app_state["processed_files"].append({
"input_file": os.path.basename(input_path),
"output_file": os.path.basename(output_path),
"original_size": f"{w}x{h}",
"upscaled_size": f"{target_w}x{target_h}",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
except Exception as e:
log_message(f"❌ Error in processing: {str(e)}")
finally:
app_state["processing_active"] = False
if torch.cuda.is_available():
torch.cuda.empty_cache()
thread = threading.Thread(target=process_worker)
thread.daemon = True
thread.start()
def upscale_video_4k(input_path, output_path):
"""Upscale video to 4K frame by frame"""
def process_worker():
try:
log_message(f"🎬 Starting 4K video upscaling: {os.path.basename(input_path)}")
app_state["processing_active"] = True
# Open video
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
log_message("❌ Error: Could not open video")
return
# Get video properties
fps = int(cap.get(cv2.CAP_PROP_FPS))
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
log_message(f"πŸ“Ή Video: {w}x{h}, {fps}FPS, {frame_count} frames")
# Configure 4K output
target_w, target_h = w * 4, h * 4
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (target_w, target_h))
if torch.cuda.is_available():
device = torch.device('cuda')
log_message(f"πŸš€ Processing with GPU: {torch.cuda.get_device_name()}")
process_frames_gpu(cap, out, device, target_h, target_w, frame_count)
else:
log_message("πŸ’» Processing with CPU (may be slower)")
process_frames_cpu(cap, out, target_h, target_w, frame_count)
cap.release()
out.release()
# Verify the output file was created and has content
if os.path.exists(output_path):
file_size = os.path.getsize(output_path)
if file_size > 0:
log_message(f"βœ… 4K video completed: {target_w}x{target_h}")
log_message(f"πŸ“ Output file size: {file_size / (1024**2):.1f}MB")
else:
log_message(f"❌ Output file is empty: {output_path}")
raise Exception("Output video file is empty")
else:
log_message(f"❌ Output file not created: {output_path}")
raise Exception("Output video file was not created")
# Add to processed files list
app_state["processed_files"].append({
"input_file": os.path.basename(input_path),
"output_file": os.path.basename(output_path),
"original_size": f"{w}x{h}",
"upscaled_size": f"{target_w}x{target_h}",
"frame_count": frame_count,
"fps": fps,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
except Exception as e:
log_message(f"❌ Error processing video: {str(e)}")
finally:
app_state["processing_active"] = False
if torch.cuda.is_available():
torch.cuda.empty_cache()
thread = threading.Thread(target=process_worker)
thread.daemon = True
thread.start()
def process_frames_cpu(cap, out, target_h, target_w, frame_count):
"""Process video frames using CPU"""
frame_num = 0
while True:
ret, frame = cap.read()
if not ret:
break
frame_num += 1
# Simple CPU upscaling
upscaled_frame = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
out.write(upscaled_frame)
# Progress logging
if frame_num % 30 == 0:
progress = (frame_num / frame_count) * 100
log_message(f"🎞️ Processing frame {frame_num}/{frame_count} ({progress:.1f}%)")
def process_frames_gpu(cap, out, device, target_h, target_w, frame_count):
"""Process video frames using GPU with PyTorch"""
frame_num = 0
torch.backends.cudnn.benchmark = True
while True:
ret, frame = cap.read()
if not ret:
break
frame_num += 1
try:
# Convert to tensor
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame_tensor = torch.from_numpy(frame_rgb).float().to(device) / 255.0
frame_tensor = frame_tensor.permute(2, 0, 1).unsqueeze(0)
with torch.no_grad():
upscaled = torch.nn.functional.interpolate(
frame_tensor,
size=(target_h, target_w),
mode='bicubic',
align_corners=False
)
# Convert back
result_cpu = upscaled.squeeze(0).permute(1, 2, 0).cpu().numpy()
result_frame = (result_cpu * 255).astype(np.uint8)
result_bgr = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR)
out.write(result_bgr)
except Exception as e:
log_message(f"⚠️ GPU processing failed for frame {frame_num}, using CPU fallback")
# CPU fallback
upscaled_frame = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
out.write(upscaled_frame)
# Progress logging
if frame_num % 30 == 0:
progress = (frame_num / frame_count) * 100
log_message(f"🎞️ Processing frame {frame_num}/{frame_count} ({progress:.1f}%)")
# Periodic memory cleanup
if frame_num % 60 == 0 and torch.cuda.is_available():
torch.cuda.empty_cache()
def process_frame_batch(frame_batch, out, device, target_h, target_w):
"""Process batch of frames on GPU for efficiency"""
try:
with torch.no_grad():
# Convert batch to tensor
batch_tensors = []
for frame in frame_batch:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame_tensor = torch.from_numpy(frame_rgb).float().to(device) / 255.0
frame_tensor = frame_tensor.permute(2, 0, 1) # CHW
batch_tensors.append(frame_tensor)
# Stack in batch
batch_tensor = torch.stack(batch_tensors, dim=0) # BCHW
# Upscale entire batch
upscaled_batch = torch.nn.functional.interpolate(
batch_tensor,
size=(target_h, target_w),
mode='bicubic',
align_corners=False,
antialias=True
)
# Convert each frame back
for i in range(upscaled_batch.shape[0]):
result_cpu = upscaled_batch[i].permute(1, 2, 0).cpu().numpy()
result_frame = (result_cpu * 255).astype(np.uint8)
result_bgr = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR)
out.write(result_bgr)
except Exception as e:
log_message(f"❌ Error in batch processing: {str(e)}")
# Fallback: process frames individually
for frame in frame_batch:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame_tensor = torch.from_numpy(frame_rgb).float().to(device) / 255.0
frame_tensor = frame_tensor.permute(2, 0, 1).unsqueeze(0)
upscaled = torch.nn.functional.interpolate(
frame_tensor,
size=(target_h, target_w),
mode='bicubic',
align_corners=False
)
result_cpu = upscaled.squeeze(0).permute(1, 2, 0).cpu().numpy()
result_frame = (result_cpu * 255).astype(np.uint8)
result_bgr = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR)
out.write(result_bgr)
# Initialize directories
ensure_directories()
# Set up file cleanup scheduler
schedule.every(CLEANUP_INTERVAL_MINUTES).minutes.do(cleanup_old_files)
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/system')
def api_system():
"""Get system information"""
try:
info = {}
# GPU Info
if torch.cuda.is_available():
info["gpu_available"] = True
info["gpu_name"] = torch.cuda.get_device_name()
total_memory = torch.cuda.get_device_properties(0).total_memory
allocated_memory = torch.cuda.memory_allocated()
info["gpu_memory"] = f"{total_memory / (1024**3):.1f}GB"
info["gpu_memory_used"] = f"{allocated_memory / (1024**3):.1f}GB"
info["gpu_memory_free"] = f"{(total_memory - allocated_memory) / (1024**3):.1f}GB"
info["cuda_version"] = torch.version.cuda
info["pytorch_version"] = torch.__version__
else:
info["gpu_available"] = False
info["gpu_name"] = "CPU Only (No GPU detected)"
info["gpu_memory"] = "N/A"
info["gpu_memory_used"] = "N/A"
info["gpu_memory_free"] = "N/A"
info["cuda_version"] = "Not available"
info["pytorch_version"] = torch.__version__
# Storage info
if os.path.exists("/data"):
info["persistent_storage"] = True
try:
upload_files = os.listdir(UPLOAD_FOLDER) if os.path.exists(UPLOAD_FOLDER) else []
output_files = os.listdir(OUTPUT_FOLDER) if os.path.exists(OUTPUT_FOLDER) else []
upload_size = sum(os.path.getsize(os.path.join(UPLOAD_FOLDER, f))
for f in upload_files if os.path.isfile(os.path.join(UPLOAD_FOLDER, f)))
output_size = sum(os.path.getsize(os.path.join(OUTPUT_FOLDER, f))
for f in output_files if os.path.isfile(os.path.join(OUTPUT_FOLDER, f)))
info["storage_uploads"] = f"{upload_size / (1024**2):.1f}MB"
info["storage_outputs"] = f"{output_size / (1024**2):.1f}MB"
info["upload_files_count"] = len(upload_files)
info["output_files_count"] = len(output_files)
# Add cleanup info
info["cleanup_stats"] = app_state["cleanup_stats"]
info["cleanup_interval"] = f"{CLEANUP_INTERVAL_MINUTES} minutes"
info["file_max_age"] = f"{FILE_MAX_AGE_HOURS} hour(s)"
except Exception as e:
info["storage_uploads"] = f"Error: {str(e)}"
info["storage_outputs"] = "N/A"
info["upload_files_count"] = 0
info["output_files_count"] = 0
else:
info["persistent_storage"] = False
return jsonify({"success": True, "data": info})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route('/api/upload', methods=['POST'])
def api_upload():
"""Upload and process file for 4K upscaling"""
try:
if 'file' not in request.files:
return jsonify({"success": False, "error": "No file provided"})
file = request.files['file']
if file.filename == '':
return jsonify({"success": False, "error": "No file selected"})
if file and allowed_file(file.filename):
file_id = str(uuid.uuid4())
filename = secure_filename(file.filename)
file_ext = filename.rsplit('.', 1)[1].lower()
input_filename = f"{file_id}_input.{file_ext}"
input_path = os.path.join(UPLOAD_FOLDER, input_filename)
file.save(input_path)
output_filename = f"{file_id}_4k.{file_ext}"
output_path = os.path.join(OUTPUT_FOLDER, output_filename)
if file_ext in ['png', 'jpg', 'jpeg', 'gif']:
upscale_image_4k(input_path, output_path)
media_type = "image"
elif file_ext in ['mp4', 'avi', 'mov', 'mkv']:
upscale_video_4k(input_path, output_path)
media_type = "video"
log_message(f"πŸ“€ File uploaded: {filename}")
log_message(f"🎯 Starting 4K transformation...")
return jsonify({
"success": True,
"file_id": file_id,
"filename": filename,
"output_filename": output_filename,
"media_type": media_type,
"message": "Upload successful, processing started"
})
else:
return jsonify({"success": False, "error": "File type not allowed"})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route('/api/processing-status')
def api_processing_status():
"""Get processing status"""
return jsonify({
"success": True,
"processing": app_state["processing_active"],
"processed_files": app_state["processed_files"]
})
@app.route('/api/download/<filename>')
def api_download(filename):
"""Download processed file"""
try:
file_path = os.path.join(OUTPUT_FOLDER, filename)
if os.path.exists(file_path):
mimetype = get_file_mimetype(filename)
file_ext = filename.lower().rsplit('.', 1)[1] if '.' in filename else ''
if file_ext in ['mp4', 'avi', 'mov', 'mkv']:
return send_file(
file_path,
as_attachment=True,
download_name=f"4k_upscaled_{filename}",
mimetype=mimetype
)
else:
return send_file(
file_path,
as_attachment=True,
download_name=f"4k_upscaled_{filename}",
mimetype=mimetype
)
else:
return jsonify({"error": "File not found"}), 404
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/preview/<filename>')
def api_preview(filename):
"""Preview processed file"""
try:
file_path = os.path.join(OUTPUT_FOLDER, filename)
if os.path.exists(file_path):
mimetype = get_file_mimetype(filename)
return send_file(file_path, mimetype=mimetype)
else:
return jsonify({"error": "File not found"}), 404
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/logs')
def api_logs():
"""Get application logs"""
return jsonify({
"success": True,
"logs": app_state["logs"]
})
@app.route('/api/clear-logs', methods=['POST'])
def api_clear_logs():
"""Clear application logs"""
app_state["logs"] = []
log_message("🧹 Logs cleared")
return jsonify({"success": True, "message": "Logs cleared"})
@app.route('/api/optimize-gpu', methods=['POST'])
def api_optimize_gpu():
"""Optimize GPU for processing"""
try:
success = optimize_gpu()
if success:
return jsonify({"success": True, "message": "GPU optimized"})
else:
return jsonify({"success": False, "message": "GPU optimization failed"})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route('/api/clear-cache', methods=['POST'])
def api_clear_cache():
"""Clear GPU cache and processed files"""
try:
if torch.cuda.is_available():
torch.cuda.empty_cache()
app_state["processed_files"] = []
log_message("🧹 Cache and history cleared")
return jsonify({"success": True, "message": "Cache cleared"})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route('/api/cleanup-now', methods=['POST'])
def api_cleanup_now():
"""Manually trigger file cleanup"""
try:
cleanup_old_files()
return jsonify({"success": True, "message": "Manual cleanup completed"})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route('/api/storage-stats')
def api_storage_stats():
"""Get detailed storage statistics"""
try:
stats = {
"cleanup_stats": app_state["cleanup_stats"],
"current_files": {},
"total_storage_mb": 0
}
for folder_name, folder_path in [("uploads", UPLOAD_FOLDER), ("outputs", OUTPUT_FOLDER)]:
if os.path.exists(folder_path):
files = []
total_size = 0
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
if os.path.isfile(file_path):
file_size = os.path.getsize(file_path)
file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
files.append({
"name": filename,
"size_mb": file_size / (1024*1024),
"created": file_time.strftime("%Y-%m-%d %H:%M:%S"),
"age_hours": (datetime.now() - file_time).total_seconds() / 3600
})
total_size += file_size
stats["current_files"][folder_name] = {
"files": files,
"count": len(files),
"total_size_mb": total_size / (1024*1024)
}
stats["total_storage_mb"] += total_size / (1024*1024)
return jsonify({"success": True, "data": stats})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
if __name__ == '__main__':
# Initialize system
log_message("πŸš€ 4K Upscaler starting...")
try:
# Start file cleanup scheduler
run_scheduler()
# Optimize GPU if available
if optimize_gpu():
log_message("βœ… GPU optimized for 4K upscaling")
else:
log_message("⚠️ GPU optimization failed, using CPU fallback")
# Run initial cleanup
log_message("🧹 Running initial file cleanup...")
cleanup_old_files()
log_message("βœ… 4K Upscaler ready")
log_message("πŸ“€ Upload images or videos to upscale to 4K resolution")
log_message(f"πŸ—‘οΈ Files will be automatically deleted after {FILE_MAX_AGE_HOURS} hour(s)")
except Exception as e:
log_message(f"❌ Initialization error: {str(e)}")
log_message("⚠️ Starting in fallback mode...")
# Run application
try:
app.run(host='0.0.0.0', port=7860, debug=False, threaded=True)
except Exception as e:
log_message(f"❌ Server startup error: {str(e)}")
print(f"Critical error: {str(e)}")