# ngibs-ai-search / live_search.py
# jaihodigital's picture
# Update live_search.py
# b695e18 verified
import requests
import os
from typing import Dict, Any
from clean_fetch import data_fetcher
class LiveSearchEngine:
    """Answer a query with an OpenRouter chat model, grounded in web search.

    The flow is: fetch web context through ``data_fetcher.fetch_duckduckgo``,
    send the query (plus context, when available) to the OpenRouter chat
    completions endpoint, then append a short source/model footer to the
    model's answer.
    """

    def __init__(self):
        # Chat-completions endpoint every request is posted to.
        self.openrouter_api = "https://openrouter.ai/api/v1/chat/completions"
        # Model identifiers offered by this engine. Nothing in this class
        # validates the ``model`` argument against this list.
        self.cloud_models = [
            "meta-llama/llama-3.3-70b-instruct:free",
            "openrouter/sonoma-dusk-alpha",
            "openrouter/sonoma-sky-alpha",
            "deepseek/deepseek-chat-v3.1:free",
            "cognitivecomputations/dolphin-mistral-24b-venice-edition:free",
            "google/gemini-2.0-flash-exp:free",
        ]

    def search_and_generate(self, query: str, model: str) -> str:
        """Search the web for *query* and return a formatted model answer.

        Args:
            query: The user's question.
            model: OpenRouter model identifier to query.

        Returns:
            The model's answer followed by a footer, or a string starting
            with ``"❌ Error: "`` when the model call fails.
        """
        # Step 1: fetch web data. The fetcher is expected to return a dict
        # with "success", "content" and optionally "url" keys; ``.get`` keeps
        # a missing key from raising KeyError.
        search_result = data_fetcher.fetch_duckduckgo(query)
        if search_result.get("success"):
            context = search_result.get("content") or ""
        else:
            context = ""

        # Step 2: generate the AI response.
        ai_result = self._query_cloud_model(query, context, model)
        if not ai_result["success"]:
            return f"❌ Error: {ai_result['error']}"

        # Step 3: format the response.
        return self._format_response(ai_result["response"], search_result, model)

    @staticmethod
    def _format_response(answer: str, search_result: Dict[str, Any], model: str) -> str:
        """Append the source footer (only when the search succeeded) and the model line."""
        parts = [answer]
        if search_result.get("success"):
            parts.append("\n\n---\n📡 Source: DuckDuckGo")
            if search_result.get("url"):
                parts.append(f"\n🔗 [Read more]({search_result['url']})")
        parts.append(f"\n🤖 **Model:** {model}")
        return "".join(parts)

    @staticmethod
    def _parse_completion(payload: Any) -> Dict[str, Any]:
        """Turn a decoded chat-completions body into the engine's result dict.

        Reports a failure, instead of raising or returning ``None`` as a
        successful answer, when the body carries an ``error`` object, lacks
        ``choices[0].message.content``, or that content is not a string.
        """
        def failure(reason: Any) -> Dict[str, Any]:
            return {"success": False, "response": None, "error": f"Cloud model error: {reason}"}

        if not isinstance(payload, dict):
            return failure("unexpected response format")

        # An error object can arrive in the body even when no HTTP error
        # status was raised, so check it before looking for choices.
        api_error = payload.get("error")
        if api_error:
            if isinstance(api_error, dict):
                return failure(api_error.get("message") or api_error)
            return failure(api_error)

        try:
            content = payload["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError):
            return failure("unexpected response format")

        # A null content would otherwise crash the caller's string
        # concatenation with a TypeError.
        if not isinstance(content, str):
            return failure("model returned no text content")

        return {"success": True, "response": content, "error": None}

    def _query_cloud_model(self, query: str, context: str, model: str) -> Dict[str, Any]:
        """Send *query* (with optional web *context*) to OpenRouter.

        Args:
            query: The user's question.
            context: Web search text to ground the answer; may be empty.
            model: OpenRouter model identifier.

        Returns:
            A dict with keys ``success`` (bool), ``response`` (the answer text
            or ``None``) and ``error`` (a message or ``None``). This method
            does not raise: every failure is reported through the dict.
        """
        try:
            api_key = os.getenv("OPENROUTER_API_KEY")
            if not api_key:
                return {"success": False, "response": None, "error": "OpenRouter API key not configured"}

            if context:
                system_msg = "Use the web search context to answer questions accurately."
                user_msg = f"Context: {context}\n\nQuestion: {query}"
            else:
                system_msg = "Answer questions to the best of your ability."
                user_msg = query

            messages = [
                {"role": "system", "content": system_msg},
                {"role": "user", "content": user_msg},
            ]
            headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
            data = {"model": model, "messages": messages, "temperature": 0.7, "max_tokens": 1000}

            response = requests.post(self.openrouter_api, headers=headers, json=data, timeout=60)
            response.raise_for_status()
            return self._parse_completion(response.json())
        except Exception as e:
            # Deliberate catch-all boundary: callers rely on receiving an
            # error dict rather than an exception (network failures, HTTP
            # errors, invalid JSON, ...).
            return {"success": False, "response": None, "error": f"Cloud model error: {str(e)}"}
# Module-level engine instance, created once at import time.
live_search_engine = LiveSearchEngine()