Spaces:
Running
on
Zero
Running
on
Zero
#!/usr/bin/env python3 | |
""" | |
Just search - A Smart Search Agent using Menlo/Lucy-128k | |
Part of the Just, AKA Simple series | |
Built with Gradio, DuckDuckGo Search, and Hugging Face Transformers | |
""" | |
import gradio as gr | |
import torch | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
from duckduckgo_search import DDGS | |
import json | |
import re | |
import time | |
from typing import List, Dict, Tuple | |
import spaces | |
# Module-level model state, kept global so the model is loaded once and reused.
MODEL_NAME = "Menlo/Lucy-128k"  # identifier passed to from_pretrained()
# The three handles below start as None and are assigned by initialize_model().
tokenizer = None
model = None
search_pipeline = None
def initialize_model():
    """Load the tokenizer, model and text-generation pipeline into the module globals.

    Returns:
        True when all three globals were populated, False when any step raised
        (the error is printed, not re-raised).
    """
    global tokenizer, model, search_pipeline
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
        # Reuse the EOS token for padding when the tokenizer defines no pad token.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        # NOTE(review): max_length and rope_scaling are forwarded as config
        # overrides. A linear factor of 1.0 looks like an identity scaling and
        # would replace any rope_scaling shipped with the checkpoint - confirm
        # this is what is wanted for the long-context variant.
        model_options = {
            "torch_dtype": torch.float16,
            "device_map": "auto",
            "trust_remote_code": True,
            "max_length": 131072,  # 128k context length
            "rope_scaling": {"type": "linear", "factor": 1.0},  # Enable extended context
        }
        model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **model_options)

        # Default generation settings; individual calls override some of them.
        generation_defaults = {
            "torch_dtype": torch.float16,
            "device_map": "auto",
            "max_new_tokens": 16384,  # 16k max output
            "temperature": 0.3,
            "do_sample": True,
            "pad_token_id": tokenizer.eos_token_id,
        }
        search_pipeline = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            **generation_defaults,
        )
    except Exception as e:
        print(f"Error initializing model: {e}")
        return False
    else:
        return True
def extract_thinking_and_response(text: str) -> Tuple[str, str]:
    """Split raw model output into its reasoning trace and the user-facing reply.

    Args:
        text: Raw text generated by the model.

    Returns:
        A ``(thinking, response)`` tuple of stripped strings. ``thinking`` is
        empty when no reasoning could be identified.
    """
    flags = re.DOTALL | re.IGNORECASE
    thinking = ""
    response = text

    # Tried in order; the first pattern that matches wins. The tag patterns
    # also accept a missing closing tag (\Z): output cut off at the token limit
    # mid-reasoning would otherwise leak the whole trace into the reply.
    patterns = [
        r'<think>(.*?)(?:</think>|\Z)',
        r'<thinking>(.*?)(?:</thinking>|\Z)',
        r'(Let me think about.*?)(?=\n\n|\n[A-Z]|$)',  # Catch untagged thinking
    ]
    for pattern in patterns:
        match = re.search(pattern, text, flags)
        if match:
            thinking = match.group(1).strip()
            # Every occurrence is removed from the reply, not just the first.
            response = re.sub(pattern, '', text, flags=flags)
            break

    # Line-based fallback for untagged reasoning. It scans `response` (identical
    # to `text` unless an empty tag pair was just stripped) so tags never
    # reappear in the reply.
    lowered = response.lower()
    if not thinking and ('let me think' in lowered or 'i need to consider' in lowered):
        reasoning_cues = ('let me think', 'i need to consider', 'first,', 'the user is asking')
        # A bullet, dash, numbered item or very short line ends the reasoning run.
        answer_markers = ('\u2022', '-', '1.', '2.', '3.')
        thinking_lines = []
        response_lines = []
        in_thinking = False
        for line in response.split('\n'):
            stripped = line.strip()
            if any(cue in stripped.lower() for cue in reasoning_cues):
                in_thinking = True
                thinking_lines.append(line)
            elif in_thinking and (stripped.startswith(answer_markers) or len(stripped) < 5):
                in_thinking = False
                response_lines.append(line)
            elif in_thinking:
                thinking_lines.append(line)
            else:
                response_lines.append(line)
        if thinking_lines:
            thinking = '\n'.join(thinking_lines).strip()
            response = '\n'.join(response_lines).strip()

    # Strip speaker prefixes, instruction blocks and special tokens.
    response = re.sub(r'^(Assistant:|AI:|Response:|Answer:)\s*', '', response.strip())
    response = re.sub(r'\[INST\].*?\[\/INST\]', '', response, flags=re.DOTALL)
    response = re.sub(r'<\|.*?\|>', '', response)
    # Remove any remaining thinking artifacts from the reply.
    response = re.sub(r'Let me think.*?(?=\n\n|\n[A-Z]|$)', '', response, flags=flags)
    response = re.sub(r'I need to consider.*?(?=\n\n|\n[A-Z]|$)', '', response, flags=flags)
    return thinking.strip(), response.strip()
def clean_response(text: str) -> str:
    """Return only the user-facing part of the model output, discarding the reasoning."""
    return extract_thinking_and_response(text)[1]
# Words and phrases that mark a line as commentary rather than a search query.
# Matching is anchored at a word start so that e.g. "research" is not rejected
# merely because it contains "search".
_META_COMMENTARY_RE = re.compile(
    r'\b(?:' + '|'.join(re.escape(word) for word in (
        'user', 'question', 'query', 'search', 'generate', 'here are',
        'these are', 'i will', 'let me', 'first', 'second', 'third', 'fourth',
        'based on', 'the user', 'example',
    )) + r')',
    re.IGNORECASE,
)


def _fallback_queries(user_query: str) -> List[str]:
    """Return simple variations of the raw question for when the model output is unusable."""
    current_year = time.localtime().tm_year
    return [
        user_query,
        f"{user_query} {current_year}",
        f"{user_query} news",
        f"{user_query} latest",
    ]


def _parse_query_lines(text: str) -> List[str]:
    """Extract plausible search queries (one per line) from the cleaned model reply."""
    queries = []
    seen = set()
    for raw_line in text.split('\n'):
        line = raw_line.strip()
        if not line:
            continue
        # Drop list markers and surrounding quotes before judging the line, so
        # "1. some query" is measured as the query it contains.
        candidate = re.sub(r'^\d+[\.\)]\s*', '', line)
        candidate = re.sub(r'^[\-\*\u2022]\s*', '', candidate)
        candidate = candidate.strip('"\'')
        # Skip lines that look like explanations or meta-commentary.
        if _META_COMMENTARY_RE.search(candidate):
            continue
        # Skip lines with too many words (likely explanations).
        words = candidate.split()
        if len(words) > 8:
            continue
        if len(candidate) <= 3 or len(words) < 2:
            continue
        key = candidate.lower()
        if key in seen:  # searching the same query twice adds nothing
            continue
        seen.add(key)
        queries.append(candidate)
    return queries


def generate_search_queries(user_query: str) -> Tuple[List[str], str]:
    """Ask the model for up to four search queries covering the user's question.

    Args:
        user_query: The question as typed by the user.

    Returns:
        A ``(queries, thinking)`` tuple. ``queries`` holds at most four search
        strings; ``thinking`` is the model's reasoning trace, or "" when none
        was found. If generation fails or yields fewer than two usable
        queries, simple variations of ``user_query`` are returned instead.
    """
    # NOTE(review): the prompt is hand-written with Llama-3 style header
    # tokens; confirm these match the chat template of MODEL_NAME, otherwise
    # build the prompt with tokenizer.apply_chat_template().
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are an expert search query strategist. Your task is to generate diverse, effective search queries that will find the most comprehensive information to answer the user's question.
**Your Approach:**
1. Analyze the user's question to identify key concepts, entities, and intent
2. Consider different angles: current news, technical details, background context, expert opinions
3. Use varied terminology: formal terms, common language, industry jargon, synonyms
4. Target different types of sources: news sites, academic papers, official documents, forums
**Query Requirements:**
- Generate exactly 4 distinct search queries
- Each query should be 3-8 words long
- Optimize for search engine effectiveness
- Cover different aspects or perspectives of the topic
- Use specific, relevant keywords
**Examples:**
User: "What is the current status of artificial intelligence regulation?"
Queries:
AI regulation 2024 legislation
artificial intelligence policy updates
government AI rules current
machine learning regulation news
User: "How does climate change affect coral reefs?"
Queries:
climate change coral reef impact
ocean warming coral bleaching
coral reef ecosystem changes
marine biodiversity climate effects
<|eot_id|><|start_header_id|>user<|end_header_id|>
User question: {user_query}
Generate 4 strategic search queries:
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""
    try:
        response = search_pipeline(prompt, max_new_tokens=150, temperature=0.1)
        generated_text = response[0]['generated_text']
        # The pipeline output is split on the assistant header; the last piece
        # is taken as the newly generated reply.
        assistant_response = generated_text.split('<|start_header_id|>assistant<|end_header_id|>')[-1]
        thinking, cleaned_response = extract_thinking_and_response(assistant_response)
        queries = _parse_query_lines(cleaned_response)
        # If we didn't get good queries, fall back to simple variations.
        if len(queries) < 2:
            queries = _fallback_queries(user_query)
        return queries[:4], thinking
    except Exception as e:
        print(f"Error generating queries: {e}")
        return _fallback_queries(user_query), ""
def search_web(queries: List[str]) -> List[Dict]:
    """Run every query against DuckDuckGo and return the de-duplicated hits.

    Args:
        queries: Search strings to execute, in order.

    Returns:
        At most 15 result dicts in discovery order. Each dict gains a
        ``search_query`` key naming the query that produced it. Results
        sharing a URL are returned once; results without a URL are kept as-is
        because they cannot be compared.
    """
    if not queries:
        return []  # nothing to search, so no client is needed

    ddgs = DDGS()
    seen_urls = set()
    unique_results = []
    for position, query in enumerate(queries):
        # Rate limiting: pause between requests (also after a failed one), but
        # not after the final query where it would only add latency.
        if position:
            time.sleep(0.5)
        try:
            # list() forces a lazy result set inside the try block; `or []`
            # tolerates a client that returns None for "no results".
            results = list(ddgs.text(query, max_results=5, region='wt-wt', safesearch='moderate') or [])
        except Exception as e:
            print(f"Error searching for '{query}': {e}")
            continue
        for result in results:
            url = result.get('href')
            if url:
                if url in seen_urls:
                    continue
                seen_urls.add(url)
            result['search_query'] = query
            unique_results.append(result)
    return unique_results[:15]  # Return max 15 results
def filter_relevant_results(user_query: str, generated_queries: List[str], search_results: List[Dict]) -> Tuple[List[Dict], str]:
    """Use the model to pick the most relevant search results.

    Args:
        user_query: The user's original question.
        generated_queries: The search queries that produced the results.
        search_results: Result dicts; the ``title``, ``href``, ``body`` and
            ``search_query`` keys are read when present.

    Returns:
        A ``(selected, thinking)`` tuple with at most five results and the
        model's reasoning trace. When generation fails, or the reply names no
        valid result number, the first five results are returned.
    """
    if not search_results:
        return [], ""

    # Only these results are shown to the model, so only these may be selected.
    candidates = search_results[:15]
    results_text = "".join(
        f"{number}. Title: {result.get('title', 'No title')}\n"
        f" URL: {result.get('href', 'No URL')}\n"
        # `or` also covers a body that is present but None.
        f" Snippet: {(result.get('body') or 'No description')[:300]}...\n"
        f" Search Query: {result.get('search_query', 'Unknown')}\n\n"
        for number, result in enumerate(candidates, start=1)
    )
    queries_text = "\n".join(f"\u2022 {q}" for q in generated_queries)
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are an expert information analyst specializing in search result evaluation. Your mission is to identify the highest-quality, most relevant sources that will enable a comprehensive answer to the user's question.
**Your Analysis Framework:**
**1. Relevance Assessment (40% weight):**
- How directly does the content address the user's specific question?
- Does it contain factual information needed for the answer?
- Is it focused on the core topic or just tangentially related?
**2. Source Quality & Authority (25% weight):**
- Is this from a credible, authoritative source?
- Does the source have expertise in this domain?
- Is it from official organizations, established media, academic institutions, or verified experts?
**3. Information Completeness (20% weight):**
- Does the source provide comprehensive coverage of the topic?
- Are there specific details, data, or insights that add value?
- Does it cover multiple aspects of the question?
**4. Recency & Timeliness (10% weight):**
- Is the information current and up-to-date?
- For time-sensitive topics, prioritize recent sources
- For established facts, older authoritative sources are acceptable
**5. Strategic Value (5% weight):**
- Does this complement other selected sources well?
- Does it provide unique perspectives or fill information gaps?
**Task Instructions:**
1. Carefully analyze each search result against these criteria
2. Consider how the results work together to provide comprehensive coverage
3. Select exactly 5 results that will enable the best possible answer
4. Prioritize quality over quantity - better to have fewer excellent sources
**Output Format:** Return only the numbers of your selected results, comma-separated (e.g., "1, 3, 7, 12, 14")
<|eot_id|><|start_header_id|>user<|end_header_id|>
**Original User Question:** {user_query}
**Context - Generated Search Queries:**
{queries_text}
**Search Results for Analysis:**
{results_text}
**Your Selection (5 most valuable results):**
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""
    try:
        response = search_pipeline(prompt, max_new_tokens=300, temperature=0.1)
        generated_text = response[0]['generated_text']
        # The last piece after the assistant header is taken as the reply.
        assistant_response = generated_text.split('<|start_header_id|>assistant<|end_header_id|>')[-1]
        thinking, cleaned_response = extract_thinking_and_response(assistant_response)

        # Convert the 1-based numbers in the reply to unique, in-range indices.
        selected_indices = []
        for token in re.findall(r'\d+', cleaned_response):
            index = int(token) - 1
            if 0 <= index < len(candidates) and index not in selected_indices:
                selected_indices.append(index)

        # An empty or unparseable selection must not discard every source.
        if not selected_indices:
            return candidates[:5], thinking
        return [candidates[i] for i in selected_indices[:5]], thinking
    except Exception as e:
        print(f"Error filtering results: {e}")
        return search_results[:5], ""  # Fallback to first 5 results
def generate_final_answer(user_query: str, generated_queries: List[str], all_search_results: List[Dict], selected_results: List[Dict]) -> Tuple[str, str]:
    """Generate the final answer from the selected sources.

    Args:
        user_query: The user's original question.
        generated_queries: Search queries used, listed in the prompt.
        all_search_results: Every search hit; only its length is reported.
        selected_results: The sources the answer must be based on.

    Returns:
        An ``(answer, thinking)`` tuple. ``answer`` is a fixed apology message
        when there are no sources, when generation raises, or when the model
        produced no text besides its reasoning.
    """
    if not selected_results:
        return "I couldn't find relevant information to answer your question. Please try rephrasing your query.", ""

    # Prepare context from selected results.
    selected_context = "".join(
        f"**Source {number}:** {result.get('title', 'Unknown')}\n"
        f"**Content:** {result.get('body', 'No content available')}\n"
        f"**URL:** {result.get('href', 'No URL')}\n"
        f"**Found via query:** {result.get('search_query', 'Unknown')}\n\n"
        for number, result in enumerate(selected_results, start=1)
    )
    # Summary of the search process.
    queries_text = "\n".join(f"\u2022 {q}" for q in generated_queries)
    process_summary = f"""
**Search Process Summary:**
- Generated {len(generated_queries)} targeted search queries
- Found {len(all_search_results)} total search results
- Filtered down to {len(selected_results)} most relevant sources
"""
    # NOTE(review): the markers in front of the six "Answer Quality Standards"
    # headings look like mis-encoded emoji; restore them from the original file.
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a world-class research synthesist and expert communicator. You have access to comprehensive search intelligence and must craft the definitive answer to the user's question.
**Your Complete Context:**
- Original user question and intent
- Strategic search queries designed to find comprehensive information
- Curated high-quality sources selected for maximum relevance and authority
- Full visibility into the research methodology used
**Answer Quality Standards:**
π― **Precision & Relevance (25%)**
- Address the user's exact question directly and completely
- Stay focused on their specific information needs
- Avoid tangential information that doesn't serve the core query
π **Source Integration & Synthesis (25%)**
- Weave information from multiple sources into a cohesive narrative
- Identify patterns, agreements, and contradictions across sources
- Present a unified understanding rather than separate source summaries
π **Accuracy & Verification (20%)**
- Use only information explicitly stated in the provided sources
- Clearly attribute claims to specific sources with citations
- Acknowledge when information is limited or when sources conflict
π **Structure & Clarity (15%)**
- Organize information logically with clear flow
- Use headings, bullet points, or sections when helpful
- Write in clear, accessible language appropriate for the topic
π **Completeness & Context (10%)**
- Provide sufficient background context for understanding
- Address multiple dimensions of the question when relevant
- Explain significance and implications of the findings
β‘ **Transparency & Limitations (5%)**
- Be honest about gaps in available information
- Note if search results don't fully address certain aspects
- Distinguish between established facts and emerging information
**Citation Format:**
- When referencing specific information: [Source Title](URL)
- For direct quotes: "Quote text" - [Source Title](URL)
- Include a "Sources" section at the end with all referenced URLs
**Response Structure:**
1. **Direct Answer** - Lead with a clear, concise response to the user's question
2. **Detailed Analysis** - Comprehensive exploration with evidence and citations
3. **Key Insights** - Important takeaways or implications
4. **Sources** - List of referenced URLs for further reading
<|eot_id|><|start_header_id|>user<|end_header_id|>
**Original User Question:** {user_query}
**Research Intelligence:**
{queries_text}
{process_summary}
**Curated Source Material:**
{selected_context}
**Task:** Provide the definitive, well-sourced answer to this question using your complete research context.
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""
    try:
        response = search_pipeline(prompt, max_new_tokens=12288, temperature=0.2)  # Even higher for comprehensive answers
        generated_text = response[0]['generated_text']
        # The last piece after the assistant header is taken as the reply.
        assistant_response = generated_text.split('<|start_header_id|>assistant<|end_header_id|>')[-1]
        thinking, answer = extract_thinking_and_response(assistant_response)
        if not answer:
            # The reply held only reasoning; show a message instead of a blank
            # answer box.
            return "The model did not produce an answer from the search results. Please try again.", thinking
        return answer, thinking
    except Exception as e:
        print(f"Error generating final answer: {e}")
        return "I encountered an error while processing the search results. Please try again.", ""
def search_agent_workflow(user_query: str, progress=gr.Progress()) -> Tuple[str, str, str]:
    """Run the full pipeline: generate queries, search, filter, answer.

    Args:
        user_query: The question typed by the user.
        progress: Gradio progress tracker, called with a fraction and a label
            before each stage.

    Returns:
        An ``(answer, debug_info, thinking)`` tuple of strings for the three
        output components.
    """
    # A missing value is treated like an empty query instead of raising on
    # .strip().
    if not user_query or not user_query.strip():
        return "Please enter a search query.", "", ""

    progress(0.1, desc="Initializing...")
    all_thinking = []

    # Step 1: Generate search queries
    progress(0.2, desc="Generating search queries...")
    queries, thinking1 = generate_search_queries(user_query)
    if thinking1:
        all_thinking.append(f"**Query Generation:**\n{thinking1}")
    queries_text = "Generated queries:\n" + "\n".join(f"\u2022 {q}" for q in queries)

    # Step 2: Search the web
    progress(0.4, desc="Searching the web...")
    search_results = search_web(queries)
    if not search_results:
        return "No search results found. Please try a different query.", queries_text, "\n\n".join(all_thinking)

    # Step 3: Filter relevant results
    progress(0.6, desc="Filtering relevant results...")
    relevant_results, thinking2 = filter_relevant_results(user_query, queries, search_results)
    if thinking2:
        all_thinking.append(f"**Result Filtering:**\n{thinking2}")

    # Step 4: Generate final answer
    progress(0.8, desc="Generating comprehensive answer...")
    final_answer, thinking3 = generate_final_answer(user_query, queries, search_results, relevant_results)
    if thinking3:
        all_thinking.append(f"**Answer Generation:**\n{thinking3}")
    progress(1.0, desc="Complete!")

    # Prepare debug info: the queries followed by one line per selected source.
    source_lines = "".join(
        f"{number}. {result.get('title', 'No title')} - {result.get('href', 'No URL')}\n"
        for number, result in enumerate(relevant_results, start=1)
    )
    debug_info = f"{queries_text}\n\nSelected {len(relevant_results)} relevant sources:\n{source_lines}"
    thinking_display = "\n\n".join(all_thinking) if all_thinking else "No thinking process recorded."
    return final_answer, debug_info, thinking_display
# Custom CSS for dark blue theme and mobile responsiveness.
# Passed to gr.Blocks(css=...) in create_interface(). The colours are declared
# once as :root variables and referenced by the rules that follow.
# NOTE(review): several rules target ".gr-*" class names; confirm the installed
# Gradio version still emits those classes, otherwise the rules match nothing.
custom_css = """
/* Dark blue theme */
:root {
--primary-bg: #0a1628;
--secondary-bg: #1e3a5f;
--accent-bg: #2563eb;
--text-primary: #f8fafc;
--text-secondary: #cbd5e1;
--border-color: #334155;
--input-bg: #1e293b;
--button-bg: #3b82f6;
--button-hover: #2563eb;
}
/* Global styles */
.gradio-container {
background: linear-gradient(135deg, var(--primary-bg) 0%, var(--secondary-bg) 100%) !important;
color: var(--text-primary) !important;
font-family: 'Inter', 'Segoe UI', system-ui, sans-serif !important;
}
/* Mobile responsiveness */
@media (max-width: 768px) {
.gradio-container {
padding: 10px !important;
}
.gr-form {
gap: 15px !important;
}
.gr-button {
font-size: 16px !important;
padding: 12px 20px !important;
}
}
/* Input styling */
.gr-textbox textarea, .gr-textbox input {
background: var(--input-bg) !important;
border: 1px solid var(--border-color) !important;
color: var(--text-primary) !important;
border-radius: 8px !important;
}
/* Button styling */
.gr-button {
background: linear-gradient(135deg, var(--button-bg) 0%, var(--accent-bg) 100%) !important;
color: white !important;
border: none !important;
border-radius: 8px !important;
font-weight: 600 !important;
transition: all 0.3s ease !important;
}
.gr-button:hover {
background: linear-gradient(135deg, var(--button-hover) 0%, var(--button-bg) 100%) !important;
transform: translateY(-1px) !important;
box-shadow: 0 4px 12px rgba(59, 130, 246, 0.3) !important;
}
/* Output styling */
.gr-markdown, .gr-textbox {
background: var(--input-bg) !important;
border: 1px solid var(--border-color) !important;
border-radius: 8px !important;
color: var(--text-primary) !important;
}
/* Header styling */
.gr-markdown h1 {
color: var(--accent-bg) !important;
text-align: center !important;
margin-bottom: 20px !important;
font-size: 2.5rem !important;
font-weight: 700 !important;
}
/* Thinking section styling */
#thinking-output {
background: var(--secondary-bg) !important;
border: 1px solid var(--border-color) !important;
border-radius: 8px !important;
padding: 15px !important;
font-family: 'Fira Code', 'Monaco', monospace !important;
font-size: 0.9rem !important;
line-height: 1.4 !important;
}
/* Loading animation */
.gr-loading {
background: var(--secondary-bg) !important;
border-radius: 8px !important;
}
/* Scrollbar styling */
::-webkit-scrollbar {
width: 8px;
}
::-webkit-scrollbar-track {
background: var(--primary-bg);
}
::-webkit-scrollbar-thumb {
background: var(--accent-bg);
border-radius: 4px;
}
::-webkit-scrollbar-thumb:hover {
background: var(--button-hover);
}
"""
def create_interface():
    """Build the Gradio Blocks UI and wire it to search_agent_workflow.

    Returns:
        The assembled ``gr.Blocks`` interface, not yet launched.
    """
    with gr.Blocks(
        theme=gr.themes.Base(
            primary_hue="blue",
            secondary_hue="slate",
            neutral_hue="slate",
            text_size="lg",
            spacing_size="lg",
            radius_size="md",
        ),
        css=custom_css,
        title="Just search - AI Search Agent",
        head="<meta name='viewport' content='width=device-width, initial-scale=1.0'>",
    ) as interface:
        # Header and short description.
        gr.Markdown("# π Just search", elem_id="header")
        intro_text = (
            "*Part of the Just, AKA Simple series*\n\n"
            "**Intelligent search agent powered by Menlo/Lucy-128k**\n\n"
            "Ask any question and get comprehensive answers from the web."
        )
        gr.Markdown(intro_text, elem_id="description")

        # Question box with the search button beside it.
        with gr.Row():
            with gr.Column(scale=4):
                query_input = gr.Textbox(
                    label="Your Question",
                    placeholder="Ask me anything... (e.g., 'What are the latest developments in AI?')",
                    lines=2,
                    elem_id="query-input",
                )
            with gr.Column(scale=1):
                search_btn = gr.Button(
                    "π Search",
                    variant="primary",
                    size="lg",
                    elem_id="search-button",
                )

        with gr.Row():
            answer_output = gr.Markdown(
                label="Answer",
                elem_id="answer-output",
                height=400,
            )

        # Collapsed panels for the reasoning trace and the debug listing.
        with gr.Accordion("π€ AI Thinking Process", open=False):
            thinking_output = gr.Markdown(
                label="Model's Chain of Thought",
                elem_id="thinking-output",
                height=300,
            )
        with gr.Accordion("π§ Debug Info", open=False):
            debug_output = gr.Textbox(
                label="Search Process Details",
                lines=8,
                elem_id="debug-output",
            )

        # Event handlers: the button click and Enter in the textbox run the
        # same workflow with the same inputs and outputs.
        for register in (search_btn.click, query_input.submit):
            register(
                fn=search_agent_workflow,
                inputs=[query_input],
                outputs=[answer_output, debug_output, thinking_output],
                show_progress=True,
            )

        # Example queries
        sample_questions = [
            ["What are the latest breakthroughs in quantum computing?"],
            ["How does climate change affect ocean currents?"],
            ["What are the best practices for sustainable agriculture?"],
            ["Explain the recent developments in renewable energy technology"],
            ["What are the health benefits of the Mediterranean diet?"],
        ]
        gr.Examples(
            examples=sample_questions,
            inputs=query_input,
            outputs=[answer_output, debug_output, thinking_output],
            fn=search_agent_workflow,
            cache_examples=False,
        )

        gr.Markdown(
            "---\n**Note:** This search agent generates multiple queries, searches the web, "
            "filters results for relevance, and provides comprehensive answers. "
            "Results are sourced from DuckDuckGo search."
        )
    return interface
def main():
    """Load the model, build the interface and start the Gradio server.

    Returns without launching when the model cannot be initialised.
    """
    print("π Initializing Just search...")

    # Initialize the model; nothing else is useful without it.
    model_ready = initialize_model()
    if not model_ready:
        print("β Failed to initialize model. Please check your setup.")
        return
    print("β Model initialized successfully!")

    print("π Creating interface...")
    interface = create_interface()
    print("π Just search is ready!")

    # Server settings: listen on all interfaces at port 7860, with sharing,
    # error display and debug output turned on.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "show_error": True,
        "debug": True,
    }
    interface.launch(**launch_options)


if __name__ == "__main__":
    main()