from Live_audio import GeminiHandler
import os
import re
from langdetect import detect
import asyncio
import gradio as gr
import google.generativeai as genai
import os
import time
import gradio as gr
from datetime import datetime
import langdetect
import RAG_Domain_know_doc
from web_search import search_autism
from RAG import rag_autism
from openai import OpenAI
from dotenv import load_dotenv
import Old_Document
import User_Specific_Documents
import asyncio
import base64
import time
from io import BytesIO
from dotenv import load_dotenv
load_dotenv()
from google.genai import types
from google.genai.types import (
LiveConnectConfig,
SpeechConfig,
VoiceConfig,
PrebuiltVoiceConfig,
Content,
Part,
)
import gradio as gr
import numpy as np
import websockets
from dotenv import load_dotenv
from fastrtc import (
AsyncAudioVideoStreamHandler,
Stream,
WebRTC,
get_cloudflare_turn_credentials_async,
wait_for_item,
)
from google import genai
from gradio.utils import get_space
from PIL import Image
# ------------------------------------------
import asyncio
import base64
import json
import os
import pathlib
import gradio as gr
import google.generativeai as genai
import os
import time
from typing import AsyncGenerator, Literal
import gradio as gr
import numpy as np
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastrtc import (
AsyncStreamHandler,
Stream,
get_cloudflare_turn_credentials_async,
wait_for_item,
)
from google import genai
from google.genai.types import (
LiveConnectConfig,
PrebuiltVoiceConfig,
SpeechConfig,
VoiceConfig,
)
from gradio.utils import get_space
from pydantic import BaseModel
# ------------------------------------------------
import os
import gradio as gr
import google.generativeai as genai
import os
import time
import io
import asyncio
from pydub import AudioSegment
# SECURITY NOTE(review): API credential committed to source. The same name is
# assigned again (with the same value) in the configuration section that
# follows the prompt_template / query_utils imports.
DEEPINFRA_API_KEY = "285LUJulGIprqT6hcPhiXtcrphU04FG4"
# Gemini: google-genai
from google import genai
# ---------------------------------------------------
# VAD imports from reference code
import collections
import webrtcvad
import fastrtc
import time
# helper functions
from prompt_template import (
Prompt_template_translation,
Prompt_template_LLM_Generation,
Prompt_template_Reranker,
Prompt_template_Wisal,
Prompt_template_Halluciations,
Prompt_template_paraphrasing,
Prompt_template_Translate_to_original,
Prompt_template_relevance,
Prompt_template_User_document_prompt
)
from query_utils import process_query_for_rewrite, get_non_autism_response
# --- API keys and service endpoints ------------------------------------------
# SECURITY NOTE(review): the literal fallbacks below are credentials that were
# committed to source. Environment variables (populated by load_dotenv() at
# import time) now take precedence; the literals are kept only so existing
# deployments keep working. They should be rotated and then removed.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") or "AIzaSyCUCivstFpC9pq_jMHMYdlPrmh9Bx97dFo"
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY") or "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"
WEAVIATE_URL = os.getenv("WEAVIATE_URL") or "yorcqe2sqswhcaivxvt9a.c0.us-west3.gcp.weaviate.cloud"
WEAVIATE_API_KEY = (
    os.getenv("WEAVIATE_API_KEY")
    or "d2d0VGdZQTBmdTFlOWdDZl9tT2h3WDVWd1NpT1dQWHdGK0xjR1hYeWxicUxHVnFRazRUSjY2VlRUVlkwPV92MjAw"
)
DEEPINFRA_API_KEY = os.getenv("DEEPINFRA_API_KEY") or "285LUJulGIprqT6hcPhiXtcrphU04FG4"
DEEPINFRA_BASE_URL = "https://api.deepinfra.com/v1/openai"

# Deployment environment name; defaults to "production" when unset.
env = os.getenv("ENVIRONMENT", "production")

# OpenAI-compatible client pointed at the DeepInfra endpoint; used by call_llm.
openai = OpenAI(
    api_key=DEEPINFRA_API_KEY,
    base_url=DEEPINFRA_BASE_URL,
)

SESSION_ID = "default"

# Maps session_id -> rewritten query awaiting a "yes" confirmation from the
# user (written and consumed by process_query).
pending_clarifications = {}
def call_llm(model: str, messages: list[dict], temperature: float = 0.0, **kwargs) -> str:
    """Send a chat-completion request and return the first choice's text.

    Args:
        model: Model identifier passed to the completion endpoint.
        messages: Chat messages in ``{"role": ..., "content": ...}`` form.
        temperature: Sampling temperature (default 0.0).
        **kwargs: Forwarded unchanged to ``chat.completions.create``
            (callers in this module pass ``reasoning_effort``).

    Returns:
        The stripped message content of the first choice, or an empty string
        when the API returns no content for that choice.
    """
    resp = openai.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature,
        **kwargs
    )
    content = resp.choices[0].message.content
    # The content field may be None (e.g. refusals or tool-call-only replies);
    # calling .strip() on it would raise AttributeError.
    return content.strip() if content else ""
def is_greeting(text: str) -> bool:
    """Report whether *text* contains a greeting word or phrase.

    The match is case-insensitive and word-bounded, and it succeeds anywhere
    in the string (not only at the start).
    """
    greeting_pattern = r"\b(hi|hello|hey|good (morning|afternoon|evening))\b"
    found = re.search(greeting_pattern, text, flags=re.I)
    return found is not None
def process_query(query: str, first_turn: bool = False, session_id: str = "default"):
    """Route a user message to the right response path.

    Order of checks:
      1. A pending clarification for *session_id*: "yes" runs the stored
         rewritten query through the pipeline, anything else gets the
         redirect message.
      2. Empty input on the first turn returns the introduction.
      3. A greeting returns a fixed greeting.
      4. Otherwise the query is corrected / relevance-checked via
         ``process_query_for_rewrite``; unrelated queries trigger a
         clarification prompt, related ones run the full pipeline.

    Args:
        query: Raw user text.
        first_turn: When true, empty input yields the introduction message.
        session_id: Key into the module-level ``pending_clarifications`` dict.

    Returns:
        The reply string to show to the user.
    """
    redirect_message = (
        "Hello I'm Wisal, an AI assistant developed by Compumacy AI, and a knowledgeable Autism specialist.\n"
        "If you have any question related to autism please submit a question specifically about autism."
    )
    intro = ""
    process_log = []

    # Check if user is responding to a clarification prompt
    if session_id in pending_clarifications:
        suggested_query = pending_clarifications.pop(session_id)
        if (query or "").strip().lower() == "yes":
            process_log.append(f"User confirmed: {suggested_query}")
            # The pipeline takes (query, corrected_query, ...). The user's
            # literal text here is just "yes", so the confirmed rewrite is
            # used for both roles (previously one argument was missing,
            # which raised TypeError).
            return process_autism_pipeline(suggested_query, suggested_query, process_log, intro)
        process_log.append("User rejected clarification.")
        _save_process_log(process_log)
        return redirect_message

    if first_turn and (not query or query.strip() == ""):
        intro = "Hello! I'm Wisal, an AI assistant developed by Compumacy AI, specializing in Autism Spectrum Disorders. How can I help you today?"
        process_log.append(intro)
        _save_process_log(process_log)
        return intro

    if is_greeting(query):
        greeting = intro + "Hello! I'm Wisal, your AI assistant developed by Compumacy AI. How can I help you today?"
        process_log.append(f"Greeting detected.\n{greeting}")
        _save_process_log(process_log)
        return greeting

    # process_query_for_rewrite returns a 3-tuple
    corrected_query, is_autism_related, rewritten_query = process_query_for_rewrite(query)
    process_log.append(f"Original Query: {query}")
    process_log.append(f"Corrected Query: {corrected_query}")
    process_log.append(f"Relevance Check: {'RELATED' if is_autism_related else 'NOT RELATED'}")
    if rewritten_query:
        process_log.append(f"Rewritten Query: {rewritten_query}")

    if not is_autism_related:
        if not rewritten_query:
            # Nothing sensible to suggest: redirect instead of asking the
            # user to confirm an empty / "None" question.
            process_log.append("No rewritten query available; redirecting user.")
            _save_process_log(process_log)
            return redirect_message
        clarification = f"""Your query was not clearly related to autism. Do you mean:"{rewritten_query}"?"""
        pending_clarifications[session_id] = rewritten_query
        process_log.append(f"Clarification Prompted: {clarification}")
        _save_process_log(process_log)
        return clarification

    return process_autism_pipeline(query, corrected_query, process_log, intro)
def _parse_hallucination_score(verdict, default=3):
    """Return the last ``Score: <int>`` found in *verdict*, or *default*."""
    found = re.findall(r"Score:\s*(-?\d+)", verdict or "")
    return int(found[-1]) if found else default


def process_autism_pipeline(query, corrected_query, process_log, intro):
    """Run the full answer pipeline for an autism-related question.

    Steps: web search, direct LLM generation, RAG retrieval, reranking of all
    candidates, final "Wisal" answer, hallucination scoring (with a
    paraphrase-and-retry when the score is 2 or 3), optional translation back
    to the language of *query*, and HTML wrapping with a text direction.

    Args:
        query: The user's original text; used for language detection and as
            the translation target reference.
        corrected_query: The cleaned-up query fed to search, RAG and prompts.
        process_log: List of log lines; appended to in place and persisted
            via ``_save_process_log`` before returning.
        intro: Text prepended to the final answer.

    Returns:
        ``intro`` followed by the answer wrapped in a direction-aware div.
    """
    model_name = "Qwen/Qwen3-32B"

    def ask(prompt):
        # Single-turn request with the settings every stage shares.
        return call_llm(
            model=model_name,
            messages=[{"role": "user", "content": prompt}],
            reasoning_effort="none"
        )

    web_search_resp = asyncio.run(search_autism(corrected_query))
    web_answer = web_search_resp.get("answer", "")
    process_log.append(f"Web Search: {web_answer}")

    generated = ask(Prompt_template_LLM_Generation.format(new_query=corrected_query))
    process_log.append(f"LLM Generated: {generated}")

    rag_resp = asyncio.run(rag_autism(corrected_query, top_k=3))
    rag_contexts = rag_resp.get("answer", [])
    process_log.append(f"RAG Contexts: {rag_contexts}")
    if isinstance(rag_contexts, str):
        # Defensive: a single string would otherwise be enumerated character
        # by character below. Presumably the payload is a list — TODO confirm.
        rag_contexts = [rag_contexts]

    # Candidates are numbered: [1] LLM draft, [2] web answer, [3..] RAG hits.
    answers_list = f"[1] {generated}\n[2] {web_answer}\n" + "\n".join(
        f"[{i+3}] {c}" for i, c in enumerate(rag_contexts)
    )
    reranked = ask(Prompt_template_Reranker.format(new_query=corrected_query, answers_list=answers_list))
    process_log.append(f"Reranked: {reranked}")

    wisal = ask(Prompt_template_Wisal.format(new_query=corrected_query, document=reranked))
    process_log.append(f"Wisal Answer: {wisal}")

    halluc = ask(
        Prompt_template_Halluciations.format(
            new_query=corrected_query,
            answer=wisal,
            document=generated
        )
    )
    process_log.append(f"Hallucination Score: {halluc}")

    # Tolerate trailing text after the number instead of raising ValueError;
    # a reply without any score falls back to 3 as before.
    score = _parse_hallucination_score(halluc, default=3)
    if score in (2, 3):
        paraphrased = ask(Prompt_template_paraphrasing.format(document=generated))
        wisal = ask(Prompt_template_Wisal.format(new_query=corrected_query, document=paraphrased))
        process_log.append(f"Paraphrased Wisal: {wisal}")

    try:
        detected_lang = detect(query)
    except Exception:
        # Language detection is best-effort; fall back to English.
        detected_lang = "en"
    is_english_text = bool(re.fullmatch(r"[A-Za-z0-9 .,?;:'\"!()\-]+", query))

    # Decide whether to translate
    needs_translation = detected_lang != "en" or not is_english_text
    if needs_translation:
        result = ask(Prompt_template_Translate_to_original.format(query=query, document=wisal))
        process_log.append(f"Translated Back: {result}")
    else:
        result = wisal
    process_log.append(f"Final Result: {result}")

    rtl_languages = ["ar", "fa", "ur", "he"]  # Arabic, Persian, Urdu, Hebrew
    text_dir = "rtl" if detected_lang in rtl_languages else "ltr"

    # Wrap result in direction-aware HTML.
    # NOTE(review): the original markup was lost from this line; a plain
    # <div dir=...> wrapper is assumed — confirm against the deployed version.
    wrapped_result = f'<div dir="{text_dir}">{result}</div>'

    _save_process_log(process_log)
    return intro + wrapped_result
def _save_process_log(log_lines, filename="process_output.txt"):
    """Write *log_lines* to a fresh timestamped file under ``logs/``.

    The ``logs`` directory is created next to this module if needed. Each
    entry is written as ``str(line)`` followed by a blank line. Writing is
    best-effort: failures are reported through ``logging`` and never raised,
    so a logging problem cannot break the reply to the user.

    Args:
        log_lines: Iterable of values to record.
        filename: Unused; kept so existing callers' signatures keep working.
            The file name is always ``log_<timestamp>.txt``.
    """
    import datetime
    import logging
    import os

    logs_dir = os.path.join(os.path.dirname(__file__), "logs")
    # Microsecond-resolution timestamp gives one file per processed question.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    log_filename = os.path.join(logs_dir, f"log_{timestamp}.txt")
    try:
        os.makedirs(logs_dir, exist_ok=True)
        with open(log_filename, "w", encoding="utf-8") as f:
            f.writelines(str(line) + "\n\n" for line in log_lines)
    except Exception as exc:  # best-effort by design, but no longer silent
        logging.getLogger(__name__).warning(
            "Could not write process log %s: %s", log_filename, exc
        )
# Gradio UI for main pipeline, RAG_Domain_know_doc, and User_Specific_Documents , Old_Document
def main_pipeline_interface(query):
    """Answer *query* through process_query, always treating it as a first turn."""
    answer = process_query(query, first_turn=True)
    return answer
def main_pipeline_with_doc_and_history(query, doc_file, doc_type, history):
    """Answer *query* (optionally against a document) and extend a text history.

    Returns a ``(response, updated_history)`` pair where the history is the
    incoming string with a "User: ... / Wisal: ..." exchange appended.
    """
    reply = main_pipeline_with_doc(query, doc_file, doc_type)
    exchange = f"\nUser: {query}\nWisal: {reply}\n"
    return reply, history + exchange
def main_pipeline_with_doc(query, doc_file, doc_type):
    """Answer *query*, ingesting an uploaded document first when one is given.

    Args:
        query: The user's question.
        doc_file: Uploaded file: either a file-like object with ``read()`` or
            a path-like value; ``None`` means no document.
        doc_type: One of "None", "Knowledge Document",
            "User-Specific Document", "Old Document" or "New Document"
            (the historical misspelling "New Documrnt" is still accepted).

    Returns:
        The pipeline answer when no document is supplied; otherwise a string
        with an upload banner, the ingestion status and the answer; or
        "Invalid document type." for an unknown *doc_type*.
    """
    # If no document, use main pipeline
    if doc_file is None or doc_type == "None":
        return process_query(query, first_turn=True)

    # Resolve the handler before touching the filesystem so an invalid type
    # does not leave a stray upload behind.
    if doc_type == "Knowledge Document":
        handler, banner = RAG_Domain_know_doc, "[Knowledge Document Uploaded]"
    elif doc_type == "User-Specific Document":
        handler, banner = User_Specific_Documents, "[User-Specific Document Uploaded]"
    elif doc_type == "Old Document":
        handler, banner = Old_Document, "[Old Document Uploaded]"
    elif doc_type in ("New Document", "New Documrnt"):
        # The UI offers "New Document"; the misspelled key previously made
        # this branch unreachable for that value.
        handler, banner = User_Specific_Documents, "[New Document Uploaded]"
    else:
        return "Invalid document type."

    # basename() drops any directory components from the client-supplied name.
    safe_filename = os.path.basename(getattr(doc_file, 'name', str(doc_file)))
    upload_dir = os.path.join(os.path.dirname(__file__), "uploaded_docs")
    os.makedirs(upload_dir, exist_ok=True)
    save_path = os.path.join(upload_dir, safe_filename)

    # doc_file is either file-like (has .read()) or path-like (str / NamedString)
    if hasattr(doc_file, 'read'):
        file_bytes = doc_file.read()
    else:
        with open(str(doc_file), 'rb') as src:
            file_bytes = src.read()

    # Save the file content
    with open(save_path, "wb") as dst:
        dst.write(file_bytes)

    status = handler.ingest_file(save_path)
    answer = handler.answer_question(query)
    return f"{banner}\n{status}\n\n{answer}"
def pipeline_with_history(message, doc_file, doc_type, history):
    """Answer *message* and append the exchange to a list-of-pairs history.

    Args:
        message: User text; ``None`` or whitespace-only input is ignored.
        doc_file: Optional uploaded document (see main_pipeline_with_doc).
        doc_type: Document type selector (see main_pipeline_with_doc).
        history: Existing ``[[user, reply], ...]`` list, or ``None``.

    Returns:
        ``(history, "")`` — the (possibly extended) history and an empty
        string used to clear the input box.
    """
    # Normalise so a missing history or message cannot raise.
    history = list(history) if history else []
    if not message or not message.strip():
        return history, ""
    response = main_pipeline_with_doc(message, doc_file, doc_type)
    return history + [[message, response]], ""
import gradio as gr
import google.generativeai as genai
import os
import time
# Function to transcribe audio
def transcribe_audio(audio_filepath):
    """Transcribe an audio file with Gemini, yielding progress along the way.

    This is a generator. It yields status strings ("Uploading audio file...",
    "Audio uploaded. Transcribing...") followed by the transcription text.
    Failure messages are yielded as well; messages for processing/unexpected
    errors start with "[ERROR]". Early-exit messages are both yielded (so
    ``for`` loops see them) and returned (so code reading
    ``StopIteration.value`` keeps working).

    Args:
        audio_filepath: Path of the audio file, or ``None``.
    """
    # SECURITY NOTE(review): hard-coded API key committed to source; it should
    # be rotated and loaded from configuration instead.
    api_key = "AIzaSyC68cQzvDYEnas6u-5ABgbOSeJLmIKKpP8"
    if audio_filepath is None:
        message = "No audio provided. Please record or upload an audio file first."
        yield message
        return message
    if not api_key:
        message = "API Key is missing. Please provide your Google AI API key."
        yield message
        return message
    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel(model_name="models/gemini-2.0-flash")  # Get the model you want to use
        print(f"Transcribing audio file: {audio_filepath}")
        yield "Uploading audio file..."
        # Upload the audio file
        audio_file = genai.upload_file(path=audio_filepath)
        # Poll until the service has finished processing the upload
        while audio_file.state.name == "PROCESSING":
            time.sleep(2)  # Wait for 2 seconds before checking again
            audio_file = genai.get_file(audio_file.name)
        if audio_file.state.name == "FAILED":
            message = "[ERROR] Audio file processing failed."
            yield message
            return message
        yield "Audio uploaded. Transcribing..."
        # Request transcription from the model
        response = model.generate_content(
            ["Please transcribe this audio recording.", audio_file],
            request_options={"timeout": 120}  # Set a timeout for the request
        )
        query = response.text if response and response.text else "Transcription failed. The response was empty."
        yield query
    except Exception as e:
        print(f"An error occurred during transcription: {e}")
        yield f"[ERROR] An unexpected error occurred: {e}"
def unified_handler(user_text, audio_file, chat_history):
    """Handle one chat turn from either typed text or an audio recording.

    Typed text wins when both are present. Audio is transcribed with
    ``transcribe_audio``; an "[ERROR]" message from it is shown as a
    ("System", message) entry instead of being sent to the pipeline.

    Args:
        user_text: Text typed by the user (may be empty or ``None``).
        audio_file: Path of a recorded/uploaded audio file, or ``None``.
        chat_history: List of ``(speaker, text)`` tuples, or ``None``.

    Returns:
        ``(chat_history, "", None)`` — updated history plus values that clear
        the text box and the audio input.
    """
    chat_history = chat_history or []
    msg_from_user = None
    if user_text and user_text.strip():
        msg_from_user = user_text
    elif audio_file:
        # Drain the generator, keeping both yielded values and a value passed
        # through StopIteration (a generator's `return <value>`).
        outputs = []
        gen = transcribe_audio(audio_file)
        while True:
            try:
                outputs.append(next(gen))
            except StopIteration as stop:
                if stop.value:
                    outputs.append(stop.value)
                break
        errors = [o for o in outputs if isinstance(o, str) and o.startswith("[ERROR]")]
        if errors:
            chat_history.append(("System", errors[0]))
            return chat_history, "", None
        # Progress messages come first; the transcription is the last output.
        msg_from_user = outputs[-1] if outputs else None
    if msg_from_user:
        chat_history.append(("User", msg_from_user))
        wisal_reply = process_query(msg_from_user)
        chat_history.append(("Wisal", wisal_reply))
    return chat_history, "", None
import gradio as gr
import asyncio
# wisal_handler relies on process_query and transcribe_audio. NOTE(review): no text_to_speech_ui function is visible in this module — confirm whether it is still needed.
def wisal_handler(user_text, audio_file, chat_history):
    """Gradio click handler: answer typed text, or transcribe and answer audio.

    Args:
        user_text: Text typed by the user (may be empty or ``None``).
        audio_file: Path of a recorded/uploaded audio file, or ``None``.
        chat_history: List of ``(speaker, text)`` tuples, or ``None``.

    Returns:
        ``(chat_history, "", None)`` — the history plus values that clear the
        text box and the audio component.
    """
    # Normalise once so every branch can append / return a list. An existing
    # (even empty) list is kept as-is and mutated in place.
    if chat_history is None:
        chat_history = []

    # If user typed a message
    if user_text and user_text.strip():
        response = process_query(user_text)
        chat_history.append(("User", user_text))
        chat_history.append(("Wisal", response))
        return chat_history, "", None  # Clear input box

    # If user provided audio
    if audio_file:
        # Collect yielded values plus any value returned via StopIteration.
        outputs = []
        gen = transcribe_audio(audio_file)
        while True:
            try:
                outputs.append(next(gen))
            except StopIteration as stop:
                if stop.value:
                    outputs.append(stop.value)
                break

        transcription = None
        for out in outputs:
            if not isinstance(out, str) or out.startswith("Uploading"):
                continue
            if out.startswith("[ERROR]"):
                chat_history.append(("System", out))
                return chat_history, "", None
            # Later outputs overwrite earlier progress text, so the final
            # value is the transcription itself.
            transcription = out

        if transcription:
            chat_history.append(("User", transcription))  # Show transcription!
            wisal_reply = process_query(transcription)
            chat_history.append(("Wisal", wisal_reply))
            return chat_history, "", None

    return chat_history, "", None  # Nothing sent
# Logo shown at the top of the page. The default is a developer-machine path
# (raw string so the backslashes are literal); set WISAL_LOGO_PATH to point at
# the logo on other machines. The image is skipped when the file is missing so
# the app still starts.
image_path = os.getenv(
    "WISAL_LOGO_PATH",
    r"C:\Users\Fouda\OneDrive\Desktop\Aya\Compumacy-Logo-Trans2.png",
)

# NOTE(review): the nesting of components inside the Row/Column containers
# below was reconstructed; confirm it against the intended layout.
with gr.Blocks(title="Wisal Chatbot", theme='Yntec/HaleyCH_Theme_craiyon_alt') as demo:
    chat_history = gr.State([])

    # Add Image (local path)
    with gr.Row():
        if os.path.isfile(image_path):
            gr.Image(value=image_path, show_label=False, container=False, height=100)
    gr.Markdown("# 🤖 Wisal: Autism AI Assistant")
    gr.CheckboxGroup(["Doctor", "Patient"], label="Checkbox Group")
    chatbot = gr.Chatbot(label="Wisal Chat", height=500)

    with gr.Row():
        user_input = gr.Textbox(placeholder="Type your question here...", label="", lines=1)
        audio_input = gr.Audio(
            sources=["microphone", "upload"],
            type="filepath",
            label="Record or Upload Audio"
        )
        send_btn = gr.Button("Send", variant="primary")

    # NOTE(review): send_btn has three click listeners (two wisal_handler
    # registrations here and pipeline_with_history further down). Each click
    # therefore runs the answer pipeline three times and all three write to
    # `chatbot`. Left as-is because the intended single flow is not clear
    # from this file; consider consolidating into one handler.
    send_btn.click(
        fn=wisal_handler,
        inputs=[user_input, audio_input, chat_history],
        outputs=[chatbot, user_input, audio_input],
    )

    with gr.Row():
        audio_output = gr.Audio(label="TTS Audio Output", interactive=True)
    send_btn.click(
        fn=wisal_handler,
        inputs=[user_input, audio_input, chat_history],
        outputs=[chatbot, user_input, audio_output],
        api_name="wisal_handler"
    )

    # Live audio chat over WebRTC, served by GeminiHandler.
    with gr.Row() as row2:
        with gr.Column():
            webrtc2 = WebRTC(
                label="Live Chat",
                modality="audio",
                mode="send-receive",
                elem_id="audio-source",
                rtc_configuration=get_cloudflare_turn_credentials_async,
                icon="https://www.gstatic.com/lamda/images/gemini_favicon_f069958c85030456e93de685481c559f160ea06b.png",
                pulse_color="rgb(255, 255, 255)",
                icon_button_color="rgb(255, 255, 255)",
            )
            webrtc2.stream(
                GeminiHandler(),
                inputs=[webrtc2],
                outputs=[webrtc2],
                # Limits apply only when get_space() is truthy.
                time_limit=180 if get_space() else None,
                concurrency_limit=2 if get_space() else None,
            )

    doc_file = gr.File(label="📎 Upload Document (PDF, DOCX, TXT)", file_types=[".pdf", ".docx", ".txt"])
    doc_type = gr.Radio(
        ["None", "Knowledge Document", "User-Specific Document"],
        value="None",
        label="Document Type"
    )
    # NOTE(review): this selection is only shown/hidden; its value is not
    # passed to any handler in this block.
    user_doc_option = gr.Radio(
        ["New Document", "Old Document"],
        label="Select User Document Type",
        visible=False
    )

    def toggle_user_doc_visibility(selected_type):
        """Show the user-document sub-option only for "User-Specific Document"."""
        return gr.update(visible=(selected_type == "User-Specific Document"))

    doc_type.change(
        toggle_user_doc_visibility,
        inputs=doc_type,
        outputs=user_doc_option
    )

    send_btn.click(
        fn=pipeline_with_history,
        inputs=[user_input, doc_file, doc_type, chatbot],
        outputs=[chatbot, user_input]
    )

    clear_btn = gr.Button("Clear Chat", elem_id="clear-button")
    clear_btn.click(lambda: [], outputs=[chatbot])
# Add custom theme CSS to the app.
# NOTE(review): of the id selectors below, only #clear-button matches an
# elem_id assigned in the UI code above; #logo-row and #send-button appear to
# have no matching component — confirm.
theme_css = """
/* Logo Row */
#logo-row {
display: flex;
justify-content: center;
align-items: center;
padding: 1rem;
background-color: #222222; /* Dark gray background for the logo row */
}
#logo-row img {
max-width: 300px;
object-fit: contain;
}
/* Send Button */
#send-button {
background-color: #f44336; /* Red color for the Send button */
color: white;
font-size: 16px;
padding: 10px 24px;
border: none;
border-radius: 5px;
cursor: pointer;
}
#send-button:hover {
background-color: #e53935;
}
/* Clear Button */
#clear-button {
background-color: #f44336; /* Red color for the Clear button */
color: white;
font-size: 16px;
padding: 10px 24px;
border: none;
border-radius: 5px;
cursor: pointer;
}
#clear-button:hover {
background-color: #e53935; /* Darker red on hover */
}
/* Main Container Background */
.gradio-container {
background-color: #2C2C2C; /* Dark background color */
padding: 20px;
color: white;
}
/* Saved State Item */
.saved-state-item {
padding: 10px;
margin: 5px 0;
border-radius: 5px;
background-color: #333333; /* Dark gray background for saved state items */
color: #ffffff; /* White text color */
cursor: pointer;
transition: background-color 0.2s;
border: 1px solid #444444;
}
.saved-state-item:hover {
background-color: #444444; /* Slightly lighter gray on hover */
}
/* Delete Button */
.delete-button {
color: #ff6b6b; /* Red color for delete button */
margin-left: 10px;
float: right;
font-weight: bold;
}
/* Filesystem Sessions Container */
.filesystem-sessions-container {
max-height: 400px;
overflow-y: auto;
padding: 5px;
border: 1px solid #444;
border-radius: 5px;
background-color: #222222; /* Dark background for the session container */
}
/* Highlight effect when clicking */
.saved-state-item:active {
background-color: #555555; /* Darker gray when clicking */
}
"""
# NOTE(review): the stylesheet is attached after the Blocks object has been
# built; the documented route is the css= argument of gr.Blocks — confirm this
# assignment takes effect on the Gradio version in use.
demo.css = theme_css

if __name__ == "__main__":
    demo.launch(debug=True)