import os
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    Trainer,
    TrainingArguments,
    DataCollatorForTokenClassification,
)
import pandas as pd
from torch.utils.data import Dataset
import ast
import re

# Base pre-trained (cased) models
TOKENIZER_MODEL = "neuralmind/bert-base-portuguese-cased"
NER_MODEL = "neuralmind/bert-base-portuguese-cased"

# Name of the fine-tuned model hosted on Hugging Face
HF_MODEL = "italoxesteres/BERTimbau-Financial-Fine-tuned"

# Try to load the fine-tuned model directly from Hugging Face
try:
    tokenizer = AutoTokenizer.from_pretrained(HF_MODEL)
    model = AutoModelForTokenClassification.from_pretrained(HF_MODEL, ignore_mismatched_sizes=True)
    print("Loaded fine-tuned model from Hugging Face:", HF_MODEL)
except Exception as e:
    print("Failed to load fine-tuned model from Hugging Face:", e)
    print("Using base model from", NER_MODEL)
    tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_MODEL)
    model = AutoModelForTokenClassification.from_pretrained(NER_MODEL)

id2label = model.config.id2label


def extract_kpis(text):
    """
    Extracts entities from the text using the NER model.
    Filters out special tokens (such as [CLS] and [SEP]) and returns a
    dictionary with the list of entities.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        outputs = model(**inputs)
    logits = outputs.logits
    predictions = torch.argmax(logits, dim=2).squeeze().tolist()
    tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"].squeeze())

    # Filter out special tokens
    special_tokens = set(tokenizer.all_special_tokens)
    entities = []
    current_entity = ""
    current_label = None
    for token, label_id in zip(tokens, predictions):
        if token in special_tokens:
            continue
        label = id2label.get(label_id, "O")
        if label.startswith("B-"):
            if current_entity:
                entities.append((current_entity, current_label))
                current_entity = ""
            current_label = label[2:]
            current_entity = token.replace("##", "")
        elif label.startswith("I-") and current_label == label[2:]:
            if token.startswith("##"):
                # WordPiece continuation: glue it to the previous piece without a space
                current_entity += token[2:]
            else:
                current_entity += " " + token
        else:
            if current_entity:
                entities.append((current_entity, current_label))
                current_entity = ""
                current_label = None
    if current_entity:
        entities.append((current_entity, current_label))
    return {"entities": entities}
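# Illustrative use of extract_kpis (a sketch: the entity labels and groupings shown in the
# comment below are assumptions about what a typical fine-tuned checkpoint returns, not
# guaranteed output).
def example_extract_kpis():
    sentence = "O EBITDA ajustado aumentou 12,5% em relação ao ano anterior."
    result = extract_kpis(sentence)
    for entity_text, entity_label in result["entities"]:
        print(f"{entity_label}: {entity_text}")
    # Hypothetical output:
    #   KPI: EBITDA ajustado
    #   Relação: aumentou
    #   Valor: 12,5%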
""" table_kpis = [] for table in tables: if hasattr(table, "to_dict"): df_records = table.to_dict(orient="records") for row in df_records: row_str = " ".join(str(v).lower() for v in row.values() if v is not None) for kw in kpi_keywords: if kw.lower() in row_str: numeric_vals = find_all_numeric_in_row(row) for val in numeric_vals: table_kpis.append({ "kpi": kw, "relation": "", "value": val, "source": "table" }) else: print("Skipping non‑DataFrame table") return table_kpis def find_all_numeric_in_row(row): numeric_vals = [] for key, val in row.items(): if is_numeric(val): numeric_vals.append(str(val)) return numeric_vals def is_numeric(value): if value is None: return False val_str = str(value).replace(".", "").replace(",", ".").replace("%", "") try: float(val_str) return True except ValueError: return False def parse_float(value): if not isinstance(value, str): value = str(value) val_clean = value.replace(".", "").replace(",", ".").replace("%", "") try: return float(val_clean) except ValueError: return None def compare_kpis(text_kpis, table_kpis, tolerance): """ Compara os valores dos KPIs extraídos do texto com os das tabelas, considerando uma tolerância. Retorna uma lista de checagens de consistência, incluindo o campo 'relation' dos KPIs extraídos. """ checks = [] for txt_kpi in text_kpis: for tbl_kpi in table_kpis: if txt_kpi["kpi"].lower() == tbl_kpi["kpi"].lower(): tv = parse_float(txt_kpi.get("value")) kv = parse_float(tbl_kpi.get("value")) if tv is not None and kv is not None: consistent = abs(tv - kv) <= tolerance checks.append({ "kpi": txt_kpi["kpi"], "relation": txt_kpi.get("relation", ""), "text_value": txt_kpi.get("value"), "table_value": tbl_kpi.get("value"), "consistent": consistent }) return checks def extract_and_compare(entities, relations, tables, config, full_text=None): """ Fluxo completo: 1. A partir das entidades extraídas do texto, forma triplas (KPI, Relação, Valor). 2. Aplica fallback via regex no full_text para cada KPI esperado. 3. Extrai KPIs das tabelas. 4. Compara os KPIs extraídos do texto com os das tabelas. Retorna (all_kpis, consistency_checks). 
""" kpi_keywords = config.get("kpi_keywords", []) tolerance = config["consistency_check"]["tolerance"] model_kpi_triplets = [] # Procura por entidades com rótulo "KPI", "Relação" e "Valor" kpi_list = [text for text, lab in entities if lab.strip().lower() == "kpi" and any(kw in text.lower() for kw in kpi_keywords)] rel_list = [text for text, lab in entities if lab.strip().lower() == "relação"] valor_list = [text for text, lab in entities if lab.strip().lower() == "valor"] if kpi_list and rel_list and valor_list: n = min(len(kpi_list), len(rel_list), len(valor_list)) for i in range(n): model_kpi_triplets.append({ "kpi": kpi_list[i].strip(), "relation": rel_list[i].strip(), "value": valor_list[i].strip(), "source": "text" }) fallback_triplets = [] if full_text: for kw in kpi_keywords: pattern = re.compile( r"(" + re.escape(kw) + r")(?:\s+([a-zA-ZçÇáéíóúÁÉÍÓÚãõÃÕ]+\s?){0,3})\s+([\d.,]+%?)", re.IGNORECASE ) matches = pattern.findall(full_text) for m in matches: fallback_triplets.append({ "kpi": kw, "relation": m[1].strip(), "value": m[2].strip(), "source": "text" }) text_triplets = model_kpi_triplets if model_kpi_triplets else fallback_triplets table_kpis = extract_kpis_from_tables(tables, kpi_keywords) consistency_checks = compare_kpis(text_triplets, table_kpis, tolerance) all_kpis = text_triplets + table_kpis return all_kpis, consistency_checks # ------------------------------------------------------------------- # Treinamento usando anotações manuais (formato Doccano) # O CSV deve ter as colunas: sentence_id, sentence_text, annotations # Onde "annotations" é uma string representando uma lista de spans, ex: # [[2, 17, "Relação"], [21, 25, "Valor"], [26, 40, "KPI"]] # ------------------------------------------------------------------- def load_doccano_annotations(csv_file): df = pd.read_csv(csv_file) data = [] for _, row in df.iterrows(): text = row["sentence_text"].strip() try: annotations = ast.literal_eval(row["annotations"]) except Exception as e: print(f"Erro na conversão das anotações para a sentença {row['sentence_id']}: {e}") annotations = [] data.append({ "text": text, "annotations": annotations, "sentence_id": row["sentence_id"].strip() }) return data def convert_annotations_to_iob(text, annotations): encoding = tokenizer(text, return_offsets_mapping=True, truncation=True, max_length=512) offsets = encoding.pop("offset_mapping") tokens = tokenizer.convert_ids_to_tokens(encoding["input_ids"]) labels = ["O"] * len(tokens) for ann in annotations: start_ann, end_ann, label_ann = ann first = True for i, (token, (token_start, token_end)) in enumerate(zip(tokens, offsets)): if token_start is None or token_end is None: continue if token_end <= start_ann or token_start >= end_ann: continue if first: labels[i] = f"B-{label_ann}" first = False else: labels[i] = f"I-{label_ann}" return tokens, labels class KPIDataset(Dataset): def __init__(self, data, tokenizer, label2id, max_length=128): self.data = data self.tokenizer = tokenizer self.label2id = label2id self.max_length = max_length def __len__(self): return len(self.data) def __getitem__(self, idx): item = self.data[idx] text = item["text"] annotations = item["annotations"] tokens, labels_str = convert_annotations_to_iob(text, annotations) encoding = self.tokenizer(text, truncation=True, max_length=self.max_length, return_offsets_mapping=True) word_ids = encoding.word_ids() labels = [] token_index = 0 previous_word_idx = None for word_idx in word_ids: if word_idx is None: labels.append(-100) else: if word_idx != previous_word_idx: if 
token_index < len(labels_str): labels.append(self.label2id.get(labels_str[token_index], 0)) else: labels.append(0) token_index += 1 else: if token_index - 1 < len(labels_str): lab = labels_str[token_index - 1] if lab.startswith("B-"): lab = "I-" + lab[2:] labels.append(self.label2id.get(lab, 0)) else: labels.append(0) previous_word_idx = word_idx encoding.pop("offset_mapping") encoding["labels"] = labels return {key: torch.tensor(val) for key, val in encoding.items()} def train_kpi_model(annotations_file, output_dir, max_length=128, num_epochs=3, batch_size=8): data = load_doccano_annotations(annotations_file) label_list = ["O", "B-Relação", "I-Relação", "B-Valor", "I-Valor", "B-KPI", "I-KPI"] label2id = {label: i for i, label in enumerate(label_list)} model.config.id2label = {i: label for label, i in label2id.items()} model.config.label2id = label2id dataset = KPIDataset(data, tokenizer, label2id, max_length) training_args = TrainingArguments( output_dir=output_dir, num_train_epochs=num_epochs, per_device_train_batch_size=batch_size, logging_steps=10, save_steps=50, evaluation_strategy="no", weight_decay=0.01, ) data_collator = DataCollatorForTokenClassification(tokenizer) trainer = Trainer( model=model, args=training_args, train_dataset=dataset, data_collator=data_collator, tokenizer=tokenizer, ) print("Iniciando o treinamento com as anotações manuais...") trainer.train() print("Treinamento concluído. Salvando o modelo...") trainer.save_model(output_dir) print(f"Modelo salvo em {output_dir}") if __name__ == "__main__": # Exemplo simples de uso sample_text = "A receita operacional líquida da empresa cresceu 10% no último trimestre." result = extract_kpis(sample_text) print("KPIs extraídos:", result)