import gradio as gr
import requests
import json
from transformers import AutoConfig
import math
from typing import Dict, Tuple, Optional


class LLMMemoryCalculator:
    def __init__(self):
        # Bytes per parameter for common precisions (kept as a reference table;
        # the size and KV cache estimates below assume fp16, i.e. 2 bytes)
        self.precision_bytes = {
            'fp32': 4,
            'fp16': 2,
            'bf16': 2,
            'int8': 1,
            'int4': 0.5
        }

    # -------------------------------------------------
    # 📥 Basic utilities
    # -------------------------------------------------
    def get_model_config(self, model_id: str) -> Dict:
        """Fetch the model configuration from the Hub."""
        try:
            config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)
            return config
        except Exception as e:
            raise Exception(f"Unable to fetch model config: {str(e)}")

    def get_file_size_from_url(self, model_id: str, filename: str) -> int:
        """Get a file's size via a HEAD request (fallback path)."""
        try:
            url = f"https://huggingface.co/{model_id}/resolve/main/{filename}"
            response = requests.head(url, timeout=10)
            if response.status_code == 200:
                content_length = response.headers.get('Content-Length')
                if content_length:
                    return int(content_length)
            return 0
        except Exception:
            return 0

    # -------------------------------------------------
    # 📦 Get the model weight size
    # -------------------------------------------------
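    # Illustrative shape of the index payload read below (numbers/names are made up):
    #   {"metadata": {"total_size": 13476839424},
    #    "weight_map": {"model.embed_tokens.weight": "model-00001-of-00002.safetensors", ...}}
    # metadata.total_size is the summed byte size of all weight shards, so one small GET
    # is usually enough to size a sharded checkpoint.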
    def get_model_size_from_hf(self, model_id: str) -> Tuple[float, str]:
        """Prefer metadata.total_size from *.index.json; fall back to the file list / HEAD requests."""
        try:
            # 1️⃣ Try the index.json files (safetensors > pytorch)
            for index_name, tag in [
                ("model.safetensors.index.json", "safetensors_index"),
                ("pytorch_model.bin.index.json", "pytorch_index")
            ]:
                url = f"https://huggingface.co/{model_id}/resolve/main/{index_name}"
                resp = requests.get(url, timeout=10)
                if resp.status_code == 200:
                    try:
                        data = resp.json()
                    except ValueError:
                        # Some repos serve index.json as plain text, so parse it manually
                        data = json.loads(resp.text)
                    total_bytes = data.get("metadata", {}).get("total_size", 0)
                    if total_bytes > 0:
                        return total_bytes / (1024 ** 3), tag

            # 2️⃣ Call the Hub API and try to read the size fields directly
            api_url = f"https://huggingface.co/api/models/{model_id}"
            response = requests.get(api_url, timeout=10)
            if response.status_code != 200:
                raise Exception(f"API request failed: {response.status_code}")
            model_info = response.json()

            # 2a. Look for .safetensors entries in `siblings` that carry a size
            safetensors_files = [f for f in model_info.get('siblings', [])
                                 if f['rfilename'].endswith('.safetensors') and 'size' in f]
            if safetensors_files:
                total_size = sum(f['size'] for f in safetensors_files)
                return total_size / (1024 ** 3), "safetensors_files"

            # 2b. Use HEAD requests for .safetensors entries without a size field
            safetensors_no_size = [f for f in model_info.get('siblings', [])
                                   if f['rfilename'].endswith('.safetensors')]
            if safetensors_no_size:
                total_size = 0
                for f in safetensors_no_size:
                    total_size += self.get_file_size_from_url(model_id, f['rfilename'])
                if total_size > 0:
                    return total_size / (1024 ** 3), "safetensors_head"

            # 2c. Do the same for pytorch_model-xxxxx.bin
            pytorch_files = [f for f in model_info.get('siblings', [])
                             if f['rfilename'].endswith('.bin') and 'size' in f]
            if pytorch_files:
                total_size = sum(f['size'] for f in pytorch_files)
                return total_size / (1024 ** 3), "pytorch_files"
            pytorch_no_size = [f for f in model_info.get('siblings', [])
                               if f['rfilename'].endswith('.bin')]
            if pytorch_no_size:
                total_size = 0
                for f in pytorch_no_size:
                    total_size += self.get_file_size_from_url(model_id, f['rfilename'])
                if total_size > 0:
                    return total_size / (1024 ** 3), "pytorch_head"

            # 3️⃣ If the size still cannot be determined, fall back to estimation
            raise Exception("No weight size information found")
        except Exception:
            # Estimate from the config instead
            return self.estimate_model_size_from_config(model_id)

    # -------------------------------------------------
    # 📐 Estimation logic (same as the original)
    # -------------------------------------------------
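    # Worked example of the estimate below (a rough sketch, assuming a Llama-2-7B-like
    # config: vocab 32000, hidden 4096, 32 layers, intermediate 11008):
    #   embeddings: 32000 * 4096 ≈ 0.13B
    #   per layer:  4*4096^2 + 2*4096*11008 + 2*4096 ≈ 0.157B, x32 layers ≈ 5.03B
    #   total ≈ 5.16B params ≈ 9.6 GB at fp16
    # Note this undercounts gated-FFN (SwiGLU) models, which use three FFN matrices
    # per layer rather than the two assumed here.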
    def estimate_model_size_from_config(self, model_id: str) -> Tuple[float, str]:
        """Estimate the model size (FP16) from config.json."""
        try:
            config = self.get_model_config(model_id)
            vocab_size = getattr(config, 'vocab_size', 50000)
            hidden_size = getattr(config, 'hidden_size', getattr(config, 'd_model', 4096))
            num_layers = getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 32))
            intermediate_size = getattr(config, 'intermediate_size', hidden_size * 4)

            # Embedding
            embedding_params = vocab_size * hidden_size
            # Transformer layer
            attention_params = 4 * hidden_size * hidden_size
            ffn_params = 2 * hidden_size * intermediate_size
            ln_params = 2 * hidden_size
            params_per_layer = attention_params + ffn_params + ln_params
            total_params = embedding_params + num_layers * params_per_layer
            if hasattr(config, 'tie_word_embeddings') and not config.tie_word_embeddings:
                total_params += vocab_size * hidden_size

            model_size_gb = (total_params * 2) / (1024 ** 3)  # fp16 by default
            return model_size_gb, "estimated"
        except Exception as e:
            raise Exception(f"Unable to estimate model size: {str(e)}")

    # -------------------------------------------------
    # 🗄️ KV cache calculation (original logic kept)
    # -------------------------------------------------
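    # Size formula used below, assuming an fp16 cache (2 bytes per element):
    #   bytes ≈ kv_cache_per_token * context_length * num_layers * batch_size * 2
    # where kv_cache_per_token is 2 * num_kv_heads * head_dim for MHA/GQA (K plus V),
    # or approximately 2 * kv_lora_rank for MLA's compressed latent cache.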
    def calculate_kv_cache_size(self, config, context_length: int, batch_size: int = 1) -> Dict[str, float]:
        try:
            num_layers = getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 32))
            hidden_size = getattr(config, 'hidden_size', getattr(config, 'd_model', 4096))
            num_attention_heads = getattr(config, 'num_attention_heads', getattr(config, 'num_heads', 32))
            num_key_value_heads = getattr(config, 'num_key_value_heads', num_attention_heads)
            is_mla = hasattr(config, 'kv_lora_rank') and config.kv_lora_rank is not None
            head_dim = hidden_size // num_attention_heads

            if is_mla:
                kv_lora_rank = getattr(config, 'kv_lora_rank', 512)
                kv_cache_per_token = kv_lora_rank * 2
                attention_type = "MLA"
            elif num_key_value_heads < num_attention_heads:
                kv_cache_per_token = num_key_value_heads * head_dim * 2
                attention_type = "GQA"
            else:
                kv_cache_per_token = num_attention_heads * head_dim * 2
                attention_type = "MHA"

            total_kv_cache = (kv_cache_per_token * context_length * num_layers * batch_size * 2) / (1024 ** 3)
            return {
                'size_gb': total_kv_cache,
                'attention_type': attention_type,
                'num_kv_heads': num_key_value_heads,
                'num_attention_heads': num_attention_heads,
                'head_dim': head_dim
            }
        except Exception as e:
            raise Exception(f"KV cache calculation failed: {str(e)}")

    # -------------------------------------------------
    # 🧮 Combined memory requirement calculation (unchanged)
    # -------------------------------------------------
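    # Quick worked example of the feasibility check below (illustrative numbers):
    #   14 GB weights + 2 GB KV cache + 1.4 GB overhead (10%) = 17.4 GB needed
    #   1 x 24 GB GPU at 90% utilization = 21.6 GB available -> fits, 4.2 GB margin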
    def calculate_memory_requirements(self, model_id: str, gpu_memory_gb: float, num_gpus: int,
                                      context_length: int, utilization_rate: float = 0.9) -> Dict:
        try:
            config = self.get_model_config(model_id)
            model_size_gb, size_source = self.get_model_size_from_hf(model_id)
            kv_info = self.calculate_kv_cache_size(config, context_length)

            available_memory = gpu_memory_gb * num_gpus * utilization_rate
            other_overhead = model_size_gb * 0.1
            total_memory_needed = model_size_gb + kv_info['size_gb'] + other_overhead
            is_feasible = total_memory_needed <= available_memory
            memory_margin = available_memory - total_memory_needed
            memory_per_gpu = total_memory_needed / num_gpus

            return {
                'model_id': model_id,
                'model_size_gb': round(model_size_gb, 2),
                'size_source': size_source,
                'kv_cache_gb': round(kv_info['size_gb'], 2),
                'attention_type': kv_info['attention_type'],
                'other_overhead_gb': round(other_overhead, 2),
                'total_memory_needed_gb': round(total_memory_needed, 2),
                'available_memory_gb': round(available_memory, 2),
                'memory_margin_gb': round(memory_margin, 2),
                'memory_per_gpu_gb': round(memory_per_gpu, 2),
                'is_feasible': is_feasible,
                'utilization_per_gpu': round((memory_per_gpu / gpu_memory_gb) * 100, 1),
                'config_info': {
                    'num_layers': getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 'N/A')),
                    'hidden_size': getattr(config, 'hidden_size', getattr(config, 'd_model', 'N/A')),
                    'num_attention_heads': kv_info['num_attention_heads'],
                    'num_kv_heads': kv_info['num_kv_heads'],
                    'head_dim': kv_info['head_dim']
                }
            }
        except Exception as e:
            return {'error': str(e)}


# -------------------------------------------------
# 🌟 Gradio interface (original logic kept)
# -------------------------------------------------
def create_gradio_interface():
    calculator = LLMMemoryCalculator()

    def calculate_memory(model_id, gpu_memory, num_gpus, context_length, utilization_rate):
        if not model_id.strip():
            return "Please enter a model ID"
        try:
            result = calculator.calculate_memory_requirements(
                model_id.strip(),
                float(gpu_memory),
                int(num_gpus),
                int(context_length),
                float(utilization_rate) / 100
            )
            if 'error' in result:
                return f"❌ Error: {result['error']}"

            status = "✅ Can run" if result['is_feasible'] else "❌ Insufficient GPU memory"
            output = f"""
## Model Analysis Results
**Model**: {result['model_id']}
**Status**: {status}
### 📊 Memory Analysis
- **Model size**: {result['model_size_gb']} GB ({result['size_source']})
- **KV cache**: {result['kv_cache_gb']} GB
- **Other overhead**: {result['other_overhead_gb']} GB
- **Total required**: {result['total_memory_needed_gb']} GB
- **Available GPU memory**: {result['available_memory_gb']} GB
- **Remaining GPU memory**: {result['memory_margin_gb']} GB
### 🔧 Model Configuration
- **Attention type**: {result['attention_type']}
- **Layers**: {result['config_info']['num_layers']}
- **Hidden size**: {result['config_info']['hidden_size']}
- **Attention heads**: {result['config_info']['num_attention_heads']}
- **KV heads**: {result['config_info']['num_kv_heads']}
- **Head dim**: {result['config_info']['head_dim']}
### 💾 GPU Usage
- **Memory per GPU**: {result['memory_per_gpu_gb']} GB
- **Per-GPU utilization**: {result['utilization_per_gpu']}%
### 💡 Recommendation
"""
            if result['is_feasible']:
                output += f"✅ The current configuration can run this model, with {result['memory_margin_gb']} GB of GPU memory to spare."
            else:
                needed_extra = abs(result['memory_margin_gb'])
                output += f"❌ An additional {needed_extra} GB of GPU memory is required.\nSuggestions:\n- Add more GPUs\n- Use GPUs with more memory\n- Reduce the context length\n- Quantize the model (e.g. int8/int4)"
            return output
        except Exception as e:
            return f"❌ Calculation failed: {str(e)}"

    with gr.Blocks(title="LLM GPU Memory Calculator", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🚀 LLM GPU Memory Requirement Calculator")
        gr.Markdown("Enter a model and your hardware configuration to check whether the LLM can run")
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## 📝 Input Parameters")
                model_id = gr.Textbox(label="🤗 Hugging Face model ID",
                                      placeholder="e.g. deepseek-ai/DeepSeek-R1-0528-Qwen3-8B",
                                      value="deepseek-ai/DeepSeek-R1-0528-Qwen3-8B")
                with gr.Row():
                    gpu_memory = gr.Number(label="💾 Memory per GPU (GB)", value=24, minimum=1, maximum=1000)
                    num_gpus = gr.Number(label="🔢 Number of GPUs", value=1, minimum=1, maximum=64, precision=0)
                with gr.Row():
                    context_length = gr.Number(label="📏 Context length", value=16384, minimum=512, maximum=1000000, precision=0)
                    utilization_rate = gr.Slider(label="⚡ GPU memory utilization (%)", minimum=50, maximum=95, value=90, step=5)
                calculate_btn = gr.Button("🔍 Calculate memory requirements", variant="primary")
            with gr.Column(scale=2):
                gr.Markdown("## 📊 Results")
                output = gr.Markdown("Click the calculate button to start the analysis...")
        calculate_btn.click(fn=calculate_memory,
                            inputs=[model_id, gpu_memory, num_gpus, context_length, utilization_rate],
                            outputs=output)
        gr.Markdown("""
## 📚 Usage Examples
**Small model**: `microsoft/DialoGPT-medium`
**Medium model**: `microsoft/DialoGPT-large`
**Large model**: `meta-llama/Llama-2-7b-hf`
**Larger model**: `meta-llama/Llama-2-13b-hf`
Note: some models may require requesting access.
""")
    return demo
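

# Programmatic usage (a minimal sketch; assumes network access to the Hugging Face Hub,
# and gated repos such as meta-llama/* additionally require an authenticated token):
#   calc = LLMMemoryCalculator()
#   report = calc.calculate_memory_requirements("meta-llama/Llama-2-7b-hf", 24, 1, 4096)
#   print(report)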

if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(share=True, debug=True)