import json
import os
import random
import uuid
import datetime
import re
from typing import List, Tuple, Dict, Optional, Generator, Any

from agent import (
    PREFIX,
    COMPRESS_DATA_PROMPT_SMALL,
    COMPRESS_DATA_PROMPT,
    LOG_PROMPT,
    LOG_RESPONSE
)

import gradio as gr
import requests
from bs4 import BeautifulSoup
from pypdf import PdfReader
from openai import OpenAI  # openai>=1.0 client; ChatCompletion.create was removed
from huggingface_hub import HfApi
# Configuration
OPENAI_API_BASE = "https://openrouter.ai/api/v1"
OPENAI_API_KEY = os.environ.get("OR_KEY", "")
REPO_NAME = "LPX55/ArxivPapers"
SAVE_DATA_URL = f"https://huggingface.co/datasets/{REPO_NAME}/raw/main/"
HF_TOKEN = os.environ.get("HF_TOKEN", "")

api = HfApi(token=HF_TOKEN)

# Initialize the OpenAI client, pointed at the OpenRouter endpoint
client = OpenAI(base_url=OPENAI_API_BASE, api_key=OPENAI_API_KEY)

VERBOSE = True  # Set to False to disable debug logging
# Indexing Constants
INDEX_PROMPT = """Compile this data into a structured JSON format with these keys:
- "keywords": List of important keywords
- "title": Descriptive title
- "description": Brief summary
- "content": Main content
- "url": Source URL if available
"""
def extract_paper_metadata(content: str) -> Dict:
    """Extract structured metadata from a paper's content."""
    metadata = {
        "keywords": [],
        "title": "Untitled",
        "description": "No description",
        "content": content[:1000],
        "url": ""
    }
    # Extract URL (first http(s) link found)
    url_match = re.search(r'https?://[^\s]+', content)
    if url_match:
        metadata['url'] = url_match.group(0)
    # Extract title (first long line starting with a capital and ending in punctuation)
    for line in content.split('\n'):
        line = line.strip()
        if len(line) > 20 and line[0].isupper() and line[-1] in ('.', '?', '!'):
            metadata['title'] = line
            break
    # Extract description (first reasonably long paragraph)
    paragraphs = [p for p in content.split('\n\n') if len(p) > 50]
    if paragraphs:
        metadata['description'] = paragraphs[0]
    # Extract keywords from the title and description
    text_for_keywords = f"{metadata['title']} {metadata['description']}"
    words = [w.lower() for w in re.findall(r'\w+', text_for_keywords) if len(w) > 3]
    metadata['keywords'] = sorted(set(words))[:10]  # First 10 unique keywords, alphabetically
    return metadata
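
# Usage sketch (assumes free-text input; the heuristics above are
# formatting-sensitive, so treat the result as best-effort):
#   meta = extract_paper_metadata(raw_text)
#   print(meta["title"], meta["keywords"])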
def save_paper_to_memory(content: str) -> Dict:
    """Save a paper to memory with proper metadata extraction."""
    metadata = extract_paper_metadata(content)
    # Additional processing for academic papers
    if 'arxiv' in metadata['url'].lower():
        metadata['keywords'].extend(['arxiv', 'paper', 'research'])
        metadata['description'] = f"Academic paper: {metadata['description']}"
    return metadata
def create_index() -> None:
    """Create or update the search index from memory files."""
    uid = uuid.uuid4()
    # Load the existing index (falls back to a single empty mapping)
    index_url = f"{SAVE_DATA_URL}mem-test2/index.json"
    r = requests.get(index_url, timeout=30)
    index_data = json.loads(r.text) if r.status_code == 200 else [{}]
    # Load the main memory data
    main_url = f"{SAVE_DATA_URL}mem-test2/main.json"
    m = requests.get(main_url, timeout=30)
    main_data = json.loads(m.text) if m.status_code == 200 else []
    # Update the keyword -> file-name index
    for entry in main_data:
        try:
            for keyword in entry.get('keywords', []):
                if keyword in index_data[0]:
                    if entry['file_name'] not in index_data[0][keyword]:
                        index_data[0][keyword].append(entry['file_name'])
                else:
                    index_data[0][keyword] = [entry['file_name']]
        except Exception as e:
            print(f"Indexing error: {e}")
    # Save the updated index and push it to the dataset repo
    index_path = f"tmp-index-{uid}.json"
    with open(index_path, "w") as f:
        json.dump(index_data, f)
    api.upload_file(
        path_or_fileobj=index_path,
        path_in_repo="mem-test2/index.json",  # repo paths take no leading slash
        repo_id=REPO_NAME,
        repo_type="dataset",
    )
    os.remove(index_path)  # Clean up the temporary file
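
# The index is a one-element list holding a keyword -> [file_name, ...] map,
# e.g. (illustrative shape only):
#   [{"transformers": ["paper-1.json", "paper-7.json"]}]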
def fetch_url_content(url: str) -> Tuple[bool, str]:
    """Fetch content from a URL and return (success, content-or-error)."""
    try:
        if not url:
            return False, "Enter valid URL"
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, "lxml")
            return True, str(soup)
        return False, f"Status: {response.status_code}"
    except Exception as e:
        return False, f"Error: {e}"
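
# Note: the "lxml" parser requires the third-party lxml package; if it is not
# installed, BeautifulSoup(response.content, "html.parser") is a stdlib fallback.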
def read_file_content(file_path: str) -> str:
    """Read content from a file (txt or pdf)."""
    if file_path.endswith(".pdf"):
        reader = PdfReader(file_path)
        # Guard against extract_text() returning None on image-only pages
        return "\n".join(page.extract_text() or "" for page in reader.pages)
    elif file_path.endswith(".txt"):
        with open(file_path, "r") as f:
            return f.read()
    return ""
def generate_response(prompt: str, model: str = "meta-llama/llama-4-maverick:free") -> str:
    """Generate a response via the OpenRouter API."""
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Error: {str(e)}"
def process_pdf_url(pdf_url: str) -> str:
    """Download a PDF from a URL and extract its text."""
    try:
        response = requests.get(pdf_url, timeout=60)
        if response.status_code == 200:
            temp_path = f"temp_{uuid.uuid4()}.pdf"
            with open(temp_path, "wb") as f:
                f.write(response.content)
            try:
                return read_file_content(temp_path)
            finally:
                os.remove(temp_path)  # Clean up the temporary file
        return f"Error: Status {response.status_code}"
    except Exception as e:
        return f"Error: {e}"
def save_memory(purpose: str, content: str) -> List[Dict]:
    """Save processed content to memory with proper metadata extraction."""
    metadata = extract_paper_metadata(content)
    return [metadata]
def summarize(
    inp: str,
    history: List[Tuple[str, str]],
    report_check: bool,
    sum_check: bool,
    mem_check: bool,
    data: str = "",
    file: Optional[str] = None,
    url: str = "",
    pdf_url: str = "",
    model: str = "meta-llama/llama-4-maverick:free"
) -> Generator[Tuple[str, List[Tuple[str, str]], str, Dict], None, None]:
    """Main summarization function with memory support."""
    # Append to the existing chat history rather than replacing it
    history = (history or []) + [(inp, "Processing...")]
    yield "", history, "", {}
    processed_data = ""
    if pdf_url.startswith("http"):
        processed_data += f"PDF URL: {pdf_url}\n"
    if url.startswith("http"):
        processed_data += f"URL: {url}\n"
    if file:
        processed_data += f"File: {file}\n"
    if data:
        processed_data += f"Data: {data[:1000]}\n"
    summary = f"Summary for: {inp[:100]}\n{processed_data[:500]}"
    memory_entries = []
    if mem_check:
        memory_entries = save_memory(inp, processed_data)
        if memory_entries:
            summary += "\n\nSaved to memory"
        else:
            summary += "\n\nMemory save failed"
    yield summary, history, "", memory_entries[0] if memory_entries else {}
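
# Each yielded 4-tuple maps positionally onto the click() outputs wired up
# below: (prompt, chatbot, error_box, json_output).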
def create_app():
    with gr.Blocks() as app:
        gr.Markdown("## Llama 4 Maverick Summarizer")
        with gr.Row():
            with gr.Column(scale=3):
                prompt = gr.Textbox(label="Instruction")
            with gr.Column(scale=1):
                report_check = gr.Checkbox(label="Return report", value=True)
                sum_check = gr.Checkbox(label="Summarize", value=True)
                mem_check = gr.Checkbox(label="Memory", value=True)
                submit_btn = gr.Button("Submit")
        with gr.Row():
            with gr.Tab("Text"):
                data = gr.Textbox(label="Input text")
            with gr.Tab("File"):
                file = gr.File(label="Upload file")
            with gr.Tab("URL"):
                url = gr.Textbox(label="Website URL")
            with gr.Tab("PDF"):
                pdf_url = gr.Textbox(label="PDF URL")
        chatbot = gr.Chatbot()
        error_box = gr.Textbox(label="Errors")
        json_output = gr.JSON()
        submit_btn.click(
            summarize,
            [prompt, chatbot, report_check, sum_check, mem_check, data, file, url, pdf_url],
            [prompt, chatbot, error_box, json_output]
        )
    return app
if __name__ == "__main__":
    app = create_app()
    app.launch()