import gradio as gr
from transformers import AutoConfig
from typing import Dict, Any

def get_model_config(model_id: str) -> AutoConfig:
    """Fetch the model configuration from the Hugging Face Hub."""
    try:
        # AutoConfig only downloads the config file, which is lighter and more
        # reliable than loading the full model.
        config = AutoConfig.from_pretrained(
            model_id,
            trust_remote_code=True,  # allow models with custom code
            revision="main"
        )
        return config
    except Exception as e:
        raise RuntimeError(f"Failed to fetch model config: {e}") from e

def analyze_attention_mechanism(config: AutoConfig) -> Dict[str, Any]:
    """Analyze which attention mechanism and optimizations the model uses."""
    model_type = getattr(config, "model_type", "").lower()
    architectures = getattr(config, "architectures", None) or []
    
    # Default: plain multi-head attention, no optimizations detected
    attention_info = {
        "uses_gqa": False,
        "uses_mla": False,
        "uses_sliding_window": False,
        "attention_type": "Multi-Head Attention (MHA)"
    }
    
    # Detect GQA (Grouped Query Attention): fewer KV heads than query heads
    num_attention_heads = getattr(config, "num_attention_heads", getattr(config, "n_head", 0))
    num_key_value_heads = getattr(config, "num_key_value_heads", None) or num_attention_heads
    
    if 0 < num_key_value_heads < num_attention_heads:
        attention_info["uses_gqa"] = True
        attention_info["attention_type"] = "Grouped Query Attention (GQA)"
    
    # Detect MLA (Multi-head Latent Attention), used mainly by DeepSeek-V2/V3
    if "deepseek" in model_type or any("deepseek" in str(arch).lower() for arch in architectures):
        if hasattr(config, "kv_lora_rank") or hasattr(config, "q_lora_rank"):
            attention_info["uses_mla"] = True
            attention_info["attention_type"] = "Multi-head Latent Attention (MLA)"
    
    # Detect sliding-window attention (the attribute may exist but be None)
    if getattr(config, "sliding_window", None) is not None or getattr(config, "attention_window_size", None) is not None:
        attention_info["uses_sliding_window"] = True
    
    # Model-family specific labels
    if "llama" in model_type:
        attention_info["attention_type"] = "RoPE + GQA" if attention_info["uses_gqa"] else "RoPE + MHA"
    elif "mistral" in model_type:
        attention_info["attention_type"] = "Sliding Window + GQA" if attention_info["uses_gqa"] else "Sliding Window + MHA"
    elif "qwen" in model_type:
        attention_info["attention_type"] = "Qwen Attention (GQA)" if attention_info["uses_gqa"] else "Qwen Attention"
    
    return attention_info
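
# Worked illustration of the GQA check above (hypothetical numbers, not taken
# from any specific config): with num_attention_heads=32 and num_key_value_heads=8,
# the model is flagged as GQA; each KV head serves 32 / 8 = 4 query heads, so only
# 8 of the 32 possible K/V head pairs are cached per layer, roughly a 4x KV saving.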

def calculate_kv_cache_size(config: AutoConfig, sequence_length: int = 2048, batch_size: int = 1) -> Dict[str, Any]:
    """Estimate the KV cache size for the given sequence length and batch size."""
    
    # Read the basic parameters, accounting for different config attribute names
    num_layers = getattr(config, "num_hidden_layers", getattr(config, "n_layer", getattr(config, "num_layers", 0)))
    num_attention_heads = getattr(config, "num_attention_heads", getattr(config, "n_head", 0))
    num_key_value_heads = getattr(config, "num_key_value_heads", None) or num_attention_heads
    hidden_size = getattr(config, "hidden_size", getattr(config, "n_embd", getattr(config, "d_model", 0)))
    
    # Per-head dimension
    head_dim = hidden_size // num_attention_heads if num_attention_heads > 0 else 0
    
    # MLA needs special handling: instead of separate K and V tensors, a single
    # compressed latent (kv_lora_rank elements) plus a small decoupled RoPE key
    # (qk_rope_head_dim elements) is cached per token and layer.
    kv_lora_rank = getattr(config, "kv_lora_rank", None) or 0
    if kv_lora_rank > 0:  # MLA architecture (e.g. DeepSeek-V2/V3)
        effective_kv_dim = kv_lora_rank
        qk_rope_head_dim = getattr(config, "qk_rope_head_dim", None) or 0
        kv_elements_per_token_per_layer = kv_lora_rank + qk_rope_head_dim
    else:
        effective_kv_dim = head_dim * num_key_value_heads
        kv_elements_per_token_per_layer = 2 * effective_kv_dim  # K + V
    
    # KV cache size per token per layer, assuming FP16 (2 bytes per element)
    bytes_per_element = 2
    kv_size_per_token_per_layer = kv_elements_per_token_per_layer * bytes_per_element
    
    # Total KV cache size
    total_kv_cache_bytes = kv_size_per_token_per_layer * num_layers * sequence_length * batch_size
    
    # Convert to a human-readable unit
    def format_bytes(bytes_val):
        if bytes_val < 1024:
            return f"{bytes_val} B"
        elif bytes_val < 1024**2:
            return f"{bytes_val/1024:.2f} KB"
        elif bytes_val < 1024**3:
            return f"{bytes_val/(1024**2):.2f} MB"
        else:
            return f"{bytes_val/(1024**3):.2f} GB"
    
    return {
        "num_layers": num_layers,
        "num_attention_heads": num_attention_heads,
        "num_key_value_heads": num_key_value_heads,
        "head_dim": head_dim,
        "hidden_size": hidden_size,
        "effective_kv_dim": effective_kv_dim,
        "kv_size_per_token": format_bytes(kv_size_per_token_per_layer * num_layers),
        "total_kv_cache": format_bytes(total_kv_cache_bytes),
        "total_kv_cache_bytes": total_kv_cache_bytes,
        "kv_lora_rank": kv_lora_rank
    }
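
# Sanity check of the non-MLA formula above with hypothetical Llama-2-7B-style
# numbers (32 layers, 32 heads, 32 KV heads, hidden_size 4096 -> head_dim 128, FP16):
#   per token per layer: 2 (K+V) * 128 * 32 * 2 bytes = 16 KiB
#   per token over 32 layers: 512 KiB
#   for a 2048-token sequence at batch size 1: 2048 * 512 KiB = 1 GiB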

def analyze_model(model_id: str, sequence_length: int = 2048, batch_size: int = 1) -> str:
    """Analyze a model and return a formatted Markdown report."""
    try:
        # Fetch the model configuration
        config = get_model_config(model_id)
        
        # Analyze the attention mechanism
        attention_info = analyze_attention_mechanism(config)
        
        # Estimate the KV cache size
        kv_info = calculate_kv_cache_size(config, sequence_length, batch_size)
        
        # Format the report
        result = f"""
## Model Analysis - {model_id}

### Basic Parameters
- **Model type**: {getattr(config, 'model_type', 'Unknown')}
- **Layers**: {kv_info['num_layers']}
- **Hidden size**: {kv_info['hidden_size']}
- **Attention heads**: {kv_info['num_attention_heads']}
- **KV heads**: {kv_info['num_key_value_heads']}
- **Head dimension**: {kv_info['head_dim']}

### Attention Optimizations
- **Attention type**: {attention_info['attention_type']}
- **Uses GQA**: {'✅ Yes' if attention_info['uses_gqa'] else '❌ No'}
- **Uses MLA**: {'✅ Yes' if attention_info['uses_mla'] else '❌ No'}
- **Sliding window**: {'✅ Yes' if attention_info['uses_sliding_window'] else '❌ No'}

### KV Cache Analysis
- **Sequence length**: {sequence_length}
- **Batch size**: {batch_size}
- **Effective KV dimension**: {kv_info['effective_kv_dim']}
- **KV cache per token**: {kv_info['kv_size_per_token']}
- **Total KV cache size**: {kv_info['total_kv_cache']}

### Optimization Impact
"""
        
        # Memory savings from GQA
        if attention_info['uses_gqa']:
            original_kv_heads = kv_info['num_attention_heads']
            actual_kv_heads = kv_info['num_key_value_heads']
            memory_reduction = (1 - actual_kv_heads / original_kv_heads) * 100
            result += f"- **GQA memory savings**: {memory_reduction:.1f}% (KV heads reduced from {original_kv_heads} to {actual_kv_heads})\n"
        
        # Extra note for MLA
        if attention_info['uses_mla']:
            result += f"- **MLA compression**: KV representation compressed to {kv_info['kv_lora_rank']} dimensions\n"
        
        # Memory usage guidance
        total_gb = kv_info['total_kv_cache_bytes'] / (1024**3)
        if total_gb > 8:
            result += f"\n⚠️ **Memory warning**: the KV cache needs {total_gb:.2f} GB; a high-end GPU is recommended"
        elif total_gb > 4:
            result += f"\n💡 **Memory note**: the KV cache needs {total_gb:.2f} GB; a mid-range setup should work"
        else:
            result += f"\n✅ **Memory friendly**: the KV cache only needs {total_gb:.2f} GB"
            
        return result
        
    except Exception as e:
        return f"❌ Analysis failed: {e}"

# Build the Gradio interface
def create_interface():
    with gr.Blocks(title="Hugging Face Model KV Cache Analyzer", theme=gr.themes.Soft()) as iface:
        gr.Markdown("# 🤗 Hugging Face Model KV Cache Analyzer")
        gr.Markdown("Enter a model ID to analyze its KV cache size and attention optimizations")
        
        with gr.Row():
            with gr.Column(scale=3):
                model_input = gr.Textbox(
                    label="Model ID",
                    placeholder="e.g. microsoft/DialoGPT-medium, meta-llama/Llama-2-7b-hf",
                    value="microsoft/DialoGPT-medium"
                )
            with gr.Column(scale=1):
                seq_len_input = gr.Number(
                    label="Sequence length",
                    value=2048,
                    minimum=1,
                    maximum=131072
                )
            with gr.Column(scale=1):
                batch_size_input = gr.Number(
                    label="Batch size",
                    value=1,
                    minimum=1,
                    maximum=128
                )
        
        analyze_btn = gr.Button("🔍 Analyze model", variant="primary", size="lg")
        
        output = gr.Markdown(label="Analysis result")
        
        # A few example models
        gr.Markdown("### 💡 Popular model examples")
        example_models = [
            ["deepseek-ai/DeepSeek-V3-0324", 32768, 1],
            ["Qwen/Qwen3-8B", 32768, 1],
        ]
        
        gr.Examples(
            examples=example_models,
            inputs=[model_input, seq_len_input, batch_size_input],
            outputs=output,
            fn=analyze_model,
            cache_examples=False
        )
        
        analyze_btn.click(
            fn=analyze_model,
            inputs=[model_input, seq_len_input, batch_size_input],
            outputs=output
        )
        
        gr.Markdown("""
        ### 📖 Notes
        - **GQA**: Grouped Query Attention saves memory by using fewer KV heads
        - **MLA**: Multi-head Latent Attention compresses the KV cache via low-rank projections
        - **Sliding window**: limits the attention span to reduce compute and memory
        - KV cache sizes assume FP16 precision (2 bytes per element)
        - Configurations are loaded with `transformers.AutoConfig`, so custom models are supported
        
        ### 🛠️ Installing dependencies
        ```bash
        pip install gradio transformers torch
        ```
        """)
    
    return iface

if __name__ == "__main__":
    app = create_interface()
    app.launch(share=True)