import os import subprocess import logging import asyncio import threading import time import psutil from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Optional, List, Tuple, Dict, Any from pathlib import Path import gradio as gr from dataclasses import dataclass import json import zipfile import shutil from io import BytesIO # Configuración de logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Directorios y rutas UPLOAD_DIR = os.path.join(os.getcwd(), "uploads") FFMPEG_PATH = os.path.join(os.path.dirname(__file__), "ffmpeg.exe") # Ruta al ejecutable ffmpeg # Crear directorios os.makedirs(UPLOAD_DIR, exist_ok=True) # Definir formatos soportados (solo video y audio) VIDEO_FORMATS = ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv', '.m4v', '.3gp', '.ts', '.mts', '.mpg', '.mpeg', '.ogv'] AUDIO_FORMATS = ['.mp3', '.wav', '.aac', '.flac', '.ogg', '.m4a', '.wma', '.opus', '.ac3', '.dts', '.amr', '.aiff'] ALL_SUPPORTED_FORMATS = VIDEO_FORMATS + AUDIO_FORMATS # Configuraciones de compresión agresiva para reducir tamaño QUALITY_PRESETS = { "Compresión Máxima": { "video_bitrate": "200k", "audio_bitrate": "32k", "preset": "slow", "crf": "35" }, "Compresión Alta": { "video_bitrate": "400k", "audio_bitrate": "48k", "preset": "medium", "crf": "30" }, "Compresión Media": { "video_bitrate": "600k", "audio_bitrate": "64k", "preset": "fast", "crf": "28" }, "Compresión Rápida": { "video_bitrate": "800k", "audio_bitrate": "96k", "preset": "veryfast", "crf": "25" } } @dataclass class ConversionProgress: filename: str status: str progress: float original_size: int = 0 compressed_size: int = 0 compression_ratio: float = 0.0 output_file: Optional[str] = None error: Optional[str] = None class UniversalCompressor: def __init__(self): self.conversion_progress = {} self.active_conversions = 0 self.cpu_cores = psutil.cpu_count(logical=False) def get_file_size(self, file_path: str) -> int: """Obtener tamaño del archivo en bytes.""" try: return os.path.getsize(file_path) except: return 0 def detect_file_type(self, file_path: str) -> str: """Detectar tipo de archivo basado en extensión.""" ext = os.path.splitext(file_path)[1].lower() if ext in VIDEO_FORMATS: return 'video' elif ext in AUDIO_FORMATS: return 'audio' else: return 'unsupported' def sanitize_filename(self, filename: str) -> str: """Limpiar y validar nombre de archivo.""" return ''.join(c for c in filename if c.isalnum() or c in ('.', '_', '-')).rstrip() def ensure_unique_filename(self, directory: str, filename: str) -> str: """Generar nombre único para evitar sobreescrituras.""" base, ext = os.path.splitext(filename) counter = 1 new_filename = filename while os.path.exists(os.path.join(directory, new_filename)): new_filename = f"{base}_{counter}{ext}" counter += 1 return new_filename def compress_media(self, input_file: str, quality_preset: str, progress_callback=None) -> str: """Comprimir video/audio con configuraciones optimizadas para CPU.""" try: media_type = self.detect_file_type(input_file) base_name = os.path.basename(input_file) file_id = f"{base_name}_{int(time.time())}" # Inicializar progreso original_size = self.get_file_size(input_file) self.conversion_progress[file_id] = ConversionProgress( filename=base_name, status="Iniciando compresión...", progress=0.0, original_size=original_size ) if progress_callback: progress_callback(file_id, self.conversion_progress[file_id]) # Determinar extensión de salida output_extension = 'mp4' if media_type == 'video' else 'm4a' output_filename = self.ensure_unique_filename( UPLOAD_DIR, f"{os.path.splitext(base_name)[0]}_compressed.{output_extension}" ) output_file = os.path.join(UPLOAD_DIR, output_filename) quality_config = QUALITY_PRESETS[quality_preset] # Configuraciones FFmpeg optimizadas para menor uso de CPU if media_type == 'video': ffmpeg_command = [ FFMPEG_PATH, '-i', input_file, '-c:v', 'libx264', '-crf', quality_config['crf'], '-preset', quality_config['preset'], '-c:a', 'aac', '-b:a', quality_config['audio_bitrate'], '-movflags', '+faststart', '-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2', '-threads', str(min(self.cpu_cores, 4)), # Limitar threads para menor uso CPU '-y', output_file ] else: # audio ffmpeg_command = [ FFMPEG_PATH, '-i', input_file, '-vn', '-c:a', 'aac', '-b:a', quality_config['audio_bitrate'], '-ar', '44100', '-threads', str(min(self.cpu_cores, 2)), '-y', output_file ] # Ejecutar conversión self.conversion_progress[file_id].status = "Comprimiendo..." self.conversion_progress[file_id].progress = 25.0 if progress_callback: progress_callback(file_id, self.conversion_progress[file_id]) process = subprocess.Popen( ffmpeg_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Simular progreso for i in range(25, 90, 10): if process.poll() is None: self.conversion_progress[file_id].progress = float(i) if progress_callback: progress_callback(file_id, self.conversion_progress[file_id]) time.sleep(0.3) else: break stdout, stderr = process.communicate() if process.returncode != 0: error_msg = f"Error FFmpeg: {stderr}" self.conversion_progress[file_id].status = "Error" self.conversion_progress[file_id].error = error_msg if progress_callback: progress_callback(file_id, self.conversion_progress[file_id]) raise Exception(error_msg) # Finalizar compressed_size = self.get_file_size(output_file) compression_ratio = ((original_size - compressed_size) / original_size) * 100 if original_size > 0 else 0 self.conversion_progress[file_id].status = "Completado" self.conversion_progress[file_id].progress = 100.0 self.conversion_progress[file_id].compressed_size = compressed_size self.conversion_progress[file_id].compression_ratio = compression_ratio self.conversion_progress[file_id].output_file = output_file if progress_callback: progress_callback(file_id, self.conversion_progress[file_id]) logger.info(f"Archivo comprimido: {output_file} (Reducción: {compression_ratio:.1f}%)") return output_file except Exception as e: error_msg = f"Error durante compresión: {str(e)}" logger.error(error_msg) if file_id in self.conversion_progress: self.conversion_progress[file_id].status = "Error" self.conversion_progress[file_id].error = error_msg if progress_callback: progress_callback(file_id, self.conversion_progress[file_id]) raise Exception(error_msg) def compress_archive(self, input_file: str, progress_callback=None) -> str: """Recomprimir archivos comprimidos para mejor ratio.""" try: base_name = os.path.basename(input_file) file_id = f"{base_name}_{int(time.time())}" original_size = self.get_file_size(input_file) self.conversion_progress[file_id] = ConversionProgress( filename=base_name, status="Recomprimiendo archivo...", progress=50.0, original_size=original_size ) if progress_callback: progress_callback(file_id, self.conversion_progress[file_id]) # Crear archivo 7z con máxima compresión output_filename = self.ensure_unique_filename( UPLOAD_DIR, f"{os.path.splitext(base_name)[0]}_compressed.7z" ) output_file = os.path.join(UPLOAD_DIR, output_filename) # Usar 7z para recompresión import py7zr with py7zr.SevenZipFile(output_file, 'w', filters=[{"id": py7zr.FILTER_LZMA2, "preset": 9}]) as archive: archive.write(input_file, base_name) compressed_size = self.get_file_size(output_file) compression_ratio = ((original_size - compressed_size) / original_size) * 100 if original_size > 0 else 0 self.conversion_progress[file_id].status = "Completado" self.conversion_progress[file_id].progress = 100.0 self.conversion_progress[file_id].compressed_size = compressed_size self.conversion_progress[file_id].compression_ratio = compression_ratio self.conversion_progress[file_id].output_file = output_file if progress_callback: progress_callback(file_id, self.conversion_progress[file_id]) return output_file except Exception as e: error_msg = f"Error recomprimiendo archivo: {str(e)}" logger.error(error_msg) if file_id in self.conversion_progress: self.conversion_progress[file_id].status = "Error" self.conversion_progress[file_id].error = error_msg if progress_callback: progress_callback(file_id, self.conversion_progress[file_id]) raise Exception(error_msg) def compress_single_file(self, input_file: str, quality_preset: str, progress_callback=None) -> str: """Comprimir un archivo según su tipo.""" if not os.path.exists(input_file): raise ValueError(f"El archivo no existe: {input_file}") file_type = self.detect_file_type(input_file) if file_type in ['video', 'audio']: return self.compress_media(input_file, quality_preset, progress_callback) else: raise ValueError(f"Tipo de archivo no soportado: {os.path.splitext(input_file)[1]}. Solo se soportan videos y audios: {', '.join(ALL_SUPPORTED_FORMATS)}") def compress_multiple_files(self, file_paths: List[str], quality_preset: str, max_workers: int = None, progress_callback=None) -> List[str]: """Comprimir múltiples archivos en paralelo con optimización de CPU.""" if max_workers is None: max_workers = min(self.cpu_cores, 3) # Limitar para no saturar CPU compressed_files = [] self.active_conversions = len(file_paths) with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_file = { executor.submit(self.compress_single_file, file_path, quality_preset, progress_callback): file_path for file_path in file_paths } for future in as_completed(future_to_file): file_path = future_to_file[future] try: result = future.result() compressed_files.append(result) except Exception as e: logger.error(f"Error comprimiendo {file_path}: {e}") self.active_conversions = 0 return compressed_files # Instancia global del compresor compressor = UniversalCompressor() def update_progress_display(): """Actualizar visualización del progreso con información detallada.""" if not compressor.conversion_progress: return "No hay compresiones en progreso." progress_text = "📊 **Estado de Compresiones:**\n\n" total_original = 0 total_compressed = 0 for file_id, progress in compressor.conversion_progress.items(): status_emoji = { "Iniciando compresión...": "🔄", "Procesando imagen...": "🖼️", "Comprimiendo...": "⚙️", "Recomprimiendo archivo...": "📦", "Completado": "✅", "Error": "❌" }.get(progress.status, "🔄") progress_text += f"{status_emoji} **{progress.filename}**\n" progress_text += f" Estado: {progress.status}\n" progress_text += f" Progreso: {progress.progress:.1f}%\n" if progress.original_size > 0: original_mb = progress.original_size / (1024 * 1024) progress_text += f" Tamaño original: {original_mb:.2f} MB\n" total_original += progress.original_size if progress.compressed_size > 0: compressed_mb = progress.compressed_size / (1024 * 1024) progress_text += f" Tamaño comprimido: {compressed_mb:.2f} MB\n" progress_text += f" Reducción: {progress.compression_ratio:.1f}%\n" total_compressed += progress.compressed_size if progress.error: progress_text += f" Error: {progress.error}\n" progress_text += "\n" # Estadísticas totales if total_original > 0: total_reduction = ((total_original - total_compressed) / total_original) * 100 progress_text += f"**📈 Estadísticas Totales:**\n" progress_text += f"Reducción total: {total_reduction:.1f}% ({(total_original - total_compressed) / (1024 * 1024):.2f} MB ahorrados)\n" return progress_text def gradio_single_compression(file, quality_preset): """Interfaz Gradio para compresión de archivo único.""" if not file: raise gr.Error("No se ha subido ningún archivo. Por favor, sube un archivo.") try: compressor.conversion_progress.clear() compressed_file = compressor.compress_single_file(file, quality_preset) return compressed_file, update_progress_display() except Exception as e: raise gr.Error(f"Compresión fallida: {str(e)}") def gradio_multiple_compression(files, quality_preset, max_workers): """Interfaz Gradio para compresión de múltiples archivos.""" if not files: raise gr.Error("No se han subido archivos. Por favor, sube al menos un archivo.") try: compressor.conversion_progress.clear() file_paths = [file.name for file in files] compressed_files = compressor.compress_multiple_files( file_paths, quality_preset, max_workers=max_workers ) if len(compressed_files) > 1: zip_filename = os.path.join(UPLOAD_DIR, f"compressed_files_{int(time.time())}.zip") with zipfile.ZipFile(zip_filename, 'w') as zipf: for file_path in compressed_files: zipf.write(file_path, os.path.basename(file_path)) return zip_filename, update_progress_display() elif len(compressed_files) == 1: return compressed_files[0], update_progress_display() else: raise gr.Error("No se pudo comprimir ningún archivo.") except Exception as e: raise gr.Error(f"Compresión fallida: {str(e)}") def refresh_progress(): """Refrescar estado del progreso.""" return update_progress_display() # Interfaz Gradio mejorada with gr.Blocks(title="🗜️ Compresor Universal Pro Max", theme=gr.themes.Soft()) as iface: gr.Markdown( """ # 🗜️ Compresor Universal Pro Max **El compresor más potente y rápido para todos tus archivos** ✨ **Características Avanzadas:** - 🚀 Compresión inteligente y optimizada para CPU - 🎬 Videos, audios, imágenes, GIFs y documentos - 🖼️ Soporte completo para PNG, JPG, WebP, TIFF, GIF animado - ⚡ Procesamiento paralelo ultra-rápido - 📊 Monitoreo detallado con estadísticas de compresión - 🔧 Algoritmos optimizados para máxima reducción de tamaño - 💾 Ahorro significativo de espacio en disco """ ) with gr.Tabs(): with gr.TabItem("📄 Archivo Único"): with gr.Row(): with gr.Column(scale=2): single_file_input = gr.File( label="📁 Subir Archivo (Todos los formatos soportados)", type="filepath", file_types=ALL_SUPPORTED_FORMATS ) single_quality_input = gr.Dropdown( choices=list(QUALITY_PRESETS.keys()), value="Compresión Alta", label="🎯 Nivel de Compresión" ) single_compress_btn = gr.Button( "🗜️ Comprimir Archivo", variant="primary", size="lg" ) with gr.Column(scale=1): single_output = gr.File( label="📥 Descargar Archivo Comprimido", type="filepath" ) with gr.TabItem("📁 Múltiples Archivos"): with gr.Row(): with gr.Column(scale=2): multiple_files_input = gr.File( label="📁 Subir Múltiples Archivos", file_count="multiple", type="filepath", file_types=ALL_SUPPORTED_FORMATS ) with gr.Row(): multiple_quality_input = gr.Dropdown( choices=list(QUALITY_PRESETS.keys()), value="Compresión Alta", label="🎯 Nivel de Compresión" ) workers_input = gr.Slider( minimum=1, maximum=min(psutil.cpu_count(), 6), value=min(psutil.cpu_count(logical=False), 3), step=1, label="⚡ Archivos Simultáneos" ) multiple_compress_btn = gr.Button( "🗜️ Comprimir Todos los Archivos", variant="primary", size="lg" ) with gr.Column(scale=1): multiple_output = gr.File( label="📥 Descargar Archivos Comprimidos", type="filepath" ) # Panel de progreso mejorado gr.Markdown("## 📊 Estado y Estadísticas de Compresión") progress_display = gr.Markdown( value="No hay compresiones en progreso.", label="Estado" ) refresh_btn = gr.Button("🔄 Actualizar Estado", size="sm") # Información detallada with gr.Accordion("ℹ️ Información Detallada", open=False): gr.Markdown( f""" ### 🎯 Niveles de Compresión: - **Compresión Máxima**: Máxima reducción de tamaño, proceso más lento - **Compresión Alta**: Excelente balance compresión/velocidad - **Compresión Media**: Buena compresión, procesamiento rápido - **Compresión Rápida**: Compresión básica, velocidad máxima ### 📁 Formatos Soportados: **🎬 Video**: {', '.join(VIDEO_FORMATS)} **🎵 Audio**: {', '.join(AUDIO_FORMATS)} ### ⚡ Optimizaciones: - **CPU Cores Detectados**: {psutil.cpu_count()} (Lógicos), {psutil.cpu_count(logical=False)} (Físicos) - **Threads Optimizados**: Máximo {min(psutil.cpu_count(logical=False), 4)} para video, {min(psutil.cpu_count(logical=False), 2)} para audio - **Compresión Inteligente**: Algoritmos adaptativos según tipo de archivo - **Gestión de Memoria**: Optimizada para archivos grandes """ ) # Eventos single_compress_btn.click( fn=gradio_single_compression, inputs=[single_file_input, single_quality_input], outputs=[single_output, progress_display] ) multiple_compress_btn.click( fn=gradio_multiple_compression, inputs=[multiple_files_input, multiple_quality_input, workers_input], outputs=[multiple_output, progress_display] ) refresh_btn.click( fn=refresh_progress, outputs=[progress_display] ) # Lanzar interfaz if __name__ == "__main__": iface.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))