|
|
|
|
|
import sqlite3 |
|
import numpy as np |
|
import unicodedata |
|
import re |
|
import json |
|
import os |
|
import gradio as gr |
|
import spacy |
|
from sentence_transformers import SentenceTransformer |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import random |
|
|
|
|
|
modelo = SentenceTransformer("hiiamsid/sentence_similarity_spanish_es") |
|
|
|
|
|
try: |
|
nlp = spacy.load("es_core_news_sm") |
|
except: |
|
from spacy.cli import download |
|
download("es_core_news_sm") |
|
nlp = spacy.load("es_core_news_sm") |
|
|
|
|
|
|
|
conn = sqlite3.connect("chatbot_fcefn.db", check_same_thread=False) |
|
cursor = conn.cursor() |
|
|
|
|
|
with open("sinonimos.json", "r", encoding="utf-8") as f: |
|
sinonimos = json.load(f) |
|
|
|
with open("stopwords.json", "r", encoding="utf-8") as f: |
|
stopwords = set(json.load(f)) |
|
|
|
with open("preguntas_repaso.json", "r", encoding="utf-8") as f: |
|
modelos_por_categoria = json.load(f) |
|
|
|
|
|
|
|
def normalizar(texto): |
|
texto = texto.lower() |
|
texto = unicodedata.normalize("NFKD", texto).encode("ASCII", "ignore").decode("utf-8") |
|
texto = re.sub(r"[^\w\s]", "", texto) |
|
for original, estandar in sinonimos.items(): |
|
texto = re.sub(r'\b' + re.escape(original) + r'\b', estandar, texto) |
|
texto = re.sub(r"\s+", " ", texto).strip() |
|
return texto |
|
|
|
def extraer_palabras_clave(texto): |
|
doc = nlp(texto) |
|
return [ |
|
token.lemma_.lower() |
|
for token in doc |
|
if token.is_alpha and |
|
token.lemma_.lower() not in stopwords and |
|
token.pos_ in ["NOUN", "PROPN"] and |
|
len(token.lemma_) > 2 |
|
] |
|
|
|
|
|
|
|
def generar_y_guardar_embeddings(): |
|
|
|
cursor.execute("SELECT pregunta_id FROM embeddings") |
|
existentes = set(row[0] for row in cursor.fetchall()) |
|
|
|
|
|
cursor.execute("SELECT id, question, category FROM faq") |
|
preguntas = cursor.fetchall() |
|
|
|
n_nuevos = 0 |
|
for pregunta_id, texto, categoria in preguntas: |
|
if pregunta_id in existentes: |
|
continue |
|
|
|
emb = modelo.encode(texto) |
|
emb_bytes = emb.astype(np.float32).tobytes() |
|
|
|
|
|
cursor.execute( |
|
"INSERT INTO embeddings (pregunta_id, embedding, categoria) VALUES (?, ?, ?)", |
|
(pregunta_id, emb_bytes, categoria) |
|
) |
|
n_nuevos += 1 |
|
|
|
conn.commit() |
|
print(f"Embeddings generados y guardados para {n_nuevos} preguntas nuevas.") |
|
|
|
generar_y_guardar_embeddings() |
|
|
|
|
|
|
|
cursor.execute(""" |
|
SELECT f.question, f.answer, e.embedding, f.category |
|
FROM faq f JOIN embeddings e ON f.id = e.pregunta_id |
|
""") |
|
rows = cursor.fetchall() |
|
|
|
preguntas = [row[0] for row in rows] |
|
respuestas = [row[1] for row in rows] |
|
categorias = [row[3] for row in rows] |
|
embeddings_preguntas = [np.frombuffer(row[2], dtype=np.float32) for row in rows] |
|
|
|
|
|
respuestas_por_categoria = {} |
|
embeddings_por_categoria = {} |
|
categorias_unicas = sorted(set(categorias)) |
|
|
|
for pregunta, respuesta, emb, cat in zip(preguntas, respuestas, embeddings_preguntas, categorias): |
|
cat = cat.lower() |
|
respuestas_por_categoria.setdefault(cat, []).append(respuesta) |
|
embeddings_por_categoria.setdefault(cat, []).append(emb) |
|
|
|
|
|
|
|
|
|
preguntas_norm = [normalizar(p) for p in preguntas] |
|
|
|
|
|
indices_por_categoria = {} |
|
for i, cat in enumerate(categorias): |
|
cat_l = cat.lower() |
|
indices_por_categoria.setdefault(cat_l, []).append(i) |
|
|
|
|
|
|
|
|
|
import difflib |
|
|
|
def responder(pregunta_usuario, categoria_usuario, fuente_alternativa): |
|
if not pregunta_usuario.strip(): |
|
return "<p style='color:red;'>Por favor, escribí una palabra clave.</p>" |
|
|
|
if not categoria_usuario: |
|
return "<p style='color:red;'>Seleccioná una categoría.</p>" |
|
|
|
categoria = categoria_usuario.lower() |
|
if categoria not in embeddings_por_categoria: |
|
return "<p style='color:red;'>Categoría no encontrada.</p>" |
|
|
|
|
|
texto_normalizado = normalizar(pregunta_usuario) |
|
|
|
|
|
palabras_clave = extraer_palabras_clave(pregunta_usuario) |
|
|
|
|
|
idxs_cat = indices_por_categoria.get(categoria, []) |
|
if not idxs_cat: |
|
return "<p style='color:red;'>No hay preguntas en esa categoría.</p>" |
|
|
|
|
|
for idx in idxs_cat: |
|
if preguntas_norm[idx] == texto_normalizado: |
|
return ( |
|
f"<p>🧠 <strong>Coincidencia exacta encontrada:</strong></p>" |
|
f"<p>📘 <strong>Pregunta:</strong> {preguntas[idx]}<br>" |
|
f"<strong>Respuesta:</strong><br>{respuestas[idx]}</p>" |
|
) |
|
|
|
|
|
for idx in idxs_cat: |
|
q_norm = preguntas_norm[idx] |
|
|
|
if texto_normalizado in q_norm or q_norm in texto_normalizado: |
|
return ( |
|
f"<p>🧠 <strong>Coincidencia por frase encontrada:</strong></p>" |
|
f"<p>📘 <strong>Pregunta:</strong> {preguntas[idx]}<br>" |
|
f"<strong>Respuesta:</strong><br>{respuestas[idx]}</p>" |
|
) |
|
|
|
for palabra in palabras_clave: |
|
if re.search(r'\b' + re.escape(normalizar(palabra)) + r'\b', q_norm): |
|
return ( |
|
f"<p>🧠 <strong>Coincidencia literal para '{palabra}':</strong></p>" |
|
f"<p>📘 <strong>Pregunta:</strong> {preguntas[idx]}<br>" |
|
f"<strong>Respuesta:</strong><br>{respuestas[idx]}</p>" |
|
) |
|
|
|
|
|
|
|
best_ratio = 0.0 |
|
best_idx = None |
|
for idx in idxs_cat: |
|
ratio = difflib.SequenceMatcher(None, texto_normalizado, preguntas_norm[idx]).ratio() |
|
if ratio > best_ratio: |
|
best_ratio = ratio |
|
best_idx = idx |
|
if best_ratio >= 0.94: |
|
return ( |
|
f"<p>🧠 <strong>Coincidencia aproximada (ratio {best_ratio:.2f}):</strong></p>" |
|
f"<p>📘 <strong>Pregunta:</strong> {preguntas[best_idx]}<br>" |
|
f"<strong>Respuesta:</strong><br>{respuestas[best_idx]}</p>" |
|
) |
|
|
|
|
|
|
|
pregunta_proc = texto_normalizado if texto_normalizado else normalizar(" ".join(palabras_clave)) |
|
emb_usuario = modelo.encode(pregunta_proc).reshape(1, -1) |
|
|
|
emb_cat = np.vstack(embeddings_por_categoria[categoria]) |
|
sims = cosine_similarity(emb_usuario, emb_cat)[0] |
|
mejor_idx_en_cat = np.argmax(sims) |
|
mejor_sim = sims[mejor_idx_en_cat] |
|
|
|
|
|
|
|
|
|
|
|
if mejor_idx_en_cat < len(idxs_cat): |
|
idx_global = idxs_cat[mejor_idx_en_cat] |
|
else: |
|
|
|
idx_global = idxs_cat[0] |
|
|
|
if mejor_sim < 0.5: |
|
busqueda = re.sub(r"\s+", "+", pregunta_usuario.strip()) |
|
link = f"https://www.google.com/search?q={busqueda}" if fuente_alternativa == "Google" else f"https://www.youtube.com/results?search_query={busqueda}" |
|
return f"No encontré una respuesta clara.<br>Consultá en <a href='{link}' target='_blank'>{fuente_alternativa}</a>" |
|
|
|
return ( |
|
f"<p>🧠 <strong>Palabras clave detectadas:</strong> {' '.join(palabras_clave) if palabras_clave else '(ninguna)'}</p>" |
|
f"<p>📘 <strong>Respuesta (sim {mejor_sim:.3f}):</strong><br>{respuestas[idx_global]}</p>" |
|
) |
|
|
|
|
|
|
|
|
|
def mostrar_pregunta_por_categoria(categoria): |
|
if categoria not in modelos_por_categoria: |
|
return "Categoría no encontrada", [], "", "" |
|
pregunta = random.choice(modelos_por_categoria[categoria]) |
|
return pregunta["pregunta"], pregunta["opciones"], pregunta["respuesta"], categoria |
|
|
|
def verificar_respuesta_cat(pregunta_usuario, opciones, respuesta_correcta, seleccion_usuario, categoria_actual): |
|
if not seleccion_usuario: |
|
return "<p style='color:red;'>Seleccioná una opción para continuar.</p>" |
|
if seleccion_usuario == respuesta_correcta: |
|
return "<p style='color:green;'>✅ ¡Correcto!</p>" |
|
else: |
|
return f"<p style='color:red;'>❌ Incorrecto. La respuesta correcta es: <strong>{respuesta_correcta}</strong></p>" |
|
|
|
|
|
|
|
chatbot = gr.Interface( |
|
fn=responder, |
|
inputs=[ |
|
gr.Textbox(lines=2, label="Tu pregunta"), |
|
gr.Dropdown(choices=categorias_unicas, label="Seleccioná la categoría"), |
|
gr.Dropdown(choices=["Google", "YouTube"], label="Buscar en otra fuente si no se encuentra") |
|
], |
|
outputs=gr.HTML(label="Respuesta del chatbot"), |
|
title="CHATBOT DE FCEFN", |
|
description="🧠 Ingresá una pregunta o palabra clave teórica de las asignaturas. Seleccioná categoría. Si no se encuentra una respuesta clara, se sugiere una fuente externa.", |
|
theme="soft" |
|
) |
|
|
|
def interfaz_parciales_cat(): |
|
categorias_disponibles = list(modelos_por_categoria.keys()) |
|
with gr.Blocks() as demo: |
|
gr.Markdown("### 🎯 Preguntas Multiple Choice para repaso") |
|
with gr.Row(): |
|
selector_categoria = gr.Dropdown(choices=categorias_disponibles, label="Elegí una categoría", value=categorias_disponibles[0]) |
|
boton_nueva = gr.Button("Siguiente pregunta") |
|
|
|
pregunta = gr.Textbox(label="Pregunta", interactive=False) |
|
opciones_radio = gr.Radio(choices=[""], label="Elegí una opción") |
|
resultado = gr.HTML() |
|
|
|
estado_respuesta = gr.State() |
|
estado_categoria = gr.State() |
|
|
|
def cargar_pregunta(categoria): |
|
p, opciones, r, c = mostrar_pregunta_por_categoria(categoria) |
|
return p, gr.update(choices=opciones, value=None), r, c, "", None |
|
|
|
def manejar_verificacion(p, o, r, s, c): |
|
return verificar_respuesta_cat(p, o, r, s, c) |
|
|
|
boton_nueva.click( |
|
fn=cargar_pregunta, |
|
inputs=[selector_categoria], |
|
outputs=[pregunta, opciones_radio, estado_respuesta, estado_categoria, resultado, opciones_radio] |
|
) |
|
|
|
opciones_radio.change( |
|
fn=manejar_verificacion, |
|
inputs=[pregunta, opciones_radio, estado_respuesta, opciones_radio, estado_categoria], |
|
outputs=resultado |
|
) |
|
|
|
return demo |
|
|
|
|
|
app = gr.TabbedInterface( |
|
[chatbot, interfaz_parciales_cat()], |
|
tab_names=["Consultar Conceptos", "Repasar Conceptos"] |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
app.launch() |
|
|
|
|
|
|