"""Gradio app that estimates the KV-cache footprint of a Hugging Face model.

Only the model's configuration is downloaded (never the weights). From the
configuration the app derives the attention variant (MHA / GQA / MLA, sliding
window) and the FP16 KV-cache size for a given sequence length and batch size.
"""

import math
from typing import Any, Dict, Optional, Tuple

import gradio as gr
from transformers import AutoConfig, PretrainedConfig

# The estimate assumes the cache is stored in FP16/BF16: 2 bytes per element.
BYTES_PER_ELEMENT_FP16 = 2

# Static help text rendered at the bottom of the page. Kept at column 0 so the
# Markdown renderer can never mistake indented lines for a code block.
_HELP_MARKDOWN = """
### 📖 说明
- **GQA**: Grouped Query Attention,通过减少KV头数来节省内存
- **MLA**: Multi-head Latent Attention,通过低秩分解压缩KV cache
- **滑动窗口**: 限制注意力范围来减少计算和内存使用
- KV Cache大小计算基于FP16精度 (每个元素2字节)
- 使用 `transformers.AutoConfig` 获取配置,支持自定义模型

### 🛠️ 安装依赖
```bash
pip install gradio transformers torch
```
"""


def _first_set_attr(config: Any, *names: str, default: Optional[Any] = 0) -> Any:
    """Return the first attribute among *names* whose value is not None.

    Config classes use different names for the same quantity (for example
    ``num_hidden_layers`` vs ``n_layer``), and an attribute may exist but be
    set to ``None``. A plain ``getattr(config, name, default)`` would return
    that ``None`` and break later arithmetic and comparisons.
    """
    for name in names:
        value = getattr(config, name, None)
        if value is not None:
            return value
    return default


def _format_bytes(bytes_val: float) -> str:
    """Format a byte count using binary units (B, KB, MB, GB)."""
    if bytes_val < 1024:
        return f"{bytes_val} B"
    if bytes_val < 1024**2:
        return f"{bytes_val / 1024:.2f} KB"
    if bytes_val < 1024**3:
        return f"{bytes_val / (1024**2):.2f} MB"
    return f"{bytes_val / (1024**3):.2f} GB"


def get_model_config(model_id: str, trust_remote_code: bool = True) -> PretrainedConfig:
    """Download and return the configuration of a Hugging Face model.

    Args:
        model_id: Repository id on the Hugging Face Hub, e.g. ``"Qwen/Qwen3-8B"``.
        trust_remote_code: Forwarded to ``AutoConfig.from_pretrained``. The
            default (``True``) preserves the original behaviour of supporting
            models that ship their own configuration class.

    Returns:
        The configuration object loaded from the ``main`` revision.

    Raises:
        RuntimeError: If the configuration cannot be loaded. The original
            exception is chained as ``__cause__``.
    """
    # NOTE(security): with trust_remote_code=True, loading a config may execute
    # Python code shipped in the model repository. Because model_id comes from
    # the UI (and the app is launched with share=True), any visitor can point
    # this at a repository they control. Pass trust_remote_code=False when the
    # app is exposed to untrusted users.
    try:
        return AutoConfig.from_pretrained(
            model_id,
            trust_remote_code=trust_remote_code,
            revision="main",
        )
    except Exception as e:
        raise RuntimeError(f"无法获取模型配置: {e}") from e


def analyze_attention_mechanism(config: PretrainedConfig) -> Dict[str, Any]:
    """Classify the attention variant described by *config*.

    Args:
        config: A model configuration (any object exposing the usual config
            attributes works).

    Returns:
        A dict with the boolean flags ``uses_gqa``, ``uses_mla`` and
        ``uses_sliding_window`` plus a human-readable ``attention_type``.
    """
    model_type = (getattr(config, "model_type", "") or "").lower()

    attention_info: Dict[str, Any] = {
        "uses_gqa": False,
        "uses_mla": False,
        "uses_sliding_window": False,
        "attention_type": "Multi-Head Attention (MHA)",
    }

    # GQA: fewer key/value heads than query heads.
    num_attention_heads = _first_set_attr(config, "num_attention_heads", "n_head")
    num_key_value_heads = _first_set_attr(
        config, "num_key_value_heads", default=num_attention_heads
    )
    if 0 < num_key_value_heads < num_attention_heads:
        attention_info["uses_gqa"] = True
        attention_info["attention_type"] = "Grouped Query Attention (GQA)"

    # Sliding window: the attribute often exists but is None, or is switched
    # off via use_sliding_window, so the value itself has to be inspected.
    window = _first_set_attr(
        config, "sliding_window", "attention_window_size", default=None
    )
    window_enabled = getattr(config, "use_sliding_window", True) is not False
    attention_info["uses_sliding_window"] = bool(window) and window_enabled

    # MLA (DeepSeek-V2 style): a positive kv_lora_rank is what actually
    # compresses the KV cache. Using the same signal as
    # calculate_kv_cache_size keeps the report consistent with the numbers.
    if _first_set_attr(config, "kv_lora_rank") > 0:
        attention_info["uses_mla"] = True
        attention_info["attention_type"] = "Multi-head Latent Attention (MLA)"
        return attention_info

    # Family-specific labels.
    uses_gqa = attention_info["uses_gqa"]
    if "llama" in model_type:
        attention_info["attention_type"] = "RoPE + GQA" if uses_gqa else "RoPE + MHA"
    elif "mistral" in model_type and attention_info["uses_sliding_window"]:
        attention_info["attention_type"] = (
            "Sliding Window + GQA" if uses_gqa else "Sliding Window + MHA"
        )
    elif "qwen" in model_type:
        attention_info["attention_type"] = (
            "QWen Attention (GQA)" if uses_gqa else "QWen Attention"
        )

    return attention_info


def calculate_kv_cache_size(
    config: PretrainedConfig, sequence_length: int = 2048, batch_size: int = 1
) -> Dict[str, Any]:
    """Estimate the FP16 KV-cache size for *config*.

    For standard attention each layer caches a key and a value vector of
    ``head_dim * num_key_value_heads`` elements per token. For MLA each layer
    caches one shared latent of ``kv_lora_rank + qk_rope_head_dim`` elements
    per token. Truncation of the cache by a sliding window is not modelled.

    Args:
        config: Model configuration.
        sequence_length: Number of cached tokens per sequence.
        batch_size: Number of sequences.

    Returns:
        A dict with the architecture parameters that were used, the formatted
        per-token and total cache sizes, and the raw ``total_kv_cache_bytes``.
    """
    num_layers = _first_set_attr(config, "num_hidden_layers", "n_layer", "num_layers")
    num_attention_heads = _first_set_attr(config, "num_attention_heads", "n_head")
    num_key_value_heads = _first_set_attr(
        config, "num_key_value_heads", default=num_attention_heads
    )
    hidden_size = _first_set_attr(config, "hidden_size", "n_embd", "d_model")

    # Prefer an explicit head_dim: several families (e.g. Qwen3, Gemma) use a
    # head size that is not hidden_size // num_attention_heads.
    head_dim = _first_set_attr(config, "head_dim", default=None)
    if head_dim is None:
        head_dim = hidden_size // num_attention_heads if num_attention_heads > 0 else 0

    kv_lora_rank = _first_set_attr(config, "kv_lora_rank")
    if kv_lora_rank > 0:
        # MLA: keys and values are reconstructed from one compressed latent
        # plus the decoupled RoPE key, so there is no separate K and V copy.
        effective_kv_dim = kv_lora_rank + _first_set_attr(config, "qk_rope_head_dim")
        elements_per_token_per_layer = effective_kv_dim
    else:
        effective_kv_dim = head_dim * num_key_value_heads
        elements_per_token_per_layer = 2 * effective_kv_dim  # K + V

    kv_size_per_token_per_layer = elements_per_token_per_layer * BYTES_PER_ELEMENT_FP16
    kv_size_per_token = kv_size_per_token_per_layer * num_layers
    total_kv_cache_bytes = kv_size_per_token * sequence_length * batch_size

    return {
        "num_layers": num_layers,
        "num_attention_heads": num_attention_heads,
        "num_key_value_heads": num_key_value_heads,
        "head_dim": head_dim,
        "hidden_size": hidden_size,
        "effective_kv_dim": effective_kv_dim,
        "kv_size_per_token": _format_bytes(kv_size_per_token),
        "total_kv_cache": _format_bytes(total_kv_cache_bytes),
        "total_kv_cache_bytes": total_kv_cache_bytes,
        "kv_lora_rank": kv_lora_rank,
    }


def _render_report(
    model_id: str,
    config: Any,
    attention_info: Dict[str, Any],
    kv_info: Dict[str, Any],
    sequence_length: int,
    batch_size: int,
) -> str:
    """Render the analysis results as the Markdown report shown in the UI."""

    def flag(enabled: bool) -> str:
        return "✅ 是" if enabled else "❌ 否"

    lines = [
        "",
        f"## 模型信息分析 - {model_id}",
        "",
        "### 基本参数",
        f"- **模型类型**: {getattr(config, 'model_type', 'Unknown')}",
        f"- **层数**: {kv_info['num_layers']}",
        f"- **隐藏层大小**: {kv_info['hidden_size']}",
        f"- **注意力头数**: {kv_info['num_attention_heads']}",
        f"- **KV头数**: {kv_info['num_key_value_heads']}",
        f"- **每个头的维度**: {kv_info['head_dim']}",
        "",
        "### 注意力机制优化",
        f"- **注意力类型**: {attention_info['attention_type']}",
        f"- **使用GQA**: {flag(attention_info['uses_gqa'])}",
        f"- **使用MLA**: {flag(attention_info['uses_mla'])}",
        f"- **滑动窗口**: {flag(attention_info['uses_sliding_window'])}",
        "",
        "### KV Cache 存储分析",
        f"- **序列长度**: {sequence_length}",
        f"- **批量大小**: {batch_size}",
        f"- **有效KV维度**: {kv_info['effective_kv_dim']}",
        f"- **每个token的KV存储**: {kv_info['kv_size_per_token']}",
        f"- **总KV Cache大小**: {kv_info['total_kv_cache']}",
        "",
        "### 优化效果分析",
    ]

    if attention_info["uses_gqa"]:
        # uses_gqa implies 0 < KV heads < query heads, so the division is safe.
        original_kv_heads = kv_info["num_attention_heads"]
        actual_kv_heads = kv_info["num_key_value_heads"]
        memory_reduction = (1 - actual_kv_heads / original_kv_heads) * 100
        lines.append(
            f"- **GQA内存节省**: {memory_reduction:.1f}% "
            f"(KV头数从{original_kv_heads}减少到{actual_kv_heads})"
        )

    if attention_info["uses_mla"]:
        lines.append(f"- **MLA压缩**: KV维度被压缩到{kv_info['kv_lora_rank']}维")

    total_gb = kv_info["total_kv_cache_bytes"] / (1024**3)
    if total_gb > 8:
        advice = f"⚠️ **内存警告**: KV Cache需要{total_gb:.2f}GB内存,建议使用高端GPU"
    elif total_gb > 4:
        advice = f"💡 **内存提示**: KV Cache需要{total_gb:.2f}GB内存,中等配置可运行"
    else:
        advice = f"✅ **内存友好**: KV Cache仅需{total_gb:.2f}GB内存"
    lines += ["", advice]

    return "\n".join(lines)


def analyze_model(model_id: str, sequence_length: int = 2048, batch_size: int = 1) -> str:
    """Analyze a model and return a Markdown report.

    This is the UI callback, so it never raises: every failure is turned into
    an error message string.

    Args:
        model_id: Repository id on the Hugging Face Hub.
        sequence_length: Number of cached tokens per sequence (positive).
        batch_size: Number of sequences (positive).

    Returns:
        The Markdown report, or a string starting with ``❌ 分析失败`` on error.
    """
    model_id = (model_id or "").strip()
    if not model_id:
        return "❌ 分析失败: 模型ID不能为空"

    # The number inputs of the UI may deliver floats, or None when cleared.
    try:
        sequence_length = int(sequence_length)
        batch_size = int(batch_size)
    except (TypeError, ValueError):
        return "❌ 分析失败: 序列长度和批量大小必须是正整数"
    if sequence_length < 1 or batch_size < 1:
        return "❌ 分析失败: 序列长度和批量大小必须是正整数"

    try:
        config = get_model_config(model_id)
        attention_info = analyze_attention_mechanism(config)
        kv_info = calculate_kv_cache_size(config, sequence_length, batch_size)
        return _render_report(
            model_id, config, attention_info, kv_info, sequence_length, batch_size
        )
    except Exception as e:  # UI boundary: report the problem instead of crashing.
        return f"❌ 分析失败: {e}"


def create_interface():
    """Build and return the Gradio Blocks interface (without launching it)."""
    with gr.Blocks(
        title="Hugging Face模型KV Cache分析器", theme=gr.themes.Soft()
    ) as iface:
        gr.Markdown("# 🤗 Hugging Face模型KV Cache分析器")
        gr.Markdown("输入模型ID来分析其KV cache大小和注意力机制优化情况")

        with gr.Row():
            with gr.Column(scale=3):
                model_input = gr.Textbox(
                    label="模型ID",
                    placeholder="例如: microsoft/DialoGPT-medium, meta-llama/Llama-2-7b-hf",
                    value="microsoft/DialoGPT-medium",
                )
            with gr.Column(scale=1):
                # precision=0 makes the component deliver integers.
                seq_len_input = gr.Number(
                    label="序列长度",
                    value=2048,
                    minimum=1,
                    maximum=131072,
                    precision=0,
                )
            with gr.Column(scale=1):
                batch_size_input = gr.Number(
                    label="批量大小",
                    value=1,
                    minimum=1,
                    maximum=128,
                    precision=0,
                )

        analyze_btn = gr.Button("🔍 分析模型", variant="primary", size="lg")
        output = gr.Markdown(label="分析结果")

        gr.Markdown("### 💡 热门模型示例")
        example_models = [
            ["deepseek-ai/DeepSeek-V3-0324", 32768, 1],
            ["Qwen/Qwen3-8B", 32768, 1],
        ]
        inputs = [model_input, seq_len_input, batch_size_input]

        gr.Examples(
            examples=example_models,
            inputs=inputs,
            outputs=output,
            fn=analyze_model,
            cache_examples=False,
        )

        analyze_btn.click(fn=analyze_model, inputs=inputs, outputs=output)

        gr.Markdown(_HELP_MARKDOWN)

    return iface


if __name__ == "__main__":
    app = create_interface()
    # NOTE(security): share=True publishes the app through a public URL; see
    # the trust_remote_code note in get_model_config before exposing it.
    app.launch(share=True)