import logging import os from langchain_core.tools import StructuredTool from pydantic import BaseModel, Field from typing import Optional logger = logging.getLogger(__name__) class DocumentRetrieverInput(BaseModel): task_id: str = Field(description="Task identifier") query: str = Field(description="Search query") file_path: Optional[str] = Field(description="Path to document file", default=None) async def document_retriever_func(task_id: str, query: str, file_path: Optional[str] = None) -> str: """ Retrieve content from documents for a given task and query. Args: task_id (str): Task identifier. query (str): Search query. file_path (Optional[str]): Path to document file. Returns: str: Retrieved document content or error message. """ try: if file_path and os.path.exists(file_path): logger.info(f"Retrieving document from {file_path} for task {task_id}") if file_path.endswith('.pdf'): import PyPDF2 with open(file_path, 'rb') as f: reader = PyPDF2.PdfReader(f) text = "".join(page.extract_text() or "" for page in reader.pages) return text[:500] if text else "No text extracted" return "Unsupported file format" logger.warning(f"No valid documents found for task {task_id}") return "Document not found" except Exception as e: logger.error(f"Error retrieving document for task {task_id}: {e}") return f"Error: {str(e)}" document_retriever_tool = StructuredTool.from_function( func=document_retriever_func, name="document_retriever_tool", args_schema=DocumentRetrieverInput, coroutine=document_retriever_func )