# NOTE(review): the three lines that were here ("Spaces:" / "Running" / "Running")
# are a Hugging Face Spaces status header captured when this file was scraped
# from the Space page — they are not part of the program source.
import os | |
import time | |
import json | |
import gradio | |
import functools | |
import uuid | |
import httpx | |
from httpx_sse import connect_sse | |
from tavily import TavilyClient | |
from linkup import LinkupClient | |
from ddgs import DDGS | |
from langfuse import Langfuse | |
from langfuse.openai import OpenAI | |
# Search-provider clients, configured from environment variables.
tavily = TavilyClient(os.getenv("TAVILY_API_KEY"))
linkup = LinkupClient(os.getenv("LINKUP_API_KEY"))
# OpenAI-compatible client pointed at an NVIDIA-hosted endpoint; this is the
# Langfuse-wrapped OpenAI class, so completions are traced automatically.
openai = OpenAI(base_url=os.getenv("NVIDIA_BASE_URL"), api_key=os.getenv("NVIDIA_API_KEY"))
# Langfuse tracing client (reads its own LANGFUSE_* env vars).
langfuse = Langfuse()
# Polish system prompt for the Gov.pl assistant; embeds today's date and
# current-affairs facts. Runtime string — deliberately left untranslated.
system_prompt = "Jesteś pomocnym Asystentem Interesanta Gov.pl stworzonym przez Jerzego Głowackiego z wbudowaną funkcją wyszukiwania na stronach rządowych Gov.pl. Twoim zadaniem jest dostarczanie aktualnych, rzetelnych, szczegółowych i wyczerpujących odpowiedzi, krok po kroku, korzystając z własnej wiedzy oraz z podanych wyników wyszukiwania na stronach Gov.pl. Dzisiaj jest " + time.strftime("%d.%m.%Y") + " r. Wybory Prezydenta RP w 2025 r. wygrał Karol Nawrocki, który pokonał w drugiej turze Rafała Trzaskowskiego i jest aktualnym Prezydentem RP, a Premierem jest Donald Tusk."
def set_new_session_id():
    """Rebind the module-level session_id to a fresh random UUID4 string.

    Used both at startup and as the chatbot's clear-callback, so each new
    chat gets its own Langfuse session identifier.
    """
    global session_id
    session_id = f"{uuid.uuid4()}"
# Initialise the session id once at import time.
set_new_session_id()
def tavily_search(query):
    """Search Gov.pl pages with the Tavily API.

    Returns a list of dicts with Polish keys (tytul/url/tresc) as expected
    by the prompt builder in respond().
    """
    response = tavily.search(
        query=query,
        search_depth="advanced",
        chunks_per_source=2,
        country="poland",
        include_domains=["gov.pl"]
    )
    simplified = []
    for hit in response["results"]:
        simplified.append({"tytul": hit["title"], "url": hit["url"], "tresc": hit["content"]})
    return simplified
def linkup_search(query):
    """Search Gov.pl pages with the Linkup API; return at most 5 results.

    Results are normalised into dicts with Polish keys (tytul/url/tresc).
    """
    response = linkup.search(
        query=query,
        depth="standard",
        output_type="searchResults",
        include_domains=["gov.pl"],
        include_images=False
    )
    top_hits = response.results[:5]
    return [{"tytul": hit.name, "url": hit.url, "tresc": hit.content} for hit in top_hits]
def ddgs_search(query):
    """Search gov.pl via DuckDuckGo text search; return at most 5 results.

    The site: operator restricts hits to gov.pl; output uses the same
    Polish-keyed dict shape as the other search backends.
    """
    hits = DDGS().text(
        query=f"{query} site:gov.pl",
        region="pl-pl",
        max_results=5
    )
    return [{"tytul": hit["title"], "url": hit["href"], "tresc": hit["body"]} for hit in hits]
def search(message, search_provider):
    """Dispatch the query to the selected search backend.

    Tavily queries are truncated to 400 characters (its query-length limit);
    anything other than "tavily"/"linkup" falls through to DuckDuckGo.
    """
    if search_provider == "tavily":
        return tavily_search(message[:400])
    if search_provider == "linkup":
        return linkup_search(message)
    return ddgs_search(message)
def bielik_inference(history, trace_id, model_id):
    """Stream a chat completion from a SpeakLeash Bielik model.

    trace_id links the call to the surrounding Langfuse trace; generation
    stops at "Źródła:" because respond() appends its own sources section.
    Returns the streaming completion iterator.
    """
    request = {
        "model": "speakleash/" + model_id,
        "messages": history,
        "temperature": 0.1,
        "stream": True,
        "trace_id": trace_id,
        "stop": ["Źródła:"],
    }
    return openai.chat.completions.create(**request)
def pllum_inference(history, model_id):
    """Stream a PLLuM completion over SSE from the PLLUM_BASE_URL endpoint.

    Collapses the system message (history[0]) and the latest user message
    (history[-1]) into a single prompt, POSTs it, and yields text chunks
    from "new_message" server-sent events as they arrive. HTTP errors are
    printed and swallowed (best-effort streaming, matching original intent).
    """
    # Renamed from `json`: the original local shadowed the imported json
    # module inside this function.
    payload = {
        "prompt": f"{history[0]['content']}\n\n{history[-1]['content']}",
        "system_prompt": "",
        "temperature": 0.1,
        "top_p": 0.5,
        "model": model_id
    }
    try:
        with httpx.Client() as client:
            with connect_sse(client, "POST", os.getenv("PLLUM_BASE_URL"), json=payload) as event_source:
                for sse in event_source.iter_sse():
                    if sse.event == "new_message":
                        yield sse.data
    except httpx.HTTPError as ex:
        print(ex)
def respond(message, history, model_id="bielik-11b-v2.3-instruct", search_provider="tavily"):
    """Main chat handler: search Gov.pl, then stream a model answer.

    Generator consumed by gradio.ChatInterface. Yields, in order:
    a pending "searching" status message, a "Gotowe!" completion marker,
    the incrementally streamed assistant reply, and finally the reply with
    a numbered sources list appended. The whole turn is traced in Langfuse.
    """
    if not message:
        return
    message = message.strip()
    global session_id
    trace = langfuse.trace(name="respond", session_id=session_id, input=message, metadata={"model_id": model_id})
    response = gradio.ChatMessage(content="", metadata={"title": "Wyszukiwanie na stronach Gov.pl…", "id": 0, "status": "pending"})
    yield response
    start_time = time.time()
    span = trace.span(name="search", input=message[:400])
    # Try the requested provider, then rotate through the other two as
    # fallbacks. Was `except:` (bare) — narrowed to Exception so Ctrl-C /
    # SystemExit still propagate.
    try:
        serp = search(message, search_provider)
    except Exception:
        try:
            search_provider = "linkup" if search_provider == "tavily" else "ddgs" if search_provider == "linkup" else "tavily"
            serp = search(message, search_provider)
        except Exception:
            search_provider = "ddgs" if search_provider == "tavily" else "tavily" if search_provider == "linkup" else "linkup"
            serp = search(message, search_provider)
    span.end(output=serp)
    response.content = "Gotowe!"
    response.metadata["status"] = "done"
    response.metadata["duration"] = time.time() - start_time
    yield response
    # ensure_ascii=False keeps Polish characters readable directly. The old
    # .encode('utf-8').decode('unicode_escape') round-trip achieved the same
    # for \uXXXX escapes but also destroyed legitimate backslash escapes
    # (\n, \") inside result content, yielding invalid JSON in the prompt.
    system_message = f"{system_prompt}\n\nWyniki wyszukiwania w formacie JSON: {json.dumps(serp, ensure_ascii=False)}"
    # Drop earlier "Gotowe!" status messages so they aren't replayed as chat turns.
    history = [h for h in history if h["content"] != "Gotowe!"]
    history.insert(0, {"role": "system", "content": system_message})
    history.append({"role": "user", "content": message})
    print("===\nsearch: ", search_provider, "\n", history)
    curr_chunk = ""
    response = [response, gradio.ChatMessage(content="")]
    completion = bielik_inference(history, trace.id, model_id) if model_id.startswith("bielik") else pllum_inference(history, model_id)
    for chunk in completion:
        # pllum_inference yields plain strings; bielik yields OpenAI chunks.
        if isinstance(chunk, str):
            curr_chunk = chunk
            response[1].content += curr_chunk
            yield response
        elif chunk.choices[0].delta.content is not None:
            curr_chunk = chunk.choices[0].delta.content
            response[1].content += curr_chunk
            yield response
    # Some streams repeat the final chunk; trim an exactly duplicated tail.
    if curr_chunk and response[1].content.endswith(curr_chunk * 2):
        response[1].content = response[1].content[:-len(curr_chunk)]
        yield response
    print(response[1])
    if serp:
        response[1].content += "\n\nŹródła:\n" + "\n".join(f"{i}. [{r['tytul']}]({r['url']})" for i, r in enumerate(serp, 1))
        yield response
    trace.update(output=response[1].content)
# Gradio UI: a Blocks layout hosting a ChatInterface around respond().
# The custom CSS relabels the primary button to "Nowy czat" (New chat)
# by zeroing its font size and injecting text via :after, and reorders
# the first form element.
with gradio.Blocks(theme="shivi/calm_seafoam",css=":root{--body-text-color:var(--neutral-900)}button.primary{font-size:0 !important}button.primary:after{font-size:var(--button-medium-text-size);content:'Nowy czat'}#component-0 .form:first-child{order:1}") as chat:
    # Question input with inline submit/stop buttons.
    textbox = gradio.Textbox(
        show_label=False,
        label="Pytanie",
        placeholder="Zadaj pytanie…",
        scale=7,
        autofocus=True,
        submit_btn=True,
        stop_btn=True
    )
    # Chat transcript in openai-style "messages" format (matches respond()).
    chatbot = gradio.Chatbot(
        label="Czatbot",
        scale=1,
        height=400,
        type="messages",
        autoscroll=True,
        show_share_button=False,
        show_copy_button=True
    )
    # Start a fresh Langfuse session id whenever the chat history is cleared.
    chatbot.clear(set_new_session_id)
    gradio.ChatInterface(
        respond,
        chatbot=chatbot,
        textbox=textbox,
        title="Asystent Interesanta Gov.pl",
        description="Czatbot AI pomagający wyszukiwać aktualne informacje na stronach Gov.pl, oparty o polski otwarty model językowy Bielik 11B od SpeakLeash. Stworzony przez Jerzego Głowackiego na licencji Apache 2.0. Sprawdź też <a href='https://ai-gov.pl/widzet.html' target='_blank'>widżet czatu na stronę WWW</a> i <a href='https://huggingface.co/spaces/jglowa/ai-gov.pl/tree/main' target='_blank'>kod źródłowy</a>. Wersja beta, używasz na własną odpowiedzialność.",
        examples=[["Jak uzyskać NIP?"], ["Jak założyć profil zaufany?"], ["Komu przysługuje zasiłek pielęgnacyjny?"], ["Jak zarejestrować JDG?"]],
        cache_mode="lazy",
        save_history=True,
        analytics_enabled=False,
        show_progress="full",
        type="messages",
        # Extra controls appended below the chat: model and search backend.
        additional_inputs=[
            gradio.Dropdown(["bielik-11b-v2.3-instruct", "pllum-12b-chat"], label="Model językowy"),
            gradio.Dropdown(["tavily", "linkup", "ddgs"], label="Silnik wyszukiwania")
        ]
    )
chat.launch()