|
import streamlit as st |
|
from ai_config_faiss import get_ai_assistant |
|
from ttv_web_scraper import db_load_metadata_sets |
|
import json |
|
from datetime import datetime |
|
import os |
|
import base64 |
|
|
|
|
|
if 'results' not in st.session_state: |
|
st.session_state.results = None |
|
if 'where' not in st.session_state: |
|
st.session_state.where = {} |
|
if 'num_results' not in st.session_state: |
|
st.session_state.num_results = 3 |
|
if 'favorites' not in st.session_state: |
|
st.session_state.favorites = {} |
|
if 'show_filters' not in st.session_state: |
|
st.session_state.show_filters = True |
|
|
|
for filter_type in ['company', 'speaker', 'subjects']: |
|
if f'selected_{filter_type}' not in st.session_state: |
|
st.session_state[f'selected_{filter_type}'] = [] |
|
|
|
|
|
@st.cache_resource |
|
def get_assistant(): |
|
return get_ai_assistant() |
|
|
|
|
|
def format_timestamp(timestamp): |
|
try: |
|
time = datetime.strptime(timestamp, "%H:%M:%S") |
|
return time.strftime("%M:%S") |
|
except ValueError: |
|
return timestamp |
|
|
|
|
|
def get_file_content(file_path): |
|
if os.path.exists(file_path): |
|
with open(file_path, "rb") as file: |
|
return file.read() |
|
return None |
|
|
|
|
|
def create_markdown_download_link(markdown_content): |
|
b64 = base64.b64encode(markdown_content.encode()).decode() |
|
return f'<a href="data:text/markdown;base64,{b64}" download="favorites.md">Download Favorites</a>' |
|
|
|
|
|
def update_filter(filter_type, item): |
|
if item in st.session_state[f'selected_{filter_type}']: |
|
st.session_state[f'selected_{filter_type}'].remove(item) |
|
else: |
|
st.session_state[f'selected_{filter_type}'].append(item) |
|
update_where() |
|
|
|
|
|
def update_where(): |
|
st.session_state.where = {} |
|
for filter_type in ['company', 'speaker', 'subjects']: |
|
if st.session_state[f'selected_{filter_type}']: |
|
st.session_state.where[filter_type] = st.session_state[f'selected_{filter_type}'] |
|
|
|
|
|
def toggle_show_filters(): |
|
st.session_state.show_filters = not st.session_state.show_filters |
|
|
|
|
|
def update_num_results(): |
|
st.session_state.num_results = st.session_state.num_results_slider |
|
|
|
|
|
def submit_query(): |
|
if not st.session_state.where: |
|
st.warning("Please select at least one filter before submitting.") |
|
return |
|
|
|
assistant = get_assistant() |
|
with st.spinner("Thinking..."): |
|
response = assistant.query("", num_results=st.session_state.num_results, filters=st.session_state.where) |
|
|
|
try: |
|
st.session_state.results = json.loads(response) |
|
except json.JSONDecodeError: |
|
st.error("Failed to parse the response. Please try again.") |
|
|
|
|
|
def update_favorite(result_id): |
|
result = next((r for r in st.session_state.results if r['id'] == result_id), None) |
|
if result: |
|
result['favorite'] = not result['favorite'] |
|
if result['favorite']: |
|
st.session_state.favorites[result_id] = result |
|
else: |
|
st.session_state.favorites.pop(result_id, None) |
|
|
|
|
|
def clear_favorites(): |
|
st.session_state.favorites.clear() |
|
st.success("All favorites have been cleared.") |
|
|
|
|
|
def save_favorites(): |
|
if st.session_state.favorites: |
|
markdown_content = "# Favorites\n\n" |
|
for fav in st.session_state.favorites.values(): |
|
markdown_content += f"## {fav['metadata']['title']}\n\n" |
|
markdown_content += f"**Speaker:** {fav['metadata']['speaker']} ({fav['metadata']['company']})\n\n" |
|
markdown_content += f"**Date:** {fav['metadata']['date']}\n\n" |
|
markdown_content += f"**Time:** {format_timestamp(fav['metadata']['start_timestamp'])} - {format_timestamp(fav['metadata']['end_timestamp'])}\n\n" |
|
markdown_content += f"**Transcript:** {fav['content']}\n\n" |
|
play_link = fav['metadata']['play'] |
|
modified_play_link = f"{play_link}&controls=1&showinfo=0&modestbranding=1" |
|
markdown_content += f"**Video Link:** [{play_link}]({modified_play_link})\n\n" |
|
if fav['metadata']['subjects']: |
|
markdown_content += f"**Subjects:** {', '.join(fav['metadata']['subjects'])}\n\n" |
|
markdown_content += "---\n\n" |
|
st.markdown(create_markdown_download_link(markdown_content), unsafe_allow_html=True) |
|
else: |
|
st.warning("No favorites selected.") |
|
|
|
|
|
def display_result(result, favorite_tab=False): |
|
st.markdown(f"### {result['metadata']['title']}") |
|
col1, col2 = st.columns([3, 2]) |
|
with col1: |
|
st.markdown(f"**Speaker:** {result['metadata']['speaker']} ({result['metadata']['company']})") |
|
st.markdown(f"**Date:** {result['metadata']['date']}") |
|
st.markdown("**Transcript:**") |
|
st.markdown(result['content']) |
|
with col2: |
|
start_time = format_timestamp(result['metadata']['start_timestamp']) |
|
end_time = format_timestamp(result['metadata']['end_timestamp']) |
|
st.markdown(f"**Time:** {start_time} - {end_time}") |
|
play_url = result['metadata']['play'] |
|
if play_url: |
|
st.components.v1.iframe(src=play_url, width=300, height=169, scrolling=True) |
|
else: |
|
st.warning("No video found") |
|
if 'download' in result['metadata']: |
|
download_path = result['metadata']['download'] |
|
file_name = os.path.basename(download_path) |
|
file_content = get_file_content(download_path) |
|
if file_content: |
|
prefix = "fav_dl_" if favorite_tab else "dl_" |
|
st.download_button(label="Download Clip", data=file_content, file_name=file_name, mime="video/mp4", key=f"{prefix}{result['id']}") |
|
else: |
|
st.warning(f"Clip file not found: {file_name}") |
|
if result['metadata']['subjects']: |
|
st.markdown("**Subjects:**") |
|
subject_tags = ' '.join([f"<span style='background-color: #f0f0f0; color:black; padding: 2px 6px; margin: 2px; border-radius: 10px;'>{subject}</span>" for subject in result['metadata']['subjects']]) |
|
st.markdown(subject_tags, unsafe_allow_html=True) |
|
favorite_key = f"fav_{favorite_tab}_{result['id']}" |
|
st.checkbox("Favorite", value=result['favorite'], key=favorite_key, on_change=update_favorite, args=(result['id'],)) |
|
st.markdown("---") |
|
|
|
|
|
def main(): |
|
st.title("Telecom TV Video Expert") |
|
st.markdown("Trained on data from [here](https://www.telecomtv.com/content/dsp-leaders-forum-videos/)") |
|
|
|
_, _, companies, sentiments, subjects = db_load_metadata_sets() |
|
|
|
tab1, tab2 = st.tabs(["Search", "Favorites"]) |
|
|
|
with tab1: |
|
st.header("Filter Options") |
|
st.checkbox("Show Filters", value=st.session_state.show_filters, on_change=toggle_show_filters) |
|
|
|
if st.session_state.show_filters: |
|
col1, col2, col3 = st.columns(3) |
|
for filter_type, items in [('company', companies.keys()), ('speaker', set().union(*companies.values())), ('subjects', subjects)]: |
|
with locals()[f'col{["company", "speaker", "subjects"].index(filter_type) + 1}']: |
|
st.subheader(filter_type.capitalize()) |
|
for item in sorted(items): |
|
st.checkbox(item, key=f'{filter_type}_{item}', |
|
value=item in st.session_state[f'selected_{filter_type}'], |
|
on_change=update_filter, |
|
args=(filter_type, item)) |
|
|
|
st.slider("Number of relevant transcript excerpts to show:", min_value=1, max_value=500, value=st.session_state.num_results, step=1, key='num_results_slider', on_change=update_num_results) |
|
st.button("Submit", on_click=submit_query) |
|
|
|
if st.session_state.results: |
|
for result in st.session_state.results: |
|
result['favorite'] = result['id'] in st.session_state.favorites |
|
display_result(result) |
|
|
|
with tab2: |
|
st.header("Favorites") |
|
col1, col2 = st.columns(2) |
|
with col1: |
|
st.button("Save Favorites", on_click=save_favorites) |
|
with col2: |
|
st.button("Clear Favorites", on_click=clear_favorites) |
|
|
|
for fav in st.session_state.favorites.values(): |
|
display_result(fav, favorite_tab=True) |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |
|
|