# This script combines all components for deployment on Hugging Face Spaces.
# --- Imports ---
import spaces
import os
import gradio as gr
from huggingface_hub import InferenceClient
import torch
import re
import warnings
import time
import json
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig, BitsAndBytesConfig
from sentence_transformers import SentenceTransformer, util, CrossEncoder
import gspread
# from google.colab import auth # Colab specific, remove for HF Spaces
# from google.auth import default # Colab specific, remove for HF Spaces
from tqdm import tqdm
from duckduckgo_search import DDGS
import spacy
from datetime import date, timedelta
from dateutil.relativedelta import relativedelta
import traceback
import base64
@spaces.GPU
def startup():
    print("GPU function registered for Hugging Face Spaces startup.")
    return "Ready"

startup()
# Suppress warnings
warnings.filterwarnings("ignore", category=UserWarning)
# --- Global Variables and Secrets ---
# HF_TOKEN is automatically available in HF Spaces secrets
HF_TOKEN = os.getenv("HF_TOKEN")
# GOOGLE_BASE64_CREDENTIALS should be added as a Space Secret
SHEET_ID = os.getenv("SHEET_ID") # Get SHEET_ID from Space Secrets
GOOGLE_BASE64_CREDENTIALS = os.getenv("GOOGLE_BASE64_CREDENTIALS")
# --- Model and Tool Initialization ---
client = None # Initialize after HF_TOKEN is confirmed available
nlp = None
embedder = None
reranker = None
try:
    # Initialize InferenceClient
    if HF_TOKEN:
        client = InferenceClient("google/gemma-2-9b-it", token=HF_TOKEN)
        print("Hugging Face Inference Client initialized.")
    else:
        print("Warning: HF_TOKEN not found. Inference Client not initialized.")

    # Load spaCy model for sentence splitting
    try:
        nlp = spacy.load("en_core_web_sm")
        print("SpaCy model 'en_core_web_sm' loaded.")
    except OSError:
        print("SpaCy model 'en_core_web_sm' not found. Downloading...")
        try:
            # Use pip for installation in the HF Spaces environment
            os.system("pip install https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1.tar.gz")
            nlp = spacy.load("en_core_web_sm")
            print("SpaCy model 'en_core_web_sm' downloaded and loaded.")
        except Exception as e:
            print(f"Failed to download or load SpaCy model: {e}")

    # Load SentenceTransformer for RAG/business info retrieval
    print("Attempting to load Sentence Transformer (sentence-transformers/paraphrase-MiniLM-L6-v2)...")
    embedder = SentenceTransformer("sentence-transformers/paraphrase-MiniLM-L6-v2")
    print("Sentence Transformer loaded.")

    # Load a Cross-Encoder model for re-ranking retrieved documents
    print("Attempting to load Cross-Encoder Reranker (cross-encoder/ms-marco-MiniLM-L6-v2)...")
    reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L6-v2')
    print("Cross-Encoder Reranker loaded.")
except Exception as e:
    print(f"An error occurred during model/tool initialization: {e}")
    print(traceback.format_exc())
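# NOTE: The tool-calling loop below uses a *local* `tokenizer` and `model`
# (tokenizer.apply_chat_template / model.generate), which this section never
# defines. A minimal sketch of the intended load follows; the checkpoint name
# and the 4-bit quantization are assumptions inferred from the imports above
# and the InferenceClient model id, not confirmed by this file.
tokenizer = None
model = None
try:
    quant_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16)
    tokenizer = AutoTokenizer.from_pretrained("google/gemma-2-9b-it", token=HF_TOKEN)
    model = AutoModelForCausalLM.from_pretrained(
        "google/gemma-2-9b-it",
        token=HF_TOKEN,
        device_map="auto",
        quantization_config=quant_config,  # assumed; adjust to the Space's actual hardware
    )
    print("Local generation model and tokenizer loaded.")
except Exception as e:
    print(f"Warning: could not load local generation model/tokenizer: {e}")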
# --- Google Sheets Authentication ---
gc = None # Global variable for gspread client
def authenticate_google_sheets():
    """Authenticates with Google Sheets using base64-encoded credentials."""
    global gc
    print("Authenticating Google Account...")
    if not GOOGLE_BASE64_CREDENTIALS:
        print("Error: GOOGLE_BASE64_CREDENTIALS secret not found.")
        print("Please add GOOGLE_BASE64_CREDENTIALS as a Space Secret.")
        return False
    try:
        # Decode the base64 credentials
        credentials_json = base64.b64decode(GOOGLE_BASE64_CREDENTIALS).decode('utf-8')
        credentials = json.loads(credentials_json)
        # Authenticate using a service account built from the decoded dictionary
        gc = gspread.service_account_from_dict(credentials)
        print("Google Sheets authentication successful via service account.")
        return True
    except Exception as e:
        print(f"Google Sheets authentication failed: {e}")
        print("Please ensure your GOOGLE_BASE64_CREDENTIALS secret is correctly set and contains valid service account credentials.")
        print(traceback.format_exc())
        return False
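# For reference, GOOGLE_BASE64_CREDENTIALS is expected to be the base64-encoded
# contents of a service-account JSON key file, e.g. produced with the standard
# library (the filename 'service_account.json' is illustrative):
#
#     python -c "import base64; print(base64.b64encode(open('service_account.json', 'rb').read()).decode())"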
# --- Google Sheets Data Loading and Embedding ---
data = [] # Global variable to store loaded data
descriptions_for_embedding = []
embeddings = torch.tensor([])
business_info_available = False # Flag to indicate if business info was loaded successfully
def load_business_info():
    """Loads business information from the Google Sheet and creates embeddings."""
    global data, descriptions_for_embedding, embeddings, business_info_available
    business_info_available = False  # Reset flag
    if gc is None:
        print("Skipping Google Sheet loading: Google Sheets client not authenticated.")
        return
    if not SHEET_ID:
        print("Error: SHEET_ID not set.")
        print("Please add SHEET_ID as a Space Secret.")
        return
    try:
        sheet = gc.open_by_key(SHEET_ID).sheet1
        print(f"Successfully opened Google Sheet with ID: {SHEET_ID}")
        data_records = sheet.get_all_records()
        if not data_records:
            print(f"Warning: No data records found in Google Sheet with ID: {SHEET_ID}")
            data = []
            descriptions_for_embedding = []
        else:
            # Filter out rows missing 'Service' or 'Description'
            filtered_data = [row for row in data_records if row.get('Service') and row.get('Description')]
            if not filtered_data:
                print("Warning: Filtered data is empty after checking for 'Service' and 'Description'.")
                data = []
                descriptions_for_embedding = []
            else:
                data = filtered_data
                descriptions_for_embedding = [f"Service: {row['Service']}. Description: {row['Description']}" for row in data]
        if descriptions_for_embedding and embedder is not None:
            print("Encoding descriptions...")
            try:
                embeddings = embedder.encode(descriptions_for_embedding, convert_to_tensor=True)
                print("Encoding complete.")
                business_info_available = True  # Set flag on success
            except Exception as e:
                print(f"Error during description encoding: {e}")
                embeddings = torch.tensor([])
                business_info_available = False  # Encoding failed
        else:
            print("Skipping encoding descriptions: No descriptions found or embedder not available.")
            embeddings = torch.tensor([])
            business_info_available = False
        print(f"Loaded {len(descriptions_for_embedding)} entries from Google Sheet for embedding/RAG.")
        if not business_info_available:
            print("Business information retrieval (RAG) is NOT available.")
    except gspread.exceptions.SpreadsheetNotFound:
        print(f"Error: Google Sheet with ID '{SHEET_ID}' not found.")
        print("Please check the SHEET_ID and ensure your authenticated Google Account has access to this sheet.")
        business_info_available = False
    except Exception as e:
        print(f"An error occurred while accessing the Google Sheet: {e}")
        print(traceback.format_exc())
        business_info_available = False
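# NOTE: The retrieve_business_info defined immediately below is superseded by the
# re-ranking version defined later in this file (same name, different signature);
# Python binds the last definition, so the later version is the one actually
# called by the chat loop. This one is kept for reference only.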
def retrieve_business_info(query: str, top_n: int = 2) -> list:  # Reduced top_n
    """
    Retrieves relevant business information from loaded data based on a query.
    """
    global data
    if not business_info_available or embedder is None or not descriptions_for_embedding or not data:
        print("Business information retrieval is not available or data is empty.")
        return []
    try:
        query_embedding = embedder.encode(query, convert_to_tensor=True)
        cosine_scores = util.cos_sim(query_embedding, embeddings)[0]
        # Get the top N indices based on cosine similarity, making sure k does
        # not exceed the number of available descriptions.
        top_results_indices = torch.topk(cosine_scores, k=min(top_n, len(descriptions_for_embedding)))[1].tolist()
        # Retrieve the actual data entries corresponding to the top indices
        top_results = [data[i] for i in top_results_indices]
        if reranker is not None and top_results:
            print("Re-ranking top results...")
            rerank_pairs = [(query, descriptions_for_embedding[i]) for i in top_results_indices]
            rerank_scores = reranker.predict(rerank_pairs)
            reranked_indices = sorted(range(len(rerank_scores)), key=lambda i: rerank_scores[i], reverse=True)
            reranked_results = [top_results[i] for i in reranked_indices]
            print("Re-ranking complete.")
            return reranked_results
        else:
            return top_results
    except Exception as e:
        print(f"Error during business information retrieval: {e}")
        print(traceback.format_exc())
        return []
# --- Tool Functions ---
# Function to perform DuckDuckGo Search and return results with URLs
def perform_duckduckgo_search(query: str, max_results: int = 5):
    """
    Performs a search using DuckDuckGo and returns a list of result dictionaries.
    Includes a delay to avoid rate limits.
    Returns an empty list and prints an error if the search fails.
    """
    print(f"Executing Tool: perform_duckduckgo_search with query='{query}'")
    search_results_list = []
    try:
        # Add a delay before each search
        time.sleep(1)  # Sleep for 1 second
        with DDGS() as ddgs:
            if not query or len(query.split()) < 2:
                print(f"Skipping search for short query: '{query}'")
                return []
            # Use the text() method for a general text search
            results_generator = ddgs.text(query, max_results=max_results)
            results_found = False
            for r in results_generator:
                search_results_list.append(r)
                results_found = True
            if not results_found and max_results > 0:
                print(f"DuckDuckGo search for '{query}' returned no results.")
    except Exception as e:
        print(f"Error during DuckDuckGo search for '{query}': {e}")
        return []
    return search_results_list
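# For reference, each DDGS.text() result is a dict shaped roughly like
# {'title': ..., 'href': ..., 'body': ...}; the result formatting in the chat
# loop below relies on exactly these keys. Illustrative usage (values hypothetical):
#
#     results = perform_duckduckgo_search("weather in Dar es Salaam", max_results=3)
#     for r in results:
#         print(r.get("title"), r.get("href"))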
# Function to retrieve relevant business info using RAG with Re-ranking
# MODIFIED to return MULTIPLE matches
def retrieve_business_info(query: str, threshold: float = 0.50, max_matches: int = 5):  # Added max_matches parameter
    """
    Retrieves relevant business information based on query similarity using vector search
    and re-ranking. Returns a LIST of dictionaries for relevant matches and the best score.
    Returns an empty list and 0.0 if no match above the threshold is found, or on error.
    Handles cases where data, embeddings, or the reranker are not available.
    """
    print(f"Executing Tool: retrieve_business_info with query='{query}' (threshold={threshold}, max_matches={max_matches})")
    # Check if the necessary components for RAG are available
    if not business_info_available or not data or (embeddings is None or embeddings.numel() == 0) or embedder is None:
        print("Business info data, embeddings, or embedder not available for retrieval.")
        return [], 0.0  # Return empty list and 0.0 score if the RAG setup is incomplete

    # Handle the case where the reranker is not available: fall back to basic vector
    # search. This fallback still returns only the single best match for simplicity.
    if reranker is None:
        print("Reranker model not loaded. Falling back to basic vector search (less robust, single match).")
        try:
            user_embedding = embedder.encode(query, convert_to_tensor=True)
            cos_scores = util.cos_sim(user_embedding, embeddings)[0]
            best_score = cos_scores.max().item()
            if best_score > threshold:
                best_match_idx = cos_scores.argmax().item()
                best_match = data[best_match_idx]
                print(f"Basic vector search match found with score {best_score:.4f}.")
                return [best_match], best_score  # Return a list containing one match
            else:
                print(f"Basic vector search: No match found above threshold {threshold:.4f} (best score: {best_score:.4f}).")
                return [], best_score  # Return an empty list
        except Exception as e:
            print(f"Error during basic vector search retrieval: {e}")
            return [], 0.0  # Return empty list and 0.0 score on error

    # If the reranker is available, proceed with vector search and re-ranking
    # (returns multiple matches)
    try:
        user_embedding = embedder.encode(query, convert_to_tensor=True)
        cos_scores = util.cos_sim(user_embedding, embeddings)[0]
        # Get initial candidates from vector search (e.g., top 20); we need more
        # initial candidates than the final max_matches.
        top_k_initial = max(max_matches * 2, 10)  # At least double the desired matches, minimum 10
        top_k_initial = min(top_k_initial, len(descriptions_for_embedding))  # Ensure not more than available
        if top_k_initial == 0:  # Handle the case with no descriptions or k=0
            print("No descriptions available or top_k_initial is zero.")
            return [], 0.0
        top_results_indices = torch.topk(cos_scores, k=top_k_initial, largest=True).indices.tolist()
        if not top_results_indices:
            print("Vector search found no initial candidates.")
            return [], 0.0
        # Prepare query-document pairs for re-ranking
        rerank_pairs = [[query, descriptions_for_embedding[idx]] for idx in top_results_indices]
        # Get re-ranker scores
        rerank_scores = reranker.predict(rerank_pairs).tolist()
        # Combine scores and original indices, then sort by re-ranker score
        scored_indices = sorted(zip(rerank_scores, top_results_indices), key=lambda x: x[0], reverse=True)
        relevant_matches = []
        best_overall_score = 0.0  # Track the highest score among retrieved
        # Collect matches above the threshold, up to max_matches
        for i, (score, original_idx) in enumerate(scored_indices):
            if score >= threshold:
                relevant_matches.append(data[original_idx])
                print(f"Including match (score: {score:.4f}, original index: {original_idx}) above threshold {threshold:.4f}.")
                if i == 0:  # The first item in the sorted list carries the best score
                    best_overall_score = score
            else:
                print(f"Skipping match (score: {score:.4f}) below threshold {threshold:.4f}.")
                # Results are sorted, so we can break early once the threshold is no longer met
                break
            if len(relevant_matches) >= max_matches:
                print(f"Reached maximum number of matches ({max_matches}). Stopping collection.")
                break  # Stop once max_matches are collected
        if relevant_matches:
            print(f"Retrieved {len(relevant_matches)} relevant business info matches.")
            return relevant_matches, best_overall_score  # Return the list of matches and the best score
        else:
            # No matches above the threshold: report the best score found anyway
            best_possible_score = scored_indices[0][0] if scored_indices else 0.0
            print(f"Reranked business info: No matches found above threshold {threshold:.4f}. Best score was {best_possible_score:.4f}.")
            return [], best_possible_score  # Return empty list and the best score found
    except Exception as e:
        print(f"Error during re-ranked business information retrieval: {e}")
        print(traceback.format_exc())  # Print traceback for RAG errors
        return [], 0.0  # Return empty list and 0.0 score on error
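# Illustrative usage (values hypothetical): the tool returns a (matches, score)
# tuple, which the chat loop below unpacks.
#
#     matches, best_score = retrieve_business_info("consultation service", threshold=0.5, max_matches=3)
#     if matches:
#         print(f"{len(matches)} match(es); best score {best_score:.2f}")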
# Function to perform date calculation if needed
def perform_date_calculation(query: str):
    """
    Analyzes the query for date calculation requests and performs the calculation.
    Returns a dict describing the calculation and result (success=False on failure).
    Handles formats like 'X days ago', 'X days from now', 'X weeks ago',
    'X weeks from now', and 'what is today's date'. Uses dateutil for slightly
    more flexibility (though the core logic remains simple).
    """
    print(f"Executing Tool: perform_date_calculation with query='{query}'")
    query_lower = query.lower()
    today = date.today()
    result_date = None
    calculation_description = None

    if re.search(r"\btoday'?s date\b|what is today'?s date\b|what day is it\b", query_lower):
        result_date = today
        calculation_description = f"The current date is: {today.strftime('%Y-%m-%d')}"
        print("Identified query for today's date.")
        return {"query": query, "description": calculation_description, "result": result_date.strftime('%Y-%m-%d'), "success": True}

    match = re.search(r"(\d+)\s+(day|week|month|year)s?\s+(ago|from now)", query_lower)
    if match:
        value = int(match.group(1))
        unit = match.group(2)
        direction = match.group(3)
        try:
            if unit == 'day':
                delta = timedelta(days=value)
            elif unit == 'week':
                delta = timedelta(weeks=value)
            elif unit == 'month':
                delta = relativedelta(months=value)
            elif unit == 'year':
                delta = relativedelta(years=value)
            else:
                desc = f"Could not understand the time unit '{unit}' in '{query}'."
                print(desc)
                return {"query": query, "description": desc, "result": None, "success": False, "error": desc}
            if direction == 'ago':
                result_date = today - delta
                calculation_description = f"Calculating date {value} {unit}s ago from {today.strftime('%Y-%m-%d')}: {result_date.strftime('%Y-%m-%d')}"
            elif direction == 'from now':
                result_date = today + delta
                calculation_description = f"Calculating date {value} {unit}s from now from {today.strftime('%Y-%m-%d')}: {result_date.strftime('%Y-%m-%d')}"
            print(f"Performed date calculation: {calculation_description}")
            return {"query": query, "description": calculation_description, "result": result_date.strftime('%Y-%m-%d'), "success": True}
        except OverflowError:
            desc = f"Date calculation overflow for query: {query}"
            print(desc)
            return {"query": query, "description": desc, "result": None, "success": False, "error": desc}
        except Exception as e:
            desc = f"An error occurred during date calculation for query '{query}': {e}"
            print(desc)
            return {"query": query, "description": desc, "result": None, "success": False, "error": str(e)}

    desc = "No specific date calculation pattern recognized."
    print(f"No specific date calculation pattern found in query: '{query}'")
    return {"query": query, "description": desc, "result": None, "success": False}
# --- Tool Definitions for the Model ---
# Describe the tools available to the model in a structured format
# This will be injected into the prompt.
TOOL_DEFINITIONS = """
Available tools:
1. **search**: Use this tool to perform a web search for current external information. Useful for facts, news, weather, etc.
   Parameters:
   - query (string, required): The search query.
   - max_results (integer, optional, default=5): The maximum number of results to return.
2. **lookup_business_info**: Use this tool to search the internal business database for information about our services, products, pricing, availability, and key people. This is the primary source for company-specific details. This lookup is now more robust to variations in phrasing due to enhanced search.
   Parameters:
   - query (string, required): The query terms related to the business information needed (e.g., "consultation service", "DSTv assistant model price", "Salum Ally").
   - threshold (number, optional, default=0.50): The minimum relevance score required for a match based on a re-ranking process. Use a lower threshold (e.g., 0.4) if very broad matching is needed.
   - max_matches (integer, optional, default=5): The maximum number of relevant matches to retrieve from the internal database. Use a higher number (e.g., 10 or 15) for broad queries asking about multiple items.
3. **perform_date_calculation**: Use this tool to calculate dates relative to today or find today's date. Understands phrases like "today's date", "X days ago", "Y weeks from now", "Z months/years ago/from now".
   Parameters:
   - query (string, required): The natural language query asking for a date calculation.
4. **answer**: Use this tool when you have gathered all necessary information from tools and history, or when the user's query can be answered directly based on your knowledge. This is the *last* action you should take in a turn.
   Parameters:
   - text (string, required): The final, comprehensive, natural language response to the user.
"""
# --- System Prompt Template for Tool Use ---
# This template instructs the model on how to use the tools and format its output.
# Inject this *within* the user message content.
# MODIFIED to ask for COMPREHENSIVE answers
tool_use_system_template = """<system>
You are FutureAi, a helpful, polite, and professional assistant for Futuresony. Your primary goal is to assist the user by effectively using the available tools or answering directly based on the conversation history and tool outputs. Maintain a positive and helpful tone. If you are unsure or a tool returns no clear results, state this gracefully. When providing answers based on gathered information, aim for a comprehensive and detailed response, synthesizing all relevant points from the tool outputs.
Today's date is: {current_date}
Available tools:
{tool_definitions}
Analyze the user's request and decide whether to use one or more tools, or provide a final answer.
**Tool Usage Priority:**
- If the query is about *our business services, products, pricing, or people (like employees/contacts listed in our internal data)*, prioritize using the `lookup_business_info` tool first.
- If the query is a date calculation, use the `perform_date_calculation` tool.
- Use the `search` tool for general knowledge, current events, weather, or information clearly outside of our internal business data.
- You can use multiple tools if a query is multifaceted. Process internal information first.
To use a tool, output a command within <tool_code> and </tool_code> tags. The content inside should be a JSON object with "tool_name" and "parameters". Ensure parameters like 'threshold' and 'max_matches' are included for `lookup_business_info` when needed for broad queries.
Example tool call:
<tool_code> {{"tool_name": "search", "parameters": {{"query": "weather today"}}}} </tool_code>
<tool_code> {{"tool_name": "lookup_business_info", "parameters": {{"query": "consultation service", "threshold": 0.6}}}} </tool_code>
<tool_code> {{"tool_name": "lookup_business_info", "parameters": {{"query": "all services", "threshold": 0.4, "max_matches": 10}}}} </tool_code> # Example for broad query
After executing tools, you will receive the tool results. Use these results and the conversation history to formulate your **comprehensive** final answer. Tool results will be provided within `<tool_results>` tags, containing sub-tags specific to each tool's output. Pay close attention to these results and any notes within `<system_note>` or `<error>` tags.
To provide the final answer to the user, use the 'answer' tool. This indicates you are finished and the text within the parameters will be shown to the user. Use the 'answer' tool as soon as you have sufficient information to answer the user's query, or if you determine you cannot answer it even with the tools. Your answer should be detailed and synthesize information effectively, especially from multiple lookup results.
Example final answer:
<tool_code> {{"tool_name": "answer", "parameters": {{"text": "Based on the search results, the weather today is sunny."}}}} </tool_code>
If you can answer the query directly without tools (e.g., a simple greeting, acknowledging instructions), use the 'answer' tool immediately with a direct, polite response.
Think step-by-step. Decide if tools are needed based on the **Tool Usage Priority**. If so, which ones? What parameters? Consider if a broad query requires setting a lower `threshold` and higher `max_matches` for `lookup_business_info`. If you have results, how do they help answer the user? Synthesize ALL relevant information into your final answer. If results are insufficient or indicate an error, how should you respond gracefully? Finally, formulate the comprehensive answer using the 'answer' tool.
Output ONLY tool calls within <tool_code> tags or a final answer using the 'answer' tool. Do not include any other text unless it's within the 'answer' tool's parameters.
</system>
"""
# Max history length in terms of turns (user + assistant) to keep in the model context
MAX_HISTORY_TURNS = 5 # Keep last 5 turns
# --- Chat Logic Function with Tool Use ---
def chat_with_bot(user_input, chat_history_state):
    """
    Processes user input through iterative tool-use logic for the Gradio interface.
    Takes a user_input string and chat_history_state (list of lists) as input.
    Returns the updated chat_history_state (list of lists).
    Uses a structured tool-calling approach and guarantees strict user/assistant
    role alternation in model_chat_history.
    """
    # Basic input safety check (example)
    if any(phrase in user_input.lower() for phrase in ["harmful content", "malicious intent"]):
        safe_response = "I cannot process requests that involve harmful or inappropriate content."
        return chat_history_state + [[user_input, safe_response]]

    # Append the user message to history immediately for display. We append a
    # placeholder bot message now and update it with the final response later.
    chat_history_state = chat_history_state + [[user_input, "..."]]
    original_user_input = user_input
    print(f"\n--- Starting turn with input: {user_input} ---")  # Debug print

    # Get the current date
    current_date = date.today().strftime('%Y-%m-%d')
    print(f"Current Date: {current_date}")  # Debug print

    # Maintain an internal model history that strictly alternates user/assistant roles.
    # This history is used directly by apply_chat_template and represents the
    # conversation *as the model sees it*, including tool calls/results.
    # Build it from the *completed* past turns in chat_history_state.
    model_chat_history = []

    # Convert Gradio chat history (list of lists) to model history (list of dicts),
    # ensuring strict alternation: user, assistant, user, assistant...
    # Only add complete turns from *past* history (exclude the current incomplete
    # turn) and limit the history length.
    history_to_process = chat_history_state[:-1]  # Exclude the current turn being processed
    # Only take [user, bot] pairs from past history where bot is NOT the initial
    # placeholder. This guarantees that the last message in `recent_complete_turns`
    # corresponds to a *completed* assistant response.
    complete_past_turns = [
        turn for turn in history_to_process
        if turn is not None and len(turn) == 2 and turn[0] is not None and turn[1] is not None and str(turn[1]).strip() != "..."
    ]
    # Take the last MAX_HISTORY_TURNS complete turns
    recent_complete_turns = complete_past_turns[max(0, len(complete_past_turns) - MAX_HISTORY_TURNS):]
    for user_msg, bot_msg in recent_complete_turns:
        # Add the user message (always present given the filter above)
        if user_msg is not None:
            model_chat_history.append({"role": "user", "content": str(user_msg).strip()})
        # Add the assistant message (present and non-placeholder given the filter above)
        if bot_msg is not None and str(bot_msg).strip() != "...":
            model_chat_history.append({"role": "assistant", "content": str(bot_msg).strip()})

    # --- Iterative Tool Calling Loop ---
    max_tool_iterations = 5  # Limit the number of tool calls in a single turn to prevent infinite loops
    final_response_text = None  # Holds the final answer from the 'answer' tool
    current_tool_results_text = ""  # Accumulates tool results text for the *next* model call in this turn
    print("Starting tool execution loop...")
    try:  # Main try block for the chat_with_bot function
        for i in range(max_tool_iterations):
            print(f"\n--- Tool Iteration {i+1} ---")

            # Steps 1 & 2: Prepare the user message content for THIS iteration and
            # append it to the history. The content depends on whether this is the
            # first step (original query + system prompt) or a subsequent step
            # (tool results).
            current_user_message_content = ""
            if i == 0:
                # First iteration: include the system template and the original user input
                system_prompt_content = tool_use_system_template.format(
                    current_date=current_date,
                    tool_definitions=TOOL_DEFINITIONS
                )
                current_user_message_content = system_prompt_content + "\n\nUser Query: " + original_user_input
            else:
                # Subsequent iterations: include the tool results from the previous
                # assistant response.
                if current_tool_results_text:
                    current_user_message_content = "<tool_results>\n" + current_tool_results_text.strip() + "\n</tool_results>"
                    current_tool_results_text = ""  # Clear the buffer after adding to the prompt
                else:
                    # If no new tool results were accumulated in the previous step
                    # (e.g., parsing failed, no tools called), say so, so the model
                    # doesn't wait indefinitely.
                    current_user_message_content = "<tool_results>No new results or no tools were called in the previous turn.</tool_results>"
                    print("No new tool results to add for this iteration.")

            # Append the user message for the current iteration to the main model
            # history. This history is what apply_chat_template will process. If the
            # logic is correct, model_chat_history always ends with an 'assistant'
            # role before this append, except on the very first turn of the conversation.
            model_chat_history.append({"role": "user", "content": current_user_message_content.strip()})

            # Steps 3 & 4: Apply the chat template to get the full prompt, then generate.
            # The history should now start with 'user' and end with the current 'user'
            # message; the check below verifies strict alternation before tokenization.
            if len(model_chat_history) > 1 and model_chat_history[-1]['role'] == model_chat_history[-2]['role']:
                print("Error: History roles are not alternating before generation!")
                print("History:", model_chat_history)
                final_response_text = "Sorry, I encountered an internal error with the conversation history format before generation."
                break  # Break the tool loop if the history is malformed

            prompt_for_generation = tokenizer.apply_chat_template(
                model_chat_history,  # Use the main model_chat_history directly
                tokenize=False,
                add_generation_prompt=True
            )

            generation_config = GenerationConfig(
                max_new_tokens=700,  # Enough room for multiple tool calls or a longer answer
                do_sample=False,  # Deterministic decoding for predictable tool calls
                temperature=0.1,  # Ignored when do_sample=False; kept for clarity
                top_k=None,
                top_p=None,
                eos_token_id=tokenizer.eos_token_id,
                pad_token_id=tokenizer.pad_token_id,
                use_cache=True
            )
            raw_model_output = ""
            # Guard the tokenizer call as well
            try:
                input_ids = tokenizer(prompt_for_generation, return_tensors="pt").input_ids.to(model.device)
                if input_ids.numel() == 0:
                    print("Warning: Empty input_ids for model generation.")
                    raw_model_output = "<system_error>Error: Empty input_ids for model generation.</system_error>"  # Report error via system tag
                else:
                    try:
                        outputs = model.generate(
                            input_ids=input_ids,
                            generation_config=generation_config,
                        )
                        prompt_length = input_ids.shape[1]
                        if outputs.shape[1] > prompt_length:
                            raw_model_output = tokenizer.decode(outputs[0, prompt_length:], skip_special_tokens=True).strip()
                        else:
                            raw_model_output = ""
                            print("Warning: Model generated no new tokens.")
                    except Exception as e:
                        print(f"Error during model generation in tool loop: {e}")
                        raw_model_output = f"<system_error>Error: Model generation failed: {e}</system_error>"  # Report error via system tag
            except Exception as e:
                print(f"Error during tokenizer call in tool loop: {e}")
                raw_model_output = f"<system_error>Error: Tokenizer failed: {e}</system_error>"  # Report error via system tag

            print(f"Raw model output: {raw_model_output}")

            # Step 5: Append the model's raw output as the assistant message for THIS
            # iteration. This is crucial for maintaining the alternation in
            # `model_chat_history`.
            model_chat_history.append({"role": "assistant", "content": raw_model_output.strip()})
            # Step 6: Parse tool calls from the latest assistant message (now the
            # last entry in the history)
            tool_calls = []
            # Find all content within <tool_code> tags in the latest assistant message
            matches = re.findall(r'<tool_code>(.*?)</tool_code>', model_chat_history[-1]['content'], re.DOTALL)

            if not matches:
                print("No tool calls found in latest model output.")
                # If there are no tool calls, check whether the model tried to output
                # an answer directly. This is a fallback for when the model fails to
                # use the 'answer' tool.
                cleaned_potential_answer = re.sub(r'<tool_code>.*?</tool_code>', '', model_chat_history[-1]['content'], flags=re.DOTALL)  # Remove tool tags first
                cleaned_potential_answer = re.sub(r'<.*?>', '', cleaned_potential_answer).strip()  # Remove any other potential tags
                # If the cleaned output is not empty or just whitespace, treat it as a
                # potential final answer.
                if cleaned_potential_answer and final_response_text is None:
                    print("Model output does not contain tool calls; treating cleaned output as a potential direct answer.")
                    final_response_text = cleaned_potential_answer
                    break  # Exit the tool loop as we have a response
                # If no tool calls and no potential answer, check for explicit system
                # errors reported by the model.
                if "<system_error>" in model_chat_history[-1]['content'] or "<error>" in model_chat_history[-1]['content']:
                    print("Model output contains system error tags. Exiting tool loop.")
                    # The synthesis step will pick up these errors from the history
                    break  # Exit loop on a critical error reported by the model
                # No tool calls, no potential answer, and no explicit error: the loop
                # continues; the next iteration's user message will say "No new results...".
                continue  # Skip to the next iteration
            # Step 7: Execute tool calls and accumulate results for the *next*
            # iteration's user message. Clear the buffer here, since we are processing
            # the *latest* assistant message's tools.
            current_tool_results_text = ""
            answer_tool_called_in_this_iter = False  # Reset flag for this iteration's output

            for match in matches:
                try:
                    # Attempt to parse the content within the tags as JSON
                    tool_call_json = json.loads(match.strip())
                    if "tool_name" in tool_call_json and "parameters" in tool_call_json:
                        tool_name = tool_call_json.get("tool_name")
                        parameters = tool_call_json.get("parameters", {})

                        if tool_name == "answer":
                            final_response_text = parameters.get("text", "")
                            answer_tool_called_in_this_iter = True
                            print(f"Model called 'answer' tool. Final response intended: '{final_response_text}'")
                            # Once the 'answer' tool is called, we prioritize exiting the
                            # loop after this iteration. We still process any other tool
                            # calls in this *same* model output, then break afterwards.
                            continue  # Process the next tool call from the same model output

                        elif tool_name == "search":
                            query = parameters.get("query")
                            max_results = parameters.get("max_results", 5)
                            if query:
                                print(f"Executing Tool: search with query='{query}', max_results={max_results}")
                                results = perform_duckduckgo_search(query, max_results)
                                current_tool_results_text += f"<search_results_for_query query='{query}'>\n"
                                if results:
                                    for r in results:
                                        snippet = r.get('body', 'N/A')
                                        if len(snippet) > 300:
                                            snippet = snippet[:300] + "..."
                                        current_tool_results_text += f"<item>\n<title>{r.get('title', 'N/A')}</title>\n<snippet>{snippet}</snippet>\n<url>{r.get('href', 'N/A')}</url>\n</item>\n"
                                    print(f"Executed search for '{query}'. Found {len(results)} results.")
                                else:
                                    current_tool_results_text += "No results found.\n"
                                    print(f"No search results found for '{query}'.")
                                current_tool_results_text += "</search_results_for_query>\n"
                            else:
                                current_tool_results_text += f"<search_results_for_query query='{query}'><error>Missing 'query' parameter.</error></search_results_for_query>\n"
                                print("Skipping search tool call: Missing 'query' parameter.")

                        elif tool_name == "lookup_business_info":
                            query = parameters.get("query")
                            # Use the threshold and max_matches provided by the model, or the defaults
                            threshold = parameters.get("threshold", 0.50)
                            max_matches = parameters.get("max_matches", 5)
                            if query:
                                print(f"Executing Tool: lookup_business_info with query='{query}', threshold={threshold}, max_matches={max_matches}")
                                # retrieve_business_info returns a LIST of matches and the best score
                                matches_list, best_score = retrieve_business_info(query, threshold=threshold, max_matches=max_matches)
                                # Format the results block to contain MULTIPLE <match> tags
                                current_tool_results_text += f"<lookup_business_info_results_for_query query='{query}' requested_threshold='{threshold:.4f}' requested_max_matches='{max_matches}' final_best_score='{best_score:.4f}'>\n"
                                if matches_list:
                                    # Use a name other than `match` here to avoid shadowing the
                                    # enclosing loop variable (the raw <tool_code> string), which
                                    # the except handlers below still reference.
                                    for match_item in matches_list:
                                        if isinstance(match_item, dict):
                                            current_tool_results_text += "<match>\n"
                                            current_tool_results_text += f"<service>{match_item.get('Service', 'N/A')}</service>\n"
                                            current_tool_results_text += f"<description>{match_item.get('Description', 'N/A')}</description>\n"
                                            current_tool_results_text += f"<price>{match_item.get('Price', 'N/A')}</price>\n"
                                            current_tool_results_text += f"<available>{match_item.get('Available', 'N/A')}</available>\n"
                                            # Add other relevant fields from your sheet here if
                                            # needed for synthesis, e.g. <contact_person>.
                                            current_tool_results_text += "</match>\n"
                                        else:
                                            print(f"Warning: Item in retrieved business info list was not a dict: {match_item}")
                                    print(f"Executed business lookup for '{query}'. Found {len(matches_list)} matches above threshold {threshold:.4f}. Best score: {best_score:.4f}.")
                                else:
                                    # No matches were found above the threshold within retrieve_business_info
                                    current_tool_results_text += f"No relevant matches found above threshold {threshold:.4f} (best score: {best_score:.4f}).\n"
                                    print(f"Executed business lookup for '{query}'. No matches found above threshold.")
                                    # Note when the best score fell below the threshold
                                    if best_score > 0:  # Only add the note if *some* match was found, just not above threshold
                                        current_tool_results_text += f"<system_note>Best match score ({best_score:.4f}) was below the requested threshold ({threshold:.4f}).</system_note>\n"
                                current_tool_results_text += "</lookup_business_info_results_for_query>\n"
                            else:
                                current_tool_results_text += f"<lookup_business_info_results_for_query query='{query}'><error>Missing 'query' parameter.</error></lookup_business_info_results_for_query>\n"
                                print("Skipping business lookup tool call: Missing 'query' parameter.")

                        elif tool_name == "perform_date_calculation":
                            query = parameters.get("query")
                            if query:
                                print(f"Executing Tool: perform_date_calculation with query='{query}'")
                                result = perform_date_calculation(query)  # Returns a result dict
                                current_tool_results_text += f"<perform_date_calculation_results_for_query query='{query}'>\n"
                                if result and result.get('success'):
                                    current_tool_results_text += f"<description>{result.get('description', 'Calculation Successful')}</description>\n<date>{result.get('result')}</date>\n"
                                    print(f"Executed date calculation for '{query}'. Result: {result.get('result')}.")
                                elif result and result.get('description'):
                                    current_tool_results_text += f"<description>{result.get('description')}</description>\n"  # Report the description on failure
                                    print(f"Executed date calculation for '{query}'. Failed: {result.get('description')}.")
                                elif isinstance(result, str) and result.startswith("Error"):
                                    current_tool_results_text += f"<error>{result}</error>\n"  # Defensive: report a bare error string
                                    print(f"Executed date calculation for '{query}'. Error: {result}.")
                                else:  # Generic failure case
                                    current_tool_results_text += "Calculation failed or no specific date recognized.\n"
                                    print(f"Executed date calculation for '{query}'. No specific result.")
                                current_tool_results_text += "</perform_date_calculation_results_for_query>\n"
                            else:
                                current_tool_results_text += f"<perform_date_calculation_results_for_query query='{query}'><error>Missing 'query' parameter.</error></perform_date_calculation_results_for_query>\n"
                                print("Skipping date calculation tool call: Missing 'query' parameter.")

                        else:
                            print(f"Unknown tool requested by model: {tool_name}")
                            # Add a note to the results buffer about the unknown tool
                            current_tool_results_text += f"<system_note>Unknown tool requested: {tool_name}</system_note>\n"
                    else:
                        print(f"Parsed JSON missing 'tool_name' or 'parameters': {tool_call_json}")
                        current_tool_results_text += f"<system_note>Failed to parse tool call: Missing 'tool_name' or 'parameters' in JSON: {match.strip()}</system_note>\n"
                except json.JSONDecodeError as e:
                    print(f"Failed to parse tool call JSON: {e}")
                    print(f"Content was: {match.strip()}")
                    current_tool_results_text += f"<system_note>Failed to parse tool call JSON: {e}. Content: {match.strip()}</system_note>\n"
                except Exception as e:
                    print(f"An unexpected error occurred during tool call parsing or execution: {e}")
                    print(traceback.format_exc())  # Print traceback for tool execution errors
                    current_tool_results_text += f"<system_note>An unexpected error occurred during tool call processing: {e}. Content: {match.strip()}</system_note>\n"

            # Step 8: Check whether the 'answer' tool was called in this iteration
            if answer_tool_called_in_this_iter:
                print("Answer tool called. Exiting tool loop.")
                break  # Exit the main tool iteration loop

            # Step 9: If max iterations were reached and the 'answer' tool wasn't called,
            # nudge the model to wrap up in the final forced synthesis step.
            if i == max_tool_iterations - 1 and final_response_text is None:
                print(f"Max tool iterations reached ({max_tool_iterations}) without 'answer' call.")
                current_tool_results_text += "<system_note>Maximum tool calls reached. Please provide a final answer based on the information gathered so far or state that the request cannot be fully fulfilled.</system_note>\n"
                # Fall through to the final response generation step below
    except Exception as e:
        # This except pairs with the try at the top of chat_with_bot
        print(f"An unexpected error occurred in the chat_with_bot function: {e}")
        print(traceback.format_exc())  # Print full traceback for debugging
        final_response_text = f"Sorry, I encountered an unexpected error while processing your request: {e}"
        # On error, final_response_text is set so we still proceed to update the history

    # The code below runs AFTER the tool iteration loop and its enclosing try/except finish
    # --- Final Response Generation (Synthesis) ---
    # Either use the text from the 'answer' tool call, or generate a fallback
    # response if the model failed to call 'answer'.
    print("\n--- Final Response Generation ---")

    if final_response_text is None:
        print("Model did not call 'answer' tool. Falling back to synthesis prompt.")
        # The model failed to call the 'answer' tool within the allotted iterations,
        # or an error occurred. Fallback: generate a response from the accumulated
        # history and tool results. `model_chat_history` now contains the full trace
        # of tool calls and the user messages containing the tool results.

        # Synthesis prompt, emphasizing a comprehensive answer
        synthesis_prompt_content = """<system>
Please provide a final, comprehensive answer to the user's original query based on ALL the information gathered from the executed tools and the conversation history. Synthesize the information into a coherent, natural language response. Pay special attention to providing detailed descriptions and listing all relevant points found from the business lookup tool when multiple items were retrieved.
User's original query: "{original_user_input}"
Information gathered from tools and process notes:
{gathered_info_summary}
Synthesize ALL relevant information into a clear, concise, and **comprehensive** natural language response for the user. When presenting information from multiple business lookup results, structure your answer to clearly describe each item found (e.g., list them, describe each one fully).
**Guidelines for your response:**
- Address the user's original question directly.
- Use the information provided in the 'Information gathered' section, synthesizing details from all relevant results.
- If the business lookup returned multiple matches, present the information for *each* match found clearly and informatively.
- If a tool was executed but returned no relevant results (especially if the best score was below the threshold), or if there were errors (<system_error>, <error>, <system_note> tags), explain this gracefully to the user.
- Maintain a helpful, polite, and professional business tone, reflecting the Futuresony brand and your identity as FutureAi.
- Do NOT include raw tool call or result tags in your final answer.
- If you were unable to gather necessary information, clearly state what you could and could not find.
After your answer, generate 2-3 concise follow-up questions that might be helpful or relevant to the user based on the conversation and your response. List these questions clearly at the end.
If Search Results were used, list the relevant URLs under a "Sources:" heading at the very end.
</system>
"""
        # Summarize the gathered information by processing model_chat_history
        gathered_info_summary = ""
        unique_urls = set()  # Collect URLs for the Sources section

        # Look for 'user' messages that follow an assistant message; these contain
        # the tool results block if tools were run. (The synthesis prompt has not
        # been appended to model_chat_history yet, so the whole history can be scanned.)
        for i in range(1, len(model_chat_history)):
            if model_chat_history[i]['role'] == 'user' and isinstance(model_chat_history[i]['content'], str) and '<tool_results>' in model_chat_history[i]['content']:
                msg_content = model_chat_history[i]['content']
                # Check whether it contains the tool results block
                tool_results_block = re.search(r'<tool_results>(.*?)</tool_results>', msg_content, re.DOTALL)
                if tool_results_block:
                    content = tool_results_block.group(1)  # Content inside <tool_results>

                    # --- Extract and format info from tool result blocks ---
                    search_blocks = re.findall(r'<search_results_for_query.*?>(.*?)</search_results_for_query>', content, re.DOTALL)
                    for sr_content in search_blocks:
                        query_match = re.search(r"query='(.*?)'", sr_content)  # Extract the query attribute
                        query = query_match.group(1) if query_match else "Unknown"
                        gathered_info_summary += f"Search results for '{query}':\n"
                        items = re.findall(r'<item>(.*?)</item>', sr_content, re.DOTALL)
                        if items:
                            for item_content in items:
                                title = re.search(r'<title>(.*?)</title>', item_content, re.DOTALL)
                                snippet = re.search(r'<snippet>(.*?)</snippet>', item_content, re.DOTALL)
                                url = re.search(r'<url>(.*?)</url>', item_content, re.DOTALL)
                                title_text = title.group(1).strip() if title else 'N/A'
                                snippet_text = snippet.group(1).strip() if snippet else 'N/A'
                                url_text = url.group(1).strip() if url else 'N/A'
                                gathered_info_summary += f"- Title: {title_text}, Snippet: {snippet_text}\n"
                                if url_text and url_text != 'N/A':
                                    unique_urls.add(url_text)  # Add the URL to the set
                        elif "No results found" in sr_content:
                            gathered_info_summary += "- No results found.\n"
                        elif "<error>" in sr_content:
                            error_text = re.search(r'<error>(.*?)</error>', sr_content, re.DOTALL)
                            gathered_info_summary += f"- Error during search: {error_text.group(1).strip() if error_text else 'Unknown error'}\n"

                    # Business lookup results (handles MULTIPLE <match> tags)
                    lookup_blocks = re.findall(r'<lookup_business_info_results_for_query.*?>(.*?)</lookup_business_info_results_for_query>', content, re.DOTALL)
                    for lr_content in lookup_blocks:
                        query_match = re.search(r"query='(.*?)'", lr_content)
                        query = query_match.group(1) if query_match else "Unknown"
                        # Extract requested_threshold, requested_max_matches, and final_best_score
                        req_thresh_match = re.search(r"requested_threshold='(.*?)'", lr_content)
                        req_thresh = float(req_thresh_match.group(1)) if req_thresh_match else 0.50
                        req_max_matches_match = re.search(r"requested_max_matches='(.*?)'", lr_content)
                        req_max_matches = int(req_max_matches_match.group(1)) if req_max_matches_match else 5
                        final_best_score_match = re.search(r"final_best_score='(.*?)'", lr_content)
                        final_best_score = float(final_best_score_match.group(1)) if final_best_score_match else 0.0
                        gathered_info_summary += f"Business lookup results for '{query}' (Requested Threshold: {req_thresh:.4f}, Requested Max Matches: {req_max_matches}, Final Best Score: {final_best_score:.4f}):\n"
                        matches_found = re.findall(r'<match>(.*?)</match>', lr_content, re.DOTALL)  # Find ALL match tags
                        if matches_found:
                            gathered_info_summary += f" Found {len(matches_found)} relevant item(s):\n"
                            for match_content in matches_found:
                                service = re.search(r'<service>(.*?)</service>', match_content, re.DOTALL)
                                description = re.search(r'<description>(.*?)</description>', match_content, re.DOTALL)
                                price = re.search(r'<price>(.*?)</price>', match_content, re.DOTALL)
                                available = re.search(r'<available>(.*?)</available>', match_content, re.DOTALL)
                                # Add extraction for other fields if you include them in
                                # your tool output, e.g. <contact_person>.
                                gathered_info_summary += f" - Service: {service.group(1).strip() if service else 'N/A'}\n"
                                gathered_info_summary += f"   Description: {description.group(1).strip() if description else 'N/A'}\n"
                                gathered_info_summary += f"   Price: {price.group(1).strip() if price else 'N/A'}\n"
                                gathered_info_summary += f"   Available: {available.group(1).strip() if available else 'N/A'}\n"
                        elif "No relevant matches found" in lr_content:
                            gathered_info_summary += f" No relevant matches found above threshold {req_thresh:.4f} (best score: {final_best_score:.4f}).\n"
                        elif "<error>" in lr_content:
                            error_text = re.search(r'<error>(.*?)</error>', lr_content, re.DOTALL)
                            gathered_info_summary += f" Error during business lookup: {error_text.group(1).strip() if error_text else 'Unknown error'}\n"
                        # Include system notes found within the business lookup results block
                        system_notes_in_lookup = re.findall(r'<system_note>(.*?)</system_note>', lr_content, re.DOTALL)
                        for note in system_notes_in_lookup:
                            gathered_info_summary += f" System Note within Lookup: {note.strip()}\n"

                    # Date calculation results
                    date_blocks = re.findall(r'<perform_date_calculation_results_for_query.*?>(.*?)</perform_date_calculation_results_for_query>', content, re.DOTALL)
                    for dr_content in date_blocks:
                        query_match = re.search(r"query='(.*?)'", dr_content)
                        query = query_match.group(1) if query_match else "Unknown"
                        gathered_info_summary += f"Date calculation results for '{query}':\n"
                        date_val = re.search(r'<date>(.*?)</date>', dr_content, re.DOTALL)
                        desc = re.search(r'<description>(.*?)</description>', dr_content, re.DOTALL)
                        if date_val:
                            gathered_info_summary += f"- Result: {date_val.group(1).strip()}\n"
                            if desc:
                                gathered_info_summary += f"  Description: {desc.group(1).strip()}\n"
                        elif desc:
                            gathered_info_summary += f"- {desc.group(1).strip()}\n"
                        elif "<error>" in dr_content:
                            error_text = re.search(r'<error>(.*?)</error>', dr_content, re.DOTALL)
                            gathered_info_summary += f"- Error during date calculation: {error_text.group(1).strip() if error_text else 'Unknown error'}\n"
                        else:
                            gathered_info_summary += "- No specific date result found.\n"

                    # System notes/errors from tool execution (outside specific tool
                    # blocks but within <tool_results>)
                    system_notes_in_results_block = re.findall(r'<system_note>(.*?)</system_note>', content, re.DOTALL)
                    for note in system_notes_in_results_block:
                        # Add only if not already added from within a specific lookup block
                        if f"System Note: {note.strip()}\n" not in gathered_info_summary and f"System Note within Lookup: {note.strip()}\n" not in gathered_info_summary:
                            gathered_info_summary += f"System Note from Tool Results: {note.strip()}\n"
                    system_errors_in_results_block = re.findall(r'<system_error>(.*?)</system_error>', content, re.DOTALL)
                    for error_note in system_errors_in_results_block:
                        gathered_info_summary += f"System Error from Tool Results: {error_note.strip()}\n"

        # Also check the raw model output (last assistant message) for system errors,
        # in case no tool results block was generated
        last_assistant_message_content = model_chat_history[-1]['content'] if model_chat_history and model_chat_history[-1]['role'] == 'assistant' else ""
        system_errors_in_raw_output = re.findall(r'<system_error>(.*?)</system_error>', last_assistant_message_content, re.DOTALL)
        for error_note in system_errors_in_raw_output:
            # Add only if not already captured from within a tool results block
            if f"System Error from Tool Results: {error_note.strip()}" not in gathered_info_summary:
                gathered_info_summary += f"System Error in model output: {error_note.strip()}\n"
        # Check for system notes that might be outside <tool_results> but in the raw assistant output
        system_notes_in_raw_output = re.findall(r'<system_note>(.*?)</system_note>', last_assistant_message_content, re.DOTALL)
        for note in system_notes_in_raw_output:
            if f"System Note from Tool Results: {note.strip()}" not in gathered_info_summary and f"System Note within Lookup: {note.strip()}\n" not in gathered_info_summary:  # Avoid duplicates
                gathered_info_summary += f"System Note in model output: {note.strip()}\n"

        if not gathered_info_summary.strip():
            gathered_info_summary = "No specific information was gathered using tools."
        # Add the synthesis prompt to a copy of the history for the final generation
        # step. This keeps the history structure correct for apply_chat_template:
        # the synthesis prompt becomes the final USER message, and the model then
        # generates the final ASSISTANT response.
        temp_chat_history_for_synthesis = model_chat_history.copy()  # Copy the history including tool results
        synthesis_prompt_formatted = synthesis_prompt_content.format(
            original_user_input=original_user_input,
            gathered_info_summary=gathered_info_summary.strip()  # Add the summary of results
        )
        # Append the synthesis prompt as the final user message; this maintains the
        # user/assistant alternation (last was assistant, now user for the synthesis instruction)
        temp_chat_history_for_synthesis.append({"role": "user", "content": synthesis_prompt_formatted.strip()})
        # --- Final Synthesis Generation Call ---
        # Check strict alternation *again* before the final synthesis generation
        if len(temp_chat_history_for_synthesis) > 1 and temp_chat_history_for_synthesis[-1]['role'] == temp_chat_history_for_synthesis[-2]['role']:
            # This should not happen with correct history management
            print("Error: History roles are not alternating just before final synthesis tokenization!")
            print("History:", temp_chat_history_for_synthesis)
            final_response = "Sorry, I encountered an internal error during final response generation history formatting."
        else:
            # Guard the final tokenizer call as well
            try:
                prompt_for_synthesis = tokenizer.apply_chat_template(
                    temp_chat_history_for_synthesis,  # Use the history with the synthesis prompt
                    tokenize=False,
                    add_generation_prompt=True
                )
                synthesis_generation_config = GenerationConfig(
                    max_new_tokens=1500,  # More tokens for the full answer
                    do_sample=True,  # Use sampling for more creative synthesis
                    temperature=0.7,
                    top_k=50,
                    top_p=0.95,
                    repetition_penalty=1.1,
                    eos_token_id=tokenizer.eos_token_id,
                    pad_token_id=tokenizer.pad_token_id,
                    use_cache=True
                )
                input_ids_synthesis = tokenizer(prompt_for_synthesis, return_tensors="pt").input_ids.to(model.device)
                if input_ids_synthesis.numel() == 0:
                    final_response = "Sorry, I couldn't generate a response (empty input for final synthesis)."
                    print("Warning: Final synthesis input_ids empty.")
                else:
                    try:
                        outputs_synthesis = model.generate(
                            input_ids=input_ids_synthesis,
                            generation_config=synthesis_generation_config,
                        )
                        prompt_length_synthesis = input_ids_synthesis.shape[1]
                        if outputs_synthesis.shape[1] > prompt_length_synthesis:
                            final_response = tokenizer.decode(outputs_synthesis[0, prompt_length_synthesis:], skip_special_tokens=True).strip()
                        else:
                            final_response = "..."  # Indicate a potentially empty response
                            print("Warning: Final synthesis generated no new tokens.")
                    except Exception as e:
                        print(f"Error during final synthesis model generation: {e}")
                        final_response = f"Sorry, I encountered an error while generating my response: {e}"
                        print(traceback.format_exc())  # Print full traceback for debugging
            except Exception as e:
                print(f"Error during final synthesis tokenizer call: {e}")
                final_response = f"Sorry, I encountered an error preparing the final response: {e}"
                print(traceback.format_exc())  # Print full traceback for debugging
    else:
        # final_response_text is set, meaning the 'answer' tool was called; use that text directly
        final_response = final_response_text
        print(f"Using response from 'answer' tool call: '{final_response}'")
# --- Post-process Final Response ---
print("Post-processing final response...")
cleaned_response = final_response
# Remove potential prompt bleed or unwanted phrases/tags that the model might still output
# Be more aggressive about removing tool-related artifacts and system instructions
# Removed many patterns as the synthesis prompt is now clearer.
unwanted_patterns = [
r'<tool_code>.*?</tool_code>', # Remove raw tool calls
r'<tool_results>.*?</tool_results>', # Remove the main tool results wrapper
# Keep detailed result blocks for parsing URLs below, but remove them from final text
r'<search_results_for_query.*?>(.*?)</search_results_for_query>',
r'<lookup_business_info_results_for_query.*?>(.*?)</lookup_business_info_results_for_query>',
r'<perform_date_calculation_results_for_query.*?>(.*?)</perform_date_calculation_results_for_query>',
r'<system>.*?</system>', # Remove the system block
r'<item>.*?</item>', # Remove individual item tags (from search results)
r'<title>.*?</title>', r'<snippet>.*?</snippet>', r'<url>.*?</url>', # Remove individual search item tags
r'<match>.*?</match>', # Remove individual business match tag
r'<service>(.*?)</service>', # Remove individual business info tags, keep content if needed for fallback
r'<description>(.*?)</description>', r'<price>(.*?)</price>', r'<available>(.*?)</available>', # Remove individual business info tags, keep content if needed for fallback
r'<date>(.*?)</date>', # Remove date tag, keep content
r'<error>(.*?)</error>', r'<system_note>(.*?)</system_note>', # Remove error/system note tags, capture content
r'System:', # Remove system prefix if it bleeds
r'Assistant:', # Remove Assistant prefix if it bleeds outside of intended response
r'User:', # Remove User prefix if it bleeds
r'Tool Results:', # Remove the tool results header if it bleeds
# More specific cleanup for synthesis prompt bleed
r"User's original query:.*",
r"Information gathered \(from previous tool calls and results\):.*",
r"Information gathered from tools and process notes:.*", # New pattern from synthesis prompt
r"Synthesize ALL relevant information.*",
r"Guidelines for your response:.*",
r"Available tools:.*",
r"To use a tool, output a command.*",
r"Example tool call:.*",
r"You can make multiple tool calls.*",
r"After executing tools, you will receive the tool results.*",
r"To provide the final answer.*",
r"Example final answer:.*",
r"If you can answer the query directly.*",
r"Think step-by-step.*",
r"Output ONLY tool calls.*",
r"Conversation History \(Recent Turns\):.*",
r"Business Info Check Results for Query Parts:.*",
r"When presenting information from multiple business lookup results, structure your answer to clearly describe each item found.*", # Added pattern from synthesis prompt
r"Pay special attention to providing detailed descriptions and listing all relevant points found from the business lookup tool when multiple items were retrieved.*", # Added pattern from synthesis prompt
]
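# Illustrative example (hypothetical model output) of what this cleanup does:
#   in:  "<tool_code>search(query='...')</tool_code>Paris is the capital."
#   out: "Paris is the capital."
# Content-capturing patterns such as <date>(.*?)</date> keep their inner text
# instead of deleting it (see the r'\1' substitution below).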
# First, extract URLs from history *before* removing the blocks from the final response text
# Iterate through the final state of model_chat_history to collect all URLs
unique_urls = set()
for msg in model_chat_history:
if msg['role'] == 'user' and isinstance(msg['content'], str) and '<tool_results>' in msg['content']:
search_blocks = re.findall(r'<search_results_for_query.*?>(.*?)</search_results_for_query>', msg['content'], re.DOTALL)
for sr_content in search_blocks:
urls = re.findall(r'<url>(.*?)</url>', sr_content, re.DOTALL)
for url in urls:
url_text = url.strip()
if url_text and url_text != 'N/A': # Check for empty string as well
unique_urls.add(url_text)
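# Worked example (hypothetical URL): a history message containing
#   <search_results_for_query query='x'><item><url>https://example.com/a</url></item></search_results_for_query>
# contributes 'https://example.com/a'; the set silently drops repeats across turns.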
# Now apply the cleanup patterns to the generated response text
# We need to be careful here not to remove content we want to keep while removing the tags
temp_cleaned_response = cleaned_response
for pattern in unwanted_patterns:
# For patterns that capture content we might need in fallback, just remove the tags
if pattern in [r'<service>(.*?)</service>', r'<description>(.*?)</description>', r'<price>(.*?)</price>', r'<available>(.*?)</available>', r'<date>(.*?)</date>', r'<error>(.*?)</error>', r'<system_note>(.*?)</system_note>']:
temp_cleaned_response = re.sub(pattern, r'\1', temp_cleaned_response, flags=re.IGNORECASE | re.DOTALL | re.MULTILINE)
else:
temp_cleaned_response = re.sub(pattern, "", temp_cleaned_response, flags=re.IGNORECASE | re.DOTALL | re.MULTILINE)
cleaned_response = temp_cleaned_response
# Remove any remaining multiple empty lines
cleaned_response = re.sub(r'\n\s*\n', '\n\n', cleaned_response).strip()
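# e.g. "line1\n\n\n\nline2" becomes "line1\n\nline2" (runs of blank lines
# collapse to a single blank line).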
# Append Sources if URLs were collected and response is not just an error
if unique_urls and not ("Sorry, I encountered an unexpected error" in cleaned_response or "Error loading model" in cleaned_response):
# Convert set to list and sort for consistent output
sorted_urls = sorted(list(unique_urls))
# Add a marker to ensure Sources appear clearly
cleaned_response += "\n\nSources:\n" + "\n".join(sorted_urls)
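# The appended footer looks like (URLs hypothetical):
#   Sources:
#   https://example.com/a
#   https://example.com/b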
final_response = cleaned_response
# Fallback if the final response is still empty or a placeholder after post-processing
if not final_response.strip() or final_response.strip() == "...":
print("Warning: Final response was empty after cleaning or was placeholder. Providing a fallback.")
# Construct a fallback based on any executed tool result reflected in history
fallback_parts = []
# Scan user messages in history for tool results. The synthesis prompt is the
# last user message, but tool results may appear in any earlier user turn, so
# check them all.
for msg in model_chat_history:
if msg['role'] == 'user' and isinstance(msg['content'], str) and '<tool_results>' in msg['content']:
# Extract and summarize results from the <tool_results> block
tool_results_block = re.search(r'<tool_results>(.*?)</tool_results>', msg['content'], re.DOTALL)
if tool_results_block:
content = tool_results_block.group(1)
# Check for date calculation results
date_results = re.findall(r'<perform_date_calculation_results_for_query.*?>(.*?)</perform_date_calculation_results_for_query>', content, re.DOTALL)
for dr_content in date_results:
date_val = re.search(r'<date>(.*?)</date>', dr_content, re.DOTALL)
desc = re.search(r'<description>(.*?)</description>', dr_content, re.DOTALL)
if date_val:
fallback_parts.append(f"Date calculation result: {date_val.group(1).strip()}")
elif desc:
fallback_parts.append(f"Date calculation attempt: {desc.group(1).strip()}")
elif "<error>" in dr_content:
error_text = re.search(r'<error>(.*?)</error>', dr_content, re.DOTALL)
fallback_parts.append(f"There was an issue with a date calculation requested: {error_text.group(1).strip() if error_text else 'Unknown error'}")
# Check for business lookup results (MODIFIED for multiple matches in fallback)
lookup_results = re.findall(r'<lookup_business_info_results_for_query.*?>(.*?)</lookup_business_info_results_for_query>', content, re.DOTALL)
for lr_content in lookup_results:
matches_found = re.findall(r'<match>(.*?)</match>', lr_content, re.DOTALL)
if matches_found:
fallback_parts.append(f"Found {len(matches_found)} potential business information match(es):")
for match_content in matches_found: # Iterate through each found match
service_match = re.search(r'<service>(.*?)</service>', match_content, re.DOTALL)
desc_match = re.search(r'<description>(.*?)</description>', match_content, re.DOTALL)
service_name = service_match.group(1).strip() if service_match and service_match.group(1).strip() != 'N/A' else "An item"
desc_snippet = desc_match.group(1).strip()[:50] + "..." if desc_match and desc_match.group(1).strip() != 'N/A' else "No description provided."
fallback_parts.append(f" - {service_name}: {desc_snippet}")
elif "No relevant matches found" in lr_content:
score_match = re.search(r"final_best_score='(.*?)'", lr_content) # Look for final_best_score
score = float(score_match.group(1)) if score_match else 0.0
threshold_match = re.search(r"requested_threshold='(.*?)'", lr_content)
threshold_val = float(threshold_match.group(1)) if threshold_match else 0.50
fallback_parts.append(f"Could not find specific business information requested above threshold {threshold_val:.4f} (best score: {score:.4f}).")
elif "<error>" in lr_content:
error_text = re.search(r'<error>(.*?)</error>', lr_content, re.DOTALL)
fallback_parts.append(f"There was an error looking up business information: {error_text.group(1).strip() if error_text else 'Unknown error'}")
# Include system notes found within the business lookup results block
system_notes_in_lookup = re.findall(r'<system_note>(.*?)</system_note>', lr_content, re.DOTALL)
for note in system_notes_in_lookup:
fallback_parts.append(f"Business Lookup Note: {note.strip()}")
# Check for search results
search_results_blocks = re.findall(r'<search_results_for_query.*?>(.*?)</search_results_for_query>', content, re.DOTALL)
for sr_content in search_results_blocks:
if "<item>" in sr_content: # Indicates results were found (even if snippet is N/A)
query_match = re.search(r"query='(.*?)'", sr_content)
query = query_match.group(1) if query_match else "your query"
fallback_parts.append(f"Found some search results for {query}.")
elif "No results found" in sr_content:
query_match = re.search(r"query='(.*?)'", sr_content)
query = query_match.group(1) if query_match else "your query"
fallback_parts.append(f"No search results were found for {query}.")
elif "<error>" in sr_content:
error_text = re.search(r'<error>(.*?)</error>', sr_content, re.DOTALL)
fallback_parts.append(f"There was an error performing the search: {error_text.group(1).strip() if error_text else 'Unknown error'}")
# Check for system notes/errors from tool results (outside of specific tool blocks but within <tool_results>)
system_notes_in_results_block = re.findall(r'<system_note>(.*?)</system_note>', content, re.DOTALL)
for note in system_notes_in_results_block:
# Add only if not already added from within a specific lookup block
if f"System Note: {note.strip()}" not in fallback_parts and f"Business Lookup Note: {note.strip()}" not in fallback_parts:
fallback_parts.append(f"System Note from Tool Results: {note.strip()}")
system_errors_in_results_block = re.findall(r'<system_error>(.*?)</system_error>', content, re.DOTALL)
for error_note in system_errors_in_results_block:
fallback_parts.append(f"System Error from Tool Results: {error_note.strip()}")
# Check for system errors directly in the raw model output (last assistant message)
last_assistant_msg_content = model_chat_history[-1]['content'] if model_chat_history and model_chat_history[-1]['role'] == 'assistant' else ""
system_errors_in_raw_output = re.findall(r'<system_error>(.*?)</system_error>', last_assistant_msg_content, re.DOTALL)
for error_note in system_errors_in_raw_output:
# Add only if not already captured from within tool results block
if f"System Error from Tool Results: {error_note.strip()}" not in fallback_parts:
fallback_parts.append(f"System error during processing: {error_note.strip()}")
# Check for system notes/errors that might be outside <tool_results> but in the raw assistant output
system_notes_in_raw_output = re.findall(r'<system_note>(.*?)</system_note>', last_assistant_msg_content, re.DOTALL)
for note in system_notes_in_raw_output:
if f"System Note from Tool Results: {note.strip()}" not in fallback_parts and f"Business Lookup Note: {note.strip()}" not in fallback_parts: # Avoid duplicates
fallback_parts.append(f"System Note in model output: {note.strip()}")
if fallback_parts:
# Deduplicate fallback messages while preserving insertion order (dict.fromkeys keeps the first occurrence of each entry)
unique_fallback_parts = list(dict.fromkeys(fallback_parts))
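# Worked example of the dedup: list(dict.fromkeys(["a", "b", "a"])) == ["a", "b"]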
# Add a polite intro if there are fallback parts
final_response = "I encountered some difficulty, but based on my attempts:\n- " + "\n- ".join(unique_fallback_parts)
else:
# General fallback if no tools were executed, or no results/errors were reflected in history
final_response = "Sorry, I couldn't process your request at this time. Please try again."
# Check if the final response still contains any tool/system tags after all processing
# This is a last-resort pass in case the earlier regex cleanup missed something
if re.search(r'<(tool_code|tool_results|search_results_for_query|lookup_business_info_results_for_query|perform_date_calculation_results_for_query|system|item|title|snippet|url|match|service|description|price|available|date|error|system_note)>', final_response):
print("Warning: Final response still contains unexpected tags after post-processing. Cleaning further.")
# Apply unwanted patterns one last time aggressively
temp_cleaned_response = final_response
for pattern in unwanted_patterns:
# Special handling for patterns that capture content
if pattern in [r'<service>(.*?)</service>', r'<description>(.*?)</description>', r'<price>(.*?)</price>', r'<available>(.*?)</available>', r'<date>(.*?)</date>', r'<error>(.*?)</error>', r'<system_note>(.*?)</system_note>']:
temp_cleaned_response = re.sub(pattern, r'\1', temp_cleaned_response, flags=re.IGNORECASE | re.DOTALL | re.MULTILINE)
else:
temp_cleaned_response = re.sub(pattern, "", temp_cleaned_response, flags=re.IGNORECASE | re.DOTALL | re.MULTILINE)
temp_cleaned_response = re.sub(r'\n\s*\n', '\n\n', temp_cleaned_response).strip()
if temp_cleaned_response.strip(): # Only replace if cleaned version is not empty
final_response = temp_cleaned_response
else: # If aggressive cleaning resulted in empty, use a generic error message
final_response = "Sorry, I had difficulty formulating a complete response."
print("\nBot Response:", final_response, "\n") # Debug Print
# Update the last message in history_state with the final response
# The last entry in history_state is [user_input, placeholder], where the
# placeholder is None or "..."; swap the placeholder for the final response.
if chat_history_state and len(chat_history_state) > 0 and len(chat_history_state[-1]) == 2 and chat_history_state[-1][0] == original_user_input and (chat_history_state[-1][1] is None or chat_history_state[-1][1] == "..."):
chat_history_state[-1][1] = final_response
else:
# This shouldn't happen with the current logic where we append the placeholder immediately,
# but as a safeguard if the history structure is unexpectedly altered.
print("Warning: Could not find placeholder in chat_history_state to update. Appending new turn.")
chat_history_state.append([original_user_input, final_response])
# Return the updated history state
return chat_history_state
# chat_with_bot ends here; everything below runs at module top level.
# --- Gradio Interface Setup ---
# The interface wires the components to chat_with_bot via a small helper below.
# Setup and launch live at module top level (not under a __main__ guard) so the
# app starts when the script runs on Hugging Face Spaces or in a notebook.
print("Setting up Gradio Interface...")
# Define the components
chatbot = gr.Chatbot(height=400, label="Chat History")
msg = gr.Textbox(label="Your message", placeholder="Ask a question...", lines=2)
clear = gr.Button("Clear")
# Create the Gradio Interface with explicit components
# The inputs are the textbox and the chatbot state (for history)
# The outputs are the chatbot state (updated history) and the cleared textbox
# Note: The function should return (updated_history, cleared_input_box_value)
# We will create a helper function to handle the textbox clearing
def respond_and_clear(user_input, chat_history_state=None):
# Call the main chat logic function.
# chat_history_state defaults to None so callers that pass only the textbox
# value (e.g. gr.Examples) still work; normalize it to an empty list.
if chat_history_state is None:
chat_history_state = []
updated_history = chat_with_bot(user_input, chat_history_state)
# Return the updated history and clear the input textbox
return updated_history, ""
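# Note: chat_history_state follows gr.Chatbot's legacy "tuples" format, a list
# of [user_message, bot_message] pairs, e.g. [["Hi", "Hello!"], ["Bye", "..."]];
# chat_with_bot replaces the placeholder in the last pair with the final answer.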
# Combine components in a Block or Interface
with gr.Blocks() as demo:
gr.Markdown("## Business RAG Chatbot with Tool Use (Futuresony's FutureAi)") # Added persona/company name
gr.Markdown("Ask questions about Futuresony's services, people, and location, or general knowledge and date calculations! FutureAi aims to provide comprehensive answers.") # Added persona/company name and goal
# Display messages about business info availability status
if business_info_available:
gr.Markdown("<font color='green'>Business Information Loaded Successfully.</font>")
else:
gr.Markdown("<font color='red'>Warning: Business Information Not Loaded. Business-specific questions may not be answerable.</font>")
# Display message about Reranker availability
if reranker is not None:
gr.Markdown("<font color='green'>Business Lookup Reranker Loaded.</font>")
else:
gr.Markdown("<font color='orange'>Warning: Business Lookup Reranker Not Loaded. Lookup may be less robust.</font>")
chatbot.render() # Render the chatbot display area
with gr.Row(): # Place the input textbox and send button side-by-side
msg.render() # Render the input textbox
submit_btn = gr.Button("Send") # Render the explicit send button
clear.render() # Render the clear button
# Define event listeners
# When the submit button is clicked or Enter is pressed in the textbox,
# call the respond_and_clear function.
# The inputs are the textbox value and the chatbot state.
# The outputs are the updated chatbot state and the textbox value (set to empty).
submit_btn.click(respond_and_clear, inputs=[msg, chatbot], outputs=[chatbot, msg])
msg.submit(respond_and_clear, inputs=[msg, chatbot], outputs=[chatbot, msg]) # Also trigger on Enter key in textbox
# When the clear button is clicked, clear the textbox and the chatbot state
clear.click(lambda: ([], ""), outputs=[chatbot, msg]) # Lambda returns empty history and empty textbox
# Add examples (Updated to reflect company/persona and multi-item queries)
gr.Examples(
examples=[
"Tell me about Futuresony's IT Consultation and Network Setup services.", # Multi-item business query
"What are all the services Futuresony offers?", # Broad business query
"Who are the key personnel at Futuresony and what are their roles?", # Multi-person query
"Tell me about the DSTv Assistant model price and its description.", # Multi-detail query
"What is the capital of France?", # General search
"What is the weather in Dar-es-salaam today?", # Location-specific search
"What is today's date?", # Date calculation
"What was the date 30 days ago?", # Date calculation
"Tell me about Futuresony's location and contact details.", # Combined business details
"Je, ni nini huduma zote za Futuresony?", # Broad business query in Swahili
"Nani ni Mkurugenzi wa Futuresony?", # Specific person query in Swahili
"Tafuta habari za hivi punde kuhusu akili bandia (AI).", # General search in Swahili
"Tarehe itakuwa ngapi baada ya wiki 2 kutoka leo?", # Date calculation in Swahili
],
inputs=msg,
# fn/outputs let Gradio run respond_and_clear for a selected example; whether
# that actually happens on click depends on caching / run_on_click support in
# the installed Gradio version.
fn=respond_and_clear,
outputs=[chatbot, msg],
# Do not cache example outputs: responses depend on live models and tools.
cache_examples=False
)
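# Note: with cache_examples=False, many Gradio versions only invoke fn when
# run_on_click=True; otherwise selecting an example just populates the textbox
# and the user presses Send. Treat the fn/outputs wiring above as best-effort,
# version-dependent behavior.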
# Launch the Gradio app (module top level, so Spaces starts it without a __main__ guard)
try:
print("Launching Gradio interface...")
# server_name='0.0.0.0' binds all interfaces so the app is reachable inside Docker/Spaces; share=True requests a public tunnel when run locally (Spaces ignores it)
demo.launch(share=True, server_name="0.0.0.0") # Use the Block interface 'demo'
print("Gradio interface launched. Check the public URL.")
except Exception as e: # Catch launch failures so startup errors are logged
print(f"Error launching Gradio interface: {e}")
print("Ensure you have the necessary libraries installed and a stable internet connection.")
print(traceback.format_exc())