#!/usr/bin/env python3
"""Gradio app that extracts text from PDF, DOCX and TXT files as Markdown."""

import asyncio
import logging
import os
from pathlib import Path
from typing import Optional, Tuple

import gradio as gr

# Logging setup
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


# MCP initialization state management
class MCPManager:
    """Track the one-time asynchronous initialization of the MCP components.

    Attributes:
        initialization_started: True once ``initialize`` has been entered.
        initialization_complete: True once initialization finished successfully.
        ready_event: ``asyncio.Event`` that is set only after a successful
            initialization.
    """

    def __init__(self):
        # The events are created lazily: on Python < 3.10 an asyncio.Event
        # binds to the event loop that is current when it is constructed,
        # and this object is instantiated at import time, before main()
        # installs the loop that actually runs the application.
        self._ready_event: Optional[asyncio.Event] = None
        self._done_event: Optional[asyncio.Event] = None
        self._init_error: Optional[BaseException] = None
        self.initialization_started = False
        self.initialization_complete = False

    @property
    def ready_event(self) -> asyncio.Event:
        """Return the event that is set after a successful initialization."""
        if self._ready_event is None:
            self._ready_event = asyncio.Event()
        return self._ready_event

    async def initialize(self):
        """Initialize the MCP components once.

        The first caller performs the initialization. Any later caller waits
        for that attempt to finish instead of starting a second one.

        Raises:
            RuntimeError: in a waiting caller, when the attempt it waited for
                failed or was cancelled (previously such callers would wait
                forever because the event was never set).
            Exception: whatever the initialization itself raised, re-raised
                to the caller that performed it.
        """
        if self.initialization_started:
            await self._done_event.wait()
            if not self.initialization_complete:
                raise RuntimeError("MCP initialization failed") from self._init_error
            return

        # No await happens between creating the event and setting the flag,
        # so every waiter is guaranteed to find the event in place.
        self._done_event = asyncio.Event()
        self.initialization_started = True
        logger.info("🔧 Starting MCP initialization...")

        try:
            # Simulated initialization work; real MCP setup would go here.
            await asyncio.sleep(1.0)

            logger.info("✅ MCP initialization complete")
            self.initialization_complete = True
            self.ready_event.set()
        except Exception as e:
            self._init_error = e
            logger.error("❌ MCP initialization failed: %s", e)
            raise
        finally:
            # Wake waiters on success, failure and cancellation alike.
            self._done_event.set()


# Global MCP manager instance
mcp_manager = MCPManager()


# Document extractor
class SimpleDocumentExtractor:
    """Extract plain text from PDF, DOCX and TXT files as Markdown."""

    def __init__(self):
        self.initialized = False

    def initialize(self):
        """Run the one-time synchronous setup (dependency check and logging)."""
        if self.initialized:
            return
        logger.info("📄 Initializing document extractor...")
        self._check_dependencies()
        self.initialized = True
        logger.info("✅ Document extractor initialized")

    def _check_dependencies(self):
        """Log whether the optional parsing libraries can be imported."""
        try:
            import importlib.util

            # (import name, display name, format handled)
            optional_libs = (
                ("PyPDF2", "PyPDF2", "PDF"),
                ("docx", "python-docx", "DOCX"),
            )
            for module_name, label, fmt in optional_libs:
                if importlib.util.find_spec(module_name) is not None:
                    logger.info("✅ %s available for %s processing", label, fmt)
                else:
                    logger.warning("⚠️ %s not available", label)
        except Exception as e:
            logger.warning("⚠️ Error checking dependencies: %s", e)

    def extract(self, file_path: str) -> str:
        """Return the content of ``file_path`` as a Markdown string.

        The result always starts with a ``# <file name>`` heading. Unsupported
        extensions, empty documents and extraction errors are reported inside
        the returned text rather than raised.
        """
        path = Path(file_path)
        file_name = path.name
        file_ext = path.suffix.lower()

        handlers = {
            ".pdf": self._extract_pdf,
            ".docx": self._extract_docx,
            ".txt": self._extract_txt,
        }

        try:
            handler = handlers.get(file_ext)
            if handler is None:
                return f"# {file_name}\n\n❌ Unsupported file format: {file_ext}"

            content = handler(file_path)
            if not content.strip():
                return f"# {file_name}\n\n⚠️ No text content found in the document."

            return f"# {file_name}\n\n{content}"

        except Exception as e:
            logger.error("Error extracting content from %s: %s", file_name, e)
            return f"# {file_name}\n\n❌ Error extracting content: {str(e)}"

    def _extract_pdf(self, file_path: str) -> str:
        """Return the text of a PDF, one ``## Page N`` section per non-empty page."""
        try:
            import PyPDF2

            with open(file_path, "rb") as file:
                reader = PyPDF2.PdfReader(file)
                sections = []

                for page_num, page in enumerate(reader.pages, 1):
                    # extract_text() can yield None for pages without a text
                    # layer; treat that as an empty page instead of failing
                    # the whole document.
                    page_text = page.extract_text() or ""
                    if page_text.strip():
                        sections.append(f"## Page {page_num}\n\n{page_text}")

                return "\n\n".join(sections)

        except ImportError:
            return "❌ PyPDF2 library not available. Please install it with: pip install PyPDF2"
        except Exception as e:
            return f"❌ Error reading PDF: {str(e)}"

    def _extract_docx(self, file_path: str) -> str:
        """Return the non-empty paragraphs of a DOCX file, blank-line separated."""
        try:
            import docx

            doc = docx.Document(file_path)
            paragraphs = [para.text for para in doc.paragraphs if para.text.strip()]
            return "\n\n".join(paragraphs)

        except ImportError:
            return "❌ python-docx library not available. Please install it with: pip install python-docx"
        except Exception as e:
            return f"❌ Error reading DOCX: {str(e)}"

    def _extract_txt(self, file_path: str) -> str:
        """Return the content of a text file, trying UTF-8 and then Latin-1."""
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                return file.read()
        except UnicodeDecodeError:
            # Fall back to a second encoding.
            try:
                with open(file_path, "r", encoding="latin-1") as file:
                    return file.read()
            except Exception as e:
                return f"❌ Error reading text file with encoding: {str(e)}"
        except Exception as e:
            return f"❌ Error reading text file: {str(e)}"


_extractor = None


def get_extractor() -> SimpleDocumentExtractor:
    """Return the shared extractor, creating and initializing it on first use."""
    global _extractor
    if _extractor is None:
        _extractor = SimpleDocumentExtractor()
        _extractor.initialize()
    return _extractor


def extract_document(file) -> Tuple[str, str]:
    """Handle an extraction request coming from the UI.

    Args:
        file: the value delivered by the upload component; any shape accepted
            by ``_extract_file_path``.

    Returns:
        A ``(content, status)`` tuple. ``content`` is empty whenever the file
        could not be located or an unexpected error occurred.
    """
    if file is None:
        return "", "❌ Please upload a file"

    try:
        # Debug information about what the UI handed over.
        logger.info("Received file object type: %s", type(file))
        logger.info("File object content: %s", file)

        file_path = _extract_file_path(file)
        if not file_path:
            return "", f"❌ Invalid file object: {type(file)}"

        logger.info("Extracted file path: %s", file_path)

        if not os.path.exists(file_path):
            return "", f"❌ File not found: {file_path}"

        content = get_extractor().extract(file_path)
        return content, f"✅ Extracted content from {Path(file_path).name}"

    except Exception as e:
        logger.error("Extraction error: %s", e)
        return "", f"❌ Extraction failed: {str(e)}"


def _extract_file_path(file) -> Optional[str]:
    """Derive a filesystem path from the object handed over by the UI.

    Accepted shapes, checked in this order: a non-blank string, an
    ``os.PathLike`` object, an object with a truthy ``name`` attribute, and a
    dict holding one of the keys ``name``, ``path``, ``filepath`` or
    ``file_path``.

    Returns:
        The path as a string, or ``None`` when no path can be determined.
    """
    try:
        if file is None:
            return None

        # A plain string path is returned as is (minus surrounding blanks).
        if isinstance(file, str) and file.strip():
            return file.strip()

        # Path-like objects must be handled before the ``name`` check below:
        # ``pathlib.Path.name`` is only the final component, so checking it
        # first would drop the directory part of the path.
        if isinstance(file, os.PathLike):
            return os.fsdecode(file)

        # Standard file objects expose their path as ``name``.
        if hasattr(file, "name") and file.name:
            return str(file.name)

        # Dict payloads: try the known key names.
        if isinstance(file, dict):
            for key in ("name", "path", "filepath", "file_path"):
                value = file.get(key)
                if value:
                    return str(value)

        # Record unknown shapes to ease debugging.
        logger.debug("Unknown file object type: %s, content: %s", type(file), file)
        return None

    except Exception as e:
        logger.error("Error extracting file path: %s", e)
        return None


def check_mcp_status() -> str:
    """Return a short status label derived from the MCP manager's flags."""
    if mcp_manager.initialization_complete:
        return "🟢 Ready"
    if mcp_manager.initialization_started:
        return "🟡 Initializing..."
    return "🔴 Not Started"


def create_interface():
    """Build and return the Gradio Blocks application."""
    with gr.Blocks(
        title="Document Extractor with MCP",
        theme=gr.themes.Soft(),
        css="""
        .status-ready { color: green !important; }
        .status-init { color: orange !important; }
        .status-error { color: red !important; }
        """,
    ) as app:
        gr.Markdown("# 📄 Document Extraction Tool with MCP Support")
        gr.Markdown("Upload PDF or DOCX files to extract content as Markdown.")

        with gr.Row():
            with gr.Column(scale=2):
                status_output = gr.Textbox(
                    label="🔧 MCP Server Status",
                    interactive=False,
                    value="🟡 Initializing...",
                )
            with gr.Column(scale=1):
                check_btn = gr.Button("🔄 Refresh Status", variant="secondary")

        check_btn.click(fn=check_mcp_status, inputs=[], outputs=status_output)

        gr.Markdown("---")

        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    file_types=[".pdf", ".docx", ".txt"],
                    label="📁 Upload Document",
                    type="filepath",
                )
                extract_btn = gr.Button("🚀 Extract Content", variant="primary")

            with gr.Column():
                status_text = gr.Textbox(
                    label="📊 Processing Status",
                    interactive=False,
                    placeholder="Upload a file and click Extract Content",
                )

        content_output = gr.Textbox(
            label="📝 Extracted Markdown Content",
            lines=20,
            interactive=False,
            show_copy_button=True,
            placeholder="Extracted content will appear here...",
        )

        extract_btn.click(
            fn=extract_document,
            inputs=file_input,
            outputs=[content_output, status_text],
        )

        # Refresh the status field when the page first loads.
        app.load(fn=check_mcp_status, inputs=[], outputs=status_output)

    return app


async def async_main():
    """Initialize the extractor and MCP, then build and launch the UI."""
    logger.info("🚀 Starting document extraction tool with MCP support...")

    try:
        # Initialize the document extractor.
        logger.info("📄 Initializing document extractor...")
        get_extractor()

        # Initialize MCP asynchronously.
        await mcp_manager.initialize()

        # Build the interface.
        app = create_interface()

        # Launch the app. The original note says share=True is required for
        # Hugging Face Spaces — NOTE(review): confirm this is still needed.
        app.launch(share=True, show_error=True)

    except Exception as e:
        logger.error("❌ Application startup failed: %s", e)
        raise


def main():
    """Program entry point: run ``async_main`` on a dedicated event loop."""
    try:
        # Make sure the event loop policy is set up correctly on Windows.
        if os.name == "nt":
            asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

        # Drive the coroutine from synchronous code on a fresh loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            loop.run_until_complete(async_main())
        finally:
            loop.close()

    except KeyboardInterrupt:
        logger.info("🛑 Application stopped by user")
    except Exception as e:
        logger.error("❌ Application error: %s", e)
        raise


if __name__ == "__main__":
    main()