# Developer Documentation

## Development Setup

### Prerequisites

- Python 3.9 or higher
- Git
- Azure OpenAI account
- Azure Document Intelligence account

### Local Development Environment

1. **Clone the repository**

   ```bash
   git clone <repository-url>
   cd doctorecord
   ```

2. **Create a virtual environment**

   ```bash
   python -m venv venv
   source venv/bin/activate  # On Windows: venv\Scripts\activate
   ```

3. **Install dependencies**

   ```bash
   pip install -r requirements.txt
   ```

4. **Set up environment variables**

   ```bash
   cp .env.example .env
   # Edit .env with your Azure credentials
   ```

5. **Run the application**

   ```bash
   streamlit run src/app.py
   ```
## Project Structure

```
doctorecord/
├── src/
│   ├── agents/                           # Agent implementations
│   │   ├── base_agent.py                 # Base agent class
│   │   ├── pdf_agent.py                  # PDF text extraction
│   │   ├── table_agent.py                # Table processing
│   │   ├── field_mapper_agent.py         # Field extraction
│   │   ├── unique_indices_combinator.py  # Unique combinations
│   │   └── unique_indices_loop_agent.py  # Loop processing
│   ├── services/                         # Service layer
│   │   ├── llm_client.py                 # Azure OpenAI client
│   │   ├── azure_di_service.py           # Document Intelligence
│   │   ├── cost_tracker.py               # Cost tracking
│   │   └── embedding_client.py           # Semantic search
│   ├── orchestrator/                     # Orchestration layer
│   │   ├── planner.py                    # Plan generation
│   │   └── executor.py                   # Plan execution
│   ├── config/                           # Configuration
│   │   └── settings.py                   # Settings management
│   └── app.py                            # Streamlit application
├── tests/                                # Test files
├── logs/                                 # Log files
├── requirements.txt                      # Python dependencies
└── README.md                             # Project documentation
```
## Coding Standards

### Python Style Guide

- Follow PEP 8 style guidelines
- Use type hints for function parameters and return values
- Maximum line length: 88 characters (Black formatter)
- Use descriptive variable and function names
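As a small illustration of these conventions (the function itself is hypothetical, not part of the codebase):

```python
from typing import Optional


def extract_field_value(text: str, field_name: str) -> Optional[str]:
    """Return the value for `field_name` in `text`, or None if absent."""
    for line in text.splitlines():
        if line.startswith(f"{field_name}:"):
            return line.split(":", 1)[1].strip()
    return None
```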
Code Organization
# Standard imports
import logging
from typing import Dict, Any, Optional, List
# Third-party imports
import pandas as pd
from azure.ai.documentintelligence import DocumentIntelligenceClient
# Local imports
from .base_agent import BaseAgent
from services.llm_client import LLMClient
### Logging Standards

```python
class MyAgent(BaseAgent):
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
        self.logger.info("Starting execution")
        self.logger.debug(f"Context keys: {list(ctx.keys())}")
        try:
            # Implementation
            result = ...
            self.logger.info("Execution completed successfully")
            return result
        except Exception as e:
            self.logger.error(f"Execution failed: {str(e)}", exc_info=True)
            return None
```
### Error Handling

```python
def safe_execution(self, operation):
    try:
        return operation()
    except Exception as e:
        self.logger.error(f"Operation failed: {str(e)}", exc_info=True)
        # Return an appropriate fallback or re-raise
        raise
```
## Agent Development

### Creating a New Agent

1. **Inherit from `BaseAgent`**

   ```python
   import logging

   from .base_agent import BaseAgent


   class MyNewAgent(BaseAgent):
       def __init__(self):
           super().__init__()
           self.logger = logging.getLogger(__name__)
   ```

2. **Implement the `execute` method**

   ```python
   def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
       """
       Execute the agent's main functionality.

       Args:
           ctx: Context dictionary containing input data

       Returns:
           Result string or None if failed
       """
       self.logger.info("Starting MyNewAgent execution")

       # Store context for use in helper methods
       self.ctx = ctx

       # Implementation here
       result = self._process_data(ctx)
       return result
   ```

3. **Register the agent in the executor**

   ```python
   # In src/orchestrator/executor.py
   from agents.my_new_agent import MyNewAgent


   class Executor:
       def __init__(self, settings, cost_tracker=None):
           self.tools = {
               # ... existing tools
               "MyNewAgent": MyNewAgent(),
           }
   ```
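As a quick sanity check before wiring the agent into a plan, it can be exercised directly with a hand-built context (a sketch; the `text` and `fields` keys follow the conventions used elsewhere in this guide):

```python
agent = MyNewAgent()
result = agent.execute({"text": "sample document text", "fields": ["field1"]})
print(result)
```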
### Agent Best Practices

**Context Management**

```python
def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
    # Store context for helper methods
    self.ctx = ctx

    # Access context data
    text = ctx.get("text", "")
    fields = ctx.get("fields", [])
```

**Cost Tracking Integration**

```python
def _call_llm(self, prompt: str, description: str) -> str:
    # Get the cost tracker from context
    cost_tracker = self.ctx.get("cost_tracker") if hasattr(self, 'ctx') else None

    result = self.llm.responses(
        prompt,
        temperature=0.0,
        ctx={"cost_tracker": cost_tracker} if cost_tracker else None,
        description=description
    )
    return result
```

**Error Handling**

```python
def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
    try:
        # Implementation
        result = ...
        return result
    except Exception as e:
        self.logger.error(f"Agent execution failed: {str(e)}", exc_info=True)
        return None
```
## Service Development

### LLM Client Usage

```python
from services.llm_client import LLMClient
from config.settings import settings


class MyAgent(BaseAgent):
    def __init__(self):
        self.llm = LLMClient(settings)

    def _extract_data(self, text: str) -> str:
        prompt = f"Extract data from: {text}"

        # Get the cost tracker from context
        cost_tracker = self.ctx.get("cost_tracker") if hasattr(self, 'ctx') else None

        result = self.llm.responses(
            prompt,
            temperature=0.0,
            ctx={"cost_tracker": cost_tracker} if cost_tracker else None,
            description="Data Extraction"
        )
        return result
```
### Cost Tracking Integration

```python
from services.cost_tracker import CostTracker

# In the executor or main application
cost_tracker = CostTracker()

# Pass to agents via context
ctx = {
    "cost_tracker": cost_tracker,
    # ... other context data
}

# Track costs
costs = cost_tracker.calculate_current_file_costs()
print(f"Total cost: ${costs['openai']['total_cost']:.4f}")
```
## Testing

### Running Tests

```bash
# Run all tests
python -m pytest tests/

# Run a specific test file
python -m pytest tests/test_cost_tracking.py

# Run with coverage
python -m pytest --cov=src tests/
```
### Writing Tests

```python
import pytest
from unittest.mock import Mock, patch

from src.agents.my_agent import MyAgent


def test_my_agent_execution():
    """Test MyAgent execution with mock data."""
    agent = MyAgent()

    # Mock context
    ctx = {
        "text": "Test document content",
        "fields": ["field1", "field2"],
        "cost_tracker": Mock()
    }

    # Mock the LLM response
    with patch.object(agent.llm, 'responses') as mock_llm:
        mock_llm.return_value = '{"field1": "value1", "field2": "value2"}'
        result = agent.execute(ctx)

    assert result is not None
    assert "field1" in result
    assert "field2" in result
```
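Shared setup like the context above can be factored into a pytest fixture; a minimal sketch (the fixture name and keys are illustrative):

```python
import pytest
from unittest.mock import Mock


@pytest.fixture
def mock_ctx():
    """Hypothetical shared context for agent tests."""
    return {
        "text": "Test document content",
        "fields": ["field1", "field2"],
        "cost_tracker": Mock(),
    }


def test_context_has_expected_keys(mock_ctx):
    # Tests can take `mock_ctx` as a parameter instead of rebuilding it
    assert {"text", "fields", "cost_tracker"} <= set(mock_ctx)
```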
### Test Structure

```
tests/
├── test_agents/               # Agent tests
│   ├── test_field_mapper_agent.py
│   └── test_unique_indices_combinator.py
├── test_services/             # Service tests
│   ├── test_llm_client.py
│   └── test_cost_tracker.py
├── test_orchestrator/         # Orchestrator tests
│   ├── test_planner.py
│   └── test_executor.py
└── integration/               # Integration tests
    └── test_end_to_end.py
```
## Configuration Management

### Settings Structure

```python
# src/config/settings.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Azure OpenAI
    AZURE_OPENAI_ENDPOINT: str
    AZURE_OPENAI_API_KEY: str
    AZURE_OPENAI_DEPLOYMENT: str
    AZURE_OPENAI_API_VERSION: str = "2025-03-01-preview"

    # Azure Document Intelligence
    AZURE_DI_ENDPOINT: str
    AZURE_DI_KEY: str

    # Retry configuration
    LLM_MAX_RETRIES: int = 5
    LLM_BASE_DELAY: float = 1.0
    LLM_MAX_DELAY: float = 60.0

    class Config:
        env_file = ".env"
```
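The retry settings are consumed by the LLM client. One plausible backoff loop built on them looks like this (a sketch, not the actual implementation):

```python
import random
import time


def call_with_retries(operation, settings):
    """Retry `operation` with exponential backoff and jitter (illustrative)."""
    for attempt in range(settings.LLM_MAX_RETRIES):
        try:
            return operation()
        except Exception:
            if attempt == settings.LLM_MAX_RETRIES - 1:
                raise
            delay = min(settings.LLM_BASE_DELAY * 2 ** attempt, settings.LLM_MAX_DELAY)
            time.sleep(delay + random.uniform(0, 0.5))
```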
### Environment Variables

```bash
# .env file
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_KEY=your-api-key
AZURE_OPENAI_DEPLOYMENT=your-deployment-name
AZURE_DI_ENDPOINT=https://your-resource.cognitiveservices.azure.com/
AZURE_DI_KEY=your-di-key
```
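With `env_file = ".env"` configured, instantiating the settings class picks these values up automatically, for example:

```python
from config.settings import Settings

settings = Settings()  # values are read from the environment / .env
print(settings.AZURE_OPENAI_ENDPOINT)
```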
## Debugging

### Logging Configuration

```python
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Set specific logger levels
logging.getLogger('azure').setLevel(logging.WARNING)
logging.getLogger('openai').setLevel(logging.WARNING)
```
### Debug Mode

```python
# Enable debug logging
logging.getLogger().setLevel(logging.DEBUG)

# In agents
self.logger.debug(f"Processing data: {data[:200]}...")
```
### Cost Tracking Debug

```python
# Check cost tracker state
print(f"LLM calls: {len(cost_tracker.llm_calls)}")
print(f"Input tokens: {cost_tracker.llm_input_tokens}")
print(f"Output tokens: {cost_tracker.llm_output_tokens}")

# Get detailed costs
costs_df = cost_tracker.get_detailed_costs_table()
print(costs_df)
```
## Performance Optimization

### Memory Management

```python
# Process large documents in chunks
def process_large_document(self, text: str, chunk_size: int = 10000):
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    results = []
    for chunk in chunks:
        result = self._process_chunk(chunk)
        results.append(result)
    return self._combine_results(results)
```
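For very large inputs, yielding chunks lazily avoids holding the full chunk list in memory at once (a sketch):

```python
from typing import Iterator


def iter_chunks(text: str, chunk_size: int = 10000) -> Iterator[str]:
    """Yield successive chunks of `text` without building a list up front."""
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]
```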
### Caching

```python
import streamlit as st


def get_or_process(key: str, process_fn):
    """Return a cached result for `key`, computing and caching it if absent."""
    # Use session state as a per-session cache
    if 'processed_data' not in st.session_state:
        st.session_state.processed_data = {}

    # Check the cache before processing
    if key not in st.session_state.processed_data:
        st.session_state.processed_data[key] = process_fn()
    return st.session_state.processed_data[key]
```
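For pure functions, Streamlit's built-in `st.cache_data` decorator is an alternative worth considering (a sketch; `_process_document` is a hypothetical helper):

```python
import streamlit as st


@st.cache_data
def process_document_cached(text: str) -> str:
    # Streamlit memoizes the result, keyed on the function arguments
    return _process_document(text)
```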
### Batch Processing

```python
# Process multiple items efficiently, isolating per-item failures
def process_batch(self, items: List[str]) -> List[str]:
    results = []
    for item in items:
        try:
            result = self._process_item(item)
            results.append(result)
        except Exception as e:
            self.logger.error(f"Failed to process item: {str(e)}")
            results.append(None)
    return results
```
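If the per-item work is I/O-bound (e.g. API calls), a thread pool can speed this up; a sketch under the assumption that `self._process_item` is thread-safe:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def process_batch_concurrent(self, items: List[str], max_workers: int = 4) -> List[str]:
    def safe(item):
        try:
            return self._process_item(item)
        except Exception as e:
            self.logger.error(f"Failed to process item: {str(e)}")
            return None

    # Preserves input order; failures surface as None, as in the serial version
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(safe, items))
```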
## Deployment

### Production Setup

1. **Environment configuration**

   ```bash
   # Set production environment variables
   export AZURE_OPENAI_ENDPOINT=...
   export AZURE_OPENAI_API_KEY=...
   ```

2. **Install dependencies**

   ```bash
   pip install -r requirements.txt
   ```

3. **Run the application**

   ```bash
   streamlit run src/app.py --server.port 8501
   ```
Docker Deployment
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY src/ ./src/
COPY .env .
EXPOSE 8501
CMD ["streamlit", "run", "src/app.py", "--server.port=8501"]
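To build and run the image (the image name `doctorecord` is illustrative):

```bash
docker build -t doctorecord .
docker run -p 8501:8501 --env-file .env doctorecord
```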
## Contributing

### Development Workflow

1. Create a feature branch: `git checkout -b feature/new-feature`
2. Make changes following the coding standards
3. Add tests for new functionality
4. Run the tests: `python -m pytest tests/`
5. Update documentation
6. Submit a pull request
### Code Review Checklist

- Code follows style guidelines
- Tests are included and passing
- Documentation is updated
- Error handling is implemented
- Cost tracking is integrated
- Logging is appropriate
### Release Process

1. Update the version in `__init__.py`
2. Update CHANGELOG.md
3. Create a release tag
4. Deploy to production
5. Update documentation
## Troubleshooting

### Common Issues

**Azure OpenAI Connection Errors**

```python
# Check configuration
print(f"Endpoint: {settings.AZURE_OPENAI_ENDPOINT}")
print(f"Deployment: {settings.AZURE_OPENAI_DEPLOYMENT}")
print(f"API Version: {settings.AZURE_OPENAI_API_VERSION}")
```
**Cost Tracking Issues**

```python
# Verify the cost tracker is passed correctly
if 'cost_tracker' not in ctx:
    self.logger.warning("No cost tracker in context")

# Check whether the agent stores its context
if not hasattr(self, 'ctx'):
    self.logger.warning("Agent doesn't store context")
```
**Memory Issues**

```python
# Monitor memory usage
import psutil

process = psutil.Process()
print(f"Memory usage: {process.memory_info().rss / 1024 / 1024:.2f} MB")
```
### Debug Tools

- **Log Analysis**: Check logs for error patterns
- **Cost Monitoring**: Track API usage and costs
- **Performance Profiling**: Monitor execution times
- **Memory Profiling**: Track memory usage
## API Reference

### Agent Base Class

```python
class BaseAgent:
    def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
        """Execute the agent's main functionality."""
        raise NotImplementedError
```

### LLM Client

```python
class LLMClient:
    def responses(self, prompt: str, **kwargs) -> str:
        """Send a prompt to Azure OpenAI and return the response."""
```

### Cost Tracker

```python
class CostTracker:
    def add_llm_tokens(self, input_tokens: int, output_tokens: int, description: str):
        """Track LLM token usage and costs."""

    def calculate_current_file_costs(self) -> Dict[str, Any]:
        """Calculate costs for the current file's processing."""
```

For more detailed information, refer to the inline documentation in the source code.