""" | |
Markdown response parser implementation. | |
This module provides a parser that extracts structured information | |
from markdown-formatted LLM responses, including citations and formatting. | |
Architecture Notes: | |
- Direct implementation (no adapter needed) | |
- Pure text parsing algorithms | |
- Handles various markdown conventions | |
- Robust citation extraction | |
""" | |
import re
import logging
from typing import List, Dict, Any, Optional

from ..base import ResponseParser, Citation, Document, ParsingError, ConfigurableComponent

logger = logging.getLogger(__name__)


class MarkdownParser(ResponseParser, ConfigurableComponent):
    """
    Parser for markdown-formatted responses.

    Features:
        - Extract main answer text
        - Parse inline citations: [1], [Document 1], etc.
        - Handle footnote-style citations
        - Preserve formatting (headers, lists, code blocks)
        - Extract confidence statements

    Configuration:
        - extract_citations: Whether to extract citations (default: True)
        - citation_patterns: Regex patterns for citations (customizable)
        - preserve_formatting: Keep markdown formatting (default: True)
        - extract_sections: Parse into sections by headers (default: False)
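
    Example (illustrative usage; the import path and the construction of the
    context Document objects depend on the surrounding package and are
    assumed here):

        parser = MarkdownParser(extract_citations=True, extract_sections=False)
        parsed = parser.parse("The capital of France is Paris. [1]")
        citations = parser.extract_citations(parsed, context=documents)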
""" | |
# Default citation patterns | |
DEFAULT_CITATION_PATTERNS = [ | |
r'\[(\d+)\]', # [1], [2], etc. | |
r'\[Document\s+(\d+)\]', # [Document 1], [Document 2] | |
r'\[Document\s+(\d+),\s*Page\s+\d+\]', # [Document 1, Page 1], [Document 2, Page 15] | |
r'\[Doc\s+(\d+)\]', # [Doc 1], [Doc 2] | |
r'\[\^(\d+)\]', # Footnote style [^1] | |
        r'[¹²³⁴⁵⁶⁷⁸⁹⁰]',                        # A single Unicode superscript digit (e.g. ¹)
    ]

    def __init__(self,
                 extract_citations: bool = True,
                 preserve_formatting: bool = True,
                 extract_sections: bool = False,
                 citation_patterns: Optional[List[str]] = None,
                 config: Optional[Dict[str, Any]] = None):
        """
        Initialize the markdown parser.

        Args:
            extract_citations: Whether to extract citations
            preserve_formatting: Keep markdown formatting
            extract_sections: Parse into sections by headers
            citation_patterns: Custom citation regex patterns
            config: Additional configuration
        """
        # Merge config
        parser_config = {
            'extract_citations': extract_citations,
            'preserve_formatting': preserve_formatting,
            'extract_sections': extract_sections,
            'citation_patterns': citation_patterns or self.DEFAULT_CITATION_PATTERNS,
            **(config or {})
        }
        super().__init__(parser_config)

        self.extract_citations_enabled = parser_config['extract_citations']
        self.preserve_formatting = parser_config['preserve_formatting']
        self.extract_sections = parser_config['extract_sections']

        # Compile citation patterns
        self.citation_patterns = [
            re.compile(pattern) for pattern in parser_config['citation_patterns']
        ]

    def parse(self, raw_response: str) -> Dict[str, Any]:
        """
        Parse the raw LLM response into a structured format.

        Args:
            raw_response: Raw text from the LLM

        Returns:
            Structured dictionary with the parsed content

        Raises:
            ParsingError: If parsing fails
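
        Example of the returned structure (illustrative values; optional keys
        such as 'sections' and 'confidence' appear only when enabled/detected):

            {
                'answer': '...cleaned markdown...',
                'raw_response': '...original text...',
                'format': 'markdown',
                'metadata': {'word_count': 42, 'citation_count': 2}
            }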
""" | |
if not raw_response: | |
raise ParsingError("Empty response to parse") | |
try: | |
# Clean response | |
cleaned = self._clean_response(raw_response) | |
# Extract main components | |
result = { | |
'answer': cleaned, | |
'raw_response': raw_response, | |
'format': 'markdown', | |
'metadata': {} | |
} | |
# Extract sections if requested | |
if self.extract_sections: | |
sections = self._extract_sections(cleaned) | |
result['sections'] = sections | |
result['answer'] = self._merge_sections(sections) | |
# Extract confidence if present | |
confidence = self._extract_confidence(cleaned) | |
if confidence is not None: | |
result['confidence'] = confidence | |
# Extract any metadata | |
metadata = self._extract_metadata(cleaned) | |
result['metadata'].update(metadata) | |
return result | |
except Exception as e: | |
logger.error(f"Failed to parse response: {str(e)}") | |
raise ParsingError(f"Markdown parsing failed: {str(e)}") | |

    def extract_citations(self, response: Dict[str, Any], context: List[Document]) -> List[Citation]:
        """
        Extract citations from the parsed response.

        Args:
            response: Parsed response dictionary
            context: Original context documents

        Returns:
            List of extracted citations
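
        Example (illustrative): an answer containing "[1]" and "[Doc 2]",
        parsed against at least two context documents, yields Citation
        objects whose source_id values are "doc_0" and "doc_1".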
""" | |
if not self.extract_citations_enabled: | |
return [] | |
answer_text = response.get('answer', '') | |
citations = [] | |
# Find all citation markers in the text | |
for pattern in self.citation_patterns: | |
for match in pattern.finditer(answer_text): | |
citation_marker = match.group(0) | |
citation_id = match.group(1) if match.groups() else match.group(0) | |
# Try to resolve to document | |
doc_index = self._resolve_citation_index(citation_id) | |
if doc_index is not None and 0 <= doc_index < len(context): | |
# Create citation object | |
citation = Citation( | |
source_id=f"doc_{doc_index}", | |
text=citation_marker, | |
start_pos=match.start(), | |
end_pos=match.end(), | |
confidence=0.9 # High confidence for explicit citations | |
) | |
citations.append(citation) | |
# Remove duplicates while preserving order | |
seen = set() | |
unique_citations = [] | |
for citation in citations: | |
key = (citation.source_id, citation.text) | |
if key not in seen: | |
seen.add(key) | |
unique_citations.append(citation) | |
logger.debug(f"Extracted {len(unique_citations)} unique citations") | |
return unique_citations | |

    def get_parser_info(self) -> Dict[str, Any]:
        """Get information about the parser."""
        return {
            'type': 'markdown',
            'parser_class': self.__class__.__name__,
            'extract_citations': self.extract_citations_enabled,
            'preserve_formatting': self.preserve_formatting,
            'extract_sections': self.extract_sections,
            'citation_patterns': len(self.citation_patterns),
            'capabilities': {
                'handles_markdown': True,
                'extracts_structure': self.extract_sections,
                'preserves_formatting': self.preserve_formatting
            }
        }

    def _clean_response(self, response: str) -> str:
        """
        Clean the response, optionally stripping markdown formatting.

        Args:
            response: Raw response text

        Returns:
            Cleaned response
        """
        # Remove leading/trailing whitespace
        cleaned = response.strip()

        # Strip markdown syntax if formatting is not being preserved
        if not self.preserve_formatting:
            # Remove fenced code blocks entirely
            cleaned = re.sub(r'```[\s\S]*?```', '', cleaned)
            # Strip backticks from inline code but keep the code text
            cleaned = re.sub(r'`[^`]+`', lambda m: m.group(0)[1:-1], cleaned)
            # Remove emphasis markers
            cleaned = re.sub(r'\*\*([^*]+)\*\*', r'\1', cleaned)
            cleaned = re.sub(r'\*([^*]+)\*', r'\1', cleaned)
            cleaned = re.sub(r'__([^_]+)__', r'\1', cleaned)
            cleaned = re.sub(r'_([^_]+)_', r'\1', cleaned)

        return cleaned

    def _extract_sections(self, text: str) -> Dict[str, str]:
        """
        Extract sections based on markdown headers.

        Args:
            text: Markdown text

        Returns:
            Dictionary of section_name -> content
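
        Example (illustrative): text before the first header is stored under
        the key "main". For the input

            Intro line
            # Summary
            Short answer.

        the result is {"main": "Intro line", "Summary": "Short answer."}.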
""" | |
sections = {} | |
current_section = "main" | |
current_content = [] | |
lines = text.split('\n') | |
for line in lines: | |
# Check for headers | |
header_match = re.match(r'^#+\s+(.+)$', line) | |
if header_match: | |
# Save previous section | |
if current_content: | |
sections[current_section] = '\n'.join(current_content).strip() | |
# Start new section | |
current_section = header_match.group(1).strip() | |
current_content = [] | |
else: | |
current_content.append(line) | |
# Save last section | |
if current_content: | |
sections[current_section] = '\n'.join(current_content).strip() | |
return sections | |

    def _merge_sections(self, sections: Dict[str, str]) -> str:
        """
        Merge sections back into a single answer.

        Args:
            sections: Dictionary of sections

        Returns:
            Merged text
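
        Example (illustrative): {"Details": "d", "summary": "s"} is merged as
        "s" followed by a blank line and then "d", because sections named in
        the priority list ('answer', 'response', 'main', 'summary') come first.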
""" | |
# Prioritize certain sections | |
priority_sections = ['answer', 'response', 'main', 'summary'] | |
merged = [] | |
# Add priority sections first | |
for section_name in priority_sections: | |
if section_name in sections and sections[section_name]: | |
merged.append(sections[section_name]) | |
# Add remaining sections | |
for section_name, content in sections.items(): | |
if section_name not in priority_sections and content: | |
merged.append(content) | |
return '\n\n'.join(merged) | |

    def _extract_confidence(self, text: str) -> Optional[float]:
        """
        Extract a confidence score if one is mentioned in the text.

        Args:
            text: Response text

        Returns:
            Confidence score in the range 0-1, or None
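
        Example (illustrative): "Confidence: 85%" and "I am 85% confident"
        both yield 0.85; values greater than 1 are treated as percentages.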
""" | |
# Look for confidence patterns | |
confidence_patterns = [ | |
r'confidence:?\s*(\d+(?:\.\d+)?)\s*%', | |
r'confidence:?\s*(\d+(?:\.\d+)?)', | |
r'(\d+(?:\.\d+)?)\s*%\s*confident', | |
] | |
for pattern in confidence_patterns: | |
match = re.search(pattern, text, re.IGNORECASE) | |
if match: | |
try: | |
value = float(match.group(1)) | |
# Normalize to 0-1 range | |
if value > 1: | |
value = value / 100 | |
return min(max(value, 0.0), 1.0) | |
except ValueError: | |
continue | |
return None | |

    def _extract_metadata(self, text: str) -> Dict[str, Any]:
        """
        Extract any metadata from the response.

        Args:
            text: Response text

        Returns:
            Metadata dictionary
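
        Example (illustrative) of the keys produced: 'word_count' and
        'citation_count' are always present; 'uncertainty_detected' and
        'no_answer_detected' are added only when the corresponding phrases
        appear in the text.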
""" | |
metadata = {} | |
# Extract word count | |
words = text.split() | |
metadata['word_count'] = len(words) | |
# Check for specific markers | |
if re.search(r'uncertain|not sure|unclear', text, re.IGNORECASE): | |
metadata['uncertainty_detected'] = True | |
if re.search(r'no information|not found|not available', text, re.IGNORECASE): | |
metadata['no_answer_detected'] = True | |
# Count citations | |
citation_count = 0 | |
for pattern in self.citation_patterns: | |
citation_count += len(pattern.findall(text)) | |
metadata['citation_count'] = citation_count | |
return metadata | |

    def _resolve_citation_index(self, citation_id: str) -> Optional[int]:
        """
        Resolve a citation ID to a document index.

        Args:
            citation_id: Citation identifier (e.g., "1", "2")

        Returns:
            Zero-based document index or None
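
        Example (illustrative): "1" -> 0, "3" -> 2, "b" -> 1; identifiers that
        cannot be resolved return None.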
""" | |
try: | |
# Try to parse as integer | |
index = int(citation_id) - 1 # Convert to 0-based | |
return index | |
except ValueError: | |
# Handle special cases | |
if citation_id.lower() in ['a', 'b', 'c', 'd', 'e']: | |
return ord(citation_id.lower()) - ord('a') | |
return None |
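

# Minimal manual smoke test (a sketch, not part of the public API). Because
# this module uses relative imports, it must be run with ``python -m`` and
# your real package path, e.g. ``python -m your_package.parsers.markdown``
# -- the path shown here is an assumption.
if __name__ == "__main__":
    demo_parser = MarkdownParser(extract_citations=True, extract_sections=True)
    demo_text = "# Answer\nParis is the capital of France. [1]\n\nConfidence: 90%"
    demo_result = demo_parser.parse(demo_text)
    print(demo_result['answer'])
    print(demo_result.get('confidence'))               # Expected: 0.9
    print(demo_result['metadata']['citation_count'])   # Expected: 1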