Magic-plus-1 / src /enhanced_magic_wrapper.py
HF User
🚀 Fresh deploy of Magic Articulate Enhanced MVP
e7b9fb6
"""
Enhanced MagicArticulate包装器
集成MagicArticulate-Plus用户上传支持
基于我们已完善的articulate_api.py
"""
import os
import sys
import time
import logging
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, Tuple, List
# 添加必要的路径
parent_dir = os.path.join(os.path.dirname(__file__), '..')
sys.path.append(parent_dir) # 添加mvp-space根目录
sys.path.append(os.path.join(parent_dir, 'magic_articulate_plus'))
# 详细的调试信息
print(f"🔍 DEBUG: Current working directory: {os.getcwd()}")
print(f"🔍 DEBUG: Script directory: {os.path.dirname(__file__)}")
print(f"🔍 DEBUG: Parent directory: {parent_dir}")
print(f"🔍 DEBUG: Python path includes:")
for i, path in enumerate(sys.path):
print(f" {i}: {path}")
# 检查关键目录是否存在
utils_dir = os.path.join(parent_dir, 'utils')
magic_plus_dir = os.path.join(parent_dir, 'magic_articulate_plus')
skeleton_dir = os.path.join(parent_dir, 'skeleton_models')
print(f"🔍 DEBUG: Directory existence:")
print(f" utils directory exists: {os.path.exists(utils_dir)}")
print(f" magic_articulate_plus directory exists: {os.path.exists(magic_plus_dir)}")
print(f" skeleton_models directory exists: {os.path.exists(skeleton_dir)}")
if os.path.exists(utils_dir):
print(f"🔍 DEBUG: utils directory contents: {os.listdir(utils_dir)}")
if os.path.exists(magic_plus_dir):
print(f"🔍 DEBUG: magic_articulate_plus directory contents: {os.listdir(magic_plus_dir)}")
# 导入我们已经完善的MagicArticulate-Plus功能
try:
print("🔍 DEBUG: Attempting to import magic_articulate_plus.articulate_api...")
from magic_articulate_plus.articulate_api import (
MagicArticulateAPI,
ModelValidator,
process_model_file
)
print("✅ DEBUG: Successfully imported MagicArticulate-Plus components")
ENHANCED_AVAILABLE = True
except ImportError as e:
print(f"❌ DEBUG: Import failed with error: {e}")
print(f"❌ DEBUG: Error type: {type(e)}")
import traceback
print(f"❌ DEBUG: Full traceback:")
traceback.print_exc()
logging.warning(f"MagicArticulate-Plus not available: {e}")
ENHANCED_AVAILABLE = False
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EnhancedMagicWrapper:
"""
增强版MagicArticulate包装器
支持用户上传任意3D模型文件
"""
def __init__(self, model_weights_path: Optional[str] = None):
# 如果没有指定权重路径,使用默认的空间模型(匹配demo.py hier_order=False)
if model_weights_path is None:
model_weights_path = "skeleton_ckpt/checkpoint_trainonv2_spatial.pth"
self.model_weights_path = model_weights_path
self.initialized = False
if ENHANCED_AVAILABLE:
# 使用我们完善的MagicArticulate-Plus API
self.api = MagicArticulateAPI(
model_weights_path=model_weights_path,
device="auto",
session_base_dir="hf_user_sessions"
)
logger.info(f"✅ 使用增强版MagicArticulate-Plus API (weights: {model_weights_path})")
else:
# 降级到原始包装器
logger.error("❌ MagicArticulate-Plus不可用,请检查集成")
self.api = None
def initialize(self) -> bool:
"""初始化API"""
try:
if not ENHANCED_AVAILABLE:
logger.error("增强版API不可用")
return False
logger.info("🚀 初始化增强版MagicArticulate...")
# 使用我们已经完善的初始化逻辑
success = self.api.initialize_model()
if success:
self.initialized = True
logger.info("✅ 增强版MagicArticulate初始化成功")
else:
logger.error("❌ 增强版MagicArticulate初始化失败")
return success
except Exception as e:
logger.error(f"💥 初始化失败: {str(e)}")
return False
def validate_uploaded_file(self, file_path: str) -> Tuple[bool, str, Dict[str, Any]]:
"""
验证用户上传的文件
使用我们已完善的ModelValidator
"""
try:
if not ENHANCED_AVAILABLE:
return False, "增强功能不可用", {}
# 使用我们已经完善的验证逻辑
is_valid, error_msg, model_info = ModelValidator.validate_file(file_path)
if is_valid:
logger.info(f"✅ 文件验证通过: {model_info.get('file_name', 'Unknown')}")
else:
logger.warning(f"⚠️ 文件验证失败: {error_msg}")
return is_valid, error_msg, model_info
except Exception as e:
error_msg = f"文件验证过程出错: {str(e)}"
logger.error(error_msg)
return False, error_msg, {}
def process_3d_model(self,
model_file_path: str,
prompt: str = "",
confidence_threshold: float = 0.8,
generate_preview: bool = True,
**kwargs) -> Dict[str, Any]:
"""
处理3D模型 - 支持用户上传
使用我们已完善的处理管道
"""
try:
if not self.initialized:
return {
'success': False,
'error': 'API未初始化',
'skeleton_data': None,
'output_files': None,
'processing_info': None
}
if not ENHANCED_AVAILABLE:
return {
'success': False,
'error': '增强功能不可用',
'skeleton_data': None,
'output_files': None,
'processing_info': None
}
logger.info(f"🔄 开始处理用户上传的模型: {model_file_path}")
# 首先验证文件
is_valid, error_msg, model_info = self.validate_uploaded_file(model_file_path)
if not is_valid:
return {
'success': False,
'error': f'文件验证失败: {error_msg}',
'skeleton_data': None,
'output_files': None,
'processing_info': model_info
}
# 准备处理选项
processing_options = {
'auto_repair': kwargs.get('auto_repair', True),
'target_faces': kwargs.get('target_faces', 10000),
'confidence_threshold': confidence_threshold,
'generate_preview': generate_preview
}
# 使用我们已完善的处理API
result = self.api.process_uploaded_model(
file_path=model_file_path,
user_prompt=prompt,
processing_options=processing_options
)
# 转换为MVP期望的格式
if result['success']:
logger.info("✅ 模型处理完成")
# 添加处理信息
processing_info = {
'input_file': model_info.get('file_name', 'Unknown'),
'prompt': prompt,
'joint_count': result['skeleton_data'].get('joint_count', 0),
'bone_count': result['skeleton_data'].get('bone_count', 0),
'confidence_threshold': confidence_threshold,
'vertex_count': model_info.get('vertex_count', 0),
'face_count': model_info.get('face_count', 0),
'file_size_mb': model_info.get('file_size_mb', 0),
'preprocessing_log': result.get('preprocessing_log', [])
}
return {
'success': True,
'skeleton_data': result['skeleton_data'],
'output_files': result['output_files'],
'processing_info': processing_info
}
else:
logger.error(f"❌ 处理失败: {result.get('error', 'Unknown error')}")
return {
'success': False,
'error': result.get('error', 'Unknown error'),
'skeleton_data': None,
'output_files': None,
'processing_info': None
}
except Exception as e:
error_msg = f"处理过程中发生错误: {str(e)}"
logger.error(f"💥 {error_msg}")
return {
'success': False,
'error': error_msg,
'skeleton_data': None,
'output_files': None,
'processing_info': None
}
def get_supported_formats(self) -> List[str]:
"""获取支持的文件格式"""
if ENHANCED_AVAILABLE:
# 返回我们已完善的格式列表
return list(ModelValidator.SUPPORTED_FORMATS)
else:
# 降级到基础格式
return ['.obj', '.glb', '.ply', '.stl']
def get_session_info(self, session_id: str) -> Dict[str, Any]:
"""获取会话信息"""
try:
if self.api and hasattr(self.api, 'get_session_info'):
return self.api.get_session_info(session_id)
else:
return {}
except Exception as e:
logger.error(f"获取会话信息失败: {str(e)}")
return {}
def cleanup_sessions(self, max_age_days: int = 1):
"""清理旧会话(HF Space内存限制)"""
try:
if self.api and hasattr(self.api, 'cleanup_sessions'):
self.api.cleanup_sessions(max_age_days)
logger.info(f"✅ 清理了超过 {max_age_days} 天的旧会话")
except Exception as e:
logger.error(f"清理会话失败: {str(e)}")
# 为了保持兼容性,提供原始类名的别名
MagicArticulateWrapper = EnhancedMagicWrapper
# 简化的处理函数,直接使用我们完善的API
def process_user_model(file_path: str,
prompt: str = "",
model_weights_path: Optional[str] = None) -> Dict[str, Any]:
"""
简化的用户模型处理接口
直接使用我们已完善的process_model_file函数
"""
try:
if ENHANCED_AVAILABLE:
# 使用我们已完善的简化接口
return process_model_file(
file_path=file_path,
user_prompt=prompt,
model_weights_path=model_weights_path,
output_dir="hf_temp_sessions"
)
else:
return {
'success': False,
'error': 'Enhanced API not available'
}
except Exception as e:
return {
'success': False,
'error': f'Processing failed: {str(e)}'
}