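#!/usr/bin/env python3
# NOTE: the captured listing began with the page residue "Spaces: Sleeping"
# (apparently the hosting page's status banner); it is not part of the program.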
import asyncio
import logging
import os
from pathlib import Path
from typing import Optional, Tuple

import gradio as gr
# Logging setup: configure the root logger once at import time so every
# message carries a timestamp, the logger name and the level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# MCP initialization state management
class MCPManager:
    """Track the one-time asynchronous initialization of the MCP components.

    Progress is exposed as plain boolean attributes so that synchronous code
    can read the current state without awaiting anything.
    """

    def __init__(self):
        # Set once the first initialization attempt has finished, whether it
        # succeeded or failed, so that waiters are never left blocked.
        self.ready_event = asyncio.Event()
        self.initialization_started = False
        self.initialization_complete = False
        # Exception raised by the first initialization attempt, if any.
        self.initialization_error = None

    async def initialize(self):
        """Initialize the MCP components once; later callers wait for the result.

        Raises:
            RuntimeError: When called after (or while) an attempt that did not
                complete. The original failure, if recorded, is chained as the
                cause.
            Exception: Whatever the initialization itself raised, re-raised to
                the caller that started it.
        """
        if self.initialization_started:
            await self.ready_event.wait()
            if not self.initialization_complete:
                # Previously a failed attempt never set the event, so this
                # branch waited forever; surface the failure instead.
                raise RuntimeError(
                    "MCP initialization did not complete"
                ) from self.initialization_error
            return

        self.initialization_started = True
        logger.info("🔧 Starting MCP initialization...")
        try:
            # Placeholder for the real work: simulate an initialization that
            # takes some time.
            await asyncio.sleep(1.0)
            # Actual MCP initialization logic can be added here.
            logger.info("✅ MCP initialization complete")
            self.initialization_complete = True
        except Exception as e:
            self.initialization_error = e
            logger.error(f"❌ MCP initialization failed: {e}")
            raise
        finally:
            # Release waiters on every exit path (success, error, cancellation).
            self.ready_event.set()


# Global MCP manager instance
mcp_manager = MCPManager()
# Document extractor
class SimpleDocumentExtractor:
    """Extract the text of PDF, DOCX and TXT files as Markdown-style strings.

    PDF and DOCX support depend on the optional ``PyPDF2`` and ``docx``
    modules, which are imported lazily inside the corresponding helpers so the
    class can be used for plain text even when they are missing.
    """

    def __init__(self):
        self.initialized = False

    def initialize(self):
        """Run the one-time dependency check; subsequent calls do nothing."""
        if not self.initialized:
            logger.info("📄 Initializing document extractor...")
            self._check_dependencies()
            self.initialized = True
            logger.info("✅ Document extractor initialized")

    def _check_dependencies(self):
        """Log whether each optional parsing library can be imported."""
        try:
            import importlib.util

            # (importable module name, name shown in logs, format it handles)
            optional_modules = (
                ("PyPDF2", "PyPDF2", "PDF"),
                ("docx", "python-docx", "DOCX"),
            )
            for module_name, label, purpose in optional_modules:
                if importlib.util.find_spec(module_name) is not None:
                    logger.info(f"✅ {label} available for {purpose} processing")
                else:
                    logger.warning(f"⚠️ {label} not available")
        except Exception as e:
            # Best effort only: a failed probe must not prevent start-up.
            logger.warning(f"⚠️ Error checking dependencies: {e}")

    def extract(self, file_path: str) -> str:
        """Return the document's text under a ``# <file name>`` heading.

        Args:
            file_path: Path of the document; the (case-insensitive) suffix
                selects the parser.

        Returns:
            A Markdown string. Unsupported formats, empty documents and
            extraction errors are reported inside the returned text rather
            than raised.
        """
        path = Path(file_path)
        file_name = path.name
        file_ext = path.suffix.lower()

        handlers = {
            ".pdf": self._extract_pdf,
            ".docx": self._extract_docx,
            ".txt": self._extract_txt,
        }
        handler = handlers.get(file_ext)
        if handler is None:
            return f"# {file_name}\n\n❌ Unsupported file format: {file_ext}"

        try:
            content = handler(file_path)
            if not content.strip():
                return f"# {file_name}\n\n⚠️ No text content found in the document."
            return f"# {file_name}\n\n{content}"
        except Exception as e:
            logger.error(f"Error extracting content from {file_name}: {e}")
            return f"# {file_name}\n\n❌ Error extracting content: {str(e)}"

    def _extract_pdf(self, file_path: str) -> str:
        """Return the text of every non-empty PDF page under ``## Page N``."""
        try:
            import PyPDF2

            with open(file_path, "rb") as file:
                reader = PyPDF2.PdfReader(file)
                text_content = []
                for page_num, page in enumerate(reader.pages, 1):
                    # Defensive: treat a missing (None) result as an empty
                    # page instead of failing the whole document.
                    page_text = page.extract_text() or ""
                    if page_text.strip():
                        text_content.append(f"## Page {page_num}\n\n{page_text}")
            return "\n\n".join(text_content)
        except ImportError:
            return "❌ PyPDF2 library not available. Please install it with: pip install PyPDF2"
        except Exception as e:
            return f"❌ Error reading PDF: {str(e)}"

    def _extract_docx(self, file_path: str) -> str:
        """Return the non-blank paragraphs of a DOCX file, blank-line separated."""
        try:
            import docx

            doc = docx.Document(file_path)
            paragraphs = [para.text for para in doc.paragraphs if para.text.strip()]
            return "\n\n".join(paragraphs)
        except ImportError:
            return "❌ python-docx library not available. Please install it with: pip install python-docx"
        except Exception as e:
            return f"❌ Error reading DOCX: {str(e)}"

    def _extract_txt(self, file_path: str) -> str:
        """Return the contents of a text file, decoding UTF-8 then Latin-1."""
        try:
            # "utf-8-sig" decodes plain UTF-8 and additionally drops a leading
            # byte-order mark, which would otherwise leak into the output.
            with open(file_path, "r", encoding="utf-8-sig") as file:
                return file.read()
        except UnicodeDecodeError:
            # Not valid UTF-8: fall back to another encoding.
            try:
                with open(file_path, "r", encoding="latin-1") as file:
                    return file.read()
            except Exception as e:
                return f"❌ Error reading text file with encoding: {str(e)}"
        except Exception as e:
            return f"❌ Error reading text file: {str(e)}"
# Lazily created extractor shared by the whole module.
_extractor = None


def get_extractor() -> SimpleDocumentExtractor:
    """Return the shared extractor, creating and initializing it on first use."""
    global _extractor
    if _extractor is None:
        # Publish the instance only after initialize() has returned, so a
        # failure there does not leave a half-initialized object cached.
        extractor = SimpleDocumentExtractor()
        extractor.initialize()
        _extractor = extractor
    return _extractor
def extract_document(file) -> Tuple[str, str]:
    """Handle one document-extraction request.

    Args:
        file: The uploaded file as delivered by the caller (a path string or a
            file-like/dict object; see ``_extract_file_path``), or ``None``.

    Returns:
        A ``(content, status)`` pair. ``content`` is empty whenever the request
        could not be processed; ``status`` is a short human-readable message.

    NOTE(review): the extractor reports parse errors inside the returned
    content instead of raising, so ``status`` reads as success in that case.
    """
    if file is None:
        return "", "❌ Please upload a file"

    try:
        # Debug information about what the caller actually passed in.
        logger.info(f"Received file object type: {type(file)}")
        logger.info(f"File object content: {file}")

        file_path = _extract_file_path(file)
        if not file_path:
            return "", f"❌ Invalid file object: {type(file)}"
        logger.info(f"Extracted file path: {file_path}")

        if not os.path.exists(file_path):
            return "", f"❌ File not found: {file_path}"

        content = get_extractor().extract(file_path)
        return content, f"✅ Extracted content from {Path(file_path).name}"
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Extraction error: {e}")
        return "", f"❌ Extraction failed: {str(e)}"
def _extract_file_path(file) -> Optional[str]: | |
"""从file对象中提取文件路径""" | |
try: | |
# 处理 Gradio 文件对象 | |
if file is None: | |
return None | |
# 如果是字符串路径,直接返回 | |
if isinstance(file, str) and file.strip(): | |
return file.strip() | |
# 如果有 name 属性(标准文件对象) | |
if hasattr(file, "name") and file.name: | |
return str(file.name) | |
# 如果是字典格式 | |
if isinstance(file, dict): | |
# 检查不同的可能键名 | |
for key in ["name", "path", "filepath", "file_path"]: | |
if key in file and file[key]: | |
return str(file[key]) | |
# 如果支持文件系统路径协议 | |
if hasattr(file, "__fspath__"): | |
return str(file) | |
# 记录未知的文件对象类型以便调试 | |
logger.debug(f"Unknown file object type: {type(file)}, content: {file}") | |
return None | |
except Exception as e: | |
logger.error(f"Error extracting file path: {e}") | |
return None | |
def check_mcp_status() -> str:
    """Return a short label describing the MCP manager's initialization state."""
    if mcp_manager.initialization_complete:
        return "🟢 Ready"
    if mcp_manager.initialization_started:
        return "🟡 Initializing..."
    return "🔴 Not Started"
def create_interface():
    """Build and return the Gradio Blocks application.

    The page has a status row (MCP status box plus a refresh button) and an
    upload row (file picker and extract button next to a processing-status
    box), followed by the extracted-content box.

    NOTE(review): the original indentation was lost, so the exact container
    of each component (in particular ``content_output``, placed below the
    upload row here) is a reconstruction — confirm the intended layout.
    """
    with gr.Blocks(
        title="Document Extractor with MCP",
        theme=gr.themes.Soft(),
        css="""
        .status-ready { color: green !important; }
        .status-init { color: orange !important; }
        .status-error { color: red !important; }
        """,
    ) as app:
        gr.Markdown("# 📄 Document Extraction Tool with MCP Support")
        # The uploader below also accepts .txt, so the description lists it.
        gr.Markdown("Upload PDF, DOCX or TXT files to extract content as Markdown.")

        with gr.Row():
            with gr.Column(scale=2):
                status_output = gr.Textbox(
                    label="🔧 MCP Server Status",
                    interactive=False,
                    value="🟡 Initializing...",
                )
            with gr.Column(scale=1):
                check_btn = gr.Button("🔄 Refresh Status", variant="secondary")
                check_btn.click(fn=check_mcp_status, inputs=[], outputs=status_output)

        gr.Markdown("---")

        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    file_types=[".pdf", ".docx", ".txt"],
                    label="📁 Upload Document",
                    type="filepath",
                )
                extract_btn = gr.Button("🚀 Extract Content", variant="primary")
            with gr.Column():
                status_text = gr.Textbox(
                    label="📊 Processing Status",
                    interactive=False,
                    placeholder="Upload a file and click Extract Content",
                )

        content_output = gr.Textbox(
            label="📝 Extracted Markdown Content",
            lines=20,
            interactive=False,
            show_copy_button=True,
            placeholder="Extracted content will appear here...",
        )

        extract_btn.click(
            fn=extract_document,
            inputs=file_input,
            outputs=[content_output, status_text],
        )

        # Refresh the status box when the page first loads.
        app.load(fn=check_mcp_status, inputs=[], outputs=status_output)

    return app
async def async_main():
    """Initialize the extractor and MCP manager, then build and launch the UI.

    Raises:
        Exception: Any start-up failure is logged and re-raised to the caller.
    """
    logger.info("🚀 Starting document extraction tool with MCP support...")
    try:
        # Initialize the document extractor.
        logger.info("📄 Initializing document extractor...")
        get_extractor()

        # Initialize MCP asynchronously.
        await mcp_manager.initialize()

        # Build the interface.
        app = create_interface()

        # Launch the app. The original author added share=True for Hugging
        # Face Spaces.
        # NOTE(review): Gradio's documentation indicates share links are not
        # supported on Spaces, so share=True may be unnecessary there; confirm
        # before changing. launch() is a plain (non-awaited) call inside this
        # coroutine.
        app.launch(share=True, show_error=True)
    except Exception as e:
        logger.error(f"❌ Application startup failed: {e}")
        raise
def main():
    """Program entry point: run ``async_main`` on a dedicated event loop.

    A ``KeyboardInterrupt`` is logged and swallowed; any other exception is
    logged and re-raised.
    """
    try:
        # Make sure the event loop is set up correctly.
        if os.name == "nt":  # Windows
            asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

        # Drive the coroutine from synchronous code on an explicitly created
        # loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(async_main())
        finally:
            loop.close()
            # Do not leave a closed loop registered as the current one.
            asyncio.set_event_loop(None)
    except KeyboardInterrupt:
        logger.info("🛑 Application stopped by user")
    except Exception as e:
        logger.error(f"❌ Application error: {e}")
        raise


if __name__ == "__main__":
    main()