# chunker.py
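"""Text chunking utilities.

Splits documents into chunks using one of several strategies (recursive,
sentence, paragraph, or fixed-size with overlap) and prepares those chunks
for embedding. Chunk size and overlap come from the application config.
"""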
import logging
from typing import List

from .models import Chunk
from .text_preprocessor import TextPreprocessor
import config

logger = logging.getLogger(__name__)

class TextChunker:
    def __init__(self):
        self.config = config.config
        self.preprocessor = TextPreprocessor()
        self.chunk_size = self.config.CHUNK_SIZE
        self.chunk_overlap = self.config.CHUNK_OVERLAP
    def chunk_document(self, document_id: str, content: str, method: str = "recursive") -> List[Chunk]:
        """Chunk a document using the specified method"""
        if not content:
            return []

        try:
            if method == "recursive":
                return self._recursive_chunk(document_id, content)
            elif method == "sentence":
                return self._sentence_chunk(document_id, content)
            elif method == "paragraph":
                return self._paragraph_chunk(document_id, content)
            elif method == "fixed":
                return self._fixed_chunk(document_id, content)
            else:
                logger.warning(f"Unknown chunking method: {method}, using recursive")
                return self._recursive_chunk(document_id, content)
        except Exception as e:
            logger.error(f"Error chunking document: {str(e)}")
            # Fall back to simple fixed chunking
            return self._fixed_chunk(document_id, content)
    def _recursive_chunk(self, document_id: str, content: str) -> List[Chunk]:
        """Recursively split text by different separators"""
        chunks = []

        # Define separators in order of preference
        separators = [
            "\n\n",  # Paragraphs
            "\n",    # Lines
            ". ",    # Sentences
            ", ",    # Clauses
            " "      # Words
        ]
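
        # split_text works through the separator list from coarsest to finest:
        # it splits on the current separator, greedily packs the pieces up to
        # chunk_size, and recurses with the remaining separators on any piece
        # that is still too long. Once every separator is exhausted, it falls
        # back to hard character-level slicing.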
        def split_text(text: str, separators: List[str], chunk_size: int) -> List[str]:
            if len(text) <= chunk_size:
                return [text] if text.strip() else []

            if not separators:
                # If no separators left, split by character
                return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

            separator = separators[0]
            remaining_separators = separators[1:]
            splits = text.split(separator)

            result = []
            current_chunk = ""
            for split in splits:
                if len(current_chunk) + len(split) + len(separator) <= chunk_size:
                    if current_chunk:
                        current_chunk += separator + split
                    else:
                        current_chunk = split
                else:
                    if current_chunk:
                        result.append(current_chunk)
                    if len(split) > chunk_size:
                        # Split is too big, need to split further
                        result.extend(split_text(split, remaining_separators, chunk_size))
                        current_chunk = ""
                    else:
                        current_chunk = split
            if current_chunk:
                result.append(current_chunk)
            return result

        text_chunks = split_text(content, separators, self.chunk_size)

        # Create chunk objects with overlap
        for i, chunk_text in enumerate(text_chunks):
            if not chunk_text.strip():
                continue

            # Calculate positions
            start_pos = content.find(chunk_text)
            if start_pos == -1:
                start_pos = i * self.chunk_size
            end_pos = start_pos + len(chunk_text)

            # Add overlap from previous chunk if not the first chunk
            if i > 0 and self.chunk_overlap > 0:
                prev_chunk = text_chunks[i - 1]
                overlap_text = prev_chunk[-self.chunk_overlap:] if len(prev_chunk) > self.chunk_overlap else prev_chunk
                chunk_text = overlap_text + " " + chunk_text

            chunk = Chunk(
                id=self._generate_chunk_id(document_id, i),
                document_id=document_id,
                content=chunk_text.strip(),
                chunk_index=i,
                start_pos=start_pos,
                end_pos=end_pos,
                metadata={
                    "chunk_method": "recursive",
                    "original_length": len(chunk_text),
                    "word_count": len(chunk_text.split())
                }
            )
            chunks.append(chunk)

        return chunks
    def _sentence_chunk(self, document_id: str, content: str) -> List[Chunk]:
        """Chunk text by sentences"""
        chunks = []
        sentences = self.preprocessor.extract_sentences(content)

        current_chunk = ""
        chunk_index = 0
        start_pos = 0

        for sentence in sentences:
            if len(current_chunk) + len(sentence) <= self.chunk_size:
                if current_chunk:
                    current_chunk += " " + sentence
                else:
                    current_chunk = sentence
                    start_pos = content.find(sentence)
            else:
                if current_chunk:
                    chunk = Chunk(
                        id=self._generate_chunk_id(document_id, chunk_index),
                        document_id=document_id,
                        content=current_chunk.strip(),
                        chunk_index=chunk_index,
                        start_pos=start_pos,
                        end_pos=start_pos + len(current_chunk),
                        metadata={
                            "chunk_method": "sentence",
                            "sentence_count": len(self.preprocessor.extract_sentences(current_chunk))
                        }
                    )
                    chunks.append(chunk)
                    chunk_index += 1
                current_chunk = sentence
                start_pos = content.find(sentence)

        # Add final chunk
        if current_chunk:
            chunk = Chunk(
                id=self._generate_chunk_id(document_id, chunk_index),
                document_id=document_id,
                content=current_chunk.strip(),
                chunk_index=chunk_index,
                start_pos=start_pos,
                end_pos=start_pos + len(current_chunk),
                metadata={
                    "chunk_method": "sentence",
                    "sentence_count": len(self.preprocessor.extract_sentences(current_chunk))
                }
            )
            chunks.append(chunk)

        return chunks
    def _paragraph_chunk(self, document_id: str, content: str) -> List[Chunk]:
        """Chunk text by paragraphs"""
        chunks = []
        paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()]

        current_chunk = ""
        chunk_index = 0
        start_pos = 0

        for paragraph in paragraphs:
            if len(current_chunk) + len(paragraph) <= self.chunk_size:
                if current_chunk:
                    current_chunk += "\n\n" + paragraph
                else:
                    current_chunk = paragraph
                    start_pos = content.find(paragraph)
            else:
                if current_chunk:
                    chunk = Chunk(
                        id=self._generate_chunk_id(document_id, chunk_index),
                        document_id=document_id,
                        content=current_chunk.strip(),
                        chunk_index=chunk_index,
                        start_pos=start_pos,
                        end_pos=start_pos + len(current_chunk),
                        metadata={
                            "chunk_method": "paragraph",
                            "paragraph_count": len([p for p in current_chunk.split('\n\n') if p.strip()])
                        }
                    )
                    chunks.append(chunk)
                    chunk_index += 1
                    # Reset so the emitted text is not duplicated in the final chunk
                    current_chunk = ""

                # If paragraph is too long, split it further
                if len(paragraph) > self.chunk_size:
                    para_chunks = self._fixed_chunk(document_id, paragraph)
                    for pc in para_chunks:
                        pc.chunk_index = chunk_index
                        pc.id = self._generate_chunk_id(document_id, chunk_index)
                        chunks.append(pc)
                        chunk_index += 1
                else:
                    current_chunk = paragraph
                    start_pos = content.find(paragraph)

        # Add final chunk
        if current_chunk:
            chunk = Chunk(
                id=self._generate_chunk_id(document_id, chunk_index),
                document_id=document_id,
                content=current_chunk.strip(),
                chunk_index=chunk_index,
                start_pos=start_pos,
                end_pos=start_pos + len(current_chunk),
                metadata={
                    "chunk_method": "paragraph",
                    "paragraph_count": len([p for p in current_chunk.split('\n\n') if p.strip()])
                }
            )
            chunks.append(chunk)

        return chunks
    def _fixed_chunk(self, document_id: str, content: str) -> List[Chunk]:
        """Simple fixed-size chunking with overlap"""
        chunks = []
        # Step by chunk_size - chunk_overlap so consecutive chunks share
        # chunk_overlap characters; guard against a non-positive step.
        step = max(1, self.chunk_size - self.chunk_overlap)
        for i in range(0, len(content), step):
            chunk_text = content[i:i + self.chunk_size]
            if not chunk_text.strip():
                continue

            chunk = Chunk(
                id=self._generate_chunk_id(document_id, len(chunks)),
                document_id=document_id,
                content=chunk_text.strip(),
                chunk_index=len(chunks),
                start_pos=i,
                end_pos=min(i + self.chunk_size, len(content)),
                metadata={
                    "chunk_method": "fixed",
                    "original_length": len(chunk_text)
                }
            )
            chunks.append(chunk)

        return chunks
    def _generate_chunk_id(self, document_id: str, chunk_index: int) -> str:
        """Generate a unique chunk ID"""
        return f"{document_id}_chunk_{chunk_index}"
    def optimize_chunks_for_embedding(self, chunks: List[Chunk]) -> List[Chunk]:
        """Optimize chunks for better embedding generation"""
        optimized_chunks = []
        for chunk in chunks:
            # Clean the content for embedding
            clean_content = self.preprocessor.prepare_for_embedding(chunk.content)

            # Skip very short chunks
            if len(clean_content.split()) < 5:
                continue

            # Update chunk with optimized content
            optimized_chunk = Chunk(
                id=chunk.id,
                document_id=chunk.document_id,
                content=clean_content,
                chunk_index=chunk.chunk_index,
                start_pos=chunk.start_pos,
                end_pos=chunk.end_pos,
                metadata={
                    **chunk.metadata,
                    "optimized_for_embedding": True,
                    "original_content_length": len(chunk.content),
                    "optimized_content_length": len(clean_content)
                }
            )
            optimized_chunks.append(optimized_chunk)

        return optimized_chunks
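
# Example usage (illustrative sketch; assumes this module is imported as part of
# its package and that `config.config` defines CHUNK_SIZE and CHUNK_OVERLAP;
# `raw_text` stands in for the document's text):
#
#   chunker = TextChunker()
#   chunks = chunker.chunk_document("doc-1", raw_text, method="recursive")
#   chunks = chunker.optimize_chunks_for_embedding(chunks)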