sr4's picture
Update app.py
7828df8 verified
import os
import subprocess
import logging
import asyncio
import threading
import time
import psutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional, List, Tuple, Dict, Any
from pathlib import Path
import gradio as gr
from dataclasses import dataclass
import json
import zipfile
import shutil
from io import BytesIO
# Configuración de logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Directorios y rutas
UPLOAD_DIR = os.path.join(os.getcwd(), "uploads")
FFMPEG_PATH = os.path.join(os.path.dirname(__file__), "ffmpeg.exe") # Ruta al ejecutable ffmpeg
# Crear directorios
os.makedirs(UPLOAD_DIR, exist_ok=True)
# Definir formatos soportados (solo video y audio)
VIDEO_FORMATS = ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv', '.m4v', '.3gp', '.ts', '.mts', '.mpg', '.mpeg', '.ogv']
AUDIO_FORMATS = ['.mp3', '.wav', '.aac', '.flac', '.ogg', '.m4a', '.wma', '.opus', '.ac3', '.dts', '.amr', '.aiff']
ALL_SUPPORTED_FORMATS = VIDEO_FORMATS + AUDIO_FORMATS
# Configuraciones de compresión agresiva para reducir tamaño
QUALITY_PRESETS = {
"Compresión Máxima": {
"video_bitrate": "200k",
"audio_bitrate": "32k",
"preset": "slow",
"crf": "35"
},
"Compresión Alta": {
"video_bitrate": "400k",
"audio_bitrate": "48k",
"preset": "medium",
"crf": "30"
},
"Compresión Media": {
"video_bitrate": "600k",
"audio_bitrate": "64k",
"preset": "fast",
"crf": "28"
},
"Compresión Rápida": {
"video_bitrate": "800k",
"audio_bitrate": "96k",
"preset": "veryfast",
"crf": "25"
}
}
@dataclass
class ConversionProgress:
filename: str
status: str
progress: float
original_size: int = 0
compressed_size: int = 0
compression_ratio: float = 0.0
output_file: Optional[str] = None
error: Optional[str] = None
class UniversalCompressor:
def __init__(self):
self.conversion_progress = {}
self.active_conversions = 0
self.cpu_cores = psutil.cpu_count(logical=False)
def get_file_size(self, file_path: str) -> int:
"""Obtener tamaño del archivo en bytes."""
try:
return os.path.getsize(file_path)
except:
return 0
def detect_file_type(self, file_path: str) -> str:
"""Detectar tipo de archivo basado en extensión."""
ext = os.path.splitext(file_path)[1].lower()
if ext in VIDEO_FORMATS:
return 'video'
elif ext in AUDIO_FORMATS:
return 'audio'
else:
return 'unsupported'
def sanitize_filename(self, filename: str) -> str:
"""Limpiar y validar nombre de archivo."""
return ''.join(c for c in filename if c.isalnum() or c in ('.', '_', '-')).rstrip()
def ensure_unique_filename(self, directory: str, filename: str) -> str:
"""Generar nombre único para evitar sobreescrituras."""
base, ext = os.path.splitext(filename)
counter = 1
new_filename = filename
while os.path.exists(os.path.join(directory, new_filename)):
new_filename = f"{base}_{counter}{ext}"
counter += 1
return new_filename
def compress_media(self, input_file: str, quality_preset: str, progress_callback=None) -> str:
"""Comprimir video/audio con configuraciones optimizadas para CPU."""
try:
media_type = self.detect_file_type(input_file)
base_name = os.path.basename(input_file)
file_id = f"{base_name}_{int(time.time())}"
# Inicializar progreso
original_size = self.get_file_size(input_file)
self.conversion_progress[file_id] = ConversionProgress(
filename=base_name,
status="Iniciando compresión...",
progress=0.0,
original_size=original_size
)
if progress_callback:
progress_callback(file_id, self.conversion_progress[file_id])
# Determinar extensión de salida
output_extension = 'mp4' if media_type == 'video' else 'm4a'
output_filename = self.ensure_unique_filename(
UPLOAD_DIR,
f"{os.path.splitext(base_name)[0]}_compressed.{output_extension}"
)
output_file = os.path.join(UPLOAD_DIR, output_filename)
quality_config = QUALITY_PRESETS[quality_preset]
# Configuraciones FFmpeg optimizadas para menor uso de CPU
if media_type == 'video':
ffmpeg_command = [
FFMPEG_PATH,
'-i', input_file,
'-c:v', 'libx264',
'-crf', quality_config['crf'],
'-preset', quality_config['preset'],
'-c:a', 'aac',
'-b:a', quality_config['audio_bitrate'],
'-movflags', '+faststart',
'-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
'-threads', str(min(self.cpu_cores, 4)), # Limitar threads para menor uso CPU
'-y',
output_file
]
else: # audio
ffmpeg_command = [
FFMPEG_PATH,
'-i', input_file,
'-vn',
'-c:a', 'aac',
'-b:a', quality_config['audio_bitrate'],
'-ar', '44100',
'-threads', str(min(self.cpu_cores, 2)),
'-y',
output_file
]
# Ejecutar conversión
self.conversion_progress[file_id].status = "Comprimiendo..."
self.conversion_progress[file_id].progress = 25.0
if progress_callback:
progress_callback(file_id, self.conversion_progress[file_id])
process = subprocess.Popen(
ffmpeg_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Simular progreso
for i in range(25, 90, 10):
if process.poll() is None:
self.conversion_progress[file_id].progress = float(i)
if progress_callback:
progress_callback(file_id, self.conversion_progress[file_id])
time.sleep(0.3)
else:
break
stdout, stderr = process.communicate()
if process.returncode != 0:
error_msg = f"Error FFmpeg: {stderr}"
self.conversion_progress[file_id].status = "Error"
self.conversion_progress[file_id].error = error_msg
if progress_callback:
progress_callback(file_id, self.conversion_progress[file_id])
raise Exception(error_msg)
# Finalizar
compressed_size = self.get_file_size(output_file)
compression_ratio = ((original_size - compressed_size) / original_size) * 100 if original_size > 0 else 0
self.conversion_progress[file_id].status = "Completado"
self.conversion_progress[file_id].progress = 100.0
self.conversion_progress[file_id].compressed_size = compressed_size
self.conversion_progress[file_id].compression_ratio = compression_ratio
self.conversion_progress[file_id].output_file = output_file
if progress_callback:
progress_callback(file_id, self.conversion_progress[file_id])
logger.info(f"Archivo comprimido: {output_file} (Reducción: {compression_ratio:.1f}%)")
return output_file
except Exception as e:
error_msg = f"Error durante compresión: {str(e)}"
logger.error(error_msg)
if file_id in self.conversion_progress:
self.conversion_progress[file_id].status = "Error"
self.conversion_progress[file_id].error = error_msg
if progress_callback:
progress_callback(file_id, self.conversion_progress[file_id])
raise Exception(error_msg)
def compress_archive(self, input_file: str, progress_callback=None) -> str:
"""Recomprimir archivos comprimidos para mejor ratio."""
try:
base_name = os.path.basename(input_file)
file_id = f"{base_name}_{int(time.time())}"
original_size = self.get_file_size(input_file)
self.conversion_progress[file_id] = ConversionProgress(
filename=base_name,
status="Recomprimiendo archivo...",
progress=50.0,
original_size=original_size
)
if progress_callback:
progress_callback(file_id, self.conversion_progress[file_id])
# Crear archivo 7z con máxima compresión
output_filename = self.ensure_unique_filename(
UPLOAD_DIR, f"{os.path.splitext(base_name)[0]}_compressed.7z"
)
output_file = os.path.join(UPLOAD_DIR, output_filename)
# Usar 7z para recompresión
import py7zr
with py7zr.SevenZipFile(output_file, 'w', filters=[{"id": py7zr.FILTER_LZMA2, "preset": 9}]) as archive:
archive.write(input_file, base_name)
compressed_size = self.get_file_size(output_file)
compression_ratio = ((original_size - compressed_size) / original_size) * 100 if original_size > 0 else 0
self.conversion_progress[file_id].status = "Completado"
self.conversion_progress[file_id].progress = 100.0
self.conversion_progress[file_id].compressed_size = compressed_size
self.conversion_progress[file_id].compression_ratio = compression_ratio
self.conversion_progress[file_id].output_file = output_file
if progress_callback:
progress_callback(file_id, self.conversion_progress[file_id])
return output_file
except Exception as e:
error_msg = f"Error recomprimiendo archivo: {str(e)}"
logger.error(error_msg)
if file_id in self.conversion_progress:
self.conversion_progress[file_id].status = "Error"
self.conversion_progress[file_id].error = error_msg
if progress_callback:
progress_callback(file_id, self.conversion_progress[file_id])
raise Exception(error_msg)
def compress_single_file(self, input_file: str, quality_preset: str, progress_callback=None) -> str:
"""Comprimir un archivo según su tipo."""
if not os.path.exists(input_file):
raise ValueError(f"El archivo no existe: {input_file}")
file_type = self.detect_file_type(input_file)
if file_type in ['video', 'audio']:
return self.compress_media(input_file, quality_preset, progress_callback)
else:
raise ValueError(f"Tipo de archivo no soportado: {os.path.splitext(input_file)[1]}. Solo se soportan videos y audios: {', '.join(ALL_SUPPORTED_FORMATS)}")
def compress_multiple_files(self, file_paths: List[str], quality_preset: str, max_workers: int = None, progress_callback=None) -> List[str]:
"""Comprimir múltiples archivos en paralelo con optimización de CPU."""
if max_workers is None:
max_workers = min(self.cpu_cores, 3) # Limitar para no saturar CPU
compressed_files = []
self.active_conversions = len(file_paths)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {
executor.submit(self.compress_single_file, file_path, quality_preset, progress_callback): file_path
for file_path in file_paths
}
for future in as_completed(future_to_file):
file_path = future_to_file[future]
try:
result = future.result()
compressed_files.append(result)
except Exception as e:
logger.error(f"Error comprimiendo {file_path}: {e}")
self.active_conversions = 0
return compressed_files
# Instancia global del compresor
compressor = UniversalCompressor()
def update_progress_display():
"""Actualizar visualización del progreso con información detallada."""
if not compressor.conversion_progress:
return "No hay compresiones en progreso."
progress_text = "📊 **Estado de Compresiones:**\n\n"
total_original = 0
total_compressed = 0
for file_id, progress in compressor.conversion_progress.items():
status_emoji = {
"Iniciando compresión...": "🔄",
"Procesando imagen...": "🖼️",
"Comprimiendo...": "⚙️",
"Recomprimiendo archivo...": "📦",
"Completado": "✅",
"Error": "❌"
}.get(progress.status, "🔄")
progress_text += f"{status_emoji} **{progress.filename}**\n"
progress_text += f" Estado: {progress.status}\n"
progress_text += f" Progreso: {progress.progress:.1f}%\n"
if progress.original_size > 0:
original_mb = progress.original_size / (1024 * 1024)
progress_text += f" Tamaño original: {original_mb:.2f} MB\n"
total_original += progress.original_size
if progress.compressed_size > 0:
compressed_mb = progress.compressed_size / (1024 * 1024)
progress_text += f" Tamaño comprimido: {compressed_mb:.2f} MB\n"
progress_text += f" Reducción: {progress.compression_ratio:.1f}%\n"
total_compressed += progress.compressed_size
if progress.error:
progress_text += f" Error: {progress.error}\n"
progress_text += "\n"
# Estadísticas totales
if total_original > 0:
total_reduction = ((total_original - total_compressed) / total_original) * 100
progress_text += f"**📈 Estadísticas Totales:**\n"
progress_text += f"Reducción total: {total_reduction:.1f}% ({(total_original - total_compressed) / (1024 * 1024):.2f} MB ahorrados)\n"
return progress_text
def gradio_single_compression(file, quality_preset):
"""Interfaz Gradio para compresión de archivo único."""
if not file:
raise gr.Error("No se ha subido ningún archivo. Por favor, sube un archivo.")
try:
compressor.conversion_progress.clear()
compressed_file = compressor.compress_single_file(file, quality_preset)
return compressed_file, update_progress_display()
except Exception as e:
raise gr.Error(f"Compresión fallida: {str(e)}")
def gradio_multiple_compression(files, quality_preset, max_workers):
"""Interfaz Gradio para compresión de múltiples archivos."""
if not files:
raise gr.Error("No se han subido archivos. Por favor, sube al menos un archivo.")
try:
compressor.conversion_progress.clear()
file_paths = [file.name for file in files]
compressed_files = compressor.compress_multiple_files(
file_paths, quality_preset, max_workers=max_workers
)
if len(compressed_files) > 1:
zip_filename = os.path.join(UPLOAD_DIR, f"compressed_files_{int(time.time())}.zip")
with zipfile.ZipFile(zip_filename, 'w') as zipf:
for file_path in compressed_files:
zipf.write(file_path, os.path.basename(file_path))
return zip_filename, update_progress_display()
elif len(compressed_files) == 1:
return compressed_files[0], update_progress_display()
else:
raise gr.Error("No se pudo comprimir ningún archivo.")
except Exception as e:
raise gr.Error(f"Compresión fallida: {str(e)}")
def refresh_progress():
"""Refrescar estado del progreso."""
return update_progress_display()
# Interfaz Gradio mejorada
with gr.Blocks(title="🗜️ Compresor Universal Pro Max", theme=gr.themes.Soft()) as iface:
gr.Markdown(
"""
# 🗜️ Compresor Universal Pro Max
**El compresor más potente y rápido para todos tus archivos**
✨ **Características Avanzadas:**
- 🚀 Compresión inteligente y optimizada para CPU
- 🎬 Videos, audios, imágenes, GIFs y documentos
- 🖼️ Soporte completo para PNG, JPG, WebP, TIFF, GIF animado
- ⚡ Procesamiento paralelo ultra-rápido
- 📊 Monitoreo detallado con estadísticas de compresión
- 🔧 Algoritmos optimizados para máxima reducción de tamaño
- 💾 Ahorro significativo de espacio en disco
"""
)
with gr.Tabs():
with gr.TabItem("📄 Archivo Único"):
with gr.Row():
with gr.Column(scale=2):
single_file_input = gr.File(
label="📁 Subir Archivo (Todos los formatos soportados)",
type="filepath",
file_types=ALL_SUPPORTED_FORMATS
)
single_quality_input = gr.Dropdown(
choices=list(QUALITY_PRESETS.keys()),
value="Compresión Alta",
label="🎯 Nivel de Compresión"
)
single_compress_btn = gr.Button(
"🗜️ Comprimir Archivo",
variant="primary",
size="lg"
)
with gr.Column(scale=1):
single_output = gr.File(
label="📥 Descargar Archivo Comprimido",
type="filepath"
)
with gr.TabItem("📁 Múltiples Archivos"):
with gr.Row():
with gr.Column(scale=2):
multiple_files_input = gr.File(
label="📁 Subir Múltiples Archivos",
file_count="multiple",
type="filepath",
file_types=ALL_SUPPORTED_FORMATS
)
with gr.Row():
multiple_quality_input = gr.Dropdown(
choices=list(QUALITY_PRESETS.keys()),
value="Compresión Alta",
label="🎯 Nivel de Compresión"
)
workers_input = gr.Slider(
minimum=1,
maximum=min(psutil.cpu_count(), 6),
value=min(psutil.cpu_count(logical=False), 3),
step=1,
label="⚡ Archivos Simultáneos"
)
multiple_compress_btn = gr.Button(
"🗜️ Comprimir Todos los Archivos",
variant="primary",
size="lg"
)
with gr.Column(scale=1):
multiple_output = gr.File(
label="📥 Descargar Archivos Comprimidos",
type="filepath"
)
# Panel de progreso mejorado
gr.Markdown("## 📊 Estado y Estadísticas de Compresión")
progress_display = gr.Markdown(
value="No hay compresiones en progreso.",
label="Estado"
)
refresh_btn = gr.Button("🔄 Actualizar Estado", size="sm")
# Información detallada
with gr.Accordion("ℹ️ Información Detallada", open=False):
gr.Markdown(
f"""
### 🎯 Niveles de Compresión:
- **Compresión Máxima**: Máxima reducción de tamaño, proceso más lento
- **Compresión Alta**: Excelente balance compresión/velocidad
- **Compresión Media**: Buena compresión, procesamiento rápido
- **Compresión Rápida**: Compresión básica, velocidad máxima
### 📁 Formatos Soportados:
**🎬 Video**: {', '.join(VIDEO_FORMATS)}
**🎵 Audio**: {', '.join(AUDIO_FORMATS)}
### ⚡ Optimizaciones:
- **CPU Cores Detectados**: {psutil.cpu_count()} (Lógicos), {psutil.cpu_count(logical=False)} (Físicos)
- **Threads Optimizados**: Máximo {min(psutil.cpu_count(logical=False), 4)} para video, {min(psutil.cpu_count(logical=False), 2)} para audio
- **Compresión Inteligente**: Algoritmos adaptativos según tipo de archivo
- **Gestión de Memoria**: Optimizada para archivos grandes
"""
)
# Eventos
single_compress_btn.click(
fn=gradio_single_compression,
inputs=[single_file_input, single_quality_input],
outputs=[single_output, progress_display]
)
multiple_compress_btn.click(
fn=gradio_multiple_compression,
inputs=[multiple_files_input, multiple_quality_input, workers_input],
outputs=[multiple_output, progress_display]
)
refresh_btn.click(
fn=refresh_progress,
outputs=[progress_display]
)
# Lanzar interfaz
if __name__ == "__main__":
iface.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))