import os import json import time import re import logging import datetime import concurrent.futures import sys import base64 import tempfile from pathlib import Path from typing import Dict, List, Union, Any, Optional, Tuple, Set from collections import Counter, defaultdict from dataclasses import dataclass, field, asdict from io import BytesIO, StringIO import urllib.request import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns import networkx as nx import plotly.express as px import plotly.graph_objects as go from plotly.subplots import make_subplots from tqdm.notebook import tqdm from dateutil.relativedelta import relativedelta from github import Github, GithubException, RateLimitExceededException import gradio as gr # For PDF Generation from reportlab.lib.pagesizes import letter, A4 from reportlab.lib import colors from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, Table, TableStyle, PageBreak from reportlab.lib.units import inch from reportlab.pdfgen import canvas from reportlab.lib.enums import TA_CENTER, TA_LEFT # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler() ] ) logger = logging.getLogger("github_analyzer") @dataclass class GitHubAPIConfig: """Configuration for the GitHub API client with sensible defaults.""" # API access configuration token: str = None max_retries: int = 5 backoff_factor: int = 2 per_page: int = 100 # Max allowed by GitHub timeout: int = 30 # Retry status codes retry_status_codes: Set[int] = field(default_factory=lambda: { 403, 429, 500, 502, 503, 504 }) # Permission types collaborator_permission_types: List[str] = field(default_factory=lambda: [ "admin", "push", "pull", "maintain", "triage" ]) # File classification code_extensions: List[str] = field(default_factory=lambda: [ ".py", ".js", ".java", ".c", ".cpp", ".cs", ".go", ".php", ".rb", ".swift", ".kt", ".ts", ".rs", ".scala", ".lua", ".m", ".mm", ".h", ".hpp", ".cc", ".hh", ".f", ".f90", ".f95", ".f03", ".f08", ".for", ".f77", ".jl", ".pl", ".pm", ".t", ".r", ".dart", ".groovy", ".v", ".vhd", ".vhdl", ".erl", ".hrl", ".hs", ".lhs", ".ex", ".exs", ".hx" ]) markup_extensions: List[str] = field(default_factory=lambda: [ ".md", ".html", ".htm", ".xml", ".json", ".yaml", ".yml", ".txt", ".rst", ".tex", ".adoc", ".csv", ".tsv", ".toml", ".ini", ".cfg" ]) script_extensions: List[str] = field(default_factory=lambda: [ ".sh", ".bash", ".zsh", ".ps1", ".bat", ".cmd" ]) notebook_extensions: List[str] = field(default_factory=lambda: [ ".ipynb" ]) data_extensions: List[str] = field(default_factory=lambda: [ ".csv", ".tsv", ".json", ".xml", ".xls", ".xlsx", ".hdf5", ".parquet", ".feather", ".pkl", ".sav", ".dta", ".arff" ]) config_extensions: List[str] = field(default_factory=lambda: [ ".yml", ".yaml", ".json", ".toml", ".ini", ".cfg", ".conf" ]) other_extensions: List[str] = field(default_factory=lambda: [ ".txt", ".log", ".svg", ".png", ".jpg", ".jpeg" ]) # Data collection limits (set to None for no limit) max_contributors: Optional[int] = 50 max_issues: Optional[int] = 100 max_commits: Optional[int] = 200 max_search_results: Optional[int] = 50 max_pull_requests: Optional[int] = 100 max_collaborators: Optional[int] = 30 # Output configuration output_dir: str = "/tmp/github_data" generate_visualizations: bool = True def __post_init__(self): """Ensure output directory exists""" os.makedirs(self.output_dir, exist_ok=True) def all_code_extensions(self) -> List[str]: """Return all code-related file extensions""" return list(set( self.code_extensions + self.script_extensions + self.config_extensions )) class GithubClient: """ A robust GitHub client that handles rate limiting, retries, and provides consistent error handling. """ def __init__(self, config: GitHubAPIConfig): """Initialize the GitHub client with configuration.""" self.config = config self.github = Github( config.token, per_page=config.per_page, timeout=config.timeout, retry=config.max_retries ) self.cache = {} # Simple in-memory cache def get_repo(self, repo_path: str): """Get a repository by owner/name with caching.""" cache_key = f"repo:{repo_path}" if cache_key in self.cache: return self.cache[cache_key] repo = self.github.get_repo(repo_path) self.cache[cache_key] = repo return repo def _handle_exception(self, e: GithubException, retry_count: int) -> bool: """ Handle GitHub exceptions with proper retries and backoff strategy. Args: e: The exception to handle retry_count: Current retry count Returns: bool: True if retry should be attempted, False otherwise """ if retry_count >= self.config.max_retries: logger.error(f"Max retries ({self.config.max_retries}) exceeded.") return False if isinstance(e, RateLimitExceededException): # Handle primary rate limit rate_limit = self.github.get_rate_limit() reset_time = rate_limit.core.reset.timestamp() if hasattr(rate_limit, 'core') else time.time() + 3600 sleep_time = max(0, int(reset_time - time.time())) + 1 logger.warning(f"Rate limit exceeded. Waiting for {sleep_time} seconds...") time.sleep(sleep_time) return True elif e.status in self.config.retry_status_codes: # Handle secondary rate limits and server errors sleep_time = self.config.backoff_factor ** retry_count logger.warning( f"Temporary error (status {e.status}). Retrying in {sleep_time} seconds. " f"Attempt {retry_count+1}/{self.config.max_retries}." ) time.sleep(sleep_time) return True # Non-recoverable error logger.error(f"Non-recoverable GitHub API error: {e}") return False def _paginated_request(self, method, *args, **kwargs): """ Execute a paginated GitHub API request with retry logic. Args: method: The PyGithub method to call Returns: List of results or None on non-recoverable error """ results = [] retry_count = 0 max_results = kwargs.pop('max_results', None) while retry_count <= self.config.max_retries: try: paginated_list = method(*args, **kwargs) # Process items for item in paginated_list: results.append(item) if max_results and len(results) >= max_results: return results # Check if we've reached the end if paginated_list.totalCount <= len(results): break # Reset retry counter on success retry_count = 0 except GithubException as e: if self._handle_exception(e, retry_count): retry_count += 1 else: return None return results def _execute_request(self, method, *args, **kwargs): """ Execute a single GitHub API request with retry logic. Args: method: The PyGithub method to call Returns: Result of the API call or None on non-recoverable error """ retry_count = 0 while retry_count <= self.config.max_retries: try: result = method(*args, **kwargs) return result except GithubException as e: # Special case for 404 errors - file not found if e.status == 404: logger.info(f"Resource not found: {e}") return None if self._handle_exception(e, retry_count): retry_count += 1 else: return None return None class GitHubRepoAnalyzer: """ Main class for analyzing GitHub repositories and generating insights. """ def __init__(self, config: GitHubAPIConfig): """Initialize the analyzer with configuration.""" self.config = config self.client = GithubClient(config) def get_repo_details(self, repo) -> Dict[str, Any]: """Get comprehensive repository metadata.""" logger.info(f"Fetching repository details for {repo.full_name}") return { "name": repo.name, "full_name": repo.full_name, "description": repo.description, "html_url": repo.html_url, "stargazers_count": repo.stargazers_count, "watchers_count": repo.watchers_count, "forks_count": repo.forks_count, "open_issues_count": repo.open_issues_count, "language": repo.language, "default_branch": repo.default_branch, "created_at": repo.created_at.isoformat() if repo.created_at else None, "updated_at": repo.updated_at.isoformat() if repo.updated_at else None, "pushed_at": repo.pushed_at.isoformat() if repo.pushed_at else None, "license": repo.license.name if repo.license else None, "topics": list(repo.get_topics()), "archived": repo.archived, "disabled": repo.disabled, "visibility": repo.visibility, "has_wiki": repo.has_wiki, "has_pages": repo.has_pages, "has_projects": repo.has_projects, "has_issues": repo.has_issues, "has_discussions": repo.has_discussions if hasattr(repo, 'has_discussions') else None, "size": repo.size, # Size in KB "network_count": repo.network_count, "subscribers_count": repo.subscribers_count, "organization": repo.organization.login if repo.organization else None, "parent": repo.parent.full_name if hasattr(repo, 'parent') and repo.parent else None, "fork": repo.fork, } def get_contributors(self, repo) -> List[Dict[str, Any]]: """Get repository contributors with detailed information.""" logger.info(f"Fetching contributors for {repo.full_name}") contributors = self.client._paginated_request( repo.get_contributors, max_results=self.config.max_contributors ) if contributors is None: return [] return [ { "login": c.login, "id": c.id, "contributions": c.contributions, "type": c.type, "html_url": c.html_url, "followers": c.followers, "following": c.following, "public_repos": c.public_repos if hasattr(c, 'public_repos') else None, "bio": c.bio if hasattr(c, 'bio') else None, "location": c.location if hasattr(c, 'location') else None, "company": c.company if hasattr(c, 'company') else None, "email": c.email if hasattr(c, 'email') else None, "avatar_url": c.avatar_url if hasattr(c, 'avatar_url') else None, } for c in contributors ] def get_languages(self, repo) -> Dict[str, int]: """Get languages used in the repository.""" logger.info(f"Fetching languages for {repo.full_name}") languages = self.client._execute_request(repo.get_languages) return languages or {} def get_issues(self, repo, state: str = "all") -> List[Dict[str, Any]]: """Get repository issues.""" logger.info(f"Fetching issues for {repo.full_name} with state={state}") issues = self.client._paginated_request( repo.get_issues, state=state, max_results=self.config.max_issues ) if issues is None: return [] return [ { "id": issue.id, "number": issue.number, "title": issue.title, "body": issue.body, "state": issue.state, "user_login": issue.user.login if issue.user else None, "labels": [label.name for label in issue.labels], "comments": issue.comments, "created_at": issue.created_at.isoformat() if issue.created_at else None, "updated_at": issue.updated_at.isoformat() if issue.updated_at else None, "closed_at": issue.closed_at.isoformat() if issue.closed_at else None, "pull_request": issue.pull_request is not None, "milestone": issue.milestone.title if issue.milestone else None, "assignees": [user.login for user in issue.assignees] if issue.assignees else [], } for issue in issues ] def get_commits(self, repo) -> List[Dict[str, Any]]: """Get repository commits.""" logger.info(f"Fetching commits for {repo.full_name}") commits = self.client._paginated_request( repo.get_commits, max_results=self.config.max_commits ) if commits is None: return [] return [ { "sha": commit.sha, "commit_message": commit.commit.message, "author_login": commit.author.login if commit.author else None, "author_name": commit.commit.author.name if commit.commit and commit.commit.author else None, "author_email": commit.commit.author.email if commit.commit and commit.commit.author else None, "committer_login": commit.committer.login if commit.committer else None, "committer_name": commit.commit.committer.name if commit.commit and commit.commit.committer else None, "date": commit.commit.author.date.isoformat() if commit.commit and commit.commit.author else None, "html_url": commit.html_url, "stats": { "additions": commit.stats.additions if hasattr(commit, 'stats') else None, "deletions": commit.stats.deletions if hasattr(commit, 'stats') else None, "total": commit.stats.total if hasattr(commit, 'stats') else None, }, "files_changed": [ {"filename": f.filename, "additions": f.additions, "deletions": f.deletions, "status": f.status} for f in commit.files ] if hasattr(commit, 'files') else [], } for commit in commits ] def get_readme(self, repo) -> str: """Get repository README content.""" logger.info(f"Fetching README for {repo.full_name}") readme = self.client._execute_request(repo.get_readme) if readme is None: return "" try: return readme.decoded_content.decode('utf-8') except UnicodeDecodeError: logger.warning(f"Could not decode README content for {repo.full_name}") return "" def get_pull_requests(self, repo, state: str = "all") -> List[Dict[str, Any]]: """Get repository pull requests.""" logger.info(f"Fetching pull requests for {repo.full_name} with state={state}") pulls = self.client._paginated_request( repo.get_pulls, state=state, max_results=self.config.max_pull_requests ) if pulls is None: return [] return [ { "id": pull.id, "number": pull.number, "title": pull.title, "body": pull.body, "state": pull.state, "user_login": pull.user.login if pull.user else None, "created_at": pull.created_at.isoformat() if pull.created_at else None, "updated_at": pull.updated_at.isoformat() if pull.updated_at else None, "closed_at": pull.closed_at.isoformat() if pull.closed_at else None, "merged_at": pull.merged_at.isoformat() if pull.merged_at else None, "draft": pull.draft if hasattr(pull, 'draft') else None, "mergeable": pull.mergeable if hasattr(pull, 'mergeable') else None, "mergeable_state": pull.mergeable_state if hasattr(pull, 'mergeable_state') else None, "merged": pull.merged if hasattr(pull, 'merged') else None, "merge_commit_sha": pull.merge_commit_sha if hasattr(pull, 'merge_commit_sha') else None, "comments": pull.comments if hasattr(pull, 'comments') else 0, "review_comments": pull.review_comments if hasattr(pull, 'review_comments') else 0, "commits": pull.commits if hasattr(pull, 'commits') else 0, "additions": pull.additions if hasattr(pull, 'additions') else 0, "deletions": pull.deletions if hasattr(pull, 'deletions') else 0, "changed_files": pull.changed_files if hasattr(pull, 'changed_files') else 0, "head_ref": pull.head.ref if hasattr(pull, 'head') and pull.head else None, "base_ref": pull.base.ref if hasattr(pull, 'base') and pull.base else None, "labels": [label.name for label in pull.labels] if hasattr(pull, 'labels') else [], "assignees": [user.login for user in pull.assignees] if hasattr(pull, 'assignees') else [], "requested_reviewers": [user.login for user in pull.requested_reviewers] if hasattr(pull, 'requested_reviewers') else [], } for pull in pulls ] def get_collaborators(self, repo, affiliation: str = "all") -> List[Dict[str, Any]]: """Get repository collaborators.""" logger.info(f"Fetching collaborators for {repo.full_name} with affiliation={affiliation}") collaborators = self.client._paginated_request( repo.get_collaborators, affiliation=affiliation, max_results=self.config.max_collaborators ) if collaborators is None: return [] return [ { "login": c.login, "id": c.id, "type": c.type, "url": c.url, "site_admin": c.site_admin if hasattr(c, 'site_admin') else None, "role_name": self._get_permission_level(repo, c.login), "avatar_url": c.avatar_url if hasattr(c, 'avatar_url') else None, } for c in collaborators ] def _get_permission_level(self, repo, username: str) -> str: """Get permission level for a collaborator.""" try: return repo.get_collaborator_permission(username) except GithubException: return "unknown" def get_file_distribution(self, repo) -> Dict[str, int]: """Analyze file types distribution in the repository.""" logger.info(f"Analyzing file distribution for {repo.full_name}") # Get all files in the repo (only feasible for smaller repos) try: contents = self.client._execute_request(repo.get_contents, "") if not contents: return {} file_types = defaultdict(int) directories = [] # Process initial contents for item in contents: if item.type == "dir": directories.append(item.path) elif item.type == "file": ext = os.path.splitext(item.name)[1].lower() file_types[ext if ext else "no_extension"] += 1 # Process directories (up to a reasonable depth to avoid API rate limits) max_depth = 3 for depth in range(max_depth): if not directories: break next_level = [] for directory in directories[:100]: # Limit to avoid excessive API calls dir_contents = self.client._execute_request(repo.get_contents, directory) if not dir_contents: continue for item in dir_contents: if item.type == "dir": next_level.append(item.path) elif item.type == "file": ext = os.path.splitext(item.name)[1].lower() file_types[ext if ext else "no_extension"] += 1 directories = next_level return dict(file_types) except GithubException: logger.warning(f"Could not get file distribution for {repo.full_name}") return {} def search_code(self, repo, query_terms: List[str]) -> List[Dict[str, Any]]: """Search for specific terms in the repository code.""" logger.info(f"Searching code in {repo.full_name} for terms: {query_terms}") results = [] for term in query_terms: query = f"repo:{repo.full_name} {term}" search_results = self.client._paginated_request( self.client.github.search_code, query, max_results=self.config.max_search_results ) if search_results: results.extend([ { "term": term, "name": result.name, "path": result.path, "sha": result.sha, "url": result.html_url, "repository": result.repository.full_name, } for result in search_results if result.repository.full_name == repo.full_name ]) return results def get_branches(self, repo) -> List[Dict[str, Any]]: """Get repository branches.""" logger.info(f"Fetching branches for {repo.full_name}") branches = self.client._paginated_request(repo.get_branches) if branches is None: return [] return [ { "name": branch.name, "protected": branch.protected, "commit_sha": branch.commit.sha if branch.commit else None, } for branch in branches ] def get_releases(self, repo) -> List[Dict[str, Any]]: """Get repository releases.""" logger.info(f"Fetching releases for {repo.full_name}") releases = self.client._paginated_request(repo.get_releases) if releases is None: return [] return [ { "id": release.id, "tag_name": release.tag_name, "name": release.title, "body": release.body, "draft": release.draft, "prerelease": release.prerelease, "created_at": release.created_at.isoformat() if release.created_at else None, "published_at": release.published_at.isoformat() if release.published_at else None, "author_login": release.author.login if release.author else None, "html_url": release.html_url, "assets": [ { "name": asset.name, "label": asset.label, "content_type": asset.content_type, "size": asset.size, "download_count": asset.download_count, "browser_download_url": asset.browser_download_url, } for asset in release.get_assets() ], } for release in releases ] def get_workflows(self, repo) -> List[Dict[str, Any]]: """Get repository GitHub Actions workflows.""" logger.info(f"Fetching workflows for {repo.full_name}") try: workflows = self.client._paginated_request(repo.get_workflows) if workflows is None: return [] return [ { "id": workflow.id, "name": workflow.name, "path": workflow.path, "state": workflow.state, "created_at": workflow.created_at.isoformat() if workflow.created_at else None, "updated_at": workflow.updated_at.isoformat() if workflow.updated_at else None, } for workflow in workflows ] except (GithubException, AttributeError): # Older PyGithub versions or repositories without workflows return [] def analyze_commit_activity(self, repo) -> Dict[str, Any]: """Analyze commit activity patterns.""" logger.info(f"Analyzing commit activity for {repo.full_name}") # Get stats commit activity stats = self.client._execute_request(repo.get_stats_commit_activity) if not stats: return {} weekly_commits = [] for week in stats: if hasattr(week, 'week') and hasattr(week, 'total'): date = datetime.datetime.fromtimestamp(week.week).strftime('%Y-%m-%d') weekly_commits.append({ "week": date, "total": week.total, "days": week.days if hasattr(week, 'days') else [], }) # Get code frequency code_freq = self.client._execute_request(repo.get_stats_code_frequency) if not code_freq: code_frequency = [] else: code_frequency = [] for item in code_freq: date = datetime.datetime.fromtimestamp(item[0]).strftime('%Y-%m-%d') code_frequency.append({ "week": date, "additions": item[1], "deletions": -item[2], # Convert to positive for better readability }) return { "weekly_commits": weekly_commits, "code_frequency": code_frequency, } def analyze_contributor_activity(self, repo) -> Dict[str, Any]: """Analyze contributor activity patterns.""" logger.info(f"Analyzing contributor activity for {repo.full_name}") # Get contributor stats stats = self.client._execute_request(repo.get_stats_contributors) if not stats: return {} contributor_stats = [] for stat in stats: if not hasattr(stat, 'author') or not stat.author: continue weeks_data = [] for week in stat.weeks: if hasattr(week, 'w'): date = datetime.datetime.fromtimestamp(week.w).strftime('%Y-%m-%d') weeks_data.append({ "week": date, "additions": week.a, "deletions": week.d, "commits": week.c, }) contributor_stats.append({ "author": stat.author.login, "total_commits": stat.total, "weeks": weeks_data, }) return { "contributor_stats": contributor_stats, } def analyze_issue_distribution(self, issues: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze distribution of issues by various metrics.""" if not issues: return {} # Convert to DataFrame for easier analysis df = pd.DataFrame(issues) # Issues by state state_counts = df['state'].value_counts().to_dict() if 'state' in df else {} # Issues by user user_counts = df['user_login'].value_counts().head(10).to_dict() if 'user_login' in df else {} # Pull requests vs regular issues is_pr_counts = df['pull_request'].value_counts().to_dict() if 'pull_request' in df else {} # Issues by labels (flattening the labels list) labels = [] if 'labels' in df: for label_list in df['labels']: if label_list: labels.extend(label_list) label_counts = Counter(labels) top_labels = dict(label_counts.most_common(10)) # Time analysis if 'created_at' in df: df['created_date'] = pd.to_datetime(df['created_at']) df['month_year'] = df['created_date'].dt.strftime('%Y-%m') issues_by_month = df.groupby('month_year').size().to_dict() else: issues_by_month = {} # Calculate resolution time for closed issues resolution_times = [] if 'created_at' in df and 'closed_at' in df: for _, issue in df.iterrows(): if pd.notna(issue.get('closed_at')) and pd.notna(issue.get('created_at')): created = pd.to_datetime(issue['created_at']) closed = pd.to_datetime(issue['closed_at']) resolution_time = (closed - created).total_seconds() / 3600 # hours resolution_times.append(resolution_time) resolution_stats = {} if resolution_times: resolution_stats = { "mean_hours": sum(resolution_times) / len(resolution_times), "median_hours": sorted(resolution_times)[len(resolution_times) // 2], "min_hours": min(resolution_times), "max_hours": max(resolution_times), } return { "by_state": state_counts, "by_user": user_counts, "pr_vs_issue": is_pr_counts, "by_label": top_labels, "by_month": issues_by_month, "resolution_time": resolution_stats, } def generate_insights(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: """Generate higher-level insights from the collected repository data.""" insights = {} # Repository activity and health if "repo_details" in repo_data: repo_details = repo_data["repo_details"] insights["repository_age_days"] = self._calculate_age_days(repo_details.get("created_at")) insights["freshness_days"] = self._calculate_freshness_days(repo_details.get("pushed_at")) # Popularity metrics insights["popularity"] = { "stars": repo_details.get("stargazers_count", 0), "forks": repo_details.get("forks_count", 0), "watchers": repo_details.get("watchers_count", 0), "star_fork_ratio": self._calculate_ratio( repo_details.get("stargazers_count", 0), repo_details.get("forks_count", 0) ), } # Language distribution if "languages" in repo_data: languages = repo_data["languages"] total_bytes = sum(languages.values()) if languages else 0 if total_bytes > 0: language_percentages = { lang: (bytes_count / total_bytes) * 100 for lang, bytes_count in languages.items() } insights["language_distribution"] = { "primary_language": max(languages.items(), key=lambda x: x[1])[0] if languages else None, "language_count": len(languages), "percentages": language_percentages, } # Contributor insights if "contributors" in repo_data: contributors = repo_data["contributors"] if contributors: total_contributions = sum(c.get("contributions", 0) for c in contributors) insights["contributor_insights"] = { "contributor_count": len(contributors), "total_contributions": total_contributions, "avg_contributions_per_contributor": total_contributions / len(contributors) if len(contributors) > 0 else 0, "contribution_distribution": self._analyze_contribution_distribution(contributors), } # Issue and PR dynamics if "issues" in repo_data: issues = repo_data["issues"] insights["issue_insights"] = self.analyze_issue_distribution(issues) if "pull_requests" in repo_data: prs = repo_data["pull_requests"] insights["pr_insights"] = self.analyze_issue_distribution(prs) # Reuse the same analysis # Additional PR-specific metrics if prs: insights["pr_code_change_stats"] = self._analyze_pr_code_changes(prs) # Commit patterns if "commits" in repo_data: commits = repo_data["commits"] insights["commit_insights"] = self._analyze_commit_patterns(commits) # Check for CI/CD presence insights["ci_cd_presence"] = self._detect_ci_cd(repo_data) # Documentation quality if "readme" in repo_data: readme = repo_data["readme"] insights["documentation_quality"] = self._assess_documentation_quality(readme) # Project Activity Level insights["activity_level"] = self._calculate_activity_level(repo_data) # Code complexity analysis insights["code_complexity"] = self._analyze_code_complexity(repo_data) # Community health analysis insights["community_health"] = self._analyze_community_health(repo_data) return insights def _calculate_age_days(self, created_at_iso: str) -> float: """Calculate repository age in days.""" if not created_at_iso: return 0 try: created_at = datetime.datetime.fromisoformat(created_at_iso.replace('Z', '+00:00')) now = datetime.datetime.now(datetime.timezone.utc) return (now - created_at).total_seconds() / (24 * 3600) except ValueError: return 0 def _calculate_freshness_days(self, pushed_at_iso: str) -> float: """Calculate days since last push.""" if not pushed_at_iso: return float('inf') try: pushed_at = datetime.datetime.fromisoformat(pushed_at_iso.replace('Z', '+00:00')) now = datetime.datetime.now(datetime.timezone.utc) return (now - pushed_at).total_seconds() / (24 * 3600) except ValueError: return float('inf') def _calculate_ratio(self, numerator: int, denominator: int) -> float: """Calculate ratio with handling for zero denominator.""" return numerator / denominator if denominator and denominator > 0 else float('inf') def _analyze_contribution_distribution(self, contributors: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze the distribution of contributions among contributors.""" if not contributors: return {} # Sort contributors by number of contributions sorted_contributors = sorted(contributors, key=lambda c: c.get("contributions", 0), reverse=True) # Calculate percentiles total_contributions = sum(c.get("contributions", 0) for c in contributors) cumulative_contributions = 0 percentile_20 = 0 percentile_50 = 0 percentile_80 = 0 for i, contributor in enumerate(sorted_contributors): contributions = contributor.get("contributions", 0) cumulative_contributions += contributions percentage = (cumulative_contributions / total_contributions) * 100 if percentage >= 20 and percentile_20 == 0: percentile_20 = i + 1 if percentage >= 50 and percentile_50 == 0: percentile_50 = i + 1 if percentage >= 80 and percentile_80 == 0: percentile_80 = i + 1 # Calculate Gini coefficient to measure inequality gini = self._calculate_gini([c.get("contributions", 0) for c in contributors]) return { "contributors_for_20_percent": percentile_20, "contributors_for_50_percent": percentile_50, "contributors_for_80_percent": percentile_80, "gini_coefficient": gini, "top_contributor_percentage": (sorted_contributors[0].get("contributions", 0) / total_contributions) * 100 if sorted_contributors else 0, } def _calculate_gini(self, values: List[int]) -> float: """Calculate the Gini coefficient of a distribution.""" if not values or sum(values) == 0: return 0 values = sorted(values) n = len(values) cumsum = 0 for i, value in enumerate(values): cumsum += value values[i] = cumsum return (2 * sum(values) / (n * sum(values[-1]))) - (n + 1) / n def _analyze_pr_code_changes(self, prs: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze code changes across pull requests.""" if not prs: return {} # Extract metrics additions = [pr.get("additions", 0) for pr in prs if pr.get("additions") is not None] deletions = [pr.get("deletions", 0) for pr in prs if pr.get("deletions") is not None] changed_files = [pr.get("changed_files", 0) for pr in prs if pr.get("changed_files") is not None] # Calculate stats stats = {} if additions: stats["additions"] = { "mean": sum(additions) / len(additions), "median": sorted(additions)[len(additions) // 2], "max": max(additions), "total": sum(additions), } if deletions: stats["deletions"] = { "mean": sum(deletions) / len(deletions), "median": sorted(deletions)[len(deletions) // 2], "max": max(deletions), "total": sum(deletions), } if changed_files: stats["changed_files"] = { "mean": sum(changed_files) / len(changed_files), "median": sorted(changed_files)[len(changed_files) // 2], "max": max(changed_files), "total": sum(changed_files), } return stats def _analyze_commit_patterns(self, commits: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze patterns in commit data.""" if not commits: return {} # Count by author commit_counts = Counter( commit.get("author_login", "Unknown") for commit in commits if commit.get("author_login") ) # Analyze message patterns message_lengths = [ len(commit.get("commit_message", "")) for commit in commits if commit.get("commit_message") ] # Extract dates for time-based analysis dates = [] for commit in commits: date_str = commit.get("date") if date_str: try: date = datetime.datetime.fromisoformat(date_str.replace('Z', '+00:00')) dates.append(date) except ValueError: pass # Analyze times of day hours = [date.hour for date in dates] hour_counts = Counter(hours) # Analyze days of week weekdays = [date.weekday() for date in dates] weekday_counts = Counter(weekdays) weekday_names = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] weekday_data = {weekday_names[day]: count for day, count in weekday_counts.items()} # Analyze frequency of commits over time commit_frequency = {} if dates: dates_sorted = sorted(dates) first_date = dates_sorted[0] last_date = dates_sorted[-1] # Calculate commit frequency by month current_date = first_date.replace(day=1) while current_date <= last_date: next_month = current_date.replace(day=28) + datetime.timedelta(days=4) next_month = next_month.replace(day=1) month_key = current_date.strftime('%Y-%m') commit_frequency[month_key] = sum( 1 for date in dates if date.year == current_date.year and date.month == current_date.month ) current_date = next_month return { "top_contributors": dict(commit_counts.most_common(5)), "message_length": { "mean": sum(message_lengths) / len(message_lengths) if message_lengths else 0, "max": max(message_lengths) if message_lengths else 0, "min": min(message_lengths) if message_lengths else 0, }, "commit_time_patterns": { "by_hour": dict(sorted(hour_counts.items())), "by_weekday": weekday_data, }, "commit_frequency": commit_frequency, } def _detect_ci_cd(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: """Detect CI/CD presence and configuration in the repository.""" ci_cd_indicators = { "github_actions": False, "travis": False, "circle_ci": False, "jenkins": False, "gitlab_ci": False, "azure_pipelines": False, } # Check workflows if "workflows" in repo_data and repo_data["workflows"]: ci_cd_indicators["github_actions"] = True # Check for CI configuration files if "file_distribution" in repo_data: files = repo_data.get("file_distribution", {}) if ".travis.yml" in files: ci_cd_indicators["travis"] = True if ".circleci/config.yml" in files or "circle.yml" in files: ci_cd_indicators["circle_ci"] = True if "Jenkinsfile" in files: ci_cd_indicators["jenkins"] = True if ".gitlab-ci.yml" in files: ci_cd_indicators["gitlab_ci"] = True if "azure-pipelines.yml" in files: ci_cd_indicators["azure_pipelines"] = True return { "has_ci_cd": any(ci_cd_indicators.values()), "ci_cd_systems": ci_cd_indicators, } def _assess_documentation_quality(self, readme: str) -> Dict[str, Any]: """Assess the quality of documentation based on the README.""" if not readme: return { "has_readme": False, "readme_length": 0, "score": 0, "sections": {}, } # Analyze the README content lines = readme.strip().split('\n') word_count = len(readme.split()) sections = {} # Check for common README sections section_keywords = { "introduction": ["introduction", "overview", "about"], "installation": ["installation", "install", "setup", "getting started"], "usage": ["usage", "using", "example", "examples"], "api": ["api", "reference", "documentation"], "contributing": ["contributing", "contribute", "development"], "license": ["license", "licensing"], "code_of_conduct": ["code of conduct"], } for section, keywords in section_keywords.items(): sections[section] = any( any(keyword.lower() in line.lower() for keyword in keywords) for line in lines ) # Count images/diagrams (markdown format) image_count = readme.count("![") # Count code examples code_block_count = readme.count("```") # Calculate a simple score section_score = sum(1 for present in sections.values() if present) / len(sections) has_images = image_count > 0 has_code = code_block_count > 0 length_score = min(1.0, word_count / 1000) # Normalize to 0-1, with 1000+ words being "complete" score = (section_score * 0.5) + (has_images * 0.2) + (has_code * 0.2) + (length_score * 0.1) return { "has_readme": True, "readme_length": word_count, "score": score, "sections": sections, "has_images": has_images, "image_count": image_count, "has_code_examples": has_code, "code_block_count": code_block_count // 2, # Each block has opening and closing ``` } def _calculate_activity_level(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: """Calculate repository activity level based on commits, PRs, and issues.""" activity_score = 0 activity_details = {} # Get repository age in months if "repo_details" in repo_data: age_days = self._calculate_age_days(repo_data["repo_details"].get("created_at")) age_months = age_days / 30.5 # Approximate if age_months < 1: age_months = 1 # Avoid division by zero activity_details["age_months"] = age_months else: age_months = 1 # Check recent commits (last 3 months) recent_commits = 0 if "commits" in repo_data: commits = repo_data["commits"] three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3) for commit in commits: if commit.get("date"): commit_date = datetime.datetime.fromisoformat(commit["date"].replace('Z', '+00:00')) if commit_date >= three_months_ago: recent_commits += 1 activity_details["recent_commits"] = recent_commits activity_score += min(10, recent_commits / 10) # Up to 10 points for recent commits # Check recent PRs and issues (last 3 months) recent_prs = 0 if "pull_requests" in repo_data: prs = repo_data["pull_requests"] three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3) for pr in prs: if pr.get("created_at"): pr_date = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00')) if pr_date >= three_months_ago: recent_prs += 1 activity_details["recent_prs"] = recent_prs activity_score += min(5, recent_prs / 5) # Up to 5 points for recent PRs recent_issues = 0 if "issues" in repo_data: issues = [issue for issue in repo_data["issues"] if not issue.get("pull_request")] three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3) for issue in issues: if issue.get("created_at"): issue_date = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00')) if issue_date >= three_months_ago: recent_issues += 1 activity_details["recent_issues"] = recent_issues activity_score += min(5, recent_issues / 5) # Up to 5 points for recent issues # Check release frequency if "releases" in repo_data: releases = repo_data["releases"] release_count = len(releases) # Calculate releases per month releases_per_month = release_count / max(1, age_months) activity_details["releases_per_month"] = releases_per_month activity_score += min(5, releases_per_month * 2.5) # Up to 5 points for regular releases # Determine activity level activity_level = "None" if activity_score >= 20: activity_level = "Very High" elif activity_score >= 15: activity_level = "High" elif activity_score >= 10: activity_level = "Medium" elif activity_score >= 5: activity_level = "Low" elif activity_score > 0: activity_level = "Very Low" return { "score": activity_score, "level": activity_level, "details": activity_details, } def _analyze_code_complexity(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: """Estimate code complexity based on available metrics.""" complexity = {} # Analyze file distribution if "file_distribution" in repo_data: file_types = repo_data["file_distribution"] total_files = sum(file_types.values()) code_files = sum( count for ext, count in file_types.items() if ext in self.config.all_code_extensions() ) complexity["file_counts"] = { "total_files": total_files, "code_files": code_files, } # Analyze PR complexity if "pull_requests" in repo_data: prs = repo_data["pull_requests"] # Get average changes per PR additions = [pr.get("additions", 0) for pr in prs if pr.get("additions") is not None] deletions = [pr.get("deletions", 0) for pr in prs if pr.get("deletions") is not None] changed_files = [pr.get("changed_files", 0) for pr in prs if pr.get("changed_files") is not None] if additions and deletions and changed_files: avg_additions = sum(additions) / len(additions) avg_deletions = sum(deletions) / len(deletions) avg_changed_files = sum(changed_files) / len(changed_files) complexity["pr_complexity"] = { "avg_additions": avg_additions, "avg_deletions": avg_deletions, "avg_changed_files": avg_changed_files, } # Estimate complexity score pr_complexity_score = min(10, (avg_additions + avg_deletions) / 100) complexity["pr_complexity_score"] = pr_complexity_score # Check dependency complexity dependency_complexity_score = 0 if "commit_insights" in repo_data.get("insights", {}): commit_messages = [ commit.get("commit_message", "").lower() for commit in repo_data.get("commits", []) ] # Check for dependency-related keywords dependency_keywords = ["dependency", "dependencies", "upgrade", "update", "version", "package"] dependency_commits = sum( 1 for message in commit_messages if any(keyword in message for keyword in dependency_keywords) ) dependency_ratio = dependency_commits / len(commit_messages) if commit_messages else 0 dependency_complexity_score = min(5, dependency_ratio * 20) # Up to 5 points complexity["dependency_complexity"] = { "dependency_commits": dependency_commits, "dependency_ratio": dependency_ratio, "score": dependency_complexity_score, } # Overall complexity score overall_score = 0 contributors = len(repo_data.get("contributors", [])) if contributors > 0: contributor_score = min(5, contributors / 10) # Up to 5 points overall_score += contributor_score if "pr_complexity_score" in complexity: overall_score += complexity["pr_complexity_score"] overall_score += dependency_complexity_score # Code size complexity if "languages" in repo_data: languages = repo_data["languages"] total_bytes = sum(languages.values()) if languages else 0 # Size points based on code size in MB size_mb = total_bytes / (1024 * 1024) size_score = min(10, size_mb / 5) # Up to 10 points for large codebases overall_score += size_score complexity["code_size"] = { "total_bytes": total_bytes, "size_mb": size_mb, "score": size_score, } # Determine complexity level complexity_level = "Low" if overall_score >= 25: complexity_level = "Very High" elif overall_score >= 20: complexity_level = "High" elif overall_score >= 15: complexity_level = "Medium-High" elif overall_score >= 10: complexity_level = "Medium" elif overall_score >= 5: complexity_level = "Low-Medium" complexity["overall"] = { "score": overall_score, "level": complexity_level, } return complexity def _analyze_community_health(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: """Analyze the community health of the repository.""" health = {} # Calculate issue responsiveness if "issues" in repo_data: issues = repo_data["issues"] closed_issues = [issue for issue in issues if issue.get("state") == "closed"] if issues: closure_rate = len(closed_issues) / len(issues) health["issue_closure_rate"] = closure_rate # Calculate average time to close resolution_times = [] for issue in closed_issues: if issue.get("created_at") and issue.get("closed_at"): created = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00')) closed = datetime.datetime.fromisoformat(issue["closed_at"].replace('Z', '+00:00')) resolution_time = (closed - created).total_seconds() / 3600 # hours resolution_times.append(resolution_time) if resolution_times: avg_resolution_time = sum(resolution_times) / len(resolution_times) health["avg_issue_resolution_time_hours"] = avg_resolution_time # Calculate PR review responsiveness if "pull_requests" in repo_data: prs = repo_data["pull_requests"] merged_prs = [pr for pr in prs if pr.get("merged")] if prs: merge_rate = len(merged_prs) / len(prs) health["pr_merge_rate"] = merge_rate # Calculate average time to merge merge_times = [] for pr in merged_prs: if pr.get("created_at") and pr.get("merged_at"): created = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00')) merged = datetime.datetime.fromisoformat(pr["merged_at"].replace('Z', '+00:00')) merge_time = (merged - created).total_seconds() / 3600 # hours merge_times.append(merge_time) if merge_times: avg_merge_time = sum(merge_times) / len(merge_times) health["avg_pr_merge_time_hours"] = avg_merge_time # Check for community guidelines community_files = [ "CONTRIBUTING.md", "CODE_OF_CONDUCT.md", "SECURITY.md", "SUPPORT.md", "GOVERNANCE.md", ] community_file_presence = {} if "file_distribution" in repo_data: file_paths = [] for item in repo_data.get("file_distribution", {}): file_paths.append(item) for community_file in community_files: present = any(community_file.lower() in path.lower() for path in file_paths) community_file_presence[community_file] = present health["community_guidelines"] = community_file_presence # Calculate contributor diversity if "contributors" in repo_data: contributors = repo_data["contributors"] if contributors: # Calculate Gini coefficient for contribution distribution gini = self._calculate_gini([c.get("contributions", 0) for c in contributors]) health["contributor_gini"] = gini # Interpret Gini coefficient if gini < 0.4: diversity_level = "High" elif gini < 0.6: diversity_level = "Medium" else: diversity_level = "Low" health["contributor_diversity"] = diversity_level # Calculate overall health score health_score = 0 # Points for issue responsiveness if "issue_closure_rate" in health: health_score += health["issue_closure_rate"] * 10 # Up to 10 points # Points for PR responsiveness if "pr_merge_rate" in health: health_score += health["pr_merge_rate"] * 10 # Up to 10 points # Points for community guidelines guideline_count = sum(1 for present in community_file_presence.values() if present) health_score += guideline_count * 2 # Up to 10 points # Points for contributor diversity if "contributor_gini" in health: diversity_score = 10 * (1 - health["contributor_gini"]) # Up to 10 points health_score += diversity_score # Determine health level health_level = "Poor" if health_score >= 30: health_level = "Excellent" elif health_score >= 25: health_level = "Very Good" elif health_score >= 20: health_level = "Good" elif health_score >= 15: health_level = "Fair" elif health_score >= 10: health_level = "Needs Improvement" health["overall"] = { "score": health_score, "level": health_level, } return health def generate_visualizations(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]: """ Generate visualizations of repository data. Returns: Dict of visualization figures """ if not self.config.generate_visualizations: return {} figures = {} # Create visualizations lang_fig = self._visualize_language_distribution(repo_data) if lang_fig: figures["language_distribution"] = lang_fig commit_figs = self._visualize_commit_activity(repo_data, insights) figures.update(commit_figs) contrib_figs = self._visualize_contributor_activity(repo_data, insights) figures.update(contrib_figs) issue_figs = self._visualize_issues_and_prs(repo_data, insights) figures.update(issue_figs) # Add interactive visualizations with Plotly plotly_figs = self._generate_plotly_visualizations(repo_data, insights) figures.update(plotly_figs) # Generate collaboration network collab_fig = self._visualize_collaboration_network(repo_data, insights) if collab_fig: figures["collaboration_network"] = collab_fig return figures def _visualize_language_distribution(self, repo_data: Dict[str, Any]) -> Optional[plt.Figure]: """Create a visualization of language distribution.""" languages = repo_data.get("languages", {}) if not languages: return None # Create a pie chart of language distribution fig, ax = plt.subplots(figsize=(10, 6)) total = sum(languages.values()) # Filter out small languages for better visualization threshold = total * 0.01 # 1% threshold other_sum = sum(size for lang, size in languages.items() if size < threshold) filtered_languages = {lang: size for lang, size in languages.items() if size >= threshold} if other_sum > 0: filtered_languages["Other"] = other_sum sizes = list(filtered_languages.values()) labels = list(filtered_languages.keys()) wedges, texts, autotexts = ax.pie( sizes, labels=labels, autopct='%1.1f%%', startangle=90, shadow=False, textprops={'fontsize': 9}, # Smaller font for better fit wedgeprops={'linewidth': 1, 'edgecolor': 'white'} # Add white edge ) # Make the percentage labels more readable for autotext in autotexts: autotext.set_color('white') autotext.set_fontweight('bold') ax.axis('equal') plt.title(f"Language Distribution", fontsize=16) plt.tight_layout() return fig def _visualize_commit_activity(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]: """Create visualizations of commit activity.""" figures = {} commit_activity = repo_data.get("commit_activity", {}) weekly_commits = commit_activity.get("weekly_commits", []) if weekly_commits: # Extract weeks and commit counts weeks = [item["week"] for item in weekly_commits] commits = [item["total"] for item in weekly_commits] # Create a time series plot fig, ax = plt.subplots(figsize=(12, 6)) ax.plot(weeks, commits, marker='o', linestyle='-', color='blue', alpha=0.7) # Add trend line z = np.polyfit(range(len(weeks)), commits, 1) p = np.poly1d(z) ax.plot(weeks, p(range(len(weeks))), "r--", alpha=0.7) ax.set_title("Weekly Commit Activity", fontsize=16) ax.set_xlabel("Week") ax.set_ylabel("Number of Commits") plt.xticks(rotation=45) ax.grid(True, linestyle='--', alpha=0.7) # Show only some x-axis labels to avoid crowding if len(weeks) > 20: every_nth = len(weeks) // 10 for n, label in enumerate(ax.xaxis.get_ticklabels()): if n % every_nth != 0: label.set_visible(False) plt.tight_layout() figures["weekly_commits"] = fig # Visualize code frequency if available code_frequency = commit_activity.get("code_frequency", []) if code_frequency: weeks = [item["week"] for item in code_frequency] additions = [item["additions"] for item in code_frequency] deletions = [item["deletions"] for item in code_frequency] fig, ax = plt.subplots(figsize=(12, 6)) ax.plot(weeks, additions, marker='o', linestyle='-', color='green', label='Additions') ax.plot(weeks, deletions, marker='o', linestyle='-', color='red', label='Deletions') ax.set_title("Code Frequency", fontsize=16) ax.set_xlabel("Week") ax.set_ylabel("Lines Changed") plt.xticks(rotation=45) ax.legend() ax.grid(True, linestyle='--', alpha=0.7) # Show only some x-axis labels to avoid crowding if len(weeks) > 20: every_nth = len(weeks) // 10 for n, label in enumerate(ax.xaxis.get_ticklabels()): if n % every_nth != 0: label.set_visible(False) plt.tight_layout() figures["code_frequency"] = fig # Commits by weekday if "commit_insights" in insights: commit_insights = insights["commit_insights"] by_weekday = commit_insights.get("commit_time_patterns", {}).get("by_weekday", {}) if by_weekday: fig, ax = plt.subplots(figsize=(10, 6)) weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] counts = [by_weekday.get(day, 0) for day in weekdays] # Create gradient colors based on commit counts colors = plt.cm.Blues(np.array(counts) / max(counts)) ax.bar(weekdays, counts, color=colors) ax.set_title("Commits by Day of Week", fontsize=16) ax.set_xlabel("Day of Week") ax.set_ylabel("Number of Commits") ax.grid(True, axis='y', linestyle='--', alpha=0.7) plt.tight_layout() figures["commits_by_weekday"] = fig # Commits by hour by_hour = commit_insights.get("commit_time_patterns", {}).get("by_hour", {}) if by_hour: fig, ax = plt.subplots(figsize=(12, 6)) hours = sorted(by_hour.keys()) counts = [by_hour[hour] for hour in hours] # Create gradient colors based on commit counts colors = plt.cm.Greens(np.array(counts) / max(counts)) ax.bar(hours, counts, color=colors) ax.set_title("Commits by Hour of Day (UTC)", fontsize=16) ax.set_xlabel("Hour") ax.set_ylabel("Number of Commits") ax.set_xticks(range(0, 24, 2)) ax.grid(True, axis='y', linestyle='--', alpha=0.7) plt.tight_layout() figures["commits_by_hour"] = fig return figures def _visualize_contributor_activity(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]: """Create visualizations of contributor activity.""" figures = {} contributors = repo_data.get("contributors", []) if contributors: # Create a bar chart of top contributors contributors_sorted = sorted(contributors, key=lambda x: x.get("contributions", 0), reverse=True) top_n = min(10, len(contributors_sorted)) fig, ax = plt.subplots(figsize=(12, 6)) names = [c.get("login", "Unknown") for c in contributors_sorted[:top_n]] contributions = [c.get("contributions", 0) for c in contributors_sorted[:top_n]] # Create gradient colors based on contribution counts colors = plt.cm.viridis(np.array(contributions) / max(contributions)) bars = ax.bar(names, contributions, color=colors) ax.set_title("Top Contributors by Commit Count", fontsize=16) ax.set_xlabel("Contributor") ax.set_ylabel("Number of Commits") plt.xticks(rotation=45, ha='right') ax.grid(True, axis='y', linestyle='--', alpha=0.7) # Add value labels on top of bars for bar in bars: height = bar.get_height() ax.annotate(f'{height}', xy=(bar.get_x() + bar.get_width() / 2, height), xytext=(0, 3), # 3 points vertical offset textcoords="offset points", ha='center', va='bottom') plt.tight_layout() figures["top_contributors"] = fig # Visualize contribution distribution if insights available if "contributor_insights" in insights: contributor_insights = insights["contributor_insights"] distribution = contributor_insights.get("contribution_distribution", {}) if distribution: # Create a pie chart showing contributor concentration fig, ax = plt.subplots(figsize=(10, 6)) percentiles = [ distribution.get("contributors_for_20_percent", 0), distribution.get("contributors_for_50_percent", 0) - distribution.get("contributors_for_20_percent", 0), distribution.get("contributors_for_80_percent", 0) - distribution.get("contributors_for_50_percent", 0), len(contributors) - distribution.get("contributors_for_80_percent", 0) ] labels = [ f"Top {percentiles[0]} contributors (0-20%)", f"Next {percentiles[1]} contributors (20-50%)", f"Next {percentiles[2]} contributors (50-80%)", f"Remaining {percentiles[3]} contributors (80-100%)" ] wedges, texts, autotexts = ax.pie( [20, 30, 30, 20], # Fixed percentages for visualization labels=labels, autopct='%1.1f%%', startangle=90, shadow=False, explode=(0.1, 0, 0, 0), # Emphasize the top contributors wedgeprops={'linewidth': 1, 'edgecolor': 'white'} # Add white edge ) # Make the percentage labels more readable for autotext in autotexts: autotext.set_color('white') autotext.set_fontweight('bold') ax.axis('equal') ax.set_title("Contribution Distribution", fontsize=16) plt.tight_layout() figures["contribution_distribution"] = fig return figures def _visualize_issues_and_prs(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]: """Create visualizations of issues and pull requests.""" figures = {} # Visualize issue distribution if available if "issue_insights" in insights: issue_insights = insights["issue_insights"] # Issues by state by_state = issue_insights.get("by_state", {}) if by_state: fig, ax = plt.subplots(figsize=(8, 6)) states = list(by_state.keys()) counts = list(by_state.values()) colors = ['red' if state.lower() == 'open' else 'green' for state in states] ax.bar(states, counts, color=colors) ax.set_title("Issues by State", fontsize=16) ax.set_xlabel("State") ax.set_ylabel("Count") # Add count labels on top of bars for i, v in enumerate(counts): ax.text(i, v + 0.5, str(v), ha='center') ax.grid(True, axis='y', linestyle='--', alpha=0.7) plt.tight_layout() figures["issues_by_state"] = fig # Issues by month by_month = issue_insights.get("by_month", {}) if by_month: fig, ax = plt.subplots(figsize=(12, 6)) months = sorted(by_month.keys()) counts = [by_month[month] for month in months] ax.plot(months, counts, marker='o', linestyle='-', color='blue') # Add trend line z = np.polyfit(range(len(months)), counts, 1) p = np.poly1d(z) ax.plot(months, p(range(len(months))), "r--", alpha=0.7) ax.set_title("Issues Created by Month", fontsize=16) ax.set_xlabel("Month") ax.set_ylabel("Number of Issues") plt.xticks(rotation=45) ax.grid(True, linestyle='--', alpha=0.7) # Show only some x-axis labels to avoid crowding if len(months) > 12: every_nth = max(1, len(months) // 12) for n, label in enumerate(ax.xaxis.get_ticklabels()): if n % every_nth != 0: label.set_visible(False) plt.tight_layout() figures["issues_by_month"] = fig # Issues by label by_label = issue_insights.get("by_label", {}) if by_label and len(by_label) > 1: fig, ax = plt.subplots(figsize=(12, 6)) labels = list(by_label.keys()) counts = list(by_label.values()) # Sort by count sorted_indices = np.argsort(counts)[::-1] labels = [labels[i] for i in sorted_indices] counts = [counts[i] for i in sorted_indices] # Limit to top 10 if len(labels) > 10: labels = labels[:10] counts = counts[:10] # Create gradient colors colors = plt.cm.tab10(np.linspace(0, 1, len(labels))) bars = ax.barh(labels, counts, color=colors) ax.set_title("Top Issue Labels", fontsize=16) ax.set_xlabel("Count") ax.set_ylabel("Label") # Add count labels for bar in bars: width = bar.get_width() ax.annotate(f'{int(width)}', xy=(width, bar.get_y() + bar.get_height() / 2), xytext=(3, 0), # 3 points horizontal offset textcoords="offset points", ha='left', va='center') ax.grid(True, axis='x', linestyle='--', alpha=0.7) plt.tight_layout() figures["issues_by_label"] = fig # Visualize PR insights if available if "pr_insights" in insights and "pr_code_change_stats" in insights: pr_code_stats = insights["pr_code_change_stats"] # Additions and deletions by PR if "additions" in pr_code_stats and "deletions" in pr_code_stats: fig, ax = plt.subplots(figsize=(10, 6)) categories = ["Mean", "Median", "Max"] additions = [ pr_code_stats["additions"].get("mean", 0), pr_code_stats["additions"].get("median", 0), pr_code_stats["additions"].get("max", 0) / 10 # Scale down for visibility ] deletions = [ pr_code_stats["deletions"].get("mean", 0), pr_code_stats["deletions"].get("median", 0), pr_code_stats["deletions"].get("max", 0) / 10 # Scale down for visibility ] x = range(len(categories)) width = 0.35 addition_bars = ax.bar([i - width/2 for i in x], additions, width, label='Additions', color='green') deletion_bars = ax.bar([i + width/2 for i in x], deletions, width, label='Deletions', color='red') ax.set_xlabel('Metric') ax.set_ylabel('Lines of Code') ax.set_title('PR Code Change Statistics') plt.xticks(x, categories) ax.legend() # Add value labels for bars in [addition_bars, deletion_bars]: for bar in bars: height = bar.get_height() ax.annotate(f'{int(height)}', xy=(bar.get_x() + bar.get_width() / 2, height), xytext=(0, 3), # 3 points vertical offset textcoords="offset points", ha='center', va='bottom') if "max" in pr_code_stats["additions"]: plt.annotate(f"Max: {int(pr_code_stats['additions']['max'])}", (2 - width/2, additions[2] + 5), textcoords="offset points", xytext=(0,10), ha='center') if "max" in pr_code_stats["deletions"]: plt.annotate(f"Max: {int(pr_code_stats['deletions']['max'])}", (2 + width/2, deletions[2] + 5), textcoords="offset points", xytext=(0,10), ha='center') plt.tight_layout() figures["pr_code_changes"] = fig return figures def _generate_plotly_visualizations(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, Any]: """Generate interactive Plotly visualizations.""" plotly_figures = {} # Activity heatmap (commits by day and hour) if "commits" in repo_data: commits = repo_data["commits"] dates = [] for commit in commits: date_str = commit.get("date") if date_str: try: date = datetime.datetime.fromisoformat(date_str.replace('Z', '+00:00')) dates.append(date) except ValueError: pass if dates: # Group by day of week and hour day_hour_counts = defaultdict(int) for date in dates: day_hour_counts[(date.weekday(), date.hour)] += 1 # Create 2D array for heatmap days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] hours = list(range(24)) z = np.zeros((7, 24)) for (day, hour), count in day_hour_counts.items(): z[day][hour] = count # Create heatmap fig = go.Figure(data=go.Heatmap( z=z, x=hours, y=days, colorscale='Viridis', hoverongaps=False, hovertemplate='Day: %{y}
Hour: %{x}
Commits: %{z}' )) fig.update_layout( title='Commit Activity Heatmap', xaxis_title='Hour of Day (UTC)', yaxis_title='Day of Week', yaxis={'categoryorder': 'array', 'categoryarray': days}, width=900, height=500 ) plotly_figures["commit_heatmap"] = fig # Language breakdown treemap if "languages" in repo_data: languages = repo_data["languages"] if languages: # Create data for treemap labels = list(languages.keys()) values = list(languages.values()) fig = go.Figure(go.Treemap( labels=labels, values=values, parents=[""] * len(labels), marker_colorscale='RdBu', hovertemplate='Language: %{label}
Bytes: %{value}
Percentage: %{percentRoot:.2%}' )) fig.update_layout( title='Repository Language Breakdown', width=800, height=600 ) plotly_figures["language_treemap"] = fig # Issue/PR timeline issues = repo_data.get("issues", []) prs = repo_data.get("pull_requests", []) if issues or prs: # Create timeline data timeline_data = [] for issue in issues: if not issue.get("pull_request") and issue.get("created_at"): try: created_date = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00')) timeline_data.append({ "date": created_date, "type": "Issue", "id": issue.get("number", ""), "title": issue.get("title", ""), "state": issue.get("state", "") }) except ValueError: pass for pr in prs: if pr.get("created_at"): try: created_date = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00')) timeline_data.append({ "date": created_date, "type": "PR", "id": pr.get("number", ""), "title": pr.get("title", ""), "state": pr.get("state", "") }) except ValueError: pass if timeline_data: # Sort by date timeline_data.sort(key=lambda x: x["date"]) # Create DataFrame for easier plotting df = pd.DataFrame(timeline_data) # Calculate cumulative counts df["cumulative_issues"] = (df["type"] == "Issue").cumsum() df["cumulative_prs"] = (df["type"] == "PR").cumsum() # Create plot fig = go.Figure() fig.add_trace(go.Scatter( x=df["date"], y=df["cumulative_issues"], mode='lines', name='Issues', line=dict(color='red', width=2) )) fig.add_trace(go.Scatter( x=df["date"], y=df["cumulative_prs"], mode='lines', name='Pull Requests', line=dict(color='blue', width=2) )) fig.update_layout( title='Cumulative Issues and Pull Requests Over Time', xaxis_title='Date', yaxis_title='Count', legend=dict( yanchor="top", y=0.99, xanchor="left", x=0.01 ), width=900, height=500 ) plotly_figures["issue_pr_timeline"] = fig return plotly_figures def _visualize_collaboration_network(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Optional[plt.Figure]: """Create a visualization of the collaboration network.""" if "pull_requests" not in repo_data or "contributors" not in repo_data: return None prs = repo_data["pull_requests"] contributors = repo_data["contributors"] # Create a network of collaborations G = nx.Graph() # Add nodes (contributors) contributor_logins = [c.get("login") for c in contributors if c.get("login")] for login in contributor_logins: G.add_node(login) # Add edges (collaborations through PRs) collaborations = defaultdict(int) for pr in prs: author = pr.get("user_login") if not author or author not in contributor_logins: continue # Consider reviewers as collaborators reviewers = pr.get("requested_reviewers", []) for reviewer in reviewers: if reviewer in contributor_logins and reviewer != author: pair = tuple(sorted([author, reviewer])) collaborations[pair] += 1 for (author, reviewer), weight in collaborations.items(): G.add_edge(author, reviewer, weight=weight) if not G.edges(): return None # Draw the collaboration network fig, ax = plt.subplots(figsize=(12, 10)) # Calculate node sizes based on contributions contributor_dict = {c.get("login"): c.get("contributions", 1) for c in contributors if c.get("login")} node_sizes = [contributor_dict.get(node, 1) * 30 for node in G.nodes()] # Calculate edge widths based on collaboration count edge_widths = [G[u][v]['weight'] * 0.5 for u, v in G.edges()] # Calculate node colors based on contributor roles # (assign different colors to different types of contributors) color_map = [] for node in G.nodes(): degree = G.degree(node) if degree > 5: color_map.append('red') # Central collaborators elif degree > 2: color_map.append('blue') # Active collaborators else: color_map.append('green') # Peripheral contributors # Position nodes using a force-directed layout pos = nx.spring_layout(G, seed=42) # Draw the network nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color=color_map, alpha=0.8) nx.draw_networkx_edges(G, pos, width=edge_widths, alpha=0.5, edge_color='gray') nx.draw_networkx_labels(G, pos, font_size=8, font_family='sans-serif') ax.set_title("Collaboration Network", fontsize=16) ax.axis('off') plt.tight_layout() return fig def analyze_repo(self, owner: str, repo_name: str) -> Dict[str, Any]: """ Main method to analyze a repository. Args: owner: GitHub username or organization repo_name: Name of the repository Returns: Dict containing all repository data and insights """ start_time = time.time() logger.info(f"Starting analysis of {owner}/{repo_name}") repo_path = f"{owner}/{repo_name}" repo = self.client.get_repo(repo_path) repo_data = {} # Collect basic repository metadata repo_data["repo_details"] = self.get_repo_details(repo) # Define data collection tasks tasks = [ ("contributors", lambda: self.get_contributors(repo)), ("languages", lambda: self.get_languages(repo)), ("issues", lambda: self.get_issues(repo, "all")), ("pull_requests", lambda: self.get_pull_requests(repo, "all")), ("commits", lambda: self.get_commits(repo)), ("readme", lambda: self.get_readme(repo)), ("branches", lambda: self.get_branches(repo)), ("releases", lambda: self.get_releases(repo)), ("workflows", lambda: self.get_workflows(repo)), ("file_distribution", lambda: self.get_file_distribution(repo)), ("collaborators", lambda: self.get_collaborators(repo)), ("commit_activity", lambda: self.analyze_commit_activity(repo)), ("contributor_activity", lambda: self.analyze_contributor_activity(repo)), ] # Search for security and quality indicators important_terms = [ "security", "vulnerability", "auth", "password", "token", "test", "spec", "fixture", "mock", "stub", "TODO", "FIXME", "HACK", "XXX" ] tasks.append(("code_search", lambda: self.search_code(repo, important_terms))) # Collect data with progress bar with tqdm(total=len(tasks), desc="Collecting repository data") as pbar: for key, task_func in tasks: try: result = task_func() repo_data[key] = result except Exception as e: logger.error(f"Error collecting {key}: {e}") finally: pbar.update(1) # Generate insights from collected data repo_data["insights"] = self.generate_insights(repo_data) # Generate visualizations if self.config.generate_visualizations: repo_data["visualizations"] = self.generate_visualizations(repo_data, repo_data["insights"]) end_time = time.time() logger.info(f"Analysis completed in {end_time - start_time:.2f} seconds") return repo_data class PDFReportGenerator: """ Class for generating comprehensive PDF reports from repository analysis data. """ def __init__(self, repo_data: Dict[str, Any], output_path: str = None): """Initialize the PDF report generator with repository data.""" self.repo_data = repo_data self.output_path = output_path or tempfile.mktemp(suffix='.pdf') self.styles = getSampleStyleSheet() # Create custom styles self.styles.add(ParagraphStyle( name='SectionTitle', parent=self.styles['Heading2'], fontSize=14, spaceAfter=10 )) self.styles.add(ParagraphStyle( name='SubsectionTitle', parent=self.styles['Heading3'], fontSize=12, spaceAfter=6 )) self.styles.add(ParagraphStyle( name='MetricsTable', parent=self.styles['Normal'], fontSize=10, alignment=TA_LEFT )) self.styles.add(ParagraphStyle( name='Small', parent=self.styles['Normal'], fontSize=8 )) self.styles.add(ParagraphStyle( name='ReportTitle', parent=self.styles['Title'], fontSize=24, alignment=TA_CENTER, spaceAfter=20 )) def generate_report(self) -> str: """ Generate a PDF report of repository analysis. Returns: str: Path to the generated PDF file """ doc = SimpleDocTemplate( self.output_path, pagesize=letter, rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=72 ) elements = [] # Add report title repo_name = self.repo_data.get("repo_details", {}).get("full_name", "Repository") elements.append(Paragraph(f"GitHub Repository Analysis: {repo_name}", self.styles['ReportTitle'])) # Add report generation date report_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") elements.append(Paragraph(f"Report generated on: {report_date}", self.styles['Normal'])) elements.append(Spacer(1, 20)) # Add repository overview section elements.extend(self._create_repo_overview()) elements.append(PageBreak()) # Add activity analysis section elements.extend(self._create_activity_analysis()) elements.append(PageBreak()) # Add code analysis section elements.extend(self._create_code_analysis()) elements.append(PageBreak()) # Add community analysis section elements.extend(self._create_community_analysis()) # Add visualizations if available if self.repo_data.get("visualizations"): elements.append(PageBreak()) elements.extend(self._create_visualization_pages()) # Add summary and recommendations elements.append(PageBreak()) elements.extend(self._create_summary_and_recommendations()) # Build the PDF doc.build(elements) return self.output_path def _create_repo_overview(self) -> List[Any]: """Create repository overview section of the report.""" elements = [] # Section title elements.append(Paragraph("Repository Overview", self.styles['Heading1'])) elements.append(Spacer(1, 10)) # Basic repository information repo_details = self.repo_data.get("repo_details", {}) # Create a table for repository details data = [ ["Name", repo_details.get("name", "N/A")], ["Full Name", repo_details.get("full_name", "N/A")], ["Description", repo_details.get("description", "No description")], ["URL", repo_details.get("html_url", "N/A")], ["Primary Language", repo_details.get("language", "Not specified")], ["Created On", repo_details.get("created_at", "N/A")], ["Last Updated", repo_details.get("updated_at", "N/A")], ["Stars", str(repo_details.get("stargazers_count", 0))], ["Forks", str(repo_details.get("forks_count", 0))], ["Watchers", str(repo_details.get("watchers_count", 0))], ["Open Issues", str(repo_details.get("open_issues_count", 0))], ["License", repo_details.get("license", "Not specified")], ["Fork", "Yes" if repo_details.get("fork", False) else "No"], ["Archived", "Yes" if repo_details.get("archived", False) else "No"], ["Visibility", repo_details.get("visibility", "N/A").capitalize()], ] table = Table(data, colWidths=[100, 350]) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (0, -1), colors.lightgrey), ('TEXTCOLOR', (0, 0), (0, -1), colors.black), ('ALIGN', (0, 0), (0, -1), 'RIGHT'), ('ALIGN', (1, 0), (1, -1), 'LEFT'), ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), ('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, -1), 10), ('BOTTOMPADDING', (0, 0), (-1, -1), 6), ('TOPPADDING', (0, 0), (-1, -1), 6), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(table) elements.append(Spacer(1, 20)) # Key metrics and insights elements.append(Paragraph("Key Metrics & Insights", self.styles['SectionTitle'])) insights = self.repo_data.get("insights", {}) # Repository age age_days = insights.get("repository_age_days", 0) age_years = age_days / 365.25 freshness_days = insights.get("freshness_days", 0) age_text = f"Repository Age: {age_years:.1f} years ({int(age_days)} days)" freshness_text = f"Last Activity: {int(freshness_days)} days ago" elements.append(Paragraph(age_text, self.styles['Normal'])) elements.append(Paragraph(freshness_text, self.styles['Normal'])) elements.append(Spacer(1, 10)) # Activity level activity_level = insights.get("activity_level", {}) if activity_level: activity_text = f"Activity Level: {activity_level.get('level', 'Unknown')} (Score: {activity_level.get('score', 0):.1f}/25)" elements.append(Paragraph(activity_text, self.styles['Normal'])) elements.append(Spacer(1, 10)) # Code complexity code_complexity = insights.get("code_complexity", {}).get("overall", {}) if code_complexity: complexity_text = f"Code Complexity: {code_complexity.get('level', 'Unknown')} (Score: {code_complexity.get('score', 0):.1f}/30)" elements.append(Paragraph(complexity_text, self.styles['Normal'])) elements.append(Spacer(1, 10)) # Documentation quality doc_quality = insights.get("documentation_quality", {}) if doc_quality: quality_score = doc_quality.get("score", 0) quality_level = "High" if quality_score > 0.7 else "Medium" if quality_score > 0.4 else "Low" doc_text = f"Documentation Quality: {quality_level} (Score: {quality_score:.2f})" elements.append(Paragraph(doc_text, self.styles['Normal'])) elements.append(Spacer(1, 10)) # Community health community_health = insights.get("community_health", {}).get("overall", {}) if community_health: health_text = f"Community Health: {community_health.get('level', 'Unknown')} (Score: {community_health.get('score', 0):.1f}/40)" elements.append(Paragraph(health_text, self.styles['Normal'])) return elements def _create_activity_analysis(self) -> List[Any]: """Create activity analysis section of the report.""" elements = [] # Section title elements.append(Paragraph("Activity Analysis", self.styles['Heading1'])) elements.append(Spacer(1, 10)) insights = self.repo_data.get("insights", {}) # Commit activity elements.append(Paragraph("Commit Activity", self.styles['SectionTitle'])) commit_insights = insights.get("commit_insights", {}) if commit_insights: # Top contributors top_contributors = commit_insights.get("top_contributors", {}) if top_contributors: elements.append(Paragraph("Top Contributors by Commits:", self.styles['SubsectionTitle'])) data = [["Contributor", "Commits"]] for contributor, commits in top_contributors.items(): data.append([contributor, str(commits)]) table = Table(data, colWidths=[200, 100]) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), ('ALIGN', (0, 0), (0, -1), 'LEFT'), ('ALIGN', (1, 0), (1, -1), 'RIGHT'), ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, -1), 10), ('BOTTOMPADDING', (0, 0), (-1, -1), 4), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(table) elements.append(Spacer(1, 15)) # Commit time patterns time_patterns = commit_insights.get("commit_time_patterns", {}) if time_patterns: elements.append(Paragraph("Commit Timing Patterns:", self.styles['SubsectionTitle'])) weekday_data = time_patterns.get("by_weekday", {}) if weekday_data: day_text = "Most active day: " + max(weekday_data.items(), key=lambda x: x[1])[0] elements.append(Paragraph(day_text, self.styles['Normal'])) hour_data = time_patterns.get("by_hour", {}) if hour_data and hour_data: hour = max(hour_data.items(), key=lambda x: x[1])[0] hour_text = f"Most active hour: {hour}:00 UTC" elements.append(Paragraph(hour_text, self.styles['Normal'])) elements.append(Spacer(1, 10)) # Pull Request activity elements.append(Paragraph("Pull Request Activity", self.styles['SectionTitle'])) pr_insights = insights.get("pr_insights", {}) pr_code_changes = insights.get("pr_code_change_stats", {}) if pr_insights or pr_code_changes: # PR state distribution state_counts = pr_insights.get("by_state", {}) if state_counts: elements.append(Paragraph("Pull Request States:", self.styles['SubsectionTitle'])) data = [["State", "Count"]] for state, count in state_counts.items(): data.append([state.capitalize(), str(count)]) table = Table(data, colWidths=[100, 100]) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), ('ALIGN', (0, 0), (0, -1), 'LEFT'), ('ALIGN', (1, 0), (1, -1), 'RIGHT'), ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(table) elements.append(Spacer(1, 15)) # PR code change statistics if pr_code_changes: elements.append(Paragraph("Pull Request Size Statistics:", self.styles['SubsectionTitle'])) # Table for code change stats data = [["Metric", "Additions", "Deletions", "Files Changed"]] metrics = ["mean", "median", "max", "total"] for metric in metrics: row = [metric.capitalize()] for stat_type in ["additions", "deletions", "changed_files"]: if stat_type in pr_code_changes and metric in pr_code_changes[stat_type]: value = pr_code_changes[stat_type][metric] row.append(f"{value:.1f}" if isinstance(value, float) else str(value)) else: row.append("N/A") data.append(row) table = Table(data, colWidths=[80, 80, 80, 80]) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('ALIGN', (0, 0), (0, -1), 'LEFT'), ('ALIGN', (1, 0), (-1, -1), 'RIGHT'), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(table) elements.append(Spacer(1, 15)) # Issue activity elements.append(Paragraph("Issue Activity", self.styles['SectionTitle'])) issue_insights = insights.get("issue_insights", {}) if issue_insights: # Issue state distribution state_counts = issue_insights.get("by_state", {}) if state_counts: elements.append(Paragraph("Issue States:", self.styles['SubsectionTitle'])) data = [["State", "Count"]] for state, count in state_counts.items(): data.append([state.capitalize(), str(count)]) table = Table(data, colWidths=[100, 100]) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), ('ALIGN', (0, 0), (0, -1), 'LEFT'), ('ALIGN', (1, 0), (1, -1), 'RIGHT'), ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(table) elements.append(Spacer(1, 15)) # Issue resolution time resolution_stats = issue_insights.get("resolution_time", {}) if resolution_stats: elements.append(Paragraph("Issue Resolution Time (hours):", self.styles['SubsectionTitle'])) mean_hours = resolution_stats.get("mean_hours", 0) median_hours = resolution_stats.get("median_hours", 0) if mean_hours > 24: mean_days = mean_hours / 24 mean_text = f"Mean: {mean_days:.1f} days" else: mean_text = f"Mean: {mean_hours:.1f} hours" if median_hours > 24: median_days = median_hours / 24 median_text = f"Median: {median_days:.1f} days" else: median_text = f"Median: {median_hours:.1f} hours" elements.append(Paragraph(mean_text, self.styles['Normal'])) elements.append(Paragraph(median_text, self.styles['Normal'])) elements.append(Spacer(1, 10)) # Top issue labels top_labels = issue_insights.get("by_label", {}) if top_labels: elements.append(Paragraph("Top Issue Labels:", self.styles['SubsectionTitle'])) data = [["Label", "Count"]] for label, count in list(top_labels.items())[:5]: # Top 5 labels data.append([label, str(count)]) table = Table(data, colWidths=[150, 50]) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), ('ALIGN', (0, 0), (0, -1), 'LEFT'), ('ALIGN', (1, 0), (1, -1), 'RIGHT'), ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(table) return elements def _create_code_analysis(self) -> List[Any]: """Create code analysis section of the report.""" elements = [] # Section title elements.append(Paragraph("Code Analysis", self.styles['Heading1'])) elements.append(Spacer(1, 10)) # Language distribution elements.append(Paragraph("Language Distribution", self.styles['SectionTitle'])) languages = self.repo_data.get("languages", {}) insights = self.repo_data.get("insights", {}) if languages: # Sort languages by byte count sorted_languages = sorted(languages.items(), key=lambda x: x[1], reverse=True) # Create language distribution table data = [["Language", "Bytes", "Percentage"]] total_bytes = sum(languages.values()) for language, bytes_count in sorted_languages[:10]: # Top 10 languages percentage = (bytes_count / total_bytes) * 100 data.append([ language, f"{bytes_count:,}", f"{percentage:.1f}%" ]) table = Table(data, colWidths=[120, 120, 80]) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), ('ALIGN', (0, 0), (0, -1), 'LEFT'), ('ALIGN', (1, 0), (2, -1), 'RIGHT'), ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(table) elements.append(Spacer(1, 15)) # File distribution elements.append(Paragraph("File Type Distribution", self.styles['SectionTitle'])) file_dist = self.repo_data.get("file_distribution", {}) if file_dist: # Group extensions by type file_types = { "Code": sum(file_dist.get(ext, 0) for ext in self.config.code_extensions), "Markup": sum(file_dist.get(ext, 0) for ext in self.config.markup_extensions), "Scripts": sum(file_dist.get(ext, 0) for ext in self.config.script_extensions), "Data": sum(file_dist.get(ext, 0) for ext in self.config.data_extensions), "Config": sum(file_dist.get(ext, 0) for ext in self.config.config_extensions), "Notebooks": sum(file_dist.get(ext, 0) for ext in self.config.notebook_extensions), "Other": sum(file_dist.get(ext, 0) for ext in self.config.other_extensions) } # Create file type distribution table data = [["File Type", "Count", "Percentage"]] total_files = sum(file_types.values()) for file_type, count in sorted(file_types.items(), key=lambda x: x[1], reverse=True): if count > 0: percentage = (count / total_files) * 100 data.append([ file_type, str(count), f"{percentage:.1f}%" ]) table = Table(data, colWidths=[120, 80, 80]) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), ('ALIGN', (0, 0), (0, -1), 'LEFT'), ('ALIGN', (1, 0), (2, -1), 'RIGHT'), ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(table) elements.append(Spacer(1, 15)) # Code complexity analysis elements.append(Paragraph("Code Complexity Analysis", self.styles['SectionTitle'])) code_complexity = insights.get("code_complexity", {}) if code_complexity: complexity_overall = code_complexity.get("overall", {}) elements.append(Paragraph( f"Overall Complexity: {complexity_overall.get('level', 'Unknown')} (Score: {complexity_overall.get('score', 0):.1f}/30)", self.styles['Normal'] )) elements.append(Spacer(1, 10)) # Code size code_size = code_complexity.get("code_size", {}) if code_size: size_mb = code_size.get("size_mb", 0) elements.append(Paragraph(f"Code Size: {size_mb:.2f} MB", self.styles['Normal'])) elements.append(Spacer(1, 5)) # PR complexity pr_complexity = code_complexity.get("pr_complexity", {}) if pr_complexity: elements.append(Paragraph("Average Pull Request Size:", self.styles['SubsectionTitle'])) avg_additions = pr_complexity.get("avg_additions", 0) avg_deletions = pr_complexity.get("avg_deletions", 0) avg_files = pr_complexity.get("avg_changed_files", 0) elements.append(Paragraph(f"Lines Added: {avg_additions:.1f}", self.styles['Normal'])) elements.append(Paragraph(f"Lines Deleted: {avg_deletions:.1f}", self.styles['Normal'])) elements.append(Paragraph(f"Files Changed: {avg_files:.1f}", self.styles['Normal'])) elements.append(Spacer(1, 10)) # CI/CD presence elements.append(Paragraph("CI/CD Systems", self.styles['SectionTitle'])) ci_cd = insights.get("ci_cd_presence", {}) if ci_cd: has_ci_cd = ci_cd.get("has_ci_cd", False) systems = ci_cd.get("ci_cd_systems", {}) if has_ci_cd: elements.append(Paragraph("Detected CI/CD Systems:", self.styles['Normal'])) detected_systems = [name for name, present in systems.items() if present] for system in detected_systems: elements.append(Paragraph(f"• {system.replace('_', ' ').title()}", self.styles['Normal'])) else: elements.append(Paragraph("No CI/CD systems detected", self.styles['Normal'])) return elements def _create_community_analysis(self) -> List[Any]: """Create community analysis section of the report.""" elements = [] # Section title elements.append(Paragraph("Community Analysis", self.styles['Heading1'])) elements.append(Spacer(1, 10)) insights = self.repo_data.get("insights", {}) # Contributor insights elements.append(Paragraph("Contributor Analysis", self.styles['SectionTitle'])) contributor_insights = insights.get("contributor_insights", {}) if contributor_insights: contributor_count = contributor_insights.get("contributor_count", 0) total_contributions = contributor_insights.get("total_contributions", 0) avg_contributions = contributor_insights.get("avg_contributions_per_contributor", 0) elements.append(Paragraph(f"Total Contributors: {contributor_count}", self.styles['Normal'])) elements.append(Paragraph(f"Total Contributions: {total_contributions}", self.styles['Normal'])) elements.append(Paragraph(f"Average Contributions per Contributor: {avg_contributions:.1f}", self.styles['Normal'])) elements.append(Spacer(1, 10)) # Contribution distribution distribution = contributor_insights.get("contribution_distribution", {}) if distribution: elements.append(Paragraph("Contribution Distribution:", self.styles['SubsectionTitle'])) gini = distribution.get("gini_coefficient", 0) top_percent = distribution.get("top_contributor_percentage", 0) contributors_20 = distribution.get("contributors_for_20_percent", 0) contributors_50 = distribution.get("contributors_for_50_percent", 0) contributors_80 = distribution.get("contributors_for_80_percent", 0) # Format distribution metrics elements.append(Paragraph(f"Top Contributor: {top_percent:.1f}% of all contributions", self.styles['Normal'])) elements.append(Paragraph(f"Contributors for first 20% work: {contributors_20}", self.styles['Normal'])) elements.append(Paragraph(f"Contributors for first 50% work: {contributors_50}", self.styles['Normal'])) elements.append(Paragraph(f"Contributors for first 80% work: {contributors_80}", self.styles['Normal'])) elements.append(Paragraph(f"Gini Coefficient: {gini:.2f} ({'High' if gini > 0.6 else 'Medium' if gini > 0.4 else 'Low'} inequality)", self.styles['Normal'])) elements.append(Spacer(1, 15)) # Community health elements.append(Paragraph("Community Health", self.styles['SectionTitle'])) community_health = insights.get("community_health", {}) if community_health: health_overall = community_health.get("overall", {}) elements.append(Paragraph( f"Overall Health: {health_overall.get('level', 'Unknown')} (Score: {health_overall.get('score', 0):.1f}/40)", self.styles['Normal'] )) elements.append(Spacer(1, 10)) # Issue and PR responsiveness if "issue_closure_rate" in community_health: closure_rate = community_health.get("issue_closure_rate", 0) elements.append(Paragraph(f"Issue Closure Rate: {closure_rate:.1%}", self.styles['Normal'])) if "avg_issue_resolution_time_hours" in community_health: resolution_hours = community_health.get("avg_issue_resolution_time_hours", 0) if resolution_hours > 72: resolution_days = resolution_hours / 24 elements.append(Paragraph(f"Avg. Issue Resolution Time: {resolution_days:.1f} days", self.styles['Normal'])) else: elements.append(Paragraph(f"Avg. Issue Resolution Time: {resolution_hours:.1f} hours", self.styles['Normal'])) if "pr_merge_rate" in community_health: merge_rate = community_health.get("pr_merge_rate", 0) elements.append(Paragraph(f"PR Merge Rate: {merge_rate:.1%}", self.styles['Normal'])) if "avg_pr_merge_time_hours" in community_health: merge_hours = community_health.get("avg_pr_merge_time_hours", 0) if merge_hours > 72: merge_days = merge_hours / 24 elements.append(Paragraph(f"Avg. PR Merge Time: {merge_days:.1f} days", self.styles['Normal'])) else: elements.append(Paragraph(f"Avg. PR Merge Time: {merge_hours:.1f} hours", self.styles['Normal'])) elements.append(Spacer(1, 10)) # Community guidelines community_files = community_health.get("community_guidelines", {}) if community_files: elements.append(Paragraph("Community Guidelines:", self.styles['SubsectionTitle'])) files = [ ("CONTRIBUTING.md", "Contributing Guidelines"), ("CODE_OF_CONDUCT.md", "Code of Conduct"), ("SECURITY.md", "Security Policy"), ("SUPPORT.md", "Support Information"), ("GOVERNANCE.md", "Governance Model") ] data = [["Guideline", "Present"]] for file_name, display_name in files: present = community_files.get(file_name, False) data.append([display_name, "✓" if present else "✗"]) table = Table(data, colWidths=[150, 50]) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), ('ALIGN', (0, 0), (0, -1), 'LEFT'), ('ALIGN', (1, 0), (1, -1), 'CENTER'), ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ('TEXTCOLOR', (1, 1), (1, -1), lambda row, col: colors.green if data[row][col] == "✓" else colors.red), ])) elements.append(table) elements.append(Spacer(1, 15)) # Documentation quality elements.append(Paragraph("Documentation Analysis", self.styles['SectionTitle'])) doc_quality = insights.get("documentation_quality", {}) if doc_quality: has_readme = doc_quality.get("has_readme", False) if has_readme: quality_score = doc_quality.get("score", 0) quality_level = "High" if quality_score > 0.7 else "Medium" if quality_score > 0.4 else "Low" word_count = doc_quality.get("readme_length", 0) elements.append(Paragraph(f"README Quality: {quality_level} (Score: {quality_score:.2f})", self.styles['Normal'])) elements.append(Paragraph(f"README Length: {word_count} words", self.styles['Normal'])) elements.append(Spacer(1, 10)) # Section analysis sections = doc_quality.get("sections", {}) if sections: elements.append(Paragraph("README Sections Present:", self.styles['SubsectionTitle'])) section_labels = { "introduction": "Introduction/Overview", "installation": "Installation Instructions", "usage": "Usage Examples", "api": "API Documentation", "contributing": "Contributing Guidelines", "license": "License Information", "code_of_conduct": "Code of Conduct" } data = [["Section", "Present"]] for section_key, section_label in section_labels.items(): present = sections.get(section_key, False) data.append([section_label, "✓" if present else "✗"]) table = Table(data, colWidths=[150, 50]) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), ('ALIGN', (0, 0), (0, -1), 'LEFT'), ('ALIGN', (1, 0), (1, -1), 'CENTER'), ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ('TEXTCOLOR', (1, 1), (1, -1), lambda row, col: colors.green if data[row][col] == "✓" else colors.red), ])) elements.append(table) elements.append(Spacer(1, 10)) # Additional doc quality metrics has_images = doc_quality.get("has_images", False) has_code = doc_quality.get("has_code_examples", False) metrics_text = "Additional Features: " if has_images: img_count = doc_quality.get("image_count", 0) metrics_text += f"{img_count} images/diagrams, " if has_code: code_blocks = doc_quality.get("code_block_count", 0) metrics_text += f"{code_blocks} code examples" if has_images or has_code: elements.append(Paragraph(metrics_text, self.styles['Normal'])) else: elements.append(Paragraph("No README file found.", self.styles['Normal'])) return elements def _create_visualization_pages(self) -> List[Any]: """Create pages with visualizations.""" elements = [] # Section title elements.append(Paragraph("Visualizations", self.styles['Heading1'])) elements.append(Spacer(1, 10)) visualizations = self.repo_data.get("visualizations", {}) # Organize visualizations by category categories = { "Language Analysis": ["language_distribution", "language_treemap"], "Commit Activity": ["weekly_commits", "code_frequency", "commits_by_weekday", "commits_by_hour", "commit_heatmap"], "Contributor Analysis": ["top_contributors", "contribution_distribution", "collaboration_network"], "Issue & PR Analysis": ["issues_by_state", "issues_by_month", "issues_by_label", "pr_code_changes", "issue_pr_timeline"] } # Add visualizations by category for category, viz_keys in categories.items(): category_visualizations = [key for key in viz_keys if key in visualizations] if category_visualizations: elements.append(Paragraph(category, self.styles['SectionTitle'])) elements.append(Spacer(1, 10)) for viz_key in category_visualizations: fig = visualizations.get(viz_key) if fig: # Save figure to a temporary buffer img_buffer = BytesIO() if isinstance(fig, go.Figure): # Handle Plotly figures fig.write_image(img_buffer, format="png", width=800, height=500) else: # Handle Matplotlib figures fig.savefig(img_buffer, format="png", dpi=150) img_buffer.seek(0) img = Image(img_buffer, width=6*inch, height=4*inch) # Add caption caption = viz_key.replace("_", " ").title() elements.append(Paragraph(caption, self.styles['SubsectionTitle'])) elements.append(img) elements.append(Spacer(1, 20)) # Add page break after each category elements.append(PageBreak()) return elements def _create_summary_and_recommendations(self) -> List[Any]: """Create summary and recommendations section.""" elements = [] # Section title elements.append(Paragraph("Summary & Recommendations", self.styles['Heading1'])) elements.append(Spacer(1, 10)) # Repository summary elements.append(Paragraph("Project Summary", self.styles['SectionTitle'])) insights = self.repo_data.get("insights", {}) repo_details = self.repo_data.get("repo_details", {}) # Short description of the project repo_name = repo_details.get("name", "The repository") repo_desc = repo_details.get("description", "") primary_lang = repo_details.get("language", "various languages") summary_text = f"{repo_name} is a {primary_lang} project" if repo_desc: summary_text += f" that {repo_desc.lower() if repo_desc[0].isupper() else repo_desc}" summary_text += "." elements.append(Paragraph(summary_text, self.styles['Normal'])) elements.append(Spacer(1, 10)) # Key metrics summary community_health = insights.get("community_health", {}).get("overall", {}) activity_level = insights.get("activity_level", {}) code_complexity = insights.get("code_complexity", {}).get("overall", {}) metrics_text = f"The project has {repo_details.get('stargazers_count', 0)} stars and {repo_details.get('forks_count', 0)} forks." if "contributor_insights" in insights: contributor_count = insights["contributor_insights"].get("contributor_count", 0) metrics_text += f" It has {contributor_count} contributors" gini = insights["contributor_insights"].get("contribution_distribution", {}).get("gini_coefficient", 0) if gini > 0.7: metrics_text += " with a highly centralized contribution pattern" elif gini > 0.4: metrics_text += " with a moderately distributed contribution pattern" else: metrics_text += " with a well-distributed contribution pattern" metrics_text += "." elements.append(Paragraph(metrics_text, self.styles['Normal'])) elements.append(Spacer(1, 10)) # Activity summary if activity_level: activity_text = f"The project shows {activity_level.get('level', 'Unknown').lower()} activity levels" # Add activity context if activity_level.get('level') in ["High", "Very High"]: activity_text += " with regular commits and issue management." elif activity_level.get('level') in ["Medium"]: activity_text += " with moderate development progress." else: activity_text += " with limited recent development." elements.append(Paragraph(activity_text, self.styles['Normal'])) elements.append(Spacer(1, 10)) # Code quality summary if code_complexity: complexity_text = f"The codebase has {code_complexity.get('level', 'Unknown').lower()} complexity" if code_complexity.get('level') in ["High", "Very High"]: complexity_text += ", which may present challenges for new contributors and maintenance." elif code_complexity.get('level') in ["Medium", "Medium-High"]: complexity_text += " with a reasonable balance between functionality and maintainability." else: complexity_text += " and should be relatively straightforward to understand and maintain." elements.append(Paragraph(complexity_text, self.styles['Normal'])) elements.append(Spacer(1, 10)) # Community health summary if community_health: health_text = f"The project demonstrates {community_health.get('level', 'Unknown').lower()} community health" if community_health.get('level') in ["Excellent", "Very Good", "Good"]: health_text += " with responsive maintainers and clear contribution guidelines." elif community_health.get('level') in ["Fair"]: health_text += " with some community structures in place." else: health_text += " with opportunities for improved community engagement." elements.append(Paragraph(health_text, self.styles['Normal'])) elements.append(Spacer(1, 15)) # Recommendations elements.append(Paragraph("Recommendations", self.styles['SectionTitle'])) recommendations = [] # Documentation recommendations doc_quality = insights.get("documentation_quality", {}) if doc_quality: score = doc_quality.get("score", 0) if score < 0.4: recommendations.append("Improve documentation by adding more comprehensive README content, including usage examples and API documentation.") elif score < 0.7: recommendations.append("Enhance existing documentation with more examples and clearer installation instructions.") sections = doc_quality.get("sections", {}) missing_key_sections = [] if not sections.get("installation", False): missing_key_sections.append("installation instructions") if not sections.get("usage", False): missing_key_sections.append("usage examples") if missing_key_sections: recommendations.append(f"Add missing documentation sections: {', '.join(missing_key_sections)}.") # Community recommendations community_files = insights.get("community_health", {}).get("community_guidelines", {}) if community_files: missing_guidelines = [] if not community_files.get("CONTRIBUTING.md", False): missing_guidelines.append("contribution guidelines") if not community_files.get("CODE_OF_CONDUCT.md", False): missing_guidelines.append("code of conduct") if missing_guidelines: recommendations.append(f"Create missing community files: {', '.join(missing_guidelines)}.") # Issue management recommendations issue_insights = insights.get("issue_insights", {}) if issue_insights: resolution_time = issue_insights.get("resolution_time", {}).get("mean_hours", 0) if resolution_time > 168: # 1 week recommendations.append("Improve issue response time to enhance user experience and community engagement.") # Code complexity recommendations if code_complexity and code_complexity.get('level') in ["High", "Very High"]: recommendations.append("Consider refactoring complex parts of the codebase to improve maintainability.") # CI/CD recommendations ci_cd = insights.get("ci_cd_presence", {}) if not ci_cd.get("has_ci_cd", False): recommendations.append("Implement CI/CD pipelines (e.g., GitHub Actions) to automate testing and deployment.") # Activity recommendations if activity_level and activity_level.get('level') in ["Low", "Very Low", "None"]: recommendations.append("Revitalize project with regular updates and community engagement to attract more contributors.") # Add recommendations to the report if recommendations: for i, recommendation in enumerate(recommendations, 1): elements.append(Paragraph(f"{i}. {recommendation}", self.styles['Normal'])) elements.append(Spacer(1, 5)) else: elements.append(Paragraph("This project follows good development practices and no significant improvements are needed at this time.", self.styles['Normal'])) return elements class RAGHelper: """ Helper class for Retrieval Augmented Generation (RAG) to enhance chatbot responses with repository insights. """ def __init__(self, repo_data: Dict[str, Any]): """Initialize with repository data.""" self.repo_data = repo_data self.insights = repo_data.get("insights", {}) # Extract key information for easy retrieval self._extract_key_info() def _extract_key_info(self): """Extract and organize key information from repository data.""" self.repo_info = {} # Basic repository details if "repo_details" in self.repo_data: details = self.repo_data["repo_details"] self.repo_info["name"] = details.get("name", "") self.repo_info["full_name"] = details.get("full_name", "") self.repo_info["description"] = details.get("description", "") self.repo_info["url"] = details.get("html_url", "") self.repo_info["stars"] = details.get("stargazers_count", 0) self.repo_info["forks"] = details.get("forks_count", 0) self.repo_info["language"] = details.get("language", "") self.repo_info["created_at"] = details.get("created_at", "") self.repo_info["license"] = details.get("license", "") # Languages used if "languages" in self.repo_data: languages = self.repo_data["languages"] total_bytes = sum(languages.values()) if languages else 0 if total_bytes > 0: language_percentages = { lang: (bytes_count / total_bytes) * 100 for lang, bytes_count in languages.items() } self.repo_info["language_breakdown"] = language_percentages sorted_languages = sorted(language_percentages.items(), key=lambda x: x[1], reverse=True) self.repo_info["top_languages"] = sorted_languages[:5] # Contributors if "contributors" in self.repo_data: contributors = self.repo_data["contributors"] self.repo_info["total_contributors"] = len(contributors) if contributors: sorted_contributors = sorted(contributors, key=lambda x: x.get("contributions", 0), reverse=True) self.repo_info["top_contributors"] = [ { "name": c.get("login", "Unknown"), "contributions": c.get("contributions", 0) } for c in sorted_contributors[:5] ] # Activity metrics if "commit_insights" in self.insights: commit_insights = self.insights["commit_insights"] self.repo_info["commit_patterns"] = commit_insights.get("commit_time_patterns", {}) self.repo_info["top_committers"] = commit_insights.get("top_contributors", {}) # Documentation quality if "documentation_quality" in self.insights: doc_quality = self.insights["documentation_quality"] self.repo_info["documentation_score"] = doc_quality.get("score", 0) self.repo_info["documentation_quality"] = ( "High" if doc_quality.get("score", 0) > 0.7 else "Medium" if doc_quality.get("score", 0) > 0.4 else "Low" ) self.repo_info["readme_sections"] = doc_quality.get("sections", {}) # Community health if "community_health" in self.insights: community_health = self.insights["community_health"] self.repo_info["community_health_level"] = community_health.get("overall", {}).get("level", "Unknown") self.repo_info["community_guidelines"] = community_health.get("community_guidelines", {}) # Activity level if "activity_level" in self.insights: activity_level = self.insights["activity_level"] self.repo_info["activity_level"] = activity_level.get("level", "Unknown") # Code complexity if "code_complexity" in self.insights: code_complexity = self.insights["code_complexity"] self.repo_info["code_complexity_level"] = code_complexity.get("overall", {}).get("level", "Unknown") def get_context_for_query(self, query: str) -> str: """ Retrieve relevant context from repository data based on the query. Args: query: The user's query Returns: str: Contextual information to enhance the response """ # Convert query to lowercase for easier matching query_lower = query.lower() # Define keywords for different aspects of the repository keywords = { "overview": ["overview", "about", "what is", "tell me about", "summary"], "languages": ["language", "programming language", "code language", "tech stack"], "contributors": ["contributor", "who", "team", "maintainer", "author"], "activity": ["activity", "active", "commit", "update", "recent", "frequency"], "documentation": ["documentation", "docs", "readme", "well documented"], "community": ["community", "health", "governance", "conduct", "guideline"], "complexity": ["complex", "complexity", "difficult", "simple", "codebase", "understand"], "issues": ["issue", "bug", "problem", "ticket", "feature request"], "pulls": ["pull request", "pr", "merge", "contribution"], } # Check which aspects are relevant to the query relevant_aspects = [] for aspect, terms in keywords.items(): if any(term in query_lower for term in terms): relevant_aspects.append(aspect) # If no specific aspects are identified, provide a general overview if not relevant_aspects: relevant_aspects = ["overview"] # Build context information based on relevant aspects context_parts = [] # Repository overview if "overview" in relevant_aspects: repo_name = self.repo_info.get("full_name", "The repository") stars = self.repo_info.get("stars", 0) forks = self.repo_info.get("forks", 0) description = self.repo_info.get("description", "") overview = f"{repo_name} is a GitHub repository with {stars} stars and {forks} forks. " if description: overview += f"Description: {description}. " language = self.repo_info.get("language", "") if language: overview += f"It's primarily written in {language}. " created_at = self.repo_info.get("created_at", "") if created_at: try: date = datetime.datetime.fromisoformat(created_at.replace('Z', '+00:00')) overview += f"The repository was created on {date.strftime('%B %d, %Y')}. " except (ValueError, AttributeError): pass context_parts.append(overview) # Language breakdown if "languages" in relevant_aspects: top_languages = self.repo_info.get("top_languages", []) if top_languages: languages_text = "Language breakdown: " languages_text += ", ".join([f"{lang}: {pct:.1f}%" for lang, pct in top_languages]) languages_text += "." context_parts.append(languages_text) # Contributors if "contributors" in relevant_aspects: total_contributors = self.repo_info.get("total_contributors", 0) top_contributors = self.repo_info.get("top_contributors", []) contributors_text = f"The repository has {total_contributors} contributors. " if top_contributors: contributors_text += "Top contributors: " contributors_text += ", ".join([ f"{c['name']} ({c['contributions']} commits)" for c in top_contributors ]) contributors_text += "." context_parts.append(contributors_text) # Activity metrics if "activity" in relevant_aspects: activity_level = self.repo_info.get("activity_level", "Unknown") activity_text = f"Activity level: {activity_level}. " commit_patterns = self.repo_info.get("commit_patterns", {}) by_weekday = commit_patterns.get("by_weekday", {}) if by_weekday: most_active_day = max(by_weekday.items(), key=lambda x: x[1])[0] activity_text += f"Most active day of the week: {most_active_day}. " context_parts.append(activity_text) # Documentation quality if "documentation" in relevant_aspects: doc_quality = self.repo_info.get("documentation_quality", "Unknown") doc_score = self.repo_info.get("documentation_score", 0) docs_text = f"Documentation quality: {doc_quality} (score: {doc_score:.2f}/1.0). " readme_sections = self.repo_info.get("readme_sections", {}) if readme_sections: present_sections = [k for k, v in readme_sections.items() if v] missing_sections = [k for k, v in readme_sections.items() if not v] if present_sections: docs_text += f"README includes sections on: {', '.join(present_sections)}. " if missing_sections: docs_text += f"README is missing sections on: {', '.join(missing_sections)}." context_parts.append(docs_text) # Community health if "community" in relevant_aspects: health_level = self.repo_info.get("community_health_level", "Unknown") guidelines = self.repo_info.get("community_guidelines", {}) community_text = f"Community health: {health_level}. " if guidelines: present_guidelines = [k for k, v in guidelines.items() if v] missing_guidelines = [k for k, v in guidelines.items() if not v] if present_guidelines: community_text += f"Has community files: {', '.join(present_guidelines)}. " if missing_guidelines: community_text += f"Missing community files: {', '.join(missing_guidelines)}." context_parts.append(community_text) # Code complexity if "complexity" in relevant_aspects: complexity_level = self.repo_info.get("code_complexity_level", "Unknown") complexity_text = f"Code complexity: {complexity_level}." context_parts.append(complexity_text) # Issues if "issues" in relevant_aspects and "issue_insights" in self.insights: issue_insights = self.insights["issue_insights"] by_state = issue_insights.get("by_state", {}) issues_text = "Issues: " if by_state: issues_text += ", ".join([f"{count} {state}" for state, count in by_state.items()]) issues_text += ". " resolution_time = issue_insights.get("resolution_time", {}) if resolution_time: mean_hours = resolution_time.get("mean_hours", 0) if mean_hours > 24: mean_days = mean_hours / 24 issues_text += f"Average resolution time: {mean_days:.1f} days." else: issues_text += f"Average resolution time: {mean_hours:.1f} hours." context_parts.append(issues_text) # Pull requests if "pulls" in relevant_aspects and "pr_insights" in self.insights: pr_insights = self.insights["pr_insights"] by_state = pr_insights.get("by_state", {}) prs_text = "Pull Requests: " if by_state: prs_text += ", ".join([f"{count} {state}" for state, count in by_state.items()]) prs_text += ". " context_parts.append(prs_text) # Join all context parts context = " ".join(context_parts) return context def create_gradio_interface(): """ Create and launch the Gradio interface for GitHub repository analysis. """ # Styling css = """ .gradio-container {max-width: 100% !important} .main-analysis-area {min-height: 600px} .analysis-result {overflow-y: auto; max-height: 500px} .chat-interface {border: 1px solid #ccc; border-radius: 5px; padding: 10px} .pdf-download {margin-top: 20px} """ # Initialize state repo_data = {} analyzer = None def parse_repo_url(url: str) -> Tuple[str, str]: """Parse GitHub repository URL into owner and repo name.""" # Pattern for GitHub repo URLs patterns = [ r"github\.com\/([^\/]+)\/([^\/]+)", # github.com/owner/repo r"github\.com\/([^\/]+)\/([^\/]+)\/?$", # github.com/owner/repo/ r"github\.com\/([^\/]+)\/([^\/]+)\.git", # github.com/owner/repo.git ] for pattern in patterns: match = re.search(pattern, url) if match: return match.group(1), match.group(2) return None, None def analyze_repository(repo_url: str, is_private: bool, github_token: str = None, progress=gr.Progress()) -> Tuple[str, Dict]: """Analyze GitHub repository and return the analysis results.""" # Validate URL and extract owner/repo owner, repo_name = parse_repo_url(repo_url) if not owner or not repo_name: return "Invalid GitHub repository URL. Please use format: https://github.com/owner/repo", {} # Use provided token or default token token = github_token if is_private and github_token else os.environ.get("GITHUB_TOKEN", "") if is_private and not token: return "GitHub token is required for private repositories.", {} # Configure analyzer config = GitHubAPIConfig(token=token) nonlocal analyzer analyzer = GitHubRepoAnalyzer(config) # Analyze repository with progress updates progress(0, desc="Starting repository analysis...") try: progress(0.1, desc="Fetching repository details...") global repo_data repo_data = analyzer.analyze_repo(owner, repo_name) progress(0.9, desc="Generating insights...") # Create a summary of the analysis repo_details = repo_data.get("repo_details", {}) insights = repo_data.get("insights", {}) repo_name = repo_details.get("full_name", "") description = repo_details.get("description", "No description provided") stars = repo_details.get("stargazers_count", 0) forks = repo_details.get("forks_count", 0) language = repo_details.get("language", "Unknown") # Calculate age created_at = repo_details.get("created_at", "") age_str = "Unknown" if created_at: try: created_date = datetime.datetime.fromisoformat(created_at.replace('Z', '+00:00')) age_days = (datetime.datetime.now(datetime.timezone.utc) - created_date).days age_years = age_days / 365.25 age_str = f"{age_years:.1f} years ({age_days} days)" except (ValueError, AttributeError): pass # Get activity level activity_level = insights.get("activity_level", {}).get("level", "Unknown") # Documentation quality doc_quality = insights.get("documentation_quality", {}) has_readme = doc_quality.get("has_readme", False) doc_score = doc_quality.get("score", 0) if has_readme else 0 doc_quality_level = "High" if doc_score > 0.7 else "Medium" if doc_score > 0.4 else "Low" # Community health community_health = insights.get("community_health", {}).get("overall", {}) health_level = community_health.get("level", "Unknown") # Code complexity code_complexity = insights.get("code_complexity", {}).get("overall", {}) complexity_level = code_complexity.get("level", "Unknown") # Create summary HTML summary_html = f"""

{repo_name}

Description: {description}

Repository Details

Key Insights

""" # Contributors section contributors = repo_data.get("contributors", []) if contributors: top_contributors = sorted(contributors, key=lambda x: x.get("contributions", 0), reverse=True)[:5] summary_html += f"""

Top Contributors

""" for contributor in top_contributors: avatar_url = contributor.get("avatar_url", "") login = contributor.get("login", "Unknown") contributions = contributor.get("contributions", 0) summary_html += f"""
{login}
{contributions} commits
""" summary_html += """
""" # Language distribution section languages = repo_data.get("languages", {}) if languages: total_bytes = sum(languages.values()) language_percentages = [ (lang, bytes_count, (bytes_count / total_bytes) * 100) for lang, bytes_count in languages.items() ] sorted_languages = sorted(language_percentages, key=lambda x: x[1], reverse=True)[:5] summary_html += f"""

Language Distribution

""" for lang, bytes_count, percentage in sorted_languages: bar_width = max(1, min(100, percentage)) summary_html += f"""
{lang}
{percentage:.1f}%
""" summary_html += """
""" progress(1.0, desc="Analysis complete!") return summary_html, repo_data except Exception as e: error_message = f"Error analyzing repository: {str(e)}" logger.error(error_message) return error_message, {} def generate_pdf_report() -> Tuple[str, Dict[str, str]]: """Generate and download PDF report.""" if not repo_data: return "Please analyze a repository first.", {} try: # Create PDF report pdf_generator = PDFReportGenerator(repo_data) pdf_path = pdf_generator.generate_report() # Return file path for download repo_name = repo_data.get("repo_details", {}).get("full_name", "repository").replace("/", "_") return f"PDF report generated for {repo_name}", {"report.pdf": pdf_path} except Exception as e: error_message = f"Error generating PDF report: {str(e)}" logger.error(error_message) return error_message, {} def chat_with_repo(query: str, history: List[Tuple[str, str]]) -> str: """ Chat with the repository analysis data using RAG approach. Args: query: User's question history: Chat history Returns: str: Response to the user's question """ if not repo_data: return "Please analyze a repository first before asking questions." try: # Use RAG helper to get relevant context rag_helper = RAGHelper(repo_data) context = rag_helper.get_context_for_query(query) # For a real implementation, you would use the Gemini API here # This is a simulated response based on the context # Format response based on the query and context response = "" # Extract repo name for more natural responses repo_name = repo_data.get("repo_details", {}).get("name", "The repository") # General info about the repo if any(term in query.lower() for term in ["what is", "tell me about", "overview", "about"]): response = f"{context}\n\nIs there something specific about {repo_name} you'd like to know more about?" # Language related queries elif any(term in query.lower() for term in ["language", "programming", "written in"]): response = f"{context}\n\nWould you like to know more about any specific language used in {repo_name}?" # Contributor related queries elif any(term in query.lower() for term in ["contributor", "who", "maintain", "author"]): response = f"{context}\n\nI can provide more details about specific contributors if you're interested." # Activity related queries elif any(term in query.lower() for term in ["active", "activity", "commit", "frequency"]): response = f"{context}\n\nWould you like to see visualizations of the commit activity patterns?" # Documentation related queries elif any(term in query.lower() for term in ["document", "readme", "docs"]): response = f"{context}\n\nIs there a specific aspect of the documentation you'd like feedback on?" # Code complexity related queries elif any(term in query.lower() for term in ["complex", "difficulty", "understand"]): response = f"{context}\n\nWould you like suggestions for navigating this codebase effectively?" # Default response for other queries else: response = f"Based on my analysis of {repo_name}:\n\n{context}\n\nIs there anything specific you'd like to know more about?" return response except Exception as e: error_message = f"Error processing your question: {str(e)}" logger.error(error_message) return error_message # Create Gradio interface with gr.Blocks(css=css) as interface: gr.Markdown("# GitHub Repository Analyzer") gr.Markdown("Analyze GitHub repositories and chat about the insights") with gr.Tab("Repository Analysis"): with gr.Row(): with gr.Column(scale=3): repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="https://github.com/owner/repo") with gr.Column(scale=1): is_private = gr.Checkbox(label="Private Repository") github_token = gr.Textbox(label="GitHub Token (for private repos)", type="password", visible=False) # Show/hide token input based on private repo checkbox is_private.change(fn=lambda x: gr.update(visible=x), inputs=[is_private], outputs=[github_token]) analyze_btn = gr.Button("Analyze Repository", variant="primary") with gr.Row(): with gr.Column(scale=2): analysis_result = gr.HTML(label="Analysis Result", elem_classes=["analysis-result"]) with gr.Column(scale=1): with gr.Group(): gr.Markdown("### PDF Report") pdf_btn = gr.Button("Generate PDF Report", variant="secondary") pdf_output = gr.Markdown() pdf_download = gr.File(label="Download Report", elem_classes=["pdf-download"]) # Connect buttons to functions analyze_btn.click( fn=analyze_repository, inputs=[repo_url, is_private, github_token], outputs=[analysis_result, pdf_output] ) pdf_btn.click( fn=generate_pdf_report, inputs=[], outputs=[pdf_output, pdf_download] ) with gr.Tab("Chat with Repository"): gr.Markdown("Ask questions about the repository and get insights") chatbot = gr.Chatbot(elem_classes=["chat-interface"]) msg = gr.Textbox( placeholder="Ask me anything about the repository...", show_label=False ) clear = gr.Button("Clear") # Connect chat interface msg.submit( fn=chat_with_repo, inputs=[msg, chatbot], outputs=[chatbot], postprocess=lambda x: [(msg.value, x)] ).then(lambda: "", None, msg) clear.click(lambda: None, None, chatbot, queue=False) return interface # Main code to run the application if __name__ == "__main__": # Create and launch Gradio interface interface = create_gradio_interface() interface.launch(debug=True, share=True)