from Live_audio import GeminiHandler import os import re from langdetect import detect import asyncio import gradio as gr import google.generativeai as genai import os import time import gradio as gr from datetime import datetime import langdetect import RAG_Domain_know_doc from web_search import search_autism from RAG import rag_autism from openai import OpenAI from dotenv import load_dotenv import Old_Document import User_Specific_Documents import asyncio import base64 import time from io import BytesIO from dotenv import load_dotenv load_dotenv() from google.genai import types from google.genai.types import ( LiveConnectConfig, SpeechConfig, VoiceConfig, PrebuiltVoiceConfig, Content, Part, ) import gradio as gr import numpy as np import websockets from dotenv import load_dotenv from fastrtc import ( AsyncAudioVideoStreamHandler, Stream, WebRTC, get_cloudflare_turn_credentials_async, wait_for_item, ) from google import genai from gradio.utils import get_space from PIL import Image # ------------------------------------------ import asyncio import base64 import json import os import pathlib import gradio as gr import google.generativeai as genai import os import time from typing import AsyncGenerator, Literal import gradio as gr import numpy as np from dotenv import load_dotenv from fastapi import FastAPI from fastapi.responses import HTMLResponse from fastrtc import ( AsyncStreamHandler, Stream, get_cloudflare_turn_credentials_async, wait_for_item, ) from google import genai from google.genai.types import ( LiveConnectConfig, PrebuiltVoiceConfig, SpeechConfig, VoiceConfig, ) from gradio.utils import get_space from pydantic import BaseModel # ------------------------------------------------ import os import gradio as gr import google.generativeai as genai import os import time import io import asyncio from pydub import AudioSegment DEEPINFRA_API_KEY = "285LUJulGIprqT6hcPhiXtcrphU04FG4" # Gemini: google-genai from google import genai # --------------------------------------------------- # VAD imports from reference code import collections import webrtcvad import fastrtc import time # helper functions from prompt_template import ( Prompt_template_translation, Prompt_template_LLM_Generation, Prompt_template_Reranker, Prompt_template_Wisal, Prompt_template_Halluciations, Prompt_template_paraphrasing, Prompt_template_Translate_to_original, Prompt_template_relevance, Prompt_template_User_document_prompt ) from query_utils import process_query_for_rewrite, get_non_autism_response GEMINI_API_KEY="AIzaSyCUCivstFpC9pq_jMHMYdlPrmh9Bx97dFo" TAVILY_API_KEY="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" WEAVIATE_URL="yorcqe2sqswhcaivxvt9a.c0.us-west3.gcp.weaviate.cloud" WEAVIATE_API_KEY="d2d0VGdZQTBmdTFlOWdDZl9tT2h3WDVWd1NpT1dQWHdGK0xjR1hYeWxicUxHVnFRazRUSjY2VlRUVlkwPV92MjAw" DEEPINFRA_API_KEY="285LUJulGIprqT6hcPhiXtcrphU04FG4" DEEPINFRA_BASE_URL="https://api.deepinfra.com/v1/openai" # API Keys and Constants env = os.getenv("ENVIRONMENT", "production") openai = OpenAI( api_key=DEEPINFRA_API_KEY, base_url="https://api.deepinfra.com/v1/openai", ) SESSION_ID = "default" pending_clarifications = {} def call_llm(model: str, messages: list[dict], temperature: float = 0.0, **kwargs) -> str: resp = openai.chat.completions.create( model=model, messages=messages, temperature=temperature, **kwargs ) return resp.choices[0].message.content.strip() def is_greeting(text: str) -> bool: return bool(re.search(r"\b(hi|hello|hey|good (morning|afternoon|evening))\b", text, re.I)) def process_query(query: str, first_turn: bool = False, session_id: str = "default"): intro = "" process_log = [] # Check if user is responding to a clarification prompt if session_id in pending_clarifications: if query.strip().lower() == "yes": corrected_query = pending_clarifications.pop(session_id) process_log.append(f"User confirmed: {corrected_query}") return process_autism_pipeline(corrected_query, process_log, intro) else: pending_clarifications.pop(session_id) redirect = "Hello I'm Wisal, an AI assistant developed by Compumacy AI, and a knowledgeable Autism specialist.\nIf you have any question related to autism please submit a question specifically about autism." process_log.append("User rejected clarification.") _save_process_log(process_log) return redirect if first_turn and (not query or query.strip() == ""): intro = "Hello! I'm Wisal, an AI assistant developed by Compumacy AI, specializing in Autism Spectrum Disorders. How can I help you today?" process_log.append(intro) _save_process_log(process_log) return intro if is_greeting(query): greeting = intro + "Hello! I'm Wisal, your AI assistant developed by Compumacy AI. How can I help you today?" process_log.append(f"Greeting detected.\n{greeting}") _save_process_log(process_log) return greeting # Process query with the new 3-tuple return corrected_query, is_autism_related, rewritten_query = process_query_for_rewrite(query) process_log.append(f"Original Query: {query}") process_log.append(f"Corrected Query: {corrected_query}") process_log.append(f"Relevance Check: {'RELATED' if is_autism_related else 'NOT RELATED'}") if rewritten_query: process_log.append(f"Rewritten Query: {rewritten_query}") # If not autism-related, show clarification with rewritten question if not is_autism_related: redirect_message = "Hello I'm Wisal, an AI assistant developed by Compumacy AI, and a knowledgeable Autism specialist.\nIf you have any question related to autism please submit a question specifically about autism." clarification = f"""Your query was not clearly related to autism. Do you mean:"{rewritten_query}"?""" pending_clarifications[session_id] = rewritten_query process_log.append(f"Clarification Prompted: {clarification}") _save_process_log(process_log) return clarification return process_autism_pipeline(query,corrected_query, process_log, intro) def process_autism_pipeline(query,corrected_query, process_log, intro): web_search_resp = asyncio.run(search_autism(corrected_query)) web_answer = web_search_resp.get("answer", "") process_log.append(f"Web Search: {web_answer}") gen_prompt = Prompt_template_LLM_Generation.format(new_query=corrected_query) generated = call_llm( model="Qwen/Qwen3-32B", messages=[{"role": "user", "content": gen_prompt}], reasoning_effort="none" ) process_log.append(f"LLM Generated: {generated}") rag_resp = asyncio.run(rag_autism(corrected_query, top_k=3)) rag_contexts = rag_resp.get("answer", []) process_log.append(f"RAG Contexts: {rag_contexts}") answers_list = f"[1] {generated}\n[2] {web_answer}\n" + "\n".join(f"[{i+3}] {c}" for i, c in enumerate(rag_contexts)) rerank_prompt = Prompt_template_Reranker.format(new_query=corrected_query, answers_list=answers_list) reranked = call_llm( model="Qwen/Qwen3-32B", messages=[{"role": "user", "content": rerank_prompt}], reasoning_effort="none" ) process_log.append(f"Reranked: {reranked}") wisal_prompt = Prompt_template_Wisal.format(new_query=corrected_query, document=reranked) wisal = call_llm( model="Qwen/Qwen3-32B", messages=[{"role": "user", "content": wisal_prompt}], reasoning_effort="none" ) process_log.append(f"Wisal Answer: {wisal}") halluc_prompt = Prompt_template_Halluciations.format( new_query=corrected_query, answer=wisal, document=generated ) halluc = call_llm( model="Qwen/Qwen3-32B", messages=[{"role": "user", "content": halluc_prompt}], reasoning_effort="none" ) process_log.append(f"Hallucination Score: {halluc}") score = int(halluc.split("Score: ")[-1]) if "Score: " in halluc else 3 if score in (2, 3): paraphrased = call_llm( model="Qwen/Qwen3-32B", messages=[{"role": "user", "content": Prompt_template_paraphrasing.format(document=generated)}], reasoning_effort="none" ) wisal = call_llm( model="Qwen/Qwen3-32B", messages=[{"role": "user", "content": Prompt_template_Wisal.format(new_query=corrected_query, document=paraphrased)}], reasoning_effort="none" ) process_log.append(f"Paraphrased Wisal: {wisal}") try: detected_lang = detect(query) except: detected_lang = "en" is_english_text = bool(re.fullmatch(r"[A-Za-z0-9 .,?;:'\"!()\-]+", query)) # Decide whether to translate needs_translation = detected_lang != "en" or not is_english_text if needs_translation: result = call_llm( model="Qwen/Qwen3-32B", messages=[{ "role": "user", "content": Prompt_template_Translate_to_original.format(query=query, document=wisal) }], reasoning_effort="none" ) process_log.append(f"Translated Back: {result}") else: result = wisal process_log.append(f"Final Result: {result}") rtl_languages = ["ar", "fa", "ur", "he"] # Arabic, Persian, Urdu, Hebrew text_dir = "rtl" if detected_lang in rtl_languages else "ltr" # Wrap result in direction-aware HTML wrapped_result = f'
{result}
' _save_process_log(process_log) return intro + wrapped_result def _save_process_log(log_lines, filename="process_output.txt"): import datetime logs_dir = os.path.join(os.path.dirname(__file__), "logs") os.makedirs(logs_dir, exist_ok=True) timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") log_filename = os.path.join(logs_dir, f"log_{timestamp}.txt") with open(log_filename, "w", encoding="utf-8") as f: for line in log_lines: f.write(str(line) + "\n\n") def _save_process_log(log_lines, filename="process_output.txt"): import datetime import os # Ensure logs directory exists logs_dir = os.path.join(os.path.dirname(__file__), "logs") os.makedirs(logs_dir, exist_ok=True) # Unique filename per question (timestamped) timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") log_filename = os.path.join(logs_dir, f"log_{timestamp}.txt") try: with open(log_filename, "w", encoding="utf-8") as f: for line in log_lines: f.write(str(line) + "\n\n") except Exception as e: pass # Gradio UI for main pipeline, RAG_Domain_know_doc, and User_Specific_Documents , Old_Document def main_pipeline_interface(query): return process_query(query, first_turn=True) def main_pipeline_with_doc_and_history(query, doc_file, doc_type, history): response = main_pipeline_with_doc(query, doc_file, doc_type) updated_history = history + f"\nUser: {query}\nWisal: {response}\n" return response, updated_history def main_pipeline_with_doc(query, doc_file, doc_type): # If no document, use main pipeline if doc_file is None or doc_type == "None": return process_query(query, first_turn=True) safe_filename = os.path.basename(getattr(doc_file, 'name', str(doc_file))) upload_dir = os.path.join(os.path.dirname(__file__), "uploaded_docs") os.makedirs(upload_dir, exist_ok=True) save_path = os.path.join(upload_dir, safe_filename) # 💡 Check if doc_file is file-like (has `.read()`) or path-like (str or NamedString) if hasattr(doc_file, 'read'): # File-like object file_bytes = doc_file.read() else: # It's a path (NamedString), read from file path with open(str(doc_file), 'rb') as f: file_bytes = f.read() # Save the file content with open(save_path, "wb") as f: f.write(file_bytes) # Route to correct document handler if doc_type == "Knowledge Document": status = RAG_Domain_know_doc.ingest_file(save_path) answer = RAG_Domain_know_doc.answer_question(query) return f"[Knowledge Document Uploaded]\n{status}\n\n{answer}" elif doc_type == "User-Specific Document": status = User_Specific_Documents.ingest_file(save_path) answer = User_Specific_Documents.answer_question(query) return f"[User-Specific Document Uploaded]\n{status}\n\n{answer}" elif doc_type == "Old Document": status = Old_Document.ingest_file(save_path) answer = Old_Document.answer_question(query) return f"[Old Document Uploaded]\n{status}\n\n{answer}" elif doc_type == "New Documrnt": status = User_Specific_Documents.ingest_file(save_path) answer = User_Specific_Documents.answer_question(query) return f"[New Documrnt]\n{status}\n\n{answer}" else: return "Invalid document type." def pipeline_with_history(message, doc_file, doc_type, history): if not message.strip(): return history, "" response = main_pipeline_with_doc(message, doc_file, doc_type) history = history + [[message, response]] return history, "" import gradio as gr import google.generativeai as genai import os import time # Function to transcribe audio def transcribe_audio(audio_filepath): api_key = "AIzaSyC68cQzvDYEnas6u-5ABgbOSeJLmIKKpP8" if audio_filepath is None: return "No audio provided. Please record or upload an audio file first." if not api_key: return "API Key is missing. Please provide your Google AI API key." try: genai.configure(api_key=api_key) model = genai.GenerativeModel(model_name="models/gemini-2.0-flash") # Get the model you want to use print(f"Transcribing audio file: {audio_filepath}") yield "Uploading audio file..." # Upload the audio file audio_file = genai.upload_file(path=audio_filepath) # Check the processing status of the uploaded file while audio_file.state.name == "PROCESSING": time.sleep(2) # Wait for 2 seconds before checking again audio_file = genai.get_file(audio_file.name) if audio_file.state.name == "FAILED": return "[ERROR] Audio file processing failed." yield "Audio uploaded. Transcribing..." # Request transcription from the model response = model.generate_content( ["Please transcribe this audio recording.", audio_file], request_options={"timeout": 120} # Set a timeout for the request ) query = response.text if response and response.text else "Transcription failed. The response was empty." yield query except Exception as e: print(f"An error occurred during transcription: {e}") yield f"[ERROR] An unexpected error occurred: {e}" def unified_handler(user_text, audio_file, chat_history): chat_history = chat_history or [] msg_from_user = None if user_text and user_text.strip(): msg_from_user = user_text elif audio_file: transcription = None gen = transcribe_audio(audio_file) try: while True: out = next(gen) # Optional: Show progress in chat, if you want if not out.startswith("[ERROR]"): last_out = out except StopIteration as e: # If generator returns a value, it's in e.value transcription = e.value if e.value else last_out if transcription: msg_from_user = transcription if msg_from_user: chat_history.append(("User", msg_from_user)) wisal_reply = process_query(msg_from_user) chat_history.append(("Wisal", wisal_reply)) return chat_history, "", None return chat_history, "", None import gradio as gr import asyncio # Your process_query, transcribe_audio, and text_to_speech_ui functions should exist. def wisal_handler(user_text, audio_file, chat_history): # If user typed a message if user_text and user_text.strip(): chat_history = chat_history or [] response = process_query(user_text) chat_history.append(("User", user_text)) chat_history.append(("Wisal", response)) return chat_history, "", None # Clear input box # If user provided audio if audio_file: transcription = None gen = transcribe_audio(audio_file) for out in gen: if isinstance(out, str) and out.startswith("Uploading"): continue if isinstance(out, str) and not out.startswith("[ERROR]"): transcription = out if isinstance(out, str) and out.startswith("[ERROR]"): chat_history.append(("System", out)) return chat_history, "", None if transcription: chat_history.append(("User", transcription)) # Show transcription! wisal_reply = process_query(transcription) chat_history.append(("Wisal", wisal_reply)) return chat_history, "", None return chat_history, "", None # Nothing sent # Make sure to escape backslashes in the file path (use raw strings or forward slashes) image_path = r"C:\Users\Fouda\OneDrive\Desktop\Aya\Compumacy-Logo-Trans2.png" # Using a raw string with gr.Blocks(title="Wisal Chatbot", theme='Yntec/HaleyCH_Theme_craiyon_alt') as demo: chat_history = gr.State([]) # Add Image (local path) with gr.Row(): gr.Image(value=image_path, show_label=False, container=False, height=100) gr.Markdown("# 🤖 Wisal: Autism AI Assistant") gr.CheckboxGroup(["Doctor", "Patient"], label="Checkbox Group") chatbot = gr.Chatbot(label="Wisal Chat", height=500) with gr.Row(): user_input = gr.Textbox(placeholder="Type your question here...", label="", lines=1) audio_input = gr.Audio( sources=["microphone", "upload"], type="filepath", label="Record or Upload Audio" ) send_btn = gr.Button("Send", variant="primary") send_btn.click( fn=wisal_handler, inputs=[user_input, audio_input, chat_history], outputs=[chatbot, user_input, audio_input], ) with gr.Row(): audio_output = gr.Audio(label="TTS Audio Output", interactive=True) send_btn.click( fn=wisal_handler, inputs=[user_input, audio_input, chat_history], outputs=[chatbot, user_input, audio_output], api_name="wisal_handler" ) with gr.Row() as row2: with gr.Column(): webrtc2 = WebRTC( label="Live Chat", modality="audio", mode="send-receive", elem_id="audio-source", rtc_configuration=get_cloudflare_turn_credentials_async, icon="https://www.gstatic.com/lamda/images/gemini_favicon_f069958c85030456e93de685481c559f160ea06b.png", pulse_color="rgb(255, 255, 255)", icon_button_color="rgb(255, 255, 255)", ) webrtc2.stream( GeminiHandler(), inputs=[webrtc2], outputs=[webrtc2], time_limit=180 if get_space() else None, concurrency_limit=2 if get_space() else None, ) doc_file = gr.File(label="📎 Upload Document (PDF, DOCX, TXT)", file_types=[".pdf", ".docx", ".txt"]) doc_type = gr.Radio( ["None", "Knowledge Document", "User-Specific Document"], value="None", label="Document Type" ) user_doc_option = gr.Radio( ["New Document", "Old Document"], label="Select User Document Type", visible=False ) def toggle_user_doc_visibility(selected_type): return gr.update(visible=(selected_type == "User-Specific Document")) doc_type.change( toggle_user_doc_visibility, inputs=doc_type, outputs=user_doc_option ) send_btn.click( fn=pipeline_with_history, inputs=[user_input, doc_file, doc_type, chatbot], outputs=[chatbot, user_input] ) clear_btn = gr.Button("Clear Chat", elem_id="clear-button") clear_btn.click(lambda: [], outputs=[chatbot]) # Add custom theme CSS to the app theme_css = """ /* Logo Row */ #logo-row { display: flex; justify-content: center; align-items: center; padding: 1rem; background-color: #222222; /* Dark gray background for the logo row */ } #logo-row img { max-width: 300px; object-fit: contain; } /* Send Button */ #send-button { background-color: #f44336; en color for the Send button */ color: white; font-size: 16px; padding: 10px 24px; border: none; border-radius: 5px; cursor: pointer; } #send-button:hover { background-color: #e53935; } /* Clear Button */ #clear-button { background-color: #f44336; /* Red color for the Clear button */ color: white; font-size: 16px; padding: 10px 24px; border: none; border-radius: 5px; cursor: pointer; } #clear-button:hover { background-color: #e53935; /* Darker red on hover */ } /* Main Container Background */ .gradio-container { background-color: #2C2C2C; /* Dark background color */ padding: 20px; color: white; } /* Saved State Item */ .saved-state-item { padding: 10px; margin: 5px 0; border-radius: 5px; background-color: #333333; /* Dark gray background for saved state items */ color: #ffffff; /* White text color */ cursor: pointer; transition: background-color 0.2s; border: 1px solid #444444; } .saved-state-item:hover { background-color: #444444; /* Slightly lighter gray on hover */ } /* Delete Button */ .delete-button { color: #ff6b6b; /* Red color for delete button */ margin-left: 10px; float: right; font-weight: bold; } /* Filesystem Sessions Container */ .filesystem-sessions-container { max-height: 400px; overflow-y: auto; padding: 5px; border: 1px solid #444; border-radius: 5px; background-color: #222222; /* Dark background for the session container */ } /* Highlight effect when clicking */ .saved-state-item:active { background-color: #555555; /* Darker gray when clicking */ } """ demo.css = theme_css if __name__ == "__main__": demo.launch(debug=True)