import gradio as gr
from transformers import AutoConfig
from typing import Dict, Any


def get_model_config(model_id: str) -> AutoConfig:
    """Fetch the model configuration from the Hugging Face Hub."""
    try:
        # AutoConfig is the most reliable way to read a model's hyperparameters
        config = AutoConfig.from_pretrained(
            model_id,
            trust_remote_code=True,  # support models that ship custom code
            revision="main"
        )
        return config
    except Exception as e:
        raise RuntimeError(f"Failed to load model config: {e}") from e
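
# Example usage (a sketch; assumes network access and that the id resolves on the Hub):
#   cfg = get_model_config("Qwen/Qwen3-8B")
#   cfg.num_hidden_layers, cfg.num_key_value_heads  # fields consumed by the analysis below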

def analyze_attention_mechanism(config: AutoConfig) -> Dict[str, Any]:
    """Identify which attention optimizations the model uses."""
    model_type = getattr(config, "model_type", "").lower()
    architectures = getattr(config, "architectures", []) or []  # can be None in some configs

    # Optimization techniques we try to detect
    attention_info = {
        "uses_gqa": False,
        "uses_mla": False,
        "uses_sliding_window": False,
        "attention_type": "Multi-Head Attention (MHA)"
    }

    # Detect GQA (Grouped Query Attention): fewer KV heads than query heads
    num_attention_heads = getattr(config, "num_attention_heads", getattr(config, "n_head", 0))
    num_key_value_heads = getattr(config, "num_key_value_heads", num_attention_heads) or num_attention_heads
    if 0 < num_key_value_heads < num_attention_heads:
        attention_info["uses_gqa"] = True
        attention_info["attention_type"] = "Grouped Query Attention (GQA)"

    # Detect MLA (Multi-head Latent Attention), used mainly by DeepSeek-V2/V3-style models
    if "deepseek" in model_type or any("deepseek" in str(arch).lower() for arch in architectures):
        if hasattr(config, "kv_lora_rank") or hasattr(config, "q_lora_rank"):
            attention_info["uses_mla"] = True
            attention_info["attention_type"] = "Multi-head Latent Attention (MLA)"

    # Detect sliding-window attention
    if hasattr(config, "sliding_window") or hasattr(config, "attention_window_size"):
        attention_info["uses_sliding_window"] = True

    # Model-family specific labels
    if "llama" in model_type:
        attention_info["attention_type"] = "RoPE + GQA" if attention_info["uses_gqa"] else "RoPE + MHA"
    elif "mistral" in model_type:
        attention_info["attention_type"] = "Sliding Window + GQA" if attention_info["uses_gqa"] else "Sliding Window + MHA"
    elif "qwen" in model_type:
        attention_info["attention_type"] = "QWen Attention (GQA)" if attention_info["uses_gqa"] else "QWen Attention"

    return attention_info
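
# Rough reference points for the fields inspected above (not read from any checkpoint
# at runtime): a GQA model such as Llama-2-70B ships num_attention_heads = 64 with
# num_key_value_heads = 8, so eight query heads share each KV head; DeepSeek-V2-style
# MLA configs expose kv_lora_rank (reportedly 512) as the width of the compressed
# KV latent that is cached instead of per-head keys and values.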

def calculate_kv_cache_size(config: AutoConfig, sequence_length: int = 2048, batch_size: int = 1) -> Dict[str, Any]:
    """Estimate the KV cache footprint for the given sequence length and batch size."""
    # Read the basic hyperparameters, tolerating the different names used across architectures
    num_layers = getattr(config, "num_hidden_layers", getattr(config, "n_layer", getattr(config, "num_layers", 0)))
    num_attention_heads = getattr(config, "num_attention_heads", getattr(config, "n_head", 0))
    num_key_value_heads = getattr(config, "num_key_value_heads", num_attention_heads) or num_attention_heads
    hidden_size = getattr(config, "hidden_size", getattr(config, "n_embd", getattr(config, "d_model", 0)))

    # Per-head dimension
    head_dim = hidden_size // num_attention_heads if num_attention_heads > 0 else 0

    # MLA needs special handling: the cache stores a compressed latent instead of per-head K/V
    kv_lora_rank = getattr(config, "kv_lora_rank", 0) or 0
    if kv_lora_rank > 0:  # MLA architecture
        effective_kv_dim = kv_lora_rank
    else:
        effective_kv_dim = head_dim * num_key_value_heads

    # KV cache per token per layer (Key + Value), assuming FP16 (2 bytes per element)
    bytes_per_element = 2
    kv_size_per_token_per_layer = 2 * effective_kv_dim * bytes_per_element  # K + V

    # Total KV cache size
    total_kv_cache_bytes = kv_size_per_token_per_layer * num_layers * sequence_length * batch_size

    # Convert to a human-readable unit
    def format_bytes(bytes_val):
        if bytes_val < 1024:
            return f"{bytes_val} B"
        elif bytes_val < 1024**2:
            return f"{bytes_val/1024:.2f} KB"
        elif bytes_val < 1024**3:
            return f"{bytes_val/(1024**2):.2f} MB"
        else:
            return f"{bytes_val/(1024**3):.2f} GB"

    return {
        "num_layers": num_layers,
        "num_attention_heads": num_attention_heads,
        "num_key_value_heads": num_key_value_heads,
        "head_dim": head_dim,
        "hidden_size": hidden_size,
        "effective_kv_dim": effective_kv_dim,
        "kv_size_per_token": format_bytes(kv_size_per_token_per_layer * num_layers),
        "total_kv_cache": format_bytes(total_kv_cache_bytes),
        "total_kv_cache_bytes": total_kv_cache_bytes,
        "kv_lora_rank": kv_lora_rank
    }
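
# Worked example of the formula above (a sketch, using the widely cited Llama-2-7B
# shape: 32 layers, 32 attention heads, head_dim 128, full MHA so all heads are cached):
#   effective_kv_dim             = 128 * 32           = 4096
#   per token, per layer (K + V) = 2 * 4096 * 2 bytes = 16,384 bytes
#   per token, all 32 layers     = 32 * 16,384        = 524,288 bytes (~512 KB)
#   sequence_length 2048, batch 1: 2048 * 524,288     ≈ 1 GB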

def analyze_model(model_id: str, sequence_length: int = 2048, batch_size: int = 1) -> str:
    """Analyze a model and return a Markdown report."""
    try:
        # Fetch the model configuration
        config = get_model_config(model_id)
        # Identify attention optimizations
        attention_info = analyze_attention_mechanism(config)
        # Estimate the KV cache size
        kv_info = calculate_kv_cache_size(config, sequence_length, batch_size)

        # Format the report
        result = f"""
## Model Analysis - {model_id}

### Basic Parameters
- **Model type**: {getattr(config, 'model_type', 'Unknown')}
- **Layers**: {kv_info['num_layers']}
- **Hidden size**: {kv_info['hidden_size']}
- **Attention heads**: {kv_info['num_attention_heads']}
- **KV heads**: {kv_info['num_key_value_heads']}
- **Head dimension**: {kv_info['head_dim']}

### Attention Optimizations
- **Attention type**: {attention_info['attention_type']}
- **Uses GQA**: {'✅ Yes' if attention_info['uses_gqa'] else '❌ No'}
- **Uses MLA**: {'✅ Yes' if attention_info['uses_mla'] else '❌ No'}
- **Sliding window**: {'✅ Yes' if attention_info['uses_sliding_window'] else '❌ No'}

### KV Cache Footprint
- **Sequence length**: {sequence_length}
- **Batch size**: {batch_size}
- **Effective KV dimension**: {kv_info['effective_kv_dim']}
- **KV cache per token**: {kv_info['kv_size_per_token']}
- **Total KV cache size**: {kv_info['total_kv_cache']}

### Optimization Impact
"""
        # Memory saved by GQA relative to full MHA
        if attention_info['uses_gqa']:
            original_kv_heads = kv_info['num_attention_heads']
            actual_kv_heads = kv_info['num_key_value_heads']
            memory_reduction = (1 - actual_kv_heads / original_kv_heads) * 100
            result += f"- **GQA memory saving**: {memory_reduction:.1f}% (KV heads reduced from {original_kv_heads} to {actual_kv_heads})\n"

        # Extra note for MLA
        if attention_info['uses_mla']:
            result += f"- **MLA compression**: the KV cache is compressed to a {kv_info['kv_lora_rank']}-dim latent\n"

        # Memory usage guidance
        total_gb = kv_info['total_kv_cache_bytes'] / (1024**3)
        if total_gb > 8:
            result += f"\n⚠️ **Memory warning**: the KV cache needs {total_gb:.2f} GB; a high-end GPU is recommended"
        elif total_gb > 4:
            result += f"\n💡 **Memory note**: the KV cache needs {total_gb:.2f} GB; a mid-range setup should cope"
        else:
            result += f"\n✅ **Memory friendly**: the KV cache needs only {total_gb:.2f} GB"

        return result
    except Exception as e:
        return f"❌ Analysis failed: {str(e)}"
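
# Illustration of the GQA saving reported above (numbers are hypothetical): with
# num_attention_heads = 32 and num_key_value_heads = 8, the reduction is
# (1 - 8 / 32) * 100 = 75%, i.e. the KV cache is a quarter of its full-MHA size.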

# Build the Gradio interface
def create_interface():
    with gr.Blocks(title="Hugging Face Model KV Cache Analyzer", theme=gr.themes.Soft()) as iface:
        gr.Markdown("# 🤗 Hugging Face Model KV Cache Analyzer")
        gr.Markdown("Enter a model ID to analyze its KV cache size and attention optimizations")

        with gr.Row():
            with gr.Column(scale=3):
                model_input = gr.Textbox(
                    label="Model ID",
                    placeholder="e.g. microsoft/DialoGPT-medium, meta-llama/Llama-2-7b-hf",
                    value="microsoft/DialoGPT-medium"
                )
            with gr.Column(scale=1):
                seq_len_input = gr.Number(
                    label="Sequence length",
                    value=2048,
                    minimum=1,
                    maximum=131072
                )
            with gr.Column(scale=1):
                batch_size_input = gr.Number(
                    label="Batch size",
                    value=1,
                    minimum=1,
                    maximum=128
                )

        analyze_btn = gr.Button("🔍 Analyze model", variant="primary", size="lg")
        output = gr.Markdown(label="Analysis result")

        # A few popular example models
        gr.Markdown("### 💡 Popular model examples")
        example_models = [
            ["deepseek-ai/DeepSeek-V3-0324", 32768, 1],
            ["Qwen/Qwen3-8B", 32768, 1],
        ]
        gr.Examples(
            examples=example_models,
            inputs=[model_input, seq_len_input, batch_size_input],
            outputs=output,
            fn=analyze_model,
            cache_examples=False
        )

        analyze_btn.click(
            fn=analyze_model,
            inputs=[model_input, seq_len_input, batch_size_input],
            outputs=output
        )

        gr.Markdown("""
### 📖 Notes
- **GQA**: Grouped Query Attention saves memory by caching fewer KV heads
- **MLA**: Multi-head Latent Attention compresses the KV cache via a low-rank projection
- **Sliding window**: limits the attention span to reduce compute and memory
- KV cache sizes assume FP16 precision (2 bytes per element)
- Configurations are loaded with `transformers.AutoConfig`, so custom models are supported

### 🛠️ Dependencies
```bash
pip install gradio transformers torch
```
""")
    return iface


if __name__ == "__main__":
    app = create_interface()
    app.launch(share=True)
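
# Programmatic usage without the UI (assumes the model id below still resolves on the Hub;
# substitute any other id):
#   print(analyze_model("meta-llama/Llama-2-7b-hf", sequence_length=4096, batch_size=1))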