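"""4K Upscaler: a Flask app that upscales uploaded images and videos to 4x
resolution with PyTorch (GPU when available, CPU fallback) and periodically
deletes old files from its storage folders."""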
from flask import Flask, render_template, jsonify, request, send_file
import torch
import os
import time
import math  # used when building the Gaussian sharpening kernel below
import threading
from datetime import datetime, timedelta
import cv2
from werkzeug.utils import secure_filename
import uuid
import mimetypes
import numpy as np
from PIL import Image
import schedule
# Configuration
UPLOAD_FOLDER = '/data/uploads'
OUTPUT_FOLDER = '/data/outputs'
CLEANUP_INTERVAL_MINUTES = 10
FILE_MAX_AGE_HOURS = 1
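# The /data paths assume persistent storage is mounted there (e.g. a Hugging
# Face Space with persistent storage enabled); adjust when running elsewhere.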
# Global application state
app_state = {
    "cuda_available": torch.cuda.is_available(),
    "processing_active": False,
    "logs": [],
    "processed_files": [],
    "cleanup_stats": {
        "last_cleanup": None,
        "files_deleted": 0,
        "space_freed_mb": 0
    }
}
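# app_state is mutated by request handlers and by the background worker
# threads below without a lock; tolerable for a single-process demo, though
# a threading.Lock would be the safer choice.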
def ensure_directories():
    """Create necessary directories"""
    directories = [UPLOAD_FOLDER, OUTPUT_FOLDER]
    for directory in directories:
        try:
            os.makedirs(directory, exist_ok=True)
            print(f"✅ Directory verified: {directory}")
        except Exception as e:
            print(f"⚠️ Error creating directory {directory}: {e}")
def allowed_file(filename):
    """Check if file has allowed extension"""
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in ['png', 'jpg', 'jpeg', 'gif', 'mp4', 'avi', 'mov', 'mkv']
def get_file_mimetype(filename):
    """Get correct mimetype for file"""
    mimetype, _ = mimetypes.guess_type(filename)
    if mimetype is None:
        ext = filename.lower().rsplit('.', 1)[1] if '.' in filename else ''
        if ext in ['mp4', 'avi', 'mov', 'mkv']:
            mimetype = f'video/{ext}'
        elif ext in ['png', 'jpg', 'jpeg', 'gif']:
            mimetype = f'image/{ext}'
        else:
            mimetype = 'application/octet-stream'
    return mimetype
def log_message(message):
    """Add message to log with timestamp"""
    timestamp = datetime.now().strftime("%H:%M:%S")
    app_state["logs"].append(f"[{timestamp}] {message}")
    if len(app_state["logs"]) > 100:
        app_state["logs"] = app_state["logs"][-100:]
    print(f"[{timestamp}] {message}")
def cleanup_old_files():
    """Delete files older than FILE_MAX_AGE_HOURS"""
    try:
        current_time = datetime.now()
        cutoff_time = current_time - timedelta(hours=FILE_MAX_AGE_HOURS)
        files_deleted = 0
        space_freed = 0
        # Clean both the upload and output folders
        for folder_path in [UPLOAD_FOLDER, OUTPUT_FOLDER]:
            if not os.path.exists(folder_path):
                continue
            for filename in os.listdir(folder_path):
                file_path = os.path.join(folder_path, filename)
                if os.path.isfile(file_path):
                    try:
                        # Get file modification time
                        file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
                        if file_time < cutoff_time:
                            # Get file size before deletion
                            file_size = os.path.getsize(file_path)
                            # Delete the file
                            os.remove(file_path)
                            files_deleted += 1
                            space_freed += file_size
                            log_message(f"🗑️ Deleted old file: {filename} ({file_size / (1024*1024):.1f}MB)")
                    except Exception as e:
                        log_message(f"⚠️ Error deleting {filename}: {str(e)}")
        # Update cleanup stats
        app_state["cleanup_stats"]["last_cleanup"] = current_time.strftime("%Y-%m-%d %H:%M:%S")
        app_state["cleanup_stats"]["files_deleted"] += files_deleted
        app_state["cleanup_stats"]["space_freed_mb"] += space_freed / (1024*1024)
        if files_deleted > 0:
            log_message(f"🧹 Cleanup completed: {files_deleted} files deleted, {space_freed / (1024*1024):.1f}MB freed")
        else:
            log_message("🧹 Cleanup completed: no old files to delete")
        # Clean up processed files list to remove references to deleted files
        valid_processed_files = []
        for file_info in app_state["processed_files"]:
            output_path = os.path.join(OUTPUT_FOLDER, file_info["output_file"])
            if os.path.exists(output_path):
                valid_processed_files.append(file_info)
        app_state["processed_files"] = valid_processed_files
    except Exception as e:
        log_message(f"❌ Error during cleanup: {str(e)}")
def run_scheduler():
    """Run the file cleanup scheduler in background"""
    def scheduler_worker():
        while True:
            try:
                schedule.run_pending()
                time.sleep(60)  # Check every minute
            except Exception as e:
                log_message(f"❌ Scheduler error: {str(e)}")
                time.sleep(300)  # Wait 5 minutes before retrying
    thread = threading.Thread(target=scheduler_worker, daemon=True)
    thread.start()
    log_message(f"🔄 File cleanup scheduler started (every {CLEANUP_INTERVAL_MINUTES} minutes)")
def optimize_gpu():
    """Optimize GPU configuration for 4K upscaling"""
    try:
        if torch.cuda.is_available():
            torch.backends.cudnn.benchmark = True
            torch.backends.cudnn.allow_tf32 = True
            torch.backends.cuda.matmul.allow_tf32 = True
            torch.cuda.empty_cache()
            # Test GPU with a small matmul
            test_tensor = torch.randn(100, 100, device='cuda')
            _ = torch.mm(test_tensor, test_tensor)
            log_message("✅ GPU optimized for 4K upscaling")
            return True
        else:
            log_message("⚠️ CUDA not available")
            return False
    except Exception as e:
        log_message(f"❌ Error optimizing GPU: {str(e)}")
        return False
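# TF32 and cudnn.benchmark trade a little numeric precision and warm-up time
# for throughput on recent NVIDIA GPUs, a reasonable trade-off for the pure
# interpolation/convolution workloads below.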
def upscale_image_4k(input_path, output_path):
    """Upscale image to 4K using neural methods"""
    def process_worker():
        try:
            log_message(f"🎨 Starting 4K upscaling: {os.path.basename(input_path)}")
            app_state["processing_active"] = True
            # Read original image
            image = cv2.imread(input_path)
            if image is None:
                log_message("❌ Error: could not read image")
                return
            h, w = image.shape[:2]
            log_message(f"📐 Original resolution: {w}x{h}")
            # Define target dimensions first
            target_h, target_w = h * 4, w * 4
            # Check GPU memory availability
            if torch.cuda.is_available():
                device = torch.device('cuda')
                available_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated()
                required_memory = w * h * 4 * 4 * 3 * 4  # Conservative estimate
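                # Arithmetic behind the estimate: a 4x upscale per dimension
                # yields 16*w*h output pixels; at 3 channels x 4 bytes (float32)
                # that is w*h*4*4*3*4 bytes for one output tensor, ignoring
                # intermediates.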
                if required_memory > available_memory * 0.8:
                    log_message("⚠️ Image too large for available GPU memory, using CPU")
                    device = torch.device('cpu')
                else:
                    log_message(f"🚀 Using GPU: {torch.cuda.get_device_name()}")
                if device.type == 'cuda':
                    # Convert image to normalized tensor
                    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                    image_tensor = torch.from_numpy(image_rgb).float().to(device) / 255.0
                    image_tensor = image_tensor.permute(2, 0, 1).unsqueeze(0)  # BCHW format
                    log_message("🧠 Applying neural upscaling...")
                    with torch.no_grad():
                        # Step 1: 2x upscaling with bicubic
                        intermediate = torch.nn.functional.interpolate(
                            image_tensor,
                            size=(h * 2, w * 2),
                            mode='bicubic',
                            align_corners=False,
                            antialias=True
                        )
                        # Step 2: final 2x upscaling with smoothing
                        upscaled = torch.nn.functional.interpolate(
                            intermediate,
                            size=(target_h, target_w),
                            mode='bicubic',
                            align_corners=False,
                            antialias=True
                        )
                        # Enhanced sharpening filter
                        kernel_size = 3
                        sigma = 0.5
                        kernel = torch.zeros((kernel_size, kernel_size), device=device)
                        center = kernel_size // 2
                        # Create inverted Gaussian kernel for sharpening
                        # (math.exp, not torch.exp: dist is a Python float)
                        for i in range(kernel_size):
                            for j in range(kernel_size):
                                dist = ((i - center) ** 2 + (j - center) ** 2) ** 0.5
                                kernel[i, j] = math.exp(-0.5 * (dist / sigma) ** 2)
                        kernel = kernel / kernel.sum()
                        sharpen_kernel = torch.zeros_like(kernel)
                        sharpen_kernel[center, center] = 2.0
                        sharpen_kernel = sharpen_kernel - kernel
                        sharpen_kernel = sharpen_kernel.unsqueeze(0).unsqueeze(0)
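                        # The result is an unsharp-mask kernel: 2*identity - gaussian.
                        # Convolving with it adds (pixel - local average) back onto
                        # the image, which boosts edges.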
                        # Apply sharpening to each channel
                        enhanced_channels = []
                        for i in range(3):
                            channel = upscaled[:, i:i+1, :, :]
                            padded = torch.nn.functional.pad(channel, (1, 1, 1, 1), mode='reflect')
                            enhanced = torch.nn.functional.conv2d(padded, sharpen_kernel)
                            enhanced_channels.append(enhanced)
                        enhanced = torch.cat(enhanced_channels, dim=1)
                        # Light smoothing to reduce noise
                        gaussian_kernel = torch.tensor([
                            [1, 4, 6, 4, 1],
                            [4, 16, 24, 16, 4],
                            [6, 24, 36, 24, 6],
                            [4, 16, 24, 16, 4],
                            [1, 4, 6, 4, 1]
                        ], dtype=torch.float32, device=device).unsqueeze(0).unsqueeze(0) / 256.0
                        smoothed_channels = []
                        for i in range(3):
                            channel = enhanced[:, i:i+1, :, :]
                            padded = torch.nn.functional.pad(channel, (2, 2, 2, 2), mode='reflect')
                            smoothed = torch.nn.functional.conv2d(padded, gaussian_kernel)
                            smoothed_channels.append(smoothed)
                        smoothed = torch.cat(smoothed_channels, dim=1)
                        # Blend: 70% enhanced + 30% smoothed for quality/smoothness balance
                        final_result = 0.7 * enhanced + 0.3 * smoothed
                        # Clamp values before contrast optimization
                        final_result = torch.clamp(final_result, 0, 1)
                        # Adaptive contrast optimization
                        for i in range(3):
                            channel = final_result[:, i, :, :]
                            min_val = channel.min()
                            max_val = channel.max()
                            if max_val > min_val:
                                final_result[:, i, :, :] = (channel - min_val) / (max_val - min_val)
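                    # Note: stretching each channel's min/max independently can
                    # shift the color balance slightly; a shared scale across
                    # channels would preserve hue at the cost of less contrast gain.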
                    # Convert back to image
                    result_cpu = final_result.squeeze(0).permute(1, 2, 0).cpu().numpy()
                    result_image = (result_cpu * 255).astype(np.uint8)
                    result_bgr = cv2.cvtColor(result_image, cv2.COLOR_RGB2BGR)
                    # Save result
                    cv2.imwrite(output_path, result_bgr)
                    final_h, final_w = result_bgr.shape[:2]
                    log_message(f"✅ Upscaling completed: {final_w}x{final_h}")
                    log_message(f"📈 Scale factor: {final_w/w:.1f}x")
                    # Memory cleanup
                    del image_tensor, upscaled, enhanced, final_result
                    torch.cuda.empty_cache()
                else:
                    # CPU fallback (image too large for GPU memory)
                    log_message("⚠️ Using CPU - optimized processing")
                    # Progressive upscaling on CPU
                    intermediate = cv2.resize(image, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC)
                    upscaled = cv2.resize(intermediate, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
                    # Apply sharpening on CPU
                    kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
                    sharpened = cv2.filter2D(upscaled, -1, kernel)
                    # Blend for smoothing
                    final_result = cv2.addWeighted(upscaled, 0.7, sharpened, 0.3, 0)
                    cv2.imwrite(output_path, final_result)
                    log_message(f"✅ CPU upscaling completed: {target_w}x{target_h}")
            else:
                # CPU-only fallback (no CUDA available)
                log_message("💻 Using CPU processing (CUDA not available)")
                # Progressive upscaling on CPU
                intermediate = cv2.resize(image, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC)
                upscaled = cv2.resize(intermediate, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
                # Apply sharpening on CPU
                kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
                sharpened = cv2.filter2D(upscaled, -1, kernel)
                # Blend for smoothing
                final_result = cv2.addWeighted(upscaled, 0.7, sharpened, 0.3, 0)
                cv2.imwrite(output_path, final_result)
                log_message(f"✅ CPU upscaling completed: {target_w}x{target_h}")
            # Add to processed files list
            app_state["processed_files"].append({
                "input_file": os.path.basename(input_path),
                "output_file": os.path.basename(output_path),
                "original_size": f"{w}x{h}",
                "upscaled_size": f"{target_w}x{target_h}",
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            })
        except Exception as e:
            log_message(f"❌ Error in processing: {str(e)}")
        finally:
            app_state["processing_active"] = False
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
    thread = threading.Thread(target=process_worker)
    thread.daemon = True
    thread.start()
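# Note: both upscalers are fire-and-forget. The worker runs in a daemon thread,
# so the upload response returns before processing finishes; clients poll the
# processing-status endpoint (restored below) to detect completion.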
def upscale_video_4k(input_path, output_path):
    """Upscale video to 4K frame by frame"""
    def process_worker():
        try:
            log_message(f"🎬 Starting 4K video upscaling: {os.path.basename(input_path)}")
            app_state["processing_active"] = True
            # Open video
            cap = cv2.VideoCapture(input_path)
            if not cap.isOpened():
                log_message("❌ Error: could not open video")
                return
            # Get video properties
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            log_message(f"📹 Video: {w}x{h}, {fps} FPS, {frame_count} frames")
            # Configure 4K output
            target_w, target_h = w * 4, h * 4
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (target_w, target_h))
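            # Note: the output keeps the input's extension but always uses the
            # 'mp4v' fourcc; for some container/codec combinations the writer may
            # produce an empty file, which the size check below catches.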
            if torch.cuda.is_available():
                device = torch.device('cuda')
                log_message(f"🚀 Processing with GPU: {torch.cuda.get_device_name()}")
                process_frames_gpu(cap, out, device, target_h, target_w, frame_count)
            else:
                log_message("💻 Processing with CPU (may be slower)")
                process_frames_cpu(cap, out, target_h, target_w, frame_count)
            cap.release()
            out.release()
            # Verify the output file was created and has content
            if os.path.exists(output_path):
                file_size = os.path.getsize(output_path)
                if file_size > 0:
                    log_message(f"✅ 4K video completed: {target_w}x{target_h}")
                    log_message(f"📊 Output file size: {file_size / (1024**2):.1f}MB")
                else:
                    log_message(f"❌ Output file is empty: {output_path}")
                    raise Exception("Output video file is empty")
            else:
                log_message(f"❌ Output file not created: {output_path}")
                raise Exception("Output video file was not created")
            # Add to processed files list
            app_state["processed_files"].append({
                "input_file": os.path.basename(input_path),
                "output_file": os.path.basename(output_path),
                "original_size": f"{w}x{h}",
                "upscaled_size": f"{target_w}x{target_h}",
                "frame_count": frame_count,
                "fps": fps,
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            })
        except Exception as e:
            log_message(f"❌ Error processing video: {str(e)}")
        finally:
            app_state["processing_active"] = False
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
    thread = threading.Thread(target=process_worker)
    thread.daemon = True
    thread.start()
def process_frames_cpu(cap, out, target_h, target_w, frame_count):
    """Process video frames using CPU"""
    frame_num = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1
        # Simple CPU upscaling
        upscaled_frame = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
        out.write(upscaled_frame)
        # Progress logging
        if frame_num % 30 == 0:
            progress = (frame_num / frame_count) * 100
            log_message(f"🖼️ Processing frame {frame_num}/{frame_count} ({progress:.1f}%)")
def process_frames_gpu(cap, out, device, target_h, target_w, frame_count):
    """Process video frames using GPU with PyTorch"""
    frame_num = 0
    torch.backends.cudnn.benchmark = True
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1
        try:
            # Convert to tensor
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_tensor = torch.from_numpy(frame_rgb).float().to(device) / 255.0
            frame_tensor = frame_tensor.permute(2, 0, 1).unsqueeze(0)
            with torch.no_grad():
                upscaled = torch.nn.functional.interpolate(
                    frame_tensor,
                    size=(target_h, target_w),
                    mode='bicubic',
                    align_corners=False
                )
            # Convert back (clamp first: bicubic can overshoot [0, 1],
            # which would wrap around in the uint8 cast)
            result_cpu = upscaled.squeeze(0).permute(1, 2, 0).clamp(0, 1).cpu().numpy()
            result_frame = (result_cpu * 255).astype(np.uint8)
            result_bgr = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR)
            out.write(result_bgr)
        except Exception as e:
            log_message(f"⚠️ GPU processing failed for frame {frame_num}, using CPU fallback: {str(e)}")
            # CPU fallback
            upscaled_frame = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
            out.write(upscaled_frame)
        # Progress logging
        if frame_num % 30 == 0:
            progress = (frame_num / frame_count) * 100
            log_message(f"🖼️ Processing frame {frame_num}/{frame_count} ({progress:.1f}%)")
        # Periodic memory cleanup
        if frame_num % 60 == 0 and torch.cuda.is_available():
            torch.cuda.empty_cache()
def process_frame_batch(frame_batch, out, device, target_h, target_w):
    """Process batch of frames on GPU for efficiency"""
    try:
        with torch.no_grad():
            # Convert batch to tensors
            batch_tensors = []
            for frame in frame_batch:
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_tensor = torch.from_numpy(frame_rgb).float().to(device) / 255.0
                frame_tensor = frame_tensor.permute(2, 0, 1)  # CHW
                batch_tensors.append(frame_tensor)
            # Stack into a batch
            batch_tensor = torch.stack(batch_tensors, dim=0)  # BCHW
            # Upscale entire batch
            upscaled_batch = torch.nn.functional.interpolate(
                batch_tensor,
                size=(target_h, target_w),
                mode='bicubic',
                align_corners=False,
                antialias=True
            )
            # Convert each frame back (clamp to avoid uint8 wraparound)
            for i in range(upscaled_batch.shape[0]):
                result_cpu = upscaled_batch[i].permute(1, 2, 0).clamp(0, 1).cpu().numpy()
                result_frame = (result_cpu * 255).astype(np.uint8)
                result_bgr = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR)
                out.write(result_bgr)
    except Exception as e:
        log_message(f"❌ Error in batch processing: {str(e)}")
        # Fallback: process frames individually
        for frame in frame_batch:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_tensor = torch.from_numpy(frame_rgb).float().to(device) / 255.0
            frame_tensor = frame_tensor.permute(2, 0, 1).unsqueeze(0)
            upscaled = torch.nn.functional.interpolate(
                frame_tensor,
                size=(target_h, target_w),
                mode='bicubic',
                align_corners=False
            )
            result_cpu = upscaled.squeeze(0).permute(1, 2, 0).clamp(0, 1).cpu().numpy()
            result_frame = (result_cpu * 255).astype(np.uint8)
            result_bgr = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR)
            out.write(result_bgr)
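# Note: process_frame_batch is currently unused; process_frames_gpu handles
# frames one at a time. It is kept as an optional batched variant for GPUs
# with enough memory to upscale several frames per call.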
# Initialize directories
ensure_directories()
# Set up file cleanup scheduler
schedule.every(CLEANUP_INTERVAL_MINUTES).minutes.do(cleanup_old_files)
app = Flask(__name__)
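# The route paths below are reasonable guesses inferred from the handler
# names; adjust them to match whatever paths templates/index.html actually
# calls. Example client usage under that assumption:
#
#   curl -F "file=@photo.jpg" http://localhost:7860/api/upload
#   curl http://localhost:7860/api/processing-status
#   curl -O http://localhost:7860/api/download/<output_filename>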
@app.route('/')
def index():
    return render_template('index.html')
@app.route('/api/system')
def api_system():
    """Get system information"""
    try:
        info = {}
        # GPU info
        if torch.cuda.is_available():
            info["gpu_available"] = True
            info["gpu_name"] = torch.cuda.get_device_name()
            total_memory = torch.cuda.get_device_properties(0).total_memory
            allocated_memory = torch.cuda.memory_allocated()
            info["gpu_memory"] = f"{total_memory / (1024**3):.1f}GB"
            info["gpu_memory_used"] = f"{allocated_memory / (1024**3):.1f}GB"
            info["gpu_memory_free"] = f"{(total_memory - allocated_memory) / (1024**3):.1f}GB"
            info["cuda_version"] = torch.version.cuda
            info["pytorch_version"] = torch.__version__
        else:
            info["gpu_available"] = False
            info["gpu_name"] = "CPU Only (No GPU detected)"
            info["gpu_memory"] = "N/A"
            info["gpu_memory_used"] = "N/A"
            info["gpu_memory_free"] = "N/A"
            info["cuda_version"] = "Not available"
            info["pytorch_version"] = torch.__version__
        # Storage info
        if os.path.exists("/data"):
            info["persistent_storage"] = True
            try:
                upload_files = os.listdir(UPLOAD_FOLDER) if os.path.exists(UPLOAD_FOLDER) else []
                output_files = os.listdir(OUTPUT_FOLDER) if os.path.exists(OUTPUT_FOLDER) else []
                upload_size = sum(os.path.getsize(os.path.join(UPLOAD_FOLDER, f))
                                  for f in upload_files if os.path.isfile(os.path.join(UPLOAD_FOLDER, f)))
                output_size = sum(os.path.getsize(os.path.join(OUTPUT_FOLDER, f))
                                  for f in output_files if os.path.isfile(os.path.join(OUTPUT_FOLDER, f)))
                info["storage_uploads"] = f"{upload_size / (1024**2):.1f}MB"
                info["storage_outputs"] = f"{output_size / (1024**2):.1f}MB"
                info["upload_files_count"] = len(upload_files)
                info["output_files_count"] = len(output_files)
                # Add cleanup info
                info["cleanup_stats"] = app_state["cleanup_stats"]
                info["cleanup_interval"] = f"{CLEANUP_INTERVAL_MINUTES} minutes"
                info["file_max_age"] = f"{FILE_MAX_AGE_HOURS} hour(s)"
            except Exception as e:
                info["storage_uploads"] = f"Error: {str(e)}"
                info["storage_outputs"] = "N/A"
                info["upload_files_count"] = 0
                info["output_files_count"] = 0
        else:
            info["persistent_storage"] = False
        return jsonify({"success": True, "data": info})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})
@app.route('/api/upload', methods=['POST'])
def api_upload():
    """Upload and process file for 4K upscaling"""
    try:
        if 'file' not in request.files:
            return jsonify({"success": False, "error": "No file provided"})
        file = request.files['file']
        if file.filename == '':
            return jsonify({"success": False, "error": "No file selected"})
        if file and allowed_file(file.filename):
            file_id = str(uuid.uuid4())
            filename = secure_filename(file.filename)
            file_ext = filename.rsplit('.', 1)[1].lower()
            input_filename = f"{file_id}_input.{file_ext}"
            input_path = os.path.join(UPLOAD_FOLDER, input_filename)
            file.save(input_path)
            output_filename = f"{file_id}_4k.{file_ext}"
            output_path = os.path.join(OUTPUT_FOLDER, output_filename)
            if file_ext in ['png', 'jpg', 'jpeg', 'gif']:
                upscale_image_4k(input_path, output_path)
                media_type = "image"
            elif file_ext in ['mp4', 'avi', 'mov', 'mkv']:
                upscale_video_4k(input_path, output_path)
                media_type = "video"
            log_message(f"📤 File uploaded: {filename}")
            log_message("🎯 Starting 4K transformation...")
            return jsonify({
                "success": True,
                "file_id": file_id,
                "filename": filename,
                "output_filename": output_filename,
                "media_type": media_type,
                "message": "Upload successful, processing started"
            })
        else:
            return jsonify({"success": False, "error": "File type not allowed"})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})
@app.route('/api/processing-status')
def api_processing_status():
    """Get processing status"""
    return jsonify({
        "success": True,
        "processing": app_state["processing_active"],
        "processed_files": app_state["processed_files"]
    })
@app.route('/api/download/<filename>')
def api_download(filename):
    """Download processed file"""
    try:
        filename = secure_filename(filename)  # guard against path traversal
        file_path = os.path.join(OUTPUT_FOLDER, filename)
        if os.path.exists(file_path):
            # Images and videos are served identically; only the mimetype differs
            mimetype = get_file_mimetype(filename)
            return send_file(
                file_path,
                as_attachment=True,
                download_name=f"4k_upscaled_{filename}",
                mimetype=mimetype
            )
        else:
            return jsonify({"error": "File not found"}), 404
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/preview/<filename>')
def api_preview(filename):
    """Preview processed file"""
    try:
        filename = secure_filename(filename)  # guard against path traversal
        file_path = os.path.join(OUTPUT_FOLDER, filename)
        if os.path.exists(file_path):
            mimetype = get_file_mimetype(filename)
            return send_file(file_path, mimetype=mimetype)
        else:
            return jsonify({"error": "File not found"}), 404
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/logs')
def api_logs():
    """Get application logs"""
    return jsonify({
        "success": True,
        "logs": app_state["logs"]
    })
@app.route('/api/clear-logs', methods=['POST'])
def api_clear_logs():
    """Clear application logs"""
    app_state["logs"] = []
    log_message("🧹 Logs cleared")
    return jsonify({"success": True, "message": "Logs cleared"})
@app.route('/api/optimize-gpu', methods=['POST'])
def api_optimize_gpu():
    """Optimize GPU for processing"""
    try:
        success = optimize_gpu()
        if success:
            return jsonify({"success": True, "message": "GPU optimized"})
        else:
            return jsonify({"success": False, "message": "GPU optimization failed"})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})
@app.route('/api/clear-cache', methods=['POST'])
def api_clear_cache():
    """Clear GPU cache and processed files history"""
    try:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        app_state["processed_files"] = []
        log_message("🧹 Cache and history cleared")
        return jsonify({"success": True, "message": "Cache cleared"})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})
@app.route('/api/cleanup-now', methods=['POST'])
def api_cleanup_now():
    """Manually trigger file cleanup"""
    try:
        cleanup_old_files()
        return jsonify({"success": True, "message": "Manual cleanup completed"})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})
@app.route('/api/storage-stats')
def api_storage_stats():
    """Get detailed storage statistics"""
    try:
        stats = {
            "cleanup_stats": app_state["cleanup_stats"],
            "current_files": {},
            "total_storage_mb": 0
        }
        for folder_name, folder_path in [("uploads", UPLOAD_FOLDER), ("outputs", OUTPUT_FOLDER)]:
            if os.path.exists(folder_path):
                files = []
                total_size = 0
                for filename in os.listdir(folder_path):
                    file_path = os.path.join(folder_path, filename)
                    if os.path.isfile(file_path):
                        file_size = os.path.getsize(file_path)
                        file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
                        files.append({
                            "name": filename,
                            "size_mb": file_size / (1024*1024),
                            "created": file_time.strftime("%Y-%m-%d %H:%M:%S"),
                            "age_hours": (datetime.now() - file_time).total_seconds() / 3600
                        })
                        total_size += file_size
                stats["current_files"][folder_name] = {
                    "files": files,
                    "count": len(files),
                    "total_size_mb": total_size / (1024*1024)
                }
                stats["total_storage_mb"] += total_size / (1024*1024)
        return jsonify({"success": True, "data": stats})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})
if __name__ == '__main__':
    # Initialize system
    log_message("🚀 4K Upscaler starting...")
    try:
        # Start file cleanup scheduler
        run_scheduler()
        # Optimize GPU if available
        if optimize_gpu():
            log_message("✅ GPU optimized for 4K upscaling")
        else:
            log_message("⚠️ GPU optimization failed, using CPU fallback")
        # Run initial cleanup
        log_message("🧹 Running initial file cleanup...")
        cleanup_old_files()
        log_message("✅ 4K Upscaler ready")
        log_message("📤 Upload images or videos to upscale to 4K resolution")
        log_message(f"🗑️ Files will be automatically deleted after {FILE_MAX_AGE_HOURS} hour(s)")
    except Exception as e:
        log_message(f"❌ Initialization error: {str(e)}")
        log_message("⚠️ Starting in fallback mode...")
    # Run application
    try:
        app.run(host='0.0.0.0', port=7860, debug=False, threaded=True)
    except Exception as e:
        log_message(f"❌ Server startup error: {str(e)}")
        print(f"Critical error: {str(e)}")