# This script combines all components for deployment on Hugging Face Spaces.
# --- Imports ---
import spaces
import os
import gradio as gr
from huggingface_hub import InferenceClient
import torch
import re
import warnings
import time
import json
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig, BitsAndBytesConfig
from sentence_transformers import SentenceTransformer, util, CrossEncoder
import gspread
# from google.colab import auth # Colab specific, remove for HF Spaces
# from google.auth import default # Colab specific, remove for HF Spaces
from tqdm import tqdm
from duckduckgo_search import DDGS
import spacy
from datetime import date, timedelta
from dateutil.relativedelta import relativedelta
import traceback
import base64
@spaces.GPU
def startup():
print("GPU function registered for Hugging Face Spaces startup.")
return "Ready"
startup()
# Suppress warnings
warnings.filterwarnings("ignore", category=UserWarning)
# --- Global Variables and Secrets ---
# HF_TOKEN is automatically available in HF Spaces secrets
HF_TOKEN = os.getenv("HF_TOKEN")
# GOOGLE_BASE64_CREDENTIALS should be added as a Space Secret
SHEET_ID = os.getenv("SHEET_ID") # Get SHEET_ID from Space Secrets
GOOGLE_BASE64_CREDENTIALS = os.getenv("GOOGLE_BASE64_CREDENTIALS")
# --- Model and Tool Initialization ---
client = None # Initialize after HF_TOKEN is confirmed available
nlp = None
embedder = None
reranker = None
try:
# Initialize InferenceClient
if HF_TOKEN:
client = InferenceClient("google/gemma-2-9b-it", token=HF_TOKEN)
print("Hugging Face Inference Client initialized.")
else:
print("Warning: HF_TOKEN not found. Inference Client not initialized.")
# Load spacy model for sentence splitting
try:
nlp = spacy.load("en_core_web_sm")
print("SpaCy model 'en_core_web_sm' loaded.")
except OSError:
print("SpaCy model 'en_core_web_sm' not found. Downloading...")
try:
# Use pip for installation in HF Spaces environment
os.system("pip install https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1.tar.gz")
nlp = spacy.load("en_core_web_sm")
print("SpaCy model 'en_core_web_sm' downloaded and loaded.")
except Exception as e:
print(f"Failed to download or load SpaCy model: {e}")
# Load SentenceTransformer for RAG/business info retrieval
print("Attempting to load Sentence Transformer (sentence-transformers/paraphrase-MiniLM-L6-v2)...")
embedder = SentenceTransformer("sentence-transformers/paraphrase-MiniLM-L6-v2")
print("Sentence Transformer loaded.")
# Load a Cross-Encoder model for re-ranking retrieved documents
print("Attempting to load Cross-Encoder Reranker (cross-encoder/ms-marco-MiniLM-L6-v2)...")
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L6-v2')
print("Cross-Encoder Reranker loaded.")
except Exception as e:
print(f"An error occurred during model/tool initialization: {e}")
print(traceback.format_exc())
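# --- Local Model and Tokenizer Loading (assumed) ---
# The tool-calling loop in chat_with_bot below references module-level
# `tokenizer` and `model` objects that this excerpt never defines (and the
# imported BitsAndBytesConfig is otherwise unused). The following is a minimal,
# hedged sketch of that missing initialization; the checkpoint name (reusing
# "google/gemma-2-9b-it" from the InferenceClient above) and the 4-bit
# quantization settings are assumptions, not the author's confirmed setup.
tokenizer = None
model = None
try:
    local_model_id = "google/gemma-2-9b-it"  # assumption: same checkpoint as the InferenceClient
    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.bfloat16,
    )
    tokenizer = AutoTokenizer.from_pretrained(local_model_id, token=HF_TOKEN)
    model = AutoModelForCausalLM.from_pretrained(
        local_model_id,
        quantization_config=quantization_config,
        device_map="auto",
        token=HF_TOKEN,
    )
    print(f"Local model and tokenizer loaded from '{local_model_id}'.")
except Exception as e:
    print(f"Warning: local model/tokenizer could not be loaded: {e}")
    print(traceback.format_exc())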
# --- Google Sheets Authentication ---
gc = None # Global variable for gspread client
def authenticate_google_sheets():
"""Authenticates with Google Sheets using base64 encoded credentials."""
global gc
print("Authenticating Google Account...")
if not GOOGLE_BASE64_CREDENTIALS:
print("Error: GOOGLE_BASE64_CREDENTIALS secret not found.")
print("Please add GOOGLE_BASE64_CREDENTIALS as a Space Secret.")
return False
try:
# Decode the base64 credentials
credentials_json = base64.b64decode(GOOGLE_BASE64_CREDENTIALS).decode('utf-8')
credentials = json.loads(credentials_json)
# Authenticate using service account from dictionary
gc = gspread.service_account_from_dict(credentials)
print("Google Sheets authentication successful via service account.")
return True
except Exception as e:
print(f"Google Sheets authentication failed: {e}")
print("Please ensure your GOOGLE_BASE64_CREDENTIALS secret is correctly set and contains valid service account credentials.")
print(traceback.format_exc())
return False
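# A note on producing the secret value (assuming a standard service-account JSON
# key file downloaded from Google Cloud): encode it to a single line, e.g.
#   base64 -w 0 service_account.json   # GNU coreutils; macOS base64 wraps differently
# and paste the output into the Space's GOOGLE_BASE64_CREDENTIALS secret.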
# --- Google Sheets Data Loading and Embedding ---
data = [] # Global variable to store loaded data
descriptions_for_embedding = []
embeddings = torch.tensor([])
business_info_available = False # Flag to indicate if business info was loaded successfully
def load_business_info():
"""Loads business information from Google Sheet and creates embeddings."""
global data, descriptions_for_embedding, embeddings, business_info_available
business_info_available = False # Reset flag
if gc is None:
print("Skipping Google Sheet loading: Google Sheets client not authenticated.")
return
if not SHEET_ID:
print("Error: SHEET_ID not set.")
print("Please add SHEET_ID as a Space Secret.")
return
try:
sheet = gc.open_by_key(SHEET_ID).sheet1
print(f"Successfully opened Google Sheet with ID: {SHEET_ID}")
data_records = sheet.get_all_records()
if not data_records:
print(f"Warning: No data records found in Google Sheet with ID: {SHEET_ID}")
data = []
descriptions_for_embedding = []
else:
# Filter out rows missing 'Service' or 'Description'
filtered_data = [row for row in data_records if row.get('Service') and row.get('Description')]
if not filtered_data:
print("Warning: Filtered data is empty after checking for 'Service' and 'Description'.")
data = []
descriptions_for_embedding = []
else:
data = filtered_data
descriptions_for_embedding = [f"Service: {row['Service']}. Description: {row['Description']}" for row in data]
if descriptions_for_embedding and embedder is not None:
print("Encoding descriptions...")
try:
embeddings = embedder.encode(descriptions_for_embedding, convert_to_tensor=True)
print("Encoding complete.")
business_info_available = True # Set flag if successful
except Exception as e:
print(f"Error during description encoding: {e}")
embeddings = torch.tensor([])
business_info_available = False # Encoding failed
else:
print("Skipping encoding descriptions: No descriptions found or embedder not available.")
embeddings = torch.tensor([])
business_info_available = False
print(f"Loaded {len(descriptions_for_embedding)} entries from Google Sheet for embedding/RAG.")
if not business_info_available:
print("Business information retrieval (RAG) is NOT available.")
except gspread.exceptions.SpreadsheetNotFound:
print(f"Error: Google Sheet with ID '{SHEET_ID}' not found.")
print("Please check the SHEET_ID and ensure your authenticated Google Account has access to this sheet.")
business_info_available = False
except Exception as e:
print(f"An error occurred while accessing the Google Sheet: {e}")
print(traceback.format_exc())
business_info_available = False
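# The full script is expected to invoke authentication and data loading once at
# startup. A minimal sketch (the actual calls may appear later in the original
# file, past the end of this excerpt):
#   if authenticate_google_sheets():
#       load_business_info()
#   else:
#       print("Proceeding without business info; RAG lookups will be unavailable.")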
# --- Tool Functions ---
# Function to perform DuckDuckGo Search and return results with URLs
def perform_duckduckgo_search(query: str, max_results: int = 5):
"""
Performs a search using DuckDuckGo and returns a list of dictionaries.
Includes a delay to avoid rate limits.
Returns an empty list and prints an error if search fails.
"""
print(f"Executing Tool: perform_duckduckgo_search with query='{query}')")
search_results_list = []
try:
# Add a delay before each search
time.sleep(1) # Sleep for 1 second
with DDGS() as ddgs:
if not query or len(query.split()) < 2:
print(f"Skipping search for short query: '{query}'")
return []
# Use text() method for general text search
results_generator = ddgs.text(query, max_results=max_results)
results_found = False
for r in results_generator:
search_results_list.append(r)
results_found = True
if not results_found and max_results > 0:
print(f"DuckDuckGo search for '{query}' returned no results.")
except Exception as e:
print(f"Error during Duckduckgo search for '{query}': {e}")
return []
return search_results_list
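# Example (illustrative): each result from DDGS.text() is a dict with
# 'title', 'href', and 'body' keys:
#   results = perform_duckduckgo_search("weather in Dar es Salaam", max_results=3)
#   for r in results:
#       print(r['title'], r['href'])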
# Function to retrieve relevant business info using RAG with re-ranking.
# Returns a list of relevant matches (up to max_matches) and the best relevance score.
def retrieve_business_info(query: str, threshold: float = 0.50, max_matches: int = 5):
"""
Retrieves relevant business information based on query similarity using vector search
and re-ranking. Returns a LIST of dictionaries for relevant matches and the best score.
Returns an empty list and 0.0 if no match above threshold is found, or on error.
Handles cases where data, embeddings, or reranker are not available.
"""
print(f"Executing Tool: retrieve_business_info with query='{query}' (threshold={threshold}, max_matches={max_matches})")
# Check if necessary components for RAG are available
if not business_info_available or not data or (embeddings is None or embeddings.numel() == 0) or embedder is None:
print("Business info data, embeddings, or embedder not available for retrieval.")
return [], 0.0 # Return empty list and 0.0 score if RAG setup is incomplete
# Handle case where reranker is not available - fall back to basic vector search
# This fallback will still only return the single best match for simplicity
if reranker is None:
print("Reranker model not loaded. Falling back to basic vector search (less robust, single match).")
try:
user_embedding = embedder.encode(query, convert_to_tensor=True)
cos_scores = util.cos_sim(user_embedding, embeddings)[0]
best_score = cos_scores.max().item()
if best_score > threshold:
best_match_idx = cos_scores.argmax().item()
best_match = data[best_match_idx]
print(f"Basic vector search match found with score {best_score:.4f}.")
return [best_match], best_score # Return list containing one match
else:
print(f"Basic vector search: No match found above threshold {threshold:.4f} (best score: {best_score:.4f}).")
return [], best_score # Return empty list
except Exception as e:
print(f"Error during basic vector search retrieval: {e}")
return [], 0.0 # Return empty list and 0.0 score on error
# If the reranker is available, proceed with vector search and re-ranking over multiple candidates
try:
user_embedding = embedder.encode(query, convert_to_tensor=True)
cos_scores = util.cos_sim(user_embedding, embeddings)[0]
# Get initial candidates from vector search (e.g., top 20)
# We need more initial candidates than the final max_matches
top_k_initial = max(max_matches * 2, 10) # Get at least double the desired matches, minimum 10
top_k_initial = min(top_k_initial, len(descriptions_for_embedding)) # Ensure not more than available
if top_k_initial == 0: # Handle case with no descriptions or k=0
print("No descriptions available or top_k_initial is zero.")
return [], 0.0
top_results_indices = torch.topk(cos_scores, k=top_k_initial, largest=True).indices.tolist()
if not top_results_indices:
print("Vector search found no initial candidates.")
return [], 0.0
# Prepare query-document pairs for re-ranking
rerank_pairs = [[query, descriptions_for_embedding[idx]] for idx in top_results_indices]
# Get re-ranker scores
rerank_scores = reranker.predict(rerank_pairs).tolist()
# Combine scores and original indices, then sort by re-ranker score
scored_indices = sorted(zip(rerank_scores, top_results_indices), key=lambda x: x[0], reverse=True)
relevant_matches = []
best_overall_score = 0.0 # Track the highest score among retrieved
# Iterate through sorted results and collect matches above the threshold, up to max_matches
for i, (score, original_idx) in enumerate(scored_indices):
if score >= threshold:
relevant_matches.append(data[original_idx])
print(f"Including match (score: {score:.4f}, Original index: {original_idx}) above threshold {threshold:.4f}.")
if i == 0: # The first item in the sorted list is the best score
best_overall_score = score
else:
print(f"Skipping match (score: {score:.4f}) below threshold {threshold:.4f}.")
# If results are sorted, we can break early once the threshold is no longer met
break
if len(relevant_matches) >= max_matches:
print(f"Reached maximum number of matches ({max_matches}). Stopping collection.")
break # Stop once max_matches are collected
if relevant_matches:
print(f"Retrieved {len(relevant_matches)} relevant business info matches.")
return relevant_matches, best_overall_score # Return list of matches and the best score
else:
# If no matches were found above the threshold
# Find the score of the single best match even if it's below the threshold
best_possible_score = scored_indices[0][0] if scored_indices else 0.0
print(f"Reranked business info: No matches found above threshold {threshold:.4f}. Best score was {best_possible_score:.4f}.")
return [], best_possible_score # Return empty list and best score found
except Exception as e:
print(f"Error during re-ranked business information retrieval: {e}")
print(traceback.format_exc()) # Print traceback for RAG errors
return [], 0.0 # Return empty list and 0.0 score on error
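# Illustrative call (hypothetical values):
#   matches, best = retrieve_business_info("consultation service", threshold=0.5, max_matches=3)
#   # -> ([{'Service': 'Consultation', 'Description': '...', 'Price': '...'}], 0.87)
# Note (an assumption worth verifying): ms-marco cross-encoders emit raw logits
# rather than scores bounded to [0, 1], so the same numeric threshold has
# different semantics on the re-ranked path than on the cosine-similarity fallback.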
# Function to perform date calculation if needed
def perform_date_calculation(query: str):
"""
Analyzes query for date calculation requests and performs the calculation.
Returns a dict describing the calculation and result, or None.
Handles formats like 'X days ago', 'X days from now', 'X weeks ago', 'X weeks from now', 'what is today's date'.
Uses dateutil for slightly more flexibility (though core logic remains simple).
"""
print(f"Executing Tool: perform_date_calculation with query='{query}')")
query_lower = query.lower()
today = date.today()
result_date = None
calculation_description = None
if re.search(r"\btoday'?s date\b|what is today'?s date\b|what day is it\b", query_lower):
result_date = today
calculation_description = f"The current date is: {today.strftime('%Y-%m-%d')}"
print(f"Identified query for today's date.")
return {"query": query, "description": calculation_description, "result": result_date.strftime('%Y-%m-%d'), "success": True}
match = re.search(r"(\d+)\s+(day|week|month|year)s?\s+(ago|from now)", query_lower)
if match:
value = int(match.group(1))
unit = match.group(2)
direction = match.group(3)
try:
if unit == 'day':
delta = timedelta(days=value)
elif unit == 'week':
delta = timedelta(weeks=value)
elif unit == 'month':
delta = relativedelta(months=value)
elif unit == 'year':
delta = relativedelta(years=value)
else:
desc = f"Could not understand the time unit '{unit}' in '{query}'."
print(desc)
return {"query": query, "description": desc, "result": None, "success": False, "error": desc}
if direction == 'ago':
result_date = today - delta
calculation_description = f"Calculating date {value} {unit}s ago from {today.strftime('%Y-%m-%d')}: {result_date.strftime('%Y-%m-%d')}"
elif direction == 'from now':
result_date = today + delta
calculation_description = f"Calculating date {value} {unit}s from now from {today.strftime('%Y-%m-%d')}: {result_date.strftime('%Y-%m-%d')}"
print(f"Performed date calculation: {calculation_description}")
return {"query": query, "description": calculation_description, "result": result_date.strftime('%Y-%m-%d'), "success": True}
except OverflowError:
desc = f"Date calculation overflow for query: {query}"
print(f"Date calculation overflow for query: {query}")
return {"query": query, "description": desc, "result": None, "success": False, "error": desc}
except Exception as e:
desc = f"An error occurred during date calculation for query '{query}': {e}"
print(desc)
return {"query": query, "description": desc, "result": None, "success": False, "error": str(e)}
desc = "No specific date calculation pattern recognized."
print(f"No specific date calculation pattern found in query: '{query}'")
return {"query": query, "description": desc, "result": None, "success": False}
# --- Tool Definitions for the Model ---
# Describe the tools available to the model in a structured format
# This will be injected into the prompt.
TOOL_DEFINITIONS = """
Available tools:
1. **search**: Use this tool to perform a web search for current external information. Useful for facts, news, weather, etc.
Parameters:
- query (string, required): The search query.
- max_results (integer, optional, default=5): The maximum number of results to return.
2. **lookup_business_info**: Use this tool to search the internal business database for information about our services, products, pricing, availability, and key people. This is the primary source for company-specific details. This lookup is now more robust to variations in phrasing due to enhanced search.
Parameters:
- query (string, required): The query terms related to the business information needed (e.g., "consultation service", "DSTv assistant model price", "Salum Ally").
- threshold (number, optional, default=0.50): The minimum relevance score required for a match based on a re-ranking process. Use a lower threshold (e.g., 0.4) if very broad matching is needed.
- max_matches (integer, optional, default=5): The maximum number of relevant matches to retrieve from the internal database. Use a higher number (e.g., 10 or 15) for broad queries asking about multiple items.
3. **perform_date_calculation**: Use this tool to calculate dates relative to today or find today's date. Understands phrases like "today's date", "X days ago", "Y weeks from now", "Z months/years ago/from now".
Parameters:
- query (string, required): The natural language query asking for a date calculation.
4. **answer**: Use this tool when you have gathered all necessary information from tools and history, or when the user's query can be answered directly based on your knowledge. This is the *last* action you should take in a turn.
Parameters:
- text (string, required): The final, comprehensive, natural language response to the user.
"""
# --- System Prompt Template for Tool Use ---
# This template instructs the model on how to use the tools and format its output.
# Inject this *within* the user message content.
# It also instructs the model to produce comprehensive, synthesized answers.
tool_use_system_template = """<system>
You are FutureAi, a helpful, polite, and professional assistant for Futuresony. Your primary goal is to assist the user by effectively using the available tools or answering directly based on the conversation history and tool outputs. Maintain a positive and helpful tone. If you are unsure or a tool returns no clear results, state this gracefully. When providing answers based on gathered information, aim for a comprehensive and detailed response, synthesizing all relevant points from the tool outputs.
Today's date is: {current_date}
Available tools:
{tool_definitions}
Analyze the user's request and decide whether to use one or more tools, or provide a final answer.
**Tool Usage Priority:**
- If the query is about *our business services, products, pricing, or people (like employees/contacts listed in our internal data)*, prioritize using the `lookup_business_info` tool first.
- If the query is a date calculation, use the `perform_date_calculation` tool.
- Use the `search` tool for general knowledge, current events, weather, or information clearly outside of our internal business data.
- You can use multiple tools if a query is multifaceted. Process internal information first.
To use a tool, output a command within <tool_code> and </tool_code> tags. The content inside should be a JSON object with "tool_name" and "parameters". Ensure parameters like 'threshold' and 'max_matches' are included for `lookup_business_info` when needed for broad queries.
Example tool call:
<tool_code> {{"tool_name": "search", "parameters": {{"query": "weather today"}}}} </tool_code>
<tool_code> {{"tool_name": "lookup_business_info", "parameters": {{"query": "consultation service", "threshold": 0.6}}}} </tool_code>
<tool_code> {{"tool_name": "lookup_business_info", "parameters": {{"query": "all services", "threshold": 0.4, "max_matches": 10}}}} </tool_code> # Example for broad query
After executing tools, you will receive the tool results. Use these results and the conversation history to formulate your **comprehensive** final answer. Tool results will be provided within `<tool_results>` tags, containing sub-tags specific to each tool's output. Pay close attention to these results and any notes within `<system_note>` or `<error>` tags.
To provide the final answer to the user, use the 'answer' tool. This indicates you are finished and the text within the parameters will be shown to the user. Use the 'answer' tool as soon as you have sufficient information to answer the user's query, or if you determine you cannot answer it even with the tools. Your answer should be detailed and synthesize information effectively, especially from multiple lookup results.
Example final answer:
<tool_code> {{"tool_name": "answer", "parameters": {{"text": "Based on the search results, the weather today is sunny."}}}} </tool_code>
If you can answer the query directly without tools (e.g., a simple greeting, acknowledging instructions), use the 'answer' tool immediately with a direct, polite response.
Think step-by-step. Decide if tools are needed based on the **Tool Usage Priority**. If so, which ones? What parameters? Consider if a broad query requires setting a lower `threshold` and higher `max_matches` for `lookup_business_info`. If you have results, how do they help answer the user? Synthesize ALL relevant information into your final answer. If results are insufficient or indicate an error, how should you respond gracefully? Finally, formulate the comprehensive answer using the 'answer' tool.
Output ONLY tool calls within <tool_code> tags or a final answer using the 'answer' tool. Do not include any other text unless it's within the 'answer' tool's parameters.
</system>
"""
# Max history length in terms of turns (user + assistant) to keep in the model context
MAX_HISTORY_TURNS = 5 # Keep last 5 turns
# --- Chat Logic Function with Tool Use ---
def chat_with_bot(user_input, chat_history_state):
"""
Processes user input through an iterative tool-use logic for Gradio interface.
Takes user_input string and chat_history_state (list of lists) as input.
Returns the updated chat_history_state (list of lists).
Uses a structured tool-calling approach.
Guaranteed strict user/assistant role alternation in model_chat_history.
"""
# Basic Input Safety Check (Example)
if any(phrase in user_input.lower() for phrase in ["harmful content", "malicious intent"]):
safe_response = "I cannot process requests that involve harmful or inappropriate content."
return chat_history_state + [[user_input, safe_response]]
# Append user message to history immediately for display
# The bot message will be updated iteratively
# We append a placeholder now, and update it with the final response later.
chat_history_state = chat_history_state + [[user_input, "..."]]
original_user_input = user_input
print(f"\n--- Starting turn with input: {user_input} ---") # Debug Print
# Get current date
current_date = date.today().strftime('%Y-%m-%d')
print(f"Current Date: {current_date}") # Debug Print
# Maintain an internal model history that strictly alternates user/assistant roles
# This history will be used directly by apply_chat_template.
# It represents the conversation *as the model sees it*, including tool calls/results.
# Build this history from the *completed* past turns from chat_history_state.
model_chat_history = []
# Convert Gradio chat history (list of lists) to model history (list of dicts)
# Ensure strict alternation: user, assistant, user, assistant...
# Only add complete turns from the *past* history (exclude the current incomplete turn)
# Limit the history length
history_to_process = chat_history_state[:-1] # Exclude the current turn being processed
# Ensure we only take pairs [user, bot] from past history where bot is NOT the initial placeholder
# This guarantees that the last message in `recent_complete_turns` corresponds to a *completed* assistant response.
complete_past_turns = [
turn for turn in history_to_process
if turn is not None and len(turn) == 2 and turn[0] is not None and turn[1] is not None and str(turn[1]).strip() != "..."
]
# Take the last MAX_HISTORY_TURNS complete turns
recent_complete_turns = complete_past_turns[max(0, len(complete_past_turns) - MAX_HISTORY_TURNS):]
for user_msg, bot_msg in recent_complete_turns:
# Add user message (must be present)
if user_msg is not None: # Should always be True based on complete_past_turns filter
model_chat_history.append({"role": "user", "content": str(user_msg).strip()})
# Add assistant message (must be present and non-placeholder based on complete_past_turns filter)
if bot_msg is not None and str(bot_msg).strip() != "...": # Should always be True based on filter
model_chat_history.append({"role": "assistant", "content": str(bot_msg).strip()})
# --- Iterative Tool Calling Loop ---
max_tool_iterations = 5 # Limit the number of tool calls in a single turn to prevent infinite loops
final_response_text = None # Variable to hold the final answer from the 'answer' tool
current_tool_results_text = "" # Accumulate tool results text for the *next* model call in this turn
print("Starting tool execution loop...")
try: # This is the main try block for the chat_with_bot function
for i in range(max_tool_iterations):
print(f"\n--- Tool Iteration {i+1} ---")
# Step 1 & 2: Prepare the user message content for THIS iteration and append to history
# The content of the user message for this iteration depends on whether it's the first step
# (original query + system prompt) or a subsequent step (tool results).
current_user_message_content = ""
if i == 0:
# First iteration: Include the system template and the original user input
system_prompt_content = tool_use_system_template.format(
current_date=current_date,
tool_definitions=TOOL_DEFINITIONS
)
current_user_message_content = system_prompt_content + "\n\nUser Query: " + original_user_input
else:
# Subsequent iterations: Include the tool results from the previous assistant response.
if current_tool_results_text:
current_user_message_content = "<tool_results>\n" + current_tool_results_text.strip() + "\n</tool_results>"
current_tool_results_text = "" # Clear the buffer after adding to the prompt
else:
# If no new tool results were accumulated in the previous step (e.g., parsing failed, no tools called),
# send a message indicating this so the model doesn't wait indefinitely.
current_user_message_content = "<tool_results>No new results or no tools were called in the previous turn.</tool_results>"
print("No new tool results to add for this iteration.")
# Append the user message for the current iteration to the main model history.
# This history is what apply_chat_template will process.
# If the logic is correct, model_chat_history should always end with an 'assistant' role
# before this append, except for the very first turn of the conversation.
model_chat_history.append({"role": "user", "content": current_user_message_content.strip()})
# Step 3 & 4: Apply template to get the full prompt and Generate model output
# The history `model_chat_history` should now be in the correct state for generation:
# starting with 'user' and ending with the current 'user' message.
# The check below verifies the strict alternation before tokenization.
if len(model_chat_history) > 1 and model_chat_history[-1]['role'] == model_chat_history[-2]['role']:
print("Error: History roles are not alternating before generation!")
print("History:", model_chat_history)
final_response_text = "Sorry, I encountered an internal error with the conversation history format before generation."
break # Break the tool loop if history is malformed
prompt_for_generation = tokenizer.apply_chat_template(
model_chat_history, # Use the main model_chat_history directly
tokenize=False,
add_generation_prompt=True
)
generation_config = GenerationConfig(
max_new_tokens=700, # Headroom for multiple tool calls or a longer answer
do_sample=False, # Deterministic (greedy) decoding for predictable tool calls
# temperature/top_k/top_p are ignored when do_sample=False
top_k=None,
top_p=None,
eos_token_id=tokenizer.eos_token_id,
pad_token_id=tokenizer.pad_token_id,
use_cache=True
)
raw_model_output = ""
# Add try-except around tokenizer call as well
try:
input_ids = tokenizer(prompt_for_generation, return_tensors="pt").input_ids.to(model.device)
if input_ids.numel() == 0:
print("Warning: Empty input_ids for model generation.")
raw_model_output = "<system_error>Error: Empty input_ids for model generation.</system_error>" # Report error via system tag
else:
try:
outputs = model.generate(
input_ids=input_ids,
generation_config=generation_config,
)
prompt_length = input_ids.shape[1]
if outputs.shape[1] > prompt_length:
raw_model_output = tokenizer.decode(outputs[0, prompt_length:], skip_special_tokens=True).strip()
else:
raw_model_output = ""
print("Warning: Model generated no new tokens.")
except Exception as e:
print(f"Error during model generation in tool loop: {e}")
raw_model_output = f"<system_error>Error: Model generation failed: {e}</system_error>" # Report error via system tag
except Exception as e:
print(f"Error during tokenizer call in tool loop: {e}")
raw_model_output = f"<system_error>Error: Tokenizer failed: {e}</system_error>" # Report error via system tag
print(f"Raw model output: {raw_model_output}")
# Step 5: Append the model's raw output as the assistant message for THIS iteration
# This is crucial for maintaining the alternation in `model_chat_history`
model_chat_history.append({"role": "assistant", "content": raw_model_output.strip()})
# Step 6: Parse Tool Calls from the latest assistant message (which is now the last entry in history)
tool_calls = []
# Use regex to find all content within <tool_code> tags in the latest assistant message
matches = re.findall(r'<tool_code>(.*?)</tool_code>', model_chat_history[-1]['content'], re.DOTALL)
if not matches:
print("No tool calls found in latest model output.")
# If no tool calls, check if the model tried to output an answer directly
# This is a fallback if the model fails to use the 'answer' tool.
# Apply cleanup patterns just to the latest assistant message to see if it's a potential answer
cleaned_potential_answer = re.sub(r'<tool_code>.*?</tool_code>', '', model_chat_history[-1]['content'], flags=re.DOTALL) # Remove tool tags first
cleaned_potential_answer = re.sub(r'<.*?>', '', cleaned_potential_answer).strip() # Remove any other potential tags
# If the cleaned output is not empty or just whitespace, treat it as a potential final answer
if cleaned_potential_answer and final_response_text is None:
print("Model output does not contain tool calls, treating cleaned output as potential direct answer.")
final_response_text = cleaned_potential_answer
break # Exit the tool loop as we have a response
# If no tool calls and not a potential answer, check for explicit system errors reported by the model
if "<system_error>" in model_chat_history[-1]['content'] or "<error>" in model_chat_history[-1]['content']:
print("Model output contains system error tags. Exiting tool loop.")
# The synthesis step will pick up these errors from the history
break # Exit loop on critical error reported by the model
# If no tool calls, no potential answer, and no explicit error, the loop might continue.
# The next iteration's user message content will be generated as "No new results..."
continue # Skip to the next iteration
# Step 7: Execute Tool Calls and accumulate results for the *next* iteration's user message
# We clear the buffer here, as we are processing the *latest* assistant message's tools.
current_tool_results_text = ""
answer_tool_called_in_this_iter = False # Reset flag for this iteration's output
for match in matches:
try:
# Attempt to parse the content within the tags as JSON
tool_call_json = json.loads(match.strip())
if "tool_name" in tool_call_json and "parameters" in tool_call_json:
tool_name = tool_call_json.get("tool_name")
parameters = tool_call_json.get("parameters", {})
if tool_name == "answer":
final_response_text = parameters.get("text", "")
answer_tool_called_in_this_iter = True
print(f"Model called 'answer' tool. Final response intended: '{final_response_text}'")
# Once the 'answer' tool is called, we prioritize exiting the loop after this iteration.
# We still process any other tool calls in this *same* model output, but then break the loop afterwards.
continue # Process any remaining tool calls in this same model output
elif tool_name == "search":
query = parameters.get("query")
max_results = parameters.get("max_results", 5)
if query:
print(f"Executing Tool: search with query='{query}', max_results={max_results}")
results = perform_duckduckgo_search(query, max_results)
current_tool_results_text += f"<search_results_for_query query='{query}'>\n"
if results:
for r in results:
snippet = r.get('body', 'N/A')
if len(snippet) > 300:
snippet = snippet[:300] + "..."
current_tool_results_text += f"<item>\n<title>{r.get('title', 'N/A')}</title>\n<snippet>{snippet}</snippet>\n<url>{r.get('href', 'N/A')}</url>\n</item>\n"
print(f"Executed search for '{query}'. Found {len(results)} results.")
else:
current_tool_results_text += "No results found.\n"
print(f"No search results found for '{query}'.")
current_tool_results_text += "</search_results_for_query>\n"
else:
current_tool_results_text += f"<search_results_for_query query='{query}'><error>Missing 'query' parameter.</error></search_results_for_query>\n"
print(f"Skipping search tool call: Missing 'query' parameter.")
elif tool_name == "lookup_business_info":
query = parameters.get("query")
# Use the threshold and max_matches provided by the model, or the defaults
threshold = parameters.get("threshold", 0.50)
max_matches = parameters.get("max_matches", 5) # Use max_matches parameter
if query:
print(f"Executing Tool: lookup_business_info with query='{query}', threshold={threshold}, max_matches={max_matches}")
# retrieve_business_info returns a list of matches and the best score
matches_list, best_score = retrieve_business_info(query, threshold=threshold, max_matches=max_matches)
# Format the results block with one <match> tag per retrieved item
current_tool_results_text += f"<lookup_business_info_results_for_query query='{query}' requested_threshold='{threshold:.4f}' requested_max_matches='{max_matches}' final_best_score='{best_score:.4f}'>\n"
if matches_list: # Check if the list is not empty
for match in matches_list: # Iterate through the list of matches
if isinstance(match, dict): # Ensure it's a dictionary
current_tool_results_text += f"<match>\n"
current_tool_results_text += f"<service>{match.get('Service', 'N/A')}</service>\n"
current_tool_results_text += f"<description>{match.get('Description', 'N/A')}</description>\n"
current_tool_results_text += f"<price>{match.get('Price', 'N/A')}</price>\n"
current_tool_results_text += f"<available>{match.get('Available', 'N/A')}</available>\n"
# Add other relevant fields from your sheet here if needed for synthesis
# e.g., <contact_person> etc.
current_tool_results_text += f"</match>\n"
# Optionally add a note if any item in the list was not a dict
else:
print(f"Warning: Item in retrieved_business_info list was not a dict: {match}")
print(f"Executed business lookup for '{query}'. Found {len(matches_list)} matches above threshold {threshold:.4f}. Best score: {best_score:.4f}.")
else:
# This case covers No matches found above threshold within retrieve_business_info
current_tool_results_text += f"No relevant matches found above threshold {threshold:.4f} (best score: {best_score:.4f}).\n"
print(f"Executed business lookup for '{query}'. No matches found above threshold.")
# Add a note about the best score being below threshold
if best_score > 0: # Only add note if *some* match was found, but not above threshold
current_tool_results_text += f"<system_note>Best match score ({best_score:.4f}) was below the requested threshold ({threshold:.4f}).</system_note>\n"
current_tool_results_text += "</lookup_business_info_results_for_query>\n"
else:
current_tool_results_text += f"<lookup_business_info_results_for_query query='{query}'><error>Missing 'query' parameter.</error></lookup_business_info_results_for_query>\n"
print(f"Skipping business lookup tool call: Missing 'query' parameter.")
elif tool_name == "perform_date_calculation":
query = parameters.get("query")
if query:
print(f"Executing Tool: perform_date_calculation with query='{query}'")
result = perform_date_calculation(query) # This function already returns a dict or error
current_tool_results_text += f"<perform_date_calculation_results_for_query query='{query}'>\n"
if result and result.get('success'): # Check the 'success' key
current_tool_results_text += f"<description>{result.get('description', 'Calculation Successful')}</description>\n<date>{result.get('result')}</date>\n"
print(f"Executed date calculation for '{query}'. Result: {result.get('result')}.")
elif result and result.get('description'):
current_tool_results_text += f"<description>{result.get('description')}</description>\n" # Report description if result is None or not success
print(f"Executed date calculation for '{query}'. Failed: {result.get('description')}.")
elif isinstance(result, str) and result.startswith("Error"):
current_tool_results_text += f"<error>{result}</error>\n" # Report error string
print(f"Executed date calculation for '{query}'. Error: {result}.")
else: # Generic failure case
current_tool_results_text += "Calculation failed or no specific date recognized.\n"
print(f"Executed date calculation for '{query}'. No specific result.")
current_tool_results_text += "</perform_date_calculation_results_for_query>\n"
else:
current_tool_results_text += f"<perform_date_calculation_results_for_query query='{query}'><error>Missing 'query' parameter.</error></perform_date_calculation_results_for_query>\n"
print(f"Skipping date calculation tool call: Missing 'query' parameter.")
else:
print(f"Unknown tool requested by model: {tool_name}")
# Add a note to results buffer about the unknown tool
current_tool_results_text += f"<system_note>Unknown tool requested: {tool_name}</system_note>\n"
else:
print(f"Parsed JSON missing 'tool_name' or 'parameters': {tool_call_json}")
current_tool_results_text += f"<system_note>Failed to parse tool call: Missing 'tool_name' or 'parameters' in JSON: {match.strip()}</system_note>\n"
except json.JSONDecodeError as e:
print(f"Failed to parse tool call JSON: {e}")
print(f"Content was: {match.strip()}")
current_tool_results_text += f"<system_note>Failed to parse tool call JSON: {e}. Content: {match.strip()}</system_note>\n"
except Exception as e:
print(f"An unexpected error occurred during tool call parsing or execution: {e}")
print(traceback.format_exc()) # Print traceback for tool execution errors
current_tool_results_text += f"<system_note>An unexpected error occurred during tool call processing: {e}. Content: {match.strip()}</system_note>\n"
# Step 8: Check if the 'answer' tool was called in this iteration
if answer_tool_called_in_this_iter:
print("Answer tool called. Exiting tool loop.")
break # Exit the main tool iteration loop
# Step 9: If max iterations reached and 'answer' tool wasn't called
if i == max_tool_iterations - 1 and final_response_text is None:
print(f"Max tool iterations reached ({max_tool_iterations}) without 'answer' call.")
# Add a final note to the results buffer so the model sees it in the last forced synthesis step
current_tool_results_text += "<system_note>Maximum tool calls reached. Please provide a final answer based on the information gathered so far or state that the request cannot be fully fulfilled.</system_note>\n"
# Fall through to the final response generation step below
# --- End of the main try block for chat_with_bot ---
except Exception as e: # Matches the try block opened before the tool iteration loop
print(f"An unexpected error occurred in the chat_with_bot function: {e}")
print(traceback.format_exc()) # Print full traceback for debugging
final_response_text = f"Sorry, I encountered an unexpected error while processing your request: {e}"
# In case of error, ensure final_response_text is set so we proceed to update history
# The code below runs AFTER the tool iteration loop and its enclosing try/except finishes
# --- Final Response Generation (Synthesis) ---
# This step is either using the text from the 'answer' tool call,
# or generating a fallback response if the model failed to call 'answer'.
print("\n--- Final Response Generation ---")
# If the model successfully called the 'answer' tool, use that text.
# Otherwise, construct a synthesis prompt for the model to generate a final answer.
if final_response_text is None:
print("Model did not call 'answer' tool. Falling back to synthesis prompt.")
# Model failed to call the 'answer' tool within iterations or encountered an error.
# Fallback: Generate a response based on the accumulated history and tool results.
# The history `model_chat_history` now contains the full trace of tool calls
# and the user messages containing the tool results.
# Construct the synthesis prompt content.
# The synthesis prompt emphasizes a comprehensive final answer.
synthesis_prompt_content = """<system>
Please provide a final, comprehensive answer to the user's original query based on ALL the information gathered from the executed tools and the conversation history. Synthesize the information into a coherent, natural language response. Pay special attention to providing detailed descriptions and listing all relevant points found from the business lookup tool when multiple items were retrieved.
User's original query: "{original_user_input}"
Information gathered from tools and process notes:
{gathered_info_summary}
Synthesize ALL relevant information into a clear, concise, and **comprehensive** natural language response for the user. When presenting information from multiple business lookup results, structure your answer to clearly describe each item found (e.g., list them, describe each one fully).
**Guidelines for your response:**
- Address the user's original question directly.
- Use the information provided in the 'Information gathered' section, synthesizing details from all relevant results.
- If the business lookup returned multiple matches, present the information for *each* match found clearly and informatively.
- If a tool was executed but returned no relevant results (especially if the best score was below the threshold), or if there were errors (<system_error>, <error>, <system_note> tags), explain this gracefully to the user.
- Maintain a helpful, polite, and professional business tone, reflecting the Futuresony brand and your identity as FutureAi.
- Do NOT include raw tool call or result tags in your final answer.
- If you were unable to gather necessary information, clearly state what you could and could not find.
After your answer, generate 2-3 concise follow-up questions that might be helpful or relevant to the user based on the conversation and your response. List these questions clearly at the end.
If Search Results were used, list the relevant URLs under a "Sources:" heading at the very end.
</system>
"""
# Summarize the gathered information by processing the model_chat_history
gathered_info_summary = ""
unique_urls = set() # Collect URLs for Sources section
# Iterate through the model history to find user messages that followed an assistant message.
# These 'user' messages contain the <tool_results> block if tools were run.
# (The synthesis prompt is appended later to a separate copy of the history,
# so it is not present in model_chat_history at this point.)
for i in range(1, len(model_chat_history)):
# Look for 'user' messages that follow an 'assistant' message
if model_chat_history[i]['role'] == 'user' and isinstance(model_chat_history[i]['content'], str) and '<tool_results>' in model_chat_history[i]['content']:
msg_content = model_chat_history[i]['content']
# Check if it contains the tool results block
tool_results_block = re.search(r'<tool_results>(.*?)</tool_results>', msg_content, re.DOTALL)
if tool_results_block:
content = tool_results_block.group(1) # Content inside <tool_results>
# --- Extract and format info from tool result blocks ---
# Capture the opening tag's attributes separately; they are not part of the inner content
search_blocks = re.findall(r'<search_results_for_query(.*?)>(.*?)</search_results_for_query>', content, re.DOTALL)
for sr_attrs, sr_content in search_blocks:
query_match = re.search(r"query='(.*?)'", sr_attrs) # Extract query attribute from the opening tag
query = query_match.group(1) if query_match else "Unknown"
gathered_info_summary += f"Search results for '{query}':\n"
items = re.findall(r'<item>(.*?)</item>', sr_content, re.DOTALL)
if items:
for item_content in items:
title = re.search(r'<title>(.*?)</title>', item_content, re.DOTALL)
snippet = re.search(r'<snippet>(.*?)</snippet>', item_content, re.DOTALL)
url = re.search(r'<url>(.*?)</url>', item_content, re.DOTALL)
title_text = title.group(1).strip() if title else 'N/A'
snippet_text = snippet.group(1).strip() if snippet else 'N/A'
url_text = url.group(1).strip() if url else 'N/A'
gathered_info_summary += f"- Title: {title_text}, Snippet: {snippet_text}\n"
if url_text and url_text != 'N/A':
unique_urls.add(url_text) # Add URL to set
elif "No results found" in sr_content:
gathered_info_summary += "- No results found.\n"
elif "<error>" in sr_content:
error_text = re.search(r'<error>(.*?)</error>', sr_content, re.DOTALL)
gathered_info_summary += f"- Error during search: {error_text.group(1).strip() if error_text else 'Unknown error'}\n"
# Business lookup results (one <match> tag per retrieved item)
lookup_blocks = re.findall(r'<lookup_business_info_results_for_query(.*?)>(.*?)</lookup_business_info_results_for_query>', content, re.DOTALL)
for lr_attrs, lr_content in lookup_blocks:
query_match = re.search(r"query='(.*?)'", lr_attrs)
query = query_match.group(1) if query_match else "Unknown"
# Extract requested_threshold, requested_max_matches, final_best_score from the tag attributes
req_thresh_match = re.search(r"requested_threshold='(.*?)'", lr_attrs)
req_thresh = float(req_thresh_match.group(1)) if req_thresh_match else 0.50
req_max_matches_match = re.search(r"requested_max_matches='(.*?)'", lr_attrs)
req_max_matches = int(req_max_matches_match.group(1)) if req_max_matches_match else 5
final_best_score_match = re.search(r"final_best_score='(.*?)'", lr_attrs)
final_best_score = float(final_best_score_match.group(1)) if final_best_score_match else 0.0
gathered_info_summary += f"Business lookup results for '{query}' (Requested Threshold: {req_thresh:.4f}, Requested Max Matches: {req_max_matches}, Final Best Score: {final_best_score:.4f}):\n"
matches_found = re.findall(r'<match>(.*?)</match>', lr_content, re.DOTALL) # Find ALL match tags
if matches_found:
gathered_info_summary += f" Found {len(matches_found)} relevant item(s):\n"
for match_content in matches_found: # Iterate through each match
service = re.search(r'<service>(.*?)</service>', match_content, re.DOTALL)
description = re.search(r'<description>(.*?)</description>', match_content, re.DOTALL)
price = re.search(r'<price>(.*?)</price>', match_content, re.DOTALL)
available = re.search(r'<available>(.*?)</available>', match_content, re.DOTALL)
# Add extraction for other fields if you include them in your tool output
# contact_person = re.search(r'<contact_person>(.*?)</contact_person>', match_content, re.DOTALL)
gathered_info_summary += f" - Service: {service.group(1).strip() if service else 'N/A'}\n"
gathered_info_summary += f" Description: {description.group(1).strip() if description else 'N/A'}\n"
gathered_info_summary += f" Price: {price.group(1).strip() if price else 'N/A'}\n"
gathered_info_summary += f" Available: {available.group(1).strip() if available else 'N/A'}\n"
# Add other fields here...
# if contact_person: gathered_info_summary += f" Contact Person: {contact_person.group(1).strip()}\n"
elif "No relevant matches found" in lr_content:
gathered_info_summary += f" No relevant matches found above threshold {req_thresh:.4f} (best score: {final_best_score:.4f}).\n"
elif "<error>" in lr_content:
error_text = re.search(r'<error>(.*?)</error>', lr_content, re.DOTALL)
gathered_info_summary += f" Error during business lookup: {error_text.group(1).strip() if error_text else 'Unknown error'}\n"
# Include system notes found within the business lookup results block
system_notes_in_lookup = re.findall(r'<system_note>(.*?)</system_note>', lr_content, re.DOTALL)
for note in system_notes_in_lookup:
gathered_info_summary += f" System Note within Lookup: {note.strip()}\n"
# Date calculation results
date_blocks = re.findall(r'<perform_date_calculation_results_for_query(.*?)>(.*?)</perform_date_calculation_results_for_query>', content, re.DOTALL)
for dr_attrs, dr_content in date_blocks:
query_match = re.search(r"query='(.*?)'", dr_attrs)
query = query_match.group(1) if query_match else "Unknown"
gathered_info_summary += f"Date calculation results for '{query}':\n"
date_val = re.search(r'<date>(.*?)</date>', dr_content, re.DOTALL)
desc = re.search(r'<description>(.*?)</description>', dr_content, re.DOTALL)
if date_val:
gathered_info_summary += f"- Result: {date_val.group(1).strip()}\n"
if desc: gathered_info_summary += f" Description: {desc.group(1).strip()}\n"
elif desc:
gathered_info_summary += f"- {desc.group(1).strip()}\n"
elif "<error>" in dr_content:
error_text = re.search(r'<error>(.*?)</error>', dr_content, re.DOTALL)
gathered_info_summary += f"- Error during date calculation: {error_text.group(1).strip() if error_text else 'Unknown error'}\n"
else:
gathered_info_summary += "- No specific date result found.\n"
# System Notes/Errors from Tool Execution (outside of specific tool blocks but within <tool_results>)
system_notes_in_results_block = re.findall(r'<system_note>(.*?)</system_note>', content, re.DOTALL)
for note in system_notes_in_results_block:
# Add only if not already added from within a specific lookup block
if f"System Note: {note.strip()}\n" not in gathered_info_summary and f"System Note within Lookup: {note.strip()}\n" not in gathered_info_summary:
gathered_info_summary += f"System Note from Tool Results: {note.strip()}\n"
system_errors_in_results_block = re.findall(r'<system_error>(.*?)</system_error>', content, re.DOTALL)
for error_note in system_errors_in_results_block:
gathered_info_summary += f"System Error from Tool Results: {error_note.strip()}\n"
# Also check the raw model output (last assistant message) for system errors if tool results block wasn't generated
last_assistant_message_content = model_chat_history[-1]['content'] if model_chat_history and model_chat_history[-1]['role'] == 'assistant' else ""
system_errors_in_raw_output = re.findall(r'<system_error>(.*?)</system_error>', last_assistant_message_content, re.DOTALL)
for error_note in system_errors_in_raw_output:
# Add only if not already captured from within tool results block
if f"System Error from Tool Results: {error_note.strip()}" not in gathered_info_summary:
gathered_info_summary += f"System Error in model output: {error_note.strip()}\n"
# Check for system notes/errors that might be outside <tool_results> but in the raw assistant output
system_notes_in_raw_output = re.findall(r'<system_note>(.*?)</system_note>', last_assistant_message_content, re.DOTALL)
for note in system_notes_in_raw_output:
if f"System Note from Tool Results: {note.strip()}" not in gathered_info_summary and f"System Note within Lookup: {note.strip()}\n" not in gathered_info_summary: # Avoid duplicates
gathered_info_summary += f"System Note in model output: {note.strip()}\n"
if not gathered_info_summary.strip():
gathered_info_summary = "No specific information was gathered using tools."
# Add the synthesis prompt to the history for the final generation step
# This keeps the history structure correct for apply_chat_template
# IMPORTANT: This adds the synthesis prompt as the final USER message.
# The model will then generate the final ASSISTANT response.
temp_chat_history_for_synthesis = model_chat_history.copy() # Copy the history including tool results
synthesis_prompt_formatted = synthesis_prompt_content.format(
original_user_input=original_user_input,
gathered_info_summary=gathered_info_summary.strip() # Add the summary of results
)
# Append the synthesis prompt as the final user message content
# This maintains the user/assistant alternation (last was assistant, now user for synthesis instruction)
temp_chat_history_for_synthesis.append({"role": "user", "content": synthesis_prompt_formatted.strip()})
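# Sketch of the expected history shape at this point (contents illustrative):
#   [..., {"role": "user",      "content": "<original question>"},
#         {"role": "assistant", "content": "<model output with tool calls>"},
#         {"role": "user",      "content": "<tool_results>...</tool_results>"},
#         {"role": "assistant", "content": "<raw model output>"},
#         {"role": "user",      "content": "<synthesis prompt>"}]  # just appended
# Strict user/assistant alternation is what apply_chat_template expects below.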
# --- Final Synthesis Generation Call ---
# Check strict alternation *again* before the final synthesis generation
if len(temp_chat_history_for_synthesis) > 1 and temp_chat_history_for_synthesis[-1]['role'] == temp_chat_history_for_synthesis[-2]['role']:
# This should ideally not happen with correct history management
print("Error: History roles are not alternating just before final synthesis tokenization!")
print("History:", temp_chat_history_for_synthesis)
final_response = "Sorry, I encountered an internal error during final response generation history formatting."
else:
# Add try-except around the final tokenizer call as well
try:
prompt_for_synthesis = tokenizer.apply_chat_template(
temp_chat_history_for_synthesis, # Use the history with the synthesis prompt
tokenize=False,
add_generation_prompt=True
)
synthesis_generation_config = GenerationConfig(
max_new_tokens=1500, # More tokens for the full answer
do_sample=True, # Use sampling for more creative synthesis
temperature=0.7,
top_k=50,
top_p=0.95,
repetition_penalty=1.1,
eos_token_id=tokenizer.eos_token_id,
pad_token_id=tokenizer.pad_token_id,
use_cache=True
)
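# Rationale for the sampling settings above (a judgment call, not tuned):
# temperature 0.7 with top_p 0.95 favors fluent, varied phrasing for the
# synthesized answer, top_k=50 bounds the candidate pool, and
# repetition_penalty=1.1 discourages echoing the tool-result summary verbatim.
# Deterministic decoding (do_sample=False) is a reasonable swap if
# reproducibility matters more than style.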
input_ids_synthesis = tokenizer(prompt_for_synthesis, return_tensors="pt").input_ids.to(model.device)
if input_ids_synthesis.numel() == 0:
final_response = "Sorry, I couldn't generate a response (empty input for final synthesis)."
print("Warning: Final synthesis input_ids empty.")
else:
try:
outputs_synthesis = model.generate(
input_ids=input_ids_synthesis,
generation_config=synthesis_generation_config,
)
prompt_length_synthesis = input_ids_synthesis.shape[1]
if outputs_synthesis.shape[1] > prompt_length_synthesis:
final_response = tokenizer.decode(outputs_synthesis[0, prompt_length_synthesis:], skip_special_tokens=True).strip()
else:
final_response = "..." # Indicate potentially empty response
print("Warning: Final synthesis generated no new tokens.")
except Exception as e:
print(f"Error during final synthesis model generation: {e}")
final_response = f"Sorry, I encountered an error while generating my response: {e}"
print(traceback.format_exc()) # Print full traceback for debugging
except Exception as e:
print(f"Error during final synthesis tokenizer call: {e}")
final_response = f"Sorry, I encountered an error preparing the final response: {e}"
print(traceback.format_exc()) # Print full traceback for debugging
else:
# If final_response_text is not None, it means the 'answer' tool was called.
# Use that text directly.
final_response = final_response_text
print(f"Using response from 'answer' tool call: '{final_response}'")
# --- Post-process Final Response ---
print("Post-processing final response...")
cleaned_response = final_response
# Remove prompt bleed and tool/system tags that the model might still emit.
# Patterns that capture content keep it (handled below); the rest are removed
# wholesale, including any text between the tags.
unwanted_patterns = [
r'<tool_code>.*?</tool_code>', # Remove raw tool calls
r'<tool_results>.*?</tool_results>', # Remove the main tool results wrapper
# Result blocks are removed from the final text; URLs are harvested from the
# chat history *before* this cleanup (see the collection loop below).
r'<search_results_for_query.*?>(.*?)</search_results_for_query>',
r'<lookup_business_info_results_for_query.*?>(.*?)</lookup_business_info_results_for_query>',
r'<perform_date_calculation_results_for_query.*?>(.*?)</perform_date_calculation_results_for_query>',
r'<system>.*?</system>', # Remove the system block
r'<item>.*?</item>', # Remove individual item tags (from search results)
r'<title>.*?</title>', r'<snippet>.*?</snippet>', r'<url>.*?</url>', # Remove individual search item tags
r'<match>.*?</match>', # Remove individual business match tag
r'<service>(.*?)</service>', # Remove individual business info tags, keep content if needed for fallback
r'<description>(.*?)</description>', r'<price>(.*?)</price>', r'<available>(.*?)</available>', # Remove individual business info tags, keep content if needed for fallback
r'<date>(.*?)</date>', # Remove date tag, keep content
r'<error>(.*?)</error>', r'<system_note>(.*?)</system_note>', # Remove error/system note tags, capture content
r'System:', # Remove system prefix if it bleeds
r'Assistant:', # Remove Assistant prefix if it bleeds outside of intended response
r'User:', # Remove User prefix if it bleeds
r'Tool Results:', # Remove the tool results header if it bleeds
# More specific cleanup for synthesis prompt bleed
r"User's original query:.*",
r"Information gathered \(from previous tool calls and results\):.*",
r"Information gathered from tools and process notes:.*", # New pattern from synthesis prompt
r"Synthesize ALL relevant information.*",
r"Guidelines for your response:.*",
r"Available tools:.*",
r"To use a tool, output a command.*",
r"Example tool call:.*",
r"You can make multiple tool calls.*",
r"After executing tools, you will receive the tool results.*",
r"To provide the final answer.*",
r"Example final answer:.*",
r"If you can answer the query directly.*",
r"Think step-by-step.*",
r"Output ONLY tool calls.*",
r"Conversation History \(Recent Turns\):.*",
r"Business Info Check Results for Query Parts:.*",
r"When presenting information from multiple business lookup results, structure your answer to clearly describe each item found.*", # Added pattern from synthesis prompt
r"Pay special attention to providing detailed descriptions and listing all relevant points found from the business lookup tool when multiple items were retrieved.*", # Added pattern from synthesis prompt
]
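# How the two substitution modes applied below behave (illustrative):
#   re.sub(r'<price>(.*?)</price>', r'\1', "<price>10 USD</price>")
#     -> '10 USD'      # content-preserving: tags stripped, capture kept
#   re.sub(r'<match>.*?</match>', '', 'a <match>x</match> b')
#     -> 'a  b'        # wholesale: tags and content removed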
# First, extract URLs from history *before* removing the blocks from the final response text
# Iterate through the final state of model_chat_history to collect all URLs
unique_urls = set()
for msg in model_chat_history:
if msg['role'] == 'user' and isinstance(msg['content'], str) and '<tool_results>' in msg['content']:
search_blocks = re.findall(r'<search_results_for_query.*?>(.*?)</search_results_for_query>', msg['content'], re.DOTALL)
for sr_content in search_blocks:
urls = re.findall(r'<url>(.*?)</url>', sr_content, re.DOTALL)
for url in urls:
url_text = url.strip()
if url_text and url_text != 'N/A': # Check for empty string as well
unique_urls.add(url_text)
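# Illustrative: the set keeps each URL once even if several search blocks
# repeat it, and sorting later gives stable output, e.g.
#   urls = ['https://b.example', 'https://a.example', 'https://a.example']
#   sorted(set(urls))  # -> ['https://a.example', 'https://b.example']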
# Now apply the cleanup patterns to the generated response text
# We need to be careful here not to remove content we want to keep while removing the tags
temp_cleaned_response = cleaned_response
for pattern in unwanted_patterns:
# For patterns that capture content we might need in fallback, just remove the tags
if pattern in [r'<service>(.*?)</service>', r'<description>(.*?)</description>', r'<price>(.*?)</price>', r'<available>(.*?)</available>', r'<date>(.*?)</date>', r'<error>(.*?)</error>', r'<system_note>(.*?)</system_note>']:
temp_cleaned_response = re.sub(pattern, r'\1', temp_cleaned_response, flags=re.IGNORECASE | re.DOTALL | re.MULTILINE)
else:
temp_cleaned_response = re.sub(pattern, "", temp_cleaned_response, flags=re.IGNORECASE | re.DOTALL | re.MULTILINE)
cleaned_response = temp_cleaned_response
# Remove any remaining multiple empty lines
cleaned_response = re.sub(r'\n\s*\n', '\n\n', cleaned_response).strip()
# Append Sources if URLs were collected and response is not just an error
if unique_urls and not ("Sorry, I encountered an unexpected error" in cleaned_response or "Error loading model" in cleaned_response):
# Convert set to list and sort for consistent output
sorted_urls = sorted(list(unique_urls))
# Add a marker to ensure Sources appear clearly
cleaned_response += "\n\nSources:\n" + "\n".join(sorted_urls)
final_response = cleaned_response
# Fallback if the final response is still empty or a placeholder after post-processing
if not final_response.strip() or final_response.strip() == "...":
print("Warning: Final response was empty after cleaning or was placeholder. Providing a fallback.")
# Construct a fallback based on any executed tool result reflected in history
fallback_parts = []
# Iterate through the history to find user messages containing tool results.
# The synthesis prompt is the very last user message; tool results may appear
# in any earlier user turn, so scan them all.
for msg in model_chat_history:
if msg['role'] == 'user' and isinstance(msg['content'], str) and '<tool_results>' in msg['content']:
# Extract and summarize results from the <tool_results> block
tool_results_block = re.search(r'<tool_results>(.*?)</tool_results>', msg['content'], re.DOTALL)
if tool_results_block:
content = tool_results_block.group(1)
# Check for date calculation results
date_results = re.findall(r'<perform_date_calculation_results_for_query.*?>(.*?)</perform_date_calculation_results_for_query>', content, re.DOTALL)
for dr_content in date_results:
date_val = re.search(r'<date>(.*?)</date>', dr_content, re.DOTALL)
desc = re.search(r'<description>(.*?)</description>', dr_content, re.DOTALL)
if date_val:
fallback_parts.append(f"Date calculation result: {date_val.group(1).strip()}")
elif desc:
fallback_parts.append(f"Date calculation attempt: {desc.group(1).strip()}")
elif "<error>" in dr_content:
error_text = re.search(r'<error>(.*?)</error>', dr_content, re.DOTALL)
fallback_parts.append(f"There was an issue with a date calculation requested: {error_text.group(1).strip() if error_text else 'Unknown error'}")
# Check for business lookup results (MODIFIED for multiple matches in fallback)
lookup_results = re.findall(r'<lookup_business_info_results_for_query.*?>(.*?)</lookup_business_info_results_for_query>', content, re.DOTALL)
for lr_content in lookup_results:
matches_found = re.findall(r'<match>(.*?)</match>', lr_content, re.DOTALL)
if matches_found:
fallback_parts.append(f"Found {len(matches_found)} potential business information match(es):")
for match_content in matches_found: # Iterate through each found match
service_match = re.search(r'<service>(.*?)</service>', match_content, re.DOTALL)
desc_match = re.search(r'<description>(.*?)</description>', match_content, re.DOTALL)
service_name = service_match.group(1).strip() if service_match and service_match.group(1).strip() != 'N/A' else "An item"
# Truncate long descriptions; append an ellipsis only when text was actually cut.
desc_text = desc_match.group(1).strip() if desc_match else ''
desc_snippet = (desc_text[:50] + ("..." if len(desc_text) > 50 else "")) if desc_text and desc_text != 'N/A' else "No description provided."
fallback_parts.append(f" - {service_name}: {desc_snippet}")
elif "No relevant matches found" in lr_content:
score_match = re.search(r"final_best_score='(.*?)'", lr_content) # Look for final_best_score
score = float(score_match.group(1)) if score_match else 0.0
threshold_match = re.search(r"requested_threshold='(.*?)'", lr_content)
threshold_val = float(threshold_match.group(1)) if threshold_match else 0.50
fallback_parts.append(f"Could not find specific business information requested above threshold {threshold_val:.4f} (best score: {score:.4f}).")
elif "<error>" in lr_content:
error_text = re.search(r'<error>(.*?)</error>', lr_content, re.DOTALL)
fallback_parts.append(f"There was an error looking up business information: {error_text.group(1).strip() if error_text else 'Unknown error'}")
# Include system notes found within the business lookup results block
system_notes_in_lookup = re.findall(r'<system_note>(.*?)</system_note>', lr_content, re.DOTALL)
for note in system_notes_in_lookup:
fallback_parts.append(f"Business Lookup Note: {note.strip()}")
# Check for search results
search_results_blocks = re.findall(r'<search_results_for_query.*?>(.*?)</search_results_for_query>', content, re.DOTALL)
for sr_content in search_results_blocks:
if "<item>" in sr_content: # Indicates results were found (even if snippet is N/A)
query_match = re.search(r"query='(.*?)'", sr_content)
query = query_match.group(1) if query_match else "your query"
fallback_parts.append(f"Found some search results for {query}.")
elif "No results found" in sr_content:
query_match = re.search(r"query='(.*?)'", sr_content)
query = query_match.group(1) if query_match else "your query"
fallback_parts.append(f"No search results were found for {query}.")
elif "<error>" in sr_content:
error_text = re.search(r'<error>(.*?)</error>', sr_content, re.DOTALL)
fallback_parts.append(f"There was an error performing the search: {error_text.group(1).strip() if error_text else 'Unknown error'}")
# Check for system notes/errors from tool results (outside of specific tool blocks but within <tool_results>)
system_notes_in_results_block = re.findall(r'<system_note>(.*?)</system_note>', content, re.DOTALL)
for note in system_notes_in_results_block:
# Add only if not already added from within a specific lookup block
if f"System Note: {note.strip()}" not in fallback_parts and f"Business Lookup Note: {note.strip()}" not in fallback_parts:
fallback_parts.append(f"System Note from Tool Results: {note.strip()}")
system_errors_in_results_block = re.findall(r'<system_error>(.*?)</system_error>', content, re.DOTALL)
for error_note in system_errors_in_results_block:
fallback_parts.append(f"System Error from Tool Results: {error_note.strip()}")
# Check for system errors directly in the raw model output (last assistant message)
last_assistant_msg_content = model_chat_history[-1]['content'] if model_chat_history and model_chat_history[-1]['role'] == 'assistant' else ""
system_errors_in_raw_output = re.findall(r'<system_error>(.*?)</system_error>', last_assistant_msg_content, re.DOTALL)
for error_note in system_errors_in_raw_output:
# Add only if not already captured from within tool results block
if f"System Error from Tool Results: {error_note.strip()}" not in fallback_parts:
fallback_parts.append(f"System error during processing: {error_note.strip()}")
# Check for system notes/errors that might be outside <tool_results> but in the raw assistant output
system_notes_in_raw_output = re.findall(r'<system_note>(.*?)</system_note>', last_assistant_msg_content, re.DOTALL)
for note in system_notes_in_raw_output:
if f"System Note from Tool Results: {note.strip()}" not in fallback_parts and f"Business Lookup Note: {note.strip()}" not in fallback_parts: # Avoid duplicates
fallback_parts.append(f"System Note in model output: {note.strip()}")
if fallback_parts:
# Use a set to deduplicate fallback messages based on content
unique_fallback_parts = list(dict.fromkeys(fallback_parts))
# Add a polite intro if there are fallback parts
final_response = "I encountered some difficulty, but based on my attempts:\n- " + "\n- ".join(unique_fallback_parts)
else:
# General fallback if no tools were executed, or no results/errors were reflected in history
final_response = "Sorry, I couldn't process your request at this time. Please try again."
# Check if the final response still contains any tool/system tags after all processing
# This is a final cleanup attempt if previous regex missed something
if re.search(r'<(tool_code|tool_results|search_results_for_query|lookup_business_info_results_for_query|perform_date_calculation_results_for_query|system|item|title|snippet|url|match|service|description|price|available|date|error|system_note)>', final_response):
print("Warning: Final response still contains unexpected tags after post-processing. Cleaning further.")
# Apply unwanted patterns one last time aggressively
temp_cleaned_response = final_response
for pattern in unwanted_patterns:
# Special handling for patterns that capture content
if pattern in [r'<service>(.*?)</service>', r'<description>(.*?)</description>', r'<price>(.*?)</price>', r'<available>(.*?)</available>', r'<date>(.*?)</date>', r'<error>(.*?)</error>', r'<system_note>(.*?)</system_note>']:
temp_cleaned_response = re.sub(pattern, r'\1', temp_cleaned_response, flags=re.IGNORECASE | re.DOTALL | re.MULTILINE)
else:
temp_cleaned_response = re.sub(pattern, "", temp_cleaned_response, flags=re.IGNORECASE | re.DOTALL | re.MULTILINE)
temp_cleaned_response = re.sub(r'\n\s*\n', '\n\n', temp_cleaned_response).strip()
if temp_cleaned_response.strip(): # Only replace if cleaned version is not empty
final_response = temp_cleaned_response
else: # If aggressive cleaning resulted in empty, use a generic error message
final_response = "Sorry, I had difficulty formulating a complete response."
print("\nBot Response:", final_response, "\n") # Debug Print
# Update the last turn in chat_history_state with the final response.
# That entry is [original_user_input, placeholder], where the placeholder
# (None or "...") was appended when the user message first arrived.
if (chat_history_state and len(chat_history_state[-1]) == 2
        and chat_history_state[-1][0] == original_user_input
        and (chat_history_state[-1][1] is None or chat_history_state[-1][1] == "...")):
chat_history_state[-1][1] = final_response
else:
# This shouldn't happen with the current logic where we append the placeholder immediately,
# but as a safeguard if the history structure is unexpectedly altered.
print("Warning: Could not find placeholder in chat_history_state to update. Appending new turn.")
chat_history_state.append([original_user_input, final_response])
# Return the updated history state
return chat_history_state
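# chat_history_state uses the Gradio Chatbot pair format: a list of
# [user_message, bot_message] entries, e.g. (values illustrative)
#   [["What is today's date?", "Today is 2024-05-31."]]
# Returning the list re-renders the full conversation in the UI.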
# End of the chat_with_bot function definition; everything below runs at module level.
# --- Gradio Interface Setup ---
# Builds the UI components and wires them to chat_with_bot.
# Moved Gradio setup and launch outside the __main__ block to ensure it runs in Colab/Jupyter
print("Setting up Gradio Interface...")
# Define the components
chatbot = gr.Chatbot(height=400, label="Chat History")
msg = gr.Textbox(label="Your message", placeholder="Ask a question...", lines=2)
clear = gr.Button("Clear")
# Create the Gradio Interface with explicit components
# The inputs are the textbox and the chatbot state (for history)
# The outputs are the chatbot state (updated history) and the cleared textbox
# Note: The function should return (updated_history, cleared_input_box_value)
# We will create a helper function to handle the textbox clearing
def respond_and_clear(user_input, chat_history_state):
# Call the main chat logic function
# Ensure chat_history_state is not None; Gradio initializes it as [] but safety first
if chat_history_state is None:
chat_history_state = []
updated_history = chat_with_bot(user_input, chat_history_state)
# Return the updated history and clear the input textbox
return updated_history, ""
# Combine components in a Block or Interface
with gr.Blocks() as demo:
gr.Markdown("## Business RAG Chatbot with Tool Use (Futuresony's FutureAi)") # Added persona/company name
gr.Markdown("Ask questions about Futuresony's services, people, and location, or general knowledge and date calculations! FutureAi aims to provide comprehensive answers.") # Added persona/company name and goal
# Display messages about business info availability status
if business_info_available:
gr.Markdown("<font color='green'>Business Information Loaded Successfully.</font>")
else:
gr.Markdown("<font color='red'>Warning: Business Information Not Loaded. Business-specific questions may not be answerable.</font>")
# Display message about Reranker availability
if reranker is not None:
gr.Markdown("<font color='green'>Business Lookup Reranker Loaded.</font>")
else:
gr.Markdown("<font color='orange'>Warning: Business Lookup Reranker Not Loaded. Lookup may be less robust.</font>")
chatbot.render() # Render the chatbot display area
with gr.Row(): # Place the input textbox and send button side-by-side
msg.render() # Render the input textbox
submit_btn = gr.Button("Send") # Render the explicit send button
clear.render() # Render the clear button
# Define event listeners
# When the submit button is clicked or Enter is pressed in the textbox,
# call the respond_and_clear function.
# The inputs are the textbox value and the chatbot state.
# The outputs are the updated chatbot state and the textbox value (set to empty).
submit_btn.click(respond_and_clear, inputs=[msg, chatbot], outputs=[chatbot, msg])
msg.submit(respond_and_clear, inputs=[msg, chatbot], outputs=[chatbot, msg]) # Also trigger on Enter key in textbox
# When the clear button is clicked, clear the textbox and the chatbot state
clear.click(lambda: ([], ""), outputs=[chatbot, msg]) # Lambda returns empty history and empty textbox
# Add examples (Updated to reflect company/persona and multi-item queries)
gr.Examples(
examples=[
"Tell me about Futuresony's IT Consultation and Network Setup services.", # Multi-item business query
"What are all the services Futuresony offers?", # Broad business query
"Who are the key personnel at Futuresony and what are their roles?", # Multi-person query
"Tell me about the DSTv Assistant model price and its description.", # Multi-detail query
"What is the capital of France?", # General search
"What is the weather in Dar-es-salaam today?", # Location-specific search
"What is today's date?", # Date calculation
"What was the date 30 days ago?", # Date calculation
"Tell me about Futuresony's location and contact details.", # Combined business details
"Je, ni nini huduma zote za Futuresony?", # Broad business query in Swahili
"Nani ni Mkurugenzi wa Futuresony?", # Specific person query in Swahili
"Tafuta habari za hivi punde kuhusu akili bandia (AI).", # General search in Swahili
"Tarehe itakuwa ngapi baada ya wiki 2 kutoka leo?", # Date calculation in Swahili
],
inputs=msg,
# With cache_examples=False and no fn, clicking an example simply fills the
# textbox and the user presses Send. Passing fn=respond_and_clear here would
# be unsafe: the examples supply a value only for msg, not for the chat state,
# so Gradio could not call the function with both required arguments.
cache_examples=False
)
# Launch the Gradio app - Moved outside __main__ block
try:
print("Launching Gradio interface...")
# Set server_name to '0.0.0.0' to make it accessible externally in Colab/Docker
demo.launch(share=True, server_name="0.0.0.0") # Use the Block interface 'demo'
print("Gradio interface launched. Check the public URL.")
except Exception as e: # Catch failures from demo.launch itself
print(f"Error launching Gradio interface: {e}")
print("Ensure you have the necessary libraries installed and a stable internet connection.")
print(traceback.format_exc())