# github_ai_agent.py - Improved version with parallel processing and error handling
import os
import re
import time
import json
import datetime
import networkx as nx
from collections import defaultdict, Counter
from itertools import combinations
import numpy as np
from typing import List, Dict, Tuple, Any, Optional, Union
import concurrent.futures
from functools import lru_cache
# External libraries
from github import Github, GithubException
from sentence_transformers import SentenceTransformer
import faiss
from gemini_integration import GeminiClient
from visualization_module import RepositoryVisualizer
# Configuration
class Config:
"""Configuration for the GitHub AI Agent"""
def __init__(self):
self.gemini_api_key = os.environ.get("GEMINI_API_KEY")
self.github_token = os.environ.get("GITHUB_ACCESS_TOKEN")
self.embedding_model_name = "all-MiniLM-L6-v2"
self.gemini_model = "gemini-2.0-pro-exp-02-05"
self.max_files_to_load = 100 # Safety limit for large repos
        self.max_token_length = 64000 # Cap on context text passed to Gemini per request
self.enable_advanced_metrics = True
self.visualization_node_limit = 150
self.cache_enabled = True
self.cache_ttl = 3600 # Cache time to live in seconds
# File extensions to analyze
self.code_extensions = [
'.py', '.js', '.jsx', '.ts', '.tsx', '.java', '.c', '.cpp', '.cs',
'.go', '.rb', '.php', '.swift', '.kt', '.rs', '.hs', '.scala', '.ml'
]
self.doc_extensions = [
'.md', '.txt', '.rst', '.html', '.xml', '.json', '.yaml', '.yml'
]
# GitHub Repository Management
class GitHubManager:
"""Manages interaction with GitHub repositories"""
def __init__(self, config: Config):
self.config = config
self.g = Github(config.github_token) if config.github_token else Github()
self.current_repo = None
self.repo_data = {}
self.file_contents = {}
self.contributors_data = {}
self.commit_history = []
self.issues_data = []
self.file_cache = {} # Cache for loaded files
def load_repository(self, repo_url: str) -> bool:
"""Load a repository from URL"""
try:
# Extract repo name from URL
repo_name = self._extract_repo_name(repo_url)
if not repo_name:
return False
# Get repository
self.current_repo = self.g.get_repo(repo_name)
# Load basic repository data
self.repo_data = {
'name': self.current_repo.name,
'full_name': self.current_repo.full_name,
'description': self.current_repo.description,
'stars': self.current_repo.stargazers_count,
'forks': self.current_repo.forks_count,
'watchers': self.current_repo.watchers_count,
'open_issues': self.current_repo.open_issues_count,
'created_at': self.current_repo.created_at,
'updated_at': self.current_repo.updated_at,
'default_branch': self.current_repo.default_branch,
'language': self.current_repo.language,
'topics': self.current_repo.get_topics(),
'license': self.current_repo.license.name if self.current_repo.license else None,
}
return True
except Exception as e:
print(f"Error loading repository: {e}")
return False
def _extract_repo_name(self, repo_url: str) -> Optional[str]:
"""Extract repository name from URL"""
# Handle URLs like: https://github.com/username/repository
github_pattern = r'github\.com[/:]([^/]+)/([^/]+)'
match = re.search(github_pattern, repo_url)
if match:
username, repo = match.groups()
            # Strip a trailing .git suffix if present
            if repo.endswith('.git'):
                repo = repo[:-4]
return f"{username}/{repo}"
return None
def load_files(self) -> Dict[str, Dict]:
"""Load files from repository with improved performance"""
if not self.current_repo:
return {}
try:
contents = self.current_repo.get_contents("")
self.file_contents = {}
files_loaded = 0
batch_size = 20 # Process files in batches
# Create a queue of files to process
file_queue = []
# First pass - collect all file paths
while contents:
content_item = contents.pop(0)
# Skip directories but add their contents to our processing queue
if content_item.type == "dir":
try:
dir_contents = self.current_repo.get_contents(content_item.path)
contents.extend(dir_contents)
except Exception as e:
print(f"Error accessing directory {content_item.path}: {e}")
continue
# Filter by extensions
_, ext = os.path.splitext(content_item.path)
if ext not in self.config.code_extensions + self.config.doc_extensions:
continue
# Add file to processing queue
file_queue.append(content_item)
# Stop if we've reached our limit
if len(file_queue) >= self.config.max_files_to_load:
break
# Process files in batches
for i in range(0, len(file_queue), batch_size):
batch = file_queue[i:i+batch_size]
# Process batch in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_file = {
executor.submit(self._process_file, file_content): file_content
for file_content in batch
}
for future in concurrent.futures.as_completed(future_to_file):
file_content = future_to_file[future]
try:
result = future.result()
if result:
self.file_contents[file_content.path] = result
files_loaded += 1
except Exception as e:
print(f"Error processing file {file_content.path}: {e}")
return self.file_contents
except Exception as e:
print(f"Error loading files: {e}")
return {}
def _process_file(self, file_content) -> Optional[Dict]:
"""Process a single file (for parallel execution)"""
try:
# Check if in cache
if file_content.path in self.file_cache:
return self.file_cache[file_content.path]
_, ext = os.path.splitext(file_content.path)
# Only process text files with specified extensions
if ext not in self.config.code_extensions + self.config.doc_extensions:
return None
try:
# Decode content
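                # (decoded_content is fetched lazily by PyGithub, so this may trigger an extra API call)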
decoded_content = file_content.decoded_content.decode('utf-8')
result = {
'content': decoded_content,
'type': 'code' if ext in self.config.code_extensions else 'document',
'size': file_content.size,
'ext': ext
}
# Update cache
self.file_cache[file_content.path] = result
return result
except UnicodeDecodeError:
# Skip binary files
return None
except Exception as e:
print(f"Error processing file {file_content.path}: {e}")
return None
def load_contributors(self) -> List[Dict]:
"""Load repository contributors with improved performance"""
if not self.current_repo:
return []
try:
contributors = self.current_repo.get_contributors()
self.contributors_data = {}
# Collect basic contributor info
contributor_list = list(contributors) # Convert from PaginatedList to list
# Process in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_contributor = {
executor.submit(self._process_contributor, contributor): contributor
for contributor in contributor_list
}
for future in concurrent.futures.as_completed(future_to_contributor):
contributor = future_to_contributor[future]
try:
contributor_data = future.result()
if contributor_data:
self.contributors_data[contributor.login] = contributor_data
except Exception as e:
print(f"Error processing contributor {contributor.login}: {e}")
return list(self.contributors_data.values())
except Exception as e:
print(f"Error loading contributors: {e}")
return []
def _process_contributor(self, contributor) -> Dict:
"""Process a single contributor (for parallel execution)"""
try:
return {
'login': contributor.login,
'id': contributor.id,
'contributions': contributor.contributions,
'avatar_url': contributor.avatar_url,
'html_url': contributor.html_url,
'type': contributor.type,
'files_modified': [],
'commit_messages': [],
'activity_dates': []
}
except Exception as e:
print(f"Error processing contributor {contributor.login}: {e}")
return None
def load_commits(self, limit: int = 100) -> List[Dict]:
"""Load repository commits with improved performance"""
if not self.current_repo:
return []
try:
commits = self.current_repo.get_commits()[:limit]
self.commit_history = []
commits_list = list(commits) # Convert from PaginatedList to list
# Process commits in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_commit = {
executor.submit(self._process_commit, commit): commit
for commit in commits_list
}
for future in concurrent.futures.as_completed(future_to_commit):
commit = future_to_commit[future]
try:
commit_data = future.result()
if commit_data:
self.commit_history.append(commit_data)
except Exception as e:
print(f"Error processing commit {commit.sha}: {e}")
# Process contributor file statistics
self._update_contributor_file_stats()
return self.commit_history
except Exception as e:
print(f"Error loading commits: {e}")
return []
def _process_commit(self, commit) -> Optional[Dict]:
"""Process a single commit (for parallel execution)"""
try:
# Make sure the commit date is timezone-naive
commit_date = commit.commit.author.date
if hasattr(commit_date, 'tzinfo') and commit_date.tzinfo:
commit_date = commit_date.replace(tzinfo=None)
commit_data = {
'sha': commit.sha,
'author': commit.author.login if commit.author else 'Unknown',
'date': commit_date,
'message': commit.commit.message,
'files': []
}
# Get files changed in this commit
try:
commit_files = commit.files
for file in commit_files:
file_data = {
'filename': file.filename,
'additions': file.additions,
'deletions': file.deletions,
'changes': file.changes,
'status': file.status
}
commit_data['files'].append(file_data)
# Add this file to the contributor's file list
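                    # (CPython's GIL makes list.append effectively atomic, so concurrent
                    # commit workers can safely append to these shared lists)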
if commit.author and commit.author.login in self.contributors_data:
self.contributors_data[commit.author.login]['files_modified'].append(file.filename)
self.contributors_data[commit.author.login]['commit_messages'].append(commit.commit.message)
self.contributors_data[commit.author.login]['activity_dates'].append(commit_date)
except Exception as e:
print(f"Error processing files for commit {commit.sha}: {e}")
return commit_data
except Exception as e:
print(f"Error processing commit {commit.sha}: {e}")
return None
def _update_contributor_file_stats(self):
"""Update contributor file statistics"""
for login, contributor in self.contributors_data.items():
if 'files_modified' in contributor:
# Count occurrences of each file
file_counts = Counter(contributor['files_modified'])
# Replace list with a list of (filename, count) tuples
self.contributors_data[login]['files_modified'] = [
{'filename': filename, 'count': count}
for filename, count in file_counts.most_common(10)
]
def load_issues(self, limit: int = 30) -> List[Dict]:
"""Load repository issues with improved performance"""
if not self.current_repo:
return []
try:
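            # Note: GitHub's issues endpoint (and thus get_issues) also returns pull requests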
issues = self.current_repo.get_issues(state='all')[:limit]
self.issues_data = []
issues_list = list(issues) # Convert from PaginatedList to list
# Process issues in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_issue = {
executor.submit(self._process_issue, issue): issue
for issue in issues_list
}
for future in concurrent.futures.as_completed(future_to_issue):
issue = future_to_issue[future]
try:
issue_data = future.result()
if issue_data:
self.issues_data.append(issue_data)
except Exception as e:
print(f"Error processing issue #{issue.number}: {e}")
return self.issues_data
except Exception as e:
print(f"Error loading issues: {e}")
return []
def _process_issue(self, issue) -> Optional[Dict]:
"""Process a single issue (for parallel execution)"""
try:
# Normalize datetime objects
created_at = issue.created_at
updated_at = issue.updated_at
closed_at = issue.closed_at
if hasattr(created_at, 'tzinfo') and created_at.tzinfo:
created_at = created_at.replace(tzinfo=None)
if hasattr(updated_at, 'tzinfo') and updated_at.tzinfo:
updated_at = updated_at.replace(tzinfo=None)
            if closed_at and hasattr(closed_at, 'tzinfo') and closed_at.tzinfo:
closed_at = closed_at.replace(tzinfo=None)
issue_data = {
'number': issue.number,
'title': issue.title,
'body': issue.body,
'user': issue.user.login if issue.user else 'Unknown',
'state': issue.state,
'created_at': created_at,
'updated_at': updated_at,
'closed_at': closed_at,
'labels': [label.name for label in issue.labels],
'comments': []
}
# Get comments for this issue (limited to 10)
try:
comments = issue.get_comments()[:10]
for comment in comments:
# Normalize datetime
comment_created_at = comment.created_at
if hasattr(comment_created_at, 'tzinfo') and comment_created_at.tzinfo:
comment_created_at = comment_created_at.replace(tzinfo=None)
issue_data['comments'].append({
'user': comment.user.login if comment.user else 'Unknown',
'body': comment.body,
'created_at': comment_created_at
})
except Exception as e:
print(f"Error loading comments for issue #{issue.number}: {e}")
return issue_data
except Exception as e:
print(f"Error processing issue #{issue.number}: {e}")
return None
# Knowledge Base and Vector Storage
class KnowledgeBase:
"""Manages the knowledge base for the repository"""
def __init__(self, config: Config):
self.config = config
self.embeddings = {}
self.embedding_model = SentenceTransformer(config.embedding_model_name)
self.index = None
self.knowledge_graph = nx.Graph()
self.insights = {}
self.insights_cache = {}
self.cache_timestamp = None
def initialize_vector_storage(self, file_contents: Dict[str, Dict]) -> None:
"""Initialize vector storage with file contents and batched processing"""
try:
# Clear existing data
self.embeddings = {}
self.knowledge_graph = nx.Graph()
# Process files and create embeddings
texts = []
ids = []
# Process files in parallel for large repositories
if len(file_contents) > 50:
with concurrent.futures.ThreadPoolExecutor() as executor:
# Process files in batches
batch_size = 20
keys = list(file_contents.keys())
batches = [keys[i:i + batch_size] for i in range(0, len(keys), batch_size)]
# Create a function to process a batch
def process_batch(batch_keys):
batch_texts = []
batch_ids = []
for path in batch_keys:
file_data = file_contents[path]
content = file_data['content']
# Skip very large files to avoid embedding issues
if len(content) > 10000:
content = content[:10000] + "..."
batch_texts.append(content)
batch_ids.append(path)
return batch_texts, batch_ids
# Submit batch processing tasks
futures = [executor.submit(process_batch, batch) for batch in batches]
# Collect results
for future in concurrent.futures.as_completed(futures):
batch_texts, batch_ids = future.result()
texts.extend(batch_texts)
ids.extend(batch_ids)
else:
# For smaller repositories, process sequentially
for path, file_data in file_contents.items():
content = file_data['content']
# Skip very large files to avoid embedding issues
if len(content) > 10000:
content = content[:10000] + "..."
texts.append(content)
ids.append(path)
# Add nodes to knowledge graph
for path, file_data in file_contents.items():
self.knowledge_graph.add_node(
path,
type='file',
file_type=file_data.get('type', 'unknown'),
size=file_data.get('size', 0),
extension=file_data.get('ext', '')
)
# Create embeddings for all files
if texts:
# Process embeddings in batches to avoid memory issues
batch_size = 32
file_embeddings = []
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i+batch_size]
batch_embeddings = self.embedding_model.encode(batch_texts)
file_embeddings.append(batch_embeddings)
file_embeddings = np.vstack(file_embeddings)
# Initialize FAISS index
dimension = file_embeddings.shape[1]
self.index = faiss.IndexFlatL2(dimension)
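                # IndexFlatL2 performs exact (brute-force) L2-distance search; smaller distances mean closer embeddings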
self.index.add(np.array(file_embeddings).astype('float32'))
# Store embeddings with their IDs
for i, file_id in enumerate(ids):
self.embeddings[file_id] = {
'embedding': file_embeddings[i],
'content': texts[i]
}
except Exception as e:
print(f"Error initializing vector storage: {e}")
def build_knowledge_graph(self, commits: List[Dict], contributors: Dict) -> nx.Graph:
"""Build knowledge graph from repository data"""
try:
# Add contributor nodes
for login, data in contributors.items():
self.knowledge_graph.add_node(
login,
type='contributor',
contributions=data['contributions']
)
# Add connections between contributors and files
for login, data in contributors.items():
for file_data in data['files_modified']:
filename = file_data['filename']
count = file_data['count']
# Only add edges if file exists in the graph
if filename in self.knowledge_graph:
if self.knowledge_graph.has_edge(login, filename):
# Update weight if edge exists
self.knowledge_graph[login][filename]['weight'] += count
else:
# Create new edge
self.knowledge_graph.add_edge(login, filename, weight=count)
# Optimized co-occurrence calculation
file_co_occurrence = defaultdict(int)
# Process in batches for large commit histories
batch_size = 50
for i in range(0, len(commits), batch_size):
batch_commits = commits[i:i+batch_size]
for commit in batch_commits:
# Get all files in this commit
commit_files = [file['filename'] for file in commit['files']]
# Add co-occurrence for each pair of files
for file1, file2 in combinations(commit_files, 2):
if file1 in self.knowledge_graph and file2 in self.knowledge_graph:
file_pair = tuple(sorted([file1, file2]))
file_co_occurrence[file_pair] += 1
# Add edges for file co-occurrence
for (file1, file2), count in file_co_occurrence.items():
if count >= 2: # Only add edge if files co-occur at least twice
if self.knowledge_graph.has_edge(file1, file2):
self.knowledge_graph[file1][file2]['weight'] += count
else:
self.knowledge_graph.add_edge(file1, file2, weight=count, type='co-occurrence')
return self.knowledge_graph
except Exception as e:
print(f"Error building knowledge graph: {e}")
return nx.Graph()
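    # Note: lru_cache on a bound method includes self in the cache key and keeps a
    # reference to the instance for as long as its entries live (cleared in clear_caches)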
@lru_cache(maxsize=32)
def search_similar_files(self, query: str, top_k: int = 5) -> List[Dict]:
"""Search for files similar to query with caching"""
try:
if not self.index:
return []
# Encode query
query_embedding = self.embedding_model.encode([query])
# Search in FAISS
distances, indices = self.index.search(np.array(query_embedding).astype('float32'), top_k)
# Get results
results = []
all_ids = list(self.embeddings.keys())
            for i, idx in enumerate(indices[0]):
                # FAISS pads with -1 when fewer than top_k results are available
                if 0 <= idx < len(all_ids):
file_id = all_ids[idx]
results.append({
'file': file_id,
'distance': float(distances[0][i]),
'content': self.embeddings[file_id]['content'][:1000] + "..." if len(self.embeddings[file_id]['content']) > 1000 else self.embeddings[file_id]['content']
})
return results
except Exception as e:
print(f"Error searching similar files: {e}")
return []
def extract_insights(self, repo_data: Dict, commits: List[Dict], contributors: Dict, issues: List[Dict]) -> Dict:
"""Extract insights from repository data with datetime fix and caching"""
# Check if we have a recent cache (less than 10 minutes old)
current_time = time.time()
if self.cache_timestamp and (current_time - self.cache_timestamp < 600) and self.insights_cache:
return self.insights_cache
try:
insights = {
'basic_stats': {},
'activity': {},
'contributors': {},
'code': {},
'issues': {}
}
# Make a deep copy of repo_data to avoid modifying the original
repo_data_copy = {k: v for k, v in repo_data.items()}
# Basic statistics
insights['basic_stats'] = {
'name': repo_data_copy['name'],
'description': repo_data_copy['description'],
'stars': repo_data_copy['stars'],
'forks': repo_data_copy['forks'],
'age_days': None, # Will calculate below
'primary_language': repo_data_copy['language'],
'topics': repo_data_copy['topics']
}
# Fix: Normalize datetime objects to be timezone-naive for consistent comparison
created_at = repo_data_copy.get('created_at')
if created_at:
# Remove timezone info if present
if hasattr(created_at, 'tzinfo') and created_at.tzinfo:
created_at = created_at.replace(tzinfo=None)
# Calculate age
now = datetime.datetime.now()
insights['basic_stats']['age_days'] = (now - created_at).days
# Activity insights
if commits:
# Fix: Normalize all datetime objects to be timezone-naive
commit_dates = []
for commit in commits:
date = commit.get('date')
if date:
# Remove timezone info if present
if hasattr(date, 'tzinfo') and date.tzinfo:
date = date.replace(tzinfo=None)
commit_dates.append(date)
# Sort dates
commit_dates.sort()
if commit_dates:
# Calculate commit frequency
first_commit = commit_dates[0]
last_commit = commit_dates[-1]
days_span = (last_commit - first_commit).days + 1
insights['activity'] = {
'total_commits': len(commits),
'first_commit': first_commit,
'last_commit': last_commit,
'days_span': days_span,
'commits_per_day': round(len(commits) / max(days_span, 1), 2),
}
# Fix: Use Counter for most active day calculation
date_counter = Counter(d.date() for d in commit_dates)
if date_counter:
insights['activity']['most_active_day'] = date_counter.most_common(1)[0][0]
# Commit activity by month
commit_months = [d.strftime('%Y-%m') for d in commit_dates]
month_counts = Counter(commit_months)
insights['activity']['monthly_activity'] = [
{'month': month, 'commits': count} for month, count in month_counts.most_common(12)
]
# Contributor insights
if contributors:
top_contributors = sorted(contributors.values(), key=lambda x: x['contributions'], reverse=True)[:10]
insights['contributors'] = {
'total_contributors': len(contributors),
'top_contributors': [
{
'login': c['login'],
'contributions': c['contributions'],
'top_files': [f['filename'] for f in c['files_modified'][:5]] if c['files_modified'] else []
} for c in top_contributors
]
}
# Calculate bus factor (simplified)
total_commits = sum(c['contributions'] for c in contributors.values())
running_sum = 0
bus_factor = 0
for c in top_contributors:
running_sum += c['contributions']
bus_factor += 1
                    if running_sum / max(total_commits, 1) > 0.5:
break
insights['contributors']['bus_factor'] = bus_factor
# Code insights
if self.knowledge_graph:
# Get top connected files
file_nodes = [(node, degree) for node, degree in self.knowledge_graph.degree()
if self.knowledge_graph.nodes[node].get('type') == 'file']
top_files = sorted(file_nodes, key=lambda x: x[1], reverse=True)[:10]
insights['code']['central_files'] = [
{'filename': filename, 'connections': degree} for filename, degree in top_files
]
# Most frequently modified files from commits
file_modifications = Counter()
for commit in commits:
for file in commit['files']:
file_modifications[file['filename']] += 1
insights['code']['frequently_modified_files'] = [
{'filename': filename, 'modifications': count}
for filename, count in file_modifications.most_common(10)
]
# File types distribution
file_types = Counter([os.path.splitext(node)[1] for node in self.knowledge_graph.nodes()
if '.' in node and self.knowledge_graph.nodes[node].get('type') == 'file'])
insights['code']['file_types'] = [
{'extension': ext, 'count': count} for ext, count in file_types.most_common()
]
# Issue insights
if issues:
# Calculate issue statistics
open_issues = [issue for issue in issues if issue['state'] == 'open']
closed_issues = [issue for issue in issues if issue['state'] == 'closed']
insights['issues'] = {
'total_issues': len(issues),
'open_issues': len(open_issues),
'closed_issues': len(closed_issues),
'resolution_rate': round(len(closed_issues) / max(len(issues), 1), 2)
}
# Calculate average time to close
close_times = []
for issue in closed_issues:
if issue['created_at'] and issue['closed_at']:
# Fix: Normalize datetime objects to be timezone-naive
created_at = issue['created_at']
closed_at = issue['closed_at']
if hasattr(created_at, 'tzinfo') and created_at.tzinfo:
created_at = created_at.replace(tzinfo=None)
if hasattr(closed_at, 'tzinfo') and closed_at.tzinfo:
closed_at = closed_at.replace(tzinfo=None)
close_time = (closed_at - created_at).days
close_times.append(close_time)
if close_times:
insights['issues']['avg_days_to_close'] = round(sum(close_times) / len(close_times), 1)
# Top issue labels
issue_labels = [label for issue in issues for label in issue['labels']]
label_counts = Counter(issue_labels)
insights['issues']['top_labels'] = [
{'label': label, 'count': count} for label, count in label_counts.most_common(5)
]
# Update cache
self.insights_cache = insights
self.cache_timestamp = current_time
self.insights = insights
return insights
except Exception as e:
import traceback
print(f"Error extracting insights: {e}")
print(traceback.format_exc())
return {}
# Main GitHub AI Agent Class
class GitHubAIAgent:
"""Main class for GitHub AI Agent"""
def __init__(self):
self.config = Config()
self.github_manager = None
self.knowledge_base = None
self.gemini_client = None
self.visualization_manager = None
self.repository_loaded = False
self.repository_url = ""
self.repository_analysis = {}
self.visualizations = {}
# Initialize caches
self.file_cache = {}
self.contributor_cache = {}
self.commit_cache = {}
self.issue_cache = {}
self.query_cache = {}
def set_api_keys(self, gemini_api_key: str, github_token: str = None) -> None:
"""Set API keys"""
# Set environment variables
os.environ["GEMINI_API_KEY"] = gemini_api_key
if github_token:
os.environ["GITHUB_ACCESS_TOKEN"] = github_token
# Update config
self.config.gemini_api_key = gemini_api_key
        self.config.github_token = github_token or self.config.github_token
# Initialize clients
self.github_manager = GitHubManager(self.config)
self.knowledge_base = KnowledgeBase(self.config)
self.gemini_client = GeminiClient(self.config.gemini_api_key, self.config.gemini_model)
self.visualization_manager = RepositoryVisualizer(self.config)
def load_repository(self, repository_url: str) -> Dict:
"""Load and analyze a GitHub repository with improved parallelization"""
result = {
'success': False,
'message': '',
'repo_data': {},
'file_count': 0,
'contributor_count': 0
}
try:
# Reset state
self.repository_loaded = False
self.repository_url = ""
self.repository_analysis = {}
self.visualizations = {}
# Load repository basic info
print(f"Loading repository: {repository_url}")
repo_loaded = self.github_manager.load_repository(repository_url)
if not repo_loaded:
result['message'] = "Failed to load repository. Check the URL and your GitHub access token."
return result
# Store repository URL
self.repository_url = repository_url
            # Load contributors first: commit processing fills in per-contributor file
            # stats, so contributors_data must be populated before load_commits runs
            contributors = self.github_manager.load_contributors()
            # Load the remaining repository data in parallel
            with concurrent.futures.ThreadPoolExecutor() as executor:
                # Submit tasks
                files_future = executor.submit(self.github_manager.load_files)
                commits_future = executor.submit(self.github_manager.load_commits)
                issues_future = executor.submit(self.github_manager.load_issues)
                # Get results
                files = files_future.result()
                commits = commits_future.result()
                issues = issues_future.result()
result['file_count'] = len(files)
result['contributor_count'] = len(contributors)
# Initialize vector storage and build knowledge graph
# (These are kept sequential as they depend on previous steps)
print("Building knowledge base")
self.knowledge_base.initialize_vector_storage(files)
knowledge_graph = self.knowledge_base.build_knowledge_graph(
commits, self.github_manager.contributors_data
)
# Extract repository insights
print("Extracting repository insights")
insights = self.knowledge_base.extract_insights(
self.github_manager.repo_data,
commits,
self.github_manager.contributors_data,
issues
)
# Use a separate thread for Gemini analysis which can be slower
# and doesn't block the main thread
def analyze_with_gemini():
print("Analyzing repository with Gemini")
return self.gemini_client.analyze_repository(
self.github_manager.repo_data,
files,
commits,
self.github_manager.contributors_data,
insights
)
# Use another thread pool for visualization generation
def create_visualizations():
print("Creating repository visualizations")
repo_graph_path = self.visualization_manager.create_repository_graph(knowledge_graph)
activity_chart_path = self.visualization_manager.create_commit_activity_chart(commits)
contributor_network_path = self.visualization_manager.create_contributor_network(
self.github_manager.contributors_data, commits
)
dependency_graph_path = self.visualization_manager.create_file_dependency_graph(files)
return {
'repository_graph': repo_graph_path,
'activity_chart': activity_chart_path,
'contributor_network': contributor_network_path,
'dependency_graph': dependency_graph_path,
}
# Run Gemini analysis and visualization generation in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
analysis_future = executor.submit(analyze_with_gemini)
viz_future = executor.submit(create_visualizations)
# Get results
self.repository_analysis = analysis_future.result()
self.visualizations = viz_future.result()
# Update result
result['success'] = True
result['message'] = f"Successfully loaded and analyzed repository: {self.github_manager.repo_data['full_name']}"
result['repo_data'] = self.github_manager.repo_data
self.repository_loaded = True
return result
except Exception as e:
import traceback
print(f"Error loading repository: {str(e)}")
print(traceback.format_exc())
result['message'] = f"Error loading repository: {str(e)}"
return result
def answer_query(self, query: str) -> Dict:
"""Answer a natural language query about the repository with caching"""
if not self.repository_loaded:
return {
'success': False,
'message': "No repository loaded. Please load a repository first.",
'answer': ""
}
# Check cache if enabled
cache_key = f"query_{hash(query)}"
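        # hash() on strings is randomized per process (PYTHONHASHSEED), which is fine
        # for this in-memory cache but would not yield stable keys across runs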
if self.config.cache_enabled and cache_key in self.query_cache:
cached_result = self.query_cache[cache_key]
# Check if cache is still valid
if time.time() - cached_result['timestamp'] < self.config.cache_ttl:
return cached_result['result']
try:
# Search for relevant files
similar_files = self.knowledge_base.search_similar_files(query)
# Get answer from Gemini
answer = self.gemini_client.answer_query(
query,
self.github_manager.repo_data,
similar_files,
self.knowledge_base.insights
)
result = {
'success': True,
'message': "Query answered successfully",
'answer': answer,
'relevant_files': [f['file'] for f in similar_files]
}
# Update cache
if self.config.cache_enabled:
self.query_cache[cache_key] = {
'result': result,
'timestamp': time.time()
}
return result
except Exception as e:
return {
'success': False,
'message': f"Error answering query: {str(e)}",
'answer': ""
}
def analyze_code(self, file_path: str = "", code_snippet: str = "", language: str = "") -> Dict:
"""Analyze a code file or snippet with improved error handling"""
if not file_path and not code_snippet:
return {
'success': False,
'message': "Please provide a file path or code snippet",
'analysis': ""
}
try:
# If file path provided, get code from repository
if file_path:
if not self.repository_loaded:
return {
'success': False,
'message': "No repository loaded. Please load a repository first.",
'analysis': ""
}
if file_path not in self.github_manager.file_contents:
return {
'success': False,
'message': f"File not found: {file_path}",
'analysis': ""
}
code = self.github_manager.file_contents[file_path]['content']
_, ext = os.path.splitext(file_path)
language = ext.lstrip('.')
else:
code = code_snippet
# Analyze code with Gemini
analysis = self.gemini_client.analyze_code_snippet(code, language)
return {
'success': True,
'message': "Code analyzed successfully",
'analysis': analysis
}
except Exception as e:
return {
'success': False,
'message': f"Error analyzing code: {str(e)}",
'analysis': ""
}
def find_collaborators(self, requirements: str) -> Dict:
"""Find potential collaborators based on requirements"""
if not self.repository_loaded:
return {
'success': False,
'message': "No repository loaded. Please load a repository first.",
'collaborators': []
}
try:
# Find collaborators with Gemini
collaborators = self.gemini_client.identify_potential_collaborators(
self.github_manager.contributors_data,
self.knowledge_base.insights,
requirements
)
return {
'success': True,
'message': "Potential collaborators identified",
'collaborators': collaborators
}
except Exception as e:
return {
'success': False,
'message': f"Error finding collaborators: {str(e)}",
'collaborators': []
}
def get_repository_insights(self) -> Dict:
"""Get insights about the repository"""
if not self.repository_loaded:
return {
'success': False,
'message': "No repository loaded. Please load a repository first.",
'insights': {}
}
try:
return {
'success': True,
'message': "Repository insights retrieved",
'insights': self.knowledge_base.insights,
'analysis': self.repository_analysis
}
except Exception as e:
return {
'success': False,
'message': f"Error getting repository insights: {str(e)}",
'insights': {}
}
def get_visualizations(self) -> Dict:
"""Get repository visualizations"""
if not self.repository_loaded:
return {
'success': False,
'message': "No repository loaded. Please load a repository first.",
'visualizations': {}
}
return {
'success': True,
'message': "Repository visualizations retrieved",
'visualizations': self.visualizations
}
def clear_caches(self) -> None:
"""Clear all caches"""
self.file_cache.clear()
self.contributor_cache.clear()
self.commit_cache.clear()
self.issue_cache.clear()
self.query_cache.clear()
        # Clear the LRU cache on the knowledge base similarity search
        if self.knowledge_base is not None:
            self.knowledge_base.search_similar_files.cache_clear()
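

# Example usage: a minimal sketch, assuming GEMINI_API_KEY (and optionally
# GITHUB_ACCESS_TOKEN) are set in the environment and that the example
# repository URL below is reachable; swap in any public GitHub repository.
if __name__ == "__main__":
    agent = GitHubAIAgent()
    agent.set_api_keys(
        gemini_api_key=os.environ.get("GEMINI_API_KEY", ""),
        github_token=os.environ.get("GITHUB_ACCESS_TOKEN"),
    )
    load_result = agent.load_repository("https://github.com/huggingface/transformers")
    print(load_result['message'])
    if load_result['success']:
        response = agent.answer_query("What are the most central files in this repository?")
        print(response['answer'])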