# github_ai_agent.py - Improved version with parallel processing and error handling
import os
import re
import time
import json
import datetime
import networkx as nx
from collections import defaultdict, Counter
from itertools import combinations
import numpy as np
from typing import List, Dict, Tuple, Any, Optional, Union
import concurrent.futures
from functools import lru_cache
import google.generativeai as genai

# External libraries
from github import Github, GithubException
from sentence_transformers import SentenceTransformer
import faiss
from gemini_integration import GeminiClient
from visualization_module import RepositoryVisualizer


# Configuration
class Config:
    """Configuration for the GitHub AI Agent"""

    def __init__(self):
        self.gemini_api_key = os.environ.get("GEMINI_API_KEY")
        self.github_token = os.environ.get("GITHUB_ACCESS_TOKEN")
        self.embedding_model_name = "all-MiniLM-L6-v2"
        self.gemini_model = "gemini-2.0-pro-exp-02-05"
        self.max_files_to_load = 100  # Safety limit for large repos
        self.max_token_length = 64000  # Gemini Pro context limit
        self.enable_advanced_metrics = True
        self.visualization_node_limit = 150
        self.cache_enabled = True
        self.cache_ttl = 3600  # Cache time to live in seconds

        # File extensions to analyze
        self.code_extensions = [
            '.py', '.js', '.jsx', '.ts', '.tsx', '.java', '.c', '.cpp', '.cs',
            '.go', '.rb', '.php', '.swift', '.kt', '.rs', '.hs', '.scala', '.ml'
        ]
        self.doc_extensions = [
            '.md', '.txt', '.rst', '.html', '.xml', '.json', '.yaml', '.yml'
        ]


# GitHub Repository Management
class GitHubManager:
    """Manages interaction with GitHub repositories"""

    def __init__(self, config: Config):
        self.config = config
        self.g = Github(config.github_token) if config.github_token else Github()
        self.current_repo = None
        self.repo_data = {}
        self.file_contents = {}
        self.contributors_data = {}
        self.commit_history = []
        self.issues_data = []
        self.file_cache = {}  # Cache for loaded files

    def load_repository(self, repo_url: str) -> bool:
        """Load a repository from URL"""
        try:
            # Extract repo name from URL
            repo_name = self._extract_repo_name(repo_url)
            if not repo_name:
                return False

            # Get repository
            self.current_repo = self.g.get_repo(repo_name)

            # Load basic repository data
            self.repo_data = {
                'name': self.current_repo.name,
                'full_name': self.current_repo.full_name,
                'description': self.current_repo.description,
                'stars': self.current_repo.stargazers_count,
                'forks': self.current_repo.forks_count,
                'watchers': self.current_repo.watchers_count,
                'open_issues': self.current_repo.open_issues_count,
                'created_at': self.current_repo.created_at,
                'updated_at': self.current_repo.updated_at,
                'default_branch': self.current_repo.default_branch,
                'language': self.current_repo.language,
                'topics': self.current_repo.get_topics(),
                'license': self.current_repo.license.name if self.current_repo.license else None,
            }

            return True
        except Exception as e:
            print(f"Error loading repository: {e}")
            return False

    def _extract_repo_name(self, repo_url: str) -> Optional[str]:
        """Extract repository name from URL"""
        # Handle URLs like: https://github.com/username/repository
        github_pattern = r'github\.com[/:]([^/]+)/([^/]+)'
        match = re.search(github_pattern, repo_url)
        if match:
            username, repo = match.groups()
            # Remove .git extension if present
            repo = repo.replace('.git', '')
            return f"{username}/{repo}"
        return None

    def load_files(self) -> Dict[str, Dict]:
        """Load files from repository with improved performance"""
        if not self.current_repo:
            return {}

        try:
            contents = self.current_repo.get_contents("")
            self.file_contents = {}
            files_loaded = 0
            batch_size = 20  # Process files in batches

            # Create a queue of files to process
            file_queue = []

            # First pass - collect all file paths
            while contents:
                content_item = contents.pop(0)

                # Skip directories but add their contents to our processing queue
                if content_item.type == "dir":
                    try:
                        dir_contents = self.current_repo.get_contents(content_item.path)
                        contents.extend(dir_contents)
                    except Exception as e:
                        print(f"Error accessing directory {content_item.path}: {e}")
                    continue

                # Filter by extensions
                _, ext = os.path.splitext(content_item.path)
                if ext not in self.config.code_extensions + self.config.doc_extensions:
                    continue

                # Add file to processing queue
                file_queue.append(content_item)

                # Stop if we've reached our limit
                if len(file_queue) >= self.config.max_files_to_load:
                    break

            # Process files in batches
            for i in range(0, len(file_queue), batch_size):
                batch = file_queue[i:i + batch_size]

                # Process batch in parallel
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future_to_file = {
                        executor.submit(self._process_file, file_content): file_content
                        for file_content in batch
                    }
                    for future in concurrent.futures.as_completed(future_to_file):
                        file_content = future_to_file[future]
                        try:
                            result = future.result()
                            if result:
                                self.file_contents[file_content.path] = result
                                files_loaded += 1
                        except Exception as e:
                            print(f"Error processing file {file_content.path}: {e}")

            return self.file_contents
        except Exception as e:
            print(f"Error loading files: {e}")
            return {}

    def _process_file(self, file_content) -> Optional[Dict]:
        """Process a single file (for parallel execution)"""
        try:
            # Check if in cache
            if file_content.path in self.file_cache:
                return self.file_cache[file_content.path]

            _, ext = os.path.splitext(file_content.path)

            # Only process text files with specified extensions
            if ext not in self.config.code_extensions + self.config.doc_extensions:
                return None

            try:
                # Decode content
                decoded_content = file_content.decoded_content.decode('utf-8')
                result = {
                    'content': decoded_content,
                    'type': 'code' if ext in self.config.code_extensions else 'document',
                    'size': file_content.size,
                    'ext': ext
                }

                # Update cache
                self.file_cache[file_content.path] = result
                return result
            except UnicodeDecodeError:
                # Skip binary files
                return None
        except Exception as e:
            print(f"Error processing file {file_content.path}: {e}")
            return None

    def load_contributors(self) -> List[Dict]:
        """Load repository contributors with improved performance"""
        if not self.current_repo:
            return []

        try:
            contributors = self.current_repo.get_contributors()
            self.contributors_data = {}

            # Collect basic contributor info
            contributor_list = list(contributors)  # Convert from PaginatedList to list

            # Process in parallel
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future_to_contributor = {
                    executor.submit(self._process_contributor, contributor): contributor
                    for contributor in contributor_list
                }
                for future in concurrent.futures.as_completed(future_to_contributor):
                    contributor = future_to_contributor[future]
                    try:
                        contributor_data = future.result()
                        if contributor_data:
                            self.contributors_data[contributor.login] = contributor_data
                    except Exception as e:
                        print(f"Error processing contributor {contributor.login}: {e}")

            return list(self.contributors_data.values())
        except Exception as e:
            print(f"Error loading contributors: {e}")
            return []

    def _process_contributor(self, contributor) -> Optional[Dict]:
        """Process a single contributor (for parallel execution)"""
        try:
            return {
                'login': contributor.login,
                'id': contributor.id,
                'contributions': contributor.contributions,
                'avatar_url': contributor.avatar_url,
                'html_url': contributor.html_url,
                'type': contributor.type,
                'files_modified': [],
                'commit_messages': [],
                'activity_dates': []
            }
        except Exception as e:
            print(f"Error processing contributor {contributor.login}: {e}")
            return None

    def load_commits(self, limit: int = 100) -> List[Dict]:
        """Load repository commits with improved performance"""
        if not self.current_repo:
            return []

        try:
            commits = self.current_repo.get_commits()[:limit]
            self.commit_history = []
            commits_list = list(commits)  # Convert from PaginatedList to list

            # Process commits in parallel
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future_to_commit = {
                    executor.submit(self._process_commit, commit): commit
                    for commit in commits_list
                }
                for future in concurrent.futures.as_completed(future_to_commit):
                    commit = future_to_commit[future]
                    try:
                        commit_data = future.result()
                        if commit_data:
                            self.commit_history.append(commit_data)
                    except Exception as e:
                        print(f"Error processing commit {commit.sha}: {e}")

            # Process contributor file statistics
            self._update_contributor_file_stats()

            return self.commit_history
        except Exception as e:
            print(f"Error loading commits: {e}")
            return []

    def _process_commit(self, commit) -> Optional[Dict]:
        """Process a single commit (for parallel execution)"""
        try:
            # Make sure the commit date is timezone-naive
            commit_date = commit.commit.author.date
            if hasattr(commit_date, 'tzinfo') and commit_date.tzinfo:
                commit_date = commit_date.replace(tzinfo=None)

            commit_data = {
                'sha': commit.sha,
                'author': commit.author.login if commit.author else 'Unknown',
                'date': commit_date,
                'message': commit.commit.message,
                'files': []
            }

            # Get files changed in this commit
            try:
                commit_files = commit.files
                for file in commit_files:
                    file_data = {
                        'filename': file.filename,
                        'additions': file.additions,
                        'deletions': file.deletions,
                        'changes': file.changes,
                        'status': file.status
                    }
                    commit_data['files'].append(file_data)

                    # Add this file to the contributor's file list
                    if commit.author and commit.author.login in self.contributors_data:
                        self.contributors_data[commit.author.login]['files_modified'].append(file.filename)
                        self.contributors_data[commit.author.login]['commit_messages'].append(commit.commit.message)
                        self.contributors_data[commit.author.login]['activity_dates'].append(commit_date)
            except Exception as e:
                print(f"Error processing files for commit {commit.sha}: {e}")

            return commit_data
        except Exception as e:
            print(f"Error processing commit {commit.sha}: {e}")
            return None

    def _update_contributor_file_stats(self):
        """Update contributor file statistics"""
        for login, contributor in self.contributors_data.items():
            if 'files_modified' in contributor:
                # Count occurrences of each file
                file_counts = Counter(contributor['files_modified'])
                # Replace list with a list of (filename, count) tuples
                self.contributors_data[login]['files_modified'] = [
                    {'filename': filename, 'count': count}
                    for filename, count in file_counts.most_common(10)
                ]

    def load_issues(self, limit: int = 30) -> List[Dict]:
        """Load repository issues with improved performance"""
        if not self.current_repo:
            return []

        try:
            issues = self.current_repo.get_issues(state='all')[:limit]
            self.issues_data = []
            issues_list = list(issues)  # Convert from PaginatedList to list

            # Process issues in parallel
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future_to_issue = {
                    executor.submit(self._process_issue, issue): issue
                    for issue in issues_list
                }
                for future in concurrent.futures.as_completed(future_to_issue):
                    issue = future_to_issue[future]
                    try:
                        issue_data = future.result()
                        if issue_data:
                            self.issues_data.append(issue_data)
                    except Exception as e:
                        print(f"Error processing issue #{issue.number}: {e}")

            return self.issues_data
        except Exception as e:
            print(f"Error loading issues: {e}")
            return []

    def _process_issue(self, issue) -> Optional[Dict]:
        """Process a single issue (for parallel execution)"""
        try:
            # Normalize datetime objects
            created_at = issue.created_at
            updated_at = issue.updated_at
            closed_at = issue.closed_at
            if hasattr(created_at, 'tzinfo') and created_at.tzinfo:
                created_at = created_at.replace(tzinfo=None)
            if hasattr(updated_at, 'tzinfo') and updated_at.tzinfo:
                updated_at = updated_at.replace(tzinfo=None)
            if closed_at and hasattr(closed_at, 'tzinfo') and closed_at.tzinfo:
                closed_at = closed_at.replace(tzinfo=None)

            issue_data = {
                'number': issue.number,
                'title': issue.title,
                'body': issue.body,
                'user': issue.user.login if issue.user else 'Unknown',
                'state': issue.state,
                'created_at': created_at,
                'updated_at': updated_at,
                'closed_at': closed_at,
                'labels': [label.name for label in issue.labels],
                'comments': []
            }

            # Get comments for this issue (limited to 10)
            try:
                comments = issue.get_comments()[:10]
                for comment in comments:
                    # Normalize datetime
                    comment_created_at = comment.created_at
                    if hasattr(comment_created_at, 'tzinfo') and comment_created_at.tzinfo:
                        comment_created_at = comment_created_at.replace(tzinfo=None)

                    issue_data['comments'].append({
                        'user': comment.user.login if comment.user else 'Unknown',
                        'body': comment.body,
                        'created_at': comment_created_at
                    })
            except Exception as e:
                print(f"Error loading comments for issue #{issue.number}: {e}")

            return issue_data
        except Exception as e:
            print(f"Error processing issue #{issue.number}: {e}")
            return None


# Knowledge Base and Vector Storage
class KnowledgeBase:
    """Manages the knowledge base for the repository"""

    def __init__(self, config: Config):
        self.config = config
        self.embeddings = {}
        self.embedding_model = SentenceTransformer(config.embedding_model_name)
        self.index = None
        self.knowledge_graph = nx.Graph()
        self.insights = {}
        self.insights_cache = {}
        self.cache_timestamp = None

    def initialize_vector_storage(self, file_contents: Dict[str, Dict]) -> None:
        """Initialize vector storage with file contents and batched processing"""
        try:
            # Clear existing data
            self.embeddings = {}
            self.knowledge_graph = nx.Graph()

            # Process files and create embeddings
            texts = []
            ids = []

            # Process files in parallel for large repositories
            if len(file_contents) > 50:
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    # Process files in batches
                    batch_size = 20
                    keys = list(file_contents.keys())
                    batches = [keys[i:i + batch_size] for i in range(0, len(keys), batch_size)]

                    # Create a function to process a batch
                    def process_batch(batch_keys):
                        batch_texts = []
                        batch_ids = []
                        for path in batch_keys:
                            file_data = file_contents[path]
                            content = file_data['content']
                            # Truncate very large files to avoid embedding issues
                            if len(content) > 10000:
                                content = content[:10000] + "..."
                            batch_texts.append(content)
                            batch_ids.append(path)
                        return batch_texts, batch_ids

                    # Submit batch processing tasks
                    futures = [executor.submit(process_batch, batch) for batch in batches]

                    # Collect results
                    for future in concurrent.futures.as_completed(futures):
                        batch_texts, batch_ids = future.result()
                        texts.extend(batch_texts)
                        ids.extend(batch_ids)
            else:
                # For smaller repositories, process sequentially
                for path, file_data in file_contents.items():
                    content = file_data['content']
                    # Truncate very large files to avoid embedding issues
                    if len(content) > 10000:
                        content = content[:10000] + "..."
                    texts.append(content)
                    ids.append(path)

            # Add nodes to knowledge graph
            for path, file_data in file_contents.items():
                self.knowledge_graph.add_node(
                    path,
                    type='file',
                    file_type=file_data.get('type', 'unknown'),
                    size=file_data.get('size', 0),
                    extension=file_data.get('ext', '')
                )

            # Create embeddings for all files
            if texts:
                # Process embeddings in batches to avoid memory issues
                batch_size = 32
                file_embeddings = []
                for i in range(0, len(texts), batch_size):
                    batch_texts = texts[i:i + batch_size]
                    batch_embeddings = self.embedding_model.encode(batch_texts)
                    file_embeddings.append(batch_embeddings)

                file_embeddings = np.vstack(file_embeddings)

                # Initialize FAISS index
                dimension = file_embeddings.shape[1]
                self.index = faiss.IndexFlatL2(dimension)
                self.index.add(np.array(file_embeddings).astype('float32'))

                # Store embeddings with their IDs
                for i, file_id in enumerate(ids):
                    self.embeddings[file_id] = {
                        'embedding': file_embeddings[i],
                        'content': texts[i]
                    }
        except Exception as e:
            print(f"Error initializing vector storage: {e}")

    def build_knowledge_graph(self, commits: List[Dict], contributors: Dict) -> nx.Graph:
        """Build knowledge graph from repository data"""
        try:
            # Add contributor nodes
            for login, data in contributors.items():
                self.knowledge_graph.add_node(
                    login,
                    type='contributor',
                    contributions=data['contributions']
                )

            # Add connections between contributors and files
            for login, data in contributors.items():
                for file_data in data['files_modified']:
                    filename = file_data['filename']
                    count = file_data['count']

                    # Only add edges if file exists in the graph
                    if filename in self.knowledge_graph:
                        if self.knowledge_graph.has_edge(login, filename):
                            # Update weight if edge exists
                            self.knowledge_graph[login][filename]['weight'] += count
                        else:
                            # Create new edge
                            self.knowledge_graph.add_edge(login, filename, weight=count)

            # Optimized co-occurrence calculation
            file_co_occurrence = defaultdict(int)

            # Process in batches for large commit histories
            batch_size = 50
            for i in range(0, len(commits), batch_size):
                batch_commits = commits[i:i + batch_size]
                for commit in batch_commits:
                    # Get all files in this commit
                    commit_files = [file['filename'] for file in commit['files']]

                    # Add co-occurrence for each pair of files
                    for file1, file2 in combinations(commit_files, 2):
                        if file1 in self.knowledge_graph and file2 in self.knowledge_graph:
                            file_pair = tuple(sorted([file1, file2]))
                            file_co_occurrence[file_pair] += 1

            # Add edges for file co-occurrence
            for (file1, file2), count in file_co_occurrence.items():
                if count >= 2:  # Only add edge if files co-occur at least twice
                    if self.knowledge_graph.has_edge(file1, file2):
                        self.knowledge_graph[file1][file2]['weight'] += count
                    else:
                        self.knowledge_graph.add_edge(file1, file2, weight=count, type='co-occurrence')

            return self.knowledge_graph
        except Exception as e:
            print(f"Error building knowledge graph: {e}")
            return nx.Graph()

    @lru_cache(maxsize=32)
    def search_similar_files(self, query: str, top_k: int = 5) -> List[Dict]:
        """Search for files similar to query with caching"""
        try:
            if not self.index:
                return []

            # Encode query
            query_embedding = self.embedding_model.encode([query])

            # Search in FAISS
            distances, indices = self.index.search(np.array(query_embedding).astype('float32'), top_k)

            # Get results
            results = []
            all_ids = list(self.embeddings.keys())
            for i, idx in enumerate(indices[0]):
                if idx < len(all_ids):
                    file_id = all_ids[idx]
                    results.append({
                        'file': file_id,
                        'distance': float(distances[0][i]),
                        'content': self.embeddings[file_id]['content'][:1000] + "..."
                        if len(self.embeddings[file_id]['content']) > 1000
                        else self.embeddings[file_id]['content']
                    })

            return results
        except Exception as e:
            print(f"Error searching similar files: {e}")
            return []

    def extract_insights(self, repo_data: Dict, commits: List[Dict],
                         contributors: Dict, issues: List[Dict]) -> Dict:
        """Extract insights from repository data with datetime fix and caching"""
        # Check if we have a recent cache (less than 10 minutes old)
        current_time = time.time()
        if self.cache_timestamp and (current_time - self.cache_timestamp < 600) and self.insights_cache:
            return self.insights_cache

        try:
            insights = {
                'basic_stats': {},
                'activity': {},
                'contributors': {},
                'code': {},
                'issues': {}
            }

            # Make a shallow copy of repo_data to avoid modifying the original
            repo_data_copy = {k: v for k, v in repo_data.items()}

            # Basic statistics
            insights['basic_stats'] = {
                'name': repo_data_copy['name'],
                'description': repo_data_copy['description'],
                'stars': repo_data_copy['stars'],
                'forks': repo_data_copy['forks'],
                'age_days': None,  # Will calculate below
                'primary_language': repo_data_copy['language'],
                'topics': repo_data_copy['topics']
            }

            # Fix: Normalize datetime objects to be timezone-naive for consistent comparison
            created_at = repo_data_copy.get('created_at')
            if created_at:
                # Remove timezone info if present
                if hasattr(created_at, 'tzinfo') and created_at.tzinfo:
                    created_at = created_at.replace(tzinfo=None)

                # Calculate age
                now = datetime.datetime.now()
                insights['basic_stats']['age_days'] = (now - created_at).days

            # Activity insights
            if commits:
                # Fix: Normalize all datetime objects to be timezone-naive
                commit_dates = []
                for commit in commits:
                    date = commit.get('date')
                    if date:
                        # Remove timezone info if present
                        if hasattr(date, 'tzinfo') and date.tzinfo:
                            date = date.replace(tzinfo=None)
                        commit_dates.append(date)

                # Sort dates
                commit_dates.sort()

                if commit_dates:
                    # Calculate commit frequency
                    first_commit = commit_dates[0]
                    last_commit = commit_dates[-1]
                    days_span = (last_commit - first_commit).days + 1

                    insights['activity'] = {
                        'total_commits': len(commits),
                        'first_commit': first_commit,
                        'last_commit': last_commit,
                        'days_span': days_span,
                        'commits_per_day': round(len(commits) / max(days_span, 1), 2),
                    }

                    # Fix: Use Counter for most active day calculation
                    date_counter = Counter(d.date() for d in commit_dates)
                    if date_counter:
                        insights['activity']['most_active_day'] = date_counter.most_common(1)[0][0]

                    # Commit activity by month
                    commit_months = [d.strftime('%Y-%m') for d in commit_dates]
                    month_counts = Counter(commit_months)
                    insights['activity']['monthly_activity'] = [
                        {'month': month, 'commits': count}
                        for month, count in month_counts.most_common(12)
                    ]

            # Contributor insights
            if contributors:
                top_contributors = sorted(contributors.values(), key=lambda x: x['contributions'], reverse=True)[:10]
                insights['contributors'] = {
                    'total_contributors': len(contributors),
                    'top_contributors': [
                        {
                            'login': c['login'],
                            'contributions': c['contributions'],
                            'top_files': [f['filename'] for f in c['files_modified'][:5]] if c['files_modified'] else []
                        }
                        for c in top_contributors
                    ]
                }

                # Calculate bus factor (simplified)
                total_commits = sum(c['contributions'] for c in contributors.values())
                running_sum = 0
                bus_factor = 0
                for c in top_contributors:
                    running_sum += c['contributions']
                    bus_factor += 1
                    # Guard against division by zero when all contribution counts are zero
                    if running_sum / max(total_commits, 1) > 0.5:
                        break

                insights['contributors']['bus_factor'] = bus_factor

            # Code insights
            if self.knowledge_graph:
                # Get top connected files
                file_nodes = [(node, degree) for node, degree in self.knowledge_graph.degree()
                              if self.knowledge_graph.nodes[node].get('type') == 'file']
                top_files = sorted(file_nodes, key=lambda x: x[1], reverse=True)[:10]

                insights['code']['central_files'] = [
                    {'filename': filename, 'connections': degree}
                    for filename, degree in top_files
                ]

                # Most frequently modified files from commits
                file_modifications = Counter()
                for commit in commits:
                    for file in commit['files']:
                        file_modifications[file['filename']] += 1

                insights['code']['frequently_modified_files'] = [
                    {'filename': filename, 'modifications': count}
                    for filename, count in file_modifications.most_common(10)
                ]

                # File types distribution
                file_types = Counter([os.path.splitext(node)[1] for node in self.knowledge_graph.nodes()
                                      if '.' in node and self.knowledge_graph.nodes[node].get('type') == 'file'])
                insights['code']['file_types'] = [
                    {'extension': ext, 'count': count}
                    for ext, count in file_types.most_common()
                ]

            # Issue insights
            if issues:
                # Calculate issue statistics
                open_issues = [issue for issue in issues if issue['state'] == 'open']
                closed_issues = [issue for issue in issues if issue['state'] == 'closed']

                insights['issues'] = {
                    'total_issues': len(issues),
                    'open_issues': len(open_issues),
                    'closed_issues': len(closed_issues),
                    'resolution_rate': round(len(closed_issues) / max(len(issues), 1), 2)
                }

                # Calculate average time to close
                close_times = []
                for issue in closed_issues:
                    if issue['created_at'] and issue['closed_at']:
                        # Fix: Normalize datetime objects to be timezone-naive
                        created_at = issue['created_at']
                        closed_at = issue['closed_at']
                        if hasattr(created_at, 'tzinfo') and created_at.tzinfo:
                            created_at = created_at.replace(tzinfo=None)
                        if hasattr(closed_at, 'tzinfo') and closed_at.tzinfo:
                            closed_at = closed_at.replace(tzinfo=None)

                        close_time = (closed_at - created_at).days
                        close_times.append(close_time)

                if close_times:
                    insights['issues']['avg_days_to_close'] = round(sum(close_times) / len(close_times), 1)

                # Top issue labels
                issue_labels = [label for issue in issues for label in issue['labels']]
                label_counts = Counter(issue_labels)
                insights['issues']['top_labels'] = [
                    {'label': label, 'count': count}
                    for label, count in label_counts.most_common(5)
                ]

            # Update cache
            self.insights_cache = insights
            self.cache_timestamp = current_time

            self.insights = insights
            return insights
        except Exception as e:
            import traceback
            print(f"Error extracting insights: {e}")
            print(traceback.format_exc())
            return {}


# Main GitHub AI Agent Class
class GitHubAIAgent:
    """Main class for GitHub AI Agent"""

    def __init__(self):
        self.config = Config()
        self.github_manager = None
        self.knowledge_base = None
        self.gemini_client = None
        self.visualization_manager = None
        self.repository_loaded = False
        self.repository_url = ""
        self.repository_analysis = {}
        self.visualizations = {}

        # Initialize caches
        self.file_cache = {}
        self.contributor_cache = {}
        self.commit_cache = {}
        self.issue_cache = {}
        self.query_cache = {}

    def set_api_keys(self, gemini_api_key: str, github_token: str = None) -> None:
        """Set API keys"""
        # Set environment variables
        os.environ["GEMINI_API_KEY"] = gemini_api_key
        if github_token:
            os.environ["GITHUB_ACCESS_TOKEN"] = github_token

        # Update config
        self.config.gemini_api_key = gemini_api_key
        self.config.github_token = github_token

        # Initialize clients
        self.github_manager = GitHubManager(self.config)
        self.knowledge_base = KnowledgeBase(self.config)
        self.gemini_client = GeminiClient(self.config.gemini_api_key, self.config.gemini_model)
        self.visualization_manager = RepositoryVisualizer(self.config)

    def load_repository(self, repository_url: str) -> Dict:
        """Load and analyze a GitHub repository with improved parallelization"""
        result = {
            'success': False,
            'message': '',
            'repo_data': {},
            'file_count': 0,
            'contributor_count': 0
        }

        try:
            # Reset state
            self.repository_loaded = False
            self.repository_url = ""
            self.repository_analysis = {}
            self.visualizations = {}

            # Load repository basic info
            print(f"Loading repository: {repository_url}")
            repo_loaded = self.github_manager.load_repository(repository_url)
            if not repo_loaded:
                result['message'] = "Failed to load repository. Check the URL and your GitHub access token."
                return result

            # Store repository URL
            self.repository_url = repository_url

            # Use parallel processing for loading repository data
            with concurrent.futures.ThreadPoolExecutor() as executor:
                # Submit tasks
                files_future = executor.submit(self.github_manager.load_files)
                contributors_future = executor.submit(self.github_manager.load_contributors)
                commits_future = executor.submit(self.github_manager.load_commits)
                issues_future = executor.submit(self.github_manager.load_issues)

                # Get results
                files = files_future.result()
                contributors = contributors_future.result()
                commits = commits_future.result()
                issues = issues_future.result()

            result['file_count'] = len(files)
            result['contributor_count'] = len(contributors)

            # Initialize vector storage and build knowledge graph
            # (These are kept sequential as they depend on previous steps)
            print("Building knowledge base")
            self.knowledge_base.initialize_vector_storage(files)
            knowledge_graph = self.knowledge_base.build_knowledge_graph(
                commits, self.github_manager.contributors_data
            )

            # Extract repository insights
            print("Extracting repository insights")
            insights = self.knowledge_base.extract_insights(
                self.github_manager.repo_data,
                commits,
                self.github_manager.contributors_data,
                issues
            )

            # Use a separate thread for Gemini analysis which can be slower
            # and doesn't block the main thread
            def analyze_with_gemini():
                print("Analyzing repository with Gemini")
                return self.gemini_client.analyze_repository(
                    self.github_manager.repo_data,
                    files,
                    commits,
                    self.github_manager.contributors_data,
                    insights
                )

            # Use another thread pool for visualization generation
            def create_visualizations():
                print("Creating repository visualizations")
                repo_graph_path = self.visualization_manager.create_repository_graph(knowledge_graph)
                activity_chart_path = self.visualization_manager.create_commit_activity_chart(commits)
                contributor_network_path = self.visualization_manager.create_contributor_network(
                    self.github_manager.contributors_data, commits
                )
                dependency_graph_path = self.visualization_manager.create_file_dependency_graph(files)
                return {
                    'repository_graph': repo_graph_path,
                    'activity_chart': activity_chart_path,
                    'contributor_network': contributor_network_path,
                    'dependency_graph': dependency_graph_path,
                }

            # Run Gemini analysis and visualization generation in parallel
            with concurrent.futures.ThreadPoolExecutor() as executor:
                analysis_future = executor.submit(analyze_with_gemini)
                viz_future = executor.submit(create_visualizations)

                # Get results
                self.repository_analysis = analysis_future.result()
                self.visualizations = viz_future.result()

            # Update result
            result['success'] = True
            result['message'] = f"Successfully loaded and analyzed repository: {self.github_manager.repo_data['full_name']}"
            result['repo_data'] = self.github_manager.repo_data
            self.repository_loaded = True

            return result
        except Exception as e:
            import traceback
            print(f"Error loading repository: {str(e)}")
            print(traceback.format_exc())
            result['message'] = f"Error loading repository: {str(e)}"
            return result

    @lru_cache(maxsize=32)
    def answer_query(self, query: str) -> Dict:
        """Answer a natural language query about the repository with caching"""
        if not self.repository_loaded:
            return {
                'success': False,
                'message': "No repository loaded. Please load a repository first.",
                'answer': ""
            }

        # Check cache if enabled
        cache_key = f"query_{hash(query)}"
        if self.config.cache_enabled and cache_key in self.query_cache:
            cached_result = self.query_cache[cache_key]
            # Check if cache is still valid
            if time.time() - cached_result['timestamp'] < self.config.cache_ttl:
                return cached_result['result']

        try:
            # Search for relevant files
            similar_files = self.knowledge_base.search_similar_files(query)

            # Get answer from Gemini
            answer = self.gemini_client.answer_query(
                query,
                self.github_manager.repo_data,
                similar_files,
                self.knowledge_base.insights
            )

            result = {
                'success': True,
                'message': "Query answered successfully",
                'answer': answer,
                'relevant_files': [f['file'] for f in similar_files]
            }

            # Update cache
            if self.config.cache_enabled:
                self.query_cache[cache_key] = {
                    'result': result,
                    'timestamp': time.time()
                }

            return result
        except Exception as e:
            return {
                'success': False,
                'message': f"Error answering query: {str(e)}",
                'answer': ""
            }

    def analyze_code(self, file_path: str = "", code_snippet: str = "", language: str = "") -> Dict:
        """Analyze a code file or snippet with improved error handling"""
        if not file_path and not code_snippet:
            return {
                'success': False,
                'message': "Please provide a file path or code snippet",
                'analysis': ""
            }

        try:
            # If file path provided, get code from repository
            if file_path:
                if not self.repository_loaded:
                    return {
                        'success': False,
                        'message': "No repository loaded. Please load a repository first.",
                        'analysis': ""
                    }

                if file_path not in self.github_manager.file_contents:
                    return {
                        'success': False,
                        'message': f"File not found: {file_path}",
                        'analysis': ""
                    }

                code = self.github_manager.file_contents[file_path]['content']
                _, ext = os.path.splitext(file_path)
                language = ext.lstrip('.')
            else:
                code = code_snippet

            # Analyze code with Gemini
            analysis = self.gemini_client.analyze_code_snippet(code, language)

            return {
                'success': True,
                'message': "Code analyzed successfully",
                'analysis': analysis
            }
        except Exception as e:
            return {
                'success': False,
                'message': f"Error analyzing code: {str(e)}",
                'analysis': ""
            }

    def find_collaborators(self, requirements: str) -> Dict:
        """Find potential collaborators based on requirements"""
        if not self.repository_loaded:
            return {
                'success': False,
                'message': "No repository loaded. Please load a repository first.",
                'collaborators': []
            }

        try:
            # Find collaborators with Gemini
            collaborators = self.gemini_client.identify_potential_collaborators(
                self.github_manager.contributors_data,
                self.knowledge_base.insights,
                requirements
            )

            return {
                'success': True,
                'message': "Potential collaborators identified",
                'collaborators': collaborators
            }
        except Exception as e:
            return {
                'success': False,
                'message': f"Error finding collaborators: {str(e)}",
                'collaborators': []
            }

    def get_repository_insights(self) -> Dict:
        """Get insights about the repository"""
        if not self.repository_loaded:
            return {
                'success': False,
                'message': "No repository loaded. Please load a repository first.",
                'insights': {}
            }

        try:
            return {
                'success': True,
                'message': "Repository insights retrieved",
                'insights': self.knowledge_base.insights,
                'analysis': self.repository_analysis
            }
        except Exception as e:
            return {
                'success': False,
                'message': f"Error getting repository insights: {str(e)}",
                'insights': {}
            }

    def get_visualizations(self) -> Dict:
        """Get repository visualizations"""
        if not self.repository_loaded:
            return {
                'success': False,
                'message': "No repository loaded. Please load a repository first.",
                'visualizations': {}
            }

        return {
            'success': True,
            'message': "Repository visualizations retrieved",
            'visualizations': self.visualizations
        }

    def clear_caches(self) -> None:
        """Clear all caches"""
        self.file_cache.clear()
        self.contributor_cache.clear()
        self.commit_cache.clear()
        self.issue_cache.clear()
        self.query_cache.clear()

        # Clear LRU caches
        self.answer_query.cache_clear()
        if hasattr(self.knowledge_base, 'search_similar_files'):
            self.knowledge_base.search_similar_files.cache_clear()
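

# Example usage: a minimal sketch of the intended workflow, not part of the original module.
# The repository URL and the query below are hypothetical placeholders; the example assumes
# GEMINI_API_KEY (and optionally GITHUB_ACCESS_TOKEN) are already set in the environment.
if __name__ == "__main__":
    agent = GitHubAIAgent()
    agent.set_api_keys(
        gemini_api_key=os.environ.get("GEMINI_API_KEY", ""),
        github_token=os.environ.get("GITHUB_ACCESS_TOKEN"),
    )

    # Load and analyze a repository (placeholder URL)
    load_result = agent.load_repository("https://github.com/octocat/Hello-World")
    print(load_result['message'])

    if load_result['success']:
        # Ask a natural-language question about the loaded repository
        response = agent.answer_query("Which files implement the core logic?")
        print(response.get('answer', ''))

        # Retrieve extracted insights and the paths of generated visualizations
        insights = agent.get_repository_insights()
        visualizations = agent.get_visualizations()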