#!/usr/bin/env python3 import re import json import sys from docx import Document from docx.oxml.ns import qn from master_key import TABLE_SCHEMAS, HEADING_PATTERNS, PARAGRAPH_PATTERNS def normalize_header_label(s: str) -> str: """Normalize a header/label by stripping parentheticals & punctuation.""" s = re.sub(r"\s+", " ", s.strip()) # remove content in parentheses/brackets s = re.sub(r"\([^)]*\)", "", s) s = re.sub(r"\[[^]]*\]", "", s) # unify slashes and hyphens, collapse spaces s = s.replace("–", "-").replace("β€”", "-").replace("/", " / ").replace(" ", " ") return s.strip() # Canonical label aliases for Vehicle/Maintenance/General headers LABEL_ALIASES = { # Vehicle Registration (Maintenance) "roadworthiness certificates": "Roadworthiness Certificates", "maintenance records": "Maintenance Records", "daily checks": "Daily Checks", "fault recording / reporting": "Fault Recording/ Reporting", "fault repair": "Fault Repair", # Vehicle Registration (Mass) "sub contracted vehicles statement of compliance": "Sub-contracted Vehicles Statement of Compliance", "weight verification records": "Weight Verification Records", "rfs suspension certification #": "RFS Suspension Certification #", "suspension system maintenance": "Suspension System Maintenance", "trip records": "Trip Records", "fault recording/ reporting on suspension system": "Fault Recording/ Reporting on Suspension System", # Common "registration number": "Registration Number", "no.": "No.", "sub contractor": "Sub contractor", "sub-contractor": "Sub contractor", } def looks_like_operator_declaration(context): """True iff heading says Operator Declaration and headers include Print Name + Position Title.""" heading = (context.get("heading") or "").strip().lower() headers = " ".join(context.get("headers") or []).lower() return ( "operator declaration" in heading and "print name" in headers and "position" in headers and "title" in headers ) def looks_like_auditor_declaration(context): heading = (context.get("heading") or "").strip().lower() headers = " ".join(context.get("headers") or []).lower() return ( "auditor declaration" in heading and "print name" in headers and ("nhvr" in headers or "auditor registration number" in headers) ) # --- NEW: header-only fallback that ignores headings and just keys on the two column names def extract_operator_declaration_by_headers_from_end(doc): """ Scan tables from the end; if a table's first row contains both 'Print Name' AND 'Position Title' (case-insensitive), extract red text from the data rows into: {"Print Name": [...], "Position Title": [...]} """ for tbl in reversed(doc.tables): if len(tbl.rows) < 2: continue # need header + at least one data row headers_norm = [normalize_header_label(c.text).lower() for c in tbl.rows[0].cells] has_print = any("print name" in h for h in headers_norm) has_pos_tit = any(("position title" in h) or ("position" in h and "title" in h) for h in headers_norm) if not (has_print and has_pos_tit): continue idx_print = next((i for i, h in enumerate(headers_norm) if "print name" in h), None) idx_pos = next((i for i, h in enumerate(headers_norm) if "position title" in h), None) if idx_pos is None: idx_pos = next((i for i, h in enumerate(headers_norm) if ("position" in h and "title" in h)), None) result = {"Print Name": [], "Position Title": []} for row in tbl.rows[1:]: if idx_print is not None and idx_print < len(row.cells): cell = row.cells[idx_print] reds = [r.text for p in cell.paragraphs for r in p.runs if is_red_font(r) and r.text] reds = coalesce_numeric_runs(reds) txt = normalize_text(" ".join(reds)) if txt: result["Print Name"].append(txt) if idx_pos is not None and idx_pos < len(row.cells): cell = row.cells[idx_pos] reds = [r.text for p in cell.paragraphs for r in p.runs if is_red_font(r) and r.text] reds = coalesce_numeric_runs(reds) txt = normalize_text(" ".join(reds)) if txt: result["Position Title"].append(txt) if result["Print Name"] or result["Position Title"]: return {k: v for k, v in result.items() if v} return None # --- end NEW helper def canonicalize_label(s: str) -> str: key = normalize_header_label(s).lower() key = re.sub(r"\s+", " ", key) return LABEL_ALIASES.get(key, s) def bag_similarity(a: str, b: str) -> float: """Loose bag-of-words similarity for header↔label matching.""" aw = {w for w in re.split(r"[^A-Za-z0-9#]+", normalize_header_label(a).lower()) if len(w) > 2 or w in {"#","no"}} bw = {w for w in re.split(r"[^A-Za-z0-9#]+", normalize_header_label(b).lower()) if len(w) > 2 or w in {"#","no"}} if not aw or not bw: return 0.0 inter = len(aw & bw) return inter / max(len(aw), len(bw)) def coalesce_numeric_runs(text_list): """ If a cell yields ['4','5','6','9','8','7','1','2','3'] etc., join continuous single-char digit runs. Returns ['456987123'] instead of many singles. Non-digit tokens are preserved. """ out, buf = [], [] for t in text_list: if len(t) == 1 and t.isdigit(): buf.append(t) else: if buf: out.append("".join(buf)) buf = [] out.append(t) if buf: out.append("".join(buf)) return out def is_red_font(run): """Enhanced red font detection with better color checking""" col = run.font.color if col and col.rgb: r, g, b = col.rgb if r > 150 and g < 100 and b < 100 and (r-g) > 30 and (r-b) > 30: return True rPr = getattr(run._element, "rPr", None) if rPr is not None: clr = rPr.find(qn('w:color')) if clr is not None: val = clr.get(qn('w:val')) if val and re.fullmatch(r"[0-9A-Fa-f]{6}", val): rr, gg, bb = int(val[:2], 16), int(val[2:4], 16), int(val[4:], 16) if rr > 150 and gg < 100 and bb < 100 and (rr-gg) > 30 and (rr-bb) > 30: return True return False def _prev_para_text(tbl): """Get text from previous paragraph before table""" prev = tbl._tbl.getprevious() while prev is not None and not prev.tag.endswith("}p"): prev = prev.getprevious() if prev is None: return "" return "".join(node.text for node in prev.iter() if node.tag.endswith("}t") and node.text).strip() def normalize_text(text): """Normalize text for better matching""" return re.sub(r'\s+', ' ', text.strip()) def fuzzy_match_heading(heading, patterns): """Check if heading matches any pattern with fuzzy matching""" heading_norm = normalize_text(heading.upper()) for pattern in patterns: if re.search(pattern, heading_norm, re.IGNORECASE): return True return False def get_table_context(tbl): """Get comprehensive context information for table""" heading = normalize_text(_prev_para_text(tbl)) headers = [normalize_text(c.text) for c in tbl.rows[0].cells if c.text.strip()] col0 = [normalize_text(r.cells[0].text) for r in tbl.rows if r.cells[0].text.strip()] first_cell = normalize_text(tbl.rows[0].cells[0].text) if tbl.rows else "" all_cells = [] for row in tbl.rows: for cell in row.cells: text = normalize_text(cell.text) if text: all_cells.append(text) return { 'heading': heading, 'headers': headers, 'col0': col0, 'first_cell': first_cell, 'all_cells': all_cells, 'num_rows': len(tbl.rows), 'num_cols': len(tbl.rows[0].cells) if tbl.rows else 0 } def calculate_schema_match_score(schema_name, spec, context): """Enhanced calculate match score - IMPROVED for Vehicle Registration tables""" score = 0 reasons = [] # 🎯 VEHICLE REGISTRATION BOOST if "Vehicle Registration" in schema_name: vehicle_keywords = ["registration", "vehicle", "sub-contractor", "weight verification", "rfs suspension"] table_text = " ".join(context['headers']).lower() + " " + context['heading'].lower() keyword_matches = sum(1 for keyword in vehicle_keywords if keyword in table_text) if keyword_matches >= 2: score += 150 # Very high boost for vehicle tables reasons.append(f"Vehicle Registration keywords: {keyword_matches}/5") elif keyword_matches >= 1: score += 75 # Medium boost reasons.append(f"Some Vehicle Registration keywords: {keyword_matches}/5") # 🎯 SUMMARY TABLE BOOST (existing logic) if "Summary" in schema_name and "details" in " ".join(context['headers']).lower(): score += 100 reasons.append(f"Summary schema with DETAILS column - perfect match") if "Summary" not in schema_name and "details" in " ".join(context['headers']).lower(): score -= 75 reasons.append(f"Non-summary schema penalized for DETAILS column presence") # Context exclusions if spec.get("context_exclusions"): table_text = " ".join(context['headers']).lower() + " " + context['heading'].lower() for exclusion in spec["context_exclusions"]: if exclusion.lower() in table_text: score -= 50 reasons.append(f"Context exclusion penalty: '{exclusion}' found") # Context keywords if spec.get("context_keywords"): table_text = " ".join(context['headers']).lower() + " " + context['heading'].lower() keyword_matches = 0 for keyword in spec["context_keywords"]: if keyword.lower() in table_text: keyword_matches += 1 if keyword_matches > 0: score += keyword_matches * 15 reasons.append(f"Context keyword matches: {keyword_matches}/{len(spec['context_keywords'])}") # Direct first cell match if context['first_cell'] and context['first_cell'].upper() == schema_name.upper(): score += 100 reasons.append(f"Direct first cell match: '{context['first_cell']}'") # Heading pattern matching if spec.get("headings"): for h in spec["headings"]: if fuzzy_match_heading(context['heading'], [h["text"]]): score += 50 reasons.append(f"Heading match: '{context['heading']}'") break # Column header matching if spec.get("columns"): cols = [normalize_text(col) for col in spec["columns"]] matches = 0 for col in cols: if any(col.upper() in h.upper() for h in context['headers']): matches += 1 if matches == len(cols): score += 60 reasons.append(f"All column headers match: {cols}") elif matches > 0: score += matches * 20 reasons.append(f"Partial column matches: {matches}/{len(cols)}") # Label matching for left-oriented tables if spec.get("orientation") == "left": labels = [normalize_text(lbl) for lbl in spec["labels"]] matches = 0 for lbl in labels: if any(lbl.upper() in c.upper() or c.upper() in lbl.upper() for c in context['col0']): matches += 1 if matches > 0: score += (matches / len(labels)) * 30 reasons.append(f"Left orientation label matches: {matches}/{len(labels)}") # 🎯 ENHANCED Label matching for row1-oriented tables (Vehicle Registration) elif spec.get("orientation") == "row1": labels = [normalize_text(lbl) for lbl in spec["labels"]] matches = 0 for lbl in labels: if any(lbl.upper() in h.upper() or h.upper() in lbl.upper() for h in context['headers']): matches += 1 elif any(word.upper() in " ".join(context['headers']).upper() for word in lbl.split() if len(word) > 3): matches += 0.5 # Partial credit if matches > 0: score += (matches / len(labels)) * 40 reasons.append(f"Row1 orientation header matches: {matches}/{len(labels)}") # Special handling for Declaration tables (existing logic) if schema_name == "Operator Declaration" and context['first_cell'].upper() == "PRINT NAME": if "OPERATOR DECLARATION" in context['heading'].upper(): score += 80 reasons.append("Operator Declaration context match") elif any("MANAGER" in cell.upper() for cell in context['all_cells']): score += 60 reasons.append("Manager found in cells (likely Operator Declaration)") if schema_name == "NHVAS Approved Auditor Declaration" and context['first_cell'].upper() == "PRINT NAME": if any("MANAGER" in cell.upper() for cell in context['all_cells']): score -= 50 reasons.append("Penalty: Manager found (not auditor)") return score, reasons def match_table_schema(tbl): """Improved table schema matching with scoring system""" context = get_table_context(tbl) # Auditor Declaration first if ("print name" in " ".join(context.get("headers", [])).lower() and "auditor" in " ".join(context.get("headers", [])).lower()): return "NHVAS Approved Auditor Declaration" # NEW: prioritize Auditor Declaration to avoid misclassification if looks_like_auditor_declaration(context): return "NHVAS Approved Auditor Declaration" # hard-match Operator Declaration first (high priority, avoids misclassification) if looks_like_operator_declaration(context): return "Operator Declaration" best_match = None best_score = 0 for name, spec in TABLE_SCHEMAS.items(): score, reasons = calculate_schema_match_score(name, spec, context) if score > best_score: best_score = score best_match = name if best_score >= 20: return best_match return None def check_multi_schema_table(tbl): """Check if table contains multiple schemas and split appropriately""" context = get_table_context(tbl) operator_labels = ["Operator name (Legal entity)", "NHVAS Accreditation No.", "Registered trading name/s", "Australian Company Number", "NHVAS Manual"] contact_labels = ["Operator business address", "Operator Postal address", "Email address", "Operator Telephone Number"] has_operator = any(any(op_lbl.upper() in cell.upper() for op_lbl in operator_labels) for cell in context['col0']) has_contact = any(any(cont_lbl.upper() in cell.upper() for cont_lbl in contact_labels) for cell in context['col0']) if has_operator and has_contact: return ["Operator Information", "Operator contact details"] return None def extract_multi_schema_table(tbl, schemas): """Extract data from table with multiple schemas""" result = {} for schema_name in schemas: if schema_name not in TABLE_SCHEMAS: continue spec = TABLE_SCHEMAS[schema_name] schema_data = {} for ri, row in enumerate(tbl.rows): if ri == 0: continue row_label = normalize_text(row.cells[0].text) belongs_to_schema = False matched_label = None for spec_label in spec["labels"]: spec_norm = normalize_text(spec_label).upper() row_norm = row_label.upper() if spec_norm == row_norm or spec_norm in row_norm or row_norm in spec_norm: belongs_to_schema = True matched_label = spec_label break if not belongs_to_schema: continue for ci, cell in enumerate(row.cells): red_txt = "".join(run.text for p in cell.paragraphs for run in p.runs if is_red_font(run)).strip() if red_txt: if matched_label not in schema_data: schema_data[matched_label] = [] if red_txt not in schema_data[matched_label]: schema_data[matched_label].append(red_txt) if schema_data: result[schema_name] = schema_data return result def extract_table_data(tbl, schema_name, spec): """Extract red text data from table based on schema – per-row repeats for specific tables.""" # ─────────────────────────────────────────────────────────────────────────── # OPERATOR DECLARATION (row1 headers: Print Name | Position Title) # ─────────────────────────────────────────────────────────────────────────── if schema_name == "Operator Declaration": print(f" 🧾 EXTRACTION FIX: Processing Operator Declaration table") labels = spec["labels"] # ["Print Name", "Position Title"] canonical_labels = {canonicalize_label(lbl): lbl for lbl in labels} collected = {lbl: [] for lbl in labels} if len(tbl.rows) < 2: print(f" ❌ Operator Declaration table has less than 2 rows") return {} # map header cells β†’ labels (row1 orientation) header_row = tbl.rows[0] column_mapping = {} print(f" πŸ“‹ Mapping {len(header_row.cells)} header cells to labels") for col_idx, cell in enumerate(header_row.cells): raw_h = normalize_text(cell.text) header_text = normalize_header_label(raw_h) if not header_text: continue print(f" Column {col_idx}: '{raw_h}'") # alias/canonical first canon = canonicalize_label(header_text) if canon in canonical_labels: best_label = canonical_labels[canon] print(f" βœ… Mapped to: '{best_label}' (alias)") column_mapping[col_idx] = best_label continue # else bag-of-words similarity best_label, best_score = None, 0.0 for canon_lab, original_lab in canonical_labels.items(): s = bag_similarity(header_text, canon_lab) if s > best_score: best_score, best_label = s, original_lab if best_label and best_score >= 0.40: print(f" βœ… Mapped to: '{best_label}' (score: {best_score:.2f})") column_mapping[col_idx] = best_label else: print(f" ⚠️ No mapping found for '{raw_h}'") print(f" πŸ“Š Total column mappings: {len(column_mapping)}") # collect red text from the (usually single) data row for row_idx in range(1, len(tbl.rows)): row = tbl.rows[row_idx] print(f" πŸ“Œ Processing data row {row_idx}") for col_idx, cell in enumerate(row.cells): if col_idx not in column_mapping: continue label = column_mapping[col_idx] reds = [run.text for p in cell.paragraphs for run in p.runs if is_red_font(run) and run.text] if not reds: continue reds = coalesce_numeric_runs(reds) red_txt = normalize_text(" ".join(reds)) if not red_txt: continue print(f" πŸ”΄ Found red text in '{label}': '{red_txt}'") collected[label].append(red_txt) result = {k: v for k, v in collected.items() if v} print(f" βœ… Operator Declaration extracted: {len(result)} columns with data") return result # ─────────────────────────────────────────────────────────────────────────── # A) Vehicle Registration tables (per-row accumulation; NO dedupe) # ─────────────────────────────────────────────────────────────────────────── if "Vehicle Registration" in schema_name: print(f" πŸš— EXTRACTION FIX: Processing Vehicle Registration table") labels = spec["labels"] canonical_labels = {canonicalize_label(lbl): lbl for lbl in labels} collected = {lbl: [] for lbl in labels} # ← keep every row value unmapped_bucket = {} if len(tbl.rows) < 2: print(f" ❌ Vehicle table has less than 2 rows") return {} header_row = tbl.rows[0] column_mapping = {} print(f" πŸ“‹ Mapping {len(header_row.cells)} header cells to labels") for col_idx, cell in enumerate(header_row.cells): raw_h = normalize_text(cell.text) header_text = normalize_header_label(raw_h) if not header_text: continue print(f" Column {col_idx}: '{raw_h}'") # Try alias/canonical first canon = canonicalize_label(header_text) if canon in canonical_labels: best_label = canonical_labels[canon] print(f" βœ… Mapped to: '{best_label}' (alias)") column_mapping[col_idx] = best_label continue # Else bag-of-words similarity best_label, best_score = None, 0.0 for canon_lab, original_lab in canonical_labels.items(): s = bag_similarity(header_text, canon_lab) if s > best_score: best_score, best_label = s, original_lab if best_label and best_score >= 0.40: print(f" βœ… Mapped to: '{best_label}' (score: {best_score:.2f})") column_mapping[col_idx] = best_label else: print(f" ⚠️ No mapping found for '{raw_h}'") unmapped_bucket[raw_h] = [] print(f" πŸ“Š Total column mappings: {len(column_mapping)}") header_texts = [normalize_text(hc.text) for hc in header_row.cells] for row_idx in range(1, len(tbl.rows)): row = tbl.rows[row_idx] print(f" πŸ“Œ Processing data row {row_idx}") for col_idx, cell in enumerate(row.cells): reds = [run.text for p in cell.paragraphs for run in p.runs if is_red_font(run) and run.text] if not reds: continue reds = coalesce_numeric_runs(reds) red_txt = normalize_text(" ".join(reds)) if not red_txt: continue if col_idx in column_mapping: label = column_mapping[col_idx] print(f" πŸ”΄ Found red text in '{label}': '{red_txt}'") collected[label].append(red_txt) # ← append every occurrence else: header_name = header_texts[col_idx] if col_idx < len(header_texts) else f"(unmapped col {col_idx})" unmapped_bucket.setdefault(header_name, []).append(red_txt) result = {k: v for k, v in collected.items() if v} if unmapped_bucket: result.update({f"UNMAPPED::{k}": v for k, v in unmapped_bucket.items() if v}) print(f" βœ… Vehicle Registration extracted: {len(result)} columns with data") return result # ─────────────────────────────────────────────────────────────────────────── # B) Driver / Scheduler Records Examined (per-row accumulation; NO dedupe) # ─────────────────────────────────────────────────────────────────────────── if "Driver / Scheduler" in schema_name: print(f" πŸ‘€ EXTRACTION FIX: Processing Driver / Scheduler table") labels = spec["labels"] canonical_labels = {canonicalize_label(lbl): lbl for lbl in labels} collected = {lbl: [] for lbl in labels} # ← keep every row value unmapped_bucket = {} if len(tbl.rows) < 2: print(f" ❌ Driver/Scheduler table has less than 2 rows") return {} header_row = tbl.rows[0] column_mapping = {} print(f" πŸ“‹ Mapping {len(header_row.cells)} header cells to labels") for col_idx, cell in enumerate(header_row.cells): raw_h = normalize_text(cell.text) header_text = normalize_header_label(raw_h) if not header_text: continue print(f" Column {col_idx}: '{raw_h}'") # Try alias/canonical first (rarely used here, but safe) canon = canonicalize_label(header_text) if canon in canonical_labels: best_label = canonical_labels[canon] print(f" βœ… Mapped to: '{best_label}' (alias)") column_mapping[col_idx] = best_label continue # Else bag-of-words similarity (good for long headings) best_label, best_score = None, 0.0 for canon_lab, original_lab in canonical_labels.items(): s = bag_similarity(header_text, canon_lab) if s > best_score: best_score, best_label = s, original_lab if best_label and best_score >= 0.40: print(f" βœ… Mapped to: '{best_label}' (score: {best_score:.2f})") column_mapping[col_idx] = best_label else: print(f" ⚠️ No mapping found for '{raw_h}'") unmapped_bucket[raw_h] = [] print(f" πŸ“Š Total column mappings: {len(column_mapping)}") header_texts = [normalize_text(hc.text) for hc in header_row.cells] for row_idx in range(1, len(tbl.rows)): row = tbl.rows[row_idx] print(f" πŸ“Œ Processing data row {row_idx}") for col_idx, cell in enumerate(row.cells): reds = [run.text for p in cell.paragraphs for run in p.runs if is_red_font(run) and run.text] if not reds: continue reds = coalesce_numeric_runs(reds) red_txt = normalize_text(" ".join(reds)) if not red_txt: continue if col_idx in column_mapping: label = column_mapping[col_idx] print(f" πŸ”΄ Found red text in '{label}': '{red_txt}'") collected[label].append(red_txt) # ← append every occurrence else: header_name = header_texts[col_idx] if col_idx < len(header_texts) else f"(unmapped col {col_idx})" unmapped_bucket.setdefault(header_name, []).append(red_txt) result = {k: v for k, v in collected.items() if v} if unmapped_bucket: result.update({f"UNMAPPED::{k}": v for k, v in unmapped_bucket.items() if v}) print(f" βœ… Driver / Scheduler extracted: {len(result)} columns with data") return result # ─────────────────────────────────────────────────────────────────────────── # C) Generic tables (unchanged: WITH dedupe) # ─────────────────────────────────────────────────────────────────────────── labels = spec["labels"] + [schema_name] collected = {lbl: [] for lbl in labels} seen = {lbl: set() for lbl in labels} by_col = (spec.get("orientation") == "row1") start_row = 1 if by_col else 0 rows = tbl.rows[start_row:] for ri, row in enumerate(rows): for ci, cell in enumerate(row.cells): reds = [run.text for p in cell.paragraphs for run in p.runs if is_red_font(run) and run.text] if not reds: continue reds = coalesce_numeric_runs(reds) red_txt = normalize_text(" ".join(reds)) if not red_txt: continue if by_col: if ci < len(spec["labels"]): lbl = spec["labels"][ci] else: lbl = schema_name else: raw_label = normalize_text(row.cells[0].text) lbl = None for spec_label in spec["labels"]: if normalize_text(spec_label).upper() == raw_label.upper(): lbl = spec_label break if not lbl: a_raw = normalize_header_label(raw_label).upper() for spec_label in spec["labels"]: a_spec = normalize_header_label(spec_label).upper() if a_spec in a_raw or a_raw in a_spec: lbl = spec_label break if not lbl: lbl = schema_name if red_txt not in seen[lbl]: seen[lbl].add(red_txt) collected[lbl].append(red_txt) return {k: v for k, v in collected.items() if v} def extract_red_text(input_doc): # input_doc: docx.Document object or file path if isinstance(input_doc, str): doc = Document(input_doc) else: doc = input_doc out = {} table_count = 0 for tbl in doc.tables: table_count += 1 multi_schemas = check_multi_schema_table(tbl) if multi_schemas: multi_data = extract_multi_schema_table(tbl, multi_schemas) for schema_name, schema_data in multi_data.items(): if schema_data: if schema_name in out: for k, v in schema_data.items(): if k in out[schema_name]: out[schema_name][k].extend(v) else: out[schema_name][k] = v else: out[schema_name] = schema_data continue schema = match_table_schema(tbl) if not schema: continue spec = TABLE_SCHEMAS[schema] data = extract_table_data(tbl, schema, spec) if data: if schema in out: for k, v in data.items(): if k in out[schema]: out[schema][k].extend(v) else: out[schema][k] = v else: out[schema] = data # paragraphs (FIX: do not return early; build full 'paras' then attach) paras = {} for idx, para in enumerate(doc.paragraphs): red_txt = "".join(r.text for r in para.runs if is_red_font(r)).strip() if not red_txt: continue context = None for j in range(idx-1, -1, -1): txt = normalize_text(doc.paragraphs[j].text) if txt: all_patterns = HEADING_PATTERNS["main"] + HEADING_PATTERNS["sub"] if any(re.search(p, txt, re.IGNORECASE) for p in all_patterns): context = txt break if not context and re.fullmatch(PARAGRAPH_PATTERNS["date_line"], red_txt): context = "Date" if not context: context = "(para)" paras.setdefault(context, []).append(red_txt) if paras: out["paragraphs"] = paras # Fallback: ensure we capture the last-page Operator Declaration by headers if "Operator Declaration" not in out: op_dec = extract_operator_declaration_by_headers_from_end(doc) if op_dec: out["Operator Declaration"] = op_dec return out def extract_red_text_filelike(input_file, output_file): """ Accepts: input_file: file-like object (BytesIO/File) or path output_file: file-like object (opened for writing text) or path """ if hasattr(input_file, "seek"): input_file.seek(0) doc = Document(input_file) result = extract_red_text(doc) if hasattr(output_file, "write"): json.dump(result, output_file, indent=2, ensure_ascii=False) output_file.flush() else: with open(output_file, "w", encoding="utf-8") as f: json.dump(result, f, indent=2, ensure_ascii=False) return result if __name__ == "__main__": # Support both script and app/file-like usage if len(sys.argv) == 3: input_docx = sys.argv[1] output_json = sys.argv[2] doc = Document(input_docx) word_data = extract_red_text(doc) with open(output_json, 'w', encoding='utf-8') as f: json.dump(word_data, f, indent=2, ensure_ascii=False) print(json.dumps(word_data, indent=2, ensure_ascii=False)) else: print("To use as a module: extract_red_text_filelike(input_file, output_file)")