import datetime
import json
import os
import tempfile
import webbrowser

import faiss
import numpy as np
import pandas as pd
import pyttsx3
import speech_recognition as sr
from dotenv import load_dotenv
from flask import Flask, request, jsonify, send_from_directory
from huggingface_hub import login
from langdetect import detect
from pydub import AudioSegment
from sentence_transformers import SentenceTransformer
from transformers import pipeline, AutoTokenizer, AutoModelForQuestionAnswering, AutoModelForSeq2SeqLM
from werkzeug.utils import secure_filename

app = Flask(__name__, static_folder='.')

# Load the Hugging Face token from the environment (a local .env file is also supported).
load_dotenv()
hf_token = os.environ.get("API_KEY")
if not hf_token:
    raise ValueError("Hugging Face API key not found. Please set 'API_KEY' as an environment variable or in a .env file.")

login(token=hf_token)
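# Note: an explicit login() is only strictly required for gated or private
# checkpoints; the models loaded below are public and would download without it.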

# Extractive QA model: answers are spans copied out of the retrieved context.
qa_model = AutoModelForQuestionAnswering.from_pretrained("deepset/roberta-base-squad2")
qa_tokenizer = AutoTokenizer.from_pretrained("deepset/roberta-base-squad2")
qa_pipeline = pipeline("question-answering", model=qa_model, tokenizer=qa_tokenizer)

# Abstractive summarizer, used both to shorten long QA answers and for /summarize.
summarizer_model = AutoModelForSeq2SeqLM.from_pretrained("facebook/bart-large-cnn")
summarizer_tokenizer = AutoTokenizer.from_pretrained("facebook/bart-large-cnn")
summarizer_pipeline = pipeline("summarization", model=summarizer_model, tokenizer=summarizer_tokenizer)

# Sentence embedding model for dense retrieval.
embed_model = SentenceTransformer("sentence-transformers/paraphrase-MiniLM-L6-v2")
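# paraphrase-MiniLM-L6-v2 produces 384-dimensional vectors; the FAISS index
# below takes its dimensionality from embeddings.shape[1], so the embedding
# model can be swapped without touching the index construction.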

# Load the two knowledge sources: a Parquet dataset and pre-extracted PDF text.
try:
    df_parquet = pd.read_parquet("ibtehaj dataset.parquet")
    corpus_parquet = df_parquet["text"].dropna().tolist()
except FileNotFoundError:
    raise FileNotFoundError("ibtehaj dataset.parquet not found. Make sure it's in the same directory as app.py")

try:
    with open("pdf_data.json", "r", encoding="utf-8") as f:
        json_data = json.load(f)
except FileNotFoundError:
    raise FileNotFoundError("pdf_data.json not found. Make sure it's in the same directory as app.py")
except json.JSONDecodeError as e:
    raise ValueError(f"Error decoding pdf_data.json: {e}")

# Keep only non-empty "text" fields from the JSON entries.
corpus_json = []
for entry in json_data:
    if isinstance(entry, dict) and "text" in entry:
        text = entry["text"].strip()
        if text:
            corpus_json.append(text)

corpus = corpus_parquet + corpus_json
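# The combined corpus is embedded once at startup; changes to either data file
# only take effect after restarting the app.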

# Embed the whole corpus and build an exact L2 nearest-neighbour index.
embeddings = embed_model.encode(corpus, show_progress_bar=True, batch_size=16)

index = faiss.IndexFlatL2(embeddings.shape[1])
index.add(np.array(embeddings))
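# Quick retrieval sanity check (illustrative only; the query is a placeholder):
#   D, I = index.search(embed_model.encode(["sample query"]), 3)
#   print([corpus[i][:80] for i in I[0]])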

def rag_answer(question: str, k: int = 3) -> str:
    """Retrieve the k nearest passages and run extractive QA over them."""
    q_emb = embed_model.encode([question])
    D, I = index.search(q_emb, k)
    context = "\n\n".join(corpus[i] for i in I[0] if 0 <= i < len(corpus))

    if not context.strip():
        return "Context is empty. Try rephrasing the question."

    try:
        result = qa_pipeline(question=question, context=context)
        raw_answer = result.get("answer", "No answer found.")

        # Long extractive answers are compressed with the summarizer.
        if len(raw_answer.split()) > 40 or len(raw_answer) > 300:
            summary = summarizer_pipeline(raw_answer, max_length=50, min_length=15, do_sample=False)
            summarized_answer = summary[0]['summary_text']
        else:
            summarized_answer = raw_answer

        return f"Answer: {summarized_answer}\n\n[Context Used]:\n{context[:500]}..."
    except Exception as e:
        return f"Error: {e}"

tts_engine = None

def init_tts_engine():
    """Initialise a single shared pyttsx3 engine, preferring a female voice."""
    global tts_engine
    if tts_engine is None:
        tts_engine = pyttsx3.init()
        tts_engine.setProperty('rate', 150)
        tts_engine.setProperty('volume', 1.0)
        voices = tts_engine.getProperty('voices')
        for v in voices:
            if "zira" in v.name.lower() or "female" in v.name.lower():
                tts_engine.setProperty('voice', v.id)
                break

init_tts_engine()
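# Caveat: pyttsx3 engines are generally not thread-safe, and Flask's built-in
# server handles requests on threads, so concurrent /read-aloud calls sharing
# this engine may interfere with each other. A per-request engine or a single
# worker queue is a safer pattern.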

conversation_history = []
last_question_text = ""
last_answer_text = ""

@app.route('/')
def serve_index():
    return send_from_directory('.', 'index.html')

@app.route('/<path:path>')
def serve_static_files(path):
    return send_from_directory('.', path)

@app.route('/answer', methods=['POST'])
def generate_answer_endpoint():
    global last_question_text, last_answer_text, conversation_history
    data = request.get_json(silent=True) or {}
    question = data.get('question', '').strip()

    if not question:
        return jsonify({"answer": "Please provide a question."}), 400

    last_question_text = question
    timestamp = datetime.datetime.now().strftime("%H:%M:%S")
    conversation_history.append({"role": "user", "time": timestamp, "text": question})

    ans = rag_answer(question)
    last_answer_text = ans
    conversation_history.append({"role": "bot", "time": timestamp, "text": ans})

    return jsonify({"answer": ans})

@app.route('/read-aloud', methods=['POST'])
def read_aloud_endpoint():
    data = request.get_json(silent=True) or {}
    text_to_read = data.get('text', '').strip()

    if not text_to_read:
        return jsonify({"status": "No text provided to read."}), 400

    try:
        # pyttsx3 typically emits WAV data (driver-dependent), so a .wav
        # suffix is more accurate than .mp3 here.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as fp:
            temp_audio_path = fp.name

        tts_engine.save_to_file(text_to_read, temp_audio_path)
        tts_engine.runAndWait()

        # The audio is synthesised server-side and discarded below; this
        # endpoint only reports status rather than streaming the file back.
        return jsonify({"status": "TTS audio generated (server-side)."})
    except Exception as e:
        return jsonify({"status": f"Error during TTS: {str(e)}"}), 500
    finally:
        if 'temp_audio_path' in locals() and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)

@app.route('/upload-mp3', methods=['POST'])
def upload_mp3_endpoint():
    global last_question_text, last_answer_text, conversation_history

    if 'file' not in request.files:
        return jsonify({"message": "No file part"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"message": "No selected file"}), 400
    if file:
        filename = secure_filename(file.filename)

        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                mp3_path = os.path.join(tmpdir, filename)
                file.save(mp3_path)

                # Convert to WAV, since speech_recognition cannot read MP3 directly.
                wav_path = os.path.join(tmpdir, os.path.splitext(filename)[0] + ".wav")
                try:
                    sound = AudioSegment.from_mp3(mp3_path)
                    sound.export(wav_path, format="wav")
                except Exception as e:
                    return jsonify({"message": f"Error converting MP3 to WAV. Ensure FFmpeg is installed and in your system's PATH. Details: {e}"}), 500

                try:
                    recognizer = sr.Recognizer()
                    with sr.AudioFile(wav_path) as src:
                        audio = recognizer.record(src)
                    text = recognizer.recognize_google(audio)
                except sr.UnknownValueError:
                    return jsonify({"message": "Speech not understood. Please try again."}), 400
                except sr.RequestError as e:
                    return jsonify({"message": f"Could not request results from speech recognition service; {e}"}), 500
                except Exception as e:
                    return jsonify({"message": f"An unexpected error occurred during speech recognition: {e}"}), 500

            return jsonify({
                "message": "MP3 transcribed successfully.",
                "transcription": text
            })
        except Exception as e:
            return jsonify({"message": f"An error occurred during file upload or temporary processing: {e}"}), 500

    return jsonify({"message": "An unknown file processing error occurred."}), 500

@app.route('/summarize', methods=['POST'])
def summarize_endpoint():
    data = request.get_json(silent=True) or {}
    text_to_summarize = data.get('text', '').strip()

    if not text_to_summarize:
        return jsonify({"summary": "No text provided for summarization."}), 400

    def chunk_text(text, max_chunk_size=4000):
        """Greedily pack sentences into chunks under max_chunk_size characters."""
        sentences = text.split(". ")
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk) + len(sentence) + 2 < max_chunk_size:
                current_chunk += sentence + ". "
            else:
                chunks.append(current_chunk.strip())
                current_chunk = sentence + ". "
        if current_chunk:
            chunks.append(current_chunk.strip())
        return chunks

    try:
        # Summarize each chunk, then summarize the concatenated chunk summaries.
        chunks = chunk_text(text_to_summarize)
        summaries = [
            summarizer_pipeline(chunk, max_length=150, min_length=50, do_sample=False)[0]["summary_text"]
            for chunk in chunks
        ]
        final_input = " ".join(summaries)
        final_summary = summarizer_pipeline(final_input, max_length=150, min_length=50, do_sample=False)[0]["summary_text"]
        return jsonify({"summary": final_summary})
    except Exception as e:
        return jsonify({"summary": f"Error during summarization: {e}"}), 500

@app.route('/history', methods=['GET'])
def get_history():
    return jsonify({"history": conversation_history})

if __name__ == '__main__':
    app.run(debug=True)
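# Example requests against the development server (assumed http://127.0.0.1:5000):
#   curl -X POST http://127.0.0.1:5000/answer     -H "Content-Type: application/json" -d '{"question": "..."}'
#   curl -X POST http://127.0.0.1:5000/summarize  -H "Content-Type: application/json" -d '{"text": "..."}'
#   curl -X POST http://127.0.0.1:5000/upload-mp3 -F "file=@recording.mp3"
#   curl http://127.0.0.1:5000/history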