"""Gradio app for two-stage Chinese extractive question answering.

Stage 1 picks the most relevant paragraph among the candidates with a
multiple-choice model; stage 2 extracts the answer span from that paragraph
with an extractive QA model.
"""

import collections  # kept: may be needed by postprocess_qa_predictions
import json
import logging

import gradio as gr
import numpy as np
import torch
from datasets import Dataset
from torch.utils.data import DataLoader
from transformers import (
    AutoModelForMultipleChoice,
    AutoModelForQuestionAnswering,
    AutoTokenizer,
    default_data_collator,
)

# postprocess_qa_predictions lives in the project's utils_qa.py, which must be
# importable in the Space. Guard the import so the UI can report a clear error
# instead of raising NameError in the middle of a request.
try:
    from utils_qa import postprocess_qa_predictions
except ImportError:
    postprocess_qa_predictions = None

logger = logging.getLogger(__name__)

# --- Model and tokenizer loading ---
# Loading from the Hugging Face Hub keeps the Space itself lightweight.
TOKENIZER_PATH = "bert-base-chinese"  # or the path of an uploaded tokenizer
SELECTOR_MODEL_PATH = "TheWeeeed/chinese-paragraph-selector"  # paragraph selection model ID
QA_MODEL_PATH = "TheWeeeed/chinese-extractive-qa"  # answer extraction model ID

# Feature columns that only post-processing needs; the model's forward pass
# must not receive them.
_POSTPROCESS_ONLY_COLUMNS = ("example_id", "offset_mapping")

error_message = ""
try:
    tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_PATH)
    selector_model = AutoModelForMultipleChoice.from_pretrained(SELECTOR_MODEL_PATH)
    qa_model = AutoModelForQuestionAnswering.from_pretrained(QA_MODEL_PATH)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    selector_model.to(device)
    selector_model.eval()
    qa_model.to(device)
    qa_model.eval()
    models_loaded_successfully = True
    print(f"模型和分詞器加載成功,使用設備: {device}")
except Exception as e:  # top-level boundary: surface the failure in the UI
    models_loaded_successfully = False
    error_message = f"加載模型或分詞器時出錯: {e}"
    print(error_message)


def select_relevant_paragraph_gradio(question_text, candidate_paragraph_texts_str,
                                     model, tokenizer, device, max_seq_len):
    """Pick the candidate paragraph the multiple-choice model scores highest.

    Args:
        question_text: The question string.
        candidate_paragraph_texts_str: Candidate paragraphs separated by
            newlines; blank lines are ignored.
        model: Multiple-choice model used to score (question, paragraph) pairs.
        tokenizer: Tokenizer matching ``model``.
        device: Torch device the model lives on.
        max_seq_len: Maximum (padded/truncated) length of each pair.

    Returns:
        ``(paragraph_text, index)`` on success, or ``(error_message, -1)``
        when no usable candidate exists or the predicted index is invalid.
    """
    candidate_paragraph_texts = [
        stripped
        for stripped in (p.strip() for p in candidate_paragraph_texts_str.split("\n"))
        if stripped
    ]
    if not candidate_paragraph_texts:
        return "請至少提供一個候選段落。", -1

    model.eval()

    # Tokenize all (question, paragraph) pairs in one call; every row is padded
    # to max_seq_len so the result is already a (num_choices, seq_len) tensor.
    encoded = tokenizer(
        [question_text] * len(candidate_paragraph_texts),
        candidate_paragraph_texts,
        add_special_tokens=True,
        max_length=max_seq_len,
        padding="max_length",
        truncation=True,
        return_tensors="pt",
    )

    # Multiple-choice models expect (batch, num_choices, seq_len); batch is 1.
    model_inputs = {
        name: encoded[name].unsqueeze(0).to(device)
        for name in ("input_ids", "attention_mask", "token_type_ids")
        if name in encoded
    }

    with torch.no_grad():
        outputs = model(**model_inputs)

    predicted_index = torch.argmax(outputs.logits, dim=1).item()
    if predicted_index < len(candidate_paragraph_texts):
        return candidate_paragraph_texts[predicted_index], predicted_index
    return "段落選擇索引錯誤。", -1


def prepare_features_for_qa_inference_gradio(question_id, question_text, selected_context,
                                             tokenizer, max_seq_len, doc_stride):
    """Build the QA feature ``Dataset`` for a single question/context pair.

    Wraps the example in a one-row ``Dataset`` and maps
    ``prepare_features_for_qa_inference`` over it. Long contexts may yield
    several features (one per stride window).

    Returns:
        A ``Dataset`` with the tokenizer outputs plus ``example_id`` and
        ``offset_mapping`` columns.
    """
    single_example = Dataset.from_dict(
        {"id": [question_id], "question": [question_text], "context": [selected_context]}
    )
    pad_on_right = tokenizer.padding_side == "right"

    return single_example.map(
        lambda examples: prepare_features_for_qa_inference(
            examples, tokenizer, pad_on_right, max_seq_len, doc_stride
        ),
        batched=True,
        remove_columns=single_example.column_names,
    )


def prepare_features_for_qa_inference(examples, tokenizer, pad_on_right, max_seq_len, doc_stride):
    """Tokenize a batch of QA examples into fixed-length inference features.

    Args:
        examples: Batch mapping with ``id``, ``question`` and ``context``
            lists. Non-string questions/contexts are replaced by ``""``.
        tokenizer: Tokenizer called with overflow and offset mapping enabled.
        pad_on_right: ``True`` when the tokenizer pads on the right, i.e. the
            question is the first sequence and the context the second.
        max_seq_len: Maximum feature length (padding target).
        doc_stride: Token overlap between consecutive overflow windows.

    Returns:
        A dict of column name -> list, one entry per generated feature.
        ``offset_mapping`` entries outside the context are set to ``None``.

    Raises:
        ValueError: If ``input_ids``, ``attention_mask`` or ``token_type_ids``
            of any feature is ``None`` or contains ``None``.
    """
    examples["question"] = [
        q.lstrip() if isinstance(q, str) else "" for q in examples["question"]
    ]
    first_key, second_key = ("question", "context") if pad_on_right else ("context", "question")
    # Coerce anything that is not a string (e.g. None) to an empty string.
    first_texts = [t if isinstance(t, str) else "" for t in examples[first_key]]
    second_texts = [t if isinstance(t, str) else "" for t in examples[second_key]]

    tokenized_output = tokenizer(
        first_texts,
        second_texts,
        truncation="only_second" if pad_on_right else "only_first",
        max_length=max_seq_len,
        stride=doc_stride,
        return_overflowing_tokens=True,
        return_offsets_mapping=True,
        padding="max_length",  # every feature gets the same fixed length
    )

    # Maps each generated feature back to the example it came from.
    sample_mapping = tokenized_output.pop(
        "overflow_to_sample_mapping", list(range(len(examples["id"])))
    )
    context_sequence_id = 1 if pad_on_right else 0
    tensor_columns = [
        name
        for name in ("input_ids", "attention_mask", "token_type_ids")
        if name in tokenized_output
    ]

    processed_features = []
    for feature_idx, example_idx in zip(range(len(tokenized_output["input_ids"])), sample_mapping):
        feature = {name: tokenized_output[name][feature_idx] for name in tensor_columns}
        # Not consumed by the model, but required by post-processing.
        feature["example_id"] = examples["id"][example_idx]
        sequence_ids = tokenized_output.sequence_ids(feature_idx)
        feature["offset_mapping"] = [
            offset if sequence_ids[k] == context_sequence_id else None
            for k, offset in enumerate(tokenized_output["offset_mapping"][feature_idx])
        ]
        processed_features.append(feature)

    if not processed_features:
        # Should not happen with a working tokenizer; return empty columns so
        # Dataset.map still receives the expected structure.
        logger.error(
            "No features generated for example ID %s. Input q: %s, c: %s",
            next(iter(examples["id"]), None),
            next(iter(examples["question"]), None),
            next(iter(examples["context"]), None),
        )
        return {
            "input_ids": [],
            "token_type_ids": [],
            "attention_mask": [],
            "offset_mapping": [],
            "example_id": [],
        }

    # Dataset.map(batched=True) expects a dict of columns, not a list of rows.
    final_batch = {
        key: [feature[key] for feature in processed_features]
        for key in processed_features[0]
    }

    for key_to_check in ("input_ids", "attention_mask", "token_type_ids"):
        for i, lst in enumerate(final_batch.get(key_to_check, ())):
            if lst is None:
                raise ValueError(f"在 prepare_features_for_qa_inference 中,{key_to_check} 的第 {i} 個特徵列表為 None!")
            if any(x is None for x in lst):
                raise ValueError(f"在 prepare_features_for_qa_inference 中,{key_to_check} 的第 {i} 個特徵列表內部包含 None!內容: {lst[:20]}")

    return final_batch


def _predict_qa_logits(features_dataset, batch_size=8):
    """Run the QA model over the features and return stacked start/end logits.

    Returns:
        ``(start_logits, end_logits)`` as numpy arrays with one row per
        feature, or ``None`` when the loader yielded no batch.
    """
    # example_id / offset_mapping are only for post-processing: the collator
    # cannot tensorise offset lists containing None and the model's forward
    # pass does not accept these keys.
    model_inputs_dataset = features_dataset.remove_columns(
        [c for c in _POSTPROCESS_ONLY_COLUMNS if c in features_dataset.column_names]
    )
    qa_dataloader = DataLoader(
        model_inputs_dataset,
        collate_fn=default_data_collator,
        batch_size=batch_size,
    )

    all_start_logits = []
    all_end_logits = []
    for batch in qa_dataloader:
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.no_grad():
            outputs_qa = qa_model(**batch)
        all_start_logits.append(outputs_qa.start_logits.cpu().numpy())
        all_end_logits.append(outputs_qa.end_logits.cpu().numpy())

    if not all_start_logits:
        return None
    return np.concatenate(all_start_logits, axis=0), np.concatenate(all_end_logits, axis=0)


def _add_empty_answers(example):
    """Attach an empty ``answers`` field to a single example."""
    example["answers"] = {"text": [], "answer_start": []}
    return example


def two_stage_qa(question, candidate_paragraphs_str, max_seq_len_mc=512, max_seq_len_qa=384,
                 doc_stride_qa=128, n_best_size=20, max_answer_length=100):
    """Answer ``question`` from newline-separated candidate paragraphs.

    Args:
        question: Question text.
        candidate_paragraphs_str: Candidate paragraphs, one per line.
        max_seq_len_mc: Max sequence length for the paragraph selector.
        max_seq_len_qa: Max sequence length for the QA model.
        doc_stride_qa: Stride between overflowing QA windows.
        n_best_size: Number of candidate spans kept by post-processing.
        max_answer_length: Maximum answer length kept by post-processing.

    Returns:
        ``(answer_or_error, selected_paragraph_text, predictions)`` matching
        the three Gradio outputs. ``predictions`` is ``None`` on error paths
        because the JSON output component tries to JSON-decode plain strings.
    """
    if not models_loaded_successfully:
        return f"錯誤: {error_message}", "N/A", None
    if postprocess_qa_predictions is None:
        return "錯誤: 找不到 postprocess_qa_predictions (請確認 utils_qa.py 可被導入)。", "N/A", None
    if not question or not question.strip() or \
            not candidate_paragraphs_str or not candidate_paragraphs_str.strip():
        return "錯誤: 問題和候選段落不能為空。", "N/A", None

    # Stage 1: paragraph selection.
    selected_paragraph, selected_idx = select_relevant_paragraph_gradio(
        question, candidate_paragraphs_str, selector_model, tokenizer, device, max_seq_len_mc
    )
    if selected_idx == -1:
        # On failure ``selected_paragraph`` carries the error message.
        return f"段落選擇出錯: {selected_paragraph}", "N/A", None

    paragraph_display = f"選中的段落 (索引 {selected_idx}):\n{selected_paragraph}"

    # Stage 2: answer extraction.
    qa_features_dataset = prepare_features_for_qa_inference_gradio(
        "temp_id", question, selected_paragraph, tokenizer, max_seq_len_qa, doc_stride_qa
    )
    if len(qa_features_dataset) == 0:
        return "錯誤: 無法為選定段落生成QA特徵 (可能段落太短或內容問題)。", paragraph_display, None

    logits = _predict_qa_logits(qa_features_dataset)
    if logits is None:
        return "錯誤: QA模型沒有產生logits。", paragraph_display, None
    start_logits_np, end_logits_np = logits

    # Post-processing needs the original example alongside the features.
    original_dataset_for_postproc = Dataset.from_dict(
        {"id": ["temp_id"], "question": [question], "context": [selected_paragraph]}
    ).map(_add_empty_answers)

    # NOTE(review): `is_world_process_zero` exists only in some versions of
    # utils_qa.postprocess_qa_predictions -- confirm against the local copy.
    predictions_dict = postprocess_qa_predictions(
        examples=original_dataset_for_postproc,
        features=qa_features_dataset,  # still carries offset_mapping / example_id
        predictions=(start_logits_np, end_logits_np),
        version_2_with_negative=False,
        n_best_size=n_best_size,
        max_answer_length=max_answer_length,
        null_score_diff_threshold=0.0,
        output_dir=None,
        prefix="gradio_predict",
        is_world_process_zero=True,
    )

    final_answer = predictions_dict.get("temp_id", "未能提取答案。")
    return final_answer, paragraph_display, predictions_dict


# --- Gradio interface ---
# Default question and paragraphs shown when the page loads.
DEFAULT_QUESTION = "世界最高峰是什麼?"
DEFAULT_PARAGRAPHS = (
    "珠穆朗瑪峰是喜馬拉雅山脈的主峰,位於中國與尼泊爾邊界上,是世界海拔最高的山峰。\n"
    "喬戈里峰,又稱K2,是喀喇崑崙山脈的主峰,海拔8611米,是世界第二高峰,位於中國與巴基斯坦邊界。\n"
    "干城章嘉峰位於喜馬拉雅山脈中段尼泊爾和印度邊界線上,海拔8586米,為世界第三高峰。\n"
    "洛子峰,海拔8516米,為世界第四高峰,位於珠穆朗瑪峰以南約3公里處,同屬喜馬拉雅山脈。"
)

iface = gr.Interface(
    fn=two_stage_qa,
    inputs=[
        gr.Textbox(
            lines=2,
            placeholder="輸入您的問題...",
            label="問題 (Question)",
            value=DEFAULT_QUESTION,
        ),
        gr.Textbox(
            lines=10,
            placeholder="在此處輸入候選段落,每段一行...",
            label="候選段落 (Candidate Paragraphs - One per line)",
            value=DEFAULT_PARAGRAPHS,
        ),
    ],
    outputs=[
        gr.Textbox(label="預測答案 (Predicted Answer)"),
        gr.Textbox(label="選中的相關段落 (Selected Relevant Paragraph)"),
        gr.JSON(label="原始預測字典 (Raw Predictions Dict - for debugging)"),
    ],
    title="兩階段中文抽取式問答系統",
    description="輸入一個問題和多個候選段落(每行一個段落)。系統會先選擇最相關的段落,然後從中抽取答案。",
    allow_flagging="never",
)

if __name__ == "__main__":
    if models_loaded_successfully:  # only launch when the models are available
        iface.launch()
    else:
        print(f"Gradio 應用無法啟動,因為模型加載失敗: {error_message or '未知錯誤'}")