import requests import json import os import time from typing import Dict, Any class VideoPipeline: def __init__(self, base_url: str = "http://127.0.0.1:8080"): self.base_url = base_url def download_video(self, url: str, resolution: str = "1080p", output_format: str = "mp4", rename: str = None) -> Dict[str, Any]: """下载视频的第一步""" endpoint = f"{self.base_url}/api/v2/youtube/download" payload = { "url": url, "resolution": resolution, "output_format": output_format, "rename": rename or time.strftime("%Y-%m-%d") } response = requests.post(endpoint, json=payload) response.raise_for_status() return response.json() def generate_script(self, video_path: str, skip_seconds: int = 0, threshold: int = 30, vision_batch_size: int = 10, vision_llm_provider: str = "gemini") -> Dict[str, Any]: """生成脚本的第二步""" endpoint = f"{self.base_url}/api/v2/scripts/generate" payload = { "video_path": video_path, "skip_seconds": skip_seconds, "threshold": threshold, "vision_batch_size": vision_batch_size, "vision_llm_provider": vision_llm_provider } response = requests.post(endpoint, json=payload) response.raise_for_status() return response.json() def crop_video(self, video_path: str, script: list) -> Dict[str, Any]: """剪辑视频的第三步""" endpoint = f"{self.base_url}/api/v2/scripts/crop" payload = { "video_origin_path": video_path, "video_script": script } response = requests.post(endpoint, json=payload) response.raise_for_status() return response.json() def generate_final_video(self, task_id: str, video_path: str, script_path: str, script: list, subclip_videos: Dict[str, str], voice_name: str) -> Dict[str, Any]: """生成最终视频的第四步""" endpoint = f"{self.base_url}/api/v2/scripts/start-subclip" request_data = { "video_clip_json": script, "video_clip_json_path": script_path, "video_origin_path": video_path, "video_aspect": "16:9", "video_language": "zh-CN", "voice_name": voice_name, "voice_volume": 1, "voice_rate": 1.2, "voice_pitch": 1, "bgm_name": "random", "bgm_type": "random", "bgm_file": "", "bgm_volume": 0.3, "subtitle_enabled": True, "subtitle_position": "bottom", "font_name": "STHeitiMedium.ttc", "text_fore_color": "#FFFFFF", "text_background_color": "transparent", "font_size": 75, "stroke_color": "#000000", "stroke_width": 1.5, "custom_position": 70, "n_threads": 8 } payload = { "request": request_data, "subclip_videos": subclip_videos } params = {"task_id": task_id} response = requests.post(endpoint, params=params, json=payload) response.raise_for_status() return response.json() def save_script_to_json(self, script: list, script_path: str) -> str: """保存脚本到json文件""" try: with open(script_path, 'w', encoding='utf-8') as f: json.dump(script, f, ensure_ascii=False, indent=2) print(f"脚本已保存到: {script_path}") return script_path except Exception as e: print(f"保存脚本失败: {str(e)}") raise def run_pipeline(self, task_id: str, script_name: str, youtube_url: str, video_name: str="null", skip_seconds: int = 0, threshold: int = 30, vision_batch_size: int = 10, vision_llm_provider: str = "gemini", voice_name: str = "zh-CN-YunjianNeural") -> Dict[str, Any]: """运行完整的pipeline""" try: current_path = os.path.dirname(os.path.abspath(__file__)) video_path = os.path.join(current_path, "resource", "videos", f"{video_name}.mp4") # 判断视频是否存在 if not os.path.exists(video_path): # 1. 下载视频 print(f"视频不存在, 开始下载视频: {video_path}") download_result = self.download_video(url=youtube_url, resolution="1080p", output_format="mp4", rename=video_name) video_path = download_result["output_path"] else: print(f"视频已存在: {video_path}") # 2. 判断script_name是否存在 # 2.1.1 拼接脚本路径 NarratoAI/resource/scripts script_path = os.path.join(current_path, "resource", "scripts", script_name) if os.path.exists(script_path): script = json.load(open(script_path, "r", encoding="utf-8")) else: # 2.1.2 生成脚本 print("开始生成脚本...") script_result = self.generate_script(video_path=video_path, skip_seconds=skip_seconds, threshold=threshold, vision_batch_size=vision_batch_size, vision_llm_provider=vision_llm_provider) script = script_result["script"] # 2.2 保存脚本到json文件 print("保存脚本到json文件...") self.save_script_to_json(script=script, script_path=script_path) # 3. 剪辑视频 print("开始剪辑视频...") crop_result = self.crop_video(video_path=video_path, script=script) subclip_videos = crop_result["subclip_videos"] # 4. 生成最终视频 print("开始生成最终视频...") self.generate_final_video( task_id=task_id, video_path=video_path, script_path=script_path, script=script, subclip_videos=subclip_videos, voice_name=voice_name ) return { "status": "等待异步生成视频", "path": os.path.join(current_path, "storage", "tasks", task_id) } except Exception as e: return { "status": "error", "error": str(e) } # 使用示例 if __name__ == "__main__": pipeline = VideoPipeline() result = pipeline.run_pipeline( task_id="test_111901", script_name="test.json", youtube_url="https://www.youtube.com/watch?v=vLJ7Yed6FQ4", video_name="2024-11-19-01", skip_seconds=50, threshold=35, vision_batch_size=10, vision_llm_provider="gemini", voice_name="zh-CN-YunjianNeural", ) print(result)