import re
import fitz  # PyMuPDF
import os
import json
import pandas as pd
from .file_utils import convert_pdf_to_word, delete_temp_folder, extract_tables_from_docx
from collections import Counter
from openpyxl import load_workbook
from difflib import SequenceMatcher

def is_similar_header(h1, h2, threshold=0.8):
    """Return True if two header rows are pairwise similar above the threshold."""
    if len(h1) != len(h2):
        return False
    ratio = sum(SequenceMatcher(None, a, b).ratio() for a, b in zip(h1, h2)) / len(h1)
    return ratio > threshold
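
# Illustrative check (hypothetical headers): a trailing space barely lowers the
# per-cell ratio, so the pair still clears the default 0.8 threshold.
#   is_similar_header(["SKU", "UOM"], ["SKU ", "UOM"])  # -> True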

def is_empty_data(rows):
    """Return True when every cell in every row is blank."""
    return all(all(cell.strip() == "" for cell in row) for row in rows)

def parse_table_data(raw_tables):
    """Turn raw docx tables (cell grid plus per-cell bold map) into titled row dicts."""
    parsed_tables = []
    last_table = None
    pending_header_rows = []
    pending_bold_maps = []
    for table_dict in raw_tables:
        table = table_dict["table_data"]
        bold_map = table_dict["bold_map"]
        if not table or not bold_map or len(table) != len(bold_map):
            continue
        # Prepend header rows carried over from a preceding header-only table.
        if pending_header_rows:
            table = pending_header_rows + table
            bold_map = pending_bold_maps + bold_map
            pending_header_rows = []
            pending_bold_maps = []
        title = Counter(table[0]).most_common(1)[0][0] if table[0] else "UNKNOWN"
        bold_row_indices = [i for i, row in enumerate(bold_map) if any(row)]
        if not bold_row_indices:
            if title == "NO":
                headers = table[0]
                data_rows = table[1:]
                bold_indices = list(range(len(headers)))
            else:
                # No bold header row: treat the rows as a continuation of the
                # previous table, padding short rows with the "not item" marker.
                if last_table:
                    for row in table:
                        row_dict = {
                            last_table["headers"][i]: row[i] if i < len(row) else "not item"
                            for i in range(len(last_table["headers"]))
                        }
                        not_item_count = sum(1 for v in row_dict.values() if v.strip() == "not item")
                        if not_item_count <= 6:
                            last_table["rows"].append(row_dict)
                continue
        else:
            # Use at most the third bold row as the header row.
            if len(bold_row_indices) >= 3:
                header_row_index = bold_row_indices[2]
            elif len(bold_row_indices) == 2:
                header_row_index = bold_row_indices[1]
            else:
                header_row_index = bold_row_indices[0]
            header_row = table[header_row_index]
            bold_row = bold_map[header_row_index]
            headers = [
                cell.strip() if isinstance(cell, str) else f"COL_{i}"
                for i, (cell, is_bold) in enumerate(zip(header_row, bold_row)) if is_bold
            ]
            bold_indices = [i for i, is_bold in enumerate(bold_row) if is_bold]
            data_rows = table[header_row_index + 1:]
        if is_empty_data(data_rows):
            if last_table and is_similar_header(headers, last_table["headers"]):
                # Repeated header of the previous table; nothing to add.
                continue
            # Assumed intent: a header-only table carries its rows over so they
            # prepend the next table via the pending buffers above.
            pending_header_rows = table
            pending_bold_maps = bold_map
            continue
        rows = []
        for row in data_rows:
            row_dict = {}
            for i, header_index in enumerate(bold_indices):
                header = headers[i] if i < len(headers) else f"COL_{header_index}"
                value = row[header_index] if header_index < len(row) else ""
                row_dict[header] = value
            rows.append(row_dict)
        parsed = {
            "title": title,
            "headers": headers,
            "rows": rows
        }
        parsed_tables.append(parsed)
        last_table = parsed
    return parsed_tables
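
# Shape sketch for parse_table_data (illustrative values, not from a real document):
#   raw = [{
#       "table_data": [["STRATA DISCOUNT TABLE", "STRATA DISCOUNT TABLE"],
#                      ["SKU", "UOM"],
#                      ["A-001", "CTN"]],
#       "bold_map":   [[False, False], [True, True], [False, False]],
#   }]
#   parse_table_data(raw)
#   # -> [{"title": "STRATA DISCOUNT TABLE", "headers": ["SKU", "UOM"],
#   #      "rows": [{"SKU": "A-001", "UOM": "CTN"}]}]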

def clean_checkbox_newlines(text):
    """Join each checkbox glyph with the label that follows it on the next line."""
    pattern = r"([☑☐])\s*\n"
    return re.sub(pattern, r"\1 ", text)
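
# Illustrative: clean_checkbox_newlines("☑\nTRADE PROMO") -> "☑ TRADE PROMO",
# keeping each checkbox and its label on one line for the header regexes below.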

def parse_promotion_pdf(pdf_path):
    """Parse a promotion CP PDF into header, product, outlet, mechanism and budget data."""
    doc = fitz.open(pdf_path)
    text = ""
    for page in doc:
        text += page.get_text()
    doc.close()
    text = clean_checkbox_newlines(text)
    pathname = os.path.splitext(os.path.basename(pdf_path))[0]
    docx_path = pathname + ".docx"
    with open(pdf_path, 'rb') as f:
        convert_pdf_to_word(f, os.path.join('/tmp', docx_path))
    tables = extract_tables_from_docx(os.path.join('/tmp', docx_path))
    tables_result = parse_table_data(tables)
    if tables_result:
        # Drop the first table (assumed to be the document banner, not promo data).
        del tables_result[0]
    result = {
        "header": {},
        "products": [],
        "outlets": [],
        "mechanisms": [],
        "budget": {},
    }
    header_patterns = {
        "file_number": r"NOMOR:\s*(.+)",
        "product_category": r"PRODUCT CATEGORY\s*:\s*(.+)",
        "brand": r"BRAND\s*:\s*(.+)",
        "channel": r"CHANNEL\s*:\s*(.+)",
        "region": r"REGION\s*:\s*(.+)",
        "sub_region": r"SUB REGION\s*:\s*(.+)",
        "distributor": r"DISTRIBUTOR\s*:\s*(.+)",
        "promo_type": r"PROMO TYPE\s*:\s*(.+)",
        "sub_promo_type": r"SUB PROMO TYPE\s*:\s*(.+)",
        "period": r"PERIODE CP:\s*(\d{2}/\d{2}/\d{4})\s*-\s*(\d{2}/\d{2}/\d{4})",
        "ref_doc": r"REF DOC\s*:\s*(.+)",
        "ref_cp_no": r"REF CP NO\s*:\s*(.+)",
        "cost_category": r"COST CATEGORY\s*((?:[☑☐][^\n]*\n)+)(?=(?:TIPE CP|$))",
        "tipe_cp": r"TIPE CP\s*((?:[☑☐][^\n]*\n)+)(?=(?:TIPE CLAIM|$))",
        "tipe_claim": r"TIPE CLAIM\s*((?:[☑☐][^\n]*\n)+)(?=(?:CLAIM BASED|$))",
        "claim_based": r"CLAIM BASED\s*((?:[☑☐][^\n]*\n)+)(?=$)"
    }
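    # Illustrative capture for a checkbox field (hypothetical document text):
    #   "COST CATEGORY ☑ TRADE PROMO\n☐ CONSUMER PROMO\nTIPE CP ..."
    # gives group(1) == "☑ TRADE PROMO\n☐ CONSUMER PROMO\n", which the loop
    # below parses into {"TRADE PROMO": True, "CONSUMER PROMO": False}.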
    # result["text_table"] = tables_result
    for field, pattern in header_patterns.items():
        match = re.search(pattern, text)
        if match:
            if field == "period":
                # Dates become DDMMYYYY strings with the slashes stripped.
                result["header"]["validfrom"] = match.group(1).replace("/", "")
                result["header"]["validto"] = match.group(2).replace("/", "")
            elif field in ["cost_category", "tipe_cp", "tipe_claim", "claim_based"]:
                section_text = match.group(1)
                text = text + section_text
                options = {}
                for opt_match in re.finditer(r"([☑☐])\s*([^\n☑☐]+)", section_text):
                    is_checked = opt_match.group(1) == '☑'
                    option_name = opt_match.group(2).strip()
                    if option_name:
                        options[option_name] = is_checked
                result["header"][field] = options
            else:
                result["header"][field] = match.group(1).strip()
    product_table_start = next((item["rows"] for item in tables_result if item["title"] == "DISCOUNT PROMOTION"), [])
    strata_table_start = next((item["rows"] for item in tables_result if item["title"] == "STRATA DISCOUNT TABLE"), [])
    if product_table_start and strata_table_start:
        # Join the two tables on their shared UOM column.
        product_lookup = {item["UOM"]: item for item in product_table_start}
        for feature in strata_table_start:
            uom = feature['UOM']
            product_data = product_lookup.get(uom)
            if product_data:
                product = {
                    "sku": feature['SKU'],
                    "uom": uom,
                    "price_list": product_data.get('PRICE LIST SATP'),
                    "discount_percent": feature.get('DISC %'),
                    "rbp_store": product_data.get('RBP STORE'),
                    "share_dist": product_data.get('SHARE DIST %'),
                    "rbp_net": feature.get('RBP NET INC PPN')
                }
                result["products"].append(product)
result["outlets"] = next((item["rows"] for item in tables_result if item["title"] == "NO"), [])
mechanism_match = re.search(r"MECHANISM:\s*(.+?)(?=(✔|$))", text, re.DOTALL)
if mechanism_match:
mechanisms = [m.strip() for m in mechanism_match.group(1).split("\n") if m.strip()]
mechanisms_clean = [re.sub(r'\'\d+\.\s*', '', m) for m in mechanisms]
result["mechanisms"] = mechanisms_clean
budget_match = re.search(r"TOTAL EST BUDGET PROMO\s*\|\s*([\d.,]+)", text)
if budget_match:
budget = float(budget_match.group(1).replace(".", "").replace(",", "."))
result["budget"]["total"] = budget
delete_temp_folder()
return result
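
# Usage sketch (hypothetical path): parse_promotion_pdf("/tmp/cp_001.pdf") returns
#   {"header": {...}, "products": [...], "outlets": [...],
#    "mechanisms": [...], "budget": {"total": 1234567.89}}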

def parse_promotion_excel(excel_path, filename):
    """Normalize a promotion Excel sheet and dump its rows to JSON in /tmp."""
    wb = load_workbook(excel_path)
    ws = wb.active
    start_row = None
    start_col = None
    # Scan the first 20 rows for the top-left non-empty text cell.
    for i, row in enumerate(ws.iter_rows(min_row=1, max_row=20), start=1):
        for j, cell in enumerate(row, start=1):
            if cell.value not in [None, ''] and isinstance(cell.value, str):
                if start_row is None or i < start_row:
                    start_row = i
                if start_col is None or j < start_col:
                    start_col = j
        if start_row is not None:
            break
    # Shift the data so it starts at cell A1 if it does not already.
    if start_row != 1 or start_col != 1:
        new_ws = wb.create_sheet(title="Normalized")
        for i, row in enumerate(ws.iter_rows(min_row=start_row, values_only=True), start=1):
            for j, val in enumerate(row[start_col - 1:], start=1):
                new_ws.cell(row=i, column=j, value=val)
        wb.remove(ws)
        ws = new_ws
        wb.save(excel_path)
    df = pd.read_excel(excel_path, engine='openpyxl', header=0)
    df.dropna(axis=1, how='all', inplace=True)
    df.dropna(axis=0, how='all', inplace=True)
    df.columns = [str(col) if not str(col).startswith('Unnamed') else f'Col_{i}' for i, col in enumerate(df.columns)]
    df = df.where(pd.notnull(df), None)
    data = df.to_dict(orient="records")
    # Create the temp folder if it does not already exist
    os.makedirs('/tmp', exist_ok=True)
    # Append .json if the filename lacks the extension
    if not filename.lower().endswith('.json'):
        filename += '.json'
    # Avoid overwriting an existing file by suffixing a copy number
    filepath = os.path.join('/tmp', filename)
    base_name, ext = os.path.splitext(filename)
    copy_num = 1
    while os.path.exists(filepath):
        filepath = os.path.join('/tmp', f"{base_name} ({copy_num}){ext}")
        copy_num += 1
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    delete_temp_folder()
    return data
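
# Usage sketch: parse_promotion_excel("/tmp/promo.xlsx", "promo") trims leading
# empty rows/columns, writes the rows to /tmp/promo.json, and returns them as a
# list of dicts.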

def convert_to_target_json(parsed_data):
    """Convert parsed data to match the target JSON structure"""
    target_json = {
        "m_discountschema_id": 0,
        "ad_org_id": 0,
        "c_doctype_id": 1000134,
        "name": f"{parsed_data['header'].get('brand', '')} PST DEAL KHUSUS",
        "description": f"{parsed_data['header'].get('brand', '')} PST DEAL KHUSUS",
        "discounttype": "B",
        "vendor_id": 1000078,
        "requirementtype": "MS",
        "flatdiscounttype": "P",
        "cumulativelevel": "L",
        "validfrom": parsed_data['header'].get('validfrom', ''),
        "validto": parsed_data['header'].get('validto', ''),
        "selectiontype": "ISC",
        "budgettype": "NB",
        "organizationaleffectiveness": "ISO",
        "qtyallocated": 0,
        "issotrx": "Y",
        "ispickup": "N",
        "fl_isallowmultiplediscount": "N",
        "isincludingsubordinate": "N",
        "isbirthdaydiscount": "N",
        "isactive": "Y",
        "list_org": [{
            "m_discountschema_id": 0,
            "uns_discount_org_id": 0,
            "ad_org_id": 0,
            "ad_orgtrx_id": 1000006,
            "isactive": "Y"
        }],
        "list_customer": [],
        "list_break": []
    }
    for i, outlet in enumerate(parsed_data['outlets'], start=1):
        target_json["list_customer"].append({
            "m_discountschema_id": 0,
            "uns_discount_customer_id": 0,
            "m_discountschemabreak_id": 0,
            "ad_org_id": 0,
            # Placeholder partner IDs derived from outlet order, not real lookups.
            "c_bpartner_id": 1000000 + i
        })
    for product in parsed_data['products']:
        target_json["list_break"].append({
            "m_discountschema_id": 0,
            "m_discountschemabreak_id": 0,
            "ad_org_id": 0,
            "seqno": 10,
            "targetbreak": "EP",
            "discounttype": "PVD",
            "breaktype": "M",
            "calculationtype": "Q",
            # NOTE: parse_promotion_pdf never sets 'promo_number' (its regexes
            # capture 'file_number'), so this prefix falls back to an empty string.
            "name": f"{parsed_data['header'].get('promo_number', '')} {product['sku']}",
            "requirementtype": "MS",
            "productselection": "IOP",
            "c_uom_id": 1000020,
            "m_product_id": 1002979,
            "budgettype": "GB",
            "budgetcalculation": "QTY",
            "qtyallocated": 1000,
            "breakvalue": 0,
            "breakdiscount": 0,
            "isincludingsubordinate": "N",
            "isshareddiscount": "N",
            "isactive": "Y",
            "list_line": [{
                "m_discountschemabreak_id": 0,
                "uns_dsbreakline_id": 0,
                "name": f"{parsed_data['header'].get('promo_number', '')} {product['sku']}",
                "breakvalue": 300,
                "breakvalueto": 1000,
                "qtyallocated": 1000,
                "breakdiscount": product['discount_percent'],
                "isactive": "Y"
            }]
        })
    return target_json
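

# Minimal sketch of a local smoke run (assumes a CP PDF path is passed as the
# first CLI argument; the real entry point lives in the calling service).
if __name__ == "__main__":
    import sys

    parsed = parse_promotion_pdf(sys.argv[1])
    print(json.dumps(convert_to_target_json(parsed), ensure_ascii=False, indent=2))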