import io
import os
import json
import tempfile
from google.cloud import vision
from google.oauth2 import service_account
from PIL import Image
import base64
import re
import logging
from pdf2image import convert_from_path
from OCRAccuracyAnalyzer import OCRAccuracyAnalyzer
import cv2
import numpy as np
import shutil
from typing import Tuple, Dict, Any
from datetime import datetime
import platform

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("Importing OCR.py...")


# Handle Google Cloud credentials - support both an environment variable and a local file
def get_google_credentials():
    """Get Google Cloud credentials from an environment variable or a file."""
    # First, try to get credentials from an environment variable (for Heroku)
    credentials_json = os.environ.get('GOOGLE_CLOUD_CREDENTIALS')
    if credentials_json:
        try:
            credentials_info = json.loads(credentials_json)
            return service_account.Credentials.from_service_account_info(credentials_info)
        except Exception as e:
            logger.warning(f"Failed to parse credentials from environment: {e}")

    # Fall back to file-based credentials (for local development)
    credentials_path = os.path.join(os.path.dirname(__file__), "css-edge-e347b0ed2b9e.json")
    if os.path.exists(credentials_path):
        return service_account.Credentials.from_service_account_file(credentials_path)

    # If neither is available, raise an error
    raise FileNotFoundError(
        "Google Cloud credentials not found. "
        "Please set the GOOGLE_CLOUD_CREDENTIALS environment variable or "
        f"place a credentials file at: {credentials_path}"
    )
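
# Illustrative note (the filename below is a hypothetical placeholder): for hosted
# deployments the entire service-account JSON can be supplied via the environment
# variable read above, e.g.
#   export GOOGLE_CLOUD_CREDENTIALS="$(cat path/to/service-account.json)"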

class OCR:
    def __init__(self):
        logger.info("Initializing OCR...")
        try:
            # Get credentials using the helper function
            credentials = get_google_credentials()
            self.client = vision.ImageAnnotatorClient(credentials=credentials)
            self.accuracy_analyzer = OCRAccuracyAnalyzer()
            logger.info("Successfully initialized Google Cloud Vision client")
        except Exception as e:
            logger.error(f"Failed to initialize Google Cloud Vision client: {str(e)}")
            raise

    def preprocess_image(self, image_path):
        logger.info(f"Preprocessing image: {image_path}")
        try:
            img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            if img is None:
                return image_path  # fallback if image can't be read

            # Resize image if too large (optimize for performance)
            height, width = img.shape
            if width > 2000 or height > 2000:
                scale = min(2000 / width, 2000 / height)
                new_width = int(width * scale)
                new_height = int(height * scale)
                img = cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_AREA)

            # Apply OTSU binarization for better OCR
            _, img = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

            # Note: a (1, 1) Gaussian kernel is effectively a no-op; use e.g. (3, 3)
            # if noise reduction is actually wanted here
            img = cv2.GaussianBlur(img, (1, 1), 0)

            processed_path = f"preprocessed_{os.path.basename(image_path)}"
            cv2.imwrite(processed_path, img, [cv2.IMWRITE_PNG_COMPRESSION, 9])
            return processed_path
        except Exception as e:
            logger.warning(f"Image preprocessing failed: {e}")
            return image_path

    def process_image_with_vision(self, image_path):
        """Process an image file using the Google Cloud Vision API with optimized settings."""
        try:
            # Preprocess image for better OCR
            processed_path = self.preprocess_image(image_path)

            with open(processed_path, 'rb') as image_file:
                content = image_file.read()
            image = vision.Image(content=content)

            # Use document text detection for better accuracy
            response = self.client.document_text_detection(image=image)
            if response.error.message:
                raise Exception(f"Error during Vision API call: {response.error.message}")

            # Calculate accuracy metrics
            accuracy_metrics = self.accuracy_analyzer.analyze_ocr_quality(
                response.full_text_annotation,
                response.full_text_annotation.text
            )

            # Clean up processed image
            if processed_path != image_path and os.path.exists(processed_path):
                os.remove(processed_path)

            # Debug: log the start of the extracted text
            logger.info(f"Extracted text (first 500 chars): {response.full_text_annotation.text[:500]}")

            # Return both the text content and accuracy metrics
            return response.full_text_annotation.text, accuracy_metrics
        except Exception as e:
            logger.error(f"Error processing image: {str(e)}")
            return "", {"overall_accuracy": 0.0}

    def process_pdf_file_with_vision(self, pdf_path):
        """Process a PDF by converting its pages to images and running each through the Google Cloud Vision API."""
        try:
            # Use system-installed Poppler (much faster and smaller)
            # Convert PDF to images with optimized settings
            images = convert_from_path(
                pdf_path,
                dpi=200,           # Reduced from 300 for better performance
                thread_count=1,    # Reduced for container environments
                grayscale=True,    # Smaller file size
                size=(1654, 2340)  # Approximately A4 at 200 DPI
            )

            all_text = ""
            all_accuracy_metrics = []
            for i, image in enumerate(images):
                # Save page as a temporary PNG (the JPEG-style 'quality' option does not apply to PNG)
                temp_path = f"temp_page_{i}.png"
                image.save(temp_path, 'PNG', optimize=True)

                logger.info(f"Processing page {i + 1} of PDF...")
                page_text, page_metrics = self.process_image_with_vision(temp_path)
                all_text += f"\n--- Page {i + 1} ---\n" + page_text
                all_accuracy_metrics.append(page_metrics.get("overall_accuracy", 0.0))

                # Clean up temporary file
                if os.path.exists(temp_path):
                    os.remove(temp_path)

            # Average accuracy across all pages
            avg_accuracy = sum(all_accuracy_metrics) / len(all_accuracy_metrics) if all_accuracy_metrics else 0.0
            return all_text, {"overall_accuracy": avg_accuracy}
        except Exception as e:
            logger.error(f"Error processing PDF: {str(e)}")
            return "", {"overall_accuracy": 0.0}

    def process_file(self, file_path):
        """Process either a PDF or an image file."""
        if file_path.lower().endswith('.pdf'):
            return self.process_pdf_file_with_vision(file_path)
        else:
            return self.process_image_with_vision(file_path)

    def save_text_to_file(self, text, output_path):
        """Save the recognized text to a .txt file."""
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(text)
            logger.info(f"Saved recognized text to {output_path}")
        except Exception as e:
            logger.error(f"Error saving text to file: {e}")

    def run_ocr_pipeline(self):
        """
        Run the OCR pipeline with interactive file and directory selection.

        Not implemented for headless deployments; kept as a placeholder.
        """
        logger.info("DEBUG: Entered run_ocr_pipeline")
        logger.info("Select files for OCR processing...")
        # This would need to be implemented based on your UI framework
        # For now, return a placeholder
        return {"status": "OCR pipeline not implemented for headless mode"}

    def run_ocr(self, uploaded_file, output_directory):
        """
        Run the OCR process for an uploaded file.

        :param uploaded_file: Path to the file to process (PDF or image).
        :param output_directory: Directory in which to save the processed result.
        :return: Path to the saved text file.
        """
        try:
            # Ensure the output directory exists
            os.makedirs(output_directory, exist_ok=True)

            # Process the file (PDF or image)
            extracted_text, accuracy_metrics = self.process_file(uploaded_file)

            # Generate the output filename
            base_name = os.path.splitext(os.path.basename(uploaded_file))[0]
            output_path = os.path.join(output_directory, f"{base_name}_ocr_result.txt")

            # Save the extracted text
            self.save_text_to_file(extracted_text, output_path)
            return output_path
        except Exception as e:
            logger.error(f"Error in run_ocr: {e}")
            raise
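

# Minimal usage sketch. The input file and output directory below are hypothetical
# placeholders, and valid Google Cloud credentials must be configured (see
# get_google_credentials above) before this will run.
if __name__ == "__main__":
    ocr = OCR()
    result_path = ocr.run_ocr("sample_scan.pdf", "ocr_output")
    print(f"OCR result written to {result_path}")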