Spaces:
Sleeping
Sleeping
import gradio as gr | |
import os | |
import logging | |
from typing import List, Dict, Tuple | |
import numpy as np | |
from analyzer import combine_repo_files_for_llm, handle_load_repository | |
from hf_utils import download_filtered_space_files | |
# Module-level logger, named after this module via __name__; the code below
# uses it to report progress (info) and failures (error).
logger = logging.getLogger(__name__)
class SimpleVectorStore:
    """Simple in-memory vector store for repository chunks.

    Three parallel lists are kept in lockstep: ``chunks`` (the text),
    ``embeddings`` (one vector per chunk) and ``chunk_metadata`` (one dict per
    chunk). The embedding model is loaded lazily on first use.
    """

    def __init__(self):
        self.chunks = []
        self.embeddings = []
        self.chunk_metadata = []
        self.model = None

    def _get_embedding_model(self):
        """Lazy load the embedding model.

        Returns:
            The cached model, loading it on the first call.

        Raises:
            ImportError: If the sentence-transformers package is not installed.
        """
        if self.model is None:
            try:
                from sentence_transformers import SentenceTransformer
                self.model = SentenceTransformer('all-MiniLM-L6-v2')  # Lightweight, fast model
                logger.info("Loaded SentenceTransformer model for vectorization")
            except ImportError as exc:
                logger.error("sentence-transformers not installed. Install with: pip install sentence-transformers")
                # Chain the original error so the real import failure stays visible.
                raise ImportError("sentence-transformers package is required for vectorization") from exc
        return self.model

    def add_chunks(self, chunks: List[str], metadata: List[Dict] = None):
        """Add text chunks and create embeddings.

        Best-effort: any failure is logged and swallowed, and in that case the
        store is left exactly as it was (nothing is partially added).

        Args:
            chunks: Text chunks to embed and store.
            metadata: Optional list with one dict per chunk. When omitted or
                empty, an empty dict is stored for every chunk.
        """
        if not chunks:
            # Nothing to embed; avoid loading the model for an empty batch.
            return
        try:
            chunk_metadata = list(metadata) if metadata else [{} for _ in chunks]
            if len(chunk_metadata) != len(chunks):
                # A mismatch would misalign the parallel lists and break search().
                raise ValueError(
                    f"got {len(chunks)} chunks but {len(chunk_metadata)} metadata entries"
                )
            model = self._get_embedding_model()
            embeddings = list(model.encode(chunks, convert_to_tensor=False))
            if len(embeddings) != len(chunks):
                raise ValueError(
                    f"got {len(chunks)} chunks but {len(embeddings)} embeddings"
                )
            # Mutate only after every step succeeded so the lists stay aligned.
            self.chunks.extend(chunks)
            self.embeddings.extend(embeddings)
            self.chunk_metadata.extend(chunk_metadata)
            logger.info(f"Added {len(chunks)} chunks to vector store")
        except Exception as e:
            logger.error(f"Error adding chunks to vector store: {e}")

    def search(self, query: str, top_k: int = 3) -> List[Tuple[str, float, Dict]]:
        """Search for similar chunks using cosine similarity.

        Args:
            query: Text to compare against the stored chunks.
            top_k: Maximum number of results; values <= 0 yield no results.

        Returns:
            Up to ``top_k`` ``(chunk, similarity, metadata)`` tuples, best match
            first. Similarities are plain Python floats. An empty list is
            returned when the store is empty or when an error occurs (the error
            is logged).
        """
        if top_k <= 0 or not self.chunks or not self.embeddings:
            return []
        try:
            model = self._get_embedding_model()
            query_vec = np.asarray(
                model.encode([query], convert_to_tensor=False)[0], dtype=np.float64
            )
            matrix = np.asarray(self.embeddings, dtype=np.float64)
            # One vectorized pass instead of a per-chunk Python loop.
            dots = matrix @ query_vec
            norms = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query_vec)
            # Zero-norm vectors get similarity 0.0 instead of NaN, which would
            # otherwise make the ordering below meaningless.
            scores = np.divide(dots, norms, out=np.zeros_like(dots), where=norms > 0)
            # Stable sort keeps insertion order among equal scores.
            order = np.argsort(-scores, kind="stable")[:top_k]
            return [
                (self.chunks[i], float(scores[i]), self.chunk_metadata[i])
                for i in order
            ]
        except Exception as e:
            logger.error(f"Error searching vector store: {e}")
            return []

    def clear(self):
        """Clear all stored data (the loaded model, if any, is kept)."""
        self.chunks = []
        self.embeddings = []
        self.chunk_metadata = []

    def get_stats(self) -> Dict:
        """Get statistics about the vector store.

        Returns:
            Dict with ``total_chunks``, ``total_embeddings`` and ``model_loaded``.
        """
        return {
            'total_chunks': len(self.chunks),
            'total_embeddings': len(self.embeddings),
            'model_loaded': self.model is not None
        }


# Global vector store instance
vector_store = SimpleVectorStore()
def vectorize_repository_content(repo_content: str, repo_id: str, chunk_size: int = 500) -> bool:
    """
    Vectorize repository content by splitting into chunks and creating embeddings.

    The global vector store is cleared first, then refilled with overlapping
    line-based chunks of ``repo_content``.

    Args:
        repo_content: The combined repository content
        repo_id: Repository identifier
        chunk_size: Number of lines per chunk (must be >= 1)

    Returns:
        bool: True only if at least one chunk was produced and every chunk
        ended up in the vector store; False otherwise (the reason is logged).
    """
    try:
        # Clear previous data
        vector_store.clear()

        if chunk_size < 1:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

        # Lines of overlap between chunks, for better context. The default is
        # 50; small chunk sizes fall back to half the chunk so the step below
        # is always positive (a zero or negative step breaks range()).
        overlap = 50 if chunk_size > 50 else chunk_size // 2
        step = chunk_size - overlap

        lines = repo_content.split('\n')
        total_lines = len(lines)
        chunks = []
        metadata = []
        for start in range(0, total_lines, step):
            end = min(start + chunk_size, total_lines)
            chunk_text = '\n'.join(lines[start:end])
            if chunk_text.strip():  # Only add non-empty chunks
                chunks.append(chunk_text)
                metadata.append({
                    'repo_id': repo_id,
                    'chunk_index': len(chunks) - 1,
                    'start_line': start,
                    'end_line': end
                })
            if end >= total_lines:
                # This chunk already reaches the end; any further chunk would
                # be a pure duplicate of its tail.
                break

        if not chunks:
            logger.warning(f"No non-empty content to vectorize for repository {repo_id}")
            return False

        # Add chunks to vector store
        vector_store.add_chunks(chunks, metadata)

        # add_chunks logs and swallows its own errors, so confirm the chunks
        # really landed in the store before reporting success.
        stored = vector_store.get_stats()['total_chunks']
        if stored != len(chunks):
            logger.error(
                f"Vector store holds {stored} of {len(chunks)} chunks for repository {repo_id}"
            )
            return False

        logger.info(f"Successfully vectorized {len(chunks)} chunks for repository {repo_id}")
        return True
    except Exception as e:
        logger.error(f"Error vectorizing repository content: {e}")
        return False
def create_repo_explorer_tab() -> Tuple[Dict[str, gr.components.Component], Dict[str, gr.State]]:
    """
    Build the Repo Explorer tab widgets.

    Returns:
        A ``(components, states)`` pair: ``components`` maps names to the
        created Gradio widgets, ``states`` maps names to the ``gr.State``
        holders used by the explorer's event handlers.
    """
    # NOTE(review): presumably called inside an active gr.Blocks/Tab context -
    # confirm against the caller.
    # State holders: the loaded repo's context summary and its ID.
    summary_state = gr.State("")
    repo_id_state = gr.State("")

    gr.Markdown("### 🗂️ Deep Dive into a Specific Repository")

    # Repository ID entry with the load button beside it.
    with gr.Row():
        with gr.Column(scale=2):
            id_textbox = gr.Textbox(
                label="📁 Repository ID",
                placeholder="microsoft/DialoGPT-medium",
                info="Enter a Hugging Face repository ID to explore"
            )
        with gr.Column(scale=1):
            load_button = gr.Button("🚀 Load Repository", variant="primary", size="lg")

    # Link to the repository; created hidden.
    with gr.Row():
        link_html = gr.HTML(
            value="",
            label="🔗 Repository Link",
            visible=False
        )

    # Read-only status box.
    with gr.Row():
        status_box = gr.Textbox(
            label="📊 Repository Status",
            interactive=False,
            lines=4,
            info="Current repository loading status and vectorization info"
        )

    # Chat area: conversation view with the question box and send button below.
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                label="🤖 Repository Assistant",
                height=400,
                type="messages",
                avatar_images=(
                    "https://cdn-icons-png.flaticon.com/512/149/149071.png",
                    "https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.png"
                ),
                show_copy_button=True,
                value=[]  # Start empty - welcome message will appear only after repo is loaded
            )
            with gr.Row():
                question_box = gr.Textbox(
                    label="💭 Ask about this repository",
                    placeholder="What does this repository do? How do I use it?",
                    lines=1,
                    scale=4,
                    info="Ask anything about the loaded repository"
                )
                send_button = gr.Button("📤 Send", variant="primary", scale=1)
        # NOTE: a second column holding a read-only "📄 Repository Content
        # Preview" textbox ("repo_content_display") is currently disabled.

    components = dict(
        repo_explorer_input=id_textbox,
        load_repo_btn=load_button,
        visit_hf_link=link_html,
        repo_status_display=status_box,
        repo_chatbot=chatbot,
        repo_msg_input=question_box,
        repo_send_btn=send_button,
    )
    states = dict(
        repo_context_summary=summary_state,
        current_repo_id=repo_id_state,
    )
    return components, states
def handle_repo_user_message(user_message: str, history: List[Dict[str, str]], repo_context_summary: str, repo_id: str) -> Tuple[List[Dict[str, str]], str]:
    """Handle user messages in the repo-specific chatbot.

    Args:
        user_message: Text the user submitted.
        history: Current chat history as ``{"role", "content"}`` dicts; it is
            never modified in place.
        repo_context_summary: Summary of the loaded repository; when blank, no
            repository is loaded and the message is ignored.
        repo_id: Repository identifier, used in the welcome message.

    Returns:
        ``(updated_history, "")`` - the empty string clears the input box.
    """
    # Work on a copy so the caller's list is never mutated; tolerate None.
    updated_history = list(history or [])

    if not (repo_context_summary or "").strip():
        return updated_history, ""

    # Initialize with repository-specific welcome message if empty
    if not updated_history:
        welcome_msg = f"Hello! I'm your assistant for the '{repo_id}' repository. I have analyzed all the files and created a comprehensive understanding of this repository. I'm ready to answer any questions about its functionality, usage, architecture, and more. What would you like to know?"
        updated_history.append({"role": "assistant", "content": welcome_msg})

    # Whitespace-only input is not a question; do not record it as a user turn.
    if user_message and user_message.strip():
        updated_history.append({"role": "user", "content": user_message})

    return updated_history, ""
def _format_relevant_sections(relevant_chunks) -> str:
    """Render vector-search hits as the 'MOST RELEVANT CODE SECTIONS' prompt block ('' if none)."""
    if not relevant_chunks:
        return ""
    sections = []
    for i, (chunk, similarity, metadata) in enumerate(relevant_chunks):
        start_line = metadata.get('start_line', 'unknown')
        end_line = metadata.get('end_line', 'unknown')
        sections.append(
            f"\n--- Relevant Section {i+1} (similarity: {similarity:.3f}, lines {start_line}-{end_line}) ---\n{chunk}\n"
        )
    return "\n\n=== MOST RELEVANT CODE SECTIONS ===\n" + "".join(sections)


def handle_repo_bot_response(history: List[Dict[str, str]], repo_context_summary: str, repo_id: str) -> List[Dict[str, str]]:
    """Generate bot response for repo-specific questions using comprehensive context and vector search.

    Args:
        history: Chat history as ``{"role", "content"}`` dicts; a reply is only
            generated when the last entry is a user message.
        repo_context_summary: Summary of the loaded repository, embedded in the
            system prompt. When blank, no reply is generated.
        repo_id: Repository identifier, named in the system prompt.

    Returns:
        ``history`` unchanged when there is nothing to answer; otherwise a new
        list with the assistant reply (or an apology containing the error text)
        appended. The input list is not modified.
    """
    if not history or history[-1]["role"] != "user" or not (repo_context_summary or "").strip():
        return history

    user_message = history[-1]["content"]

    # Use vector search to find relevant chunks
    relevant_chunks = vector_store.search(user_message, top_k=3)

    # Build enhanced context using vector search results
    vector_context = _format_relevant_sections(relevant_chunks)

    # Create a specialized prompt using both comprehensive context and vector search results
    repo_system_prompt = f"""You are an expert assistant for the Hugging Face repository '{repo_id}'.
You have comprehensive knowledge about this repository based on detailed analysis of all its files and components.
Use the following comprehensive analysis to answer user questions accurately and helpfully:
{repo_context_summary}
{vector_context}
Instructions:
- Answer questions clearly and conversationally about this specific repository
- Reference specific components, functions, or features when relevant
- Provide practical guidance on installation, usage, and implementation
- If asked about code details, refer to the analysis above and the relevant code sections
- Use the most relevant code sections to provide specific examples and implementation details
- Be helpful and informative while staying focused on this repository
- If something isn't covered in the analysis, acknowledge the limitation
Answer the user's question based on your comprehensive knowledge of this repository."""

    # Copy so the caller's list is left untouched.
    updated_history = list(history)
    try:
        # Imported lazily so a missing openai package is reported in the chat
        # like any other failure.
        from openai import OpenAI
        client = OpenAI(api_key=os.getenv("OpenAI_API"))
        response = client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[
                {"role": "system", "content": repo_system_prompt},
                {"role": "user", "content": user_message}
            ],
            max_tokens=1024,
            temperature=0.7
        )
        bot_response = response.choices[0].message.content
        if not bot_response:
            # The API may return no text content; never append None to the chat.
            bot_response = "I couldn't generate a response. Please try rephrasing your question."
        updated_history.append({"role": "assistant", "content": bot_response})
    except Exception as e:
        logger.error(f"Error generating repo bot response: {e}")
        error_response = f"I apologize, but I encountered an error while processing your question: {e}"
        updated_history.append({"role": "assistant", "content": error_response})
    return updated_history
def get_huggingface_url(repo_id: str) -> str:
    """Generate the Hugging Face Spaces URL for a repository.

    Args:
        repo_id: Repository identifier such as ``"user/space"``; surrounding
            whitespace is ignored.

    Returns:
        The Spaces URL, or ``""`` when ``repo_id`` is None or blank.
    """
    from urllib.parse import quote

    clean_repo_id = (repo_id or "").strip()
    if not clean_repo_id:
        return ""
    # Build the URL from the stripped ID (the same value that was validated)
    # and percent-encode anything that is not valid in a URL path.
    return f"https://huggingface.co/spaces/{quote(clean_repo_id, safe='/')}"
def generate_repo_link_html(repo_id: str) -> str:
    """Generate HTML with clickable link for the repository.

    Args:
        repo_id: Repository identifier such as ``"user/space"``; surrounding
            whitespace is ignored.

    Returns:
        An HTML snippet with a styled link to the repository's Spaces page, or
        ``""`` when ``repo_id`` is None or blank.
    """
    from html import escape
    from urllib.parse import quote

    clean_repo_id = str(repo_id or "").strip()
    if not clean_repo_id:
        return ""

    # repo_id comes straight from a user-editable textbox: percent-encode it
    # for the URL and HTML-escape both the attribute value and the visible
    # label so it cannot break out of the markup.
    hf_url = escape(f"https://huggingface.co/spaces/{quote(clean_repo_id, safe='/')}", quote=True)
    label = escape(clean_repo_id)

    html_link = f'''
    <div style="margin: 10px 0; padding: 15px; background: rgba(255, 255, 255, 0.1); border-radius: 12px; backdrop-filter: blur(10px); text-align: center;">
        <a href="{hf_url}" target="_blank" style="display: inline-block; padding: 12px 24px; background: linear-gradient(45deg, #667eea, #764ba2); color: white; text-decoration: none; border-radius: 8px; font-weight: 600; font-size: 16px; transition: all 0.3s ease; box-shadow: 0 4px 12px rgba(0,0,0,0.2);">
            🔗 Visit {label} on Hugging Face
        </a>
    </div>
    '''
    return html_link
def handle_load_repository_with_vectorization(repo_id: str) -> Tuple[str, str, gr.HTML]:
    """Load repository and create both context summary and vector embeddings.

    Args:
        repo_id: Hugging Face repository ID; surrounding whitespace is ignored.

    Returns:
        A ``(status, context_summary, link_update)`` tuple. ``status`` is the
        human-readable result, ``context_summary`` is ``""`` on any failure,
        and ``link_update`` is a ``gr.update(...)`` payload that shows the
        repository link on success and hides it otherwise.
    """
    def _failure(status: str):
        # Every failure path clears the summary and hides the link.
        return status, "", gr.update(value="", visible=False)

    # Use the same stripped ID for validation, download, logs and messages.
    repo_id = (repo_id or "").strip()
    if not repo_id:
        return _failure("Status: Please enter a repository ID.")

    try:
        logger.info(f"Loading repository with vectorization: {repo_id}")

        # Download and process the repository (existing logic)
        # NOTE(review): files always go to the shared "repo_files" directory;
        # whether leftovers from a previously loaded repo are cleaned up
        # depends on the helpers - confirm.
        try:
            download_filtered_space_files(repo_id, local_dir="repo_files", file_extensions=['.py', '.md', '.txt'])
            combined_text_path = combine_repo_files_for_llm()
        except Exception as e:
            logger.error(f"Error downloading repository {repo_id}: {e}")
            return _failure(f"❌ Error downloading repository: {e}")

        # Read the combined content
        with open(combined_text_path, "r", encoding="utf-8") as f:
            repo_content = f.read()

        # Create vectorized representation
        vectorization_success = vectorize_repository_content(repo_content, repo_id)

        # Get the original context summary
        from analyzer import create_repo_context_summary
        context_summary = create_repo_context_summary(repo_content, repo_id)

        # Update status message
        if vectorization_success:
            status = f"✅ Repository '{repo_id}' loaded successfully!\n📁 Files processed and ready for exploration.\n🔍 Vector embeddings created for semantic search.\n💬 You can now ask questions about this repository."
        else:
            status = f"✅ Repository '{repo_id}' loaded successfully!\n📁 Files processed and ready for exploration.\n⚠️ Vectorization failed - using text-only analysis.\n💬 You can now ask questions about this repository."

        # Generate the HTML link for the repository
        repo_link_html = generate_repo_link_html(repo_id)

        logger.info(f"Repository {repo_id} loaded and processed successfully")
        return status, context_summary, gr.update(value=repo_link_html, visible=True)
    except Exception as e:
        logger.error(f"Error loading repository {repo_id}: {e}")
        return _failure(f"❌ Error loading repository: {e}")
def initialize_repo_chatbot(repo_status: str, repo_id: str, repo_context_summary: str) -> List[Dict[str, str]]:
    """Seed the repository chatbot with a welcome message once a repo has loaded.

    Returns a single assistant message when the context summary is non-blank
    and the status text contains "successfully" (case-insensitive); otherwise
    returns an empty history so the chatbot stays blank.
    """
    loaded_ok = bool(repo_context_summary.strip()) and "successfully" in repo_status.lower()
    if not loaded_ok:
        # Keep chatbot empty if loading failed
        return []

    # The status text doubles as the signal for whether vectorization worked.
    if "Vector embeddings created" in repo_status:
        vectorization_status = "🔍 **Enhanced with vector search** for finding relevant code sections"
    else:
        vectorization_status = "📄 **Text-based analysis** (vector search unavailable)"

    welcome_msg = (
        f"👋 Welcome! I've successfully analyzed the **{repo_id}** repository.\n\n"
        "🧠 **I now have comprehensive knowledge of:**\n"
        "• All files and code structure\n"
        "• Key features and capabilities\n"
        "• Installation and usage instructions\n"
        "• Architecture and implementation details\n"
        "• Dependencies and requirements\n\n"
        f"{vectorization_status}\n\n"
        "💬 **Ask me anything about this repository!** \n"
        "For example:\n"
        "• \"What does this repository do?\"\n"
        "• \"How do I install and use it?\"\n"
        "• \"What are the main components?\"\n"
        "• \"Show me usage examples\"\n\n"
        "What would you like to know? 🤔"
    )
    return [{"role": "assistant", "content": welcome_msg}]
def setup_repo_explorer_events(components: Dict[str, gr.components.Component], states: Dict[str, gr.State]):
    """Wire the repo explorer widgets to their handlers.

    Args:
        components: Name-to-widget mapping as returned by create_repo_explorer_tab.
        states: Name-to-``gr.State`` mapping as returned by create_repo_explorer_tab.
    """
    repo_input = components["repo_explorer_input"]
    status_box = components["repo_status_display"]
    chatbot = components["repo_chatbot"]
    msg_box = components["repo_msg_input"]
    summary_state = states["repo_context_summary"]
    repo_id_state = states["current_repo_id"]

    # Load button: load + vectorize, then remember the repo ID, then seed the chat.
    load_event = components["load_repo_btn"].click(
        fn=handle_load_repository_with_vectorization,
        inputs=[repo_input],
        outputs=[status_box, summary_state, components["visit_hf_link"]]
    )
    remember_event = load_event.then(
        fn=lambda repo_id: repo_id,
        inputs=[repo_input],
        outputs=[repo_id_state]
    )
    remember_event.then(
        fn=initialize_repo_chatbot,
        inputs=[status_box, repo_id_state, summary_state],
        outputs=[chatbot]
    )

    # Chat submission: record the user's message, then generate the reply.
    # Pressing Enter in the textbox and clicking Send run the same two steps.
    record_message = dict(
        fn=handle_repo_user_message,
        inputs=[msg_box, chatbot, summary_state, repo_id_state],
        outputs=[chatbot, msg_box]
    )
    generate_reply = dict(
        fn=handle_repo_bot_response,
        inputs=[chatbot, summary_state, repo_id_state],
        outputs=[chatbot]
    )
    for register in (msg_box.submit, components["repo_send_btn"].click):
        register(**record_message).then(**generate_reply)