# (paste artifact removed: "Spaces:" / "Sleeping" editor-status lines — not code)
import json
import os
import re
from collections import Counter
from difflib import SequenceMatcher

import fitz
import pandas as pd
from openpyxl import load_workbook

from .file_utils import convert_pdf_to_word, delete_temp_folder, extract_tables_from_docx
def is_similar_header(h1, h2, threshold=0.8):
    """Return True when two header rows are cell-wise similar.

    Args:
        h1, h2: sequences of header cell strings.
        threshold: minimum average ``SequenceMatcher`` ratio (exclusive).

    Returns:
        bool: False when lengths differ; otherwise True when the average
        pairwise similarity exceeds ``threshold``. Two empty headers are
        considered similar.
    """
    if len(h1) != len(h2):
        return False
    if not h1:
        # BUG FIX: two empty headers previously raised ZeroDivisionError.
        return True
    ratio = sum(SequenceMatcher(None, a, b).ratio() for a, b in zip(h1, h2)) / len(h1)
    return ratio > threshold
def is_empty_data(rows):
    """Return True when every cell in every row is blank (or there are no rows)."""
    return not any(cell.strip() for row in rows for cell in row)
def parse_table_data(raw_tables):
    """Parse raw docx table extracts into structured tables.

    Args:
        raw_tables: list of dicts with keys "table_data" (list of rows,
            each a list of cell strings) and "bold_map" (parallel list of
            boolean rows marking which cells are bold).

    Returns:
        list of dicts, each with "title", "headers" and "rows" (every row
        a header->value dict).
    """
    parsed_tables = []
    last_table = None
    # Rows carried over to prepend to the next table. Nothing in this
    # function populates them; kept for compatibility with callers/future
    # logic that may set them.
    pending_header_rows = []
    pending_bold_maps = []
    for table_dict in raw_tables:
        table = table_dict["table_data"]
        bold_map = table_dict["bold_map"]
        # Skip empty or malformed tables where the bold map does not line up.
        if not table or not bold_map or len(table) != len(bold_map):
            continue
        if pending_header_rows:
            table = pending_header_rows + table
            bold_map = pending_bold_maps + bold_map
            pending_header_rows = []
            pending_bold_maps = []
        # Title = most common cell value of the first row.
        title = Counter(table[0]).most_common(1)[0][0] if table[0] else "UNKNOWN"
        bold_row_indices = [i for i, row in enumerate(bold_map) if any(row)]
        if not bold_row_indices:
            if title == "NO":
                # Outlet-style table: first row is the header, nothing bold.
                headers = table[0]
                data_rows = table[1:]
                bold_indices = list(range(len(headers)))
            elif last_table:
                # Continuation of the previous table: map the cells onto the
                # previous headers and keep rows with enough real values.
                for row in table:
                    row_dict = {
                        last_table["headers"][i]: row[i] if i < len(row) else "not item"
                        for i in range(len(last_table["headers"]))
                    }
                    not_item_count = sum(1 for v in row_dict.values() if v.strip() == "not item")
                    if not_item_count <= 6:
                        last_table["rows"].append(row_dict)
                continue
            else:
                # BUG FIX: previously this case fell through with headers/
                # data_rows/bold_indices unbound and raised NameError.
                # No header info and no previous table to attach to: skip.
                continue
        else:
            # Prefer the third bold row as the header (tables often carry
            # bold title/subtitle rows first), falling back to what exists.
            if len(bold_row_indices) >= 3:
                header_row_index = bold_row_indices[2]
            elif len(bold_row_indices) == 2:
                header_row_index = bold_row_indices[1]
            else:
                header_row_index = bold_row_indices[0]
            header_row = table[header_row_index]
            bold_row = bold_map[header_row_index]
            headers = [
                cell.strip() if isinstance(cell, str) else f"COL_{i}"
                for i, (cell, is_bold) in enumerate(zip(header_row, bold_row)) if is_bold
            ]
            bold_indices = [i for i, is_bold in enumerate(bold_row) if is_bold]
            data_rows = table[header_row_index + 1:]
        # Header with no data contributes nothing. (The original compared
        # headers against the previous table here, but both branches of
        # that check skipped the table anyway.)
        if all(all(cell.strip() == "" for cell in row) for row in data_rows):
            continue
        rows = []
        for row in data_rows:
            row_dict = {}
            for i, header_index in enumerate(bold_indices):
                header = headers[i] if i < len(headers) else f"COL_{header_index}"
                value = row[header_index] if header_index < len(row) else ""
                row_dict[header] = value
            rows.append(row_dict)
        parsed = {
            "title": title,
            "headers": headers,
            "rows": rows
        }
        parsed_tables.append(parsed)
        last_table = parsed
    return parsed_tables
def clean_checkbox_newlines(text):
    """Join each checkbox glyph to the text on the following line.

    A checkbox (☑ or ☐) followed by optional whitespace and a newline is
    replaced by the glyph plus a single space, so every checkbox option
    ends up on one line.
    """
    return re.sub(r"([☑☐])\s*\n", r"\1 ", text)
def parse_promotion_pdf(pdf_path):
    """Parse a promotion PDF into a structured dict.

    The PDF text is scanned with regexes for header fields; the PDF is
    also converted to .docx so its tables (products, strata discounts,
    outlets) can be extracted via the docx table parser.

    Args:
        pdf_path: path to the promotion PDF file.

    Returns:
        dict with keys "header", "products", "outlets", "mechanisms" and
        "budget".
    """
    doc = fitz.open(pdf_path)
    text = ""
    for page in doc:
        text += page.get_text()
    # Keep each checkbox option on one line so the section regexes match.
    text= clean_checkbox_newlines(text)
    pathname = os.path.splitext(os.path.basename(pdf_path))[0]
    docx_path = pathname + ".docx"
    with open(pdf_path, 'rb') as f:
        convert_pdf_to_word(f, os.path.join('/tmp', docx_path))
    tables = extract_tables_from_docx(os.path.join('/tmp', docx_path))
    tables_result = parse_table_data(tables)
    # Drop the first parsed table (presumably a cover/metadata table).
    # NOTE(review): raises IndexError when no tables were parsed — confirm
    # every promotion PDF yields at least one table.
    del tables_result[0]
    result = {
        "header": {},
        "products": [],
        "outlets": [],
        "mechanisms": [],
        "budget": {},
    }
    # Single-value fields capture one group; "period" captures two dates;
    # the last four capture whole checkbox sections (runs of ☑/☐ lines).
    header_patterns = {
        "file_number": r"NOMOR:\s*(.+)",
        "product_category": r"PRODUCT CATEGORY\s*:\s*(.+)",
        "brand": r"BRAND\s*:\s*(.+)",
        "channel": r"CHANNEL\s*:\s*(.+)",
        "region" : r"REGION\s*:\s*(.+)",
        "sub_region": r"SUB REGION\s*:\s*(.+)",
        "distributor": r"DISTRIBUTOR\s*:\s*(.+)",
        "promo_type" : r"PROMO TYPE\s*:\s*(.+)",
        "sub_promo_type" : r"SUB PROMO TYPE\s*:\s*(.+)",
        "period": r"PERIODE CP:\s*(\d{2}/\d{2}/\d{4})\s*-\s*(\d{2}/\d{2}/\d{4})",
        "ref_doc" : r"REF DOC\s*:\s*(.+)",
        "ref_cp_no" : r"REF CP NO\s*:\s*(.+)",
        "cost_category": r"COST CATEGORY\s*((?:[☑☐][^\n]*\n)+)(?=(?:TIPE CP|$))",
        "tipe_cp": r"TIPE CP\s*((?:[☑☐][^\n]*\n)+)(?=(?:TIPE CLAIM|$))",
        "tipe_claim": r"TIPE CLAIM\s*((?:[☑☐][^\n]*\n)+)(?=(?:CLAIM BASED|$))",
        "claim_based": r"CLAIM BASED\s*((?:[☑☐][^\n]*\n)+)(?=$)"
    }
    # result["text_table"] = tables_result
    for field, pattern in header_patterns.items():
        match = re.search(pattern, text)
        if match:
            if field == "period":
                # Dates stored as DDMMYYYY (slashes stripped).
                result["header"]["validfrom"] = match.group(1).replace("/", "")
                result["header"]["validto"] = match.group(2).replace("/", "")
            elif field in ["cost_category", "tipe_cp", "tipe_claim", "claim_based"]:
                section_text = match.group(1)
                # NOTE(review): this appends the matched section back onto
                # `text`, so later searches scan duplicated content — confirm
                # this is intentional and not a leftover debug line.
                text = text+section_text
                options = {}
                # Each option: checkbox glyph + its label up to the next
                # newline or glyph.
                for opt_match in re.finditer(r"([☑☐])\s*([^\n☑☐]+)", section_text):
                    is_checked = opt_match.group(1) == '☑'
                    option_name = opt_match.group(2).strip()
                    if option_name:
                        options[option_name] = is_checked
                result["header"][field] = options
            else:
                result["header"][field] = match.group(1).strip()
    # Join the product table with the strata-discount table on the UOM cell.
    product_table_start = next((item["rows"] for item in tables_result if item["title"] == "DISCOUNT PROMOTION"), [])
    strata_table_start = next((item["rows"] for item in tables_result if item["title"] == "STRATA DISCOUNT TABLE"), [])
    if product_table_start and strata_table_start:
        product_lookup = {item["UOM"]: item for item in product_table_start}
        for feature in strata_table_start:
            uom = feature['UOM']
            product_data = product_lookup.get(uom)
            if product_data:
                product = {
                    "sku": feature['SKU'],
                    "uom": uom,
                    "price_list": product_data.get('PRICE LIST SATP'),
                    "discount_percent": feature.get('DISC %'),
                    "rbp_store": product_data.get('RBP STORE'),
                    "share_dist": product_data.get('SHARE DIST %'),
                    "rbp_net": feature.get('RBP NET INC PPN')
                }
                result["products"].append(product)
    # Outlet rows come from the table titled "NO".
    result["outlets"] = next((item["rows"] for item in tables_result if item["title"] == "NO"), [])
    # Mechanism block: everything after "MECHANISM:" up to a ✔ mark or EOF.
    mechanism_match = re.search(r"MECHANISM:\s*(.+?)(?=(✔|$))", text, re.DOTALL)
    if mechanism_match:
        mechanisms = [m.strip() for m in mechanism_match.group(1).split("\n") if m.strip()]
        # Strip leading "'<n>. " enumeration prefixes from each line.
        mechanisms_clean = [re.sub(r'\'\d+\.\s*', '', m) for m in mechanisms]
        result["mechanisms"] = mechanisms_clean
    # Budget uses Indonesian number format: "." thousands, "," decimals.
    budget_match = re.search(r"TOTAL EST BUDGET PROMO\s*\|\s*([\d.,]+)", text)
    if budget_match:
        budget = float(budget_match.group(1).replace(".", "").replace(",", "."))
        result["budget"]["total"] = budget
    delete_temp_folder()
    return result
def parse_promotion_excel(excel_path, filename):
    """Parse a promotion Excel workbook into a list of row dicts and dump
    them to /tmp/<filename>.json.

    The active sheet is "normalized" first: when the data does not start
    at cell A1, a new sheet is created with the data shifted to the
    top-left corner and the workbook is saved back to ``excel_path``.

    Args:
        excel_path: path to the .xlsx file (modified in place when
            normalization is needed).
        filename: base name for the JSON dump; ".json" is appended when
            missing and a " (n)" suffix avoids overwriting.

    Returns:
        list of dicts, one per data row (NaN replaced with None).
    """
    wb = load_workbook(excel_path)
    ws = wb.active
    # Locate the top-left corner of the data within the first 20 rows.
    start_row = None
    start_col = None
    for i, row in enumerate(ws.iter_rows(min_row=1, max_row=20), start=1):  # check the first 20 rows
        for j, cell in enumerate(row, start=1):
            if cell.value not in [None, ''] and isinstance(cell.value, str):
                if start_row is None or i < start_row:
                    start_row = i
                if start_col is None or j < start_col:
                    start_col = j
        if start_row is not None:
            break
    # BUG FIX: a sheet with no text cell in the first 20 rows previously
    # left start_row/start_col as None, then crashed on `start_col - 1`
    # below. Treat such a sheet as already normalized.
    if start_row is None or start_col is None:
        start_row, start_col = 1, 1
    if start_row != 1 or start_col != 1:
        # Copy the data block into a fresh sheet starting at A1.
        new_ws = wb.create_sheet(title="Normalized")
        for i, row in enumerate(ws.iter_rows(min_row=start_row, values_only=True), start=1):
            for j, val in enumerate(row[start_col - 1:], start=1):
                new_ws.cell(row=i, column=j, value=val)
        wb.remove(ws)
        ws = new_ws
        wb.save(excel_path)
    df = pd.read_excel(excel_path, engine='openpyxl', header=0)
    df.dropna(axis=1, how='all', inplace=True)
    df.dropna(axis=0, how='all', inplace=True)
    # Give unnamed columns stable names (Col_<index>).
    df.columns = [str(col) if not str(col).startswith('Unnamed') else f'Col_{i}' for i, col in enumerate(df.columns)]
    df = df.where(pd.notnull(df), None)
    data = df.to_dict(orient="records")
    # Create the temp folder if it does not exist yet.
    os.makedirs('/tmp', exist_ok=True)
    # Append .json when missing.
    if not filename.lower().endswith('.json'):
        filename += '.json'
    # Avoid overwriting an existing file by adding a " (n)" suffix.
    filepath = os.path.join('/tmp', filename)
    base_name, ext = os.path.splitext(filename)
    copy_num = 1
    while os.path.exists(filepath):
        filepath = os.path.join('/tmp', f"{base_name} ({copy_num}){ext}")
        copy_num += 1
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    # NOTE(review): this removes the temp folder right after writing the
    # JSON dump into it — confirm the dump is only a transient artifact.
    delete_temp_folder()
    return data
def convert_to_target_json(parsed_data):
    """Build the target discount-schema JSON payload from parsed data.

    Args:
        parsed_data: dict with "header", "outlets" and "products" entries
            as produced by the promotion parsers.

    Returns:
        dict shaped like the discount-schema API payload.
    """
    header = parsed_data['header']
    brand = header.get('brand', '')
    # NOTE(review): 'promo_number' is never populated by parse_promotion_pdf
    # (which stores 'file_number'), so this is usually "" — confirm key.
    promo_number = header.get('promo_number', '')
    schema_label = f"{brand} PST DEAL KHUSUS"

    target_json = {
        "m_discountschema_id": 0,
        "ad_org_id": 0,
        "c_doctype_id": 1000134,
        "name": schema_label,
        "description": schema_label,
        "discounttype": "B",
        "vendor_id": 1000078,
        "requirementtype": "MS",
        "flatdiscounttype": "P",
        "cumulativelevel": "L",
        "validfrom": header.get('validfrom', ''),
        "validto": header.get('validto', ''),
        "selectiontype": "ISC",
        "budgettype": "NB",
        "organizationaleffectiveness": "ISO",
        "qtyallocated": 0,
        "issotrx": "Y",
        "ispickup": "N",
        "fl_isallowmultiplediscount": "N",
        "isincludingsubordinate": "N",
        "isbirthdaydiscount": "N",
        "isactive": "Y",
        "list_org": [{
            "m_discountschema_id": 0,
            "uns_discount_org_id": 0,
            "ad_org_id": 0,
            "ad_orgtrx_id": 1000006,
            "isactive": "Y"
        }],
        # One customer entry per outlet, with sequential dummy partner ids
        # starting at 1000001.
        "list_customer": [
            {
                "m_discountschema_id": 0,
                "uns_discount_customer_id": 0,
                "m_discountschemabreak_id": 0,
                "ad_org_id": 0,
                "c_bpartner_id": 1000000 + seq
            }
            for seq, _outlet in enumerate(parsed_data['outlets'], start=1)
        ],
        # One break (with a single line) per parsed product.
        "list_break": [
            {
                "m_discountschema_id": 0,
                "m_discountschemabreak_id": 0,
                "ad_org_id": 0,
                "seqno": 10,
                "targetbreak": "EP",
                "discounttype": "PVD",
                "breaktype": "M",
                "calculationtype": "Q",
                "name": f"{promo_number} {product['sku']}",
                "requirementtype": "MS",
                "productselection": "IOP",
                "c_uom_id": 1000020,
                "m_product_id": 1002979,
                "budgettype": "GB",
                "budgetcalculation": "QTY",
                "qtyallocated": 1000,
                "breakvalue": 0,
                "breakdiscount": 0,
                "isincludingsubordinate": "N",
                "isshareddiscount": "N",
                "isactive": "Y",
                "list_line": [{
                    "m_discountschemabreak_id": 0,
                    "uns_dsbreakline_id": 0,
                    "name": f"{promo_number} {product['sku']}",
                    "breakvalue": 300,
                    "breakvalueto": 1000,
                    "qtyallocated": 1000,
                    "breakdiscount": product['discount_percent'],
                    "isactive": "Y"
                }]
            }
            for product in parsed_data['products']
        ]
    }
    return target_json