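# NOTE: the four lines below are repository-page residue (file path, avatar
# alt text, commit message, commit hash), kept as comments so the module parses.
# newtestingdanish / OCR.optimized.py
# aghaai's picture
# Fresh commit of all updated files
# 459923e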
import io
import os
import json
import tempfile
from google.cloud import vision
from google.oauth2 import service_account
from PIL import Image
import base64
import re
import logging
from pdf2image import convert_from_path
from OCRAccuracyAnalyzer import OCRAccuracyAnalyzer
import cv2
import numpy as np
import pytesseract
import shutil
from typing import Tuple, Dict, Any
from datetime import datetime
import platform
# Configure logging.
# basicConfig runs at import time and sets the root logger level to INFO;
# `logger` is the module-level logger used by every function and method below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Google Cloud credentials can come from an environment variable or from a
# JSON key file that sits next to this module.
def get_google_credentials():
    """Return service-account credentials for Google Cloud.

    Lookup order:

    1. The ``GOOGLE_CLOUD_CREDENTIALS`` environment variable, parsed as a
       JSON service-account document. If it is set but cannot be turned into
       credentials, a warning is logged and the lookup continues.
    2. The key file ``css-edge-e347b0ed2b9e.json`` in this module's directory.

    :return: Credentials built by ``service_account.Credentials``.
    :raises FileNotFoundError: If neither source yields credentials.
    """
    credentials_path = os.path.join(os.path.dirname(__file__), "css-edge-e347b0ed2b9e.json")

    # Environment variable first (hosted deployments such as Heroku).
    credentials_json = os.environ.get('GOOGLE_CLOUD_CREDENTIALS')
    if credentials_json:
        try:
            return service_account.Credentials.from_service_account_info(
                json.loads(credentials_json)
            )
        except Exception as e:
            # Deliberately non-fatal: fall through to the key file.
            logger.warning(f"Failed to parse credentials from environment: {e}")

    # Key file second (local development).
    if not os.path.exists(credentials_path):
        raise FileNotFoundError(
            "Google Cloud credentials not found. "
            "Please set GOOGLE_CLOUD_CREDENTIALS environment variable or "
            f"place credentials file at: {credentials_path}"
        )
    return service_account.Credentials.from_service_account_file(credentials_path)
class OCR:
    """Extract text from images and PDFs with Google Cloud Vision.

    Images are optionally preprocessed with OpenCV, sent to Vision's
    ``document_text_detection`` endpoint, and scored by
    ``OCRAccuracyAnalyzer``. PDFs are rasterised page by page with
    ``pdf2image`` and each page goes through the same image path.

    All scratch files are created through :mod:`tempfile` so that concurrent
    runs cannot overwrite each other's files, and they are removed even when
    a step fails.
    """

    # Images whose width or height exceeds this many pixels are scaled down
    # (aspect ratio preserved) before binarization.
    MAX_IMAGE_DIMENSION = 2000

    def __init__(self):
        """Initialize the OCR class with Google Cloud Vision credentials.

        :raises Exception: Whatever credential lookup or client construction
            raises; the error is logged and re-raised unchanged.
        """
        try:
            # Get credentials using the helper function
            credentials = get_google_credentials()
            self.client = vision.ImageAnnotatorClient(credentials=credentials)
            self.accuracy_analyzer = OCRAccuracyAnalyzer()
            logger.info("Successfully initialized Google Cloud Vision client")
        except Exception as e:
            logger.error(f"Failed to initialize Google Cloud Vision client: {str(e)}")
            raise

    @staticmethod
    def _remove_quietly(path):
        """Delete *path* as best-effort cleanup; never raise on OS errors."""
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already gone - nothing to clean up.
            pass
        except OSError as e:
            logger.warning(f"Could not remove temporary file {path}: {e}")

    def preprocess_image(self, image_path):
        """Write a grayscale, size-capped, OTSU-binarized copy of an image.

        :param image_path: Path of the image to preprocess.
        :return: Path of a newly created temporary file holding the processed
            image, or ``image_path`` itself when the image cannot be read or
            any preprocessing step fails. When the returned path differs from
            ``image_path`` the caller is responsible for deleting it.
        """
        processed_path = None
        try:
            img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            if img is None:
                return image_path  # fallback if image can't be read

            # Resize image if too large (optimize for performance)
            height, width = img.shape
            limit = self.MAX_IMAGE_DIMENSION
            if width > limit or height > limit:
                scale = min(limit / width, limit / height)
                new_size = (int(width * scale), int(height * scale))
                img = cv2.resize(img, new_size, interpolation=cv2.INTER_AREA)

            # Apply OTSU binarization for better OCR
            _, img = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

            # NOTE(review): a 1x1 Gaussian kernel leaves the pixels unchanged;
            # kept as-is so the bytes sent to the API do not change.
            img = cv2.GaussianBlur(img, (1, 1), 0)

            # A uniquely named temp file (instead of a fixed name in the
            # working directory) keeps concurrent runs from clobbering each
            # other. The source extension is preserved because cv2.imwrite
            # picks the encoder from it.
            suffix = os.path.splitext(image_path)[1] or ".png"
            fd, processed_path = tempfile.mkstemp(prefix="preprocessed_", suffix=suffix)
            os.close(fd)

            # cv2.imwrite reports some failures via its return value only.
            if not cv2.imwrite(processed_path, img, [cv2.IMWRITE_PNG_COMPRESSION, 9]):
                raise IOError(f"cv2.imwrite could not write {processed_path}")
            return processed_path
        except Exception as e:
            logger.warning(f"Image preprocessing failed: {e}")
            if processed_path is not None:
                # Do not leave a half-written temp file behind.
                self._remove_quietly(processed_path)
            return image_path

    def process_image_with_vision(self, image_path):
        """Run Google Cloud Vision document text detection on one image.

        :param image_path: Path of the image file to process.
        :return: ``(text, accuracy_metrics)``. ``accuracy_metrics`` is
            whatever ``analyze_ocr_quality`` returns. On any failure the
            error is logged and ``("", {"overall_accuracy": 0.0})`` is
            returned instead of raising.
        """
        processed_path = image_path
        try:
            # Preprocess image for better OCR
            processed_path = self.preprocess_image(image_path)
            with open(processed_path, 'rb') as image_file:
                content = image_file.read()
            image = vision.Image(content=content)

            # Use document text detection for better accuracy
            response = self.client.document_text_detection(image=image)
            if response.error.message:
                raise Exception(f"Error during Vision API call: {response.error.message}")

            annotation = response.full_text_annotation

            # Calculate accuracy metrics
            accuracy_metrics = self.accuracy_analyzer.analyze_ocr_quality(
                annotation,
                annotation.text
            )

            # Debug: print/log the full extracted text
            logger.info(f"Extracted text (first 500 chars): {annotation.text[:500]}")

            # Return both the text content and accuracy metrics
            return annotation.text, accuracy_metrics
        except Exception as e:
            logger.error(f"Error processing image: {str(e)}")
            return "", {"overall_accuracy": 0.0}
        finally:
            # Remove the preprocessed copy on success AND failure; a cleanup
            # problem must not discard an otherwise good OCR result.
            if processed_path != image_path:
                self._remove_quietly(processed_path)

    def process_pdf_file_with_vision(self, pdf_path):
        """OCR every page of a PDF and concatenate the results.

        Pages are rasterised with ``pdf2image.convert_from_path`` and each
        page image is passed to :meth:`process_image_with_vision`.

        :param pdf_path: Path of the PDF file.
        :return: ``(text, {"overall_accuracy": avg})`` where ``text`` holds
            each page's text prefixed by ``"\\n--- Page N ---\\n"`` and
            ``avg`` is the mean of the per-page ``"overall_accuracy"`` values
            (``0.0`` when there are no pages). On any failure the error is
            logged and ``("", {"overall_accuracy": 0.0})`` is returned.
        """
        try:
            # Use system-installed Poppler (much faster and smaller)
            # Convert PDF to images with optimized settings
            images = convert_from_path(
                pdf_path,
                dpi=200,  # Reduced from 300 for better performance
                thread_count=1,  # Reduced for container environments
                grayscale=True,  # Smaller file size
                size=(1654, 2340)  # A4 size at 200 DPI
            )

            page_texts = []
            page_accuracies = []

            # A private scratch directory avoids name clashes between
            # concurrent runs and is removed even if a page fails.
            with tempfile.TemporaryDirectory(prefix="ocr_pdf_") as work_dir:
                for i, image in enumerate(images):
                    temp_path = os.path.join(work_dir, f"temp_page_{i}.png")
                    # `quality` is not a PNG save option, so it is not passed.
                    image.save(temp_path, 'PNG', optimize=True)
                    logger.info(f"Processing page {i + 1} of PDF...")
                    page_text, page_metrics = self.process_image_with_vision(temp_path)
                    page_texts.append(f"\n--- Page {i + 1} ---\n" + page_text)
                    page_accuracies.append(page_metrics.get("overall_accuracy", 0.0))

            # Average accuracy across all pages
            avg_accuracy = sum(page_accuracies) / len(page_accuracies) if page_accuracies else 0.0
            return "".join(page_texts), {"overall_accuracy": avg_accuracy}
        except Exception as e:
            logger.error(f"Error processing PDF: {str(e)}")
            return "", {"overall_accuracy": 0.0}

    def process_file(self, file_path):
        """Process either a PDF or an image file, chosen by file extension.

        :param file_path: Path as ``str`` or :class:`os.PathLike`. A name
            ending in ``.pdf`` (case-insensitive) takes the PDF route;
            anything else is treated as an image.
        :return: The ``(text, accuracy_metrics)`` pair from the chosen route.
        """
        if isinstance(file_path, os.PathLike):
            # Normalise pathlib-style paths; the rest of the pipeline uses
            # string methods on the path.
            file_path = os.fspath(file_path)
        if file_path.lower().endswith('.pdf'):
            return self.process_pdf_file_with_vision(file_path)
        return self.process_image_with_vision(file_path)

    def save_text_to_file(self, text, output_path):
        """Save the text to a UTF-8 encoded .txt file.

        Failures are logged, not raised.

        :param text: Text to write.
        :param output_path: Destination file path.
        :return: ``True`` if the file was written, ``False`` otherwise.
        """
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(text)
            logger.info(f"Saved recognized text to {output_path}")
            return True
        except Exception as e:
            logger.error(f"Error saving text to file: {e}")
            return False

    def run_ocr_pipeline(self):
        """
        Run the OCR pipeline with file and directory selection.

        Interactive selection is not implemented; a placeholder status
        dictionary is returned.
        """
        logger.info("DEBUG: Entered run_ocr_pipeline")
        logger.info("Select files for OCR processing...")
        # This would need to be implemented based on your UI framework
        # For now, return a placeholder
        return {"status": "OCR pipeline not implemented for headless mode"}

    def run_ocr(self, uploaded_file, output_directory):
        """
        Run the OCR process for a file and save the text next to other results.

        :param uploaded_file: Path (``str`` or :class:`os.PathLike`) of the
            PDF or image to process. Upload objects that are not paths are
            not supported here; write them to disk first.
        :param output_directory: Directory to save the processed result;
            created if it does not exist.
        :return: Path to the saved text file,
            ``<output_directory>/<input stem>_ocr_result.txt``.
        :raises Exception: Any error raised while preparing the output
            location or dispatching the file is logged and re-raised.
        """
        try:
            # Ensure output directory exists
            os.makedirs(output_directory, exist_ok=True)

            # Process the file (same PDF/image dispatch as process_file)
            extracted_text, _accuracy_metrics = self.process_file(uploaded_file)

            # Generate output filename
            base_name = os.path.splitext(os.path.basename(uploaded_file))[0]
            output_path = os.path.join(output_directory, f"{base_name}_ocr_result.txt")

            # Save the extracted text
            self.save_text_to_file(extracted_text, output_path)
            return output_path
        except Exception as e:
            logger.error(f"Error in run_ocr: {e}")
            raise