Spaces:
Running
Running
# -*- coding: utf-8 -*- | |
""" | |
高雄市統計智慧查詢應用程式 | |
版本:Hugging Face Spaces 適用版 | |
最後修改日期:2025-08-12 | |
""" | |
# ======================================================================= | |
# 1. 匯入必要函式庫 | |
# ======================================================================= | |
import os | |
import re | |
import io | |
import json | |
import math | |
import jieba | |
import torch | |
import msoffcrypto | |
import gradio as gr | |
import pandas as pd | |
import google.generativeai as genai | |
from typing import Type | |
from typing import List, Dict | |
from collections import defaultdict, OrderedDict | |
# LangChain & SentenceTransformers | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from sentence_transformers import SentenceTransformer | |
from langchain.retrievers import BM25Retriever | |
from langchain.schema import Document | |
from langchain_core.tools import tool | |
# ======================================================================= | |
# 2. 初始設定與資料庫載入 | |
# ======================================================================= | |
# --- 全域變數與模型設定 --- | |
EMBEDDING_MODEL_NAME = 'intfloat/multilingual-e5-base' | |
DB_JB_PATH = "yearbook_contents_jb_db_base5" | |
DB_SIM_PATH = "yearbook_contents_simple_db_base5" | |
EXCEL_FILE_PATH = "合併檔案.xlsx" | |
EXCEL_PASSWORD = os.getenv('open_key') | |
_df_cache = None | |
# --- Custom Embedding Class --- | |
class CustomE5Embedding(HuggingFaceEmbeddings): | |
def embed_documents(self, texts): | |
texts = [f"passage: {t}" for t in texts] | |
return super().embed_documents(texts) | |
def embed_query(self, text): | |
return super().embed_query(f"query: {text}") | |
# --- 載入模型與向量資料庫 --- | |
print("載入嵌入模型中...") | |
embedding_model_st = SentenceTransformer(EMBEDDING_MODEL_NAME) | |
embedding_model_lc = CustomE5Embedding(model_name=EMBEDDING_MODEL_NAME) | |
print("載入向量資料庫中...") | |
try: | |
db_jb = FAISS.load_local(DB_JB_PATH, embedding_model_lc, allow_dangerous_deserialization=True) | |
db_sim = FAISS.load_local(DB_SIM_PATH, embedding_model_lc, allow_dangerous_deserialization=True) | |
print("✅ 向量資料庫載入成功。") | |
except Exception as e: | |
print(f"❌ 載入向量資料庫失敗,請確認檔案路徑是否正確: {e}") | |
db_jb, db_sim = None, None | |
# ======================================================================= | |
# 3. 核心查詢與處理函式 | |
# ======================================================================= | |
def chinese_tokenizer(text: str) -> list[str]: | |
return list(jieba.cut(text)) | |
def extract_project_name_from_content(content: str) -> str: | |
cleaned_content = re.sub(r"[\s\u3000]", "", content) | |
match = re.search(r"項目[::]([^。]+)", cleaned_content) | |
if match: | |
raw_name = match.group(1) | |
final_name = re.sub(r"^\d+", "", raw_name) | |
return final_name.strip() | |
return None | |
def extract_project_names_from_rag_manual_mix(query: str, db_jb, db_sim, top_k: int = 4) -> List[str]: | |
if not db_jb or not db_sim: | |
return [] | |
k_bm25 = math.ceil(top_k / 2) | |
k_faiss = math.floor(top_k / 2) | |
split_docs = list(db_jb.docstore._dict.values()) | |
bm25 = BM25Retriever.from_documents(split_docs, tokenizer=chinese_tokenizer) | |
bm25.k = 20 | |
bm25_docs = bm25.get_relevant_documents(" ".join(jieba.cut(query))) | |
bm25_names = [name for doc in bm25_docs if (name := extract_project_name_from_content(doc.page_content))] | |
unique_bm25_names = list(OrderedDict.fromkeys(bm25_names))[:k_bm25] | |
prefixed_query = f"query: {query}" | |
vector_docs_with_scores = db_sim.similarity_search_with_score(prefixed_query, k=20) | |
faiss_names = [name for doc, score in vector_docs_with_scores if (name := extract_project_name_from_content(doc.page_content))] | |
unique_faiss_names = list(OrderedDict.fromkeys(faiss_names))[:k_faiss] | |
combined_names = unique_bm25_names + unique_faiss_names | |
return list(OrderedDict.fromkeys(combined_names))[:top_k] | |
def load_data(file_path: str = EXCEL_FILE_PATH) -> pd.DataFrame: | |
global _df_cache | |
if _df_cache is not None: | |
return _df_cache | |
if not EXCEL_PASSWORD: | |
print("❌ 錯誤:未提供 Excel 密碼。") | |
return None | |
try: | |
print(f"解密並讀取 Excel 檔案中... ({file_path})") | |
# 建立一個暫存的記憶體空間 | |
decrypted_buffer = io.BytesIO() | |
# 開啟加密檔案 | |
with open(file_path, 'rb') as f: | |
# 使用 msoffcrypto 進行解密 | |
file = msoffcrypto.OfficeFile(f) | |
file.load_key(password=EXCEL_PASSWORD) | |
# 將解密後的內容寫入記憶體空間 | |
file.decrypt(decrypted_buffer) | |
# Pandas 從記憶體中讀取解密後的資料 | |
_df_cache = pd.read_excel(decrypted_buffer) | |
print("✅ Excel 資料載入成功。") | |
return _df_cache | |
except FileNotFoundError: | |
print(f"❌ 錯誤:找不到檔案 {file_path}") | |
return None | |
except Exception as e: | |
# 捕捉可能的錯誤,例如密碼錯誤 | |
print(f"❌ 錯誤:無法讀取檔案,請檢查密碼是否正確或檔案是否損毀。") | |
print(f"詳細錯誤訊息: {e}") | |
return None | |
def batch_find_relevant_tables(api_key: str, sub_queries: list[str], top_k: int = 1) -> dict: | |
""" | |
為每個子問題獨立查找候選表,並將完整的配對結構交由 Gemini 判斷。 | |
""" | |
print("🧠 正在為每個子問題獨立查找其相對應候選表...") | |
# --- Step 1: 為每個子問題獨立獲取候選表,並存入字典 --- | |
query_to_candidates_map = {} | |
for query in sub_queries: | |
print(f" -> 正在處理: '{query}'") | |
# 為每個子問題找回 20 個最相關的候選表 | |
candidates_per_query = extract_project_names_from_rag_manual_mix(query, db_jb, db_sim, top_k=20) | |
# test | |
# print(candidates_per_query) | |
if candidates_per_query: # 只加入有找到候選表的查詢 | |
query_to_candidates_map[query] = candidates_per_query | |
if not query_to_candidates_map: | |
print("⚠️ RAG 步驟未找到任何候選資料表,終止批次匹配。") | |
return {} | |
print(f"✅ 已為 {len(query_to_candidates_map)} 個問題找到專屬候選表。") | |
# --- Step 2: 動態建構一個新的、結構化的 Prompt --- | |
# 建立任務描述文字 | |
tasks_text_parts = [] | |
for i, (query, candidates) in enumerate(query_to_candidates_map.items()): | |
# 將候選表列表格式化 | |
candidate_list_str = "\n".join(f" - {c}" for c in candidates) | |
task_block = f""" | |
--- | |
[任務 {i+1}] | |
問題: "{query}" | |
此問題的候選資料表清單: | |
{candidate_list_str} | |
""" | |
tasks_text_parts.append(task_block) | |
tasks_text = "".join(tasks_text_parts) | |
# 任務描述 | |
batch_prompt = f""" | |
你是一個專業的數據庫助理。你的任務是從下方的「待處理的配對任務清單」中,根據每一個query,找出最相關的資料表。其餘捨棄。 | |
你必須依[輸出範例]回傳問題及表名,不要有任何多餘的文字、編號、引號或說明。 | |
你必須嚴格遵守以下規則: | |
**[最優先規則:查詢總戶數與人口數]** | |
* **條件**:當問題中包含「高雄市」或其下**任何一個「行政區」**的名稱(例如:左營區、楠梓區、杉林區等),並且詢問的是整體的**「人口數」、「總人口數」或「戶數」**時。 | |
* **動作**:你 **必須** 將其對應到表名 `高雄市戶數、人口密度及性比例`。此規則的優先級最高。 | |
**[一般規則]** | |
* 對於不符合上述優先規則的其他問題,請根據問題的關鍵字(如:出生、死亡、結婚、離婚、遷入、遷出等)選擇最相關的資料表。 | |
[待處理的配對任務清單]: | |
{tasks_text} | |
--- | |
請嚴格遵循以下 JSON 格式輸出,不要有任何多餘的文字或說明。 | |
輸出的 JSON 物件中,鍵(key)必須是原始問題,值(value)必須是您為該問題選擇的最佳表名。 | |
[輸出範例]: | |
{{ | |
"113年三民區總人口數": "高雄市戶數、人口密度及性比例", | |
"112年鼓山區出生人數": "高雄市嬰兒出生數" | |
}} | |
""".strip() | |
# --- Step 3: 呼叫 Gemini 並解析結果 --- | |
try: | |
print("--- Structured Batch Prompt to Gemini ---") | |
# print(repr(batch_prompt)) # 如需偵錯,可取消註解 | |
print("---------------------------------------") | |
response_text = reply(api_key, "", batch_prompt) | |
parsed_json = extract_json(response_text) | |
if isinstance(parsed_json, dict): | |
print(f"✅ 結構化批次匹配表名成功: {parsed_json}") | |
return parsed_json | |
return {} | |
except Exception as e: | |
print(f"❌ 結構化批次匹配表名失敗: {e}") | |
return {} | |
def batch_parse_sub_queries_with_gemini(api_key: str, sub_queries: List[str]) -> Dict[str, Dict]: | |
""" | |
批次解析所有子問題,提取時間、地區及查詢項目。 | |
回傳一個以子問題為鍵(key)的字典。 | |
""" | |
print(f"🤖 正在請求 Gemini 批次解析 {len(sub_queries)} 個子問題...") | |
sub_queries_formatted = "\n".join([f"- {q}" for q in sub_queries]) | |
prompt = f""" | |
你是一個高效率的數據查詢代理,請使用繁體中文回答。 | |
你的任務是分析使用者問題,並根據內容清晰度決定如何回應。 | |
**情境一:問題清晰,可進行查詢** | |
如果問題中明確包含「時間」和「查詢項目」,請依照以下格式輸出: | |
time_query: <時間文字> | |
district_query: <地點文字> | |
item_query: <地點文字>+<查詢項目文字> | |
**重要規則:** | |
1. **時間正規化**:當使用者輸入的時間包含 "年底"、"年中"、"年初"、"年度" 等描述時,請將 `time_query` 正規化為年份。例如,"113年底" 應轉換為 "113年"。 | |
2. **時間校正**:當使用者輸入的時間包含 "年底"、"年中"、"年初" 等描述時,如該問題是有關學校類型(國中小、補習班等概況),請將 `time_query` 修正為學年。例如,"113年" 應轉換為 "113學年"。 | |
3. `district_query` 為可選項目,若無則設為"高雄市全區"。如為"高雄市"或"高雄"等泛指整體者,亦設為"高雄市全區" | |
4. 請勿遺漏使用者輸入的任何關鍵詞。 | |
**情境二:問題模糊,無法查詢** | |
如果問題中缺少「時間」或「查詢項目」任一資訊,導致無法進行查詢,請進行以下操作: | |
1. 直接回應以下文字,**並且不要生成 time_query 等欄位**。 | |
`我不太了解你的意思,請重新定義問題(包含資料時間及統計指標)` | |
--- | |
請嚴格遵循以上所有情境與規則,禁止加入多餘說明。 | |
[子問題列表]: | |
{sub_queries_formatted} | |
--- | |
[規則]: | |
1. **時間正規化**:當使用者輸入的時間包含 "年底"、"年中"、"年初"、"年度" 等描述時,請將 `time_query` 正規化為年份。例如,"113年底" 應轉換為 "113年"。 | |
2. **時間校正**:當使用者輸入的時間包含 "年底"、"年中"、"年初" 等描述時,如該問題是有關學校類型(國中小、補習班等概況),請將 `time_query` 修正為學年。例如,"113年" 應轉換為 "113學年"。 | |
3. `district_query` 為可選項目,若無則設為"高雄市全區"。如為"高雄市"或"高雄"等泛指整體者,亦設為"高雄市全區" | |
4. 請勿遺漏使用者輸入的任何關鍵詞。 | |
--- | |
[輸出格式]: | |
請嚴格遵循以下 JSON 格式輸出,不要有任何多餘的文字或說明。 | |
輸出的 JSON 物件中,鍵(key)必須是「子問題列表」中的原始問題,值(value)是一個包含解析參數的物件。 | |
[輸出範例]: | |
{{ | |
"113年三民區總人口數": {{ | |
"time_query": "113年", | |
"district_query": "三民區", | |
"item_query": "三民區總人口數" | |
}}, | |
"112年鼓山區出生人數": {{ | |
"time_query": "112年", | |
"district_query": "鼓山區", | |
"item_query": "鼓山區出生人數" | |
}} | |
}} | |
""".strip() | |
try: | |
response_text = reply(api_key, "", prompt) | |
parsed_json = extract_json(response_text) | |
if isinstance(parsed_json, dict): | |
print("✅ 子問題批次解析成功。") | |
return parsed_json | |
return {} | |
except Exception as e: | |
print(f"❌ 批次解析子問題失敗: {e}") | |
return {} | |
# --- 動態查詢工具 --- | |
def semantic_query_logic(time_query: str, item_query: str, project_name: str, district_query: str = "") -> str: | |
""" | |
直接以已匹配好的表名、時間及統計指標查詢合併資料。 | |
""" | |
print(f"--- 執行查詢: 表名='{project_name}', 時間='{time_query}', 地區='{district_query}', 項目='{item_query}' ---") | |
df = load_data() | |
if df is None: return "[]" | |
# 步驟 1: 先用精確的表名進行篩選,大幅縮小範圍 | |
filtered_df = df[df['表名'] == project_name].copy() | |
if filtered_df.empty: | |
# 如果表名就找不到任何資料,直接返回 | |
return "[]" | |
# 步驟 2: 在已縮小的範圍內,進行時間和地區的篩選 | |
conditions = [] | |
if time_query: | |
conditions.append(filtered_df['表側資訊'].astype(str).str.contains(time_query, na=False) | filtered_df['表首資訊'].astype(str).str.contains(time_query, na=False)) | |
if district_query: | |
conditions.append(filtered_df['表側資訊'].astype(str).str.contains(district_query, na=False) | filtered_df['表首資訊'].astype(str).str.contains(district_query, na=False)) | |
if conditions: | |
combined_condition = pd.concat([cond.rename(i) for i, cond in enumerate(conditions)], axis=1).all(axis=1) | |
filtered_df = filtered_df[combined_condition] | |
if filtered_df.empty: return "[]" | |
# 關鍵安全閥 | |
MAX_CANDIDATES = 1300 | |
if len(filtered_df) > MAX_CANDIDATES: | |
print(f"⚠️ 篩選結果超過{MAX_CANDIDATES}筆({len(filtered_df)}),僅取前{MAX_CANDIDATES}筆進行向量分析以節省資源。") | |
filtered_df = filtered_df.head(MAX_CANDIDATES) | |
# 步驟 3: 對最終篩選出的結果進行向量化與語意比對 | |
print(f"向量化階段:對 {len(filtered_df)} 筆資料進行向量化...") | |
combined_texts = (filtered_df['表名'].astype(str) + " " + filtered_df['表首資訊'].astype(str) + " " + filtered_df['表側資訊'].astype(str)).tolist() | |
# 加上前綴 | |
prefixed_passage_texts = [f"passage: {t}" for t in combined_texts] | |
prefixed_query_text = f"query: {item_query}" | |
item_query_embedding = embedding_model_st.encode(prefixed_query_text, convert_to_tensor=True) | |
candidate_embeddings = embedding_model_st.encode( prefixed_passage_texts, convert_to_tensor=True) | |
semantic_scores = torch.matmul(item_query_embedding, candidate_embeddings.T).tolist() | |
print(" ✅ 向量化完成。") | |
# 步驟 4: 根據語意分數排序並產生最終結果 | |
results = [] | |
for i, row in enumerate(filtered_df.itertuples(index=False)): | |
results.append({**row._asdict(), "語意分數": round(semantic_scores[i], 4)}) | |
results.sort(key=lambda x: x['語意分數'], reverse=True) | |
FINAL_K = 25 | |
top_results = results[:FINAL_K] | |
print("--- semantic_query_logic 執行完畢 ---") | |
return json.dumps(top_results, ensure_ascii=False) if top_results else "[]" | |
# 2. 從上面的普通函式,明確地建立 LangChain 工具 | |
from langchain.tools import StructuredTool | |
semantic_query_tool = StructuredTool.from_function( | |
func=semantic_query_logic, | |
name="semantic_query_tool", | |
description="直接使用向量語意模型進行檢索排序。" | |
) | |
# ======================================================================= | |
# 4. 主要執行流程 (Reflect) | |
# ======================================================================= | |
system_reviewer = """ | |
你是語意分析專家,請將使用者的複雜問題拆解成具體子問題,並判斷每個子問題的查詢類型。 | |
⚠️ 拆解前請先檢查以下條件: | |
1. 涉及高雄市以外或全國性資料,請直接回傳:「抱歉~我是高雄市查詢機器人,無法查詢高雄以外資料。」 | |
2. 未提及明確時間(如112年、113年3月),請直接回傳:「抱歉~請問查詢的資料時間。」 | |
📌 明確時間=出現「具體年份」、「年月」、「季」或「學年」。模糊詞(平均、近年、目前、歷年等)皆視為未指定。 | |
--- | |
### 規則與指令 | |
👑 最高優先級規則:行政區關鍵詞擴展 | |
- **條件**:當問題中明確出現 **"各行政區"**、**"所有行政區"** 或 **"全體行政區"** 時,此規則優先級最高。 | |
- **動作**:你 **必須** 將該問題擴展為 38 個獨立的子問題,每個子問題對應一個高雄市的行政區。擴展的方法是:將原始問題中的關鍵詞(例如 "各行政區")精確替換為下方列表中的每一個行政區名稱。 | |
- **[輸入範例]**:「113年各行政區總人口數」 | |
- **[輸出範例]**: | |
[ | |
{ "sub_query": "113年鹽埕區總人口數", "type": "direct" }, | |
{ "sub_query": "113年鼓山區總人口數", "type": "direct" }, | |
{ "sub_query": "113年左營區總人口數", "type": "direct" }, | |
... (依此類推,直到最後一個行政區) ..., | |
{ "sub_query": "113年那瑪夏區總人口數", "type": "direct" } | |
] | |
- **行政區列表**:["鹽埕區", "鼓山區", "左營區", "楠梓區", "三民區", "新興區", "前金區", "苓雅區", "前鎮區", "旗津區", "小港區", "鳳山區", "林園區", "大寮區", "大樹區", "大社區", "仁武區", "鳥松區", "岡山區", "橋頭區", "燕巢區", "田寮區", "阿蓮區", "路竹區", "湖內區", "茄萣區", "永安區", "彌陀區", "梓官區", "旗山區", "美濃區", "六龜區", "甲仙區", "杉林區", "內門區", "茂林區", "桃源區", "那瑪夏區"] | |
--- | |
### 其他規則 | |
📌 子問題拆解規則: | |
- 對於不符合上述「最高優先級規則」的問題,依此規則處理。 | |
- 每個子問題必須包含 1 個「地點」、1 個「時間」、1 個「指標」。 | |
- 若同時包含多個時間、地區或指標,請拆成多筆(如:110-113年、1至3月 都要拆開)。 | |
- 若內容有年齡區間如20-24歲,則不必拆分。 | |
- ⛔ 禁止省略使用者輸入中的任何關鍵詞(例如:「人口數合計」的「合計」也不得省略)。 | |
📌 回傳格式(**僅限 JSON 陣列**,不得加上任何文字): | |
[ | |
{ | |
"sub_query": "子問題內容(不得遺漏任何原始資訊)", | |
"type": "direct" 或 "comparison" | |
# } | |
# ] | |
📌 類型說明: | |
- 可直接查詢者為 "direct"。 | |
- 涉及比較、推論、排序者為 "comparison"。 | |
--- | |
📤 輸出要求: | |
- 嚴格遵循 JSON 陣列格式,禁止加上任何說明文字或```json ```標籤。 | |
- 禁用「...」,所有子問題必須完整列出。 | |
- 回應必須為繁體中文。 | |
""" | |
system_integration = """ | |
你是資深資料分析師,擅長回答高雄市統計問題。請依下列規則,根據使用者提問與查詢資料,產出清楚、正確的繁體中文答案。 | |
### 🎯 使用者問題: | |
{user_query} | |
### 📊 查詢資料: | |
{retrieved_chunks} | |
--- | |
## 🧩 問題類型與處理方式: | |
### 一、比較型問題(如「最多」「變化」「排名」「哪區最高」): | |
1. **對應條件**:僅使用與問題一致的「時間、地區、指標」。 | |
2. **缺漏處理**:若資料不齊,請指出缺哪一項與無法比較的原因。 | |
3. **數值處理**:轉為千分位(例:23,000)、百分比與金額取至小數第 2 位。 | |
4. **條列推論**:逐項列出比較結果,明確指出最高、最低、差異。 | |
5. **禁止**:不得使用科學記號、英文、原欄位名稱;不得補資料或推論未查到的年份。 | |
### 二、一般整合型問題(如「113年底苓雅區人口?」): | |
1. **條件驗證**:若資料年份不同,請說明「您問的是 113 年,我找到的是 114 年…」。 | |
2. **缺資料處理**:無資料請說「資料缺乏,無法回答」;不可用其他時間資料代替。 | |
3. **作答格式**:300 字內、結論先行、條列清楚、千分位數字,不使用科學記號。開頭統一:「關於您提出的問題,綜合參考資料如下:」,結尾列出參考資料表名:「參考資料:高雄市原住民戶口數」。 | |
--- | |
## 📌 共通禁止事項(適用所有問題): | |
- ❌ 不得推論或補未查到的資料 | |
- ❌ 不可引用不符問題條件的數據 | |
- ❌ 不可貼欄位原文、英文、代碼、科學記號 | |
--- | |
## ✅ 輸出格式: | |
- 使用繁體中文 | |
- **請使用 Markdown 格式化您的回覆,例如使用粗體標示重點數字、使用項目符號條列結果。** | |
- 逐項列出的結果請由大到小排序。 | |
- 數值一律轉為千分位 | |
- 每份資料來源僅列一次 | |
- 若缺資料請誠實說明並結束回答 | |
請根據以上規則,輸出準確答案。 | |
""" | |
def reply(api_key: str, system: str, prompt: str, model: str = "gemini-2.0-flash-lite"): | |
""" | |
獲取 Gemini 回應。 | |
""" | |
try: | |
genai.configure(api_key=api_key) | |
if system and system.strip(): | |
gemini_model = genai.GenerativeModel(model_name=model, system_instruction=system) | |
else: | |
# 如果 system 是空的,則不傳遞 system_instruction 參數 | |
gemini_model = genai.GenerativeModel(model_name=model) | |
response = gemini_model.generate_content(prompt, generation_config={'temperature': 0}) | |
# 直接回傳完整的文字 | |
return response.text | |
except Exception as e: | |
error_message = f"系統錯誤:呼叫 Gemini API 失敗。錯誤詳情: {e}" | |
print(error_message) | |
return error_message | |
def extract_json(text: str) -> list | dict: | |
json_block_match = re.search(r'```json\s*([\s\S]*?)\s*```|([\s\S]*)', text) | |
if not json_block_match: raise ValueError("在回傳內容中找不到任何可解析的文字。") | |
content = json_block_match.group(1) or json_block_match.group(2) | |
start = content.find('[') if content.find('[') != -1 else content.find('{') | |
end = content.rfind(']') if content.rfind(']') != -1 else content.rfind('}') | |
if start == -1 or end == -1 or end < start: | |
raise ValueError("在回傳內容中找不到有效的 JSON 結構。") | |
json_text = content[start : end + 1] | |
json_text = re.sub(r',\s*([\}\]])', r'\1', json_text) | |
try: | |
return json.loads(json_text) | |
except json.JSONDecodeError as e: | |
raise ValueError(f"清理後仍然無法解析 JSON。原始錯誤: {e}") | |
# ======================================================================= | |
# 5. Gradio Web UI | |
# ======================================================================= | |
def gradio_interface(user_input): | |
"""Gradio 主要處理函式""" | |
api_key = os.getenv('Gemini') | |
api_key2 = os.getenv('Gemini2') | |
api_key3 = os.getenv('Gemini3') | |
api_key4 = os.getenv('Gemini4') | |
if not api_key: | |
return "❌ 查詢失敗", "錯誤:未在伺服器環境中設定 'Gemini' API 金鑰。" | |
# 檢查向量資料庫是否成功載入 | |
if not db_jb or not db_sim: | |
return "❌ 系統錯誤", "向量資料庫未成功載入,請檢查伺服器日誌。" | |
# --- 開始逐步回饋 --- | |
try: | |
# Step 1:拆解子問題 | |
yield "", "🔍 正在分析您的問題..." | |
decomposed_text = reply(api_key, system_reviewer, user_input) | |
if "抱歉~" in decomposed_text: | |
yield "", decomposed_text | |
return | |
parsed_list = extract_json(decomposed_text) | |
direct_queries = [item for item in parsed_list if item.get("type") == "direct"] | |
if not direct_queries: | |
yield "", "⚠️ 無法從輸入問題中擷取有效查詢項目。" | |
return | |
all_querys_summary = "🔹 關鍵查詢:\n" + "\n".join(f"- {q['sub_query']}" for q in direct_queries) | |
yield all_querys_summary, "✅ 問題分析完成,正在匹配資料表..." | |
# Step 2 & 3:批次匹配與解析 | |
sub_query_texts = [q["sub_query"] for q in direct_queries] | |
table_map = batch_find_relevant_tables(api_key2, sub_query_texts) | |
if not table_map: | |
yield all_querys_summary, "⚠️ 系統無法為您的查詢匹配到合適的資料表。" | |
return | |
yield all_querys_summary, "📚 資料表匹配成功,正在解析查詢參數..." | |
params_map = batch_parse_sub_queries_with_gemini(api_key3, sub_query_texts) | |
if not params_map: | |
yield all_querys_summary, "⚠️ 系統無法解析您問題中的查詢參數。" | |
return | |
# Step 4:執行查詢 | |
yield all_querys_summary, "⏳ 正在從資料庫中檢索資訊..." | |
context_list = [] | |
for q in direct_queries: | |
sub_query = q["sub_query"] | |
table_name = table_map.get(sub_query) | |
params = params_map.get(sub_query) | |
if not table_name or not params: | |
context_list.append(f"【{sub_query}】\n查詢失敗:未能匹配到資料表或解析參數。") | |
continue | |
try: | |
# 帶著所有精準參數執行查詢 | |
result_json = semantic_query_logic( | |
time_query=params.get("time_query", ""), | |
item_query=params.get("item_query", ""), | |
district_query=params.get("district_query", ""), | |
project_name=table_name | |
) | |
result_data = json.loads(result_json) | |
if result_data: | |
formatted_result = json.dumps(result_data, ensure_ascii=False, indent=2) | |
context_list.append(f"【{sub_query}】\n查詢結果:\n{formatted_result}") | |
else: | |
context_list.append(f"【{sub_query}】\n查無資料。") | |
except Exception as e: | |
context_list.append(f"【{sub_query}】\n查詢失敗:{e}") | |
combined_context = "\n\n".join(context_list) | |
# Step 5:整合分析 | |
yield all_querys_summary, "✍️ 查詢完成,正在生成最終回覆..." | |
integration_prompt = f"使用者問題:{user_input}\n\n查詢資料如下:\n{combined_context}" | |
integration_result = reply(api_key4, system_integration, integration_prompt) | |
yield all_querys_summary, integration_result # 最終結果 | |
except Exception as e: | |
import traceback | |
print(traceback.format_exc()) | |
yield "❌ 查詢失敗", f"發生未預期的系統錯誤:{str(e)}" | |
# --- UI 介面定義 --- | |
# 1. 定義自訂 CSS 樣式 | |
custom_css = """ | |
/* --- 基本樣式 (淺色與夜間模式共用) --- */ | |
#output_markdown { | |
padding: 15px; | |
border-radius: 8px; | |
min-height: 150px; | |
max-height: 400px; | |
overflow-y: auto; | |
box-shadow: inset 0 2px 4px 0 rgba(0,0,0,0.05); | |
/* --- 預設為淺色模式的樣式 --- */ | |
background: #ffffff !important; /* 淺色模式下的背景 */ | |
color: #374151; /* 淺色模式下的文字顏色 (深灰色) */ | |
} | |
/* --- 夜間模式下的覆蓋樣式 --- */ | |
/* 當系統或瀏覽器處於夜間模式時,以下規則會生效 */ | |
@media (prefers-color-scheme: dark) { | |
#output_markdown { | |
background: #1f2937 !important; /* 夜間模式下的背景 (深藍灰色) */ | |
color: #d1d5db !important; /* 夜間模式下的文字顏色 (淺灰色) */ | |
} | |
} | |
""" | |
# 2. 在 gr.Blocks 中載入 CSS | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="orange"), css=custom_css) as demo: | |
gr.Markdown( | |
""" | |
# 🤖 高雄市公務統計資料智慧查詢 | |
歡迎使用!您可以透過自然語言提出關於高雄市的公務統計問題,系統將盡力為您查找相關資訊。 <br> | |
本系統運作於2v cpu & 16 GB RAM 免費資源,系統速度稍慢(產生結果時間依問題複雜度而定,一般為10至15秒)。 <br> | |
**本智慧查詢資料範圍:104-113年高雄市;113年各行政區。** | |
""" | |
) | |
with gr.Row(): | |
with gr.Column(scale=1): | |
user_input_box = gr.Textbox( | |
label="請在此輸入您的問題", | |
placeholder="例如:113年底前金區人口數?(小技巧:查不到資料時可轉換問法)", | |
lines=5 | |
) | |
gr.Examples( | |
examples=[ | |
"113年底前金區人口數?", | |
"110-113年高雄市總人口數,及其趨勢?", | |
"110-113年失業率情形", | |
"113學年國小一年級學生人數", | |
"113年底鹽埕區、三民區、前鎮區、林園區及美濃區人口數", | |
"107年勞工教育班數", | |
"112年路竹區門診體驗人數", | |
"113學年國立大學教師數", | |
"105年公共污水下水道當年接管戶數", | |
"113年三民區人口數及該區身心障礙人數", | |
"113年高雄市各行政區中哪一區人口最多" | |
], | |
inputs=user_input_box, | |
label="💡 範例問題" | |
) | |
with gr.Row(): | |
btn_clear = gr.ClearButton(value="清除") | |
btn_submit = gr.Button("送出查詢", variant="primary") | |
with gr.Column(scale=1): | |
output_analysis = gr.Textbox( | |
label="🌟 問題分析", | |
interactive=False, | |
lines=5, | |
visible=False, | |
) | |
with gr.Group(): | |
gr.Markdown("### 🧐 查詢結果") | |
output_result = gr.Markdown( | |
# label="🧐 查詢結果", | |
# show_label=True, | |
elem_id="output_markdown" | |
) | |
gr.Markdown( | |
""" | |
--- | |
*資料來源:[高雄市政府主計處](https://kcgdg.kcg.gov.tw/StatWebRWD/Page/Default.aspx)* <br> | |
*本工具由 AI 驅動,查詢結果僅供參考。* | |
""" | |
) | |
# --- 事件綁定 --- | |
outputs_list = [output_analysis, output_result] | |
btn_submit.click(fn=gradio_interface, inputs=user_input_box, outputs=outputs_list) | |
user_input_box.submit(fn=gradio_interface, inputs=user_input_box, outputs=outputs_list) | |
btn_clear.add([user_input_box] + outputs_list) | |
# --- 啟動應用程式 --- | |
if __name__ == "__main__": | |
load_data() # 預先載入 Excel 資料到快取 | |
demo.launch(debug=True) |