# github_ai_agent.py - Improved version with parallel processing and error handling
import os
import re
import time
import json
import datetime
import networkx as nx
from collections import defaultdict, Counter
from itertools import combinations
import numpy as np
from typing import List, Dict, Tuple, Any, Optional, Union
import concurrent.futures
from functools import lru_cache
import google.generativeai as genai

# External libraries
from github import Github, GithubException
from sentence_transformers import SentenceTransformer
import faiss
from gemini_integration import GeminiClient
from visualization_module import RepositoryVisualizer


# Configuration
class Config:
    """Configuration for the GitHub AI Agent"""

    def __init__(self):
        self.gemini_api_key = os.environ.get("GEMINI_API_KEY")
        self.github_token = os.environ.get("GITHUB_ACCESS_TOKEN")
        self.embedding_model_name = "all-MiniLM-L6-v2"
        self.gemini_model = "gemini-2.0-pro-exp-02-05"
        self.max_files_to_load = 100  # Safety limit for large repos
        self.max_token_length = 64000  # Gemini Pro context limit
        self.enable_advanced_metrics = True
        self.visualization_node_limit = 150
        self.cache_enabled = True
        self.cache_ttl = 3600  # Cache time to live in seconds

        # File extensions to analyze
        self.code_extensions = [
            '.py', '.js', '.jsx', '.ts', '.tsx', '.java', '.c', '.cpp', '.cs',
            '.go', '.rb', '.php', '.swift', '.kt', '.rs', '.hs', '.scala', '.ml'
        ]
        self.doc_extensions = [
            '.md', '.txt', '.rst', '.html', '.xml', '.json', '.yaml', '.yml'
        ]
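
# Illustrative usage sketch (not part of the original module): shows how Config
# picks up credentials from the environment and how individual limits can be
# overridden after construction. The key values below are placeholders.
#
#   os.environ["GEMINI_API_KEY"] = "<your-gemini-key>"
#   os.environ["GITHUB_ACCESS_TOKEN"] = "<your-github-token>"
#   config = Config()
#   config.max_files_to_load = 50   # tighten the limit for very large repos
#   config.cache_ttl = 600          # shorter cache lifetime (seconds)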

# GitHub Repository Management
class GitHubManager:
    """Manages interaction with GitHub repositories"""

    def __init__(self, config: Config):
        self.config = config
        self.g = Github(config.github_token) if config.github_token else Github()
        self.current_repo = None
        self.repo_data = {}
        self.file_contents = {}
        self.contributors_data = {}
        self.commit_history = []
        self.issues_data = []
        self.file_cache = {}  # Cache for loaded files

    def load_repository(self, repo_url: str) -> bool:
        """Load a repository from URL"""
        try:
            # Extract repo name from URL
            repo_name = self._extract_repo_name(repo_url)
            if not repo_name:
                return False

            # Get repository
            self.current_repo = self.g.get_repo(repo_name)

            # Load basic repository data
            self.repo_data = {
                'name': self.current_repo.name,
                'full_name': self.current_repo.full_name,
                'description': self.current_repo.description,
                'stars': self.current_repo.stargazers_count,
                'forks': self.current_repo.forks_count,
                'watchers': self.current_repo.watchers_count,
                'open_issues': self.current_repo.open_issues_count,
                'created_at': self.current_repo.created_at,
                'updated_at': self.current_repo.updated_at,
                'default_branch': self.current_repo.default_branch,
                'language': self.current_repo.language,
                'topics': self.current_repo.get_topics(),
                'license': self.current_repo.license.name if self.current_repo.license else None,
            }
            return True
        except Exception as e:
            print(f"Error loading repository: {e}")
            return False
    def _extract_repo_name(self, repo_url: str) -> Optional[str]:
        """Extract repository name from URL"""
        # Handle URLs like: https://github.com/username/repository
        github_pattern = r'github\.com[/:]([^/]+)/([^/]+)'
        match = re.search(github_pattern, repo_url)
        if match:
            username, repo = match.groups()
            # Strip a trailing .git suffix if present (without touching '.git'
            # that happens to appear elsewhere in the name)
            if repo.endswith('.git'):
                repo = repo[:-4]
            return f"{username}/{repo}"
        return None
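
    # Example (illustrative) for _extract_repo_name: both of the following URL
    # forms resolve to "octocat/Hello-World":
    #   "https://github.com/octocat/Hello-World"
    #   "git@github.com:octocat/Hello-World.git"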

    def load_files(self) -> Dict[str, Dict]:
        """Load files from repository with improved performance"""
        if not self.current_repo:
            return {}
        try:
            contents = self.current_repo.get_contents("")
            self.file_contents = {}
            files_loaded = 0
            batch_size = 20  # Process files in batches

            # Create a queue of files to process
            file_queue = []

            # First pass - collect all file paths
            while contents:
                content_item = contents.pop(0)

                # Skip directories but add their contents to our processing queue
                if content_item.type == "dir":
                    try:
                        dir_contents = self.current_repo.get_contents(content_item.path)
                        contents.extend(dir_contents)
                    except Exception as e:
                        print(f"Error accessing directory {content_item.path}: {e}")
                    continue

                # Filter by extensions
                _, ext = os.path.splitext(content_item.path)
                if ext not in self.config.code_extensions + self.config.doc_extensions:
                    continue

                # Add file to processing queue
                file_queue.append(content_item)

                # Stop if we've reached our limit
                if len(file_queue) >= self.config.max_files_to_load:
                    break

            # Process files in batches
            for i in range(0, len(file_queue), batch_size):
                batch = file_queue[i:i + batch_size]

                # Process batch in parallel
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future_to_file = {
                        executor.submit(self._process_file, file_content): file_content
                        for file_content in batch
                    }
                    for future in concurrent.futures.as_completed(future_to_file):
                        file_content = future_to_file[future]
                        try:
                            result = future.result()
                            if result:
                                self.file_contents[file_content.path] = result
                                files_loaded += 1
                        except Exception as e:
                            print(f"Error processing file {file_content.path}: {e}")

            return self.file_contents
        except Exception as e:
            print(f"Error loading files: {e}")
            return {}

    def _process_file(self, file_content) -> Optional[Dict]:
        """Process a single file (for parallel execution)"""
        try:
            # Check if in cache
            if file_content.path in self.file_cache:
                return self.file_cache[file_content.path]

            _, ext = os.path.splitext(file_content.path)

            # Only process text files with specified extensions
            if ext not in self.config.code_extensions + self.config.doc_extensions:
                return None

            try:
                # Decode content
                decoded_content = file_content.decoded_content.decode('utf-8')
                result = {
                    'content': decoded_content,
                    'type': 'code' if ext in self.config.code_extensions else 'document',
                    'size': file_content.size,
                    'ext': ext
                }
                # Update cache
                self.file_cache[file_content.path] = result
                return result
            except UnicodeDecodeError:
                # Skip binary files
                return None
        except Exception as e:
            print(f"Error processing file {file_content.path}: {e}")
            return None

    def load_contributors(self) -> List[Dict]:
        """Load repository contributors with improved performance"""
        if not self.current_repo:
            return []
        try:
            contributors = self.current_repo.get_contributors()
            self.contributors_data = {}

            # Collect basic contributor info
            contributor_list = list(contributors)  # Convert from PaginatedList to list

            # Process in parallel
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future_to_contributor = {
                    executor.submit(self._process_contributor, contributor): contributor
                    for contributor in contributor_list
                }
                for future in concurrent.futures.as_completed(future_to_contributor):
                    contributor = future_to_contributor[future]
                    try:
                        contributor_data = future.result()
                        if contributor_data:
                            self.contributors_data[contributor.login] = contributor_data
                    except Exception as e:
                        print(f"Error processing contributor {contributor.login}: {e}")

            return list(self.contributors_data.values())
        except Exception as e:
            print(f"Error loading contributors: {e}")
            return []

    def _process_contributor(self, contributor) -> Optional[Dict]:
        """Process a single contributor (for parallel execution)"""
        try:
            return {
                'login': contributor.login,
                'id': contributor.id,
                'contributions': contributor.contributions,
                'avatar_url': contributor.avatar_url,
                'html_url': contributor.html_url,
                'type': contributor.type,
                'files_modified': [],
                'commit_messages': [],
                'activity_dates': []
            }
        except Exception as e:
            print(f"Error processing contributor {contributor.login}: {e}")
            return None

    def load_commits(self, limit: int = 100) -> List[Dict]:
        """Load repository commits with improved performance"""
        if not self.current_repo:
            return []
        try:
            commits = self.current_repo.get_commits()[:limit]
            self.commit_history = []
            commits_list = list(commits)  # Convert from PaginatedList to list

            # Process commits in parallel
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future_to_commit = {
                    executor.submit(self._process_commit, commit): commit
                    for commit in commits_list
                }
                for future in concurrent.futures.as_completed(future_to_commit):
                    commit = future_to_commit[future]
                    try:
                        commit_data = future.result()
                        if commit_data:
                            self.commit_history.append(commit_data)
                    except Exception as e:
                        print(f"Error processing commit {commit.sha}: {e}")

            # Process contributor file statistics
            self._update_contributor_file_stats()

            return self.commit_history
        except Exception as e:
            print(f"Error loading commits: {e}")
            return []

    def _process_commit(self, commit) -> Optional[Dict]:
        """Process a single commit (for parallel execution)"""
        try:
            # Make sure the commit date is timezone-naive
            commit_date = commit.commit.author.date
            if hasattr(commit_date, 'tzinfo') and commit_date.tzinfo:
                commit_date = commit_date.replace(tzinfo=None)

            commit_data = {
                'sha': commit.sha,
                'author': commit.author.login if commit.author else 'Unknown',
                'date': commit_date,
                'message': commit.commit.message,
                'files': []
            }

            # Get files changed in this commit
            try:
                commit_files = commit.files
                for file in commit_files:
                    file_data = {
                        'filename': file.filename,
                        'additions': file.additions,
                        'deletions': file.deletions,
                        'changes': file.changes,
                        'status': file.status
                    }
                    commit_data['files'].append(file_data)

                    # Add this file to the contributor's file list
                    if commit.author and commit.author.login in self.contributors_data:
                        self.contributors_data[commit.author.login]['files_modified'].append(file.filename)
                        self.contributors_data[commit.author.login]['commit_messages'].append(commit.commit.message)
                        self.contributors_data[commit.author.login]['activity_dates'].append(commit_date)
            except Exception as e:
                print(f"Error processing files for commit {commit.sha}: {e}")

            return commit_data
        except Exception as e:
            print(f"Error processing commit {commit.sha}: {e}")
            return None

    def _update_contributor_file_stats(self):
        """Update contributor file statistics"""
        for login, contributor in self.contributors_data.items():
            if 'files_modified' in contributor:
                # Count occurrences of each file
                file_counts = Counter(contributor['files_modified'])
                # Replace the raw filename list with the top-10 {'filename', 'count'} dicts
                self.contributors_data[login]['files_modified'] = [
                    {'filename': filename, 'count': count}
                    for filename, count in file_counts.most_common(10)
                ]

    def load_issues(self, limit: int = 30) -> List[Dict]:
        """Load repository issues with improved performance"""
        if not self.current_repo:
            return []
        try:
            issues = self.current_repo.get_issues(state='all')[:limit]
            self.issues_data = []
            issues_list = list(issues)  # Convert from PaginatedList to list

            # Process issues in parallel
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future_to_issue = {
                    executor.submit(self._process_issue, issue): issue
                    for issue in issues_list
                }
                for future in concurrent.futures.as_completed(future_to_issue):
                    issue = future_to_issue[future]
                    try:
                        issue_data = future.result()
                        if issue_data:
                            self.issues_data.append(issue_data)
                    except Exception as e:
                        print(f"Error processing issue #{issue.number}: {e}")

            return self.issues_data
        except Exception as e:
            print(f"Error loading issues: {e}")
            return []

    def _process_issue(self, issue) -> Optional[Dict]:
        """Process a single issue (for parallel execution)"""
        try:
            # Normalize datetime objects
            created_at = issue.created_at
            updated_at = issue.updated_at
            closed_at = issue.closed_at
            if hasattr(created_at, 'tzinfo') and created_at.tzinfo:
                created_at = created_at.replace(tzinfo=None)
            if hasattr(updated_at, 'tzinfo') and updated_at.tzinfo:
                updated_at = updated_at.replace(tzinfo=None)
            if closed_at and hasattr(closed_at, 'tzinfo') and closed_at.tzinfo:
                closed_at = closed_at.replace(tzinfo=None)

            issue_data = {
                'number': issue.number,
                'title': issue.title,
                'body': issue.body,
                'user': issue.user.login if issue.user else 'Unknown',
                'state': issue.state,
                'created_at': created_at,
                'updated_at': updated_at,
                'closed_at': closed_at,
                'labels': [label.name for label in issue.labels],
                'comments': []
            }

            # Get comments for this issue (limited to 10)
            try:
                comments = issue.get_comments()[:10]
                for comment in comments:
                    # Normalize datetime
                    comment_created_at = comment.created_at
                    if hasattr(comment_created_at, 'tzinfo') and comment_created_at.tzinfo:
                        comment_created_at = comment_created_at.replace(tzinfo=None)
                    issue_data['comments'].append({
                        'user': comment.user.login if comment.user else 'Unknown',
                        'body': comment.body,
                        'created_at': comment_created_at
                    })
            except Exception as e:
                print(f"Error loading comments for issue #{issue.number}: {e}")

            return issue_data
        except Exception as e:
            print(f"Error processing issue #{issue.number}: {e}")
            return None
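

# Illustrative, standalone usage of GitHubManager (not part of the original
# module). The repository URL is a placeholder; an unauthenticated client is
# heavily rate-limited, so GITHUB_ACCESS_TOKEN should normally be set first.
#
#   config = Config()
#   manager = GitHubManager(config)
#   if manager.load_repository("https://github.com/octocat/Hello-World"):
#       files = manager.load_files()            # {path: {'content', 'type', ...}}
#       contributors = manager.load_contributors()
#       commits = manager.load_commits(limit=50)
#       issues = manager.load_issues(limit=20)
#       print(f"Loaded {len(files)} files and {len(commits)} commits")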

# Knowledge Base and Vector Storage
class KnowledgeBase:
    """Manages the knowledge base for the repository"""

    def __init__(self, config: Config):
        self.config = config
        self.embeddings = {}
        self.embedding_model = SentenceTransformer(config.embedding_model_name)
        self.index = None
        self.knowledge_graph = nx.Graph()
        self.insights = {}
        self.insights_cache = {}
        self.cache_timestamp = None

    def initialize_vector_storage(self, file_contents: Dict[str, Dict]) -> None:
        """Initialize vector storage with file contents and batched processing"""
        try:
            # Clear existing data
            self.embeddings = {}
            self.knowledge_graph = nx.Graph()

            # Process files and create embeddings
            texts = []
            ids = []

            # Process files in parallel for large repositories
            if len(file_contents) > 50:
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    # Process files in batches
                    batch_size = 20
                    keys = list(file_contents.keys())
                    batches = [keys[i:i + batch_size] for i in range(0, len(keys), batch_size)]

                    # Create a function to process a batch
                    def process_batch(batch_keys):
                        batch_texts = []
                        batch_ids = []
                        for path in batch_keys:
                            file_data = file_contents[path]
                            content = file_data['content']
                            # Truncate very large files to avoid embedding issues
                            if len(content) > 10000:
                                content = content[:10000] + "..."
                            batch_texts.append(content)
                            batch_ids.append(path)
                        return batch_texts, batch_ids

                    # Submit batch processing tasks
                    futures = [executor.submit(process_batch, batch) for batch in batches]

                    # Collect results
                    for future in concurrent.futures.as_completed(futures):
                        batch_texts, batch_ids = future.result()
                        texts.extend(batch_texts)
                        ids.extend(batch_ids)
            else:
                # For smaller repositories, process sequentially
                for path, file_data in file_contents.items():
                    content = file_data['content']
                    # Truncate very large files to avoid embedding issues
                    if len(content) > 10000:
                        content = content[:10000] + "..."
                    texts.append(content)
                    ids.append(path)

            # Add nodes to knowledge graph
            for path, file_data in file_contents.items():
                self.knowledge_graph.add_node(
                    path,
                    type='file',
                    file_type=file_data.get('type', 'unknown'),
                    size=file_data.get('size', 0),
                    extension=file_data.get('ext', '')
                )

            # Create embeddings for all files
            if texts:
                # Process embeddings in batches to avoid memory issues
                batch_size = 32
                file_embeddings = []
                for i in range(0, len(texts), batch_size):
                    batch_texts = texts[i:i + batch_size]
                    batch_embeddings = self.embedding_model.encode(batch_texts)
                    file_embeddings.append(batch_embeddings)
                file_embeddings = np.vstack(file_embeddings)

                # Initialize FAISS index
                dimension = file_embeddings.shape[1]
                self.index = faiss.IndexFlatL2(dimension)
                self.index.add(np.array(file_embeddings).astype('float32'))

                # Store embeddings with their IDs
                for i, file_id in enumerate(ids):
                    self.embeddings[file_id] = {
                        'embedding': file_embeddings[i],
                        'content': texts[i]
                    }
        except Exception as e:
            print(f"Error initializing vector storage: {e}")

    def build_knowledge_graph(self, commits: List[Dict], contributors: Dict) -> nx.Graph:
        """Build knowledge graph from repository data"""
        try:
            # Add contributor nodes
            for login, data in contributors.items():
                self.knowledge_graph.add_node(
                    login,
                    type='contributor',
                    contributions=data['contributions']
                )

            # Add connections between contributors and files
            for login, data in contributors.items():
                for file_data in data['files_modified']:
                    filename = file_data['filename']
                    count = file_data['count']

                    # Only add edges if file exists in the graph
                    if filename in self.knowledge_graph:
                        if self.knowledge_graph.has_edge(login, filename):
                            # Update weight if edge exists
                            self.knowledge_graph[login][filename]['weight'] += count
                        else:
                            # Create new edge
                            self.knowledge_graph.add_edge(login, filename, weight=count)

            # Optimized co-occurrence calculation
            file_co_occurrence = defaultdict(int)

            # Process in batches for large commit histories
            batch_size = 50
            for i in range(0, len(commits), batch_size):
                batch_commits = commits[i:i + batch_size]
                for commit in batch_commits:
                    # Get all files in this commit
                    commit_files = [file['filename'] for file in commit['files']]

                    # Add co-occurrence for each pair of files
                    # (combinations is imported at module level)
                    for file1, file2 in combinations(commit_files, 2):
                        if file1 in self.knowledge_graph and file2 in self.knowledge_graph:
                            file_pair = tuple(sorted([file1, file2]))
                            file_co_occurrence[file_pair] += 1

            # Add edges for file co-occurrence
            for (file1, file2), count in file_co_occurrence.items():
                if count >= 2:  # Only add edge if files co-occur at least twice
                    if self.knowledge_graph.has_edge(file1, file2):
                        self.knowledge_graph[file1][file2]['weight'] += count
                    else:
                        self.knowledge_graph.add_edge(file1, file2, weight=count, type='co-occurrence')

            return self.knowledge_graph
        except Exception as e:
            print(f"Error building knowledge graph: {e}")
            return nx.Graph()

    def search_similar_files(self, query: str, top_k: int = 5) -> List[Dict]:
        """Search for files similar to a query"""
        try:
            if not self.index:
                return []

            # Encode query
            query_embedding = self.embedding_model.encode([query])

            # Search in FAISS
            distances, indices = self.index.search(np.array(query_embedding).astype('float32'), top_k)

            # Get results
            results = []
            all_ids = list(self.embeddings.keys())
            for i, idx in enumerate(indices[0]):
                # FAISS pads with -1 when fewer than top_k vectors are indexed
                if 0 <= idx < len(all_ids):
                    file_id = all_ids[idx]
                    content = self.embeddings[file_id]['content']
                    results.append({
                        'file': file_id,
                        'distance': float(distances[0][i]),
                        'content': content[:1000] + "..." if len(content) > 1000 else content
                    })
            return results
        except Exception as e:
            print(f"Error searching similar files: {e}")
            return []

    def extract_insights(self, repo_data: Dict, commits: List[Dict], contributors: Dict, issues: List[Dict]) -> Dict:
        """Extract insights from repository data with datetime normalization and caching"""
        # Check if we have a recent cache (less than 10 minutes old)
        current_time = time.time()
        if self.cache_timestamp and (current_time - self.cache_timestamp < 600) and self.insights_cache:
            return self.insights_cache

        try:
            insights = {
                'basic_stats': {},
                'activity': {},
                'contributors': {},
                'code': {},
                'issues': {}
            }

            # Make a shallow copy of repo_data to avoid modifying the original
            repo_data_copy = {k: v for k, v in repo_data.items()}

            # Basic statistics
            insights['basic_stats'] = {
                'name': repo_data_copy['name'],
                'description': repo_data_copy['description'],
                'stars': repo_data_copy['stars'],
                'forks': repo_data_copy['forks'],
                'age_days': None,  # Calculated below
                'primary_language': repo_data_copy['language'],
                'topics': repo_data_copy['topics']
            }

            # Normalize datetime objects to be timezone-naive for consistent comparison
            created_at = repo_data_copy.get('created_at')
            if created_at:
                # Remove timezone info if present
                if hasattr(created_at, 'tzinfo') and created_at.tzinfo:
                    created_at = created_at.replace(tzinfo=None)
                # Calculate age
                now = datetime.datetime.now()
                insights['basic_stats']['age_days'] = (now - created_at).days

            # Activity insights
            if commits:
                # Normalize all commit dates to be timezone-naive
                commit_dates = []
                for commit in commits:
                    date = commit.get('date')
                    if date:
                        # Remove timezone info if present
                        if hasattr(date, 'tzinfo') and date.tzinfo:
                            date = date.replace(tzinfo=None)
                        commit_dates.append(date)

                # Sort dates
                commit_dates.sort()

                if commit_dates:
                    # Calculate commit frequency
                    first_commit = commit_dates[0]
                    last_commit = commit_dates[-1]
                    days_span = (last_commit - first_commit).days + 1
                    insights['activity'] = {
                        'total_commits': len(commits),
                        'first_commit': first_commit,
                        'last_commit': last_commit,
                        'days_span': days_span,
                        'commits_per_day': round(len(commits) / max(days_span, 1), 2),
                    }

                    # Use Counter for the most active day calculation
                    date_counter = Counter(d.date() for d in commit_dates)
                    if date_counter:
                        insights['activity']['most_active_day'] = date_counter.most_common(1)[0][0]

                    # Commit activity by month
                    commit_months = [d.strftime('%Y-%m') for d in commit_dates]
                    month_counts = Counter(commit_months)
                    insights['activity']['monthly_activity'] = [
                        {'month': month, 'commits': count} for month, count in month_counts.most_common(12)
                    ]

            # Contributor insights
            if contributors:
                top_contributors = sorted(contributors.values(), key=lambda x: x['contributions'], reverse=True)[:10]
                insights['contributors'] = {
                    'total_contributors': len(contributors),
                    'top_contributors': [
                        {
                            'login': c['login'],
                            'contributions': c['contributions'],
                            'top_files': [f['filename'] for f in c['files_modified'][:5]] if c['files_modified'] else []
                        } for c in top_contributors
                    ]
                }

                # Calculate bus factor (simplified): the smallest number of top
                # contributors who together account for more than half of all commits
                total_commits = sum(c['contributions'] for c in contributors.values())
                running_sum = 0
                bus_factor = 0
                for c in top_contributors:
                    running_sum += c['contributions']
                    bus_factor += 1
                    if running_sum / max(total_commits, 1) > 0.5:
                        break
                insights['contributors']['bus_factor'] = bus_factor

            # Code insights
            if self.knowledge_graph:
                # Get top connected files
                file_nodes = [(node, degree) for node, degree in self.knowledge_graph.degree()
                              if self.knowledge_graph.nodes[node].get('type') == 'file']
                top_files = sorted(file_nodes, key=lambda x: x[1], reverse=True)[:10]
                insights['code']['central_files'] = [
                    {'filename': filename, 'connections': degree} for filename, degree in top_files
                ]

                # Most frequently modified files from commits
                file_modifications = Counter()
                for commit in commits:
                    for file in commit['files']:
                        file_modifications[file['filename']] += 1
                insights['code']['frequently_modified_files'] = [
                    {'filename': filename, 'modifications': count}
                    for filename, count in file_modifications.most_common(10)
                ]

                # File types distribution
                file_types = Counter([os.path.splitext(node)[1] for node in self.knowledge_graph.nodes()
                                      if '.' in node and self.knowledge_graph.nodes[node].get('type') == 'file'])
                insights['code']['file_types'] = [
                    {'extension': ext, 'count': count} for ext, count in file_types.most_common()
                ]

            # Issue insights
            if issues:
                # Calculate issue statistics
                open_issues = [issue for issue in issues if issue['state'] == 'open']
                closed_issues = [issue for issue in issues if issue['state'] == 'closed']
                insights['issues'] = {
                    'total_issues': len(issues),
                    'open_issues': len(open_issues),
                    'closed_issues': len(closed_issues),
                    'resolution_rate': round(len(closed_issues) / max(len(issues), 1), 2)
                }

                # Calculate average time to close
                close_times = []
                for issue in closed_issues:
                    if issue['created_at'] and issue['closed_at']:
                        # Normalize datetime objects to be timezone-naive
                        created_at = issue['created_at']
                        closed_at = issue['closed_at']
                        if hasattr(created_at, 'tzinfo') and created_at.tzinfo:
                            created_at = created_at.replace(tzinfo=None)
                        if hasattr(closed_at, 'tzinfo') and closed_at.tzinfo:
                            closed_at = closed_at.replace(tzinfo=None)
                        close_time = (closed_at - created_at).days
                        close_times.append(close_time)
                if close_times:
                    insights['issues']['avg_days_to_close'] = round(sum(close_times) / len(close_times), 1)

                # Top issue labels
                issue_labels = [label for issue in issues for label in issue['labels']]
                label_counts = Counter(issue_labels)
                insights['issues']['top_labels'] = [
                    {'label': label, 'count': count} for label, count in label_counts.most_common(5)
                ]

            # Update cache
            self.insights_cache = insights
            self.cache_timestamp = current_time
            self.insights = insights

            return insights
        except Exception as e:
            import traceback
            print(f"Error extracting insights: {e}")
            print(traceback.format_exc())
            return {}
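

# Illustrative KnowledgeBase flow (not part of the original module). It assumes
# a GitHubManager that has already loaded a repository; the query string is a
# placeholder.
#
#   kb = KnowledgeBase(config)
#   kb.initialize_vector_storage(manager.file_contents)
#   graph = kb.build_knowledge_graph(manager.commit_history, manager.contributors_data)
#   hits = kb.search_similar_files("where is authentication handled?", top_k=3)
#   insights = kb.extract_insights(manager.repo_data, manager.commit_history,
#                                  manager.contributors_data, manager.issues_data)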

# Main GitHub AI Agent Class
class GitHubAIAgent:
    """Main class for GitHub AI Agent"""

    def __init__(self):
        self.config = Config()
        self.github_manager = None
        self.knowledge_base = None
        self.gemini_client = None
        self.visualization_manager = None
        self.repository_loaded = False
        self.repository_url = ""
        self.repository_analysis = {}
        self.visualizations = {}

        # Initialize caches
        self.file_cache = {}
        self.contributor_cache = {}
        self.commit_cache = {}
        self.issue_cache = {}
        self.query_cache = {}

    def set_api_keys(self, gemini_api_key: str, github_token: Optional[str] = None) -> None:
        """Set API keys and initialize the underlying clients"""
        # Set environment variables
        os.environ["GEMINI_API_KEY"] = gemini_api_key
        if github_token:
            os.environ["GITHUB_ACCESS_TOKEN"] = github_token

        # Update config
        self.config.gemini_api_key = gemini_api_key
        self.config.github_token = github_token

        # Initialize clients
        self.github_manager = GitHubManager(self.config)
        self.knowledge_base = KnowledgeBase(self.config)
        self.gemini_client = GeminiClient(self.config.gemini_api_key, self.config.gemini_model)
        self.visualization_manager = RepositoryVisualizer(self.config)

    def load_repository(self, repository_url: str) -> Dict:
        """Load and analyze a GitHub repository with improved parallelization"""
        result = {
            'success': False,
            'message': '',
            'repo_data': {},
            'file_count': 0,
            'contributor_count': 0
        }
        try:
            # Reset state
            self.repository_loaded = False
            self.repository_url = ""
            self.repository_analysis = {}
            self.visualizations = {}

            # Load repository basic info
            print(f"Loading repository: {repository_url}")
            repo_loaded = self.github_manager.load_repository(repository_url)
            if not repo_loaded:
                result['message'] = "Failed to load repository. Check the URL and your GitHub access token."
                return result

            # Store repository URL
            self.repository_url = repository_url

            # Use parallel processing for loading repository data
            with concurrent.futures.ThreadPoolExecutor() as executor:
                # Submit tasks
                files_future = executor.submit(self.github_manager.load_files)
                contributors_future = executor.submit(self.github_manager.load_contributors)
                commits_future = executor.submit(self.github_manager.load_commits)
                issues_future = executor.submit(self.github_manager.load_issues)

                # Get results
                files = files_future.result()
                contributors = contributors_future.result()
                commits = commits_future.result()
                issues = issues_future.result()

            result['file_count'] = len(files)
            result['contributor_count'] = len(contributors)

            # Initialize vector storage and build knowledge graph
            # (These are kept sequential as they depend on previous steps)
            print("Building knowledge base")
            self.knowledge_base.initialize_vector_storage(files)
            knowledge_graph = self.knowledge_base.build_knowledge_graph(
                commits, self.github_manager.contributors_data
            )

            # Extract repository insights
            print("Extracting repository insights")
            insights = self.knowledge_base.extract_insights(
                self.github_manager.repo_data,
                commits,
                self.github_manager.contributors_data,
                issues
            )

            # Run the slower Gemini analysis in its own thread so it does not
            # block visualization generation
            def analyze_with_gemini():
                print("Analyzing repository with Gemini")
                return self.gemini_client.analyze_repository(
                    self.github_manager.repo_data,
                    files,
                    commits,
                    self.github_manager.contributors_data,
                    insights
                )

            # Generate visualizations in another thread
            def create_visualizations():
                print("Creating repository visualizations")
                repo_graph_path = self.visualization_manager.create_repository_graph(knowledge_graph)
                activity_chart_path = self.visualization_manager.create_commit_activity_chart(commits)
                contributor_network_path = self.visualization_manager.create_contributor_network(
                    self.github_manager.contributors_data, commits
                )
                dependency_graph_path = self.visualization_manager.create_file_dependency_graph(files)
                return {
                    'repository_graph': repo_graph_path,
                    'activity_chart': activity_chart_path,
                    'contributor_network': contributor_network_path,
                    'dependency_graph': dependency_graph_path,
                }

            # Run Gemini analysis and visualization generation in parallel
            with concurrent.futures.ThreadPoolExecutor() as executor:
                analysis_future = executor.submit(analyze_with_gemini)
                viz_future = executor.submit(create_visualizations)

                # Get results
                self.repository_analysis = analysis_future.result()
                self.visualizations = viz_future.result()

            # Update result
            result['success'] = True
            result['message'] = f"Successfully loaded and analyzed repository: {self.github_manager.repo_data['full_name']}"
            result['repo_data'] = self.github_manager.repo_data
            self.repository_loaded = True
            return result
        except Exception as e:
            import traceback
            print(f"Error loading repository: {str(e)}")
            print(traceback.format_exc())
            result['message'] = f"Error loading repository: {str(e)}"
            return result

    def answer_query(self, query: str) -> Dict:
        """Answer a natural language query about the repository with caching"""
        if not self.repository_loaded:
            return {
                'success': False,
                'message': "No repository loaded. Please load a repository first.",
                'answer': ""
            }

        # Check cache if enabled
        cache_key = f"query_{hash(query)}"
        if self.config.cache_enabled and cache_key in self.query_cache:
            cached_result = self.query_cache[cache_key]
            # Check if cache is still valid
            if time.time() - cached_result['timestamp'] < self.config.cache_ttl:
                return cached_result['result']

        try:
            # Search for relevant files
            similar_files = self.knowledge_base.search_similar_files(query)

            # Get answer from Gemini
            answer = self.gemini_client.answer_query(
                query,
                self.github_manager.repo_data,
                similar_files,
                self.knowledge_base.insights
            )

            result = {
                'success': True,
                'message': "Query answered successfully",
                'answer': answer,
                'relevant_files': [f['file'] for f in similar_files]
            }

            # Update cache
            if self.config.cache_enabled:
                self.query_cache[cache_key] = {
                    'result': result,
                    'timestamp': time.time()
                }

            return result
        except Exception as e:
            return {
                'success': False,
                'message': f"Error answering query: {str(e)}",
                'answer': ""
            }

    def analyze_code(self, file_path: str = "", code_snippet: str = "", language: str = "") -> Dict:
        """Analyze a code file or snippet with improved error handling"""
        if not file_path and not code_snippet:
            return {
                'success': False,
                'message': "Please provide a file path or code snippet",
                'analysis': ""
            }
        try:
            # If file path provided, get code from repository
            if file_path:
                if not self.repository_loaded:
                    return {
                        'success': False,
                        'message': "No repository loaded. Please load a repository first.",
                        'analysis': ""
                    }
                if file_path not in self.github_manager.file_contents:
                    return {
                        'success': False,
                        'message': f"File not found: {file_path}",
                        'analysis': ""
                    }
                code = self.github_manager.file_contents[file_path]['content']
                _, ext = os.path.splitext(file_path)
                language = ext.lstrip('.')
            else:
                code = code_snippet

            # Analyze code with Gemini
            analysis = self.gemini_client.analyze_code_snippet(code, language)
            return {
                'success': True,
                'message': "Code analyzed successfully",
                'analysis': analysis
            }
        except Exception as e:
            return {
                'success': False,
                'message': f"Error analyzing code: {str(e)}",
                'analysis': ""
            }

    def find_collaborators(self, requirements: str) -> Dict:
        """Find potential collaborators based on requirements"""
        if not self.repository_loaded:
            return {
                'success': False,
                'message': "No repository loaded. Please load a repository first.",
                'collaborators': []
            }
        try:
            # Find collaborators with Gemini
            collaborators = self.gemini_client.identify_potential_collaborators(
                self.github_manager.contributors_data,
                self.knowledge_base.insights,
                requirements
            )
            return {
                'success': True,
                'message': "Potential collaborators identified",
                'collaborators': collaborators
            }
        except Exception as e:
            return {
                'success': False,
                'message': f"Error finding collaborators: {str(e)}",
                'collaborators': []
            }

    def get_repository_insights(self) -> Dict:
        """Get insights about the repository"""
        if not self.repository_loaded:
            return {
                'success': False,
                'message': "No repository loaded. Please load a repository first.",
                'insights': {}
            }
        try:
            return {
                'success': True,
                'message': "Repository insights retrieved",
                'insights': self.knowledge_base.insights,
                'analysis': self.repository_analysis
            }
        except Exception as e:
            return {
                'success': False,
                'message': f"Error getting repository insights: {str(e)}",
                'insights': {}
            }

    def get_visualizations(self) -> Dict:
        """Get repository visualizations"""
        if not self.repository_loaded:
            return {
                'success': False,
                'message': "No repository loaded. Please load a repository first.",
                'visualizations': {}
            }
        return {
            'success': True,
            'message': "Repository visualizations retrieved",
            'visualizations': self.visualizations
        }

    def clear_caches(self) -> None:
        """Clear all caches"""
        self.file_cache.clear()
        self.contributor_cache.clear()
        self.commit_cache.clear()
        self.issue_cache.clear()
        self.query_cache.clear()

        # Clear lru_cache-decorated methods only if they actually exist;
        # answer_query and search_similar_files are plain methods here, so
        # calling .cache_clear() unconditionally would raise AttributeError.
        if hasattr(self.answer_query, 'cache_clear'):
            self.answer_query.cache_clear()
        if self.knowledge_base and hasattr(self.knowledge_base.search_similar_files, 'cache_clear'):
            self.knowledge_base.search_similar_files.cache_clear()
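

# Illustrative end-to-end usage (not part of the original module). The API keys
# and repository URL are placeholders; GeminiClient and RepositoryVisualizer
# must be importable for this to run.
if __name__ == "__main__":
    agent = GitHubAIAgent()
    # set_api_keys must be called first: it creates the GitHub, knowledge-base,
    # Gemini, and visualization clients used by load_repository().
    agent.set_api_keys(
        gemini_api_key=os.environ.get("GEMINI_API_KEY", ""),
        github_token=os.environ.get("GITHUB_ACCESS_TOKEN"),
    )
    load_result = agent.load_repository("https://github.com/octocat/Hello-World")
    print(load_result['message'])
    if load_result['success']:
        response = agent.answer_query("What does this repository do?")
        print(response['answer'])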