rafaaa2105's picture
Update app.py
5d4717e verified
import os
import json
import chainlit as cl
from embedchain import Pipeline as App
from datetime import datetime
try:
HF_TOKEN = os.environ['HUGGINGFACE_API_TOKEN']
if HF_TOKEN is None:
raise ValueError('HUGGINGFACE_API_KEY is not set')
except Exception as err:
raise(err)
class DatabaseError(Exception):
pass
class JSONDB:
def __init__(self, file_path):
self.file_path = file_path
def _create_file_if_not_exists(self):
if not os.path.exists(self.file_path):
with open(self.file_path, 'w') as fp:
json.dump([], fp)
def add_record(self, record):
try:
self._create_file_if_not_exists()
with open(self.file_path, 'r+') as fp:
try:
data = json.load(fp)
if record not in data:
data.append(record)
else:
pass
except Exception as err:
print(f'[DEBUG] Error adding record: {str(err)}')
raise(err)
fp.seek(0)
json.dump(data, fp, indent=4)
except (FileNotFoundError, json.JSONDecodeError, IOError) as e:
raise DatabaseError(f"Error adding record: {str(e)}")
def get_all_records(self):
try:
with open(self.file_path, 'r') as fp:
# Attempt to load data, handle empty file scenario
try:
data = json.load(fp)
except json.JSONDecodeError:
data = []
return data
except (FileNotFoundError, IOError) as e:
raise DatabaseError(f"Error getting all records: {str(e)}")
def get_top_records(self, n):
try:
records = self.get_all_records()
sorted_records = sorted(records, key=lambda x: x.get('added', 0), reverse=True)
return sorted_records[:n]
except (FileNotFoundError, json.JSONDecodeError, IOError) as e:
raise DatabaseError(f"Error getting top records: {str(e)}")
@cl.on_chat_start
async def setup_app():
app = App.from_config(config_path='data/config.yaml')
app.collect_metrics = False
cl.user_session.set('app', app)
db = JSONDB('data/index.json')
cl.user_session.set('db', db)
def update_db(data):
db = cl.user_session.get('db')
record = {
'url': data, # Store the URL as a JSON field
'added': datetime.now().strftime('%d/%m/%Y %H:%M:%S')
}
db.add_record(record)
@cl.on_message
async def main(message: cl.Message):
task_list = cl.TaskList()
task_list.status = 'Running...'
app = cl.user_session.get('app')
msg = cl.Message(content='')
user_message = message.content
if user_message.startswith('/help'):
markdown_content = "| Command | Description |\n| --- | --- |\n"
markdown_content += "| /add | Add a document to the knowledge base |\n"
markdown_content += "| /kb | Display the knowledge base |\n"
markdown_content += "| /help | Display the available commands |\n"
markdown_content += "| * | Chat with the AI |\n"
await cl.Message(
content=markdown_content
).send()
elif user_message.startswith('/add'):
data = user_message.replace('/add', '').strip()
db = cl.user_session.get('db')
records = db.get_all_records()
if data in [record['url'] for record in records]:
await cl.Message(
content='This document already exists in the knowledge base!'
).send()
else:
add_task = cl.Task(title='Adding to knowledge base', status=cl.TaskStatus.RUNNING)
await task_list.add_task(add_task)
await task_list.send()
app.add(data)
update_db(data)
add_task.status = cl.TaskStatus.DONE
await task_list.send()
await cl.Message(
content='Added data to knowledge base!'
).send()
elif user_message.startswith('/kb'):
kb_task = cl.Task(title='Getting records', status=cl.TaskStatus.RUNNING)
await task_list.add_task(kb_task)
await task_list.send()
data = cl.user_session.get('db').get_top_records(25)
kb_task.status = cl.TaskStatus.DONE
await task_list.send()
if len(data) == 0:
await cl.Message(
content='No documents in json index!'
).send()
else:
markdown_content = "| URL | Added |\n| --- | --- |\n"
for record in data:
url = record['url']
added = record['added']
markdown_content += f"| {url} | {added} |\n"
await cl.Message(
content=markdown_content
).send()
else:
chat_task = cl.Task(title='Querying LLM', status=cl.TaskStatus.RUNNING)
await task_list.add_task(chat_task)
await task_list.send()
for chunk in await cl.make_async(app.chat)(message.content):
await msg.stream_token(chunk)
chat_task.status = cl.TaskStatus.DONE
await task_list.send()
await msg.send()