# Spaces: Build error
import os | |
import json | |
import chainlit as cl | |
from embedchain import Pipeline as App | |
from datetime import datetime | |
# Fail fast at import time when the Hugging Face credential is missing.
# os.environ[...] raises KeyError rather than returning None, so the lookup
# uses .get() to make the explicit check below reachable. An empty value is
# treated the same as an unset one.
HF_TOKEN = os.environ.get('HUGGINGFACE_API_TOKEN')
if not HF_TOKEN:
    # The message names the variable that is actually read above.
    raise ValueError('HUGGINGFACE_API_TOKEN is not set')
class DatabaseError(Exception):
    """Error type raised for failures in the JSON record store."""
class JSONDB:
    """Minimal record store backed by a single JSON file.

    The file holds one JSON array. ``add_record`` appends to it,
    ``get_all_records`` returns it, and ``get_top_records`` returns the most
    recently added entries according to each record's ``'added'`` field.
    """

    # Format of the 'added' timestamp string parsed by get_top_records.
    TIMESTAMP_FORMAT = '%d/%m/%Y %H:%M:%S'

    def __init__(self, file_path):
        """Remember *file_path*; the file itself is not touched here."""
        self.file_path = file_path

    def _create_file_if_not_exists(self):
        """Create the backing file containing an empty JSON array if absent."""
        if not os.path.exists(self.file_path):
            with open(self.file_path, 'w', encoding='utf-8') as fp:
                json.dump([], fp)

    def _added_sort_key(self, record):
        """Return the record's 'added' value as a datetime for sorting.

        Records whose 'added' field is missing or does not match
        ``TIMESTAMP_FORMAT`` map to ``datetime.min`` so they sort as oldest.
        """
        try:
            return datetime.strptime(record.get('added'), self.TIMESTAMP_FORMAT)
        except (TypeError, ValueError):
            return datetime.min

    def add_record(self, record):
        """Append *record* to the store unless an equal record already exists.

        Raises:
            DatabaseError: if the file cannot be read or written, or if it
                contains malformed JSON.
        """
        try:
            self._create_file_if_not_exists()
            with open(self.file_path, 'r+', encoding='utf-8') as fp:
                raw = fp.read()
                # A blank file is an empty store (same as get_all_records).
                # Malformed content still raises so it is never overwritten.
                data = json.loads(raw) if raw.strip() else []
                if record in data:
                    return
                data.append(record)
                # Serialise before touching the file so a non-serialisable
                # record cannot leave a half-written file behind.
                payload = json.dumps(data, indent=4)
                fp.seek(0)
                fp.write(payload)
                # Drop leftover bytes when the new content is shorter.
                fp.truncate()
        except (json.JSONDecodeError, OSError) as e:
            raise DatabaseError(f"Error adding record: {str(e)}") from e

    def get_all_records(self):
        """Return every stored record.

        A missing file, or a file that does not parse as JSON (for example an
        empty one), yields an empty list.

        Raises:
            DatabaseError: on any other I/O failure while reading the file.
        """
        try:
            with open(self.file_path, 'r', encoding='utf-8') as fp:
                try:
                    return json.load(fp)
                except json.JSONDecodeError:
                    return []
        except FileNotFoundError:
            # Nothing has been stored yet; callers may read before any write.
            return []
        except OSError as e:
            raise DatabaseError(f"Error getting all records: {str(e)}") from e

    def get_top_records(self, n):
        """Return up to *n* records, newest first by their 'added' timestamp.

        Timestamps are parsed rather than compared as text, because the
        day-first format does not sort chronologically as a string.

        Raises:
            DatabaseError: propagated from ``get_all_records``.
        """
        records = self.get_all_records()
        return sorted(records, key=self._added_sort_key, reverse=True)[:n]
async def setup_app():
    """Create the embedchain pipeline and the JSON index for this session.

    Both objects are stored in the Chainlit user session under the keys
    'app' and 'db'.
    """
    # NOTE(review): no Chainlit decorator is visible on this coroutine in
    # this view of the file; confirm how it gets registered.
    pipeline = App.from_config(config_path='data/config.yaml')
    pipeline.collect_metrics = False
    cl.user_session.set('app', pipeline)
    cl.user_session.set('db', JSONDB('data/index.json'))
def update_db(data):
    """Store *data* in the session's JSON index, stamped with the current time."""
    store = cl.user_session.get('db')
    # 'url' holds the value passed in; 'added' is the local time formatted
    # day-first.
    entry = dict(
        url=data,
        added=datetime.now().strftime('%d/%m/%Y %H:%M:%S'),
    )
    store.add_record(entry)
async def main(message: cl.Message):
    """Handle one incoming chat message.

    Messages starting with a slash command are dispatched as follows:

    * ``/help`` - reply with a table of the available commands.
    * ``/add <source>`` - add ``<source>`` to the knowledge base and record
      it in the JSON index, unless it is already recorded.
    * ``/kb`` - reply with a table of the 25 most recently added records.

    Any other message is sent to the app's ``chat`` method and the answer is
    streamed back token by token.

    Args:
        message: The incoming Chainlit message; only ``content`` is read.
    """
    # NOTE(review): no Chainlit decorator is visible on this coroutine in
    # this view of the file; confirm how it gets registered.
    task_list = cl.TaskList()
    task_list.status = 'Running...'
    app = cl.user_session.get('app')
    msg = cl.Message(content='')
    user_message = message.content

    if user_message.startswith('/help'):
        markdown_content = (
            "| Command | Description |\n| --- | --- |\n"
            "| /add | Add a document to the knowledge base |\n"
            "| /kb | Display the knowledge base |\n"
            "| /help | Display the available commands |\n"
            "| * | Chat with the AI |\n"
        )
        await cl.Message(content=markdown_content).send()

    elif user_message.startswith('/add'):
        # Strip only the leading command. str.replace would also delete any
        # later '/add' inside the argument (e.g. a URL path segment).
        data = user_message[len('/add'):].strip()
        if not data:
            # Nothing to add; do not pass an empty source to the app.
            await cl.Message(
                content='Usage: /add <url or path of the document>'
            ).send()
            return

        db = cl.user_session.get('db')
        records = db.get_all_records()
        if any(record.get('url') == data for record in records):
            await cl.Message(
                content='This document already exists in the knowledge base!'
            ).send()
        else:
            add_task = cl.Task(title='Adding to knowledge base', status=cl.TaskStatus.RUNNING)
            await task_list.add_task(add_task)
            await task_list.send()
            # Run the blocking call off the event loop, as the chat branch
            # below already does for app.chat.
            await cl.make_async(app.add)(data)
            update_db(data)
            add_task.status = cl.TaskStatus.DONE
            await task_list.send()
            await cl.Message(
                content='Added data to knowledge base!'
            ).send()

    elif user_message.startswith('/kb'):
        kb_task = cl.Task(title='Getting records', status=cl.TaskStatus.RUNNING)
        await task_list.add_task(kb_task)
        await task_list.send()
        data = cl.user_session.get('db').get_top_records(25)
        kb_task.status = cl.TaskStatus.DONE
        await task_list.send()
        if not data:
            await cl.Message(
                content='No documents in json index!'
            ).send()
        else:
            # Missing fields render as empty cells instead of raising.
            rows = [
                f"| {record.get('url', '')} | {record.get('added', '')} |\n"
                for record in data
            ]
            markdown_content = "| URL | Added |\n| --- | --- |\n" + "".join(rows)
            await cl.Message(content=markdown_content).send()

    else:
        chat_task = cl.Task(title='Querying LLM', status=cl.TaskStatus.RUNNING)
        await task_list.add_task(chat_task)
        await task_list.send()
        for chunk in await cl.make_async(app.chat)(message.content):
            await msg.stream_token(chunk)
        chat_task.status = cl.TaskStatus.DONE
        await task_list.send()
        await msg.send()