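"""File-system backed document store.

Each document is persisted as a JSON metadata file plus a plain-text content
file, with a small in-memory cache of recently accessed documents.
"""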
import logging
import json
from typing import List, Dict, Any, Optional
from pathlib import Path
from datetime import datetime

from core.models import Document, DocumentType
import config

logger = logging.getLogger(__name__)


class DocumentStoreService:

    def __init__(self):
        self.config = config.config
        self.store_path = Path(self.config.DOCUMENT_STORE_PATH)
        self.store_path.mkdir(parents=True, exist_ok=True)

        # Separate subdirectories for JSON metadata and plain-text content
        self.metadata_path = self.store_path / "metadata"
        self.content_path = self.store_path / "content"
        self.metadata_path.mkdir(exist_ok=True)
        self.content_path.mkdir(exist_ok=True)

        # Small in-memory cache of recently accessed documents
        self._cache: Dict[str, Document] = {}
        self._cache_size_limit = 100

    async def store_document(self, document: Document) -> bool:
        """Store a document and its metadata"""
        try:
            # Write the document metadata as JSON
            metadata_file = self.metadata_path / f"{document.id}.json"
            metadata = {
                "id": document.id,
                "filename": document.filename,
                "doc_type": document.doc_type.value,
                "file_size": document.file_size,
                "created_at": document.created_at.isoformat(),
                "metadata": document.metadata,
                "tags": document.tags,
                "summary": document.summary,
                "category": document.category,
                "language": document.language,
                "content_length": len(document.content)
            }

            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            # Write the raw text content alongside the metadata
            content_file = self.content_path / f"{document.id}.txt"
            with open(content_file, 'w', encoding='utf-8') as f:
                f.write(document.content)

            self._add_to_cache(document.id, document)

            logger.info(f"Stored document {document.id} ({document.filename})")
            return True

        except Exception as e:
            logger.error(f"Error storing document {document.id}: {str(e)}")
            return False

    async def get_document(self, document_id: str) -> Optional[Document]:
        """Retrieve a document by ID"""
        try:
            # Serve from the in-memory cache when possible
            if document_id in self._cache:
                return self._cache[document_id]

            metadata_file = self.metadata_path / f"{document_id}.json"
            content_file = self.content_path / f"{document_id}.txt"

            if not metadata_file.exists() or not content_file.exists():
                return None

            with open(metadata_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)

            with open(content_file, 'r', encoding='utf-8') as f:
                content = f.read()

            # Rebuild the Document model from the stored metadata and content
            document = Document(
                id=metadata["id"],
                filename=metadata["filename"],
                content=content,
                doc_type=DocumentType(metadata["doc_type"]),
                file_size=metadata["file_size"],
                created_at=datetime.fromisoformat(metadata["created_at"]),
                metadata=metadata.get("metadata", {}),
                tags=metadata.get("tags", []),
                summary=metadata.get("summary"),
                category=metadata.get("category"),
                language=metadata.get("language")
            )

            self._add_to_cache(document_id, document)

            return document

        except Exception as e:
            logger.error(f"Error retrieving document {document_id}: {str(e)}")
            return None

    async def list_documents(self, limit: int = 50, offset: int = 0,
                             filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        """List documents with pagination and filtering"""
        try:
            documents = []
            metadata_files = list(self.metadata_path.glob("*.json"))

            # Newest first, by file modification time
            metadata_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)

            # Apply filters before pagination so each page can hold up to `limit` matches
            matched = 0
            for metadata_file in metadata_files:
                try:
                    with open(metadata_file, 'r', encoding='utf-8') as f:
                        metadata = json.load(f)

                    if filters and not self._apply_filters(metadata, filters):
                        continue

                    matched += 1
                    if matched <= offset:
                        continue

                    content_file = self.content_path / f"{metadata['id']}.txt"
                    if content_file.exists():
                        with open(content_file, 'r', encoding='utf-8') as f:
                            content = f.read()
                    else:
                        content = ""

                    document = Document(
                        id=metadata["id"],
                        filename=metadata["filename"],
                        content=content,
                        doc_type=DocumentType(metadata["doc_type"]),
                        file_size=metadata["file_size"],
                        created_at=datetime.fromisoformat(metadata["created_at"]),
                        metadata=metadata.get("metadata", {}),
                        tags=metadata.get("tags", []),
                        summary=metadata.get("summary"),
                        category=metadata.get("category"),
                        language=metadata.get("language")
                    )

                    documents.append(document)

                    if len(documents) >= limit:
                        break

                except Exception as e:
                    logger.warning(f"Error loading document metadata from {metadata_file}: {str(e)}")
                    continue

            return documents

        except Exception as e:
            logger.error(f"Error listing documents: {str(e)}")
            return []

    def _apply_filters(self, metadata: Dict[str, Any], filters: Dict[str, Any]) -> bool:
        """Return True if the document metadata matches all of the given filters"""
        try:
            for key, value in filters.items():
                if key == "doc_type":
                    if metadata.get("doc_type") != value:
                        return False
                elif key == "filename_contains":
                    if value.lower() not in metadata.get("filename", "").lower():
                        return False
                elif key == "created_after":
                    doc_date = datetime.fromisoformat(metadata.get("created_at", ""))
                    if doc_date < value:
                        return False
                elif key == "created_before":
                    doc_date = datetime.fromisoformat(metadata.get("created_at", ""))
                    if doc_date > value:
                        return False
                elif key == "tags":
                    # Match if the document carries at least one of the requested tags
                    doc_tags = set(metadata.get("tags", []))
                    required_tags = set(value) if isinstance(value, list) else {value}
                    if not required_tags.intersection(doc_tags):
                        return False
                elif key == "category":
                    if metadata.get("category") != value:
                        return False
                elif key == "language":
                    if metadata.get("language") != value:
                        return False

            return True
        except Exception as e:
            # Fail open: an unexpected filter error should not hide the document
            logger.error(f"Error applying filters: {str(e)}")
            return True

    async def update_document_metadata(self, document_id: str, updates: Dict[str, Any]) -> bool:
        """Update document metadata"""
        try:
            metadata_file = self.metadata_path / f"{document_id}.json"

            if not metadata_file.exists():
                logger.warning(f"Document {document_id} not found")
                return False

            with open(metadata_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)

            # Only these fields may be updated
            for key, value in updates.items():
                if key in ["tags", "summary", "category", "language", "metadata"]:
                    metadata[key] = value

            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            # Keep any cached copy in sync with the stored metadata
            if document_id in self._cache:
                document = self._cache[document_id]
                for key, value in updates.items():
                    if hasattr(document, key):
                        setattr(document, key, value)

            logger.info(f"Updated metadata for document {document_id}")
            return True

        except Exception as e:
            logger.error(f"Error updating document metadata: {str(e)}")
            return False

    async def delete_document(self, document_id: str) -> bool:
        """Delete a document and its metadata"""
        try:
            metadata_file = self.metadata_path / f"{document_id}.json"
            content_file = self.content_path / f"{document_id}.txt"

            if metadata_file.exists():
                metadata_file.unlink()
            if content_file.exists():
                content_file.unlink()

            if document_id in self._cache:
                del self._cache[document_id]

            logger.info(f"Deleted document {document_id}")
            return True

        except Exception as e:
            logger.error(f"Error deleting document {document_id}: {str(e)}")
            return False

    async def search_documents(self, query: str, fields: Optional[List[str]] = None) -> List[Document]:
        """Simple case-insensitive text search across document fields"""
        if not fields:
            fields = ["filename", "content", "tags", "summary"]

        try:
            matching_documents = []
            query_lower = query.lower()

            # Naive scan: load documents and check each requested field for the query
            all_documents = await self.list_documents(limit=1000)

            for document in all_documents:
                match_found = False

                for field in fields:
                    field_value = getattr(document, field, "")
                    if isinstance(field_value, list):
                        field_value = " ".join(field_value)
                    elif field_value is None:
                        field_value = ""

                    if query_lower in str(field_value).lower():
                        match_found = True
                        break

                if match_found:
                    matching_documents.append(document)

            logger.info(f"Found {len(matching_documents)} documents matching '{query}'")
            return matching_documents

        except Exception as e:
            logger.error(f"Error searching documents: {str(e)}")
            return []

    def _add_to_cache(self, document_id: str, document: Document):
        """Add document to cache, evicting the oldest entry when the size limit is reached"""
        try:
            if len(self._cache) >= self._cache_size_limit:
                # Dicts preserve insertion order, so this evicts the oldest entry (FIFO)
                oldest_key = next(iter(self._cache))
                del self._cache[oldest_key]

            self._cache[document_id] = document
        except Exception as e:
            logger.error(f"Error adding to cache: {str(e)}")

    async def get_stats(self) -> Dict[str, Any]:
        """Get statistics about the document store"""
        try:
            metadata_files = list(self.metadata_path.glob("*.json"))
            content_files = list(self.content_path.glob("*.txt"))

            # Total on-disk size of metadata and content files
            total_size = 0
            for file_path in metadata_files + content_files:
                total_size += file_path.stat().st_size

            # Count documents per type
            type_counts = {}
            for metadata_file in metadata_files:
                try:
                    with open(metadata_file, 'r', encoding='utf-8') as f:
                        metadata = json.load(f)
                    doc_type = metadata.get("doc_type", "unknown")
                    type_counts[doc_type] = type_counts.get(doc_type, 0) + 1
                except Exception:
                    continue

            return {
                "total_documents": len(metadata_files),
                "total_size_bytes": total_size,
                "total_size_mb": round(total_size / (1024 * 1024), 2),
                "cache_size": len(self._cache),
                "document_types": type_counts,
                "storage_path": str(self.store_path),
                "metadata_files": len(metadata_files),
                "content_files": len(content_files)
            }
        except Exception as e:
            logger.error(f"Error getting document store stats: {str(e)}")
            return {"error": str(e)}