# app.py - Combined Script # Combined Imports import os import gradio as gr from huggingface_hub import InferenceClient import torch import re import warnings import time import json from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig, BitsAndBytesConfig from sentence_transformers import SentenceTransformer, util, CrossEncoder import gspread # from google.colab import auth from google.auth import default from tqdm import tqdm from ddgs import DDGS # Updated import import spacy from datetime import date, timedelta, datetime # Import datetime from dateutil.relativedelta import relativedelta # Corrected typo import traceback # Import traceback import base64 # Import base64 import dateparser # Import dateparser from dateparser.search import search_dates import pytz # Import pytz for timezone handling # Suppress warnings warnings.filterwarnings("ignore", category=UserWarning) # Define global variables and load secrets HF_TOKEN = os.getenv("HF_TOKEN") # Add a print statement to check if HF_TOKEN is loaded print(f"HF_TOKEN loaded: {'*' * len(HF_TOKEN) if HF_TOKEN else 'None'}") SHEET_ID = "19ipxC2vHYhpXCefpxpIkpeYdI43a1Ku2kYwecgUULIw" GOOGLE_BASE64_CREDENTIALS = os.getenv("GOOGLE_BASE64_CREDENTIALS") # Initialize InferenceClient # client = InferenceClient("google/gemma-2-9b-it", token=HF_TOKEN) # client = InferenceClient("meta-llama/Llama-4-Scout-17B-16E-Instruct", token=HF_TOKEN) client = InferenceClient("meta-llama/Llama-3.3-70B-Instruct", token=HF_TOKEN) # Load spacy model for sentence splitting nlp = None try: nlp = spacy.load("en_core_web_sm") print("SpaCy model 'en_core_web_sm' loaded.") except OSError: print("SpaCy model 'en_core_web_sm' not found. Downloading...") try: os.system("python -m spacy download en_core_web_sm") nlp = spacy.load("en_core_web_sm") print("SpaCy model 'en_core_web_sm' downloaded and loaded.") except Exception as e: print(f"Failed to download or load SpaCy model: {e}") # Load SentenceTransformer for RAG/business info retrieval and semantic detection embedder = None try: print("Attempting to load Sentence Transformer (sentence-transformers/paraphrase-MiniLM-L6-v2)...") # Use the model provided by the user for semantic detection as well embedder = SentenceTransformer("sentence-transformers/paraphrase-MiniLM-L6-v2") # Or 'all-MiniLM-L6-v2' if preferred print("Sentence Transformer loaded.") except Exception as e: print(f"Error loading Sentence Transformer: {e}") # Load a Cross-Encoder model for re-ranking retrieved documents reranker = None try: print("Attempting to load Cross-Encoder Reranker (cross-encoder/ms-marco-MiniLM-L6-v2)...") reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L6-v2') print("Cross-Encoder Reranker loaded.") except Exception as e: print(f"Error loading Cross-Encoder Reranker: {e}") print("Please ensure the model identifier 'cross-encoder/ms-marco-MiniLM-L6-v2' is correct and accessible on Hugging Face Hub.") print(traceback.format_exc()) reranker = None # Google Sheets Authentication gc = None # Global variable for gspread client def authenticate_google_sheets(): """Authenticates with Google Sheets using base64 encoded credentials.""" global gc print("Authenticating Google Account...") if not GOOGLE_BASE64_CREDENTIALS: print("Error: GOOGLE_BASE64_CREDENTIALS secret not found.") return False try: # Decode the base64 credentials credentials_json = base64.b64decode(GOOGLE_BASE64_CREDENTIALS).decode('utf-8') credentials = json.loads(credentials_json) # Authenticate using service account from dictionary gc = gspread.service_account_from_dict(credentials) print("Google Sheets authentication successful via service account.") return True except Exception as e: print(f"Google Sheets authentication failed: {e}") print("Please ensure your GOOGLE_BASE64_CREDENTIALS secret is correctly set and contains valid service account credentials.") print(traceback.format_exc()) return False # Google Sheets Data Loading and Embedding data = [] # Global variable to store loaded data descriptions_for_embedding = [] embeddings = torch.tensor([]) business_info_available = False # Flag to indicate if business info was loaded successfully def load_business_info(): """Loads business information from Google Sheet and creates embeddings.""" global data, descriptions_for_embedding, embeddings, business_info_available business_info_available = False # Reset flag if gc is None: print("Skipping Google Sheet loading: Google Sheets client not authenticated.") return if not SHEET_ID: print("Error: SHEET_ID not set.") return try: sheet = gc.open_by_key(SHEET_ID).sheet1 print(f"Successfully opened Google Sheet with ID: {SHEET_ID}") data_records = sheet.get_all_records() if not data_records: print(f"Warning: No data records found in Google Sheet with ID: {SHEET_ID}") data = [] descriptions_for_embedding = [] else: # Filter out rows missing 'Service' or 'Description' filtered_data = [row for row in data_records if row.get('Service') and row.get('Description')] if not filtered_data: print("Warning: Filtered data is empty after checking for 'Service' and 'Description'.") data = [] descriptions_for_embedding = [] else: data = filtered_data # Use BOTH Service and Description for embedding descriptions_for_embedding = [f"Service: {row['Service']}. Description: {row['Description']}" for row in data] # Only encode if descriptions_for_embedding are found and embedder is available if descriptions_for_embedding and embedder is not None: print("Encoding descriptions...") try: embeddings = embedder.encode(descriptions_for_embedding, convert_to_tensor=True) print("Encoding complete.") business_info_available = True except Exception as e: print(f"Error during description encoding: {e}") embeddings = torch.tensor([]) business_info_available = False else: print("Skipping encoding descriptions: No descriptions found or embedder not available.") embeddings = torch.tensor([]) business_info_available = False print(f"Loaded {len(descriptions_for_embedding)} entries from Google Sheet for embedding/RAG.") if not business_info_available: print("Business information retrieval (RAG) is NOT available.") except gspread.exceptions.SpreadsheetNotFound: print(f"Error: Google Sheet with ID '{SHEET_ID}' not found.") print("Please check the SHEET_ID and ensure your authenticated Google Account has access to this sheet.") business_info_available = False except Exception as e: print(f"An error occurred while accessing the Google Sheet: {e}") print(traceback.format_exc()) business_info_available = False # Business Info Retrieval (RAG) def retrieve_business_info(query: str, top_n: int = 3) -> list: """ Retrieves relevant business information from loaded data based on a query. Args: query: The user's query string. top_n: The number of top relevant entries to retrieve. Returns: A list of dictionaries, where each dictionary is a relevant row from the Google Sheet data. Returns an empty list if RAG is not available or no relevant information is found. """ global data if not business_info_available or embedder is None or not descriptions_for_embedding or not data: print("Business information retrieval is not available or data is empty.") return [] try: query_embedding = embedder.encode(query, convert_to_tensor=True) cosine_scores = util.cos_sim(query_embedding, embeddings)[0] top_results_indices = torch.topk(cosine_scores, k=min(top_n, len(data)))[1].tolist() top_results = [data[i] for i in top_results_indices] if reranker is not None and top_results: print("Re-ranking top results...") rerank_pairs = [(query, descriptions_for_embedding[i]) for i in top_results_indices] rerank_scores = reranker.predict(rerank_pairs) reranked_indices = sorted(range(len(rerank_scores)), key=lambda i: rerank_scores[i], reverse=True) reranked_results = [top_results[i] for i in reranked_indices] print("Re-ranking complete.") return reranked_results else: return top_results except Exception as e: print(f"Error during business information retrieval: {e}") print(traceback.format_exc()) return [] # Function to perform DuckDuckGo Search and return results with URLs def perform_duckduckgo_search(query: str, max_results: int = 5): # Reduced max_results for multi-part queries """ Performs a search using DuckDuckGo and returns a list of dictionaries. Includes a delay to avoid rate limits. Returns an empty list and prints an error if search fails. """ print(f"Executing Tool: perform_duckduckgo_search with query='{query}')") search_results_list = [] try: time.sleep(1) with DDGS() as ddgs: search_query = query.strip() if not search_query or len(search_query.split()) < 2: print(f"Skipping search for short query: '{search_query}'") return [] print(f"Sending search query to DuckDuckGo: '{search_query}'") results_generator = ddgs.text(search_query, max_results=max_results) results_found = False for r in results_generator: search_results_list.append(r) results_found = True print(f"Raw results from DuckDuckGo: {search_results_list}") if not results_found and max_results > 0: print(f"DuckDuckGo search for '{search_query}' returned no results.") elif results_found: print(f"DuckDuckGo search for '{search_query}' completed. Found {len(search_results_list)} results.") except Exception as e: print(f"Error during Duckduckgo search for '{search_query if 'search_query' in locals() else query}': {e}") print(traceback.format_exc()) return [] return search_results_list # Define the new semantic date/time detection and calculation function using dateparser def perform_date_calculation(query: str) -> str or None: """ Analyzes query for date/time information using dateparser. If dateparser finds a date, it returns a human-friendly response string. Otherwise, it returns None. It is designed to handle multiple languages and provide the time for East Africa (Tanzania). """ print(f"Executing Tool: perform_date_calculation with query='{query}') using dateparser.search_dates") try: eafrica_tz = pytz.timezone('Africa/Dar_es_Salaam') now = datetime.now(eafrica_tz) except pytz.UnknownTimeZoneError: print("Error: Unknown timezone 'Africa/Dar_es_Salaam'. Using default system time.") now = datetime.now() try: # Try parsing with Swahili first, then English found = search_dates( query, settings={ "PREFER_DATES_FROM": "future", "RELATIVE_BASE": now }, languages=['sw', 'en'] # Prioritize Swahili ) if not found: print("dateparser.search_dates could not parse any date/time.") return None text_snippet, parsed = found[0] print(f"dateparser.search_dates found: text='{text_snippet}', parsed='{parsed}'") is_swahili = any(swahili_phrase in query.lower() for swahili_phrase in ['tarehe', 'siku', 'saa', 'muda', 'leo', 'kesho', 'jana', 'ngapi', 'gani', 'mwezi', 'mwaka']) # Handle timezone information if now.tzinfo is not None and parsed.tzinfo is None: parsed = now.tzinfo.localize(parsed) elif now.tzinfo is None and parsed.tzinfo is not None: parsed = parsed.replace(tzinfo=None) # Check if the parsed date is today and time is close to now or midnight if parsed.date() == now.date(): # Consider it "now" if within a small time window or if no specific time was parsed (midnight) if abs((parsed - now).total_seconds()) < 60 or parsed.time() == datetime.min.time(): print("Query parsed to today's date and time is close to 'now' or midnight, returning current time/date.") if is_swahili: return f"Kwa saa za Afrika Mashariki (Tanzania), tarehe ya leo ni {now.strftime('%A, %d %B %Y')} na saa ni {now.strftime('%H:%M:%S')}." else: return f"In East Africa (Tanzania), the current date is {now.strftime('%A, %d %B %Y')} and the time is {now.strftime('%H:%M:%S')}." else: print(f"Query parsed to a specific time today: {parsed.strftime('%H:%M:%S')}") if is_swahili: return f"Hiyo inafanyika leo, {parsed.strftime('%A, %d %B %Y')}, saa {parsed.strftime('%H:%M:%S')} saa za Afrika Mashariki." else: return f"That falls on today, {parsed.strftime('%A, %d %B %Y')}, at {parsed.strftime('%H:%M:%S')} East Africa Time." else: print(f"Query parsed to a specific date: {parsed.strftime('%A, %d %B %Y')} at {parsed.strftime('%H:%M:%S')}") time_str = parsed.strftime('%H:%M:%S') date_str = parsed.strftime('%A, %d %B %Y') if parsed.tzinfo: tz_name = parsed.tzinfo.tzname(parsed) or 'UTC' if is_swahili: return f"Hiyo inafanyika tarehe {date_str} saa {time_str} {tz_name}." else: return f"That falls on {date_str} at {time_str} {tz_name}." else: if is_swahili: return f"Hiyo inafanyika tarehe {date_str} saa {time_str}." else: return f"That falls on {date_str} at {time_str}." except Exception as e: print(f"Error during dateparser.search_dates execution: {e}") print(traceback.format_exc()) return f"An error occurred while parsing date/time: {e}" # Function to determine if a query requires a tool or can be answered directly def determine_tool_usage(query: str) -> str: """ Analyzes the query to determine if a specific tool is needed. Returns the name of the tool ('duckduckgo_search', 'business_info_retrieval', 'date_calculation') or 'none' if no specific tool is clearly indicated. Prioritizes business information retrieval, then specific tools based on keywords and LLM judgment. """ query_lower = query.lower() # 1. Prioritize Business Info Retrieval if RAG is available if business_info_available: messages_business_check = [{"role": "user", "content": f"Does the following query ask about a specific person, service, offering, or description that is likely to be found *only* within a specific business's internal knowledge base, and not general knowledge? For example, questions about 'Salum' or 'Jackson Kisanga' are likely business-related, while questions about 'the current president of the USA' or 'who won the Ballon d'Or' are general knowledge. Answer only 'yes' or 'no'. Query: {query}"}] try: business_check_response = client.chat_completion( messages=messages_business_check, max_tokens=10, temperature=0.1 ).choices[0].message.content.strip().lower() # Ensure the response explicitly contains "yes" and is not just a substring match if business_check_response == "yes": print(f"Detected as specific business info query based on LLM check: '{query}'") return "business_info_retrieval" else: print(f"LLM check indicates not a specific business info query: '{query}'") except Exception as e: print(f"Error during LLM call for business info check for query '{query}': {e}") print(traceback.format_exc()) print(f"Proceeding without business info check for query '{query}' due to error.") # 2. Check for Date Calculation date_time_check_result = perform_date_calculation(query) if date_time_check_result is not None: print(f"Detected as date/time calculation query based on dateparser result for: '{query}'") return "date_calculation" # 3. Use LLM to determine if DuckDuckGo search is needed messages_tool_determination_search = [{"role": "user", "content": f"Does the following query require searching the web for current or general knowledge information (e.g., news, facts, definitions, current events)? Respond ONLY with 'duckduckgo_search' or 'none'. Query: {query}"}] try: search_determination_response = client.chat_completion( messages=messages_tool_determination_search, max_tokens=20, temperature=0.1, top_p=0.9 ).choices[0].message.content or "" response_lower = search_determination_response.strip().lower() if "duckduckgo_search" in response_lower: print(f"Model-determined tool for '{query}': 'duckduckgo_search'") return "duckduckgo_search" else: print(f"Model-determined tool for '{query}': 'none' (for search)") except Exception as e: print(f"Error during LLM call for search tool determination for query '{query}': {e}") print(traceback.format_exc()) print(f"Proceeding without search tool check for query '{query}' due to error.") # 4. If none of the specific tools are determined, default to 'none' print(f"No specific tool determined for '{query}'. Defaulting to 'none'.") return "none" # Function to generate text using the LLM, incorporating tool results if available def generate_text(prompt: str, tool_results: dict = None) -> str: """ Generates text using the configured LLM, optionally incorporating tool results. Args: prompt: The initial prompt for the LLM. tool_results: A dictionary containing results from executed tools. Keys are tool names, values are their outputs. Returns: The generated text from the LLM. """ # Add persona instructions to the beginning of the prompt persona_instructions = """You are absa_ai, an AI developed on August 7, 2025, by the absa team. Your knowledge about business data comes from the company's internal Google Sheet. """ full_prompt_builder = [persona_instructions, prompt] if tool_results and any(tool_results.values()): full_prompt_builder.append("\n\nTool Results:\n") for question, results in tool_results.items(): # Iterate through results per question if results: full_prompt_builder.append(f"--- Results for: {question} ---\n") # Add question context if isinstance(results, list): for i, result in enumerate(results): # Check if the result is from business info retrieval if isinstance(result, dict) and 'Service' in result and 'Description' in result: full_prompt_builder.append(f"Business Info {i+1}:\nService: {result.get('Service', 'N/A')}\nDescription: {result.get('Description', 'N/A')}\n\n") elif isinstance(result, dict) and 'url' in result: # Check if the result is from DuckDuckGo full_prompt_builder.append(f"Search Result {i+1}:\nTitle: {result.get('title', 'N/A')}\nURL: {result.get('url', 'N/A')}\nSnippet: {result.get('body', 'N/A')}\n\n") else: full_prompt_builder.append(f"{result}\n\n") # Handle other list items elif isinstance(results, dict): for key, value in results.items(): full_prompt_builder.append(f"{key}: {value}\n") full_prompt_builder.append("\n") else: full_prompt_builder.append(f"{results}\n\n") # Handle single string results (like date calculation) full_prompt_builder.append("Based on the provided tool results, answer the user's original query. If a question was answered by a tool, use the tool's result directly in your response.") print("Added tool results and instruction to final prompt.") else: print("No tool results to add to final prompt.") full_prompt = "".join(full_prompt_builder) print(f"Sending prompt to LLM:\n---\n{full_prompt}\n---") generation_config = { "temperature": 0.7, "max_new_tokens": 500, "top_p": 0.95, "top_k": 50, "do_sample": True, } try: response = client.chat_completion( messages=[ {"role": "user", "content": full_prompt} ], max_tokens=generation_config.get("max_new_tokens", 512), temperature=generation_config.get("temperature", 0.7), top_p=generation_config.get("top_p", 0.95) ).choices[0].message.content or "" print("LLM generation successful using chat_completion.") return response except Exception as e: print(f"Error during final LLM generation: {e}") print(traceback.format_exc()) return "An error occurred while generating the final response." # Main chat function with query breakdown and tool execution per question def chat(query: str): """ Processes user queries by breaking down multi-part queries, determining and executing appropriate tools for each question, and synthesizing results using the LLM. Prioritizes business information retrieval. """ print(f"Received query: {query}") # Step 1: Query Breakdown print("\n--- Breaking down query ---") prompt_for_question_breakdown = f""" Analyze the following query and list each distinct question found within it. Present each question on a new line, starting with a hyphen. Query: {query} """ try: messages_question_breakdown = [{"role": "user", "content": prompt_for_question_breakdown}] question_breakdown_response = client.chat_completion( messages=messages_question_breakdown, max_tokens=100, temperature=0.1, top_p=0.9 ).choices[0].message.content or "" individual_questions = [line.strip() for line in question_breakdown_response.split('\n') if line.strip()] cleaned_questions = [re.sub(r'^[-*]?\s*', '', q) for q in individual_questions] print("Individual questions identified:") for q in cleaned_questions: print(f"- {q}") except Exception as e: print(f"Error during LLM call for question breakdown: {e}") print(traceback.format_exc()) cleaned_questions = [query] # Fallback to treating the whole query as one question # Step 2: Tool Determination per Question print("\n--- Determining tools per question ---") determined_tools = {} for question in cleaned_questions: print(f"\nAnalyzing question for tool determination: '{question}'") determined_tools[question] = determine_tool_usage(question) print(f"Determined tool for '{question}': '{determined_tools[question]}'") print("\nSummary of determined tools per question:") for question, tool in determined_tools.items(): print(f"'{question}': '{tool}'") # Step 3: Execute Tools and Step 4: Synthesize Results print("\n--- Executing tools and collecting results ---") tool_results = {} for question, tool in determined_tools.items(): print(f"\nExecuting tool '{tool}' for question: '{question}'") result = None if tool == "date_calculation": result = perform_date_calculation(question) elif tool == "duckduckgo_search": result = perform_duckduckgo_search(question) elif tool == "business_info_retrieval": result = retrieve_business_info(question) elif tool == "none": # If tool is 'none', the LLM will answer this part using its internal knowledge # in the final response generation step. We don't need a specific tool result here. print(f"Skipping tool execution for question: '{question}' as tool is 'none'. LLM will handle.") result = None # Set result to None so it's not included in tool_results for 'none' tool # Only store results if they are not None (i.e., tool was executed and returned something) if result is not None: tool_results[question] = result print("\n--- Collected Tool Results ---") if tool_results: for question, result in tool_results.items(): print(f"\nQuestion: {question}") print(f"Result: {result}") else: print("No tool results were collected.") print("\n--------------------------") # Step 5: Final Response Generation print("\n--- Generating final response ---") # The generate_text function already handles incorporating tool results if provided final_response = generate_text(query, tool_results) print("\n--- Final Response from LLM ---") print(final_response) print("\n----------------------------") return final_response # Keep the Gradio interface setup as is for now if __name__ == "__main__": # Authenticate Google Sheets when the script starts authenticate_google_sheets() # Load business info after authentication load_business_info() # Check if spacy model, embedder, and reranker loaded correctly if nlp is None: print("Warning: SpaCy model not loaded. Sentence splitting may not work correctly.") if embedder is None: print("Warning: Sentence Transformer (embedder) not loaded. RAG will not be available.") if reranker is None: print("Warning: Cross-Encoder Reranker not loaded. Re-ranking of RAG results will not be performed.") if not business_info_available: print("Warning: Business information (Google Sheet data) not loaded successfully. " "RAG will not be available. Please ensure the GOOGLE_BASE64_CREDENTIALS secret is set correctly.") print("Launching Gradio Interface...") import gradio as gr with gr.Blocks(theme="soft") as demo: gr.Markdown( """ # LLM with Tools (DuckDuckGo Search, Date Calculation, Business Info RAG) Ask me anything! I can perform web searches, calculate dates, and retrieve business information. """ ) with gr.Row(): with gr.Column(scale=3): query = gr.Textbox( label="Query", placeholder="Enter your query here....", lines=3, interactive=True ) submit_btn = gr.Button("Submit") clear_btn = gr.Button("Clear") with gr.Column(scale=3): output = gr.Textbox( label="Output", lines=8, interactive=False ) # Button actions submit_btn.click(fn=chat, inputs=query, outputs=output) clear_btn.click(fn=lambda: "", inputs=None, outputs=output) try: demo.launch(debug=True) except Exception as e: print(f"Error launching Gradio interface: {e}") print(traceback.format_exc()) print("Please check the console output for more details on the error.")