muni's picture
Add TinyTroupe
82a7a28
from tinytroupe.utils import JsonSerializableRegistry
import tinytroupe.utils as utils
from tinytroupe.agent import logger
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
#######################################################################################################################
# Grounding connectors
#######################################################################################################################
class GroundingConnector(JsonSerializableRegistry):
"""
An abstract class representing a grounding connector. A grounding connector is a component that allows an agent to ground
its knowledge in external sources, such as files, web pages, databases, etc.
"""
serializable_attributes = ["name"]
def __init__(self, name:str) -> None:
self.name = name
def retrieve_relevant(self, relevance_target:str, source:str, top_k=20) -> list:
raise NotImplementedError("Subclasses must implement this method.")
def retrieve_by_name(self, name:str) -> str:
raise NotImplementedError("Subclasses must implement this method.")
def list_sources(self) -> list:
raise NotImplementedError("Subclasses must implement this method.")
@utils.post_init
class BaseSemanticGroundingConnector(GroundingConnector):
"""
A base class for semantic grounding connectors. A semantic grounding connector is a component that indexes and retrieves
documents based on so-called "semantic search" (i.e, embeddings-based search). This specific implementation
is based on the VectorStoreIndex class from the LLaMa-Index library. Here, "documents" refer to the llama-index's
data structure that stores a unit of content, not necessarily a file.
"""
serializable_attributes = ["documents"]
def __init__(self, name:str="Semantic Grounding") -> None:
super().__init__(name)
self.documents = None
self.name_to_document = None
# @post_init ensures that _post_init is called after the __init__ method
def _post_init(self):
"""
This will run after __init__, since the class has the @post_init decorator.
It is convenient to separate some of the initialization processes to make deserialize easier.
"""
self.index = None
if not hasattr(self, 'documents') or self.documents is None:
self.documents = []
if not hasattr(self, 'name_to_document') or self.name_to_document is None:
self.name_to_document = {}
self.add_documents(self.documents)
def retrieve_relevant(self, relevance_target:str, top_k=20) -> list:
"""
Retrieves all values from memory that are relevant to a given target.
"""
if self.index is not None:
retriever = self.index.as_retriever(similarity_top_k=top_k)
nodes = retriever.retrieve(relevance_target)
else:
nodes = []
retrieved = []
for node in nodes:
content = "SOURCE: " + node.metadata.get('file_name', '(unknown)')
content += "\n" + "SIMILARITY SCORE:" + str(node.score)
content += "\n" + "RELEVANT CONTENT:" + node.text
retrieved.append(content)
logger.debug(f"Content retrieved: {content[:200]}")
return retrieved
def retrieve_by_name(self, name:str) -> list:
"""
Retrieves a content source by its name.
"""
# TODO also optionally provide a relevance target?
results = []
if self.name_to_document is not None and name in self.name_to_document:
docs = self.name_to_document[name]
for i, doc in enumerate(docs):
if doc is not None:
content = f"SOURCE: {name}\n"
content += f"PAGE: {i}\n"
content += "CONTENT: \n" + doc.text[:10000] # TODO a more intelligent way to limit the content
results.append(content)
return results
def list_sources(self) -> list:
"""
Lists the names of the available content sources.
"""
if self.name_to_document is not None:
return list(self.name_to_document.keys())
else:
return []
def add_document(self, document, doc_to_name_func=None) -> None:
"""
Indexes a document for semantic retrieval.
"""
self.add_documents([document], doc_to_name_func)
def add_documents(self, new_documents, doc_to_name_func=None) -> list:
"""
Indexes documents for semantic retrieval.
"""
# index documents by name
if len(new_documents) > 0:
# add the new documents to the list of documents
self.documents += new_documents
# process documents individually too
for document in new_documents:
# out of an abundance of caution, we sanitize the text
document.text = utils.sanitize_raw_string(document.text)
if doc_to_name_func is not None:
name = doc_to_name_func(document)
# self.name_to_document[name] contains a list, since each source file could be split into multiple pages
if name in self.name_to_document:
self.name_to_document[name].append(document)
else:
self.name_to_document[name] = [document]
# index documents for semantic retrieval
if self.index is None:
self.index = VectorStoreIndex.from_documents(self.documents)
else:
self.index.refresh(self.documents)
@utils.post_init
class LocalFilesGroundingConnector(BaseSemanticGroundingConnector):
serializable_attributes = ["folders_paths"]
def __init__(self, name:str="Local Files", folders_paths: list=None) -> None:
super().__init__(name)
self.folders_paths = folders_paths
# @post_init ensures that _post_init is called after the __init__ method
def _post_init(self):
"""
This will run after __init__, since the class has the @post_init decorator.
It is convenient to separate some of the initialization processes to make deserialize easier.
"""
self.loaded_folders_paths = []
if not hasattr(self, 'folders_paths') or self.folders_paths is None:
self.folders_paths = []
self.add_folders(self.folders_paths)
def add_folders(self, folders_paths:list) -> None:
"""
Adds a path to a folder with files used for grounding.
"""
if folders_paths is not None:
for folder_path in folders_paths:
try:
logger.debug(f"Adding the following folder to grounding index: {folder_path}")
self.add_folder(folder_path)
except (FileNotFoundError, ValueError) as e:
print(f"Error: {e}")
print(f"Current working directory: {os.getcwd()}")
print(f"Provided path: {folder_path}")
print("Please check if the path exists and is accessible.")
def add_folder(self, folder_path:str) -> None:
"""
Adds a path to a folder with files used for grounding.
"""
if folder_path not in self.loaded_folders_paths:
self._mark_folder_as_loaded(folder_path)
# for PDF files, please note that the document will be split into pages: https://github.com/run-llama/llama_index/issues/15903
new_files = SimpleDirectoryReader(folder_path).load_data()
self.add_documents(new_files, lambda doc: doc.metadata["file_name"])
def add_file_path(self, file_path:str) -> None:
"""
Adds a path to a file used for grounding.
"""
# a trick to make SimpleDirectoryReader work with a single file
new_files = SimpleDirectoryReader(input_files=[file_path]).load_data()
logger.debug(f"Adding the following file to grounding index: {new_files}")
self.add_documents(new_files, lambda doc: doc.metadata["file_name"])
def _mark_folder_as_loaded(self, folder_path:str) -> None:
if folder_path not in self.loaded_folders_paths:
self.loaded_folders_paths.append(folder_path)
if folder_path not in self.folders_paths:
self.folders_paths.append(folder_path)
@utils.post_init
class WebPagesGroundingConnector(BaseSemanticGroundingConnector):
serializable_attributes = ["web_urls"]
def __init__(self, name:str="Web Pages", web_urls: list=None) -> None:
super().__init__(name)
self.web_urls = web_urls
# @post_init ensures that _post_init is called after the __init__ method
def _post_init(self):
self.loaded_web_urls = []
if not hasattr(self, 'web_urls') or self.web_urls is None:
self.web_urls = []
# load web urls
self.add_web_urls(self.web_urls)
def add_web_urls(self, web_urls:list) -> None:
"""
Adds the data retrieved from the specified URLs to grounding.
"""
filtered_web_urls = [url for url in web_urls if url not in self.loaded_web_urls]
for url in filtered_web_urls:
self._mark_web_url_as_loaded(url)
if len(filtered_web_urls) > 0:
new_documents = SimpleWebPageReader(html_to_text=True).load_data(filtered_web_urls)
self.add_documents(new_documents, lambda doc: doc.id_)
def add_web_url(self, web_url:str) -> None:
"""
Adds the data retrieved from the specified URL to grounding.
"""
# we do it like this because the add_web_urls could run scrapes in parallel, so it is better
# to implement this one in terms of the other
self.add_web_urls([web_url])
def _mark_web_url_as_loaded(self, web_url:str) -> None:
if web_url not in self.loaded_web_urls:
self.loaded_web_urls.append(web_url)
if web_url not in self.web_urls:
self.web_urls.append(web_url)