File size: 3,295 Bytes
e9ce2a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import fitz
import os
import re
from typing import List, Dict, Any
from dataclasses import dataclass

@dataclass
class DocumentChunk:
    """A piece of text extracted from a document, plus where it came from."""
    # The chunk's text.
    content: str
    # Extra information about the chunk; PDFProcessor fills in the keys
    # 'filename', 'page_number' and 'chunk_length'.
    metadata: Dict[str, Any]
    # Page the chunk was taken from; PDFProcessor passes 1-based numbers
    # (page index + 1).
    page_number: int
    # Name of the originating file; PDFProcessor passes the basename of the
    # PDF path.
    source_file: str

class PDFProcessor:
    """Extract text from PDF files and split it into overlapping chunks.

    Args:
        chunk_size: Target maximum number of characters per chunk. A chunk
            can still exceed it when a single sentence (or the carried-over
            overlap plus one sentence) is longer than this value, because
            sentences are never split.
        chunk_overlap: Number of trailing characters of a finished chunk that
            are repeated at the start of the next chunk. Values <= 0 disable
            overlap.
    """

    # Compiled once at class creation instead of on every call.
    _WHITESPACE_RE = re.compile(r'\s+')
    _DISALLOWED_RE = re.compile(r'[^\w\s.,!?;:()\[\]{}"-]')
    _REPEATED_PUNCT_RE = re.compile(r'([.,!?;:]){2,}')
    _SENTENCE_BOUNDARY_RE = re.compile(r'(?<=[.!?])\s+')

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def extract_text_from_pdf(self, pdf_path: str) -> List["DocumentChunk"]:
        """Extract text from a PDF and return its chunks with metadata.

        Pages whose text is empty or whitespace-only are skipped. Page
        numbers stored on the chunks are 1-based.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            The chunks of every page, in page order.

        Raises:
            Exception: If opening or processing the PDF fails; the original
                error is attached as ``__cause__``.
        """
        chunks: List["DocumentChunk"] = []

        try:
            doc = fitz.open(pdf_path)
            try:
                filename = os.path.basename(pdf_path)

                for page_num in range(len(doc)):
                    text = doc.load_page(page_num).get_text()
                    if not text.strip():
                        continue

                    cleaned_text = self._clean_text(text)
                    chunks.extend(
                        self._create_chunks(cleaned_text, page_num + 1, filename)
                    )
            finally:
                # Release the document even when a page fails to process.
                doc.close()
        except Exception as e:
            raise Exception(f"Error processing PDF {pdf_path}: {str(e)}") from e

        return chunks

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text.

        Collapses every whitespace run to a single space, removes characters
        outside the allowed set (word characters, whitespace and
        ``.,!?;:()[]{}"-``; this also drops apostrophes and symbols such as
        ``%`` or ``/``), and reduces runs of ``.,!?;:`` to the last character
        of the run.

        Args:
            text: Raw text to normalize.

        Returns:
            The normalized text without leading or trailing whitespace.
        """
        text = self._WHITESPACE_RE.sub(' ', text)
        text = self._DISALLOWED_RE.sub('', text)
        text = self._REPEATED_PUNCT_RE.sub(r'\1', text)

        return text.strip()

    def _create_chunks(self, text: str, page_number: int, filename: str) -> List["DocumentChunk"]:
        """Split text into overlapping chunks.

        Args:
            text: Text of a single page.
            page_number: Page number recorded on every chunk.
            filename: File name recorded on every chunk.

        Returns:
            One DocumentChunk per piece of text, in reading order. An empty
            list when ``text`` contains no non-whitespace characters.
        """
        return [
            self._build_chunk(piece, page_number, filename)
            for piece in self._split_into_chunk_texts(text)
        ]

    def _split_into_chunk_texts(self, text: str) -> List[str]:
        """Group the sentences of ``text`` into chunk strings with overlap."""
        pieces: List[str] = []
        current = ""

        for sentence in self._SENTENCE_BOUNDARY_RE.split(text):
            sentence = sentence.strip()
            if not sentence:
                # Trailing whitespace after the last sentence yields an empty
                # piece; it must not trigger a flush or an overlap-only chunk.
                continue

            if not current:
                current = sentence
            elif len(current) + 1 + len(sentence) > self.chunk_size:
                # The "+ 1" accounts for the space that joins the sentences.
                pieces.append(current)
                overlap = self._get_overlap_text(current).strip()
                current = f"{overlap} {sentence}" if overlap else sentence
            else:
                current = f"{current} {sentence}"

        if current:
            pieces.append(current)

        return pieces

    def _build_chunk(self, content: str, page_number: int, filename: str) -> "DocumentChunk":
        """Wrap one chunk string in a DocumentChunk with its metadata."""
        return DocumentChunk(
            content=content,
            metadata={
                'filename': filename,
                'page_number': page_number,
                # Length of the stored content, so both always agree.
                'chunk_length': len(content)
            },
            page_number=page_number,
            source_file=filename
        )

    def _get_overlap_text(self, text: str) -> str:
        """Get overlap text from the end of the current chunk.

        Args:
            text: The chunk that was just completed.

        Returns:
            The last ``chunk_overlap`` characters of ``text`` (all of it when
            it is shorter), or an empty string when ``chunk_overlap`` is not
            positive.
        """
        if self.chunk_overlap <= 0:
            # text[-0:] would be the whole string, not an empty overlap.
            return ""
        if len(text) <= self.chunk_overlap:
            return text
        return text[-self.chunk_overlap:]