import os

from dotenv import load_dotenv
from sentence_transformers import (
    SentenceTransformer,
    CrossEncoder,
)
from ctransformers import AutoModelForCausalLM
from torch import Tensor
from google import genai
from google.genai import types

from app.core.chunks import Chunk
from app.settings import settings, BASE_DIR, GeminiEmbeddingSettings

load_dotenv()
|
|
class Embedder:
    def __init__(self, model: str = "BAAI/bge-m3"):
        self.device: str = settings.device
        self.model_name: str = model
        self.model: SentenceTransformer = SentenceTransformer(model, device=self.device)

    def encode(self, text: str | list[str]) -> Tensor | list[Tensor]:
        """Encode a string (or list of strings) into dense vectors."""
        return self.model.encode(sentences=text, show_progress_bar=False, batch_size=32)

    def get_vector_dimensionality(self) -> int | None:
        """Return the dimensionality of the dense vectors."""
        return self.model.get_sentence_embedding_dimension()
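
# Usage sketch (assumes settings.device is configured, e.g. "cpu" or "cuda",
# and that the model can be fetched from the Hugging Face hub):
#
#   embedder = Embedder()
#   vectors = embedder.encode(["what is hybrid search?"])
#   dim = embedder.get_vector_dimensionality()  # 1024 for BAAI/bge-m3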
|
|
class Reranker:
    def __init__(self, model: str = "cross-encoder/ms-marco-MiniLM-L6-v2"):
        self.device: str = settings.device
        self.model_name: str = model
        self.model: CrossEncoder = CrossEncoder(model, device=self.device)

    def rank(self, query: str, chunks: list[Chunk]) -> list[dict[str, int | float]]:
        """Return the chunks re-sorted by relevance as a list of dicts; only
        'corpus_id' is needed, since it is the position of the chunk in the
        original list."""
        return self.model.rank(query, [chunk.get_raw_text() for chunk in chunks])
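
# Usage sketch (assumes Chunk.get_raw_text() returns the chunk's plain text;
# each returned dict carries 'corpus_id' and 'score'):
#
#   reranker = Reranker()
#   ranked = reranker.rank("how do I reset my password?", chunks)
#   best_chunk = chunks[ranked[0]["corpus_id"]]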
|
|
class LocalLLM:
    def __init__(self):
        self.model = AutoModelForCausalLM.from_pretrained(
            **settings.local_llm.model_dump()
        )

    def get_response(
        self,
        prompt: str,
        stream: bool = True,
        logging: bool = True,
        use_default_config: bool = True,
    ) -> str:
        """Produce the response to the user's prompt.

        stream -> flag that determines whether we wait until the whole
        response is ready or emit it token by token.

        TODO: invent a way to really stream the answer (as a return value)
        """
        # Dump the prompt for debugging.
        path_to_prompt = os.path.join(BASE_DIR, "prompt.txt")
        with open(path_to_prompt, "w", encoding="utf-8", errors="replace") as f:
            f.write(prompt)

        generated_text = ""
        tokenized_text: list[int] = self.model.tokenize(text=prompt)
        response: list[int] = self.model.generate(
            tokens=tokenized_text, **settings.local_llm.model_dump()
        )

        if logging:
            print(response)

        if not stream:
            return self.model.detokenize(response)

        for token in response:
            chunk = self.model.detokenize([token])
            generated_text += chunk
            if logging:
                print(chunk, end="", flush=True)

        return generated_text
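
# Usage sketch (assumes settings.local_llm provides valid ctransformers kwargs,
# e.g. model_path_or_repo_id pointing at a GGUF checkpoint):
#
#   llm = LocalLLM()
#   answer = llm.get_response("Summarise the retrieved context.", stream=False)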
|
|
class GeminiLLM:
    def __init__(self, model="gemini-2.0-flash"):
        self.client = genai.Client(api_key=settings.api_key)
        self.model = model

    def get_response(
        self,
        prompt: str,
        stream: bool = True,
        logging: bool = True,
        use_default_config: bool = False,
    ) -> str:
        # Dump the prompt for debugging.
        path_to_prompt = os.path.join(BASE_DIR, "prompt.txt")
        with open(path_to_prompt, "w", encoding="utf-8", errors="replace") as f:
            f.write(prompt)

        response = self.client.models.generate_content(
            model=self.model,
            contents=prompt,
            config=(
                types.GenerateContentConfig(**settings.gemini_generation.model_dump())
                if use_default_config
                else None
            ),
        )

        return response.text

    async def get_streaming_response(
        self,
        prompt: str,
        stream: bool = True,
        logging: bool = True,
        use_default_config: bool = False,
    ):
        # Dump the prompt for debugging.
        path_to_prompt = os.path.join(BASE_DIR, "prompt.txt")
        with open(path_to_prompt, "w", encoding="utf-8", errors="replace") as f:
            f.write(prompt)

        response = self.client.models.generate_content_stream(
            model=self.model,
            contents=prompt,
            config=(
                types.GenerateContentConfig(**settings.gemini_generation.model_dump())
                if use_default_config
                else None
            ),
        )

        for chunk in response:
            yield chunk
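
# Usage sketch (assumes settings.api_key holds a valid Gemini API key):
#
#   llm = GeminiLLM()
#   answer = llm.get_response("Answer using the retrieved context.")
#
#   async for chunk in llm.get_streaming_response("Answer step by step."):
#       print(chunk.text, end="", flush=True)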
|
|
class GeminiEmbed:
    def __init__(self, model="text-embedding-004"):
        self.client = genai.Client(api_key=settings.api_key)
        self.model = model
        self.settings = GeminiEmbeddingSettings()

    def encode(self, text: str | list[str]) -> list[list[float]]:
        if isinstance(text, str):
            text = [text]

        output: list[list[float]] = []
        max_batch_size = 100  # send at most 100 texts per request

        for i in range(0, len(text), max_batch_size):
            batch = text[i : i + max_batch_size]
            embeddings = self.client.models.embed_content(
                model=self.model,
                contents=batch,
                config=types.EmbedContentConfig(
                    **settings.gemini_embedding.model_dump()
                ),
            ).embeddings

            for emb in embeddings:
                output.append(emb.values)

        return output

    def get_vector_dimensionality(self) -> int | None:
        return self.settings.output_dimensionality
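
# Usage sketch (each embedding is returned as a plain list of floats):
#
#   embedder = GeminiEmbed()
#   vectors = embedder.encode(["first passage", "second passage"])
#   dim = embedder.get_vector_dimensionality()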
|
|
class Wrapper:
    def __init__(self, model: str = "gemini-2.0-flash"):
        self.model = model
        self.client = genai.Client(api_key=settings.api_key)

    def wrap(self, prompt: str) -> str:
        response = self.client.models.generate_content(
            model=self.model,
            contents=prompt,
            config=types.GenerateContentConfig(**settings.gemini_wrapper.model_dump()),
        )

        return response.text
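
# Usage sketch (assumes settings.gemini_wrapper configures the model, e.g. with
# a system instruction for query rewriting):
#
#   wrapped_query = Wrapper().wrap("raw user question")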
|
|