Spaces:
Sleeping
Sleeping
import os | |
import subprocess | |
import logging | |
import asyncio | |
import threading | |
import time | |
import psutil | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from typing import Optional, List, Tuple, Dict, Any | |
from pathlib import Path | |
import gradio as gr | |
from dataclasses import dataclass | |
import json | |
import zipfile | |
import shutil | |
from io import BytesIO | |
# Configuración de logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# Directorios y rutas | |
UPLOAD_DIR = os.path.join(os.getcwd(), "uploads") | |
FFMPEG_PATH = os.path.join(os.path.dirname(__file__), "ffmpeg.exe") # Ruta al ejecutable ffmpeg | |
# Crear directorios | |
os.makedirs(UPLOAD_DIR, exist_ok=True) | |
# Definir formatos soportados (solo video y audio) | |
VIDEO_FORMATS = ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv', '.m4v', '.3gp', '.ts', '.mts', '.mpg', '.mpeg', '.ogv'] | |
AUDIO_FORMATS = ['.mp3', '.wav', '.aac', '.flac', '.ogg', '.m4a', '.wma', '.opus', '.ac3', '.dts', '.amr', '.aiff'] | |
ALL_SUPPORTED_FORMATS = VIDEO_FORMATS + AUDIO_FORMATS | |
# Configuraciones de compresión agresiva para reducir tamaño | |
QUALITY_PRESETS = { | |
"Compresión Máxima": { | |
"video_bitrate": "200k", | |
"audio_bitrate": "32k", | |
"preset": "slow", | |
"crf": "35" | |
}, | |
"Compresión Alta": { | |
"video_bitrate": "400k", | |
"audio_bitrate": "48k", | |
"preset": "medium", | |
"crf": "30" | |
}, | |
"Compresión Media": { | |
"video_bitrate": "600k", | |
"audio_bitrate": "64k", | |
"preset": "fast", | |
"crf": "28" | |
}, | |
"Compresión Rápida": { | |
"video_bitrate": "800k", | |
"audio_bitrate": "96k", | |
"preset": "veryfast", | |
"crf": "25" | |
} | |
} | |
class ConversionProgress: | |
filename: str | |
status: str | |
progress: float | |
original_size: int = 0 | |
compressed_size: int = 0 | |
compression_ratio: float = 0.0 | |
output_file: Optional[str] = None | |
error: Optional[str] = None | |
class UniversalCompressor: | |
def __init__(self): | |
self.conversion_progress = {} | |
self.active_conversions = 0 | |
self.cpu_cores = psutil.cpu_count(logical=False) | |
def get_file_size(self, file_path: str) -> int: | |
"""Obtener tamaño del archivo en bytes.""" | |
try: | |
return os.path.getsize(file_path) | |
except: | |
return 0 | |
def detect_file_type(self, file_path: str) -> str: | |
"""Detectar tipo de archivo basado en extensión.""" | |
ext = os.path.splitext(file_path)[1].lower() | |
if ext in VIDEO_FORMATS: | |
return 'video' | |
elif ext in AUDIO_FORMATS: | |
return 'audio' | |
else: | |
return 'unsupported' | |
def sanitize_filename(self, filename: str) -> str: | |
"""Limpiar y validar nombre de archivo.""" | |
return ''.join(c for c in filename if c.isalnum() or c in ('.', '_', '-')).rstrip() | |
def ensure_unique_filename(self, directory: str, filename: str) -> str: | |
"""Generar nombre único para evitar sobreescrituras.""" | |
base, ext = os.path.splitext(filename) | |
counter = 1 | |
new_filename = filename | |
while os.path.exists(os.path.join(directory, new_filename)): | |
new_filename = f"{base}_{counter}{ext}" | |
counter += 1 | |
return new_filename | |
def compress_media(self, input_file: str, quality_preset: str, progress_callback=None) -> str: | |
"""Comprimir video/audio con configuraciones optimizadas para CPU.""" | |
try: | |
media_type = self.detect_file_type(input_file) | |
base_name = os.path.basename(input_file) | |
file_id = f"{base_name}_{int(time.time())}" | |
# Inicializar progreso | |
original_size = self.get_file_size(input_file) | |
self.conversion_progress[file_id] = ConversionProgress( | |
filename=base_name, | |
status="Iniciando compresión...", | |
progress=0.0, | |
original_size=original_size | |
) | |
if progress_callback: | |
progress_callback(file_id, self.conversion_progress[file_id]) | |
# Determinar extensión de salida | |
output_extension = 'mp4' if media_type == 'video' else 'm4a' | |
output_filename = self.ensure_unique_filename( | |
UPLOAD_DIR, | |
f"{os.path.splitext(base_name)[0]}_compressed.{output_extension}" | |
) | |
output_file = os.path.join(UPLOAD_DIR, output_filename) | |
quality_config = QUALITY_PRESETS[quality_preset] | |
# Configuraciones FFmpeg optimizadas para menor uso de CPU | |
if media_type == 'video': | |
ffmpeg_command = [ | |
FFMPEG_PATH, | |
'-i', input_file, | |
'-c:v', 'libx264', | |
'-crf', quality_config['crf'], | |
'-preset', quality_config['preset'], | |
'-c:a', 'aac', | |
'-b:a', quality_config['audio_bitrate'], | |
'-movflags', '+faststart', | |
'-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2', | |
'-threads', str(min(self.cpu_cores, 4)), # Limitar threads para menor uso CPU | |
'-y', | |
output_file | |
] | |
else: # audio | |
ffmpeg_command = [ | |
FFMPEG_PATH, | |
'-i', input_file, | |
'-vn', | |
'-c:a', 'aac', | |
'-b:a', quality_config['audio_bitrate'], | |
'-ar', '44100', | |
'-threads', str(min(self.cpu_cores, 2)), | |
'-y', | |
output_file | |
] | |
# Ejecutar conversión | |
self.conversion_progress[file_id].status = "Comprimiendo..." | |
self.conversion_progress[file_id].progress = 25.0 | |
if progress_callback: | |
progress_callback(file_id, self.conversion_progress[file_id]) | |
process = subprocess.Popen( | |
ffmpeg_command, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
text=True | |
) | |
# Simular progreso | |
for i in range(25, 90, 10): | |
if process.poll() is None: | |
self.conversion_progress[file_id].progress = float(i) | |
if progress_callback: | |
progress_callback(file_id, self.conversion_progress[file_id]) | |
time.sleep(0.3) | |
else: | |
break | |
stdout, stderr = process.communicate() | |
if process.returncode != 0: | |
error_msg = f"Error FFmpeg: {stderr}" | |
self.conversion_progress[file_id].status = "Error" | |
self.conversion_progress[file_id].error = error_msg | |
if progress_callback: | |
progress_callback(file_id, self.conversion_progress[file_id]) | |
raise Exception(error_msg) | |
# Finalizar | |
compressed_size = self.get_file_size(output_file) | |
compression_ratio = ((original_size - compressed_size) / original_size) * 100 if original_size > 0 else 0 | |
self.conversion_progress[file_id].status = "Completado" | |
self.conversion_progress[file_id].progress = 100.0 | |
self.conversion_progress[file_id].compressed_size = compressed_size | |
self.conversion_progress[file_id].compression_ratio = compression_ratio | |
self.conversion_progress[file_id].output_file = output_file | |
if progress_callback: | |
progress_callback(file_id, self.conversion_progress[file_id]) | |
logger.info(f"Archivo comprimido: {output_file} (Reducción: {compression_ratio:.1f}%)") | |
return output_file | |
except Exception as e: | |
error_msg = f"Error durante compresión: {str(e)}" | |
logger.error(error_msg) | |
if file_id in self.conversion_progress: | |
self.conversion_progress[file_id].status = "Error" | |
self.conversion_progress[file_id].error = error_msg | |
if progress_callback: | |
progress_callback(file_id, self.conversion_progress[file_id]) | |
raise Exception(error_msg) | |
def compress_archive(self, input_file: str, progress_callback=None) -> str: | |
"""Recomprimir archivos comprimidos para mejor ratio.""" | |
try: | |
base_name = os.path.basename(input_file) | |
file_id = f"{base_name}_{int(time.time())}" | |
original_size = self.get_file_size(input_file) | |
self.conversion_progress[file_id] = ConversionProgress( | |
filename=base_name, | |
status="Recomprimiendo archivo...", | |
progress=50.0, | |
original_size=original_size | |
) | |
if progress_callback: | |
progress_callback(file_id, self.conversion_progress[file_id]) | |
# Crear archivo 7z con máxima compresión | |
output_filename = self.ensure_unique_filename( | |
UPLOAD_DIR, f"{os.path.splitext(base_name)[0]}_compressed.7z" | |
) | |
output_file = os.path.join(UPLOAD_DIR, output_filename) | |
# Usar 7z para recompresión | |
import py7zr | |
with py7zr.SevenZipFile(output_file, 'w', filters=[{"id": py7zr.FILTER_LZMA2, "preset": 9}]) as archive: | |
archive.write(input_file, base_name) | |
compressed_size = self.get_file_size(output_file) | |
compression_ratio = ((original_size - compressed_size) / original_size) * 100 if original_size > 0 else 0 | |
self.conversion_progress[file_id].status = "Completado" | |
self.conversion_progress[file_id].progress = 100.0 | |
self.conversion_progress[file_id].compressed_size = compressed_size | |
self.conversion_progress[file_id].compression_ratio = compression_ratio | |
self.conversion_progress[file_id].output_file = output_file | |
if progress_callback: | |
progress_callback(file_id, self.conversion_progress[file_id]) | |
return output_file | |
except Exception as e: | |
error_msg = f"Error recomprimiendo archivo: {str(e)}" | |
logger.error(error_msg) | |
if file_id in self.conversion_progress: | |
self.conversion_progress[file_id].status = "Error" | |
self.conversion_progress[file_id].error = error_msg | |
if progress_callback: | |
progress_callback(file_id, self.conversion_progress[file_id]) | |
raise Exception(error_msg) | |
def compress_single_file(self, input_file: str, quality_preset: str, progress_callback=None) -> str: | |
"""Comprimir un archivo según su tipo.""" | |
if not os.path.exists(input_file): | |
raise ValueError(f"El archivo no existe: {input_file}") | |
file_type = self.detect_file_type(input_file) | |
if file_type in ['video', 'audio']: | |
return self.compress_media(input_file, quality_preset, progress_callback) | |
else: | |
raise ValueError(f"Tipo de archivo no soportado: {os.path.splitext(input_file)[1]}. Solo se soportan videos y audios: {', '.join(ALL_SUPPORTED_FORMATS)}") | |
def compress_multiple_files(self, file_paths: List[str], quality_preset: str, max_workers: int = None, progress_callback=None) -> List[str]: | |
"""Comprimir múltiples archivos en paralelo con optimización de CPU.""" | |
if max_workers is None: | |
max_workers = min(self.cpu_cores, 3) # Limitar para no saturar CPU | |
compressed_files = [] | |
self.active_conversions = len(file_paths) | |
with ThreadPoolExecutor(max_workers=max_workers) as executor: | |
future_to_file = { | |
executor.submit(self.compress_single_file, file_path, quality_preset, progress_callback): file_path | |
for file_path in file_paths | |
} | |
for future in as_completed(future_to_file): | |
file_path = future_to_file[future] | |
try: | |
result = future.result() | |
compressed_files.append(result) | |
except Exception as e: | |
logger.error(f"Error comprimiendo {file_path}: {e}") | |
self.active_conversions = 0 | |
return compressed_files | |
# Instancia global del compresor | |
compressor = UniversalCompressor() | |
def update_progress_display(): | |
"""Actualizar visualización del progreso con información detallada.""" | |
if not compressor.conversion_progress: | |
return "No hay compresiones en progreso." | |
progress_text = "📊 **Estado de Compresiones:**\n\n" | |
total_original = 0 | |
total_compressed = 0 | |
for file_id, progress in compressor.conversion_progress.items(): | |
status_emoji = { | |
"Iniciando compresión...": "🔄", | |
"Procesando imagen...": "🖼️", | |
"Comprimiendo...": "⚙️", | |
"Recomprimiendo archivo...": "📦", | |
"Completado": "✅", | |
"Error": "❌" | |
}.get(progress.status, "🔄") | |
progress_text += f"{status_emoji} **{progress.filename}**\n" | |
progress_text += f" Estado: {progress.status}\n" | |
progress_text += f" Progreso: {progress.progress:.1f}%\n" | |
if progress.original_size > 0: | |
original_mb = progress.original_size / (1024 * 1024) | |
progress_text += f" Tamaño original: {original_mb:.2f} MB\n" | |
total_original += progress.original_size | |
if progress.compressed_size > 0: | |
compressed_mb = progress.compressed_size / (1024 * 1024) | |
progress_text += f" Tamaño comprimido: {compressed_mb:.2f} MB\n" | |
progress_text += f" Reducción: {progress.compression_ratio:.1f}%\n" | |
total_compressed += progress.compressed_size | |
if progress.error: | |
progress_text += f" Error: {progress.error}\n" | |
progress_text += "\n" | |
# Estadísticas totales | |
if total_original > 0: | |
total_reduction = ((total_original - total_compressed) / total_original) * 100 | |
progress_text += f"**📈 Estadísticas Totales:**\n" | |
progress_text += f"Reducción total: {total_reduction:.1f}% ({(total_original - total_compressed) / (1024 * 1024):.2f} MB ahorrados)\n" | |
return progress_text | |
def gradio_single_compression(file, quality_preset): | |
"""Interfaz Gradio para compresión de archivo único.""" | |
if not file: | |
raise gr.Error("No se ha subido ningún archivo. Por favor, sube un archivo.") | |
try: | |
compressor.conversion_progress.clear() | |
compressed_file = compressor.compress_single_file(file, quality_preset) | |
return compressed_file, update_progress_display() | |
except Exception as e: | |
raise gr.Error(f"Compresión fallida: {str(e)}") | |
def gradio_multiple_compression(files, quality_preset, max_workers): | |
"""Interfaz Gradio para compresión de múltiples archivos.""" | |
if not files: | |
raise gr.Error("No se han subido archivos. Por favor, sube al menos un archivo.") | |
try: | |
compressor.conversion_progress.clear() | |
file_paths = [file.name for file in files] | |
compressed_files = compressor.compress_multiple_files( | |
file_paths, quality_preset, max_workers=max_workers | |
) | |
if len(compressed_files) > 1: | |
zip_filename = os.path.join(UPLOAD_DIR, f"compressed_files_{int(time.time())}.zip") | |
with zipfile.ZipFile(zip_filename, 'w') as zipf: | |
for file_path in compressed_files: | |
zipf.write(file_path, os.path.basename(file_path)) | |
return zip_filename, update_progress_display() | |
elif len(compressed_files) == 1: | |
return compressed_files[0], update_progress_display() | |
else: | |
raise gr.Error("No se pudo comprimir ningún archivo.") | |
except Exception as e: | |
raise gr.Error(f"Compresión fallida: {str(e)}") | |
def refresh_progress(): | |
"""Refrescar estado del progreso.""" | |
return update_progress_display() | |
# Interfaz Gradio mejorada | |
with gr.Blocks(title="🗜️ Compresor Universal Pro Max", theme=gr.themes.Soft()) as iface: | |
gr.Markdown( | |
""" | |
# 🗜️ Compresor Universal Pro Max | |
**El compresor más potente y rápido para todos tus archivos** | |
✨ **Características Avanzadas:** | |
- 🚀 Compresión inteligente y optimizada para CPU | |
- 🎬 Videos, audios, imágenes, GIFs y documentos | |
- 🖼️ Soporte completo para PNG, JPG, WebP, TIFF, GIF animado | |
- ⚡ Procesamiento paralelo ultra-rápido | |
- 📊 Monitoreo detallado con estadísticas de compresión | |
- 🔧 Algoritmos optimizados para máxima reducción de tamaño | |
- 💾 Ahorro significativo de espacio en disco | |
""" | |
) | |
with gr.Tabs(): | |
with gr.TabItem("📄 Archivo Único"): | |
with gr.Row(): | |
with gr.Column(scale=2): | |
single_file_input = gr.File( | |
label="📁 Subir Archivo (Todos los formatos soportados)", | |
type="filepath", | |
file_types=ALL_SUPPORTED_FORMATS | |
) | |
single_quality_input = gr.Dropdown( | |
choices=list(QUALITY_PRESETS.keys()), | |
value="Compresión Alta", | |
label="🎯 Nivel de Compresión" | |
) | |
single_compress_btn = gr.Button( | |
"🗜️ Comprimir Archivo", | |
variant="primary", | |
size="lg" | |
) | |
with gr.Column(scale=1): | |
single_output = gr.File( | |
label="📥 Descargar Archivo Comprimido", | |
type="filepath" | |
) | |
with gr.TabItem("📁 Múltiples Archivos"): | |
with gr.Row(): | |
with gr.Column(scale=2): | |
multiple_files_input = gr.File( | |
label="📁 Subir Múltiples Archivos", | |
file_count="multiple", | |
type="filepath", | |
file_types=ALL_SUPPORTED_FORMATS | |
) | |
with gr.Row(): | |
multiple_quality_input = gr.Dropdown( | |
choices=list(QUALITY_PRESETS.keys()), | |
value="Compresión Alta", | |
label="🎯 Nivel de Compresión" | |
) | |
workers_input = gr.Slider( | |
minimum=1, | |
maximum=min(psutil.cpu_count(), 6), | |
value=min(psutil.cpu_count(logical=False), 3), | |
step=1, | |
label="⚡ Archivos Simultáneos" | |
) | |
multiple_compress_btn = gr.Button( | |
"🗜️ Comprimir Todos los Archivos", | |
variant="primary", | |
size="lg" | |
) | |
with gr.Column(scale=1): | |
multiple_output = gr.File( | |
label="📥 Descargar Archivos Comprimidos", | |
type="filepath" | |
) | |
# Panel de progreso mejorado | |
gr.Markdown("## 📊 Estado y Estadísticas de Compresión") | |
progress_display = gr.Markdown( | |
value="No hay compresiones en progreso.", | |
label="Estado" | |
) | |
refresh_btn = gr.Button("🔄 Actualizar Estado", size="sm") | |
# Información detallada | |
with gr.Accordion("ℹ️ Información Detallada", open=False): | |
gr.Markdown( | |
f""" | |
### 🎯 Niveles de Compresión: | |
- **Compresión Máxima**: Máxima reducción de tamaño, proceso más lento | |
- **Compresión Alta**: Excelente balance compresión/velocidad | |
- **Compresión Media**: Buena compresión, procesamiento rápido | |
- **Compresión Rápida**: Compresión básica, velocidad máxima | |
### 📁 Formatos Soportados: | |
**🎬 Video**: {', '.join(VIDEO_FORMATS)} | |
**🎵 Audio**: {', '.join(AUDIO_FORMATS)} | |
### ⚡ Optimizaciones: | |
- **CPU Cores Detectados**: {psutil.cpu_count()} (Lógicos), {psutil.cpu_count(logical=False)} (Físicos) | |
- **Threads Optimizados**: Máximo {min(psutil.cpu_count(logical=False), 4)} para video, {min(psutil.cpu_count(logical=False), 2)} para audio | |
- **Compresión Inteligente**: Algoritmos adaptativos según tipo de archivo | |
- **Gestión de Memoria**: Optimizada para archivos grandes | |
""" | |
) | |
# Eventos | |
single_compress_btn.click( | |
fn=gradio_single_compression, | |
inputs=[single_file_input, single_quality_input], | |
outputs=[single_output, progress_display] | |
) | |
multiple_compress_btn.click( | |
fn=gradio_multiple_compression, | |
inputs=[multiple_files_input, multiple_quality_input, workers_input], | |
outputs=[multiple_output, progress_display] | |
) | |
refresh_btn.click( | |
fn=refresh_progress, | |
outputs=[progress_display] | |
) | |
# Lanzar interfaz | |
if __name__ == "__main__": | |
iface.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860))) | |