weiyi01191's picture
Update app.py
6ffc9b1 verified
#!/usr/bin/env python3
"""
🎥 Video Content Safety Analysis - MiniGPT4-Video + 巨量引擎规则集成版
基于MiniGPT4-Video的真实视频内容分析 + 巨量引擎299条禁投规则检测
"""
# ZeroGPU装饰器 - 必须在torch等包之前导入!
try:
import spaces
GPU_AVAILABLE = True
print("✅ ZeroGPU spaces 可用")
except ImportError:
print("⚠️ ZeroGPU spaces 不可用,使用CPU模式")
GPU_AVAILABLE = False
# 创建一个空的装饰器
class spaces:
@staticmethod
def GPU(duration=60):
def decorator(func):
return func
return decorator
import os
import gradio as gr
import torch
import gc
import whisper
import argparse
import yaml
import random
import numpy as np
import torch.backends.cudnn as cudnn
from minigpt4.common.eval_utils import init_model
from minigpt4.conversation.conversation import CONV_VISION
import tempfile
import shutil
import cv2
import webvtt
import moviepy.editor as mp
from torchvision import transforms
from datetime import timedelta
from moviepy.editor import VideoFileClip
# 导入巨量引擎禁投规则引擎
from prohibited_rules import ProhibitedRulesEngine
# 设置中国镜像
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
# 全局变量
model = None
vis_processor = None
whisper_model = None
args = None
seed = 42
# 初始化巨量引擎规则引擎
rules_engine = ProhibitedRulesEngine()
print("✅ 巨量引擎299条禁投规则引擎初始化完成")
# ======================== MiniGPT4-Video 核心函数 ========================
def format_timestamp(seconds):
"""格式化时间戳为VTT格式"""
td = timedelta(seconds=seconds)
total_seconds = int(td.total_seconds())
milliseconds = int(td.microseconds / 1000)
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return f"{hours:02}:{minutes:02}:{seconds:02}.{milliseconds:03}"
def extract_video_info(video_path, max_images_length):
"""提取视频信息"""
clip = VideoFileClip(video_path)
total_num_frames = int(clip.duration * clip.fps)
clip.close()
sampling_interval = int(total_num_frames / max_images_length)
if sampling_interval == 0:
sampling_interval = 1
return sampling_interval, clip.fps
def time_to_milliseconds(time_str):
"""将时间格式转换为毫秒"""
h, m, s = map(float, time_str.split(':'))
return int((h * 3600 + m * 60 + s) * 1000)
def extract_subtitles(subtitle_path):
"""提取字幕"""
if not subtitle_path or not os.path.exists(subtitle_path):
return []
subtitles = []
try:
for caption in webvtt.read(subtitle_path):
start_ms = time_to_milliseconds(caption.start)
end_ms = time_to_milliseconds(caption.end)
text = caption.text.strip().replace('\n', ' ')
subtitles.append((start_ms, end_ms, text))
except:
return []
return subtitles
def find_subtitle(subtitles, frame_count, fps):
"""查找对应帧的字幕"""
if not subtitles:
return None
frame_time = (frame_count / fps) * 1000
left, right = 0, len(subtitles) - 1
while left <= right:
mid = (left + right) // 2
start, end, subtitle_text = subtitles[mid]
if start <= frame_time <= end:
return subtitle_text
elif frame_time < start:
right = mid - 1
else:
left = mid + 1
return None
def match_frames_and_subtitles(video_path, subtitles, sampling_interval, max_sub_len, fps, max_frames):
"""匹配视频帧和字幕"""
global vis_processor
cap = cv2.VideoCapture(video_path)
images = []
frame_count = 0
img_placeholder = ""
subtitle_text_in_interval = ""
history_subtitles = {}
number_of_words = 0
transform = transforms.Compose([
transforms.ToPILImage(),
])
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if len(subtitles) > 0:
frame_subtitle = find_subtitle(subtitles, frame_count, fps)
if frame_subtitle and not history_subtitles.get(frame_subtitle, False):
subtitle_text_in_interval += frame_subtitle + " "
history_subtitles[frame_subtitle] = True
if frame_count % sampling_interval == 0:
frame = transform(frame[:,:,::-1]) # 转换为RGB
frame = vis_processor(frame)
images.append(frame)
img_placeholder += '<Img><ImageHere>'
if subtitle_text_in_interval != "" and number_of_words < max_sub_len:
img_placeholder += f'<Cap>{subtitle_text_in_interval}'
number_of_words += len(subtitle_text_in_interval.split(' '))
subtitle_text_in_interval = ""
frame_count += 1
if len(images) >= max_frames:
break
cap.release()
cv2.destroyAllWindows()
if len(images) == 0:
return None, None
images = torch.stack(images)
return images, img_placeholder
def extract_audio(video_path, audio_path):
"""提取音频"""
video_clip = mp.VideoFileClip(video_path)
audio_clip = video_clip.audio
audio_clip.write_audiofile(audio_path, codec="libmp3lame", bitrate="320k", verbose=False, logger=None)
video_clip.close()
def get_subtitles(video_path):
"""生成字幕"""
global whisper_model
if whisper_model is None:
return None
audio_dir = "workspace/inference_subtitles/mp3"
subtitle_dir = "workspace/inference_subtitles"
os.makedirs(subtitle_dir, exist_ok=True)
os.makedirs(audio_dir, exist_ok=True)
video_id = video_path.split('/')[-1].split('.')[0]
audio_path = f"{audio_dir}/{video_id}.mp3"
subtitle_path = f"{subtitle_dir}/{video_id}.vtt"
# 如果字幕已存在,直接返回
if os.path.exists(subtitle_path):
return subtitle_path
try:
extract_audio(video_path, audio_path)
# 🔧 优化中文语音识别
result = whisper_model.transcribe(
audio_path,
language="zh", # 明确指定中文
task="transcribe", # 明确指定转录任务
temperature=0.0, # 降低随机性
best_of=5, # 使用最佳结果
beam_size=5, # 增加beam搜索
patience=2.0, # 增加耐心参数
initial_prompt="以下是一段中文视频的语音内容:" # 中文提示
)
# 创建VTT文件
with open(subtitle_path, "w", encoding="utf-8") as vtt_file:
vtt_file.write("WEBVTT\n\n")
for segment in result['segments']:
start = format_timestamp(segment['start'])
end = format_timestamp(segment['end'])
text = segment['text']
vtt_file.write(f"{start} --> {end}\n{text}\n\n")
return subtitle_path
except Exception as e:
print(f"字幕生成错误: {e}")
return None
def prepare_input(video_path, subtitle_path, instruction):
"""准备输入"""
global args
# 根据模型设置参数
if args and "mistral" in args.ckpt:
max_frames = 90
max_sub_len = 800
else:
max_frames = 45
max_sub_len = 400
sampling_interval, fps = extract_video_info(video_path, max_frames)
subtitles = extract_subtitles(subtitle_path)
frames_features, input_placeholder = match_frames_and_subtitles(
video_path, subtitles, sampling_interval, max_sub_len, fps, max_frames
)
if input_placeholder:
input_placeholder += "\n" + instruction
else:
input_placeholder = instruction
return frames_features, input_placeholder
def model_generate(*model_args, **kwargs):
"""模型生成函数"""
global model
with model.maybe_autocast():
output = model.llama_model.generate(*model_args, **kwargs)
return output
def generate_prediction(video_path, instruction, gen_subtitles=True, stream=False):
"""生成预测结果"""
global model, args, seed
if gen_subtitles:
subtitle_path = get_subtitles(video_path)
else:
subtitle_path = None
prepared_images, prepared_instruction = prepare_input(video_path, subtitle_path, instruction)
if prepared_images is None:
return "视频无法打开,请检查视频路径"
length = len(prepared_images)
prepared_images = prepared_images.unsqueeze(0)
conv = CONV_VISION.copy()
conv.system = ""
conv.append_message(conv.roles[0], prepared_instruction)
conv.append_message(conv.roles[1], None)
prompt = [conv.get_prompt()]
# 设置随机种子
setup_seeds(seed)
# 🔧 GPU内存优化和cuBLAS错误处理
if torch.cuda.is_available():
torch.cuda.empty_cache() # 清理缓存
torch.cuda.synchronize() # 同步GPU操作
# 🚀 H200特定优化
gpu_name = torch.cuda.get_device_name(0)
if "H200" in gpu_name:
# H200额外内存清理
gc.collect()
torch.cuda.reset_peak_memory_stats()
try:
# 🔧 使用更保守的生成参数避免cuBLAS错误
answers = model.generate(
prepared_images,
prompt,
max_new_tokens=512, # 增加token数以获得更详细的分析
do_sample=True,
lengths=[length],
num_beams=1, # 保持beam=1减少计算
temperature=0.7, # 稍微降低温度获得更稳定输出
top_p=0.9, # 添加top_p参数
repetition_penalty=1.1 # 避免重复
)
return answers[0]
except RuntimeError as e:
if "cublasLt" in str(e) or "cuBLAS" in str(e):
# 🚨 cuBLAS错误特殊处理
print(f"⚠️ 检测到cuBLAS错误,尝试降级处理: {e}")
# 强制清理GPU内存
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
gc.collect()
# 🚀 H200特定恢复策略
gpu_name = torch.cuda.get_device_name(0)
if "H200" in gpu_name:
print("🔧 应用H200特定恢复策略...")
torch.cuda.reset_peak_memory_stats()
# 临时禁用TF32以避免H200精度问题
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False
try:
# 🔧 使用更小的参数重试
answers = model.generate(
prepared_images,
prompt,
max_new_tokens=256, # 减少token数
do_sample=False, # 关闭采样减少计算
lengths=[min(length, 24)], # 增加一点长度,但不要太多
num_beams=1,
temperature=1.0,
use_cache=False # H200上禁用缓存
)
# 🚀 H200恢复TF32设置
if torch.cuda.is_available() and "H200" in torch.cuda.get_device_name(0):
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
return answers[0]
except Exception as e2:
return f"GPU运算错误,请重试。H200特定优化已应用。错误信息: {str(e2)}"
else:
return f"生成预测时出错: {str(e)}"
except Exception as e:
return f"生成预测时出错: {str(e)}"
# ======================== 巨量引擎规则检测函数 ========================
def format_violations_report(violations_result):
"""格式化违规检测报告"""
if not violations_result["has_violations"]:
return """
🛡️ **巨量引擎规则检测结果**: ✅ 无违规内容
- 已检测规则: 299条巨量引擎禁投规则
- 检测维度: 低危(P1) + 中危(P2) + 高危(P3)
- 检测结果: 内容符合平台规范
"""
report = f"""
🚨 **巨量引擎规则检测结果**: ⚠️ 发现 {violations_result["total_violations"]} 项违规
📊 **违规统计**:
- 🔴 高危违规(P3): {violations_result["high_risk"]["count"]}
- 🟡 中危违规(P2): {violations_result["medium_risk"]["count"]}
- 🟠 低危违规(P1): {violations_result["low_risk"]["count"]}
📋 **详细违规列表**:
"""
# 按风险等级排序显示违规
for violation in sorted(violations_result["all_violations"],
key=lambda x: {"P3": 3, "P2": 2, "P1": 1}[x["risk_level"]],
reverse=True):
risk_icon = {"P3": "🚨", "P2": "⚠️", "P1": "💭"}[violation["risk_level"]]
report += f"""
{risk_icon} **{violation["risk_level"]} - {violation["category"]}**
规则: {violation["description"]}
匹配词: "{violation["matched_keyword"]}"
规则ID: {violation["rule_id"]}
"""
return report
def get_overall_risk_level(violations_result):
"""获取综合风险等级"""
if not violations_result["has_violations"]:
return "✅ P3 (安全) - 内容健康,符合平台规范"
if violations_result["high_risk"]["count"] > 0:
return f"🚨 P0 (极高危) - 发现 {violations_result['high_risk']['count']} 项高危违规,禁止投放"
elif violations_result["medium_risk"]["count"] > 2:
return f"⚠️ P1 (高危) - 发现 {violations_result['medium_risk']['count']} 项中危违规,需严格审核"
elif violations_result["medium_risk"]["count"] > 0:
return f"⚠️ P1 (中危) - 发现 {violations_result['medium_risk']['count']} 项中危违规,需要审核"
else:
return f"⚡ P2 (低危) - 发现 {violations_result['low_risk']['count']} 项低危违规,建议关注"
# ======================== 应用主要函数 ========================
def setup_seeds(seed):
"""设置随机种子"""
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
cudnn.benchmark = False
cudnn.deterministic = True
def optimize_gpu_memory():
"""GPU内存优化"""
print("🔍 开始GPU内存优化...")
if torch.cuda.is_available():
gpu_name = torch.cuda.get_device_name(0)
print(f"🔍 GPU: {gpu_name}")
# 🔧 H200特定优化
if "H200" in gpu_name:
print("🚀 检测到H200显卡,应用特定优化...")
# H200优化设置
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:128,garbage_collection_threshold:0.8,expandable_segments:True'
os.environ['CUDA_LAUNCH_BLOCKING'] = '0' # H200上设置为0
os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8' # H200 cuBLAS优化
os.environ['NCCL_AVOID_RECORD_STREAMS'] = '1' # 避免H200内存问题
# 设置混合精度优化
torch.backends.cudnn.allow_tf32 = True # 启用TF32提升H200性能
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.benchmark = True # H200上启用benchmark
else:
# 标准设置(A100等)
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:256,garbage_collection_threshold:0.6'
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
print(f"💾 总显存: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
# 强制清理所有GPU缓存
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
gc.collect()
print(f"💾 清理后可用显存: {(torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated(0)) / 1024**3:.1f} GB")
def get_arguments():
"""获取参数配置"""
parser = argparse.ArgumentParser(description="MiniGPT4-Video参数")
parser.add_argument("--cfg-path", help="配置文件路径",
default="test_configs/mistral_test_config.yaml") # 使用mistral配置
parser.add_argument("--ckpt", type=str,
default='checkpoints/video_mistral_checkpoint_last.pth', # 使用mistral checkpoint
help="模型检查点路径")
parser.add_argument("--max_new_tokens", type=int, default=512,
help="最大生成token数")
parser.add_argument("--lora_r", type=int, default=64, help="LoRA rank") # 修改为64匹配checkpoint
parser.add_argument("--lora_alpha", type=int, default=16, help="LoRA alpha") # 修改为16匹配checkpoint
parser.add_argument("--options", nargs="+", help="覆盖配置选项")
return parser.parse_args()
def load_minigpt4_model():
"""加载MiniGPT4-Video模型"""
global model, vis_processor, whisper_model, args, seed
if model is not None:
return model, vis_processor, whisper_model
try:
print("🔄 正在加载MiniGPT4-Video模型...")
# 获取参数
args = get_arguments()
# 加载配置
config_path = args.cfg_path
if not os.path.exists(config_path):
config_path = "test_configs/llama2_test_config.yaml" # 回退到默认配置
with open(config_path) as file:
config = yaml.load(file, Loader=yaml.FullLoader)
seed = config['run']['seed']
setup_seeds(seed)
# GPU内存优化
optimize_gpu_memory()
print("🚀 开始初始化MiniGPT4-Video模型...")
model, vis_processor, whisper_gpu_id, minigpt4_gpu_id, answer_module_gpu_id = init_model(args)
# 清理缓存
if torch.cuda.is_available():
torch.cuda.empty_cache()
print(f"💾 模型加载后显存使用: {torch.cuda.memory_allocated(0) / 1024**3:.1f} GB")
print("🚀 开始初始化Whisper模型...")
# 🔧 使用更强的Whisper模型以提升中文识别
whisper_model = whisper.load_model("medium").to(f"cuda:{whisper_gpu_id}" if torch.cuda.is_available() else "cpu")
print("✅ Whisper模型加载完成 (medium版本,优化中文识别)")
if torch.cuda.is_available():
print(f"💾 全部加载后显存使用: {torch.cuda.memory_allocated(0) / 1024**3:.1f} GB")
print("✅ 所有模型加载完成!")
return model, vis_processor, whisper_model
except Exception as e:
print(f"❌ 模型加载失败: {e}")
print("🔄 回退到模拟模式...")
return None, None, None
@spaces.GPU(duration=600) # 增加到10分钟以支持模型下载
def analyze_video_with_minigpt4(video_file, instruction):
"""使用MiniGPT4-Video分析视频内容并进行巨量引擎规则检测"""
if video_file is None:
return "❌ 请上传视频文件", "无法评估"
try:
# 加载模型
model_loaded, vis_proc, whisper_loaded = load_minigpt4_model()
if model_loaded is None:
# 模拟模式
return f"""
🎬 **视频内容分析结果 (模拟模式)**
📋 **基本信息**:
- 视频文件: {video_file}
- 分析指令: {instruction}
⚠️ **注意**: 当前运行在模拟模式,真实模型加载失败
请检查模型文件和配置是否正确
🛡️ **巨量引擎规则检测**: 仅在真实模式下可用
""", "⚠️ 模拟模式"
print(f"🔄 开始分析视频: {video_file}")
print(f"📝 分析指令: {instruction}")
# 复制视频到临时路径(如果需要)
temp_video_path = video_file
if not os.path.exists(video_file):
# 如果是Gradio的临时文件,复制到工作目录
temp_dir = "workspace/tmp"
os.makedirs(temp_dir, exist_ok=True)
temp_video_path = os.path.join(temp_dir, "analysis_video.mp4")
shutil.copy2(video_file, temp_video_path)
# 使用MiniGPT4-Video进行真实分析
if not instruction or instruction.strip() == "":
instruction = "请详细分析这个视频的内容,包括场景、人物、动作、对话等。请用中文输出,并详细记录视频中谁说了什么话。"
# 🧠 使用智能规则感知指令
intelligent_instruction = create_intelligent_instruction(instruction)
print(f"🧠 使用智能规则感知指令进行分析...")
# 调用MiniGPT4-Video的生成函数
prediction = generate_prediction(
video_path=temp_video_path,
instruction=intelligent_instruction, # 使用智能指令
gen_subtitles=True, # 生成字幕
stream=False
)
# 🚨 巨量引擎规则检测 🚨
print("🔍 开始巨量引擎299条规则检测...")
violations_result = rules_engine.check_all_content(prediction, instruction)
# 格式化完整分析报告
enhanced_result = f"""
🎬 **MiniGPT4-Video 视频内容分析 + 巨量引擎规则检测报告**
📋 **基本信息**:
- 视频文件: {os.path.basename(video_file)}
- 分析设备: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU模式'}
- 分析指令: {instruction}
🔍 **视频内容描述**:
{prediction}
{format_violations_report(violations_result)}
📊 **技术信息**:
- 内容理解: MiniGPT4-Video + Whisper
- 规则引擎: 巨量引擎299条禁投规则
- 检测等级: P1(低危) + P2(中危) + P3(高危)
- 分析模式: 多模态理解 (视觉+语音+文本)
💡 **说明**:
基于MiniGPT4-Video的深度内容理解,结合巨量引擎完整禁投规则库进行专业违规检测。
"""
# 获取综合风险等级
safety_score = get_overall_risk_level(violations_result)
return enhanced_result, safety_score
except Exception as e:
error_msg = f"""
❌ **分析过程中出错**
错误信息: {str(e)}
🔄 **可能的解决方案**:
1. 检查视频文件格式 (建议MP4)
2. 确认模型文件是否正确加载
3. 检查GPU内存是否充足
4. 验证配置文件路径
💡 **提示**: 如果问题持续,请检查模型和依赖项安装
"""
return error_msg, "⚠️ 错误"
def create_app():
"""创建Gradio应用"""
interface = gr.Interface(
fn=analyze_video_with_minigpt4,
inputs=[
gr.Video(label="上传视频文件"),
gr.Textbox(
label="分析指令",
value="请详细分析这个视频的内容,包括场景、人物、动作、对话等。请用中文输出,并详细记录视频中谁说了什么话。",
placeholder="输入您希望AI如何分析这个视频...",
lines=3
)
],
outputs=[
gr.Textbox(label="MiniGPT4-Video 内容分析 + 巨量引擎规则检测", lines=20),
gr.Textbox(label="巨量引擎风险评级")
],
title="🎥 智能视频内容安全分析 - MiniGPT4-Video + 巨量引擎",
description="""
## 🎬 基于MiniGPT4-Video + 巨量引擎299条禁投规则的专业视频安全检测系统
⚡ **ZeroGPU加速** | 🎬 **MiniGPT4-Video** | 🎙️ **Whisper语音** | 🛡️ **巨量引擎299条规则**
**🔥 核心功能:**
- 🎞️ **深度视频理解**: MiniGPT4-Video多模态分析
- 🎙️ **语音转文字**: Whisper自动生成字幕
- 🛡️ **专业违规检测**: 巨量引擎完整禁投规则库
- 📊 **智能风险评级**: P0-P3四级风险等级
**🎯 检测维度:**
- **高危(P3)**: 违法出版物、烟草、医疗等严重违规
- **中危(P2)**: 赌博周边、房地产、金融等中等风险
- **低危(P1)**: 化妆品、汽车、游戏等轻微风险
**📋 规则覆盖:**
涵盖化妆品类、汽车类、游戏类、赌博类、房地产类、工具软件类、教育培训类、
金融类、医疗类、烟草类等全部299条巨量引擎禁投规则
""",
examples=[
[None, "分析这个视频是否包含禁投内容"],
[None, "检测视频中是否有巨量引擎禁止的产品或服务"],
[None, "评估视频内容的投放风险等级"],
[None, "详细描述视频内容并进行合规检查"]
],
cache_examples=False
)
return interface
def main():
"""主函数"""
print("🚀 启动MiniGPT4-Video + 巨量引擎视频安全分析应用")
print("🎬 MiniGPT4-Video: 深度视频内容理解")
print("🛡️ 巨量引擎: 299条禁投规则检测")
if torch.cuda.is_available():
print(f"✅ GPU可用: {torch.cuda.get_device_name(0)}")
else:
print("⚠️ 使用CPU模式")
# 创建必要的目录
os.makedirs("workspace/tmp", exist_ok=True)
os.makedirs("workspace/inference_subtitles", exist_ok=True)
os.makedirs("workspace/inference_subtitles/mp3", exist_ok=True)
print("📁 工作目录准备完成")
print("🚀 正在启动Gradio应用...")
app = create_app()
# 启动应用
app.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)
def create_intelligent_instruction(original_instruction):
"""创建具备规则理解能力的智能分析指令"""
# 核心禁投规则摘要 - 让AI知道需要检测什么
rules_summary = """
请特别注意以下巨量引擎禁投内容(如发现请在描述中明确指出):
🚨 **高危违规内容 (P3)**:
- 医疗器械、药品、保健品、医美服务
- 烟草制品、电子烟相关产品
- 虚拟货币、区块链、NFT、数字藏品
- 违法出版物、政治敏感内容
- 贷款、信贷、金融投资、股票
- 赌博、博彩、棋牌游戏
⚠️ **中危违规内容 (P2)**:
- 房地产买卖、租赁、中介服务
- 工具软件、刷机、破解软件
- 教育培训、学历提升、考试代办
- 翡翠、玉石、文玩、珠宝盲盒
- 黄金回收、贵金属投资
💭 **低危违规内容 (P1)**:
- 化妆品中的特殊功效产品
- 汽车修复、代办服务
- 游戏账号交易、代练
- 特殊食品、减肥产品
"""
intelligent_instruction = f"""
你是专业的巨量引擎广告内容审核专家。请用中文详细分析这个视频,包括:
📹 **视频内容详细描述**:
- 场景环境:描述视频拍摄场所、背景环境
- 人物信息:谁在视频中出现,年龄、性别、穿着特征
- 关键动作:详细描述人物的具体动作和行为
- 产品展示:如有产品展示,请详细描述产品外观、材质、用途
- 文字信息:视频中出现的任何文字、标识、品牌名称
🎙️ **语音对话内容**:
- 详细记录视频中的所有对话内容
- 明确标注"谁说了什么话"
- 记录任何产品介绍、价格信息、功效宣传
- 注意推销话术、营销用语
🔍 **潜在违规风险分析**:
{rules_summary}
🎯 **分析要求**:
1. 用中文输出所有内容
2. 对于任何可能涉及上述违规内容的元素,请明确指出
3. 重点关注翡翠、玉石、珠宝等文玩制品
4. 注意医疗、金融、房产、教育等敏感行业
5. 记录所有营销宣传语句
原始指令:{original_instruction}
"""
return intelligent_instruction
if __name__ == "__main__":
main()