Spaces:
Running
Running
# app.py | |
import gradio as gr | |
import wikipedia | |
import numpy as np | |
import tempfile | |
import os | |
import time | |
from datetime import datetime, timedelta | |
from gtts import gTTS | |
from langdetect import detect | |
from pydub import AudioSegment | |
from pydub.silence import split_on_silence | |
import speech_recognition as sr | |
from sentence_transformers import SentenceTransformer | |
from transformers import pipeline | |
import re | |
import torch | |
# --- USER MANAGEMENT SYSTEM --- | |
class UserManager:
    """In-memory registry of per-user moderation state.

    Each entry of ``user_data`` is a dict that may hold ``warnings`` (int),
    ``flags`` (list of violation descriptions), ``blocked_until`` (datetime)
    and ``permanently_banned`` (bool). Nothing is persisted, so all state is
    lost when the process restarts.
    """

    def __init__(self):
        self.user_data = {}
        # A user is blocked once the warning count exceeds this value.
        self.max_warnings = 1
        self.block_duration = timedelta(days=30)

    def get_user_status(self, user_id):
        """Return ``"active"``, ``"blocked"`` or ``"banned"`` for *user_id*.

        An expired ``blocked_until`` entry is removed as a side effect; the
        warning counter is left untouched.
        """
        try:
            record = self.user_data[user_id]
        except KeyError:
            return "active"

        if record.get('permanently_banned', False):
            return "banned"
        if 'blocked_until' not in record:
            return "active"
        if datetime.now() < record['blocked_until']:
            return "blocked"

        # The block has run out: forget it so the user counts as active again.
        del record['blocked_until']
        return "active"

    def add_warning(self, user_id, violation_type):
        """Record one violation and return ``"warned"`` or ``"blocked"``.

        The violation description is appended to the user's ``flags``. When
        the warning count exceeds ``max_warnings`` the user is blocked for
        ``block_duration`` starting now.
        """
        if user_id in self.user_data:
            record = self.user_data[user_id]
            record['warnings'] += 1
            record['flags'].append(violation_type)
        else:
            record = {'warnings': 1, 'flags': [violation_type]}
            self.user_data[user_id] = record

        if record['warnings'] <= self.max_warnings:
            return "warned"

        record['blocked_until'] = datetime.now() + self.block_duration
        return "blocked"


# Single shared registry used by the request handler.
user_manager = UserManager()
# --- MODEL INITIALIZATION --- | |
def load_models():
    """Instantiate every model the app uses and return them keyed by name.

    Keys: ``translator`` (many-to-English translation), ``answer_gen``
    (text2text generation), ``encoder`` (sentence embeddings),
    ``toxic-bert`` and ``roberta-hate`` (text classifiers), plus one
    ``en_to_<lang>`` translation pipeline for each of fr, ar, zh and es.
    """
    registry = {}
    registry['translator'] = pipeline('translation', model='Helsinki-NLP/opus-mt-mul-en')
    registry['answer_gen'] = pipeline('text2text-generation', model='google/flan-t5-base')
    registry['encoder'] = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    registry['toxic-bert'] = pipeline("text-classification", model="unitary/toxic-bert")
    registry['roberta-hate'] = pipeline(
        "text-classification",
        model="facebook/roberta-hate-speech-dynabench-r4-target",
    )

    # One English -> target pipeline per supported output language.
    for code in ('fr', 'ar', 'zh', 'es'):
        task = f'translation_en_to_{code}'
        checkpoint = f'Helsinki-NLP/opus-mt-en-{code}'
        registry[f'en_to_{code}'] = pipeline(task, model=checkpoint)

    return registry


# Loaded once at import time and shared by every request.
models = load_models()
# --- UNIVERSAL HATE SPEECH DETECTION --- | |
class HateSpeechDetector:
    """Flag abusive text with keyword banks, regex patterns and two classifiers.

    Keywords are matched as whole words (optionally followed by a common
    inflection such as ``s``/``ed``/``ing``) rather than as raw substrings,
    so innocent words like "skill", "spicy" or "pharmacy" no longer trip the
    ``kill``/``spic``/``harm`` entries.
    """

    # Suffixes tolerated after a keyword so plurals and simple verb forms
    # still match.
    _INFLECTIONS = r'(?:s|es|d|ed|ing)?'

    # (key in the module-level ``models`` dict, labels treated as violations)
    _CLASSIFIERS = (
        ('toxic-bert', ('toxic', 'hate')),
        ('roberta-hate', ('hate', 'offensive')),
    )

    # Minimum classifier confidence for a label to count as a violation.
    _SCORE_THRESHOLD = 0.7

    def __init__(self):
        self.keyword_banks = {
            'racial': ['nigger', 'chink', 'spic', 'kike', 'gook', 'wetback'],
            'gender': ['fag', 'dyke', 'tranny', 'whore', 'slut', 'bitch'],
            'violence': ['kill', 'murder', 'harm', 'hurt', 'abuse', 'torture'],
            'general': ['scum', 'vermin', 'subhuman', 'untermensch']
        }
        self.patterns = [
            (r'\b(all|every)\s\w+\s(should|must)\s(die|burn)', 'group violence'),
            (r'\b(how to|ways? to)\s(kill|harm|hurt)', 'harm instructions'),
            (r'[!@#$%^&*]igg[!@#$%^&*]', 'coded racial slur')
        ]

    def _has_keyword(self, text_lower, keyword):
        """Return True when *keyword* occurs in *text_lower* as its own word."""
        pattern = rf'\b{re.escape(keyword)}{self._INFLECTIONS}\b'
        return re.search(pattern, text_lower) is not None

    def detect(self, text):
        """Return a list of violation descriptions for *text*, or ``None``.

        Three checks run in order: keyword banks, regex patterns, then the
        classifiers stored in the module-level ``models`` dict. A classifier
        that raises is logged and skipped, so one failing model does not
        suppress the other or the keyword/pattern results.
        """
        text_lower = text.lower()
        violations = []

        # Keyword detection (at most three hits reported per category).
        for category, keywords in self.keyword_banks.items():
            found = [kw for kw in keywords if self._has_keyword(text_lower, kw)]
            if found:
                violations.append(f"{category} terms: {', '.join(found[:3])}")

        # Pattern detection
        for pattern, desc in self.patterns:
            if re.search(pattern, text_lower):
                violations.append(f"pattern: {desc}")

        # Model detection: each classifier is isolated in its own try block.
        for model_key, flagged_labels in self._CLASSIFIERS:
            try:
                result = models[model_key](text)[0]
                if (result['label'].lower() in flagged_labels
                        and result['score'] > self._SCORE_THRESHOLD):
                    violations.append(
                        f"{model_key}: {result['label']} ({result['score']:.2f})"
                    )
            except Exception as e:
                print(f"Model error: {e}")

        return violations if violations else None


# Single shared detector used by the request handler.
hate_detector = HateSpeechDetector()
# --- RESPONSE GENERATION --- | |
def generate_response(text, topic, lang):
    """Answer *text* from the Wikipedia summary of *topic*, in language *lang*.

    Returns a ``(reply, audio_path)`` tuple. When the Wikipedia lookup fails
    for any reason a fixed English apology is returned with ``None`` as the
    audio path. Errors from generation, translation or speech synthesis are
    not caught here.
    """
    try:
        wikipedia.set_lang('en')
        try:
            context = wikipedia.page(topic, auto_suggest=False).summary[:1000]
        except wikipedia.exceptions.DisambiguationError as ambiguous:
            # Ambiguous title: fall back to the first suggested page.
            context = wikipedia.page(ambiguous.options[0]).summary[:1000]
    except Exception as e:
        print(f"Wikipedia error: {e}")
        return "Could not find information. Please try another topic.", None

    prompt = f"Context: {context}\nQuestion: {text}\nAnswer:"
    generated = models['answer_gen'](prompt, max_length=200)
    answer = generated[0]['generated_text']

    if lang != 'en':
        reply = translate(answer, 'en', lang)
    else:
        reply = answer

    return reply, text_to_speech(reply, lang)
# --- WARNING MESSAGES --- | |
def create_warning_message(violations):
    """Build the red warning banner shown when a message is flagged.

    Args:
        violations: sequence of violation descriptions; only the first two
            are listed in the banner.

    Returns:
        A ``gr.HTML`` component wrapping the banner markup.
    """
    # NOTE(review): the glyphs in front of "WARNING" look like a mis-decoded
    # emoji - confirm against the original file's encoding before changing.
    reasons = ', '.join(violations[:2])
    return gr.HTML(f"""
<div style='
border: 2px solid #ff0000;
border-radius: 5px;
padding: 10px;
background-color: #fff0f0;
margin: 10px 0;
'>
<div style='color: #ff0000; font-weight: bold;'>
β οΈ WARNING: Violation Detected
</div>
<div style='margin-top: 8px;'>
Your message contains prohibited content
</div>
<div style='margin-top: 8px; font-size: 0.9em;'>
<b>Reason:</b> {reasons}
</div>
</div>
""")
def create_blocked_message():
    """Build the banner shown to a user whose account is temporarily blocked.

    Returns:
        A ``gr.HTML`` component wrapping the banner markup.
    """
    # NOTE(review): the glyph in front of "ACCOUNT" looks like a mis-decoded
    # emoji - confirm against the original file's encoding before changing.
    return gr.HTML("""
<div style='
border: 2px solid #990000;
border-radius: 5px;
padding: 10px;
background-color: #ffebee;
'>
β ACCOUNT TEMPORARILY SUSPENDED
</div>
""")
# --- MAIN HANDLER --- | |
def handle_interaction(audio, text, topic, lang, chat_history, request: gr.Request):
    """Process one user turn: moderation first, then answer generation.

    Args:
        audio: path of a recorded/uploaded audio file, or a falsy value.
        text: typed question; used when no audio transcript is available.
        topic: Wikipedia topic used as context for the answer.
        lang: output language code.
        chat_history: list of ``(user, bot)`` pairs shown in the chatbot.
        request: not listed among the event inputs; presumably injected by
            Gradio from the ``gr.Request`` annotation - its client host is
            used as the user id.

    Returns:
        ``(chat_history, "", audio_path_or_None)`` - the updated history, an
        empty string to clear the text box, and the spoken answer if any.
    """
    # Work on a real list even when the chatbot has no value yet.
    chat_history = list(chat_history) if chat_history else []

    client = getattr(request, "client", None) if request else None
    user_id = client.host if client else "default_user"

    # NOTE: list.append returns None, so the history must be appended to
    # first and then returned; returning the append call itself would send
    # None to the chatbot and the message would never be displayed.
    status = user_manager.get_user_status(user_id)
    if status == "banned":
        chat_history.append(("", "β Account permanently banned"))
        return chat_history, "", None
    if status == "blocked":
        chat_history.append(("", create_blocked_message()))
        return chat_history, "", None

    if audio:
        # Prefer the transcript; fall back to typed text if it is empty.
        text = process_audio(audio) or text

    if not text or not text.strip():
        chat_history.append(("", "βοΈ Please enter a question"))
        return chat_history, "", None

    violations = hate_detector.detect(text)
    if violations:
        action = user_manager.add_warning(user_id, violations[0])
        if action == "warned":
            chat_history.append((text, create_warning_message(violations)))
        elif action == "blocked":
            chat_history.append(("", create_blocked_message()))
        return chat_history, "", None

    response, audio_output = generate_response(text, topic, lang)
    chat_history.append((text, response))
    return chat_history, "", audio_output
# --- AUDIO PROCESSING --- | |
def process_audio(audio_path):
    """Transcribe the audio file at *audio_path* and return the text.

    The recording is split on silences and each chunk is sent to
    ``recognize_google`` separately. Chunks that cannot be transcribed are
    skipped, so the result may be an empty string.
    """
    recognizer = sr.Recognizer()
    sound = AudioSegment.from_file(audio_path)
    chunks = split_on_silence(sound, min_silence_len=500, silence_thresh=sound.dBFS - 14)

    transcripts = []
    for chunk in chunks:
        with tempfile.NamedTemporaryFile(suffix='.wav') as f:
            chunk.export(f.name, format="wav")
            with sr.AudioFile(f.name) as source:
                audio = recognizer.record(source)

        # The audio data is held in memory, so recognition can run after the
        # temporary file has been closed and removed.
        try:
            transcripts.append(recognizer.recognize_google(audio))
        except sr.UnknownValueError:
            # Unintelligible chunk: nothing to add.
            continue
        except Exception as e:
            # Best effort: report the failure and move on to the next chunk
            # instead of silently swallowing everything with a bare except.
            print(f"Speech recognition error: {e}")

    return " ".join(transcripts).strip()
def text_to_speech(text, lang):
    """Synthesize *text* in language *lang* and return the path of an MP3 file.

    The file is created with ``delete=False`` and is left on disk for the
    caller. On any failure the error is printed, a temp file that was
    already created is removed, and ``None`` is returned.
    """
    path = None
    try:
        tts = gTTS(text=text, lang=lang)
        # Reserve a unique file name, then close the handle before writing so
        # the save does not compete with our own open descriptor.
        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as f:
            path = f.name
        tts.save(path)
        return path
    except Exception as e:
        print(f"TTS Error: {e}")
        if path is not None:
            # Do not leave an empty or partial file behind.
            try:
                os.remove(path)
            except OSError:
                pass
        return None
def translate(text, src, tgt):
    """Translate *text* from language *src* to *tgt*, pivoting through English.

    Returns *text* unchanged when both codes are equal. When no
    ``en_to_<tgt>`` pipeline is loaded, the English text is returned.
    """
    if src == tgt:
        return text

    if src == 'en':
        english = text
    else:
        english = models['translator'](text)[0]['translation_text']

    target_key = f'en_to_{tgt}'
    if target_key not in models:
        return english
    return models[target_key](english)[0]['translation_text']
# --- INTERACTIVE DESCRIPTION --- | |
# Static HTML header rendered at the top of the page: logo and title, a grid
# of four feature cards, and the list of restricted content.
# NOTE(review): the glyphs in front of the headings look like mis-decoded
# emoji - confirm against the original file's encoding before changing them.
description_html = """
<div style="font-family: 'Arial', sans-serif; max-width: 800px; margin: 0 auto;">
<div style="text-align: center; margin-bottom: 30px;">
<img src="https://i.imgur.com/6wBs5mO.png" style="width: 120px; height: 120px; border-radius: 50%; border: 3px solid #00008b;">
<h1 style="color: #00008b; margin-top: 15px;">π Multilingual AI Assistant</h1>
<p style="color: #555;">Powered by Transformers and Gradio</p>
</div>
<div style="background-color: #e6f2ff; padding: 25px; border-radius: 10px; border: 2px solid #00008b; margin-bottom: 20px;">
<h2 style="color: #00008b; margin-top: 0;">β¨ Features</h2>
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 15px;">
<div style="background: white; padding: 15px; border-radius: 8px;">
<h3 style="margin-top: 0;">π Wikipedia Knowledge</h3>
<p>Answers questions using Wikipedia content</p>
</div>
<div style="background: white; padding: 15px; border-radius: 8px;">
<h3 style="margin-top: 0;">π£οΈ Voice Interaction</h3>
<p>Speak or type your questions</p>
</div>
<div style="background: white; padding: 15px; border-radius: 8px;">
<h3 style="margin-top: 0;">π 5 Languages</h3>
<p>English, French, Spanish, Chinese, Arabic</p>
</div>
<div style="background: white; padding: 15px; border-radius: 8px;">
<h3 style="margin-top: 0;">π‘οΈ Content Moderation</h3>
<p>Automated hate speech detection</p>
</div>
</div>
</div>
<div style="background-color: #fff0f0; padding: 25px; border-radius: 10px; border: 2px solid #ff0000; margin-bottom: 20px;">
<h2 style="color: #ff0000; margin-top: 0;">π« Restricted Content</h2>
<ul>
<li>Hate speech or discrimination</li>
<li>Violent or harmful content</li>
<li>Personal/medical/legal advice</li>
</ul>
</div>
</div>
"""
# --- GRADIO INTERFACE --- | |
# Page layout: static header on top, inputs in the left column, the
# conversation, text box, buttons and answer player in the right column.
with gr.Blocks(title="π Multilingual AI Assistant") as demo:
    gr.HTML(description_html)

    with gr.Row():
        with gr.Column(scale=1):
            audio_input = gr.Audio(
                sources=["microphone", "upload"],
                type="filepath",
                label="π€ Speak or upload audio",
            )
            topic_input = gr.Textbox("Artificial Intelligence", label="π Wikipedia Topic")
            lang_input = gr.Dropdown(
                ["en", "fr", "es", "zh", "ar"],
                value="en",
                label="π Output Language",
            )
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Conversation")
            text_input = gr.Textbox(placeholder="Type your question...", label="βοΈ Or type here")
            with gr.Row():
                clear_btn = gr.Button("ποΈ Clear Chat")
                submit_btn = gr.Button("π Submit", variant="primary")
            audio_output = gr.Audio(label="π Answer", visible=True)

    # Clicking Submit and pressing Enter in the text box run the same handler
    # with the same inputs and outputs.
    for register in (submit_btn.click, text_input.submit):
        register(
            handle_interaction,
            inputs=[audio_input, text_input, topic_input, lang_input, chatbot],
            outputs=[chatbot, text_input, audio_output],
        )

    # Reset the conversation, the text box and the audio player.
    clear_btn.click(lambda: ([], "", None), outputs=[chatbot, text_input, audio_output])

if __name__ == "__main__":
    demo.launch(share=True)