# Developer Documentation

## Development Setup

### Prerequisites
- Python 3.9 or higher
- Git
- Azure OpenAI account
- Azure Document Intelligence account

### Local Development Environment
1. **Clone the repository**
   ```bash
   git clone <repository-url>
   cd doctorecord
   ```
2. **Create a virtual environment**
   ```bash
   python -m venv venv
   source venv/bin/activate  # On Windows: venv\Scripts\activate
   ```
3. **Install dependencies**
   ```bash
   pip install -r requirements.txt
   ```
4. **Set up environment variables**
   ```bash
   cp .env.example .env
   # Edit .env with your Azure credentials
   ```
5. **Run the application**
   ```bash
   streamlit run src/app.py
   ```
## Project Structure

```
doctorecord/
├── src/
│   ├── agents/                          # Agent implementations
│   │   ├── base_agent.py                # Base agent class
│   │   ├── pdf_agent.py                 # PDF text extraction
│   │   ├── table_agent.py               # Table processing
│   │   ├── field_mapper_agent.py        # Field extraction
│   │   ├── unique_indices_combinator.py # Unique combinations
│   │   └── unique_indices_loop_agent.py # Loop processing
│   ├── services/                        # Service layer
│   │   ├── llm_client.py                # Azure OpenAI client
│   │   ├── azure_di_service.py          # Document Intelligence
│   │   ├── cost_tracker.py              # Cost tracking
│   │   └── embedding_client.py          # Semantic search
│   ├── orchestrator/                    # Orchestration layer
│   │   ├── planner.py                   # Plan generation
│   │   └── executor.py                  # Plan execution
│   ├── config/                          # Configuration
│   │   └── settings.py                  # Settings management
│   └── app.py                           # Streamlit application
├── tests/                               # Test files
├── logs/                                # Log files
├── requirements.txt                     # Python dependencies
└── README.md                            # Project documentation
```
## Coding Standards

### Python Style Guide
- Follow PEP 8 style guidelines
- Use type hints for function parameters and return values
- Maximum line length: 88 characters (the Black formatter's default)
- Use descriptive variable and function names
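As an illustration, a hypothetical helper written to these standards (type hints, descriptive names, lines under 88 characters) might look like this; the function and its toy lookup logic are not part of the codebase:

```python
from typing import Dict, List, Optional


def extract_field_values(
    document_text: str,
    field_names: List[str],
) -> Dict[str, Optional[str]]:
    """Map each requested field name to the value found in the document text.

    Fields that cannot be located map to None.
    """
    extracted: Dict[str, Optional[str]] = {}
    for field_name in field_names:
        # Toy lookup: take the text after "<field>:" on a matching line
        value: Optional[str] = None
        for line in document_text.splitlines():
            if line.lower().startswith(field_name.lower() + ":"):
                value = line.split(":", 1)[1].strip()
                break
        extracted[field_name] = value
    return extracted
```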
### Code Organization
```python
# Standard library imports
import logging
from typing import Any, Dict, List, Optional

# Third-party imports
import pandas as pd
from azure.ai.documentintelligence import DocumentIntelligenceClient

# Local imports
from .base_agent import BaseAgent
from services.llm_client import LLMClient
```
### Logging Standards
```python
class MyAgent(BaseAgent):
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
        self.logger.info("Starting execution")
        self.logger.debug(f"Context keys: {list(ctx.keys())}")
        try:
            result = self._do_work(ctx)  # placeholder for the agent's real logic
            self.logger.info("Execution completed successfully")
            return result
        except Exception as e:
            self.logger.error(f"Execution failed: {str(e)}", exc_info=True)
            return None
```

### Error Handling
```python
def safe_execution(self, operation):
    try:
        return operation()
    except Exception as e:
        self.logger.error(f"Operation failed: {str(e)}", exc_info=True)
        # Return an appropriate fallback here, or re-raise
        raise
```
## Agent Development

### Creating a New Agent
1. **Inherit from BaseAgent**
   ```python
   import logging

   from .base_agent import BaseAgent

   class MyNewAgent(BaseAgent):
       def __init__(self):
           super().__init__()
           self.logger = logging.getLogger(__name__)
   ```
2. **Implement the execute method**
   ```python
   def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
       """
       Execute the agent's main functionality.

       Args:
           ctx: Context dictionary containing input data

       Returns:
           Result string, or None on failure
       """
       self.logger.info("Starting MyNewAgent execution")
       # Store context for use in helper methods
       self.ctx = ctx
       # Implementation here
       result = self._process_data(ctx)
       return result
   ```
3. **Add to the executor**
   ```python
   # In src/orchestrator/executor.py
   from agents.my_new_agent import MyNewAgent

   class Executor:
       def __init__(self, settings, cost_tracker=None):
           self.tools = {
               # ... existing tools
               "MyNewAgent": MyNewAgent(),
           }
   ```
### Agent Best Practices
1. **Context Management**
   ```python
   def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
       # Store context for helper methods
       self.ctx = ctx
       # Access context data
       text = ctx.get("text", "")
       fields = ctx.get("fields", [])
   ```
2. **Cost Tracking Integration**
   ```python
   def _call_llm(self, prompt: str, description: str) -> str:
       # Get the cost tracker from context, if the agent has stored one
       cost_tracker = self.ctx.get("cost_tracker") if hasattr(self, 'ctx') else None
       result = self.llm.responses(
           prompt, temperature=0.0,
           ctx={"cost_tracker": cost_tracker} if cost_tracker else None,
           description=description
       )
       return result
   ```
3. **Error Handling**
   ```python
   def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
       try:
           result = self._do_work(ctx)  # placeholder for the agent's real logic
           return result
       except Exception as e:
           self.logger.error(f"Agent execution failed: {str(e)}", exc_info=True)
           return None
   ```
## Service Development

### LLM Client Usage
```python
from services.llm_client import LLMClient
from config.settings import settings

class MyAgent(BaseAgent):
    def __init__(self):
        super().__init__()
        self.llm = LLMClient(settings)

    def _extract_data(self, text: str) -> str:
        prompt = f"Extract data from: {text}"
        # Get the cost tracker from context, if the agent has stored one
        cost_tracker = self.ctx.get("cost_tracker") if hasattr(self, 'ctx') else None
        result = self.llm.responses(
            prompt, temperature=0.0,
            ctx={"cost_tracker": cost_tracker} if cost_tracker else None,
            description="Data Extraction"
        )
        return result
```

### Cost Tracking Integration
```python
from services.cost_tracker import CostTracker

# In the executor or main application
cost_tracker = CostTracker()

# Pass to agents via context
ctx = {
    "cost_tracker": cost_tracker,
    # ... other context data
}

# Report costs
costs = cost_tracker.calculate_current_file_costs()
print(f"Total cost: ${costs['openai']['total_cost']:.4f}")
```
## Testing

### Running Tests
```bash
# Run all tests
python -m pytest tests/

# Run a specific test file
python -m pytest tests/test_cost_tracking.py

# Run with coverage
python -m pytest --cov=src tests/
```

### Writing Tests
```python
import pytest
from unittest.mock import Mock, patch

from src.agents.my_agent import MyAgent

def test_my_agent_execution():
    """Test MyAgent execution with mock data."""
    agent = MyAgent()
    # Mock context
    ctx = {
        "text": "Test document content",
        "fields": ["field1", "field2"],
        "cost_tracker": Mock()
    }
    # Mock LLM response
    with patch.object(agent.llm, 'responses') as mock_llm:
        mock_llm.return_value = '{"field1": "value1", "field2": "value2"}'
        result = agent.execute(ctx)
    assert result is not None
    assert "field1" in result
    assert "field2" in result
```
### Test Structure
```
tests/
├── test_agents/              # Agent tests
│   ├── test_field_mapper_agent.py
│   └── test_unique_indices_combinator.py
├── test_services/            # Service tests
│   ├── test_llm_client.py
│   └── test_cost_tracker.py
├── test_orchestrator/        # Orchestrator tests
│   ├── test_planner.py
│   └── test_executor.py
└── integration/              # Integration tests
    └── test_end_to_end.py
```
## Configuration Management

### Settings Structure
```python
# src/config/settings.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Azure OpenAI
    AZURE_OPENAI_ENDPOINT: str
    AZURE_OPENAI_API_KEY: str
    AZURE_OPENAI_DEPLOYMENT: str
    AZURE_OPENAI_API_VERSION: str = "2025-03-01-preview"

    # Azure Document Intelligence
    AZURE_DI_ENDPOINT: str
    AZURE_DI_KEY: str

    # Retry configuration
    LLM_MAX_RETRIES: int = 5
    LLM_BASE_DELAY: float = 1.0
    LLM_MAX_DELAY: float = 60.0

    class Config:
        env_file = ".env"
```

### Environment Variables
```bash
# .env file
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_KEY=your-api-key
AZURE_OPENAI_DEPLOYMENT=your-deployment-name
AZURE_DI_ENDPOINT=https://your-resource.cognitiveservices.azure.com/
AZURE_DI_KEY=your-di-key
```
## Debugging

### Logging Configuration
```python
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Quiet noisy third-party loggers
logging.getLogger('azure').setLevel(logging.WARNING)
logging.getLogger('openai').setLevel(logging.WARNING)
```

### Debug Mode
```python
# Enable debug logging
logging.getLogger().setLevel(logging.DEBUG)

# In agents
self.logger.debug(f"Processing data: {data[:200]}...")
```

### Cost Tracking Debug
```python
# Check cost tracker state
print(f"LLM calls: {len(cost_tracker.llm_calls)}")
print(f"Input tokens: {cost_tracker.llm_input_tokens}")
print(f"Output tokens: {cost_tracker.llm_output_tokens}")

# Get detailed costs
costs_df = cost_tracker.get_detailed_costs_table()
print(costs_df)
```
## Performance Optimization

### Memory Management
```python
# Process large documents in chunks
def process_large_document(self, text: str, chunk_size: int = 10000):
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    results = []
    for chunk in chunks:
        result = self._process_chunk(chunk)
        results.append(result)
    return self._combine_results(results)
```

### Caching
```python
import streamlit as st

def get_or_process(key: str, process):
    """Return a cached result from session state, computing it on a cache miss."""
    if 'processed_data' not in st.session_state:
        st.session_state.processed_data = {}
    if key not in st.session_state.processed_data:
        st.session_state.processed_data[key] = process()
    return st.session_state.processed_data[key]
```

### Batch Processing
```python
# Process multiple items, recording None for failures
def process_batch(self, items: List[str]) -> List[Optional[str]]:
    results: List[Optional[str]] = []
    for item in items:
        try:
            results.append(self._process_item(item))
        except Exception as e:
            self.logger.error(f"Failed to process item: {str(e)}")
            results.append(None)
    return results
```
## Deployment

### Production Setup
1. **Environment Configuration**
   ```bash
   # Set production environment variables
   export AZURE_OPENAI_ENDPOINT=...
   export AZURE_OPENAI_API_KEY=...
   ```
2. **Dependencies**
   ```bash
   pip install -r requirements.txt
   ```
3. **Run the Application**
   ```bash
   streamlit run src/app.py --server.port 8501
   ```

### Docker Deployment
```dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY src/ ./src/
# Note: baking .env into the image embeds secrets in it; prefer passing
# them at runtime (e.g. docker run --env-file .env) for production images.
COPY .env .
EXPOSE 8501
CMD ["streamlit", "run", "src/app.py", "--server.port=8501"]
```
## Contributing

### Development Workflow
1. Create a feature branch: `git checkout -b feature/new-feature`
2. Make changes following the coding standards
3. Add tests for new functionality
4. Run the tests: `python -m pytest tests/`
5. Update documentation
6. Submit a pull request

### Code Review Checklist
- [ ] Code follows style guidelines
- [ ] Tests are included and passing
- [ ] Documentation is updated
- [ ] Error handling is implemented
- [ ] Cost tracking is integrated
- [ ] Logging is appropriate

### Release Process
1. Update the version in `__init__.py`
2. Update CHANGELOG.md
3. Create a release tag
4. Deploy to production
5. Update documentation
## Troubleshooting

### Common Issues

**Azure OpenAI Connection Errors**
```python
# Check configuration
print(f"Endpoint: {settings.AZURE_OPENAI_ENDPOINT}")
print(f"Deployment: {settings.AZURE_OPENAI_DEPLOYMENT}")
print(f"API Version: {settings.AZURE_OPENAI_API_VERSION}")
```

**Cost Tracking Issues**
```python
# Verify the cost tracker is passed correctly
if 'cost_tracker' not in ctx:
    self.logger.warning("No cost tracker in context")

# Check that the agent stores its context
if not hasattr(self, 'ctx'):
    self.logger.warning("Agent does not store context")
```

**Memory Issues**
```python
# Monitor memory usage
import psutil

process = psutil.Process()
print(f"Memory usage: {process.memory_info().rss / 1024 / 1024:.2f} MB")
```
### Debug Tools
- **Log Analysis**: Check logs for error patterns
- **Cost Monitoring**: Track API usage and costs
- **Performance Profiling**: Monitor execution times
- **Memory Profiling**: Track memory usage
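For the performance-profiling item, a lightweight timing decorator (a sketch, not an existing utility in this codebase) can log per-call execution times:

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)


def timed(func):
    """Log how long each call to the wrapped function takes."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            logger.info("%s took %.3f s", func.__name__, elapsed)
    return wrapper


@timed
def example_step():
    # Stand-in for a slow agent method
    time.sleep(0.01)
    return "done"
```

Applied to `execute` methods, this surfaces slow agents in the logs without touching their logic.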
## API Reference

### Agent Base Class
```python
class BaseAgent:
    def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
        """Execute the agent's main functionality."""
        raise NotImplementedError
```

### LLM Client
```python
class LLMClient:
    def responses(self, prompt: str, **kwargs) -> str:
        """Send a prompt to Azure OpenAI and return the response."""
```

### Cost Tracker
```python
class CostTracker:
    def add_llm_tokens(self, input_tokens: int, output_tokens: int, description: str):
        """Track LLM token usage and costs."""

    def calculate_current_file_costs(self) -> Dict[str, Any]:
        """Calculate costs for the current file's processing."""
```

For more detailed information, refer to the inline documentation in the source code.