|
import os |
|
import re |
|
import json |
|
import gradio as gr |
|
import pandas as pd |
|
import pdfplumber |
|
import pytesseract |
|
from pdf2image import convert_from_path |
|
from huggingface_hub import InferenceClient |
|
from fpdf import FPDF |
|
import tempfile |
|
|
|
|
|
hf_token = os.getenv("HF_TOKEN") |
|
client = InferenceClient(model="mistralai/Mistral-7B-Instruct-v0.2", token=hf_token) |
|
|
|
def extract_excel_data(file_path): |
|
"""Extract text from Excel file""" |
|
df = pd.read_excel(file_path, engine='openpyxl') |
|
return df.to_string(index=False) |
|
|
|
def extract_text_from_pdf(pdf_path, is_scanned=False): |
|
"""Extract text from PDF with fallback OCR""" |
|
try: |
|
|
|
with pdfplumber.open(pdf_path) as pdf: |
|
text = "" |
|
for page in pdf.pages: |
|
|
|
tables = page.extract_tables() |
|
for table in tables: |
|
for row in table: |
|
text += " | ".join(str(cell) for cell in row) + "\n" |
|
text += "\n" |
|
|
|
|
|
page_text = page.extract_text() |
|
if page_text: |
|
text += page_text + "\n\n" |
|
return text |
|
except Exception as e: |
|
print(f"Native PDF extraction failed: {str(e)}") |
|
|
|
images = convert_from_path(pdf_path, dpi=200) |
|
text = "" |
|
for image in images: |
|
text += pytesseract.image_to_string(image) + "\n" |
|
return text |
|
|
|
def parse_bank_statement(text, file_type): |
|
"""Parse bank statement using LLM with fallback to rule-based parser""" |
|
|
|
cleaned_text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text) |
|
|
|
if file_type == 'pdf': |
|
|
|
cleaned_text = re.sub(r'Page \d+ of \d+', '', cleaned_text, flags=re.IGNORECASE) |
|
cleaned_text = re.sub(r'CropBox.*?MediaBox', '', cleaned_text, flags=re.IGNORECASE) |
|
|
|
|
|
transaction_lines = [] |
|
for line in cleaned_text.split('\n'): |
|
if re.match(r'^\d{4}-\d{2}-\d{2}', line): |
|
transaction_lines.append(line) |
|
elif '|' in line and any(x in line for x in ['Date', 'Amount', 'Balance']): |
|
transaction_lines.append(line) |
|
|
|
cleaned_text = "\n".join(transaction_lines) |
|
|
|
print(f"Cleaned text sample: {cleaned_text[:200]}...") |
|
|
|
|
|
rule_based_data = rule_based_parser(cleaned_text) |
|
if rule_based_data["transactions"]: |
|
print("Using rule-based parser results") |
|
return rule_based_data |
|
|
|
|
|
print("Falling back to LLM parsing") |
|
return llm_parser(cleaned_text) |
|
|
|
def llm_parser(text): |
|
"""LLM parser for unstructured text""" |
|
|
|
prompt = f""" |
|
<|system|> |
|
You are a financial data parser. Extract transactions from bank statements and return ONLY valid JSON. |
|
</s> |
|
<|user|> |
|
Extract all transactions from this bank statement with these exact fields: |
|
- date (format: YYYY-MM-DD) |
|
- description |
|
- amount (format: 0.00) |
|
- debit (format: 0.00) |
|
- credit (format: 0.00) |
|
- closing_balance (format: 0.00 or -0.00 for negative) |
|
- category |
|
Statement text: |
|
{text[:3000]} [truncated if too long] |
|
Return JSON with this exact structure: |
|
{{ |
|
"transactions": [ |
|
{{ |
|
"date": "2025-05-08", |
|
"description": "Company XYZ Payroll", |
|
"amount": "8315.40", |
|
"debit": "0.00", |
|
"credit": "8315.40", |
|
"closing_balance": "38315.40", |
|
"category": "Salary" |
|
}} |
|
] |
|
}} |
|
RULES: |
|
1. Output ONLY the JSON object with no additional text |
|
2. Keep amounts as strings with 2 decimal places |
|
3. For missing values, use empty strings |
|
4. Convert negative amounts to format "-123.45" |
|
5. Map categories to: Salary, Groceries, Medical, Utilities, Entertainment, Dining, Misc |
|
</s> |
|
<|assistant|> |
|
""" |
|
|
|
try: |
|
|
|
response = client.text_generation( |
|
prompt, |
|
max_new_tokens=2000, |
|
temperature=0.01, |
|
stop=["</s>"] |
|
) |
|
print(f"LLM Response: {response}") |
|
|
|
|
|
response = response.strip() |
|
if not response.startswith('{'): |
|
|
|
start_idx = response.find('{') |
|
end_idx = response.rfind('}') |
|
if start_idx != -1 and end_idx != -1: |
|
response = response[start_idx:end_idx+1] |
|
|
|
|
|
data = json.loads(response) |
|
if "transactions" not in data: |
|
raise ValueError("Missing 'transactions' key in JSON") |
|
|
|
return data |
|
except Exception as e: |
|
print(f"LLM Error: {str(e)}") |
|
return {"transactions": []} |
|
|
|
def rule_based_parser(text): |
|
"""Enhanced fallback parser for structured tables""" |
|
lines = [line.strip() for line in text.split('\n') if line.strip()] |
|
|
|
|
|
header_index = None |
|
header_patterns = [ |
|
r'Date\b', r'Description\b', r'Amount\b', |
|
r'Debit\b', r'Credit\b', r'Closing\s*Balance\b', r'Category\b' |
|
] |
|
|
|
|
|
for i, line in enumerate(lines): |
|
if all(re.search(pattern, line, re.IGNORECASE) for pattern in header_patterns[:3]): |
|
header_index = i |
|
break |
|
|
|
|
|
if header_index is None: |
|
for i, line in enumerate(lines): |
|
if any(re.search(pattern, line, re.IGNORECASE) for pattern in header_patterns): |
|
header_index = i |
|
break |
|
|
|
|
|
if header_index is None: |
|
for i, line in enumerate(lines): |
|
if '|' in line and any(p in line for p in ['Date', 'Amount', 'Balance']): |
|
header_index = i |
|
break |
|
|
|
if header_index is None: |
|
return {"transactions": []} |
|
|
|
data_lines = lines[header_index + 1:] |
|
transactions = [] |
|
|
|
for line in data_lines: |
|
|
|
if '|' in line: |
|
parts = [p.strip() for p in line.split('|') if p.strip()] |
|
else: |
|
|
|
parts = re.split(r'\s{2,}', line) |
|
|
|
|
|
if len(parts) < 7: |
|
continue |
|
|
|
try: |
|
|
|
if not re.match(r'\d{4}-\d{2}-\d{2}', parts[0]): |
|
continue |
|
|
|
transactions.append({ |
|
"date": parts[0], |
|
"description": parts[1], |
|
"amount": format_number(parts[2]), |
|
"debit": format_number(parts[3]), |
|
"credit": format_number(parts[4]), |
|
"closing_balance": format_number(parts[5]), |
|
"category": parts[6] |
|
}) |
|
except Exception as e: |
|
print(f"Error parsing line: {str(e)}") |
|
|
|
return {"transactions": transactions} |
|
|
|
def format_number(value): |
|
"""Format numeric values consistently""" |
|
if not value or str(value).lower() in ['nan', 'nat']: |
|
return "0.00" |
|
|
|
|
|
if isinstance(value, (int, float)): |
|
return f"{value:.2f}" |
|
|
|
|
|
value = str(value).replace(',', '').replace('$', '').strip() |
|
|
|
|
|
if '(' in value and ')' in value: |
|
value = '-' + value.replace('(', '').replace(')', '') |
|
|
|
|
|
if not value: |
|
return "0.00" |
|
|
|
|
|
if '.' not in value: |
|
value += '.00' |
|
|
|
|
|
try: |
|
num_value = float(value) |
|
return f"{num_value:.2f}" |
|
except ValueError: |
|
|
|
return value.split('.')[0] + '.' + value.split('.')[1][:2].ljust(2, '0') |
|
|
|
def process_file(file, is_scanned=False): |
|
"""Main processing function""" |
|
if not file: |
|
return empty_df() |
|
|
|
file_path = file.name |
|
file_ext = os.path.splitext(file_path)[1].lower() |
|
|
|
try: |
|
if file_ext == '.xlsx': |
|
|
|
df = pd.read_excel(file_path, engine='openpyxl') |
|
|
|
|
|
df.columns = df.columns.str.strip().str.lower() |
|
|
|
|
|
col_mapping = { |
|
'date': 'date', |
|
'description': 'description', |
|
'amount': 'amount', |
|
'debit': 'debit', |
|
'credit': 'credit', |
|
'closing balance': 'closing_balance', |
|
'closing': 'closing_balance', |
|
'balance': 'closing_balance', |
|
'category': 'category' |
|
} |
|
|
|
|
|
output_df = pd.DataFrame() |
|
for col in ['date', 'description', 'amount', 'debit', 'credit', 'closing_balance', 'category']: |
|
if col in df.columns: |
|
output_df[col] = df[col] |
|
elif any(alias in col_mapping and col_mapping[alias] == col for alias in df.columns): |
|
|
|
for alias in df.columns: |
|
if alias in col_mapping and col_mapping[alias] == col: |
|
output_df[col] = df[alias] |
|
break |
|
else: |
|
output_df[col] = "" |
|
|
|
|
|
for col in ['amount', 'debit', 'credit', 'closing_balance']: |
|
output_df[col] = output_df[col].apply(format_number) |
|
|
|
|
|
output_df.columns = ["Date", "Description", "Amount", "Debit", |
|
"Credit", "Closing Balance", "Category"] |
|
return output_df |
|
|
|
elif file_ext == '.pdf': |
|
text = extract_text_from_pdf(file_path, is_scanned=is_scanned) |
|
parsed_data = parse_bank_statement(text, 'pdf') |
|
df = pd.DataFrame(parsed_data["transactions"]) |
|
|
|
|
|
required_cols = ["date", "description", "amount", "debit", |
|
"credit", "closing_balance", "category"] |
|
for col in required_cols: |
|
if col not in df.columns: |
|
df[col] = "" |
|
|
|
|
|
df.columns = ["Date", "Description", "Amount", "Debit", |
|
"Credit", "Closing Balance", "Category"] |
|
return df |
|
|
|
else: |
|
return empty_df() |
|
|
|
except Exception as e: |
|
print(f"Processing error: {str(e)}") |
|
return empty_df() |
|
|
|
def empty_df(): |
|
"""Return empty DataFrame with correct columns""" |
|
return pd.DataFrame(columns=["Date", "Description", "Amount", "Debit", |
|
"Credit", "Closing Balance", "Category"]) |
|
|
|
|
|
def generate_pdf(df): |
|
"""Generate PDF from DataFrame and return file path""" |
|
if df.empty: |
|
return None |
|
|
|
|
|
pdf = FPDF() |
|
pdf.add_page() |
|
pdf.set_font("Arial", size=8) |
|
|
|
|
|
col_widths = [22, 65, 20, 15, 15, 25, 20] |
|
|
|
|
|
headers = df.columns.tolist() |
|
for i, header in enumerate(headers): |
|
pdf.cell(col_widths[i], 10, header, border=1) |
|
pdf.ln() |
|
|
|
|
|
for _, row in df.iterrows(): |
|
for i, col in enumerate(headers): |
|
|
|
value = str(row[col]) |
|
if headers[i] == "Description" and len(value) > 30: |
|
value = value[:27] + "..." |
|
pdf.cell(col_widths[i], 10, value, border=1) |
|
pdf.ln() |
|
|
|
|
|
temp_file = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) |
|
temp_file.close() |
|
pdf.output(temp_file.name) |
|
return temp_file.name |
|
|
|
|
|
with gr.Blocks() as interface: |
|
gr.Markdown("## AI Bank Statement Parser") |
|
gr.Markdown("Extract structured transaction data from PDF/Excel bank statements") |
|
|
|
|
|
file_input = gr.File(label="Upload Bank Statement (PDF/Excel)") |
|
|
|
|
|
output_df = gr.Dataframe( |
|
label="Parsed Transactions", |
|
headers=["Date", "Description", "Amount", "Debit", "Credit", "Closing Balance", "Category"], |
|
datatype=["date", "str", "number", "number", "number", "number", "str"] |
|
) |
|
|
|
|
|
state_df = gr.State(value=pd.DataFrame()) |
|
|
|
|
|
download_btn = gr.DownloadButton( |
|
"Download as PDF", |
|
visible=False, |
|
elem_classes="download-btn" |
|
) |
|
|
|
|
|
def process_and_store(file): |
|
df = process_file(file) |
|
return df, df, gr.DownloadButton(visible=not df.empty) |
|
|
|
|
|
file_input.change( |
|
process_and_store, |
|
inputs=[file_input], |
|
outputs=[output_df, state_df, download_btn] |
|
) |
|
|
|
|
|
def on_download_click(df): |
|
return generate_pdf(df) |
|
|
|
download_btn.click( |
|
on_download_click, |
|
inputs=[state_df], |
|
outputs=[download_btn] |
|
) |
|
|
|
|
|
interface.css = """ |
|
.download-btn { |
|
margin-top: 20px !important; |
|
margin-bottom: 30px !important; |
|
} |
|
""" |
|
|
|
if __name__ == "__main__": |
|
interface.launch() |