import random
import logging
import json
import os
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.auth import get_current_user
from app.database import get_db
from app.models import Feed, User
from langchain_groq import ChatGroq
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import SupabaseVectorStore
from supabase.client import create_client
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
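# wikipedia-api is treated as an optional dependency: if the import fails,
# the public-info enrichment step in _agentic_generate is simply skipped.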
try:
    import wikipediaapi
except Exception:
    wikipediaapi = None
# Logger early init (used during module init below)
logger = logging.getLogger("app.feeds")
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
# Supabase Initialization (independent of other modules)
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
supabase_client = None
if SUPABASE_URL and SUPABASE_KEY:
    try:
        supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY)
        logger.info("✅ Supabase client (feeds) initialized.")
    except Exception as e:
        logger.warning(f"Supabase init failed in feeds: {e}")
router = APIRouter()
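
# Response schema for feed items; from_attributes=True lets Pydantic read values
# straight from SQLAlchemy Feed rows.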
class FeedOut(BaseModel):
    id: int
    title: str
    content: str
    media_url: str | None = None
    tags: list[str] | None = None
    category: str | None = None
    source: str | None = None
    relevance_score: int
    likes: int
    comments_count: int
    shares: int

    class Config:
        from_attributes = True
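
# Heuristic fallback: builds 25 pseudo-random feed items from canned topics and tags,
# used whenever the LLM-backed path below is unavailable or fails.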
def _fallback_generate(user: User) -> List[dict]:
    topics = [
        "career-growth", "interview-prep", "project-ideas", "resume-tips",
        "networking", "internships", "leetcode", "system-design", "ai-ml",
        "cloud", "devops", "frontend", "backend"
    ]
    base_tags = ["tips", "guide", "beginner", "advanced", "2025", "best-practices"]
    user_topic = (user.preparing_for or "career-growth").lower().replace(" ", "-")
    chosen_topics = list({user_topic} | set(random.sample(topics, k=4)))
    items: List[dict] = []
    for _ in range(25):
        topic = random.choice(chosen_topics)
        title = f"{topic.title()} insights for {user.name or 'you'}"
        content = (
            f"Actionable {topic.replace('-', ' ')} advice tailored for {user.name or 'your profile'}. "
            f"Focus on consistent practice, portfolio building, and networking."
        )
        tags = list({topic, *(random.sample(base_tags, k=3))})
        items.append({
            "title": title[:200],
            "content": content,
            "media_url": None,
            "tags": ",".join(tags),
            "category": topic,
            "source": "dubsway-ai",
            "relevance_score": random.randint(70, 100),
            "likes": random.randint(0, 50),
            "comments_count": random.randint(0, 20),
            "shares": random.randint(0, 10),
        })
    return items
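
# The agentic path needs GROQ_API_KEY for the LLM step; retrieval additionally relies on
# SUPABASE_URL/SUPABASE_KEY and an OpenAI key for embeddings. Each stage degrades gracefully.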
def _agentic_generate(user: User) -> List[dict]:
    """Generate context-aware feeds using vector DB, public info, and LLM.

    Falls back to heuristic generation if the LLM is not available.
    """
    # 1) Gather user context
    user_focus = (user.preparing_for or "career growth").strip()
    # 2) Retrieve user-related docs from Supabase vector store via LangChain
    doc_snippets = ""
    try:
        if supabase_client is not None:
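            # OpenAIEmbeddings reads OPENAI_API_KEY from the environment; the vector store
            # expects the pgvector-backed "documents" table and "match_documents" function
            # in Supabase. The filter below assumes stored docs carry a user_id metadata field.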
            embeddings = OpenAIEmbeddings()
            vector_store = SupabaseVectorStore(
                client=supabase_client,
                embedding=embeddings,
                table_name="documents",
                query_name="match_documents",
            )
            retriever = vector_store.as_retriever(search_kwargs={"k": 8, "filter": {"user_id": user.id}})
            # Use a simple seed question from the user's focus
            seed_q = f"Key takeaways for {user_focus}"
            retrieved = retriever.invoke(seed_q)
            # retrieved may be list of docs depending on LC version
            docs = retrieved if isinstance(retrieved, list) else retrieved.get("context", [])
            parts: List[str] = []
            for d in docs[:10]:
                try:
                    parts.append(getattr(d, "page_content", "")[:400])
                except Exception:
                    pass
            doc_snippets = "\n".join(parts)[:4000]
    except Exception as e:
        logger.warning(f"Supabase retrieval failed: {e}")
    # 3) Pull public info (Wikipedia summary on user focus)
    public_summary = ""
    try:
        if wikipediaapi:
            wiki = wikipediaapi.Wikipedia(language='en', user_agent='DubswayVideoAI/1.0 (contact: support@dubsway.ai)')
            page = wiki.page(user_focus)
            if page and page.exists():
                public_summary = page.summary[:1200]
    except Exception as e:
        logger.warning(f"Wikipedia fetch failed: {e}")
    # 4) Use Groq LLM to synthesize 25 feeds
    try:
        llm = ChatGroq(groq_api_key=os.getenv("GROQ_API_KEY"), model_name=os.getenv("GROQ_MODEL", "llama-3.3-70b-versatile"))
        system = (
            "You are a career coach assistant. Create 25 short personalized feed items that help the user grow "
            "in their career. Each item should be practical and contextual to the user focus, using the given "
            "context notes and public info. Output strictly JSON array with objects having keys: title, content, "
            "tags (array of strings), category, source."
        )
        prompt = (
            f"User name: {user.name or 'User'}\n"
            f"User focus: {user_focus}\n\n"
            f"Context from user's vector docs (may be empty):\n{doc_snippets}\n\n"
            f"Public info (may be empty):\n{public_summary}\n\n"
            f"Generate 25 items. Keep title <= 120 chars and content 1-2 sentences."
        )
        resp = llm.invoke([{"role": "system", "content": system + " Respond ONLY with a JSON array."}, {"role": "user", "content": prompt}])
        text = resp.content if hasattr(resp, "content") else str(resp)
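        # Expected reply: a JSON array of objects with keys title, content, tags, category, source
        # (per the system prompt above); models often wrap it in ``` fences, handled below.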
        # Normalize potential markdown code fences and extract the JSON array
        text_stripped = text.strip()
        if text_stripped.startswith("```json"):
            text_stripped = text_stripped[len("```json"):]
        elif text_stripped.startswith("```"):
            text_stripped = text_stripped[3:]
        if text_stripped.endswith("```"):
            text_stripped = text_stripped[:-3]
        text_stripped = text_stripped.strip()
        # Attempt to parse directly, then fall back to the first [...] slice found
        try:
            data = json.loads(text_stripped)
        except Exception:
            start = text_stripped.find('[')
            end = text_stripped.rfind(']')
            if start != -1 and end != -1 and end > start:
                data = json.loads(text_stripped[start:end + 1])
            else:
                raise
        items: List[dict] = []
        for it in data[:25]:
            tags_joined = ",".join(it.get("tags", [])[:6]) if isinstance(it.get("tags"), list) else None
            items.append({
                "title": (it.get("title") or "").strip()[:200] or "Career insight",
                "content": (it.get("content") or "").strip() or "Practical career tip.",
                "media_url": None,
                "tags": tags_joined,
                "category": (it.get("category") or user_focus)[:64],
                "source": (it.get("source") or "agentic-ai")[:64],
                "relevance_score": random.randint(80, 100),
                "likes": 0,
                "comments_count": 0,
                "shares": 0,
            })
        # Ensure we always return 25
        if len(items) < 25:
            items.extend(_fallback_generate(user)[: 25 - len(items)])
        return items[:25]
    except Exception as e:
        logger.error(f"Agentic generation failed: {e}")
        return _fallback_generate(user)
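
# GET endpoint: wipe the user's previous feed rows, generate 25 fresh items
# (agentic path with heuristic fallback), store them, and return them sorted by relevance.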
@router.get("/", response_model=List[FeedOut])  # NOTE: decorator absent in the original source; path and response_model are assumed
async def get_feeds(current_user: User = Depends(get_current_user),
                    db: AsyncSession = Depends(get_db)):
    # Auto-refresh: clear previous feeds for this user
    try:
        await db.execute(delete(Feed).where(Feed.user_id == current_user.id))
        await db.commit()
    except Exception as e:
        await db.rollback()
        logger.error(f"Failed deleting old feeds: {e}")
        raise HTTPException(status_code=500, detail="Failed refreshing feeds")
    # Generate 25 new items (agentic with fallback)
    items = _agentic_generate(current_user)
    # Insert into DB
    try:
        feed_rows = [
            Feed(user_id=current_user.id, **item) for item in items
        ]
        db.add_all(feed_rows)
        await db.commit()
    except Exception as e:
        await db.rollback()
        logger.error(f"Failed inserting feeds: {e}")
        raise HTTPException(status_code=500, detail="Failed storing feeds")
    # Return the top 25 ordered by relevance desc, then most recent
    result = await db.execute(
        select(Feed).where(Feed.user_id == current_user.id).order_by(Feed.relevance_score.desc(), Feed.id.desc()).limit(25)
    )
    rows = result.scalars().all()
    # Convert comma-separated tags to a list for the response
    out: List[FeedOut] = []
    for r in rows:
        out.append(FeedOut(
            id=r.id,
            title=r.title,
            content=r.content,
            media_url=r.media_url,
            tags=(r.tags.split(",") if r.tags else None),
            category=r.category,
            source=r.source,
            relevance_score=r.relevance_score or 0,
            likes=r.likes or 0,
            comments_count=r.comments_count or 0,
            shares=r.shares or 0,
        ))
    return out