# Spaces: Sleeping
# (Page-header residue from the listing this file was copied from; not part of the program.)
import os | |
import time | |
import logging | |
import re | |
from datetime import datetime, timedelta | |
from dotenv import load_dotenv | |
from cryptography.fernet import Fernet | |
from simple_salesforce import Salesforce | |
from transformers import pipeline | |
from PIL import Image | |
import pytesseract | |
import pandas as pd | |
from docx import Document | |
import PyPDF2 | |
import gradio as gr | |
from pdf2image import convert_from_path | |
import tempfile | |
from pytz import timezone | |
import shutil | |
import unicodedata | |
import asyncio | |
import torch | |
from dateutil import parser as date_parser | |
# Logging: one file handler in the system temp directory plus a console handler.
log_file = os.path.join(tempfile.gettempdir(), 'contract_app.log')
logging.basicConfig(
    handlers=[
        logging.FileHandler(log_file),  # persistent copy of every record
        logging.StreamHandler(),        # mirror records to the console stream
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
# Initialize dependencies
def init_globals():
    """Load configuration and build the encryption and summarization helpers.

    Reads the ``.env`` file through ``load_dotenv``, verifies that every
    required environment variable has a non-empty value, then constructs a
    ``Fernet`` instance from ``ENCRYPTION_KEY`` and a ``facebook/bart-large-cnn``
    summarization pipeline (device ``0`` when CUDA is available, else ``-1``).

    Returns:
        ``(fernet, summarizer)`` on success. ``(None, None)`` when a required
        variable is missing or when construction raises; the reason is logged
        at error level in both cases.
    """
    load_dotenv()

    required_env_vars = [
        'ENCRYPTION_KEY',
        'SALESFORCE_USERNAME',
        'SALESFORCE_PASSWORD',
        'SALESFORCE_SECURITY_TOKEN',
        'SALESFORCE_DOMAIN',
    ]
    env = {name: os.getenv(name) for name in required_env_vars}

    # Unset and empty-string values are both treated as missing.
    missing = [name for name in required_env_vars if not env[name]]
    if missing:
        logger.error(f"Missing env vars: {', '.join(missing)}")
        return None, None

    model_name = "facebook/bart-large-cnn"
    try:
        fernet = Fernet(env['ENCRYPTION_KEY'].encode())
        device = 0 if torch.cuda.is_available() else -1
        summarizer = pipeline(
            "summarization",
            model=model_name,
            tokenizer=model_name,
            framework="pt",
            device=device,
        )
        logger.info("Summarizer initialized with BART-large-cnn")
    except Exception as e:
        # Any failure (bad key, model download, backend error) disables start-up.
        logger.error(f"Initialization failed: {str(e)}")
        return None, None
    else:
        return fernet, summarizer
# Check dependencies
def check_dependencies():
    """Report which external command-line tools could not be located.

    Looks up the ``tesseract`` and ``pdfinfo`` executables with
    ``shutil.which``. When Tesseract is found, its path is assigned to
    ``pytesseract.pytesseract.tesseract_cmd``.

    Returns:
        A list containing ``"Tesseract"`` and/or ``"Poppler"`` for each tool
        that was not found (empty when both are present). If the lookup itself
        raises, both names are returned.
    """
    try:
        unavailable = []

        tesseract_cmd = shutil.which('tesseract')
        if tesseract_cmd:
            pytesseract.pytesseract.tesseract_cmd = tesseract_cmd
        else:
            logger.warning("Tesseract not found. OCR unavailable.")
            unavailable.append("Tesseract")

        # `pdfinfo` is the executable used here to detect a Poppler install.
        if not shutil.which('pdfinfo'):
            logger.warning("Poppler not found.")
            unavailable.append("Poppler")

        return unavailable
    except Exception as e:
        logger.error(f"Dependency check failed: {str(e)}")
        return ["Tesseract", "Poppler"]
# Build the shared helpers once at import time; refuse to start if either one
# could not be created (init_globals signals failure with a (None, None) pair).
fernet, summarizer = init_globals()
if not (fernet and summarizer):
    raise RuntimeError("Failed to initialize dependencies")

# Names of the external tools that check_dependencies could not locate.
missing_deps = check_dependencies()
# Validate file
def validate_file(file_path):
    """Check that *file_path* names a non-empty regular file of a supported type.

    Args:
        file_path: Path of the file to check. ``None`` or an empty string is
            reported as "not found" instead of raising.

    Returns:
        ``(True, None)`` when the file is acceptable, otherwise
        ``(False, message)`` where ``message`` describes the problem.
    """
    supported = ['.pdf', '.docx', '.png', '.jpg', '.jpeg', '.csv', '.xls', '.xlsx']

    if not file_path:
        return False, f"File not found or empty: {file_path}"

    ext = os.path.splitext(file_path)[1].lower()
    if ext not in supported:
        return False, f"Unsupported file type: {ext}. Supported types: {', '.join(supported)}"

    # isfile (not exists) so that a directory named e.g. "scan.pdf" is rejected;
    # OSError covers the file disappearing between the two checks.
    try:
        usable = os.path.isfile(file_path) and os.path.getsize(file_path) > 0
    except OSError:
        usable = False
    if not usable:
        return False, f"File not found or empty: {file_path}"

    return True, None
# Preprocess image
def preprocess_image(image, threshold=140):
    """Binarize an image before it is handed to OCR.

    The image is converted to mode ``'L'`` and then mapped to mode ``'1'``:
    pixel values below *threshold* become 0, all others become 255.

    Args:
        image: Object exposing PIL-style ``convert`` and ``point`` methods.
        threshold: Cut-off between black and white. Defaults to 140, the value
            that was previously hard-coded.

    Returns:
        The thresholded image returned by ``point``.
    """
    def to_black_or_white(value):
        return 0 if value < threshold else 255

    grayscale = image.convert('L')
    return grayscale.point(to_black_or_white, '1')
# Clean text
def clean_text(text, max_length=4096):
    """Normalize, whitespace-collapse and truncate *text*.

    Args:
        text: String to clean. ``None`` or an empty string yields ``""``.
        max_length: Maximum number of characters kept after cleaning.
            Defaults to 4096; ``None`` disables truncation.

    Returns:
        The NFKC-normalized text with leading/trailing whitespace removed,
        every whitespace run replaced by a single space, cut to *max_length*.
    """
    if not text:
        return ""
    normalized = unicodedata.normalize('NFKC', text)
    collapsed = re.sub(r'\s+', ' ', normalized.strip())
    return collapsed[:max_length]
# Extract text
async def extract_text_async(file_path):
    """Extract cleaned text from a PDF, DOCX, image or spreadsheet file.

    PDFs are read through their text layer first; if that yields nothing and
    both Tesseract and Poppler are available, the first pages are rasterized
    and OCR'd. Images are OCR'd directly. CSV/Excel content is flattened to a
    single string limited to 2000 characters.

    NOTE: every step in here is blocking even though the function is declared
    ``async``; it does not yield to the event loop while working.

    Args:
        file_path: Path of the file to read; checked with ``validate_file``.

    Returns:
        ``(text, None)`` on success, or ``(None, message)`` when validation
        fails, extraction raises, or fewer than 50 characters were obtained.
        A PDF with a text layer is returned as soon as that layer is
        non-blank, without the 50-character check.
    """
    is_valid, error = validate_file(file_path)
    if not is_valid:
        return None, error

    ocr_config = '--psm 6 --oem 3 -l eng'
    max_ocr_pages = 2  # only the first pages of a scanned PDF are OCR'd
    ext = os.path.splitext(file_path)[1].lower()
    try:
        text = ""
        if ext == '.pdf':
            with open(file_path, 'rb') as f:
                pdf_reader = PyPDF2.PdfReader(f)
                text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
            if text.strip():
                return clean_text(text), None
            # Scanned PDF: rasterizing needs Poppler and reading needs
            # Tesseract, so both must be present before attempting OCR.
            if "Tesseract" not in missing_deps and "Poppler" not in missing_deps:
                # Render only the pages that will be OCR'd instead of the
                # whole document.
                images = convert_from_path(
                    file_path, dpi=200, first_page=1, last_page=max_ocr_pages
                )
                text = "\n".join(
                    pytesseract.image_to_string(preprocess_image(img), config=ocr_config)
                    for img in images
                )
        elif ext == '.docx':
            doc = Document(file_path)
            text = "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        elif ext in ('.png', '.jpg', '.jpeg'):
            if "Tesseract" in missing_deps:
                return None, "Tesseract not available for image processing"
            # Context manager so the image file handle is released.
            with Image.open(file_path) as img:
                text = pytesseract.image_to_string(preprocess_image(img), config=ocr_config)
        elif ext in ('.csv', '.xls', '.xlsx'):
            if ext == '.csv':
                # `encoding_errors` is the read_csv keyword for decode-error
                # handling; the previous `errors=` keyword raised TypeError.
                df = pd.read_csv(file_path, encoding='utf-8', encoding_errors='ignore')
            else:
                df = pd.read_excel(file_path)
            text = " ".join(df.astype(str).values.flatten())[:2000]

        text = clean_text(text)
        if not text or len(text) < 50:
            return None, f"No valid text extracted from {file_path}"
        return text, None
    except Exception as e:
        logger.error(f"Text extraction failed for {file_path}: {str(e)}")
        return None, f"Text extraction failed: {str(e)}"
# Parse dates
def parse_dates(text):
    """Derive a contract start and end date from the dates mentioned in *text*.

    Candidate strings are collected with four regular expressions
    (``YYYY-MM-DD``, ``DD/MM/YYYY``, ``Month D, YYYY`` and ``D Month YYYY``),
    parsed, and kept only if they fall no later than one year after the
    current Asia/Kolkata time.

    Args:
        text: Contract text to scan.

    Returns:
        ``(start_date, end_date)`` as ``'YYYY-MM-DD'`` strings. ``start_date``
        is the earliest accepted date and ``end_date`` is the latest accepted
        date plus 730 days. When nothing usable is found, or on any unexpected
        error, both are today's date in Asia/Kolkata.
    """
    ist = timezone('Asia/Kolkata')
    current_date = datetime.now(ist)
    default_date = current_date.strftime('%Y-%m-%d')
    try:
        date_patterns = [
            r'\b(\d{4}-\d{2}-\d{2})\b',
            r'\b(\d{2}/\d{2}/\d{4})\b',
            r'\b(\w+\s+\d{1,2},\s+\d{4})\b',
            r'\b(\d{1,2}\s+\w+\s+\d{4})\b',
        ]
        candidates = []
        for pattern in date_patterns:
            candidates.extend(re.findall(pattern, text, re.IGNORECASE))

        # Parsed values are normally naive while `current_date` is aware;
        # comparing the two raises TypeError, so everything is compared as
        # naive Asia/Kolkata wall-clock time.
        latest_allowed = current_date.replace(tzinfo=None) + timedelta(days=365)

        parsed_dates = []
        for date_str in candidates:
            try:
                if re.fullmatch(r'\d{4}-\d{2}-\d{2}', date_str):
                    # ISO dates are unambiguous; parse them strictly.
                    # NOTE(review): dateutil with dayfirst=True appears to swap
                    # month and day on year-first strings — confirm against the
                    # installed dateutil version.
                    parsed_date = datetime.strptime(date_str, '%Y-%m-%d')
                else:
                    parsed_date = date_parser.parse(date_str, fuzzy=True, dayfirst=True)
                if parsed_date.tzinfo is not None:
                    parsed_date = parsed_date.astimezone(ist).replace(tzinfo=None)
                if parsed_date <= latest_allowed:
                    parsed_dates.append(parsed_date)
            except (ValueError, OverflowError):
                # Not a real date (e.g. "12 pages 2024") or out of range.
                continue

        if not parsed_dates:
            return default_date, default_date

        start_date = min(parsed_dates).strftime('%Y-%m-%d')
        end_date = (max(parsed_dates) + timedelta(days=730)).strftime('%Y-%m-%d')
        return start_date, end_date
    except Exception as e:
        logger.error(f"Date parsing failed: {str(e)}")
        return default_date, default_date
# Summarize
async def summarize_contract_async(text, summarizer, file_name):
    """Summarize contract text and pull out key aspects and dates.

    NOTE: the summarizer call is blocking even though the function is declared
    ``async``; it does not yield to the event loop while the model runs.

    Args:
        text: Extracted contract text.
        summarizer: Callable summarization pipeline; called with the first
            2048 characters of *text*.
        file_name: Name stored as ``contract_name`` in the result.

    Returns:
        ``(result, error)``. ``result`` is a dict with the keys
        ``contract_name``, ``full_summary``, ``aspect_summaries``,
        ``start_date``, ``end_date`` and ``validation_status``. On success
        ``error`` is ``None`` and the status is ``"Validated"``. On failure
        ``error`` is the exception message, the status is ``"Pending"``, and
        the summary/aspects hold placeholder values with today's date.
    """
    aspects = ["parties", "payment terms", "obligations", "termination clauses"]
    try:
        # truncation=True guards against inputs that exceed the model's token
        # limit even after the character cut (e.g. digit-heavy spreadsheets).
        summary_result = summarizer(
            text[:2048], max_length=150, min_length=30, do_sample=False, truncation=True
        )
        full_summary = summary_result[0]['summary_text'] if summary_result else text[:150]

        # A clause ends at a sentence-ending period or a comma followed by
        # whitespace, so "10,000" and "5.5%" are not split. The earlier
        # patterns also stopped at any whitespace, which cut every capture
        # down to a single word.
        clause_end = r"\.(?=\s|$)|,(?=\s)"
        patterns = {
            "parties": r"between\s+(.+?)\s+and\s+(.+?)(?:,|\.)",
            "payment terms": (
                rf"(?:shall\s+pay|payment\s+of)\s+(.+?)\s*(?:{clause_end}|\bwith\b)"
            ),
            "obligations": (
                r"(?:obligations|services|shall\s+provide|support|responsibilities)"
                rf"\s+(.+?)\s*(?:{clause_end}|\b(?:by|for)\b)"
            ),
            "termination clauses": (
                rf"(?:terminate|termination\s+clause.*?)\s+(.+?)\s*(?:{clause_end}|\bwith\b)"
            ),
        }
        aspect_summaries = {}
        for asp, pattern in patterns.items():
            match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
            if match:
                # "parties" captures two groups; keep both rather than only
                # the first party.
                captured = [group for group in match.groups() if group]
                aspect_summaries[asp] = clean_text(" and ".join(captured))
            else:
                aspect_summaries[asp] = "Not extracted"

        start_date, end_date = parse_dates(text)
        return {
            "contract_name": file_name,
            "full_summary": clean_text(full_summary),
            "aspect_summaries": aspect_summaries,
            "start_date": start_date,
            "end_date": end_date,
            "validation_status": "Validated"
        }, None
    except Exception as e:
        logger.error(f"Summarization failed: {str(e)}")
        # Same Asia/Kolkata clock that parse_dates uses for its defaults.
        today = datetime.now(timezone('Asia/Kolkata')).strftime('%Y-%m-%d')
        return {
            "contract_name": file_name,
            "full_summary": "Summarization error",
            "aspect_summaries": {asp: "Not extracted" for asp in aspects},
            "start_date": today,
            "end_date": today,
            "validation_status": "Pending"
        }, str(e)
# Gradio UI
with gr.Blocks(theme=gr.themes.Soft()) as iface:
    gr.Markdown("## Contract Summarization Tool")
    file_input = gr.File(
        label="Upload Contract",
        file_types=['.pdf', '.docx', '.png', '.jpg', '.jpeg', '.csv', '.xls', '.xlsx'],
    )
    result_output = gr.Textbox(label="Result")

    async def process(file):
        """Validate, extract and summarize the uploaded file.

        Args:
            file: Value delivered by the ``gr.File`` input; ``None`` when the
                button is clicked before anything was uploaded.

        Returns:
            The summary text, or a human-readable error string.
        """
        if file is None:
            return "Error: No file uploaded"
        # Accept either an object exposing `.name` or a plain path string.
        # NOTE(review): which one Gradio passes presumably depends on the
        # Gradio version / File `type` setting — confirm.
        file_path = getattr(file, "name", file)
        is_valid, error = validate_file(file_path)
        if not is_valid:
            return f"Error: {error}"
        text, text_error = await extract_text_async(file_path)
        if text_error:
            return f"Text extraction failed: {text_error}"
        summary_data, summ_error = await summarize_contract_async(
            text, summarizer, os.path.basename(file_path)
        )
        if summ_error:
            return f"Error: {summ_error}"
        return summary_data['full_summary']

    submit_btn = gr.Button("Summarize")
    submit_btn.click(process, inputs=[file_input], outputs=[result_output])

if __name__ == "__main__":
    iface.launch()