import os, json, re, logging, requests, markdown, time, io
from datetime import datetime
import random
import base64
from io import BytesIO
from PIL import Image

import streamlit as st
from openai import OpenAI

from gradio_client import Client
import pandas as pd
import PyPDF2


OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
BRAVE_KEY = os.getenv("SERPHOUSE_API_KEY", "")
BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search"
BRAVE_IMAGE_ENDPOINT = "https://api.search.brave.com/res/v1/images/search"
BRAVE_VIDEO_ENDPOINT = "https://api.search.brave.com/res/v1/videos/search"
BRAVE_NEWS_ENDPOINT = "https://api.search.brave.com/res/v1/news/search"
IMAGE_API_URL = "http://211.233.58.201:7896"
MAX_TOKENS = 7999

SEARCH_MODES = {
    "comprehensive": "Comprehensive answer with multiple sources",
    "academic": "Academic and research-focused results",
    "news": "Latest news and current events",
    "technical": "Technical and specialized information",
    "educational": "Educational and learning resources"
}

RESPONSE_STYLES = {
    "professional": "Professional and formal tone",
    "casual": "Friendly and conversational tone",
    "simple": "Simple and easy to understand",
    "detailed": "Detailed and thorough explanations"
}

EXAMPLE_QUERIES = {
    "example1": "What are the latest developments in quantum computing?",
    "example2": "How does climate change affect biodiversity in tropical rainforests?",
    "example3": "What are the economic implications of artificial intelligence in the job market?"
}

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)s - %(message)s")

@st.cache_resource
def get_openai_client():
    """Create an OpenAI client with timeout and retry settings."""
    if not OPENAI_API_KEY:
        raise RuntimeError("⚠️ The OPENAI_API_KEY environment variable is not set.")
    return OpenAI(
        api_key=OPENAI_API_KEY,
        timeout=60.0,
        max_retries=3
    )

def get_system_prompt(mode="comprehensive", style="professional", include_search_results=True, include_uploaded_files=False) -> str:
    """
    Generate the system prompt for the 'Perplexity Clone' interface based on:
    - The selected search mode and response style
    - Guidelines for using web search results and uploaded files
    """
    comprehensive_prompt = """
You are an advanced AI assistant that provides comprehensive answers with multiple sources, similar to Perplexity.

Your task is to:
1. Thoroughly analyze the user's query
2. Provide a clear, well-structured answer integrating information from multiple sources
3. Include relevant images, videos, and links in your response
4. Format your answer with proper headings, bullet points, and sections
5. Cite sources inline and provide a references section at the end

Important guidelines:
- Organize information logically with clear section headings
- Use bullet points and numbered lists for clarity
- Include specific, factual information whenever possible
- Provide balanced perspectives on controversial topics
- Display relevant statistics, data, or quotes when appropriate
- Format your response using markdown for readability
"""

    mode_prompts = {
        "academic": """
Your focus is on providing academic and research-focused responses:
- Prioritize peer-reviewed research and academic sources
- Include citations in a formal academic format
- Discuss methodologies and research limitations where relevant
- Present different scholarly perspectives on the topic
- Use precise, technical language appropriate for an academic audience
""",
        "news": """
Your focus is on providing the latest news and current events:
- Prioritize recent news articles and current information
- Include publication dates for all news sources
- Present multiple perspectives from different news outlets
- Distinguish between facts and opinions/editorial content
- Update information with the most recent developments
""",
        "technical": """
Your focus is on providing technical and specialized information:
- Use precise technical terminology appropriate to the field
- Include code snippets, formulas, or technical diagrams where relevant
- Break down complex concepts into step-by-step explanations
- Reference technical documentation, standards, and best practices
- Consider different technical approaches or methodologies
""",
        "educational": """
Your focus is on providing educational and learning resources:
- Structure information in a learning-friendly progression
- Include examples, analogies, and visual explanations
- Highlight key concepts and definitions
- Suggest further learning resources at different difficulty levels
- Present information that's accessible to learners at various levels
"""
    }

    style_guides = {
        "professional": "Use a professional, authoritative voice. Clearly explain technical terms and present data systematically.",
        "casual": "Use a relaxed, conversational style with a friendly tone. Include relatable examples and occasionally use informal expressions.",
        "simple": "Use straightforward language and avoid jargon. Keep sentences and paragraphs short. Explain concepts as if to someone with no background in the subject.",
        "detailed": "Provide thorough explanations with comprehensive background information. Explore nuances and edge cases. Present multiple perspectives and detailed analysis."
    }

    search_guide = """
Guidelines for Using Search Results:
- Include source links directly in your response using markdown: [Source Name](URL)
- For each major claim or piece of information, indicate its source
- If sources conflict, explain the different perspectives and their reliability
- Include relevant images by writing: ![Image description](image_url)
- Include relevant video links when appropriate by writing: [Video: Title](video_url)
- Format search information into a cohesive, well-structured response
- Include a "References" section at the end listing all major sources with links
"""

    upload_guide = """
Guidelines for Using Uploaded Files:
- Treat the uploaded files as primary sources for your response
- Extract and highlight key information from files that directly addresses the query
- Quote relevant passages and cite the specific file
- For numerical data in CSV files, consider creating summary statements
- For PDF content, reference specific sections or pages
- Integrate file information seamlessly with web search results
- When information conflicts, prioritize file content over general web results
"""

    if mode == "comprehensive":
        final_prompt = comprehensive_prompt
    else:
        final_prompt = comprehensive_prompt + "\n" + mode_prompts.get(mode, "")

    if style in style_guides:
        final_prompt += f"\n\nTone and Style: {style_guides[style]}"

    if include_search_results:
        final_prompt += f"\n\n{search_guide}"

    if include_uploaded_files:
        final_prompt += f"\n\n{upload_guide}"

    final_prompt += """
\n\nAdditional Formatting Requirements:
- Use markdown headings (## and ###) to organize your response
- Use bold text (**text**) for emphasis on important points
- Include a "Related Questions" section at the end with 3-5 follow-up questions
- Format your response with proper spacing and paragraph breaks
- Make all links clickable by using proper markdown format: [text](url)
"""
    return final_prompt

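# Illustrative composition (not called anywhere at import time): an
# academic-mode, detailed-style prompt that also allows uploaded files.
#
#     prompt = get_system_prompt(
#         mode="academic",
#         style="detailed",
#         include_search_results=True,
#         include_uploaded_files=True,
#     )
#     # The result is the comprehensive base prompt followed by the academic
#     # mode section, the style note, and both guideline blocks, in that order.
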
@st.cache_data(ttl=3600)
def brave_search(query: str, count: int = 20):
    """Query the Brave web search API and normalize results; retries up to 3 times."""
    if not BRAVE_KEY:
        raise RuntimeError("⚠️ SERPHOUSE_API_KEY (Brave API Key) environment variable is empty.")

    headers = {"Accept": "application/json", "Accept-Encoding": "gzip", "X-Subscription-Token": BRAVE_KEY}
    params = {"q": query, "count": str(count)}

    for attempt in range(3):
        try:
            r = requests.get(BRAVE_ENDPOINT, headers=headers, params=params, timeout=15)
            r.raise_for_status()
            data = r.json()

            logging.info(f"Brave search result data structure: {list(data.keys())}")

            raw = data.get("web", {}).get("results") or data.get("results", [])
            if not raw:
                logging.warning(f"No Brave search results found. Response: {data}")
                raise ValueError("No search results found.")

            arts = []
            for i, res in enumerate(raw[:count], 1):
                url = res.get("url", res.get("link", ""))
                host = re.sub(r"https?://(www\.)?", "", url).split("/")[0]
                arts.append({
                    "index": i,
                    "title": res.get("title", "No title"),
                    "link": url,
                    "snippet": res.get("description", res.get("text", "No snippet")),
                    "displayed_link": host
                })

            logging.info(f"Brave search success: {len(arts)} results")
            return arts

        except Exception as e:
            logging.error(f"Brave search failure (attempt {attempt + 1}/3): {e}")
            if attempt < 2:
                time.sleep(5)

    return []

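# Each entry returned by brave_search() is normalized to this shape
# (keys taken from the loop above; values are illustrative):
#
#     {
#         "index": 1,
#         "title": "Page title",
#         "link": "https://example.com/page",
#         "snippet": "Short description of the page ...",
#         "displayed_link": "example.com",
#     }
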
@st.cache_data(ttl=3600)
def brave_image_search(query: str, count: int = 10):
    if not BRAVE_KEY:
        raise RuntimeError("⚠️ SERPHOUSE_API_KEY (Brave API Key) environment variable is empty.")

    headers = {"Accept": "application/json", "Accept-Encoding": "gzip", "X-Subscription-Token": BRAVE_KEY}
    params = {"q": query, "count": str(count), "search_lang": "en", "country": "us", "spellcheck": "1"}

    for attempt in range(3):
        try:
            r = requests.get(BRAVE_IMAGE_ENDPOINT, headers=headers, params=params, timeout=15)
            r.raise_for_status()
            data = r.json()

            results = []
            for i, img in enumerate(data.get("results", [])[:count], 1):
                results.append({
                    "index": i,
                    "title": img.get("title", "Image"),
                    "image_url": img.get("image", {}).get("url", ""),
                    "source_url": img.get("source", ""),
                    "width": img.get("image", {}).get("width", 0),
                    "height": img.get("image", {}).get("height", 0)
                })

            logging.info(f"Brave image search success: {len(results)} results")
            return results

        except Exception as e:
            logging.error(f"Brave image search failure (attempt {attempt + 1}/3): {e}")
            if attempt < 2:
                time.sleep(5)

    return []

@st.cache_data(ttl=3600)
def brave_video_search(query: str, count: int = 5):
    if not BRAVE_KEY:
        raise RuntimeError("⚠️ SERPHOUSE_API_KEY (Brave API Key) environment variable is empty.")

    headers = {"Accept": "application/json", "Accept-Encoding": "gzip", "X-Subscription-Token": BRAVE_KEY}
    params = {"q": query, "count": str(count)}

    for attempt in range(3):
        try:
            r = requests.get(BRAVE_VIDEO_ENDPOINT, headers=headers, params=params, timeout=15)
            r.raise_for_status()
            data = r.json()

            results = []
            for i, vid in enumerate(data.get("results", [])[:count], 1):
                results.append({
                    "index": i,
                    "title": vid.get("title", "Video"),
                    "video_url": vid.get("url", ""),
                    "thumbnail_url": vid.get("thumbnail", {}).get("src", ""),
                    "source": vid.get("provider", {}).get("name", "Unknown source")
                })

            logging.info(f"Brave video search success: {len(results)} results")
            return results

        except Exception as e:
            logging.error(f"Brave video search failure (attempt {attempt + 1}/3): {e}")
            if attempt < 2:
                time.sleep(5)

    return []

@st.cache_data(ttl=3600)
def brave_news_search(query: str, count: int = 5):
    if not BRAVE_KEY:
        raise RuntimeError("⚠️ SERPHOUSE_API_KEY (Brave API Key) environment variable is empty.")

    headers = {"Accept": "application/json", "Accept-Encoding": "gzip", "X-Subscription-Token": BRAVE_KEY}
    params = {"q": query, "count": str(count)}

    for attempt in range(3):
        try:
            r = requests.get(BRAVE_NEWS_ENDPOINT, headers=headers, params=params, timeout=15)
            r.raise_for_status()
            data = r.json()

            results = []
            for i, news in enumerate(data.get("results", [])[:count], 1):
                results.append({
                    "index": i,
                    "title": news.get("title", "News article"),
                    "url": news.get("url", ""),
                    "description": news.get("description", ""),
                    "source": news.get("source", "Unknown source"),
                    "date": news.get("age", "Unknown date")
                })

            logging.info(f"Brave news search success: {len(results)} results")
            return results

        except Exception as e:
            logging.error(f"Brave news search failure (attempt {attempt + 1}/3): {e}")
            if attempt < 2:
                time.sleep(5)

    return []

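# The four brave_*_search helpers above share the same scaffolding: a guard
# on BRAVE_KEY, up to 3 attempts, a 15-second request timeout, and a
# 5-second back-off. A hypothetical refactor (a sketch only, not wired in)
# could factor that out:
#
#     def _brave_get(endpoint: str, params: dict, attempts: int = 3):
#         headers = {"Accept": "application/json",
#                    "Accept-Encoding": "gzip",
#                    "X-Subscription-Token": BRAVE_KEY}
#         for attempt in range(attempts):
#             try:
#                 r = requests.get(endpoint, headers=headers,
#                                  params=params, timeout=15)
#                 r.raise_for_status()
#                 return r.json()
#             except Exception as e:
#                 logging.error(f"Brave request failed "
#                               f"(attempt {attempt + 1}/{attempts}): {e}")
#                 if attempt < attempts - 1:
#                     time.sleep(5)
#         return None
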
def mock_results(query: str) -> str:
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return (f"# Fallback Search Content (Generated: {ts})\n\n"
            f"The search API request failed or returned no results for '{query}'. "
            f"Please generate a response based on any pre-existing knowledge.\n\n"
            f"Consider these points:\n\n"
            f"- Basic concepts and importance of {query}\n"
            f"- Commonly known related statistics or trends\n"
            f"- Typical expert opinions on this subject\n"
            f"- Questions that readers might have\n\n"
            f"Note: This is fallback guidance, not real-time data.\n\n")

def do_web_search(query: str) -> str:
    """Run web, image, video, and news searches and merge them into one markdown block."""
    try:
        arts = brave_search(query, 20)
        if not arts:
            logging.warning("No search results, using fallback content")
            return mock_results(query)

        images = brave_image_search(query, 5)
        videos = brave_video_search(query, 2)
        news = brave_news_search(query, 3)

        result = "# Web Search Results\nUse these results to provide a comprehensive answer with multiple sources.\n\n"

        result += "## Web Results\n\n"
        for a in arts[:10]:
            result += f"### Result {a['index']}: {a['title']}\n\n{a['snippet']}\n\n"
            result += f"**Source**: [{a['displayed_link']}]({a['link']})\n\n---\n"

        if images:
            result += "## Image Results\n\n"
            for img in images:
                if img.get('image_url'):
                    result += f"![{img['title']}]({img['image_url']})\n\n"
                    result += f"**Source**: [{img.get('source_url', 'Image source')}]({img.get('source_url', '#')})\n\n"

        if videos:
            result += "## Video Results\n\n"
            for vid in videos:
                result += f"### {vid['title']}\n\n"
                if vid.get('thumbnail_url'):
                    # Embed the thumbnail as a markdown image alongside the watch link.
                    result += f"![{vid['title']}]({vid['thumbnail_url']})\n\n"
                result += f"**Watch**: [{vid['source']}]({vid['video_url']})\n\n"

        if news:
            result += "## News Results\n\n"
            for n in news:
                result += f"### {n['title']}\n\n{n['description']}\n\n"
                result += f"**Source**: [{n['source']}]({n['url']}) - {n['date']}\n\n---\n"

        return result

    except Exception as e:
        logging.error(f"Web search process failed: {str(e)}")
        return mock_results(query)

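# For reference, do_web_search() returns markdown shaped roughly like:
#
#     # Web Search Results
#     ## Web Results
#     ### Result 1: <title>
#     <snippet>
#     **Source**: [<host>](<url>)
#     ---
#     ## Image Results / ## Video Results / ## News Results  (when present)
#
# On any failure it degrades to mock_results(), so callers always get a string.
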
def process_text_file(file):
    try:
        content = file.read()
        file.seek(0)

        text = content.decode('utf-8', errors='ignore')
        if len(text) > 10000:
            text = text[:9700] + "...(truncated)..."

        result = f"## Text File: {file.name}\n\n" + text
        return result
    except Exception as e:
        logging.error(f"Error processing text file: {str(e)}")
        return f"Error processing text file: {str(e)}"

def process_csv_file(file):
    try:
        content = file.read()
        file.seek(0)

        df = pd.read_csv(io.BytesIO(content))
        result = f"## CSV File: {file.name}\n\n"
        result += f"- Rows: {len(df)}\n"
        result += f"- Columns: {len(df.columns)}\n"
        result += f"- Column Names: {', '.join(df.columns.tolist())}\n\n"

        result += "### Data Preview\n\n"
        preview_df = df.head(10)
        try:
            markdown_table = preview_df.to_markdown(index=False)
            if markdown_table:
                result += markdown_table + "\n\n"
            else:
                result += "Unable to display CSV data.\n\n"
        except Exception as e:
            logging.error(f"Markdown table conversion error: {e}")
            result += "Displaying data as text:\n\n" + str(preview_df) + "\n\n"

        num_cols = df.select_dtypes(include=['number']).columns
        if len(num_cols) > 0:
            result += "### Basic Statistical Information\n\n"
            try:
                stats_df = df[num_cols].describe().round(2)
                stats_markdown = stats_df.to_markdown()
                if stats_markdown:
                    result += stats_markdown + "\n\n"
                else:
                    result += "Unable to display statistical information.\n\n"
            except Exception as e:
                logging.error(f"Statistical info conversion error: {e}")
                result += "Unable to generate statistical information.\n\n"

        return result
    except Exception as e:
        logging.error(f"CSV file processing error: {str(e)}")
        return f"Error processing CSV file: {str(e)}"

def process_pdf_file(file):
    try:
        file_bytes = file.read()
        file.seek(0)

        pdf_file = io.BytesIO(file_bytes)
        reader = PyPDF2.PdfReader(pdf_file, strict=False)

        result = f"## PDF File: {file.name}\n\n- Total pages: {len(reader.pages)}\n\n"

        max_pages = min(5, len(reader.pages))
        all_text = ""

        for i in range(max_pages):
            try:
                page = reader.pages[i]
                page_text = page.extract_text()
                current_page_text = f"### Page {i + 1}\n\n"
                if page_text and len(page_text.strip()) > 0:
                    if len(page_text) > 1500:
                        current_page_text += page_text[:1500] + "...(truncated)...\n\n"
                    else:
                        current_page_text += page_text + "\n\n"
                else:
                    current_page_text += "(No text could be extracted)\n\n"

                all_text += current_page_text

                if len(all_text) > 8000:
                    all_text += "...(truncating remaining pages)...\n\n"
                    break

            except Exception as page_err:
                logging.error(f"Error processing PDF page {i + 1}: {str(page_err)}")
                all_text += f"### Page {i + 1}\n\n(Error extracting content: {str(page_err)})\n\n"

        if len(reader.pages) > max_pages:
            all_text += f"\nNote: Only the first {max_pages} pages are shown.\n\n"

        result += "### PDF Content\n\n" + all_text
        return result

    except Exception as e:
        logging.error(f"PDF file processing error: {str(e)}")
        return f"## PDF File: {file.name}\n\nError: {str(e)}\n\nCannot process."

def process_uploaded_files(files):
    if not files:
        return None

    result = "# Uploaded File Contents\n\nBelow is the content from the files provided by the user.\n\n"
    for file in files:
        try:
            ext = file.name.split('.')[-1].lower()
            if ext == 'txt':
                result += process_text_file(file) + "\n\n---\n\n"
            elif ext == 'csv':
                result += process_csv_file(file) + "\n\n---\n\n"
            elif ext == 'pdf':
                result += process_pdf_file(file) + "\n\n---\n\n"
            else:
                result += f"### Unsupported File: {file.name}\n\n---\n\n"
        except Exception as e:
            logging.error(f"File processing error {file.name}: {e}")
            result += f"### File processing error: {file.name}\n\nError: {e}\n\n---\n\n"

    return result

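# Illustrative output skeleton for two uploads, report.pdf and data.csv
# (the file names and figures here are hypothetical):
#
#     # Uploaded File Contents
#     ## PDF File: report.pdf
#     - Total pages: 12
#     ### PDF Content
#     ...
#     ---
#     ## CSV File: data.csv
#     - Rows: 240
#     - Columns: 6
#     ### Data Preview
#     ...
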
def load_and_show_image(img_url: str, caption: str = "Image"):
    """
    1) Send a browser User-Agent header to work around hotlink protection.
    2) Download the image first, then display it.
    """
    headers = {
        "User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
                       " AppleWebKit/537.36 (KHTML, like Gecko)"
                       " Chrome/98.0.4758.102 Safari/537.36")
    }
    try:
        response = requests.get(img_url, headers=headers, timeout=10)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
        st.image(image, caption=caption, use_container_width=True)
    except Exception as e:
        st.warning(f"Image loading failed: {e}")

def generate_image(prompt, w=768, h=768, g=3.5, steps=30, seed=3):
    if not prompt:
        return None, "Insufficient prompt"
    try:
        res = Client(IMAGE_API_URL).predict(
            prompt=prompt, width=w, height=h, guidance=g,
            inference_steps=steps, seed=seed,
            do_img2img=False, init_image=None,
            image2image_strength=0.8, resize_img=True,
            api_name="/generate_image"
        )
        return res[0], f"Seed: {res[1]}"
    except Exception as e:
        logging.error(e)
        return None, str(e)

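# Illustrative call (assumes the Gradio app at IMAGE_API_URL is reachable and
# exposes the /generate_image endpoint with the parameter names used above):
#
#     img, cap = generate_image("a lighthouse at dusk, photorealistic", seed=42)
#     if img:
#         st.image(img, caption=cap, use_container_width=True)
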
def extract_image_prompt(response_text: str, topic: str):
    client = get_openai_client()
    try:
        response = client.chat.completions.create(
            model="gpt-4.1-mini",
            messages=[
                {"role": "system", "content": "Generate a single-line English image prompt from the following text. Return only the prompt text, nothing else."},
                {"role": "user", "content": f"Topic: {topic}\n\n---\n{response_text}\n\n---"}
            ],
            temperature=1,
            max_tokens=80,
            top_p=1
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        logging.error(f"OpenAI image prompt generation error: {e}")
        return f"A professional photo related to {topic}, high quality"

def md_to_html(md: str, title="Perplexity Clone Response"):
    """Wrap rendered markdown in a minimal standalone HTML page."""
    return f"<!DOCTYPE html><html><head><title>{title}</title><meta charset='utf-8'></head><body>{markdown.markdown(md)}</body></html>"


def keywords(text: str, top=5):
    """Keep Korean syllables, alphanumerics, and whitespace, then return the first `top` tokens."""
    cleaned = re.sub(r"[^가-힣a-zA-Z0-9\s]", "", text)
    return " ".join(cleaned.split()[:top])

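# Illustrative behavior of keywords() (plain English input assumed):
#
#     keywords("What are the latest developments in quantum computing?", top=5)
#     # -> "What are the latest developments"
#
# Punctuation is stripped before tokenizing, so it never reaches the output.
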
def perplexity_app():
    st.title("Perplexity Clone AI Assistant")

    # Initialize session state with defaults.
    if "ai_model" not in st.session_state:
        st.session_state.ai_model = "gpt-4.1-mini"
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "auto_save" not in st.session_state:
        st.session_state.auto_save = True
    if "generate_image" not in st.session_state:
        st.session_state.generate_image = False
    if "web_search_enabled" not in st.session_state:
        st.session_state.web_search_enabled = True
    if "search_mode" not in st.session_state:
        st.session_state.search_mode = "comprehensive"
    if "response_style" not in st.session_state:
        st.session_state.response_style = "professional"

    sb = st.sidebar
    sb.title("Search Settings")

    sb.subheader("Response Configuration")
    sb.selectbox(
        "Search Mode",
        options=list(SEARCH_MODES.keys()),
        format_func=lambda x: SEARCH_MODES[x],
        key="search_mode"
    )

    sb.selectbox(
        "Response Style",
        options=list(RESPONSE_STYLES.keys()),
        format_func=lambda x: RESPONSE_STYLES[x],
        key="response_style"
    )

    sb.subheader("Example Queries")
    c1, c2, c3 = sb.columns(3)
    if c1.button("Quantum Computing", key="ex1"):
        process_example(EXAMPLE_QUERIES["example1"])
    if c2.button("Climate Change", key="ex2"):
        process_example(EXAMPLE_QUERIES["example2"])
    if c3.button("AI Economics", key="ex3"):
        process_example(EXAMPLE_QUERIES["example3"])

    sb.subheader("Other Settings")
    sb.toggle("Auto Save", key="auto_save")
    sb.toggle("Auto Image Generation", key="generate_image")

    web_search_enabled = sb.toggle("Use Web Search", value=st.session_state.web_search_enabled)
    st.session_state.web_search_enabled = web_search_enabled

    if web_search_enabled:
        st.sidebar.info("✅ Web search results will be integrated into the response.")

    latest_response = next(
        (m["content"] for m in reversed(st.session_state.messages)
         if m["role"] == "assistant" and m["content"].strip()),
        None
    )
    if latest_response:
        title_match = re.search(r"# (.*?)(\n|$)", latest_response)
        if title_match:
            title = title_match.group(1).strip()
        else:
            first_line = latest_response.split('\n', 1)[0].strip()
            title = first_line[:40] + "..." if len(first_line) > 40 else first_line

        sb.subheader("Download Latest Response")
        d1, d2 = sb.columns(2)
        d1.download_button("Download as Markdown", latest_response,
                           file_name=f"{title}.md", mime="text/markdown")
        d2.download_button("Download as HTML", md_to_html(latest_response, title),
                           file_name=f"{title}.html", mime="text/html")

    up = sb.file_uploader("Load Conversation History (.json)", type=["json"], key="json_uploader")
    if up:
        try:
            st.session_state.messages = json.load(up)
            sb.success("Conversation history loaded successfully")
        except Exception as e:
            sb.error(f"Failed to load: {e}")

    if sb.button("Download Conversation as JSON"):
        sb.download_button(
            "Save",
            data=json.dumps(st.session_state.messages, ensure_ascii=False, indent=2),
            file_name="conversation_history.json",
            mime="application/json"
        )

    st.subheader("Upload Files")
    uploaded_files = st.file_uploader(
        "Upload files to be used as reference (txt, csv, pdf)",
        type=["txt", "csv", "pdf"],
        accept_multiple_files=True,
        key="file_uploader"
    )

    if uploaded_files:
        file_count = len(uploaded_files)
        st.success(f"{file_count} files uploaded. They will be used as sources for your query.")

        with st.expander("Preview Uploaded Files", expanded=False):
            for idx, file in enumerate(uploaded_files):
                st.write(f"**File Name:** {file.name}")
                ext = file.name.split('.')[-1].lower()

                if ext == 'txt':
                    preview = file.read(1000).decode('utf-8', errors='ignore')
                    file.seek(0)
                    st.text_area(
                        f"Preview of {file.name}",
                        preview + ("..." if len(preview) >= 1000 else ""),
                        height=150
                    )
                elif ext == 'csv':
                    try:
                        df = pd.read_csv(file)
                        file.seek(0)
                        st.write("CSV Preview (up to 5 rows)")
                        st.dataframe(df.head(5))
                    except Exception as e:
                        st.error(f"CSV preview failed: {e}")
                elif ext == 'pdf':
                    try:
                        file_bytes = file.read()
                        file.seek(0)

                        pdf_file = io.BytesIO(file_bytes)
                        reader = PyPDF2.PdfReader(pdf_file, strict=False)

                        pc = len(reader.pages)
                        st.write(f"PDF File: {pc} pages")

                        if pc > 0:
                            try:
                                page_text = reader.pages[0].extract_text()
                                preview = page_text[:500] if page_text else "(No text extracted)"
                                st.text_area("Preview of the first page", preview + "...", height=150)
                            except Exception:
                                st.warning("Failed to extract text from the first page")
                    except Exception as e:
                        st.error(f"PDF preview failed: {e}")

                if idx < file_count - 1:
                    st.divider()

    # Render the conversation history, including any attached media.
    for m in st.session_state.messages:
        with st.chat_message(m["role"]):
            st.markdown(m["content"], unsafe_allow_html=True)

            if "images" in m and m["images"]:
                st.subheader("Related Images")
                cols = st.columns(min(3, len(m["images"])))
                for i, img_data in enumerate(m["images"]):
                    col_idx = i % len(cols)
                    with cols[col_idx]:
                        try:
                            img_url = img_data.get('url', '')
                            caption = img_data.get('title', 'Related image')
                            if img_url:
                                load_and_show_image(img_url, caption=caption)
                                if img_data.get('source'):
                                    st.markdown(f"[Source]({img_data['source']})")
                        except Exception as img_err:
                            st.warning(f"Could not display image: {img_err}")

            if "videos" in m and m["videos"]:
                st.subheader("Related Videos")
                for video in m["videos"]:
                    video_title = video.get('title', 'Related video')
                    video_url = video.get('url', '')
                    thumbnail = video.get('thumbnail', '')

                    if thumbnail:
                        col1, col2 = st.columns([1, 3])
                        with col1:
                            try:
                                load_and_show_image(thumbnail, caption="Video Thumbnail")
                            except Exception:
                                st.write("🎬")
                        with col2:
                            st.markdown(f"**[{video_title}]({video_url})**")
                            st.write(f"Source: {video.get('source', 'Unknown')}")
                    else:
                        st.markdown(f"🎬 **[{video_title}]({video_url})**")
                        st.write(f"Source: {video.get('source', 'Unknown')}")

    query = st.chat_input("Enter your query or question here.")
    if query:
        process_input(query, uploaded_files)

    sb.markdown("---")
    sb.markdown("Created by [https://ginigen.com](https://ginigen.com) | [YouTube Channel](https://www.youtube.com/@ginipickaistudio)")

def process_example(topic):
    process_input(topic, [])

def process_input(query: str, uploaded_files):
    if not any(m["role"] == "user" and m["content"] == query for m in st.session_state.messages):
        st.session_state.messages.append({"role": "user", "content": query})

    with st.chat_message("user"):
        st.markdown(query)

    with st.chat_message("assistant"):
        placeholder = st.empty()
        message_placeholder = st.empty()
        full_response = ""

        use_web_search = st.session_state.web_search_enabled
        has_uploaded_files = bool(uploaded_files) and len(uploaded_files) > 0

        try:
            status = st.status("Preparing to answer your query...")
            status.update(label="Initializing client...")

            client = get_openai_client()

            search_content = None
            image_results = []
            video_results = []
            news_results = []

            if use_web_search:
                status.update(label="Performing web search...")
                with st.spinner("Searching the web..."):
                    search_content = do_web_search(keywords(query, top=5))

                try:
                    status.update(label="Finding images and videos...")
                    image_results = brave_image_search(query, 5)
                    video_results = brave_video_search(query, 2)
                    news_results = brave_news_search(query, 3)
                except Exception as search_err:
                    logging.error(f"Media search error: {search_err}")

            file_content = None
            if has_uploaded_files:
                status.update(label="Processing uploaded files...")
                with st.spinner("Analyzing files..."):
                    file_content = process_uploaded_files(uploaded_files)

            # Keep only media entries with valid absolute URLs.
            valid_images = []
            for img in image_results:
                url = img.get('image_url')
                if url and url.startswith('http'):
                    valid_images.append({
                        'url': url,
                        'title': img.get('title', f"Related to: {query}"),
                        'source': img.get('source_url', '')
                    })

            valid_videos = []
            for vid in video_results:
                url = vid.get('video_url')
                if url and url.startswith('http'):
                    valid_videos.append({
                        'url': url,
                        'title': vid.get('title', 'Video'),
                        'thumbnail': vid.get('thumbnail_url', ''),
                        'source': vid.get('source', 'Video source')
                    })

            status.update(label="Preparing comprehensive answer...")
            sys_prompt = get_system_prompt(
                mode=st.session_state.search_mode,
                style=st.session_state.response_style,
                include_search_results=use_web_search,
                include_uploaded_files=has_uploaded_files
            )

            api_messages = [
                {"role": "system", "content": sys_prompt}
            ]

            # Build a single user message: query + search results + file contents + media lists.
            user_content = query
            if search_content:
                user_content += "\n\n" + search_content
            if file_content:
                user_content += "\n\n" + file_content

            if valid_images:
                user_content += "\n\n# Available Images\n"
                for i, img in enumerate(valid_images):
                    user_content += f"\n{i + 1}. ![{img['title']}]({img['url']})\n"
                    if img['source']:
                        user_content += f"   Source: {img['source']}\n"

            if valid_videos:
                user_content += "\n\n# Available Videos\n"
                for i, vid in enumerate(valid_videos):
                    user_content += f"\n{i + 1}. **{vid['title']}** - [{vid['source']}]({vid['url']})\n"

            api_messages.append({"role": "user", "content": user_content})

            try:
                stream = client.chat.completions.create(
                    model="gpt-4.1-mini",
                    messages=api_messages,
                    temperature=1,
                    max_tokens=MAX_TOKENS,
                    top_p=1,
                    stream=True
                )

                # Stream tokens into the placeholder as they arrive.
                for chunk in stream:
                    if chunk.choices and len(chunk.choices) > 0 and chunk.choices[0].delta.content is not None:
                        content_delta = chunk.choices[0].delta.content
                        full_response += content_delta
                        message_placeholder.markdown(full_response + "▌", unsafe_allow_html=True)

                message_placeholder.markdown(full_response, unsafe_allow_html=True)

                if valid_images:
                    st.subheader("Related Images")
                    image_cols = st.columns(min(3, len(valid_images)))

                    for i, img_data in enumerate(valid_images):
                        col_idx = i % len(image_cols)
                        try:
                            with image_cols[col_idx]:
                                img_url = img_data['url']
                                caption = img_data['title']
                                load_and_show_image(img_url, caption=caption)
                                if img_data.get('source'):
                                    st.markdown(f"[Source]({img_data['source']})")
                        except Exception as img_err:
                            logging.warning(f"Error displaying image: {img_err}")

                if valid_videos:
                    st.subheader("Related Videos")
                    for video in valid_videos:
                        video_title = video.get('title', 'Related video')
                        video_url = video.get('url', '')
                        thumbnail = video.get('thumbnail', '')

                        if thumbnail:
                            try:
                                col1, col2 = st.columns([1, 3])
                                with col1:
                                    try:
                                        load_and_show_image(thumbnail, caption="Video Thumbnail")
                                    except Exception:
                                        st.write("🎬")
                                with col2:
                                    st.markdown(f"**[{video_title}]({video_url})**")
                                    st.write(f"Source: {video.get('source', 'Unknown')}")
                            except Exception:
                                st.markdown(f"🎬 **[{video_title}]({video_url})**")
                                st.write(f"Source: {video.get('source', 'Unknown')}")
                        else:
                            st.markdown(f"🎬 **[{video_title}]({video_url})**")
                            st.write(f"Source: {video.get('source', 'Unknown')}")

                status.update(label="Response completed!", state="complete")

                st.session_state.messages.append({
                    "role": "assistant",
                    "content": full_response,
                    "images": valid_images,
                    "videos": valid_videos
                })

            except Exception as api_error:
                error_message = str(api_error)
                logging.error(f"API error: {error_message}")
                status.update(label=f"Error: {error_message}", state="error")
                raise Exception(f"Response generation error: {error_message}")

            # Optionally generate a custom image from the finished response.
            if st.session_state.generate_image and full_response:
                with st.spinner("Generating custom image..."):
                    try:
                        ip = extract_image_prompt(full_response, query)
                        img, cap = generate_image(ip)
                        if img:
                            st.subheader("AI-Generated Image")
                            st.image(img, caption=cap, use_container_width=True)
                    except Exception as img_error:
                        logging.error(f"Image generation error: {str(img_error)}")
                        st.warning("Custom image generation failed.")

            if full_response:
                st.subheader("Download This Response")
                c1, c2 = st.columns(2)
                c1.download_button(
                    "Markdown",
                    data=full_response,
                    file_name=f"{query[:30]}.md",
                    mime="text/markdown"
                )
                c2.download_button(
                    "HTML",
                    data=md_to_html(full_response, query[:30]),
                    file_name=f"{query[:30]}.html",
                    mime="text/html"
                )

            if st.session_state.auto_save and st.session_state.messages:
                try:
                    fn = f"conversation_history_auto_{datetime.now():%Y%m%d_%H%M%S}.json"
                    with open(fn, "w", encoding="utf-8") as fp:
                        json.dump(st.session_state.messages, fp, ensure_ascii=False, indent=2)
                except Exception as e:
                    logging.error(f"Auto-save failed: {e}")

        except Exception as e:
            error_message = str(e)
            placeholder.error(f"An error occurred: {error_message}")
            logging.error(f"Process input error: {error_message}")
            ans = f"An error occurred while processing your request: {error_message}"
            st.session_state.messages.append({"role": "assistant", "content": ans})


def main():
    st.write("==== Application Startup at", datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "====")
    perplexity_app()


if __name__ == "__main__":
    main()