import streamlit as st import os import sys from loguru import logger from app.config import config from webui.components import basic_settings, video_settings, audio_settings, subtitle_settings, script_settings, \ review_settings, merge_settings, system_settings # from webui.utils import cache, file_utils from app.utils import utils from app.utils import ffmpeg_utils from app.models.schema import VideoClipParams, VideoAspect # 初始化配置 - 必须是第一个 Streamlit 命令 st.set_page_config( page_title="NarratoAI", page_icon="📽️", layout="wide", initial_sidebar_state="auto", menu_items={ "Report a bug": "https://github.com/linyqh/NarratoAI/issues", 'About': f"# Narrato:blue[AI] :sunglasses: 📽️ \n #### Version: v{config.project_version} \n " f"自动化影视解说视频详情请移步:https://github.com/linyqh/NarratoAI" }, ) # 设置页面样式 hide_streamlit_style = """ """ st.markdown(hide_streamlit_style, unsafe_allow_html=True) def init_log(): """初始化日志配置""" from loguru import logger logger.remove() _lvl = "DEBUG" def format_record(record): # 简化日志格式化处理,不尝试按特定字符串过滤torch相关内容 file_path = record["file"].path relative_path = os.path.relpath(file_path, config.root_dir) record["file"].path = f"./{relative_path}" record['message'] = record['message'].replace(config.root_dir, ".") _format = '{time:%Y-%m-%d %H:%M:%S} | ' + \ '{level} | ' + \ '"{file.path}:{line}": {function} ' + \ '- {message}' + "\n" return _format # 替换为更简单的过滤方式,避免在过滤时访问message内容 # 此处先不设置复杂的过滤器,等应用启动后再动态添加 logger.add( sys.stdout, level=_lvl, format=format_record, colorize=True ) # 应用启动后,可以再添加更复杂的过滤器 def setup_advanced_filters(): """在应用完全启动后设置高级过滤器""" try: for handler_id in logger._core.handlers: logger.remove(handler_id) # 重新添加带有高级过滤的处理器 def advanced_filter(record): """更复杂的过滤器,在应用启动后安全使用""" ignore_messages = [ "Examining the path of torch.classes raised", "torch.cuda.is_available()", "CUDA initialization" ] return not any(msg in record["message"] for msg in ignore_messages) logger.add( sys.stdout, level=_lvl, format=format_record, colorize=True, filter=advanced_filter ) except Exception as e: # 如果过滤器设置失败,确保日志仍然可用 logger.add( sys.stdout, level=_lvl, format=format_record, colorize=True ) logger.error(f"设置高级日志过滤器失败: {e}") # 将高级过滤器设置放到启动主逻辑后 import threading threading.Timer(5.0, setup_advanced_filters).start() def init_global_state(): """初始化全局状态""" if 'video_clip_json' not in st.session_state: st.session_state['video_clip_json'] = [] if 'video_plot' not in st.session_state: st.session_state['video_plot'] = '' if 'ui_language' not in st.session_state: st.session_state['ui_language'] = config.ui.get("language", utils.get_system_locale()) if 'subclip_videos' not in st.session_state: st.session_state['subclip_videos'] = {} def tr(key): """翻译函数""" i18n_dir = os.path.join(os.path.dirname(__file__), "webui", "i18n") locales = utils.load_locales(i18n_dir) loc = locales.get(st.session_state['ui_language'], {}) return loc.get("Translation", {}).get(key, key) def render_generate_button(): """渲染生成按钮和处理逻辑""" if st.button(tr("Generate Video"), use_container_width=True, type="primary"): from app.services import task as tm # 重置日志容器和记录 log_container = st.empty() log_records = [] def log_received(msg): with log_container: log_records.append(msg) st.code("\n".join(log_records)) from loguru import logger logger.add(log_received) config.save_config() task_id = st.session_state.get('task_id') if not task_id: st.error(tr("请先裁剪视频")) return if not st.session_state.get('video_clip_json_path'): st.error(tr("脚本文件不能为空")) return if not st.session_state.get('video_origin_path'): st.error(tr("视频文件不能为空")) return st.toast(tr("生成视频")) logger.info(tr("开始生成视频")) # 获取所有参数 script_params = script_settings.get_script_params() video_params = video_settings.get_video_params() audio_params = audio_settings.get_audio_params() subtitle_params = subtitle_settings.get_subtitle_params() # 合并所有参数 all_params = { **script_params, **video_params, **audio_params, **subtitle_params } # 创建参数对象 params = VideoClipParams(**all_params) result = tm.start_subclip( task_id=task_id, params=params, subclip_path_videos=st.session_state['subclip_videos'] ) video_files = result.get("videos", []) st.success(tr("视生成完成")) try: if video_files: player_cols = st.columns(len(video_files) * 2 + 1) for i, url in enumerate(video_files): player_cols[i * 2 + 1].video(url) except Exception as e: logger.error(f"播放视频失败: {e}") # file_utils.open_task_folder(config.root_dir, task_id) logger.info(tr("视频生成完成")) # 全局变量,记录是否已经打印过硬件加速信息 _HAS_LOGGED_HWACCEL_INFO = False def main(): """主函数""" global _HAS_LOGGED_HWACCEL_INFO init_log() init_global_state() # 检测FFmpeg硬件加速,但只打印一次日志 hwaccel_info = ffmpeg_utils.detect_hardware_acceleration() if not _HAS_LOGGED_HWACCEL_INFO: if hwaccel_info["available"]: logger.info(f"FFmpeg硬件加速检测结果: 可用 | 类型: {hwaccel_info['type']} | 编码器: {hwaccel_info['encoder']} | 独立显卡: {hwaccel_info['is_dedicated_gpu']} | 参数: {hwaccel_info['hwaccel_args']}") else: logger.warning(f"FFmpeg硬件加速不可用: {hwaccel_info['message']}, 将使用CPU软件编码") _HAS_LOGGED_HWACCEL_INFO = True # 仅初始化基本资源,避免过早地加载依赖PyTorch的资源 # 检查是否能分解utils.init_resources()为基本资源和高级资源(如依赖PyTorch的资源) try: utils.init_resources() except Exception as e: logger.warning(f"资源初始化时出现警告: {e}") st.title(f"Narrato:blue[AI]:sunglasses: 📽️") st.write(tr("Get Help")) # 首先渲染不依赖PyTorch的UI部分 # 渲染基础设置面板 basic_settings.render_basic_settings(tr) # 渲染合并设置 merge_settings.render_merge_settings(tr) # 渲染主面板 panel = st.columns(3) with panel[0]: script_settings.render_script_panel(tr) with panel[1]: video_settings.render_video_panel(tr) audio_settings.render_audio_panel(tr) with panel[2]: subtitle_settings.render_subtitle_panel(tr) # 渲染视频审查面板 review_settings.render_review_panel(tr) # 放到最后渲染可能使用PyTorch的部分 # 渲染系统设置面板 with panel[2]: system_settings.render_system_panel(tr) # 放到最后渲染生成按钮和处理逻辑 render_generate_button() if __name__ == "__main__": main()