wang.lingxiao
merge
9ca24f2
#!/usr/bin/env python3
import gradio as gr
import os
import logging
import asyncio
from typing import Tuple, Optional
from pathlib import Path
# Configure logging at import time: INFO level, with the timestamp, logger
# name and severity in front of every message.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-level logger used by every function and class in this file.
logger = logging.getLogger(__name__)
# MCP initialization state management
class MCPManager:
    """Track the one-time asynchronous initialization of the MCP components.

    Attributes:
        ready_event: ``asyncio.Event`` that is set once initialization has
            succeeded.
        initialization_started: True while an initialization attempt is in
            flight or after one has succeeded.
        initialization_complete: True once initialization has succeeded.
    """

    def __init__(self):
        self.ready_event = asyncio.Event()
        self.initialization_started = False
        self.initialization_complete = False

    async def initialize(self):
        """Initialize the MCP components asynchronously, at most once.

        The first caller performs the initialization. Callers that arrive
        while it is running, or after it has finished, wait on
        ``ready_event`` and return once it is set.

        Raises:
            Exception: Whatever the initialization step raised; the error is
                logged before being re-raised.
        """
        if self.initialization_started:
            await self.ready_event.wait()
            return

        self.initialization_started = True
        logger.info("🔧 Starting MCP initialization...")
        try:
            # Simulated initialization work, leaving enough time for it.
            await asyncio.sleep(1.0)
            # The real MCP initialization logic can be added here.
            logger.info("✅ MCP initialization complete")
            self.initialization_complete = True
            self.ready_event.set()
        except Exception as e:
            logger.error(f"❌ MCP initialization failed: {e}")
            raise
        finally:
            if not self.initialization_complete:
                # The attempt failed or was cancelled (CancelledError is a
                # BaseException, so the handler above does not see it).
                # Clear the flag so a later call retries instead of waiting
                # forever on an event that nobody is going to set.
                self.initialization_started = False
# Global MCP manager instance, shared by the startup code and the status check.
mcp_manager = MCPManager()
# Document extractor
class SimpleDocumentExtractor:
    """Extract the text of PDF, DOCX and TXT files as a Markdown string.

    PDF and DOCX support rely on the optional ``PyPDF2`` and ``python-docx``
    packages, which are imported lazily inside the extraction helpers.
    """

    def __init__(self):
        # Flipped to True by initialize() once the dependency check has run.
        self.initialized = False

    def initialize(self):
        """Run the synchronous one-time setup (the dependency check)."""
        if not self.initialized:
            logger.info("📄 Initializing document extractor...")
            self._check_dependencies()
            self.initialized = True
            logger.info("✅ Document extractor initialized")

    def _check_dependencies(self):
        """Log whether the optional PDF/DOCX libraries can be found.

        Problems during the lookup are logged as warnings, not raised.
        """
        try:
            import importlib.util

            if importlib.util.find_spec("PyPDF2") is not None:
                logger.info("✅ PyPDF2 available for PDF processing")
            else:
                logger.warning("⚠️ PyPDF2 not available")
            if importlib.util.find_spec("docx") is not None:
                logger.info("✅ python-docx available for DOCX processing")
            else:
                logger.warning("⚠️ python-docx not available")
        except Exception as e:
            logger.warning(f"⚠️ Error checking dependencies: {e}")

    def extract(self, file_path: str) -> str:
        """Extract the content of a document.

        Args:
            file_path: Path of the file; the extension selects the extractor
                (``.pdf``, ``.docx`` or ``.txt``, case-insensitive).

        Returns:
            A Markdown string starting with a ``# <file name>`` heading,
            followed by the extracted text, or by a warning/error message
            when the format is unsupported, the document has no text, or
            extraction failed.
        """
        path = Path(file_path)
        file_name = path.name
        file_ext = path.suffix.lower()
        try:
            if file_ext == ".pdf":
                content = self._extract_pdf(file_path)
            elif file_ext == ".docx":
                content = self._extract_docx(file_path)
            elif file_ext == ".txt":
                content = self._extract_txt(file_path)
            else:
                return f"# {file_name}\n\n❌ Unsupported file format: {file_ext}"
            if not content.strip():
                return f"# {file_name}\n\n⚠️ No text content found in the document."
            return f"# {file_name}\n\n{content}"
        except Exception as e:
            logger.error(f"Error extracting content from {file_name}: {e}")
            return f"# {file_name}\n\n❌ Error extracting content: {str(e)}"

    def _extract_pdf(self, file_path: str) -> str:
        """Extract text from a PDF file.

        Returns:
            One ``## Page N`` section per page that contains text, joined by
            blank lines, or an error message if PyPDF2 is missing or the
            file cannot be read.
        """
        try:
            import PyPDF2

            with open(file_path, "rb") as file:
                reader = PyPDF2.PdfReader(file)
                text_content = []
                for page_num, page in enumerate(reader.pages, 1):
                    # Guard against a page that yields no text object at all.
                    page_text = page.extract_text() or ""
                    if page_text.strip():
                        text_content.append(f"## Page {page_num}\n\n{page_text}")
                return "\n\n".join(text_content)
        except ImportError:
            return "❌ PyPDF2 library not available. Please install it with: pip install PyPDF2"
        except Exception as e:
            return f"❌ Error reading PDF: {str(e)}"

    def _extract_docx(self, file_path: str) -> str:
        """Extract text from a DOCX file.

        Returns:
            The non-blank paragraphs joined by blank lines, or an error
            message if python-docx is missing or the file cannot be read.
        """
        try:
            import docx

            doc = docx.Document(file_path)
            text_content = [para.text for para in doc.paragraphs if para.text.strip()]
            return "\n\n".join(text_content)
        except ImportError:
            return "❌ python-docx library not available. Please install it with: pip install python-docx"
        except Exception as e:
            return f"❌ Error reading DOCX: {str(e)}"

    def _extract_txt(self, file_path: str) -> str:
        """Extract text from a TXT file.

        The file is decoded as UTF-8 (a leading byte-order mark is dropped);
        if that fails, it is re-read as Latin-1.

        Returns:
            The file content, or an error message if it cannot be read.
        """
        try:
            # utf-8-sig reads plain UTF-8 as well, but strips a leading BOM
            # so it does not leak into the extracted text.
            with open(file_path, "r", encoding="utf-8-sig") as file:
                return file.read()
        except UnicodeDecodeError:
            # Fall back to another encoding.
            try:
                with open(file_path, "r", encoding="latin-1") as file:
                    return file.read()
            except Exception as e:
                return f"❌ Error reading text file with encoding: {str(e)}"
        except Exception as e:
            return f"❌ Error reading text file: {str(e)}"
# Lazily created shared extractor; filled in by the first get_extractor() call.
_extractor = None


def get_extractor() -> SimpleDocumentExtractor:
    """Return the shared extractor, creating and initializing it on first use."""
    global _extractor
    if _extractor is not None:
        return _extractor
    _extractor = SimpleDocumentExtractor()
    _extractor.initialize()
    return _extractor
def extract_document(file) -> Tuple[str, str]:
    """Handle one document-extraction request.

    Args:
        file: The uploaded-file value handed over by the UI; may be None.

    Returns:
        A ``(content, status)`` pair: the extracted Markdown (empty when the
        request could not be processed) and a human-readable status line.
    """
    if file is None:
        return "", "❌ Please upload a file"

    try:
        # Debug information about what was received.
        logger.info(f"Received file object type: {type(file)}")
        logger.info(f"File object content: {file}")

        resolved_path = _extract_file_path(file)
        if not resolved_path:
            return "", f"❌ Invalid file object: {type(file)}"
        logger.info(f"Extracted file path: {resolved_path}")

        if not os.path.exists(resolved_path):
            return "", f"❌ File not found: {resolved_path}"

        markdown = get_extractor().extract(resolved_path)
        status = f"✅ Extracted content from {Path(resolved_path).name}"
        return markdown, status
    except Exception as exc:
        logger.error(f"Extraction error: {exc}")
        return "", f"❌ Extraction failed: {str(exc)}"
def _extract_file_path(file) -> Optional[str]:
    """Resolve an uploaded-file value to a filesystem path string.

    Accepted inputs, checked in this order:
      1. a non-blank string (returned stripped);
      2. an ``os.PathLike`` object such as ``pathlib.Path``;
      3. any object with a truthy ``name`` attribute (e.g. a file object);
      4. a dict holding the path under ``name``, ``path``, ``filepath`` or
         ``file_path``.

    Args:
        file: The value to inspect; may be None.

    Returns:
        The path as a string, or None when ``file`` is None, has an
        unrecognised shape, or inspecting it raised (the error is logged).
    """
    try:
        # Handle Gradio file objects.
        if file is None:
            return None
        # A string path is returned directly.
        if isinstance(file, str) and file.strip():
            return file.strip()
        # Path-like objects must be handled before the generic `name` check:
        # pathlib.Path exposes `.name` as the final path component only, so
        # testing `name` first would silently drop the directory part.
        if isinstance(file, os.PathLike):
            return os.fsdecode(file)
        # Objects with a name attribute (standard file objects).
        if hasattr(file, "name") and file.name:
            return str(file.name)
        # Dictionary format: try the different possible key names.
        if isinstance(file, dict):
            for key in ("name", "path", "filepath", "file_path"):
                if key in file and file[key]:
                    return str(file[key])
        # Log the unknown file object type to help debugging.
        logger.debug(f"Unknown file object type: {type(file)}, content: {file}")
        return None
    except Exception as e:
        logger.error(f"Error extracting file path: {e}")
        return None
def check_mcp_status() -> str:
    """Return a short label describing the MCP initialization state.

    Completion takes precedence over "started", so a finished initialization
    is reported as ready.
    """
    if mcp_manager.initialization_complete:
        return "🟢 Ready"
    if mcp_manager.initialization_started:
        return "🟡 Initializing..."
    return "🔴 Not Started"
def create_interface():
    """Build the Gradio Blocks interface.

    The interface contains an MCP status box with a refresh button, a file
    upload with an extract button, a processing-status box and a read-only
    box for the extracted Markdown. The app is only constructed here; it is
    not launched.

    Returns:
        The ``gr.Blocks`` application object.
    """
    with gr.Blocks(
        title="Document Extractor with MCP",
        theme=gr.themes.Soft(),
        # NOTE(review): no component in this function references these
        # status-* classes — confirm whether they are still needed.
        css="""
.status-ready { color: green !important; }
.status-init { color: orange !important; }
.status-error { color: red !important; }
""",
    ) as app:
        gr.Markdown("# 📄 Document Extraction Tool with MCP Support")
        # NOTE(review): the upload widget below also accepts .txt files,
        # which this blurb does not mention.
        gr.Markdown("Upload PDF or DOCX files to extract content as Markdown.")
        # MCP status: read-only text box plus a manual refresh button.
        with gr.Row():
            with gr.Column(scale=2):
                status_output = gr.Textbox(
                    label="🔧 MCP Server Status",
                    interactive=False,
                    # Shown until the load handler at the bottom replaces it.
                    value="🟡 Initializing...",
                )
            with gr.Column(scale=1):
                check_btn = gr.Button("🔄 Refresh Status", variant="secondary")
        check_btn.click(fn=check_mcp_status, inputs=[], outputs=status_output)
        gr.Markdown("---")
        # Upload controls and processing status.
        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    file_types=[".pdf", ".docx", ".txt"],
                    label="📁 Upload Document",
                    # The handler receives a path string rather than a file object.
                    type="filepath",
                )
                extract_btn = gr.Button("🚀 Extract Content", variant="primary")
            with gr.Column():
                status_text = gr.Textbox(
                    label="📊 Processing Status",
                    interactive=False,
                    placeholder="Upload a file and click Extract Content",
                )
        content_output = gr.Textbox(
            label="📝 Extracted Markdown Content",
            lines=20,
            interactive=False,
            show_copy_button=True,
            placeholder="Extracted content will appear here...",
        )
        # extract_document returns (content, status), matching the outputs order.
        extract_btn.click(
            fn=extract_document,
            inputs=file_input,
            outputs=[content_output, status_text],
        )
        # Refresh the status when the page is first loaded.
        app.load(fn=check_mcp_status, inputs=[], outputs=status_output)
    return app
async def async_main():
    """Asynchronous entry point: prepare the extractor and MCP, then launch the UI.

    Raises:
        Exception: Any startup failure, after it has been logged.
    """
    logger.info("🚀 Starting document extraction tool with MCP support...")
    try:
        # Set up the document extractor.
        logger.info("📄 Initializing document extractor...")
        get_extractor()

        # Initialize MCP asynchronously.
        await mcp_manager.initialize()

        # Build the interface and start the app. The original author notes
        # that share=True is required for Hugging Face Spaces.
        create_interface().launch(share=True, show_error=True)
    except Exception as exc:
        logger.error(f"❌ Application startup failed: {exc}")
        raise
def main():
    """Synchronous program entry point.

    Creates a dedicated event loop, runs ``async_main()`` on it until it
    finishes, and closes the loop afterwards. A keyboard interrupt is logged
    and swallowed; any other exception is logged and re-raised.
    """
    try:
        # Make sure the event loop is set up correctly: on Windows, select
        # the proactor policy before the loop is created.
        running_on_windows = os.name == "nt"
        if running_on_windows:
            asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

        # Drive the coroutine synchronously from a loop owned by this function.
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        try:
            event_loop.run_until_complete(async_main())
        finally:
            event_loop.close()
    except KeyboardInterrupt:
        logger.info("🛑 Application stopped by user")
    except Exception as exc:
        logger.error(f"❌ Application error: {exc}")
        raise
# Start the application only when this file is executed as a script.
if __name__ == "__main__":
    main()