import gradio as gr import google.generativeai as genai import os from dotenv import load_dotenv from github import Github, RateLimitExceededException, GithubException import json from pathlib import Path from datetime import datetime, timedelta from collections import defaultdict import base64 from typing import Dict, List, Any, Optional, Tuple import tempfile from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import asyncio import aiohttp import re import ast from concurrent.futures import ThreadPoolExecutor import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from packaging import version import requests from bs4 import BeautifulSoup import networkx as nx import math import logging import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Load environment variables (consider handling missing .env) load_dotenv() # --- Constants and Global Variables --- # Store API tokens globally GITHUB_TOKEN = os.getenv("GITHUB_TOKEN") #getting github token using os GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") #getting gemini api key using os # Constants for rate limiting - make them configurable if needed MIN_RATE_LIMIT_BUFFER = 50 # Keep a buffer to avoid hitting the limit INITIAL_BACKOFF = 60 # Initial backoff time in seconds # Enhanced relevant file extensions RELEVANT_EXTENSIONS = { ".py": "Python", ".js": "JavaScript", ".ts": "TypeScript", ".jsx": "React", ".tsx": "React TypeScript", ".java": "Java", ".cpp": "C++", ".c": "C", ".h": "C Header", ".hpp": "C++ Header", ".rb": "Ruby", ".php": "PHP", ".go": "Go", ".rs": "Rust", ".swift": "Swift", ".kt": "Kotlin", ".cs": "C#", ".scala": "Scala", ".r": "R", ".dart": "Dart", ".lua": "Lua", ".sql": "SQL", ".sh": "Shell", ".md": "Markdown", # Include Markdown for documentation analysis ".txt": "Text", ".json": "JSON", ".yml": "YAML", ".yaml": "YAML", ".xml": "XML", ".html": "HTML", ".css": "CSS" } # --- Initialization and Validation --- def validate_github_token(token: str) -> Tuple[bool, str]: """ Validate GitHub token before proceeding with analysis. Returns (is_valid: bool, message: str) """ if not token: return False, "GitHub token is missing." # Check for missing try: gh = Github(token) user = gh.get_user() username = user.login #important: accessing properties for validation rate_limit = gh.get_rate_limit() remaining = rate_limit.core.remaining if remaining == 0: #using remaining reset_time = rate_limit.core.reset.strftime("%Y-%m-%d %H:%M:%S UTC") return False, f"Rate limit exceeded. Resets at {reset_time}" return True, f"Token validated successfully (authenticated as {username})" except GithubException as e: if e.status == 401: return False, "Invalid token - authentication failed" elif e.status == 403: return False, "Token lacks required permissions or rate limit exceeded" #more specific 403 message elif e.status == 404: return False, "Invalid token or API endpoint not found" # More specific 404 message else: return False, f"GitHub error (status {e.status}): {e.data.get('message', str(e))}" except Exception as e: # General exception handling as a fallback. return False, f"Error validating token: {str(e)}" def initialize_tokens(github_token: str, gemini_key: str) -> str: """Initialize API tokens globally with enhanced validation (using env vars now).""" global GITHUB_TOKEN, GEMINI_API_KEY if not github_token or not gemini_key: return "❌ Both GitHub and Gemini API keys are required." 
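    # Usage sketch (kept as a comment so module behaviour is unchanged): the expected
    # .env layout and how the helpers above are intended to be called. The values shown
    # are placeholders, not real credentials.
    #
    #   # .env
    #   GITHUB_TOKEN=<your GitHub personal access token>
    #   GEMINI_API_KEY=<your Gemini API key>
    #
    #   ok, msg = validate_github_token(os.getenv("GITHUB_TOKEN"))
    #   if not ok:
    #       raise SystemExit(f"GitHub token problem: {msg}")
    #   print(initialize_tokens(os.getenv("GITHUB_TOKEN"), os.getenv("GEMINI_API_KEY")))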
is_valid, message = validate_github_token(github_token) if not is_valid: return f"❌ GitHub token validation failed: {message}" try: genai.configure(api_key=gemini_key) model = genai.GenerativeModel('gemini-1.0-pro') response = model.generate_content("Test") if response.text is None : # important check. return "❌ Invalid Gemini API key (no response)" #More informative. # else: # return "Invalid" except Exception as e: return f"❌ Gemini API key validation failed: {str(e)}" GITHUB_TOKEN = github_token # Overwrite with validated tokens GEMINI_API_KEY = gemini_key return "✅ All tokens validated and initialized successfully!" # --- Classes --- class GitHubAPIHandler: """Enhanced GitHub API handler with minimal authentication checks and robust error handling.""" def __init__(self, token: Optional[str] = None): self.logger = logging.getLogger(__name__) self.token = token self._min_rate_limit_buffer = MIN_RATE_LIMIT_BUFFER self._initial_backoff = INITIAL_BACKOFF if not self.token: raise ValueError("GitHub token not provided") # Create the GitHub client *within* the class self.gh = self._create_github_client() def _create_github_client(self) -> Github: """Create GitHub client with enhanced error handling""" try: # Create Github instance with basic configuration gh = Github( self.token, retry=3, # Number of retries for failed requests timeout=30, # Timeout in seconds per_page=100 # Maximum items per page ) # Verify authentication try: user = gh.get_user() self.logger.info(f"Authenticated as: {user.login}") except GithubException as e: if e.status == 401: raise ValueError("Invalid GitHub token - authentication failed") elif e.status == 403: raise ValueError("GitHub token lacks required permissions or rate limit exceeded") else: raise ValueError(f"GitHub initialization failed: {str(e)}") return gh # Return the authenticated client except Exception as e: raise ValueError(f"Failed to initialize GitHub client: {str(e)}") # More informative error @retry( stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type((RateLimitExceededException, GithubException)), before_sleep=lambda retry_state: logging.info( f"Rate limited, retrying in {retry_state.next_action.sleep} seconds..."), ) def get_repository(self, repo_url: str) -> Any: """Get repository object using PyGithub, with error handling and validation.""" try: parts = repo_url.rstrip('/').split('/') if len(parts) < 2: raise ValueError(f"Invalid repository URL format: {repo_url}") owner = parts[-2] repo_name = parts[-1] # Using PyGithub's get_repo method repo = self.gh.get_repo(f"{owner}/{repo_name}") return repo # Return the repo object except GithubException as e: # Specifically handle Github exceptions if e.status == 404: raise ValueError(f"Repository not found: {owner}/{repo_name}") elif e.status == 403: self._handle_forbidden_error() # Handle forbidden access (rate limits, etc.) raise #Re raise the exception so program doesn't continue else: raise ValueError(f"Failed to access repository: {str(e)}") except Exception as e: #catch all other exception. 
raise ValueError(f"Failed to access repository(An unexpected error occurred):{str(e)}") def _check_rate_limits(self): """Enhanced rate limit checking with predictive waiting.""" try: rate_limit = self.gh.get_rate_limit() remaining = rate_limit.core.remaining reset_time = rate_limit.core.reset.timestamp() self.logger.info(f"Rate limit - Remaining: {remaining}, Reset: {datetime.fromtimestamp(reset_time)}") if remaining < self._min_rate_limit_buffer: wait_time = self._get_rate_limit_wait_time() if wait_time > 0: # Only log if there's a wait. self.logger.warning(f"Approaching rate limit. Waiting {wait_time:.2f} seconds.") time.sleep(wait_time) # Wait before hitting the limit except GithubException as e: # Be specific about the exceptions you handle self.logger.error(f"Error checking rate limits: {str(e)}") time.sleep(60) # Wait a reasonable amount of time even if you cannot check except Exception as e: # Always have general exception to handle self.logger.error(f"Unexpected Error: {str(e)}") #General unexpected Error handle. time.sleep(60) def _get_rate_limit_wait_time(self) -> float: """Calculate the time to wait until the rate limit resets.""" try: rate_limit = self.gh.get_rate_limit() reset_time = rate_limit.core.reset.timestamp() current_time = time.time() return max(0, reset_time - current_time + 1) # Add 1 second buffer except Exception: return self._initial_backoff # Fallback on any error in getting rate limits def _handle_forbidden_error(self): """Handle a 403 Forbidden error from the GitHub API.""" try: # Check if it's a rate limit issue. rate_limit = self.gh.get_rate_limit() if rate_limit.core.remaining == 0: wait_time = self._get_rate_limit_wait_time() self.logger.warning(f"Rate limit exceeded. Waiting {wait_time:.2f} seconds.") time.sleep(wait_time) else: # If not rate limited, then likely a permissions issue self.logger.error("Access forbidden. Token may lack required permissions.") except Exception as e: #handling other errors. self.logger.error(f"Error handling forbidden response: {str(e)}") @retry( stop=stop_after_attempt(3), # Maximum 3 retries wait=wait_exponential(multiplier=1, min=4, max=10), #exponential backoff. reraise=True # Reraise exception after retries. ) def get_file_content(self, repo: Any, path: str) -> Optional[str]: """Get content of a file, with retries, rate limit check and error handling.""" try: self._check_rate_limits() # Check rate limits *before* each attempt. content = repo.get_contents(path) return content except GithubException as e: if e.status == 404: self.logger.warning(f"File not found: {path}") # 404 is not critical. return None # explicitly return None elif e.status == 403: # Explicitly handle forbidden self._handle_forbidden_error() # Rate limiting or other access problem raise # Raise after handling (waiting, logging). # Any other GitHub error is an issue - log and re-raise self.logger.error(f"Error getting file content: {str(e)}") #handle raise #re-raise after loggng except Exception as e: # General exception for unexpected issue. self.logger.error(f"Unexpected Error : {str(e)}") #General exception handelling raise class CodeMetricsAnalyzer: """Handles detailed code metrics analysis with proper error handling.""" def __init__(self): self.logger = logging.getLogger(__name__) self.size_metrics_cache = {} # Consider if needed with parallelization def calculate_halstead_metrics(self, content: str, language: str = "Unknown") -> Dict[str, float]: """ Calculate Halstead complexity metrics for code. 
""" try: # Define language-specific operators (more comprehensive) operators = { "Python": set([ '+', '-', '*', '/', '//', '**', '%', '==', '!=', '>', '<', '>=', '<=', 'and', 'or', 'not', 'is', 'in', '+=', '-=', '*=', '/=', '=', 'if', 'elif', 'else', 'for', 'while', 'def', 'class', 'return', 'yield', 'raise', 'break', 'continue', 'pass', 'assert', 'import', 'from', 'as', 'try', 'except', 'finally', 'with', 'async', 'await' ]), "JavaScript": set([ '+', '-', '*', '/', '%', '**', '==', '===', '!=', '!==', '>', '<', '>=', '<=', '&&', '||', '!', '=', '+=', '-=', '*=', '/=', 'if', 'else', 'for', 'while', 'function', 'return', 'class', 'new', 'delete', 'typeof', 'instanceof', 'void', 'try', 'catch', 'finally', 'throw', 'break', 'continue', 'default', 'case', 'async', 'await' ]), "Java": set([ # Added Java operators '+', '-', '*', '/', '%', '++', '--', '==', '!=', '>', '<', '>=', '<=', '&&', '||', '!', '=', '+=', '-=', '*=', '/=', '%=', 'if', 'else', 'for', 'while', 'do', 'switch', 'case', 'default', 'break', 'continue', 'return', 'try', 'catch', 'finally', 'throw', 'throws', 'class', 'interface', 'extends', 'implements', 'new', 'instanceof', 'this', 'super' ]), }.get(language, set(['+', '-', '*', '/', '=', '==', '>', '<', '>=', '<='])) unique_operators = set() unique_operands = set() total_operators = 0 total_operands = 0 lines = content.splitlines() for line in lines: line = line.strip() if line.startswith(('#', '//', '/*', '*')): # Handle comments continue for operator in operators: if operator in line: unique_operators.add(operator) total_operators += line.count(operator) # Improved operand counting (numbers, strings, identifiers) numbers = re.findall(r'\b\d+(?:\.\d+)?\b', line) unique_operands.update(numbers) total_operands += len(numbers) strings = re.findall(r'["\'][^"\']*["\']', line) unique_operands.update(strings) total_operands += len(strings) identifiers = re.findall(r'\b[a-zA-Z_]\w*\b', line) for ident in identifiers: if ident not in operators: unique_operands.add(ident) total_operands += 1 n1 = len(unique_operators) n2 = len(unique_operands) N1 = total_operators N2 = total_operands # Handle edge cases to avoid division by zero if n1 > 0 and n2 > 0: program_length = N1 + N2 vocabulary = n1 + n2 volume = program_length * (math.log2(vocabulary) if vocabulary > 0 else 0) difficulty = (n1 * N2) / (2 * n2) if n2 > 0 else 0 effort = volume * difficulty time = effort / 18 # Standard Halstead time estimation else: program_length = vocabulary = volume = difficulty = effort = time = 0 return { "halstead_unique_operators": n1, "halstead_unique_operands": n2, "halstead_total_operators": N1, "halstead_total_operands": N2, "halstead_program_length": program_length, "halstead_vocabulary": vocabulary, "halstead_volume": volume, "halstead_difficulty": difficulty, "halstead_effort": effort, "halstead_time": time } except Exception as e: self.logger.error(f"Error calculating Halstead metrics: {str(e)}") # Return default 0 values for all metrics on error return {metric: 0 for metric in [ "halstead_unique_operators", "halstead_unique_operands", "halstead_total_operators", "halstead_total_operands", "halstead_program_length", "halstead_vocabulary", "halstead_volume", "halstead_difficulty", "halstead_effort", "halstead_time" ]} def calculate_comment_density(self, content: str, language: str = "Unknown") -> Dict[str, Any]: try: metrics = { "comment_lines": 0, "code_lines": 0, "blank_lines": 0, "comment_density": 0.0, "docstring_lines": 0, # Docstrings (Python) "total_lines": 0, #Total no of line. 
"inline_comments": 0 } patterns = { "Python": { "single_line": ["#"], "multi_start": ['"""', "'''"], "multi_end": ['"""', "'''"], "inline_start": "#" }, "JavaScript": { "single_line": ["//"], "multi_start": ["/*"], "multi_end": ["*/"], "inline_start": "//" }, "Java": { # Added Java comment patterns "single_line": ["//"], "multi_start": ["/*"], "multi_end": ["*/"], "inline_start": "//" } }.get(language, { "single_line": ["//", "#"], "multi_start": ["/*", '"""', "'''"], "multi_end": ["*/", '"""', "'''"], "inline_start": ["//", "#"] }) lines = content.splitlines() in_multiline_comment = False current_multiline_delimiter = None for line in lines: stripped = line.strip() metrics["total_lines"] += 1 if not stripped: metrics["blank_lines"] += 1 continue if not in_multiline_comment: for delimiter in patterns["multi_start"]: if stripped.startswith(delimiter): in_multiline_comment = True current_multiline_delimiter = delimiter metrics["comment_lines"] += 1 if delimiter in ['"""', "'''"]: metrics["docstring_lines"] += 1 break elif delimiter in stripped: # Handle same-line multi-line comments end_delimiter = "*/" if delimiter == "/*" else delimiter if end_delimiter in stripped[stripped.index(delimiter) + len(delimiter):]: metrics["comment_lines"] += 1 if delimiter in ['"""', "'''"]: metrics["docstring_lines"] += 1 break if not in_multiline_comment: is_comment = False for prefix in patterns["single_line"]: if stripped.startswith(prefix): metrics["comment_lines"] += 1 is_comment = True break elif prefix in stripped: # Count inline comments metrics["inline_comments"] += 1 break if not is_comment: metrics["code_lines"] += 1 else: metrics["comment_lines"] += 1 if current_multiline_delimiter in ['"""', "'''"]: metrics["docstring_lines"] += 1 #checking current multi line delimeter stripped if current_multiline_delimiter in stripped: # Handle triple quotes properly if current_multiline_delimiter in ['"""', "'''"] and \ stripped.count(current_multiline_delimiter) == 1: continue # in_multiline_comment = False current_multiline_delimiter = None non_blank_lines = metrics["total_lines"] - metrics["blank_lines"] #non blank lines calculating. 
if non_blank_lines > 0: metrics["comment_density"] = (metrics["comment_lines"] + metrics["inline_comments"]) / non_blank_lines * 100 metrics["docstring_density"] = metrics["docstring_lines"] / non_blank_lines * 100 if language == "Python": # Check for module-level docstring if len(lines) > 0 and (lines[0].strip().startswith('"""') or lines[0].strip().startswith("'''")): metrics["has_module_docstring"] = True metrics["module_docstring_lines"] = sum(1 for line in lines if '"""' not in line and "'''" not in line and bool(line.strip()))#counts the number of lines within a module-level docstring that are not the delimiters themselves and contain actual text else: metrics["has_module_docstring"] = False metrics["module_docstring_lines"] = 0 return metrics except Exception as e: self.logger.error(f"Error calculating comment density: {str(e)}") # Return 0s for all density metrics on error return { "comment_lines": 0, "code_lines": 0, "blank_lines": 0, "comment_density": 0.0, "docstring_lines": 0, "total_lines": 0, "inline_comments": 0, "error": str(e) # Include the error message } def calculate_cyclomatic_complexity(self, content: str, language: str = "Unknown") -> Dict[str, Any]: """Calculate cyclomatic complexity metrics for code with language-specific handling.""" metrics = { "complexity": 1, # Base complexity (always start at 1) "cognitive_complexity": 0, "max_nesting_depth": 0 } try: lines = content.splitlines() current_depth = 0 # Language-specific complexity indicators (expanded) complexity_keywords = { "Python": { "if", "else", "elif", "for", "while", "try", "except", "with", "async for", "async with", "break", "continue" }, "JavaScript": { "if", "else", "for", "while", "try", "catch", "switch", "case", "break", "continue", "&&", "||", "?", "async", "await" # Add async/await }, "Java": { # Added Java keywords "if", "else", "for", "while", "do", "switch", "case", "default", "break", "continue", "try", "catch", "finally" } # Add more language-specific keywords as needed }.get(language, { # Default keywords for unknown languages "if", "else", "elif", "for", "while", "try", "catch", "case", "switch", "&&", "||", "?", "except", "finally", "with" }) for line in lines: # Calculate nesting depth opens = line.count('{') - line.count('}') current_depth += opens metrics["max_nesting_depth"] = max(metrics["max_nesting_depth"], current_depth) # Increment complexity for control structures stripped_line = line.strip() for keyword in complexity_keywords: if keyword in stripped_line and not stripped_line.startswith(("//", "#", "/*", "*")): # Exclude comments metrics["complexity"] += 1 metrics["cognitive_complexity"] += (1 + current_depth) # Cognitive complexity increase if language == "Python": # Add complexity for list/dict comprehensions if "for" in stripped_line and ("[" in stripped_line or "{" in stripped_line): metrics["complexity"] += 1 metrics["cognitive_complexity"] += 1 # Also add to cognitive return metrics except Exception as e: self.logger.error(f"Error calculating complexity: {str(e)}") # Return defaults, not just an error string, but also include 1 as base. 
return { "complexity": 1, # Ensure baseline complexity "cognitive_complexity": 0, "max_nesting_depth": 0 } def detect_code_duplication(self, content: str, min_lines: int = 6) -> Dict[str, Any]: """Detect code duplication within the content""" try: metrics = { "duplicate_blocks": 0, "duplicate_lines": 0, "duplication_percentage": 0.0 } lines = content.splitlines() total_lines = len(lines) # Return early if there are not enough lines if total_lines < min_lines: return metrics blocks = {} for i in range(total_lines - min_lines + 1): block = '\n'.join(lines[i:i + min_lines]) normalized_block = self._normalize_code_block(block) if normalized_block.strip(): # Ignore all-whitespace blocks if normalized_block in blocks: blocks[normalized_block].append(i) else: blocks[normalized_block] = [i] duplicate_line_set = set() # Track duplicate line indices using a *set* for block, positions in blocks.items(): if len(positions) > 1: metrics["duplicate_blocks"] += 1 # Count duplicate blocks for pos in positions: for i in range(pos, pos + min_lines): # Add all lines in duplicate block duplicate_line_set.add(i) metrics["duplicate_lines"] = len(duplicate_line_set) # Total count of duplicated lines if total_lines > 0: metrics["duplication_percentage"] = (metrics["duplicate_lines"] / total_lines) * 100 # Duplication metrics calcutation. return metrics except Exception as e: self.logger.error(f"Error detecting code duplication: {str(e)}") # Return 0 for all duplication metrics in case of error return { "duplicate_blocks": 0, "duplicate_lines": 0, "duplication_percentage": 0.0 } def _normalize_code_block(self, block: str) -> str: """Normalize a block of code for comparison by removing comments, whitespace, etc.""" lines = [] for line in block.splitlines(): # Remove comments (handle both Python and JavaScript/Java comments) line = re.sub(r'#.*$', '', line) # Python comments line = re.sub(r'//.*$', '', line) # JavaScript comments line = re.sub(r'/\*.*?\*/', '', line) # Multi-line comments # Normalize whitespace line = re.sub(r'\s+', ' ', line.strip()) if line: # Add non-empty lines lines.append(line) return '\n'.join(lines) def calculate_size_metrics(self, content: str, language: str = "Unknown") -> Dict[str, Any]: try: metrics = { "size_bytes": len(content), "total_lines": 0, "code_lines": 0, "blank_lines": 0, "comment_lines": 0, "avg_line_length": 0, "max_line_length": 0, "file_entropy": 0, # Added file entropy. } comments = { # handling diff comments. "Python": { "line_comment": "#", "block_start": ['"""', "'''"], "block_end": ['"""', "'''"] }, "JavaScript": { "line_comment": "//", "block_start": ["/*"], "block_end": ["*/"] }, "Java": { # Added Java comment definitions "line_comment": "//", "block_start": ["/*"], "block_end": ["*/"] } }.get(language, { "line_comment": "#", "block_start": ["/*", '"""', "'''"], "block_end": ["*/", '"""', "'''"] }) lines = content.splitlines() total_length = 0 # Track the total character count of all lines char_counts = {} #count the occurance of characters in file in_block_comment = False for line in lines: metrics["total_lines"] += 1 line_length = len(line) #length of lines total_length += line_length metrics["max_line_length"] = max(metrics["max_line_length"], line_length) for char in line: char_counts[char] = char_counts.get(char, 0) + 1 stripped = line.strip() # Remove the strip function here. 
if not stripped: metrics["blank_lines"] += 1 continue if not in_block_comment: is_comment = False for start in comments["block_start"]: if stripped.startswith(start): # Use startswith on the stripped line. in_block_comment = True metrics["comment_lines"] += 1 is_comment = True # break #must add break otherwise count may vary. if not is_comment: # Out of block_start scope so we have more appropriate behaviour. if stripped.startswith(comments["line_comment"]): # check if line is comment or code. metrics["comment_lines"] += 1 else: metrics["code_lines"] += 1 else: metrics["comment_lines"] += 1 #comment lines for end in comments["block_end"]: # Block end condition. if end in stripped: # check comment block ends in_block_comment = False # break # if metrics["total_lines"] > 0: metrics["avg_line_length"] = total_length / metrics["total_lines"] # Calculate entropy. total_chars = sum(char_counts.values()) if total_chars > 0: entropy = 0 for count in char_counts.values(): prob = count / total_chars entropy -= prob * math.log2(prob) metrics["file_entropy"] = entropy # These aren't always in 'comment_density', so calculate here. metrics["source_lines"] = metrics["code_lines"] + metrics["comment_lines"] metrics["comment_ratio"] = (metrics["comment_lines"] / metrics["source_lines"] * 100 if metrics["source_lines"] > 0 else 0) # Handle potential division by zero. return metrics except Exception as e: self.logger.error(f"Error calculating size metrics: {str(e)}") # Return 0s and basic size info on error. Still provide content length return { "size_bytes": len(content) if content else 0, # File Size is valuable,even in error. "total_lines": 0, "code_lines": 0, "blank_lines": 0, "comment_lines": 0, "avg_line_length": 0, "max_line_length": 0, "file_entropy": 0, # file_entropy added to default values. "source_lines": 0, # return metrics initialized 0 for other metrices. "comment_ratio": 0 #Return default values on errors } def analyze_function_metrics(self, content: str, language: str = "Unknown") -> Dict[str, Any]: try: metrics = { "total_functions": 0, "avg_function_length": 0, "max_function_length": 0, "avg_function_complexity": 0, "max_function_complexity": 0, "documented_functions": 0, "function_lengths": [], # Collect all lengths "function_complexities": [], # Collect all complexities "function_details": [] # Store details of each function } # Language-specific function patterns patterns = { "Python": r"(?:async\s+)?def\s+(\w+)\s*\([^)]*\)\s*(?:->.*?)?:", "JavaScript": r"(?:async\s+)?function\s+(\w+)\s*\([^)]*\)|(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", "TypeScript": r"(?:async\s+)?function\s+(\w+)\s*\([^)]*\)|(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", "Java": r"(?:public|private|protected|\s)\s+(?:static\s+)?[a-zA-Z_<>[\]]+\s+(\w+)\s*\([^)]*\)\s*(?:throws\s+[^{]+)?\s*\{", "C#": r"(?:public|private|protected|\s)\s+(?:static\s+)?[a-zA-Z_<>[\]]+\s+(\w+)\s*\([^)]*\)\s*(?:where\s+[^{]+)?\s*\{", }.get(language, r"function\s+(\w+)\s*\([^)]*\)") lines = content.splitlines() current_function = None function_start = 0 in_function = False function_content = [] brace_count = 0 #for count braces. for i, line in enumerate(lines): stripped = line.strip() if not stripped or stripped.startswith(('/', '#')): #handle empty lines continue if re.search(patterns, line): current_function = { "name": re.search(patterns, line).group(1), # Extract function name "start_line": i + 1, # 1-based line numbers "has_docstring": False, "complexity": 1, #base complexity is one. 
"nested_depth": 0, "parameters": len(re.findall(r',', line)) + 1 if '(' in line else 0 # Count parameters } function_start = i #starting function line number. in_function = True function_content = [line] # Start collecting content continue if in_function: function_content.append(line) #add the functions to function content. brace_count += line.count('{') - line.count('}') if language == "Python" and i == function_start + 1: # Check for docstring right after def if stripped.startswith('"""') or stripped.startswith("'''"): current_function["has_docstring"] = True # More robust function end detection if (language in ["Python"] and brace_count == 0 and not line.startswith(' ')) or \ (language not in ["Python"] and brace_count == 0 and line.rstrip().endswith('}')): #Robust function end check func_content = '\n'.join(function_content) #join content function for metrics current_function["length"] = len(function_content) # lines of function complexity_metrics = self.calculate_cyclomatic_complexity(func_content, language) current_function["complexity"] = complexity_metrics["complexity"] # Cyclomatic complexity metrics["total_functions"] += 1 # Total Number of functions count. metrics["function_lengths"].append(current_function["length"]) metrics["function_complexities"].append(current_function["complexity"]) metrics["max_function_length"] = max(metrics["max_function_length"],current_function["length"])# Compare current max value and store greater one. metrics["max_function_complexity"] = max(metrics["max_function_complexity"], current_function["complexity"]) # compare and find the max if current_function["has_docstring"]: metrics["documented_functions"] += 1 # count Document function metrics["function_details"].append(current_function) in_function = False current_function = None function_content = [] # Clear all collected datas. if metrics["total_functions"] > 0: metrics["avg_function_length"] = sum(metrics["function_lengths"]) / metrics["total_functions"] metrics["avg_function_complexity"] = sum(metrics["function_complexities"]) / metrics["total_functions"] metrics["documentation_ratio"] = metrics["documented_functions"] / metrics["total_functions"] return metrics except Exception as e: self.logger.error(f"Error analyzing function metrics: {str(e)}") # Return default values for all metrics in case of error. return { "total_functions": 0, "avg_function_length": 0, "max_function_length": 0, "avg_function_complexity": 0, "max_function_complexity": 0, "documented_functions": 0, "function_lengths": [], "function_complexities": [], "function_details": [], "error": str(e) # Include the error for debugging. } def _analyze_file_metrics(self, file_content) -> Optional[Dict[str, Any]]: """Analyze metrics for a single file with proper error handling.""" try: # Decode the file content (assuming it's base64 encoded) content = base64.b64decode(file_content.content).decode('utf-8') language = RELEVANT_EXTENSIONS.get(Path(file_content.path).suffix.lower(), "Unknown") metrics = { "path": file_content.path, "metrics": {} } # Size metrics (always calculated) try: size_metrics = self.calculate_size_metrics(content, language) metrics["metrics"].update(size_metrics) # Store results, handling None. except Exception as e: self.logger.error(f"Error calculating size metrics for {file_content.path}: {str(e)}") # Provide default values even if there is error metrics["metrics"].update({ "size_bytes": len(content), #we have this data even in errors. 
"total_lines": len(content.splitlines()), "code_lines": 0, "blank_lines": 0, "comment_lines": 0 }) # Complexity metrics (only for supported languages) if language != "Unknown": try: complexity = self.calculate_cyclomatic_complexity(content, language) metrics["metrics"]["complexity"] = complexity.get("complexity", 0) metrics["metrics"]["cognitive_complexity"] = complexity.get("cognitive_complexity", 0) # Store cognitive. except Exception as e: self.logger.error(f"Error calculating complexity for {file_content.path}: {str(e)}") metrics["metrics"].update({ "complexity": 0, "cognitive_complexity": 0 # Default to 0 if error. }) # Halstead metrics (for supported languages) if language in ["Python", "JavaScript", "Java"]: # Check if language is supported try: halstead = self.calculate_halstead_metrics(content, language) metrics["metrics"].update(halstead) # Add the results to file data. except Exception as e: self.logger.error(f"Error calculating Halstead metrics for {file_content.path}: {str(e)}") # No defaults needed, halstead already returns 0s. # Duplication metrics (always calculate) try: duplication = self.detect_code_duplication(content) metrics["metrics"]["duplicate_segments"] = len(duplication.get("duplicate_segments", [])) except Exception as e: self.logger.error(f"Error detecting duplication for {file_content.path}: {str(e)}") metrics["metrics"]["duplicate_segments"] = 0 # Set to 0 on error # Function-level metrics (for supported languages). if language != "Unknown": try: function_metrics = self.analyze_function_metrics(content, language) if function_metrics and "error" not in function_metrics: # Check for None AND no error metrics["metrics"].update(function_metrics) # except Exception as e: self.logger.error(f"Error analyzing functions for {file_content.path}: {str(e)}") # no default to add as function metrics handles defaults. # Comment density (always calculated). try: comment_metrics = self.calculate_comment_density(content, language) metrics["metrics"].update(comment_metrics) # Merge except Exception as e: self.logger.error(f"Error calculating comment density for {file_content.path}: {str(e)}") metrics["metrics"].update({ "comment_density": 0, # Defaults on error "docstring_lines": 0 # Add other relevant metrics }) return metrics #Returns calculated data except Exception as e: # General Exception to prevent crash. self.logger.error(f"Error analyzing file {file_content.path}: {str(e)}") # Return minimal error metrics (important) return { "path": file_content.path, "metrics": { "size_bytes": 0, # Important basic metric, try to preserve. "total_lines": 0, # and total lines "error": str(e) } } class DependencyAnalyzer: """Handles dependency analysis with improved error handling.""" def __init__(self, repo): self.repo = repo self.logger = logging.getLogger(__name__) self.dependency_files = { "python": ["requirements.txt", "setup.py", "Pipfile", "pyproject.toml"], "javascript": ["package.json", "yarn.lock", "package-lock.json"], "java": ["pom.xml", "build.gradle"], "ruby": ["Gemfile"], "php": ["composer.json"], "go": ["go.mod"], "rust": ["Cargo.toml"], "dotnet": ["*.csproj", "*.fsproj", "*.vbproj"] # .NET project files } async def analyze_dependencies(self) -> Dict[str, Any]: """Analyze project dependencies (async for aiohttp).""" results = { "dependency_files": [], # Files that specify the dependencies. "dependencies": defaultdict(list), # Parsed dependencies. "dependency_graph": defaultdict(list), # Relationship b/w Dependencies. 
"outdated_dependencies": [], # "security_alerts": [] # Placeholder for future security checks } try: contents = self.repo.get_contents("") while contents: file_content = contents.pop(0) if file_content.type == "dir": contents.extend(self.repo.get_contents(file_content.path)) else: for lang, patterns in self.dependency_files.items(): if any(self._matches_pattern(file_content.path, pattern) for pattern in patterns): # try: file_text = base64.b64decode(file_content.content).decode('utf-8') # deps = await self._parse_dependency_file(file_content.path, file_text) #parsing the files to find dependency. if deps: #check deps is not none. results["dependencies"][file_content.path] = deps results["dependency_files"].append(file_content.path) # add current file to list of dependency files. except Exception as e: self.logger.error(f"Error parsing {file_content.path}: {str(e)}") results["outdated_dependencies"] = await self._check_outdated_dependencies(results["dependencies"])# results["dependency_graph"] = self._build_dependency_graph(results["dependencies"]) except Exception as e: self.logger.error(f"Error analyzing dependencies: {str(e)}") # No need to return default values here, as the initialized 'results' dict is sufficient return results def _matches_pattern(self, filename: str, pattern: str) -> bool: """Check if a filename matches a given pattern (supports wildcards).""" if pattern.startswith("*"): return filename.endswith(pattern[1:]) # Simple wildcard match return filename.endswith(pattern) async def _parse_dependency_file(self, filepath: str, content: str) -> List[Dict[str, str]]: """Parse different dependency file formats and extract dependencies.""" deps = [] # Initialize an empty list to hold dependencies try: if filepath.endswith(('requirements.txt', 'Pipfile')): #requirements.txt or pipfile for line in content.split('\n'): if '==' in line: name, version = line.strip().split('==') deps.append({"name": name, "version": version, "type": "python"}) elif filepath.endswith('package.json'): #package.json data = json.loads(content) for dep_type in ['dependencies', 'devDependencies']: # Check both dependencies and devDependencies if dep_type in data: for name, version in data[dep_type].items(): # Remove semver characters like ^ and ~ for accurate comparisons deps.append({ "name": name, "version": version.replace('^', '').replace('~', ''), # Remove ^ and ~ "type": "npm" }) # Add more file type parsing as needed (e.g., pom.xml for Java, Gemfile for Ruby) except Exception as e: self.logger.error(f"Error parsing {filepath}: {str(e)}") # Don't add any dependencies if parsing fails return deps # Always return the list, even if empty async def _check_outdated_dependencies(self, dependencies: Dict[str, List[Dict[str, str]]]) -> List[Dict[str, Any]]: """Check for outdated dependencies using respective package registries (async).""" outdated = [] async with aiohttp.ClientSession() as session: #use aiotthp for faster http requests. 
for filepath, deps in dependencies.items(): for dep in deps: try: if dep["type"] == "python": async with session.get(f"https://pypi.org/pypi/{dep['name']}/json") as response: if response.status == 200: data = await response.json() latest_version = data["info"]["version"] # Use packaging.version for robust version comparison if version.parse(latest_version) > version.parse(dep["version"]): outdated.append({ "name": dep["name"], "current_version": dep["version"], "latest_version": latest_version, "type": "python" }) elif dep["type"] == "npm": # Use npm registry API async with session.get(f"https://registry.npmjs.org/{dep['name']}") as response: if response.status == 200: data = await response.json() latest_version = data["dist-tags"]["latest"] if version.parse(latest_version) > version.parse(dep['version']): outdated.append({ "name": dep['name'], "current_version": dep["version"], "latest_version": latest_version, "type": "npm" }) # Add checks for other package types (Java, Ruby, etc.) except Exception as e: self.logger.error(f"Error checking version for {dep['name']}: {str(e)}") # Continue checking other dependencies even if one fails return outdated # Return the list, even if empty def _build_dependency_graph(self, dependencies: Dict[str, List[Dict[str, str]]]) -> Dict[str, List[str]]: """Build a dependency graph to visualize relationships (using networkx).""" graph = nx.DiGraph() # directed graph. try: for dep_file, deps in dependencies.items(): for dep in deps: # Add edges to represent dependencies graph.add_edge(dep_file, dep["name"]) # Dep file depends on individual libraries. # Convert to a dictionary of lists for easier handling return nx.to_dict_of_lists(graph) except Exception as e: self.logger.error(f"Error building dependency graph: {str(e)}") return defaultdict(list) # Return an empty graph in case of error class TestAnalyzer: """Handles test analysis.""" def __init__(self, repo): self.repo = repo self.logger = logging.getLogger(__name__) # Add logger self.test_patterns = { "python": ["test_*.py", "*_test.py", "tests/*.py"], "javascript": ["*.test.js", "*.spec.js", "__tests__/*.js"], "java": ["*Test.java", "*Tests.java"], "ruby": ["*_test.rb", "*_spec.rb"], "go": ["*_test.go"] } def analyze_tests(self) -> Dict[str, Any]: """Analyze test files, test counts, and (if possible) coverage information.""" results = { "test_files": [], "test_count": 0, "coverage_data": {}, # Dictionary to hold any parsed coverage information. "test_patterns": defaultdict(list) # Store the information about diff. testing pattern. } try: contents = self.repo.get_contents("") while contents: content = contents.pop(0) if content.type == "dir": contents.extend(self.repo.get_contents(content.path)) elif self._is_test_file(content.path): results["test_files"].append(content.path) test_metrics = self._analyze_test_file(content) #metrics of single files. results["test_patterns"][content.path] = test_metrics # Store results. results["test_count"] += test_metrics.get("test_count", 0) # Safely get test_count results["coverage_data"] = self._find_coverage_data() # Get any coverage. 
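            # Note on _check_outdated_dependencies (defined above): packaging.version is used
            # because plain string comparison mis-orders releases, e.g. "1.10.0" < "1.9.2" as
            # strings, while version.parse("1.10.0") > version.parse("1.9.2") is True.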
except Exception as e: self.logger.error(f"Error analyzing tests: {str(e)}") # Use logger return results # Always return results def _is_test_file(self, filepath: str) -> bool: """Check if a file is likely to be a test file, based on common patterns.""" for patterns in self.test_patterns.values(): for pattern in patterns: if Path(filepath).match(pattern): # Use Path.match for wildcard matching return True return False def _analyze_test_file(self, file_content) -> Dict[str, Any]: """Analyze an individual test file to count tests, assertions, etc.""" try: content = base64.b64decode(file_content.content).decode('utf-8') metrics = { "test_count": 0, "assertions": 0, "test_classes": 0 # If using class-based tests } # Count test cases (using regex for common patterns) metrics["test_count"] += len(re.findall(r'def test_', content)) # Python metrics["test_count"] += len(re.findall(r'it\s*\([\'""]', content)) # JavaScript (Jest/Mocha) metrics["assertions"] += len(re.findall(r'assert', content)) # General assertions metrics["test_classes"] += len(re.findall(r'class\s+\w+Test', content)) # test class patterns. return metrics except Exception as e: self.logger.error(f"Error analyzing test file: {str(e)}") # Use logger return {} # Return empty dict on error def _find_coverage_data(self) -> Dict[str, Any]: """Try to find coverage information (if available, e.g., from coverage reports).""" coverage_data = { "total_coverage": None, "file_coverage": {}, # If file-level data available. "coverage_report_found": False # for indicating we find coverage files. } try: # Look for common coverage report files coverage_files = [ ".coverage", # Python coverage.py "coverage.xml", # Cobertura (Python, Java) "coverage.json", # Jest, other JavaScript "coverage/lcov.info", # LCOV (C/C++, others) "coverage/coverage-final.json" # Istanbul (JavaScript) ] contents = self.repo.get_contents("") while contents: content = contents.pop(0) if content.type == "dir": contents.extend(self.repo.get_contents(content.path)) elif any(content.path.endswith(f) for f in coverage_files): coverage_data["coverage_report_found"] = True # set covarage to True, Indicate report present. parsed_coverage = self._parse_coverage_file(content) # Try to parse. 
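                    # Illustration of the Path.match behaviour that _is_test_file relies on
                    # (relative patterns are matched from the right-hand end of the path):
                    #   Path("pkg/tests/test_api.py").match("test_*.py")  -> True
                    #   Path("pkg/api_test.go").match("*_test.go")        -> True
                    #   Path("pkg/api.py").match("*.test.js")             -> False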
if parsed_coverage: #check parse_coverage is present coverage_data.update(parsed_coverage) # Merge into result except Exception as e: self.logger.error(f"Error finding coverage data: {str(e)}") return coverage_data def _parse_coverage_file(self, file_content) -> Dict[str, Any]: """Parse a coverage report file (handles multiple formats).""" try: content = base64.b64decode(file_content.content).decode('utf-8') if file_content.path.endswith('.json'): data = json.loads(content) # Handle different JSON formats (e.g., coverage.py, Istanbul) if 'total' in data: # coverage.py format return { 'total_coverage': data['total'].get('lines', {}).get('percent', 0), 'file_coverage': { file: stats.get('lines', {}).get('percent', 0) for file, stats in data.get('files', {}).items() } } # Add handling for other JSON formats (e.g., Istanbul) as needed elif file_content.path.endswith('.xml'): # Parse XML (Cobertura format) from xml.etree import ElementTree #for parse XML format root = ElementTree.fromstring(content) total = float(root.get('line-rate', 0)) * 100 # Overall coverage file_coverage = {} # Extract coverage per class/file for class_elem in root.findall('.//class'): filename = class_elem.get('filename', '') line_rate = float(class_elem.get('line-rate', 0)) * 100 file_coverage[filename] = line_rate return { 'total_coverage': total, 'file_coverage': file_coverage } elif file_content.path.endswith('lcov.info'): # Parse LCOV format total_lines = 0 covered_lines = 0 current_file = None file_coverage = {} for line in content.split('\n'): if line.startswith('SF:'): # Source file current_file = line[3:].strip() elif line.startswith('LH:'): # Lines hit covered = int(line[3:]) covered_lines += covered elif line.startswith('LF:'): # Lines found total = int(line[3:]) total_lines += total if current_file and total > 0: # calculate coverage. file_coverage[current_file] = (covered / total) * 100 return { 'total_coverage': (covered_lines / total_lines * 100) if total_lines > 0 else 0, # handle Total lines may be 0 'file_coverage': file_coverage } except Exception as e: self.logger.error(f"Error parsing coverage file: {str(e)}") return {} # Return empty dict on error def analyze_test_quality(self, content: str) -> Dict[str, Any]: """ Analyze the quality of the tests themselves. """ try: metrics = { "assertion_density": 0, # Assertions per line of test code "test_setup_complexity": 0, # How complex is the test setup? "mock_usage": 0, # How frequently are mocks used? "test_patterns": [], # List of identified test patterns and best practices. "anti_patterns": [] # list of identified Anti patterns } lines = content.splitlines() assertion_count = sum(1 for line in lines if 'assert' in line) # check assertion present. metrics["assertion_density"] = assertion_count / len(lines) if lines else 0 setup_lines = [] in_setup = False for line in lines: if 'def setUp' in line or 'def setup' in line: in_setup = True elif in_setup and line.strip() and not line.startswith(' '): # if present it has any leading space of not. in_setup = False if in_setup: setup_lines.append(line) metrics["test_setup_complexity"] = len(setup_lines) mock_count = sum(1 for line in lines if 'mock' in line.lower()) # count mock if present metrics["mock_usage"] = mock_count #detect patterns. 
if any('parameterized' in line for line in lines): metrics["test_patterns"].append("parameterized_tests") # if any('fixture' in line for line in lines): metrics["test_patterns"].append("fixture_usage")# # Identify potential anti-patterns if any('time.sleep' in line for line in lines): metrics["anti_patterns"].append("sleep_in_tests") if any('test' not in line.lower() for line in lines if line.strip().startswith('def')): # all method related to test or not. metrics["anti_patterns"].append("non_test_methods") # anti_patterns if other extra methods there. return metrics except Exception as e: self.logger.error(f"Error analyzing test quality: {str(e)}") return { # Return default 0 values on error. "assertion_density": 0, "test_setup_complexity": 0, "mock_usage": 0, "test_patterns": [], "anti_patterns": [] } class DocumentationAnalyzer: """Handles documentation analysis.""" def __init__(self, repo): self.repo = repo self.logger = logging.getLogger(__name__) # Add logger self.doc_patterns = [ "README.md", "CONTRIBUTING.md", "CHANGELOG.md", "LICENSE", "docs/", # Common documentation directories "documentation/", "wiki/" # Consider wiki as documentation ] def analyze_documentation(self) -> Dict[str, Any]: """Analyze repository documentation (README, CONTRIBUTING, API docs, etc.).""" results = { "readme_analysis": None, "contributing_guidelines": None, "api_documentation": None, # Placeholder - can be expanded "documentation_files": [], # All documantation. "wiki_pages": [], # If the repo has a wiki "documentation_coverage": 0.0 # Overall score } try: # Analyze README readme = self._get_file_content("README.md") if readme: results["readme_analysis"] = self._analyze_readme(readme) # Check contributing guidelines contributing = self._get_file_content("CONTRIBUTING.md") if contributing: results["contributing_guidelines"] = self._analyze_contributing(contributing) contents = self.repo.get_contents("") while contents: content = contents.pop(0) if content.type == "dir": # Check for dedicated documentation directories if content.path.lower() in ["docs", "documentation"]: results["documentation_files"].extend(self._analyze_doc_directory(content.path)) contents.extend(self.repo.get_contents(content.path)) # Check for specific documentation files elif any(content.path.endswith(pattern) for pattern in self.doc_patterns): results["documentation_files"].append(content.path) results["documentation_coverage"] = self._calculate_doc_coverage() # Get wiki pages if available try: wiki_pages = self.repo.get_wiki_pages() # Requires PyGithub 2.x results["wiki_pages"] = [page.title for page in wiki_pages] except: # GitHub API might raise an exception if no wiki pass except Exception as e: self.logger.error(f"Error analyzing documentation: {str(e)}") # Use logger return results # Always return results def _get_file_content(self, filepath: str) -> Optional[str]: """Helper to get the content of a specific file (handles not found).""" try: content = self.repo.get_contents(filepath) return base64.b64decode(content.content).decode('utf-8') except: return None # File not found def _analyze_readme(self, content: str) -> Dict[str, Any]: """Analyze the README content for completeness and key information.""" analysis = { "sections": [], # List of identified sections (e.g., from headings) "has_quickstart": False, # Quick start guide "has_installation": False, # Installation instructions "has_usage": False, # Basic usage examples "has_api_docs": False, # Link to API docs? 
"has_examples": False, # Code examples "word_count": len(content.split()), "completeness_score": 0.0 } # Extract sections (using regex for headings) sections = re.findall(r'^#+\s+(.+)$', content, re.MULTILINE) # match and return the content. analysis["sections"] = sections # Check for key components (using regex for robustness) analysis["has_quickstart"] = bool(re.search(r'quick\s*start', content, re.I)) # Case-insensitive analysis["has_installation"] = bool(re.search(r'install|setup', content, re.I)) analysis["has_usage"] = bool(re.search(r'usage|how\s+to\s+use', content, re.I)) # More flexible matching. analysis["has_api_docs"] = bool(re.search(r'api|documentation', content, re.I)) analysis["has_examples"] = bool(re.search(r'example|demo', content, re.I)) # Broader example terms # Calculate a simple completeness score key_elements = [ analysis["has_quickstart"], analysis["has_installation"], analysis["has_usage"], analysis["has_api_docs"], analysis["has_examples"] ] analysis["completeness_score"] = sum(key_elements) / len(key_elements) * 100 return analysis def _analyze_contributing(self, content: str) -> Dict[str, Any]: """Analyze CONTRIBUTING.md for guidelines.""" analysis = { "has_code_style": False, # Code Style Guide "has_pr_process": False, # How to make PR "has_issue_guidelines": False, #Guidelines for reporting issue. "has_setup_instructions": False, # setup environment Instructions. "completeness_score": 0.0 } analysis["has_code_style"] = bool(re.search(r'code\s+style|coding\s+standards', content, re.I)) analysis["has_pr_process"] = bool(re.search(r'pull\s+request|PR', content, re.I)) # checking pull request analysis["has_issue_guidelines"] = bool(re.search(r'issue|bug\s+report', content, re.I)) #issue and bug report. analysis["has_setup_instructions"] = bool(re.search(r'setup|getting\s+started', content, re.I))# Setup. key_elements = [ #key components present or not. 
analysis["has_code_style"], analysis["has_pr_process"], analysis["has_issue_guidelines"], analysis["has_setup_instructions"] ] analysis["completeness_score"] = sum(key_elements) / len(key_elements) * 100 # calculate return analysis def _analyze_doc_directory(self, directory: str) -> List[str]: """Analyze a dedicated documentation directory (if present).""" doc_files = [] try: contents = self.repo.get_contents(directory) for content in contents: if content.type == "file": doc_files.append(content.path) except Exception as e: self.logger.error(f"Error analyzing doc directory: {str(e)}") # Use logger return doc_files def _calculate_doc_coverage(self) -> float: """Calculate an overall documentation coverage score (heuristic).""" # This is a simplified scoring system and should be customized score = 0.0 total_points = 0 # Check README presence and quality readme = self._get_file_content("README.md") if readme: readme_analysis = self._analyze_readme(readme) score += readme_analysis["completeness_score"] / 100 * 40 # README is worth 40% total_points += 40 # Check contributing guidelines contributing = self._get_file_content("CONTRIBUTING.md") if contributing: contributing_analysis = self._analyze_contributing(contributing) score += contributing_analysis["completeness_score"] / 100 * 20 # Contributing is worth 20% total_points += 20 # Check API documentation (basic presence check) if any(f.endswith(('.md', '.rst')) for f in self.doc_patterns): score += 20 # API docs are worth 20% total_points += 20 # Check for examples (this is simplified - could be improved) if any('example' in f.lower() for f in self.doc_patterns): # Case-insensitive check score += 20 # Examples are worth 20% total_points += 20 return (score / total_points * 100) if total_points > 0 else 0.0 # Avoid division by 0 class CommunityAnalyzer: """Handles community metrics analysis.""" def __init__(self, repo): self.repo = repo self.logger = logging.getLogger(__name__) # Add logger async def analyze_community(self) -> Dict[str, Any]: """Analyze community engagement, health, and contribution patterns.""" results = { "engagement_metrics": await self._get_engagement_metrics(), # Await async calls "issue_metrics": await self._analyze_issues(), # Await for analysis "pr_metrics": await self._analyze_pull_requests(), # Await for PR "contributor_metrics": self._analyze_contributors(), "discussion_metrics": await self._analyze_discussions() # If discussions are enabled } return results # Returns Calculated community metrics. async def _get_engagement_metrics(self) -> Dict[str, Any]: """Get basic repository engagement metrics (stars, forks, watchers).""" metrics = { "stars": self.repo.stargazers_count, "forks": self.repo.forks_count, "watchers": self.repo.subscribers_count, "star_history": [], # Historical star data "fork_history": [] # Historical fork data } try: # Get star history (last 100 stars for efficiency) stargazers = self.repo.get_stargazers_with_dates() metrics["star_history"] = [ {"date": star.starred_at.isoformat(), "count": i + 1} # count: i+1 to show progression. for i, star in enumerate(stargazers) ] # Get fork history forks = self.repo.get_forks() # No need for with_date. metrics["fork_history"] = [ {"date": fork.created_at.isoformat(), "count": i + 1} for i, fork in enumerate(forks) ] except Exception as e: self.logger.error(f"Error getting engagement metrics: {str(e)}") # Use logger return metrics # Return calculated metrics data. 
async def _analyze_issues(self) -> Dict[str, Any]: """Analyze repository issues (open, closed, response times, labels).""" metrics = { "total_issues": 0, "open_issues": 0, "closed_issues": 0, "avg_time_to_close": None, # Average time to close an issue "issue_categories": defaultdict(int), # Categorize issues by label "response_times": [] # List of response times } try: issues = self.repo.get_issues(state='all') # Get all issues (open and closed) for issue in issues: metrics["total_issues"] += 1 if issue.state == 'open': metrics["open_issues"] += 1 else: metrics["closed_issues"] += 1 # Calculate time to close (if closed_at is available) if issue.closed_at and issue.created_at: #Calculate time,if issue closed. time_to_close = (issue.closed_at - issue.created_at).total_seconds() metrics["response_times"].append(time_to_close) # Categorize issues by labels for label in issue.labels: metrics["issue_categories"][label.name] += 1 # Calculate average response time if metrics["response_times"]: # Calculate Avg_response only if any time available. metrics["avg_time_to_close"] = sum(metrics["response_times"]) / len(metrics["response_times"]) #avg = tot / no. except Exception as e: self.logger.error(f"Error analyzing issues: {str(e)}") # Use logger return metrics async def _analyze_pull_requests(self) -> Dict[str, Any]: """Analyze pull requests (open, closed, merged, review times, sizes).""" metrics = { "total_prs": 0, "open_prs": 0, "merged_prs": 0, "closed_prs": 0, "avg_time_to_merge": None, # Average time to merge a PR "pr_sizes": defaultdict(int), # Categorize PRs by size (lines of code) "review_times": [] # List of review times } try: pulls = self.repo.get_pulls(state='all') # Get all PRs (open, closed, merged) for pr in pulls: metrics["total_prs"] += 1 if pr.state == 'open': metrics["open_prs"] += 1 elif pr.merged: metrics["merged_prs"] += 1 # Calculate time to merge if pr.merged_at and pr.created_at: time_to_merge = (pr.merged_at - pr.created_at).total_seconds() metrics["review_times"].append(time_to_merge) #store calculated value else: metrics["closed_prs"] += 1 # # Categorize PR sizes (simplified, based on additions + deletions) if pr.additions + pr.deletions < 10: metrics["pr_sizes"]["xs"] += 1 # Extra small elif pr.additions + pr.deletions < 50: metrics["pr_sizes"]["s"] += 1 # Small elif pr.additions + pr.deletions < 250: metrics["pr_sizes"]["m"] += 1 # Medium elif pr.additions + pr.deletions < 1000: metrics["pr_sizes"]["l"] += 1 # Large else: metrics["pr_sizes"]["xl"] += 1 # Extra large # Calculate average review time if metrics["review_times"]: #calculate Avg_time to merge if review times available. metrics["avg_time_to_merge"] = sum(metrics["review_times"]) / len(metrics["review_times"]) #calculate Average. except Exception as e: self.logger.error(f"Error analyzing pull requests: {str(e)}") # Use logger return metrics # retrun calculated metrics value. 
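    # PR size buckets used above, keyed on additions + deletions (summary for reference):
    #   xs: < 10    s: 10-49    m: 50-249    l: 250-999    xl: >= 1000
    # e.g. a pull request with 85 additions and 35 deletions (120 changed lines) lands in "m".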
def _analyze_contributors(self) -> Dict[str, Any]: """Analyze contributor patterns and engagement.""" metrics = { "total_contributors": 0, "active_contributors": 0, # Contributors active in the last 90 days "contributor_types": defaultdict(int), # User, Organization, Bot "contribution_frequency": defaultdict(int), # High, medium, low "core_contributors": [] # List of core contributors (e.g., top 10%) } try: contributors = self.repo.get_contributors() for contributor in contributors: metrics["total_contributors"] += 1 # Check for recent activity (last 90 days) recent_commits = contributor.get_commits(since=datetime.now() - timedelta(days=90)) # active since if recent_commits.totalCount > 0: metrics["active_contributors"] += 1 # Categorize contributor types metrics["contributor_types"][contributor.type] += 1 # increment by type. # Analyze contribution frequency (simplified) if contributor.contributions > 100: #Contribution level checking. metrics["contribution_frequency"]["high"] += 1 # Consider contributors with >100 contributions as "core" metrics["core_contributors"].append({ "login": contributor.login, "contributions": contributor.contributions, # store "type": contributor.type #Store. }) elif contributor.contributions > 20: metrics["contribution_frequency"]["medium"] += 1 # store in medium if condition satisfy. else: metrics["contribution_frequency"]["low"] += 1# except Exception as e: self.logger.error(f"Error analyzing contributors: {str(e)}") # Use logger return metrics #return Calculated Contributer metrics async def _analyze_discussions(self) -> Dict[str, Any]: """Analyze repository discussions (if enabled).""" metrics = { "total_discussions": 0, "active_discussions": 0, # Discussions with recent activity "categories": defaultdict(int), # Discussion categories "avg_responses": 0, # Average number of responses per discussion "response_times": [] # List of response times } try: # Check if discussions are enabled if self.repo.has_discussions: # first check for discussion enabled. discussions = self.repo.get_discussions() # retrive all the discussion using get_discussions. total_responses = 0 for discussion in discussions: metrics["total_discussions"] += 1 # Check for active discussions (simplified: any comments = active) if discussion.comments > 0: metrics["active_discussions"] += 1 total_responses += discussion.comments # Calculate Total no of comments. # Categorize discussions metrics["categories"][discussion.category.name] += 1 # Calculate response times (time to first response) if discussion.comments > 0: first_response = discussion.get_comments().reversed[0] # Get first comment response_time = (first_response.created_at - discussion.created_at).total_seconds() # time calcualtion. metrics["response_times"].append(response_time) # append that. # Calculate average responses per discussion if metrics["active_discussions"] > 0: # Calculate only if value present. 
metrics["avg_responses"] = total_responses / metrics["active_discussions"] except Exception as e: self.logger.error(f"Error analyzing discussions: {str(e)}") # Use logger return metrics class RepositoryAnalyzer: """Main class to analyze a GitHub repository.""" def __init__(self, repo_url: str, github_token: str): self.logger = logging.getLogger(__name__) self.gh = Github(github_token) # Keep for some top-level calls self.gh_handler = GitHubAPIHandler(github_token) # Use the handler self.code_metrics = CodeMetricsAnalyzer() parts = repo_url.rstrip('/').split('/') if len(parts) < 2: raise ValueError("Invalid repository URL format") self.repo_name = parts[-1] self.owner = parts[-2] self.analysis_data = { # Initialize data here "basic_info": {}, "structure": {}, "code_metrics": {}, "dependencies": {}, "tests": {}, "documentation": {}, "community": {}, "visualizations": {} } try: self.repo = self.gh_handler.get_repository(repo_url) # Use handler # Initialize other analyzers *after* successfully getting the repo self.dependency_analyzer = DependencyAnalyzer(self.repo) self.test_analyzer = TestAnalyzer(self.repo) self.doc_analyzer = DocumentationAnalyzer(self.repo) self.community_analyzer = CommunityAnalyzer(self.repo) except Exception as e: self.logger.error(f"Failed to initialize repository analyzer: {str(e)}") raise async def analyze(self) -> Dict[str, Any]: """Perform the full repository analysis.""" try: # Basic repository information self.analysis_data["basic_info"] = { "name": self.repo.name, "owner": self.repo.owner.login, "description": self.repo.description or "No description available", # Handle None "stars": self.repo.stargazers_count, "forks": self.repo.forks_count, "created_at": self.repo.created_at.isoformat(), # Use isoformat() "last_updated": self.repo.updated_at.isoformat(), "primary_language": self.repo.language or "Not specified", } # Analyze repository structure with sampling self.analysis_data["structure"] = await self._analyze_structure() # Analyze code patterns and metrics self.analysis_data["code_metrics"] = await self._analyze_code_metrics() # Analyze dependencies self.analysis_data["dependencies"] = await self.dependency_analyzer.analyze_dependencies() # Analyze tests and coverage self.analysis_data["tests"] = self.test_analyzer.analyze_tests() # Analyze documentation self.analysis_data["documentation"] = self.doc_analyzer.analyze_documentation() # Analyze community health self.analysis_data["community"] = await self.community_analyzer.analyze_community() # Generate visualizations self.analysis_data["visualizations"] = await self._generate_visualizations() return self.analysis_data # Return the populated dict except Exception as e: self.logger.error(f"Error during analysis: {str(e)}") raise async def _analyze_structure(self) -> Dict[str, Any]: """Analyze the repository's file and directory structure, with sampling.""" structure = { "files": defaultdict(int), # File type counts (e.g., .py, .js) "directories": set(), # Unique directory paths "total_size": 0, # Total size in bytes "directory_tree": defaultdict(list), # Parent -> [children] "file_samples": [] # Sample files for detailed analysis } try: all_files = [] # Store all relevant files first contents = self.repo.get_contents("") while contents: content = contents.pop(0) if content.type == "dir": structure["directories"].add(content.path) # Build directory tree structure structure["directory_tree"][os.path.dirname(content.path)].append(content.path) #correct way contents.extend(self.repo.get_contents(content.path)) 
    async def _analyze_structure(self) -> Dict[str, Any]:
        """Analyze the repository's file and directory structure, with sampling."""
        structure = {
            "files": defaultdict(int),            # File counts per extension (e.g., .py, .js)
            "directories": set(),                 # Unique directory paths
            "total_size": 0,                      # Total size of relevant files, in bytes
            "directory_tree": defaultdict(list),  # Parent path -> [child paths]
            "file_samples": []                    # Sampled files for detailed analysis
        }
        try:
            all_files = []  # Collect all relevant files first
            contents = self.repo.get_contents("")
            while contents:
                content = contents.pop(0)
                if content.type == "dir":
                    structure["directories"].add(content.path)
                    # Build the directory tree (parent -> children)
                    structure["directory_tree"][os.path.dirname(content.path)].append(content.path)
                    contents.extend(self.repo.get_contents(content.path))
                else:
                    ext = Path(content.path).suffix.lower()  # Lowercase extension
                    # Only consider relevant file types
                    if ext in RELEVANT_EXTENSIONS:
                        structure["files"][ext] += 1
                        structure["total_size"] += content.size
                        all_files.append(content)

            # Smart sampling of files: stratified by file type
            if all_files:
                # At least one sample per type, at most five
                samples_per_type = min(5, max(1, len(all_files) // len(structure["files"]) if structure["files"] else 1))

                for ext in structure["files"].keys():
                    ext_files = [f for f in all_files if f.path.endswith(ext)]  # All files with this extension
                    if ext_files:
                        # Sort by size and take samples spread across the size range
                        ext_files.sort(key=lambda x: x.size)
                        total_samples = min(samples_per_type, len(ext_files))
                        step = max(1, len(ext_files) // total_samples)
                        for i in range(0, len(ext_files), step)[:total_samples]:  # Evenly spaced picks
                            structure["file_samples"].append({
                                "path": ext_files[i].path,
                                "size": ext_files[i].size,
                                "type": RELEVANT_EXTENSIONS.get(ext, "Unknown")  # Language name
                            })

        except Exception as e:
            self.logger.error(f"Error analyzing structure: {str(e)}")

        return {
            "file_types": dict(structure["files"]),  # Convert defaultdict to dict
            "directory_count": len(structure["directories"]),
            "total_size": structure["total_size"],
            "file_count": sum(structure["files"].values()),  # Total relevant files
            "directory_tree": dict(structure["directory_tree"]),
            "file_samples": structure["file_samples"]
        }

    async def _analyze_code_metrics(self) -> Dict[str, Any]:
        """Analyze code metrics for a sample of files, with parallel processing."""
        metrics = {
            "complexity_metrics": defaultdict(list),  # Cyclomatic/cognitive complexity, nesting
            "duplication_metrics": defaultdict(list),
            "function_metrics": defaultdict(list),    # From function-level analysis
            "comment_metrics": defaultdict(list),     # Comment density
            "language_metrics": defaultdict(dict)     # Aggregates per language
        }
        try:
            # Collect all relevant files
            contents = self.repo.get_contents("")
            files_to_analyze = []
            while contents:
                content = contents.pop(0)
                if content.type == "dir":
                    contents.extend(self.repo.get_contents(content.path))
                elif Path(content.path).suffix.lower() in RELEVANT_EXTENSIONS:
                    files_to_analyze.append(content)

            if not files_to_analyze:
                return metrics  # Nothing to analyze; avoids a zero-worker executor

            # Analyze files in parallel (cap the pool at 10 workers)
            with ThreadPoolExecutor(max_workers=min(10, len(files_to_analyze))) as executor:
                futures = [
                    executor.submit(self.code_metrics._analyze_file_metrics, file_content)
                    for file_content in files_to_analyze
                ]

                for future in futures:
                    try:
                        file_metrics = future.result()  # Collect the per-file results
                        if file_metrics:
                            language = RELEVANT_EXTENSIONS.get(Path(file_metrics["path"]).suffix.lower(), "Unknown")

                            # Aggregate numeric metrics by language
                            for metric_type, value in file_metrics["metrics"].items():
                                if isinstance(value, (int, float)):
                                    metrics.setdefault(f"{metric_type}_metrics", defaultdict(list))[language].append(value)

                            # Update language-specific totals
                            if language not in metrics["language_metrics"]:
                                metrics["language_metrics"][language] = {
                                    "file_count": 0,
                                    "total_lines": 0,
                                    "total_complexity": 0
                                }
                            lang_metrics = metrics["language_metrics"][language]
                            lang_metrics["file_count"] += 1
                            lang_metrics["total_lines"] += file_metrics["metrics"].get("total_lines", 0)
                            lang_metrics["total_complexity"] += file_metrics["metrics"].get("complexity", 0)
                    except Exception as e:
                        self.logger.error(f"Error processing file metrics: {str(e)}")

            return metrics  # Return the aggregated metrics

        except Exception as e:
            self.logger.error(f"Error analyzing code metrics: {str(e)}")
            return metrics  # Return the initialized dict (possibly empty)
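    # Hedged sketch (assumption: CodeMetricsAnalyzer, defined elsewhere in this file,
    # is the real implementation): one rough way to produce the "complexity" numbers
    # aggregated above is to count branching nodes in a Python file's AST.
    @staticmethod
    def _rough_cyclomatic_complexity_sketch(source: str) -> int:
        """Illustrative only: 1 + the number of branching constructs in the source."""
        branch_nodes = (ast.If, ast.For, ast.AsyncFor, ast.While,
                        ast.Try, ast.With, ast.BoolOp, ast.ExceptHandler)
        try:
            tree = ast.parse(source)
        except SyntaxError:
            return 0  # Not valid Python, so no estimate
        return 1 + sum(isinstance(node, branch_nodes) for node in ast.walk(tree))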
lang_metrics["total_complexity"] += file_metrics["metrics"].get("complexity", 0) #complexity count except Exception as e: self.logger.error(f"Error processing file metrics: {str(e)}") return metrics # return aggregated except Exception as e: self.logger.error(f"Error analyzing code metrics: {str(e)}") return metrics # Return the initialized dict (possibly empty) async def _generate_visualizations(self) -> Dict[str, Any]: """Generate visualizations from the analyzed data (using matplotlib, seaborn, etc.).""" visualizations = {} try: # Language distribution pie chart if self.analysis_data.get("structure", {}).get("file_types"): fig, ax = plt.subplots() languages = self.analysis_data["structure"]["file_types"] plt.pie(languages.values(), labels=languages.keys(), autopct='%1.1f%%') plt.title("Language Distribution") from io import BytesIO buffer = BytesIO() # convert bytes plt.savefig(buffer, format='png') visualizations["language_distribution"] = base64.b64encode(buffer.getvalue()).decode() plt.close() # Code complexity heatmap (example using average complexity) if self.analysis_data.get("code_metrics", {}).get("complexity_metrics"): complexity_data = [] for lang, values in self.analysis_data["code_metrics"]["complexity_metrics"].items(): if values: # Ensure there are values to average complexity_data.append({ "language": lang, "avg_complexity": sum(values) / len(values) }) if complexity_data: # If Data present generate graph. df = pd.DataFrame(complexity_data) plt.figure(figsize=(10, 6)) sns.barplot(data=df, x="language", y="avg_complexity") plt.title("Average Code Complexity by Language") plt.xticks(rotation=45) # Rotate x-axis labels buffer = BytesIO() plt.savefig(buffer, format='png', bbox_inches='tight') # Improve layout visualizations["complexity_distribution"] = base64.b64encode(buffer.getvalue()).decode() plt.close() # Commit activity heatmap (example) if self.analysis_data.get("community", {}).get("commit_history"): #check whether community & commit-history metrics commit_data = self.analysis_data["community"]["commit_history"] df = pd.DataFrame(commit_data) df['date'] = pd.to_datetime(df['date']) # change into date time for visualization df = df.set_index('date') # Resample to daily counts df = df.resample('D').count() plt.figure(figsize=(12, 4)) # fixed size. sns.heatmap(df.pivot_table(index=df.index.dayofweek, columns=df.index.month, values='count', aggfunc='sum')) # cretae heat map plt.title("Commit Activity Heatmap") #tile. buffer = BytesIO() # plt.savefig(buffer, format='png', bbox_inches='tight') visualizations["commit_heatmap"] = base64.b64encode(buffer.getvalue()).decode() # plt.close() # # Add more visualizations as needed (e.g., dependency graph, test coverage) except Exception as e: self.logger.error(f"Error generating visualizations: {str(e)}") return visualizations # Even if empty # --- Prompt Creation and LLM Interaction --- def create_enhanced_analysis_prompt(analysis_data: Dict[str, Any]) -> str: """Create an enhanced prompt for the LLM analysis.""" return f"""You are an expert code analyst with deep experience in software architecture, development practices, and team dynamics. 
# --- Prompt Creation and LLM Interaction ---

def create_enhanced_analysis_prompt(analysis_data: Dict[str, Any]) -> str:
    """Create an enhanced prompt for the LLM analysis."""
    return f"""You are an expert code analyst with deep experience in software architecture, development practices, and team dynamics.
Analyze the provided repository data and create a detailed, insightful analysis using the following sections:

# Repository Analysis for {analysis_data['basic_info']['name']}

## 📊 Project Overview
[Analyze the basic repository information, including:
- Project purpose and description
- Repository age and activity level
- Key metrics (stars, forks, contributors)
- Primary technologies used
- Overall project health indicators]

## 🏗️ Architecture and Code Organization
[Analyze the repository structure and code organization:
- Directory structure and organization patterns
- Code distribution across languages
- File organization and modularity
- Architectural patterns
- Development standards and practices
- Code complexity distribution
- Potential architectural improvements]

## 💻 Code Quality and Metrics
[Provide detailed analysis of code quality metrics:
- Cyclomatic complexity trends
- Code duplication patterns
- Function length and complexity
- Comment density and documentation quality
- Test coverage and quality
- Areas for potential improvement]

## 📦 Dependencies and Security
[Analyze the project's dependencies:
- Major dependencies and their versions
- Outdated dependencies
- Security vulnerabilities
- Dependency graph complexity
- Licensing considerations]

## 📚 Documentation Assessment
[Evaluate the project's documentation:
- README completeness and quality
- API documentation coverage
- Contributing guidelines
- Code comments and inline documentation
- Examples and tutorials
- Documentation maintenance status]

## 🧪 Testing and Quality Assurance
[Analyze testing practices:
- Test coverage metrics
- Testing patterns and approaches
- CI/CD implementation
- Quality assurance processes
- Areas needing additional testing]

## 👥 Community Health and Engagement
[Evaluate community aspects:
- Contributor demographics and activity
- Issue and PR response times
- Community engagement metrics
- Communication patterns
- Governance model]

## 📈 Development Trends
[Analyze development patterns:
- Commit frequency and distribution
- Code change patterns
- Release cycle analysis
- Development velocity
- Team collaboration patterns]

## 🚀 Performance and Scalability
[Assess technical characteristics:
- Code performance indicators
- Scalability considerations
- Resource usage patterns
- Technical debt indicators
- Optimization opportunities]

## 💡 Key Insights
[Summarize the most important findings:
- Top 3 strengths
- Top 3 areas for improvement
- Unique characteristics
- Notable patterns or practices
- Risk factors]

## 📋 Recommendations
[Provide actionable recommendations:
- Immediate improvement opportunities
- Long-term strategic suggestions
- Specific tools or practices to consider
- Priority areas for focus
- Resource allocation suggestions]

Please analyze the following repository data thoroughly and provide detailed insights for each section:

{json.dumps(analysis_data, indent=2)}
"""
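# Hedged sketch (assumption: the 4-characters-per-token heuristic and this helper are
# not part of the original design): json.dumps of a large repository can make the prompt
# above very long, so a cheap size estimate before sending is a useful sanity check.
def _estimate_prompt_tokens_sketch(prompt: str) -> int:
    """Illustrative only: approximate the token count as ceil(len(prompt) / 4)."""
    return math.ceil(len(prompt) / 4)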
async def analyze_repository(repo_url: str, github_token: str, gemini_key: str, progress=gr.Progress()) -> Tuple[str, str, str]:
    """Analyze a repository and generate the LLM summary (async, with progress updates)."""
    try:
        # Re-initialize tokens each time to ensure they are fresh and valid
        initialize_tokens(github_token, gemini_key)

        progress(0, desc="Initializing repository analysis...")
        analyzer = RepositoryAnalyzer(repo_url, github_token)

        progress(0.3, desc="Analyzing repository structure and patterns...")
        analysis_data = await analyzer.analyze()  # Await the full analysis

        progress(0.7, desc="Generating comprehensive analysis...")
        # Use the more powerful Gemini 1.5 Pro model
        model = genai.GenerativeModel(
            model_name="gemini-1.5-pro",
            generation_config={
                "temperature": 0.7,
                "top_p": 0.95,              # Nucleus sampling
                "top_k": 40,
                "max_output_tokens": 8192,  # Large output budget for the full report
            }
        )

        prompt = create_enhanced_analysis_prompt(analysis_data)  # Sectioned prompt

        # Send the prompt in a fresh chat session
        chat = model.start_chat(history=[])
        response = chat.send_message(prompt)

        progress(0.9, desc="Saving analysis results...")
        # Save the analysis data to a temporary file for follow-up Q&A
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
            json.dump(analysis_data, f, indent=2)
            analysis_file = f.name

        progress(1.0, desc="Analysis complete!")
        return response.text, analysis_file, "✅ Analysis completed successfully!"

    except Exception as e:
        error_message = f"❌ Error analyzing repository: {str(e)}"
        return "", "", error_message  # Empty report and file path on failure


async def ask_question(question: str, analysis_file: str, chat_history: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Answer a follow-up question about the analysis, with enhanced context."""
    if not analysis_file:
        return chat_history + [(question, "Please analyze a repository first before asking questions.")]

    try:
        with open(analysis_file, 'r') as f:
            analysis_data = json.load(f)

        # Initialize the model for Q&A
        model = genai.GenerativeModel(
            model_name="gemini-1.5-pro",
            generation_config={
                "temperature": 0.7,
                "top_p": 0.8,               # More focused sampling than the main report
                "top_k": 40,
                "max_output_tokens": 4096,
            }
        )

        # Build the context: system instructions + analysis data + recent history
        context = """You are an expert code analyst helping users understand repository analysis results.
Provide detailed, technical, and actionable insights based on the analysis data.
When appropriate, reference specific metrics and patterns from the analysis.
If making recommendations, be specific and explain the reasoning behind them.

Repository Analysis Data:
"""
        context += json.dumps(analysis_data, indent=2) + "\n\n"

        if chat_history:
            # Include only the last 3 exchanges to keep the context relevant
            context += "Previous conversation:\n"
            for user_msg, assistant_msg in chat_history[-3:]:
                context += f"User: {user_msg}\nAssistant: {assistant_msg}\n"

        prompt = f"""{context}

User's Question: {question}

Please provide a detailed analysis that:
1. Directly addresses the user's question
2. References relevant metrics and data from the analysis
3. Provides context and explanations for technical concepts
4. Suggests actionable next steps or recommendations when appropriate
5. Maintains technical accuracy while being clear and understandable

Your response:"""

        chat = model.start_chat(history=[])  # Start a new chat per question
        response = chat.send_message(prompt)

        return chat_history + [(question, response.text)]  # Append the new exchange

    except Exception as e:
        error_message = f"Error processing question: {str(e)}"
        return chat_history + [(question, error_message)]
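# Hedged sketch (assumption: a cleanup policy is not specified in the original code):
# analyze_repository writes the analysis JSON with delete=False, so the temp file
# outlives the session; a best-effort remover avoids leaving files behind.
def _cleanup_analysis_file_sketch(path: str) -> None:
    """Illustrative only: remove the temporary analysis JSON file if it exists."""
    try:
        if path and os.path.exists(path):
            os.remove(path)
    except OSError as e:
        logger.warning(f"Could not remove temporary analysis file {path}: {e}")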
""") # API tokens with gr.Row(): github_token = gr.Textbox( label="GitHub Token", type="password", placeholder="Enter your GitHub token" ) gemini_key = gr.Textbox( label="Gemini API Key", type="password", placeholder="Enter your Gemini API key" ) init_btn = gr.Button("Initialize Tokens", variant="secondary") # Repository URL and analysis button with gr.Row(): repo_url = gr.Textbox( label="GitHub Repository URL", placeholder="https://github.com/owner/repo", scale=4 # Larger input box ) analyze_btn = gr.Button("🔍 Analyze", variant="primary", scale=1) # Status message status_msg = gr.Markdown("") # Display Error Status. # Analysis results with gr.Tabs(): with gr.Tab("📝 Analysis Report"): # report Analysis. summary = gr.Markdown("") # output report. with gr.Tab("💭 Q&A"): # Improved label chatbot = gr.Chatbot( [], label="Ask questions about the analysis", height=400 ) with gr.Row(): question = gr.Textbox( label="Your Question", placeholder="Ask about specific aspects of the analysis...", scale=4 ) ask_btn = gr.Button("Ask", scale=1) clear_btn = gr.Button("Clear", scale=1) # Hidden state to store the analysis data file path analysis_file = gr.State("") async def safe_analyze(repo_url: str, github_token: str, gemini_key: str): """Wrapper function to handle analysis and errors gracefully.""" try: if not repo_url: return None, None, "❌ Please enter a GitHub repository URL" if not github_token or not gemini_key: return None, None, "❌ Please initialize tokens first" if not re.match(r'https?://github\.com/[\w-]+/[\w-]+/?$', repo_url): return None, None, "❌ Invalid GitHub repository URL format" summary, analysis_file, status = await analyze_repository(repo_url, github_token, gemini_key) return summary, analysis_file, status except Exception as e: return None, None, f"❌ Analysis failed: {str(e)}" # Event handlers init_btn.click( initialize_tokens, inputs=[github_token, gemini_key], outputs=status_msg ) analyze_btn.click( fn=lambda: "⏳ Analysis in progress...", # Immediate feedback inputs=None, outputs=status_msg, queue=False # Don't queue this click ).then( safe_analyze, # Call the wrapper inputs=[repo_url, github_token, gemini_key], outputs=[summary, analysis_file, status_msg] ) ask_btn.click( ask_question, inputs=[question, analysis_file, chatbot], # Include chatbot history outputs=[chatbot] ).then( lambda: "", # Clear the question box after asking None, question, queue=False ) clear_btn.click( lambda: ([], ""), # Clear chatbot and question outputs=[chatbot, question] ) return app # Run the interface if __name__ == "__main__": app = create_interface() app.launch(debug=True, share=True)