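"""
Document translation pre-processor.

Extracts text from DOCX and TXT files, segments it into paragraphs (chunking
any paragraph that exceeds a configurable length), translates Hebrew segments
to English via the OpenAI API, and saves the aligned results as JSON.
"""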
import os
import openai
import json
import uuid
import re
import asyncio
import time
import argparse
from typing import List, Optional, Tuple
from dotenv import load_dotenv
# --- Required Libraries ---
try:
    from docx import Document
except ImportError:
    print("Requirement Missing: Please install 'python-docx' (`pip install python-docx`)")
    exit()
try:
    from langdetect import detect, DetectorFactory, LangDetectException
    DetectorFactory.seed = 0  # Make language detection deterministic across runs
except ImportError:
    print("Requirement Missing: Please install 'langdetect' (`pip install langdetect`)")
    exit()
# --- Configuration ---
load_dotenv()
API_KEY = os.environ.get("OPENAI_API_KEY")
if not API_KEY:
    print("🛑 ERROR: OpenAI API key not found. Set OPENAI_API_KEY in your .env file.")
    exit()
OUTPUT_DIR = "data"
TRANSLATION_MODEL = "gpt-4o-mini"
MAX_CONCURRENT_TRANSLATIONS = 10
TARGET_LANGUAGE = "en"
# --- Chunking Configuration ---
PARAGRAPH_CHUNK_THRESHOLD = 2000  # Characters
CHUNK_SIZE = 800   # Characters
CHUNK_OVERLAP = 100  # Characters
# Validate chunking config
if CHUNK_OVERLAP >= CHUNK_SIZE:
    print(f"🛑 ERROR: CHUNK_OVERLAP ({CHUNK_OVERLAP}) must be less than CHUNK_SIZE ({CHUNK_SIZE}).")
    exit()
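# With these defaults, each new chunk starts CHUNK_SIZE - CHUNK_OVERLAP = 700
# characters after the previous one, so a 2,500-char paragraph yields four
# chunks, starting at offsets 0, 700, 1400, and 2100.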
# --- Setup OpenAI Client ---
try:
    client = openai.AsyncOpenAI(api_key=API_KEY)
    print("✅ OpenAI Async Client Initialized.")
except Exception as e:
    print(f"🛑 ERROR: Failed to initialize OpenAI client: {e}")
    exit()
# --- Text Extraction Functions ---
def extract_text_from_docx(file_path: str) -> Optional[str]:
    """Extracts all text from a DOCX file."""
    try:
        doc = Document(file_path)
        full_text = [para.text for para in doc.paragraphs if para.text.strip()]
        print(f" 📄 Extracted {len(full_text)} paragraphs from DOCX: {os.path.basename(file_path)}")
        return "\n\n".join(full_text)  # Double newlines preserve paragraph boundaries for segmentation
    except Exception as e:
        print(f" ❌ ERROR reading DOCX file '{os.path.basename(file_path)}': {e}")
        return None
def extract_text_from_txt(file_path: str) -> Optional[str]:
    """Reads text from a UTF-8 encoded TXT file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()
        print(f" 📄 Read TXT file: {os.path.basename(file_path)} (length: {len(text)} chars)")
        return text
    except Exception as e:
        print(f" ❌ ERROR reading TXT file '{os.path.basename(file_path)}': {e}")
        return None
# --- Text Processing Functions ---
def _chunk_text(text: str, size: int, overlap: int) -> List[str]:
    """Splits a single block of text into overlapping, fixed-size chunks."""
    if not text:
        return []
    chunks = []
    start_index = 0
    text_len = len(text)
    while start_index < text_len:
        end_index = start_index + size
        chunk = text[start_index:end_index]
        chunks.append(chunk.strip())
        # Slide the window forward by (size - overlap); the guard below
        # ensures forward progress even with a degenerate configuration.
        next_start = start_index + size - overlap
        if next_start <= start_index:
            next_start = start_index + 1
        start_index = next_start
    return [c for c in chunks if c]
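# For example, _chunk_text("abcdefghij", size=4, overlap=1) walks the text in
# strides of 3 and returns ["abcd", "defg", "ghij", "j"].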
def segment_into_paragraphs_or_chunks(text: str) -> List[str]:
    """
    Segments text into paragraphs based on blank lines.
    Any paragraph longer than PARAGRAPH_CHUNK_THRESHOLD is chunked
    with _chunk_text instead of being kept whole.
    """
    if not text:
        return []
    normalized_text = text.replace('\r\n', '\n').replace('\r', '\n')
    initial_segments = re.split(r'\n\s*\n+', normalized_text)
    initial_segments = [s.strip() for s in initial_segments if s.strip()]
    # Fall back to single-newline splitting if blank-line splitting found almost nothing
    if len(initial_segments) <= 1 and '\n' in normalized_text:
        print(" Parsing: Double newline split yielded few segments, trying single newline split.")
        initial_segments = [s.strip() for s in normalized_text.split('\n') if s.strip()]
    if not initial_segments:
        print(" Parsing: No segments found after initial splitting.")
        return []
    print(f" Parsing: Initial segmentation yielded {len(initial_segments)} segments.")
    final_segments = []
    long_segment_count = 0
    for segment in initial_segments:
        if len(segment) > PARAGRAPH_CHUNK_THRESHOLD:
            long_segment_count += 1
            print(f" ❗ Segment ({len(segment)} chars > {PARAGRAPH_CHUNK_THRESHOLD}) is too long. Applying chunking (Size: {CHUNK_SIZE}, Overlap: {CHUNK_OVERLAP})...")
            chunks = _chunk_text(segment, CHUNK_SIZE, CHUNK_OVERLAP)
            print(f" -> Chunked into {len(chunks)} pieces.")
            final_segments.extend(chunks)
        elif segment:
            final_segments.append(segment)
    if long_segment_count > 0:
        print(f" Parsing: Chunking applied to {long_segment_count} long segments.")
    print(f" 🔪 Final segmentation/chunking resulted in {len(final_segments)} pieces.")
    return final_segments
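# For example, a text with three blank-line-separated paragraphs yields three
# segments; if the middle one ran to 2,500 chars (over the 2,000-char threshold),
# it would be replaced by its overlapping 800-char chunks while the other two
# stayed whole.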
def detect_language_safe(text: str, default_lang: str = "unknown") -> str:
    """Detects language, handling short text and errors."""
    clean_text = text.strip()
    if not clean_text or len(clean_text) < 10:
        return default_lang  # Too little text for a reliable detection
    try:
        return detect(clean_text)
    except LangDetectException:
        return default_lang
    except Exception as e:
        print(f" ❌ Unexpected error during language detection: {e}")
        return "error"
async def translate_paragraph(text: str, target_lang: str, semaphore: asyncio.Semaphore) -> Tuple[str, Optional[str]]:
    """Translates a single paragraph/chunk using OpenAI, with rate limiting."""
    async with semaphore:
        detected_lang = detect_language_safe(text)
        if detected_lang != 'he':
            return text, None  # Only Hebrew segments are translated
        print(f" 🌍 Translating Hebrew segment to {target_lang.upper()}: '{text[:60]}...'")
        prompt = (
            f"Translate the following Hebrew text accurately to {target_lang}. "
            "Provide only the translation, without any introductory phrases.\n"
            f"Hebrew Text:\n```heb\n{text}\n```\nTranslation:"
        )
        retries = 1
        for attempt in range(retries + 1):
            try:
                response = await client.chat.completions.create(
                    model=TRANSLATION_MODEL,
                    messages=[
                        {"role": "system", "content": f"You are an expert translator specializing in Hebrew to {target_lang} translation. Provide only the translated text."},
                        {"role": "user", "content": prompt},
                    ],
                    # Rough output budget: ~2.5 tokens per source word plus headroom
                    max_tokens=int(len(text.split()) * 2.5) + 50,
                    temperature=0.1,
                    n=1,
                )
                translation = response.choices[0].message.content.strip()
                if translation:
                    if translation.strip() == text.strip():
                        print(f" ⚠️ Translation attempt returned original text for: '{text[:60]}...'")
                        return text, "Translation Failed: Model returned original text"
                    return text, translation
                print(f" ❌ Translation attempt returned empty response for: '{text[:60]}...'")
                if attempt == retries:
                    return text, "Translation Failed: Empty Response"
            except openai.RateLimitError as e:
                wait_time = 5 * (attempt + 1)
                print(f" ⏳ Rate limit hit during translation. Waiting {wait_time}s... ({e})")
                await asyncio.sleep(wait_time)
                if attempt == retries:
                    return text, "Translation Failed: Rate Limited"
            except openai.APIError as e:
                print(f" ❌ OpenAI API Error during translation: {e}")
                await asyncio.sleep(3 * (attempt + 1))
                if attempt == retries:
                    return text, f"Translation Failed: API Error ({e.code})"
            except Exception as e:
                print(f" ❌ Unexpected error during translation: {e}")
                if attempt == retries:
                    return text, f"Translation Failed: Unexpected Error ({type(e).__name__})"
            if attempt < retries:
                await asyncio.sleep(2 * (attempt + 1))
        return text, "Translation Failed: Max Retries"
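# Note: the returned tuple is (original_text, translation_or_None); None means
# the segment was not detected as Hebrew and passes through untranslated. The
# shared semaphore caps in-flight API calls at MAX_CONCURRENT_TRANSLATIONS.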
# --- Main Processing Function ---
async def process_file(input_path: str, output_dir: str):
    """Processes a single DOCX or TXT file: extracts, segments/chunks, translates, saves JSON."""
    print(f"\n--- Processing file: {os.path.basename(input_path)} ---")
    start_time = time.time()
    file_ext = os.path.splitext(input_path)[1].lower()
    extracted_text: Optional[str] = None
    # 1. Extract Text (only DOCX and TXT are supported)
    if file_ext == ".docx":
        extracted_text = extract_text_from_docx(input_path)
    elif file_ext == ".txt":
        extracted_text = extract_text_from_txt(input_path)
    else:
        # Should not be hit if inputs are pre-filtered, but acts as a safeguard
        print(f" ⚠️ Internal Skip: Unsupported extension '{file_ext}' passed to process_file.")
        return
    if not extracted_text or not extracted_text.strip():
        print(" ❌ Text extraction failed or returned empty. Skipping.")
        return
    # 2. Segment into Paragraphs or Chunks
    segments = segment_into_paragraphs_or_chunks(extracted_text)
    if not segments:
        print(" ❌ No paragraphs or chunks found after segmentation. Skipping.")
        return
    # 3. Translate Hebrew Segments (Asynchronously)
    output_data = []
    translation_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TRANSLATIONS)
    print(f" 🗣️ Preparing to translate {len(segments)} segments (max concurrent: {MAX_CONCURRENT_TRANSLATIONS})...")
    tasks = [
        asyncio.create_task(translate_paragraph(seg_text, TARGET_LANGUAGE, translation_semaphore))
        for seg_text in segments
    ]
    translation_results = await asyncio.gather(*tasks)
    # 4. Format into JSON Structure
    print(" 📝 Formatting results into JSON...")
    translation_failures = 0
    for original_he, translation_en in translation_results:
        is_failure = isinstance(translation_en, str) and "Translation Failed" in translation_en
        if is_failure:
            translation_failures += 1
            english_text = translation_en  # Store the error message for later inspection
        else:
            english_text = translation_en if translation_en else ""
        output_data.append({"id": str(uuid.uuid4()), "hebrew": original_he, "english": english_text})
    if translation_failures > 0:
        print(f" ⚠️ Encountered {translation_failures} translation failures out of {len(segments)} segments.")
    # 5. Save to JSON File
    base_filename = os.path.splitext(os.path.basename(input_path))[0]
    output_filename = f"{base_filename}.json"
    output_path = os.path.join(output_dir, output_filename)
    try:
        os.makedirs(output_dir, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, ensure_ascii=False, indent=2)
        end_time = time.time()
        print(f"✅ Successfully saved {len(output_data)} segments to: {output_path}")
        print(f"⏱️ File processing time: {end_time - start_time:.2f} seconds")
    except Exception as e:
        print(f" ❌ ERROR saving JSON file '{output_path}': {e}")
# --- Script Execution ---
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process DOCX and TXT files into paragraph/chunk-based JSON with Hebrew-to-English translation.")
    parser.add_argument("input_paths", nargs='+', help="Path(s) to input file(s) or directory(ies) containing DOCX/TXT files.")
    parser.add_argument("-o", "--output_dir", default=OUTPUT_DIR, help=f"Directory to save output JSON files (default: '{OUTPUT_DIR}')")
    parser.add_argument("--chunk_threshold", type=int, default=PARAGRAPH_CHUNK_THRESHOLD, help="Max chars per paragraph before chunking.")
    parser.add_argument("--chunk_size", type=int, default=CHUNK_SIZE, help="Target chunk size in chars.")
    parser.add_argument("--chunk_overlap", type=int, default=CHUNK_OVERLAP, help="Chunk overlap in chars.")
    args = parser.parse_args()
    # Override module-level defaults with CLI values
    OUTPUT_DIR = args.output_dir
    PARAGRAPH_CHUNK_THRESHOLD = args.chunk_threshold
    CHUNK_SIZE = args.chunk_size
    CHUNK_OVERLAP = args.chunk_overlap
    # Re-validate, since CLI values may differ from the defaults checked above
    if CHUNK_OVERLAP >= CHUNK_SIZE:
        print(f"🛑 ERROR: Chunk overlap ({CHUNK_OVERLAP}) must be less than chunk size ({CHUNK_SIZE}). Adjust --chunk_overlap or --chunk_size.")
        exit()
    print("🚀 Starting File Processor (DOCX & TXT only)...")
    print(f"📂 Output Directory: {os.path.abspath(OUTPUT_DIR)}")
    print(f"🔪 Paragraph/Chunking Settings: Threshold={PARAGRAPH_CHUNK_THRESHOLD}, Size={CHUNK_SIZE}, Overlap={CHUNK_OVERLAP}")
    files_to_process = []
    for path in args.input_paths:
        if os.path.isfile(path):
            files_to_process.append(path)
        elif os.path.isdir(path):
            print(f"📁 Scanning directory: {path}")
            for filename in os.listdir(path):
                full_path = os.path.join(path, filename)
                if os.path.isfile(full_path):
                    files_to_process.append(full_path)
        else:
            print(f"⚠️ Warning: Input path not found or not a file/directory: {path}")
    supported_extensions = ('.docx', '.txt')
    valid_files = [f for f in files_to_process if f.lower().endswith(supported_extensions)]
    if not valid_files:
        print(f"\n🛑 No supported files ({', '.join(supported_extensions)}) found in the specified paths. Exiting.")
    else:
        print(f"\nFound {len(valid_files)} supported files to process:")
        for f in valid_files:
            print(f" - {os.path.basename(f)}")

        async def main():
            # Process all files concurrently; API concurrency is still bounded
            # per file by its translation semaphore.
            process_tasks = [process_file(f, OUTPUT_DIR) for f in valid_files]
            await asyncio.gather(*process_tasks)

        script_start_time = time.time()
        asyncio.run(main())
        script_end_time = time.time()
        print(f"\n🏁 File processing complete. Total script time: {script_end_time - script_start_time:.2f} seconds.")