import re import fitz import os import json import pandas as pd from .file_utils import convert_pdf_to_word, delete_temp_folder, extract_tables_from_docx from collections import Counter from openpyxl import load_workbook from difflib import SequenceMatcher def is_similar_header(h1, h2, threshold=0.8): if len(h1) != len(h2): return False ratio = sum(SequenceMatcher(None, a, b).ratio() for a, b in zip(h1, h2)) / len(h1) return ratio > threshold def is_empty_data(rows): return all(all(cell.strip() == "" for cell in row) for row in rows) def parse_table_data(raw_tables): parsed_tables = [] last_table = None pending_header_rows = [] pending_bold_maps = [] for table_dict in raw_tables: table = table_dict["table_data"] bold_map = table_dict["bold_map"] if not table or not bold_map or len(table) != len(bold_map): continue if pending_header_rows: table = pending_header_rows + table bold_map = pending_bold_maps + bold_map pending_header_rows = [] pending_bold_maps = [] title = Counter(table[0]).most_common(1)[0][0] if table[0] else "UNKNOWN" bold_row_indices = [i for i, row in enumerate(bold_map) if any(row)] if not bold_row_indices: if title == "NO": headers = table[0] data_rows = table[1:] bold_indices = list(range(len(headers))) else: if last_table: for row in table: row_dict = { last_table["headers"][i]: row[i] if i < len(row) else "not item" for i in range(len(last_table["headers"])) } not_item_count = sum(1 for v in row_dict.values() if v.strip() == "not item") if not_item_count <= 6: last_table["rows"].append(row_dict) continue else: if len(bold_row_indices) >= 3: header_row_index = bold_row_indices[2] elif len(bold_row_indices) == 2: header_row_index = bold_row_indices[1] else: header_row_index = bold_row_indices[0] header_row = table[header_row_index] bold_row = bold_map[header_row_index] headers = [ cell.strip() if isinstance(cell, str) else f"COL_{i}" for i, (cell, is_bold) in enumerate(zip(header_row, bold_row)) if is_bold ] bold_indices = [i for i, is_bold in enumerate(bold_row) if is_bold] data_rows = table[header_row_index + 1:] if is_empty_data(data_rows): if last_table and is_similar_header(headers, last_table["headers"]): continue else: continue rows = [] for row in data_rows: row_dict = {} for i, header_index in enumerate(bold_indices): header = headers[i] if i < len(headers) else f"COL_{header_index}" value = row[header_index] if header_index < len(row) else "" row_dict[header] = value rows.append(row_dict) parsed = { "title": title, "headers": headers, "rows": rows } parsed_tables.append(parsed) last_table = parsed return parsed_tables def clean_checkbox_newlines(text): pattern = r"([☑☐])\s*\n" cleaned_text = re.sub(pattern, r"\1 ", text) return cleaned_text def parse_promotion_pdf(pdf_path): doc = fitz.open(pdf_path) text = "" for page in doc: text += page.get_text() text= clean_checkbox_newlines(text) pathname = os.path.splitext(os.path.basename(pdf_path))[0] docx_path = pathname + ".docx" with open(pdf_path, 'rb') as f: convert_pdf_to_word(f, os.path.join('/tmp', docx_path)) tables = extract_tables_from_docx(os.path.join('/tmp', docx_path)) tables_result = parse_table_data(tables) del tables_result[0] result = { "header": {}, "products": [], "outlets": [], "mechanisms": [], "budget": {}, } header_patterns = { "file_number": r"NOMOR:\s*(.+)", "product_category": r"PRODUCT CATEGORY\s*:\s*(.+)", "brand": r"BRAND\s*:\s*(.+)", "channel": r"CHANNEL\s*:\s*(.+)", "region" : r"REGION\s*:\s*(.+)", "sub_region": r"SUB REGION\s*:\s*(.+)", "distributor": r"DISTRIBUTOR\s*:\s*(.+)", "promo_type" : r"PROMO TYPE\s*:\s*(.+)", "sub_promo_type" : r"SUB PROMO TYPE\s*:\s*(.+)", "period": r"PERIODE CP:\s*(\d{2}/\d{2}/\d{4})\s*-\s*(\d{2}/\d{2}/\d{4})", "ref_doc" : r"REF DOC\s*:\s*(.+)", "ref_cp_no" : r"REF CP NO\s*:\s*(.+)", "cost_category": r"COST CATEGORY\s*((?:[☑☐][^\n]*\n)+)(?=(?:TIPE CP|$))", "tipe_cp": r"TIPE CP\s*((?:[☑☐][^\n]*\n)+)(?=(?:TIPE CLAIM|$))", "tipe_claim": r"TIPE CLAIM\s*((?:[☑☐][^\n]*\n)+)(?=(?:CLAIM BASED|$))", "claim_based": r"CLAIM BASED\s*((?:[☑☐][^\n]*\n)+)(?=$)" } # result["text_table"] = tables_result for field, pattern in header_patterns.items(): match = re.search(pattern, text) if match: if field == "period": result["header"]["validfrom"] = match.group(1).replace("/", "") result["header"]["validto"] = match.group(2).replace("/", "") elif field in ["cost_category", "tipe_cp", "tipe_claim", "claim_based"]: section_text = match.group(1) text = text+section_text options = {} for opt_match in re.finditer(r"([☑☐])\s*([^\n☑☐]+)", section_text): is_checked = opt_match.group(1) == '☑' option_name = opt_match.group(2).strip() if option_name: options[option_name] = is_checked result["header"][field] = options else: result["header"][field] = match.group(1).strip() product_table_start = next((item["rows"] for item in tables_result if item["title"] == "DISCOUNT PROMOTION"), []) strata_table_start = next((item["rows"] for item in tables_result if item["title"] == "STRATA DISCOUNT TABLE"), []) if product_table_start and strata_table_start: product_lookup = {item["UOM"]: item for item in product_table_start} for feature in strata_table_start: uom = feature['UOM'] product_data = product_lookup.get(uom) if product_data: product = { "sku": feature['SKU'], "uom": uom, "price_list": product_data.get('PRICE LIST SATP'), "discount_percent": feature.get('DISC %'), "rbp_store": product_data.get('RBP STORE'), "share_dist": product_data.get('SHARE DIST %'), "rbp_net": feature.get('RBP NET INC PPN') } result["products"].append(product) result["outlets"] = next((item["rows"] for item in tables_result if item["title"] == "NO"), []) mechanism_match = re.search(r"MECHANISM:\s*(.+?)(?=(✔|$))", text, re.DOTALL) if mechanism_match: mechanisms = [m.strip() for m in mechanism_match.group(1).split("\n") if m.strip()] mechanisms_clean = [re.sub(r'\'\d+\.\s*', '', m) for m in mechanisms] result["mechanisms"] = mechanisms_clean budget_match = re.search(r"TOTAL EST BUDGET PROMO\s*\|\s*([\d.,]+)", text) if budget_match: budget = float(budget_match.group(1).replace(".", "").replace(",", ".")) result["budget"]["total"] = budget delete_temp_folder() return result def parse_promotion_excel(excel_path, filename): wb = load_workbook(excel_path) ws = wb.active start_row = None start_col = None for i, row in enumerate(ws.iter_rows(min_row=1, max_row=20), start=1): # Cek 20 baris pertama for j, cell in enumerate(row, start=1): if cell.value not in [None, ''] and isinstance(cell.value, str): if start_row is None or i < start_row: start_row = i if start_col is None or j < start_col: start_col = j if start_row is not None: break if start_row != 1 or start_col != 1: new_ws = wb.create_sheet(title="Normalized") for i, row in enumerate(ws.iter_rows(min_row=start_row, values_only=True), start=1): for j, val in enumerate(row[start_col - 1:], start=1): new_ws.cell(row=i, column=j, value=val) wb.remove(ws) ws = new_ws wb.save(excel_path) df = pd.read_excel(excel_path, engine='openpyxl', header=0) df.dropna(axis=1, how='all', inplace=True) df.dropna(axis=0, how='all', inplace=True) df.columns = [str(col) if not str(col).startswith('Unnamed') else f'Col_{i}' for i, col in enumerate(df.columns)] df = df.where(pd.notnull(df), None) data = df.to_dict(orient="records") # Buat folder temp jika belum ada os.makedirs('/tmp', exist_ok=True) # Tambah .json jika belum ada if not filename.lower().endswith('.json'): filename += '.json' # Cegah overwrite file filepath = os.path.join('/tmp', filename) base_name, ext = os.path.splitext(filename) copy_num = 1 while os.path.exists(filepath): filepath = os.path.join('/tmp', f"{base_name} ({copy_num}){ext}") copy_num += 1 with open(filepath, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) delete_temp_folder() return data def convert_to_target_json(parsed_data): """Convert parsed data to match the target JSON structure""" target_json = { "m_discountschema_id": 0, "ad_org_id": 0, "c_doctype_id": 1000134, "name": f"{parsed_data['header'].get('brand', '')} PST DEAL KHUSUS", "description": f"{parsed_data['header'].get('brand', '')} PST DEAL KHUSUS", "discounttype": "B", "vendor_id": 1000078, "requirementtype": "MS", "flatdiscounttype": "P", "cumulativelevel": "L", "validfrom": parsed_data['header'].get('validfrom', ''), "validto": parsed_data['header'].get('validto', ''), "selectiontype": "ISC", "budgettype": "NB", "organizationaleffectiveness": "ISO", "qtyallocated": 0, "issotrx": "Y", "ispickup": "N", "fl_isallowmultiplediscount": "N", "isincludingsubordinate": "N", "isbirthdaydiscount": "N", "isactive": "Y", "list_org": [{ "m_discountschema_id": 0, "uns_discount_org_id": 0, "ad_org_id": 0, "ad_orgtrx_id": 1000006, "isactive": "Y" }], "list_customer": [], "list_break": [] } for i, outlet in enumerate(parsed_data['outlets'], start=1): target_json["list_customer"].append({ "m_discountschema_id": 0, "uns_discount_customer_id": 0, "m_discountschemabreak_id": 0, "ad_org_id": 0, "c_bpartner_id": 1000000 + i }) for product in parsed_data['products']: target_json["list_break"].append({ "m_discountschema_id": 0, "m_discountschemabreak_id": 0, "ad_org_id": 0, "seqno": 10, "targetbreak": "EP", "discounttype": "PVD", "breaktype": "M", "calculationtype": "Q", "name": f"{parsed_data['header'].get('promo_number', '')} {product['sku']}", "requirementtype": "MS", "productselection": "IOP", "c_uom_id": 1000020, "m_product_id": 1002979, "budgettype": "GB", "budgetcalculation": "QTY", "qtyallocated": 1000, "breakvalue": 0, "breakdiscount": 0, "isincludingsubordinate": "N", "isshareddiscount": "N", "isactive": "Y", "list_line": [{ "m_discountschemabreak_id": 0, "uns_dsbreakline_id": 0, "name": f"{parsed_data['header'].get('promo_number', '')} {product['sku']}", "breakvalue": 300, "breakvalueto": 1000, "qtyallocated": 1000, "breakdiscount": product['discount_percent'], "isactive": "Y" }] }) return target_json