import os
import json
import time
import re
import logging
import datetime
import concurrent.futures
import sys
import base64
import tempfile
from pathlib import Path
from typing import Dict, List, Union, Any, Optional, Tuple, Set
from collections import Counter, defaultdict
from dataclasses import dataclass, field, asdict
from io import BytesIO, StringIO
import urllib.request
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from tqdm.notebook import tqdm
from dateutil.relativedelta import relativedelta
from github import Github, GithubException, RateLimitExceededException
import gradio as gr
# For PDF Generation
from reportlab.lib.pagesizes import letter, A4
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, Table, TableStyle, PageBreak
from reportlab.lib.units import inch
from reportlab.pdfgen import canvas
from reportlab.lib.enums import TA_CENTER, TA_LEFT
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
logger = logging.getLogger("github_analyzer")
@dataclass
class GitHubAPIConfig:
"""Configuration for the GitHub API client with sensible defaults."""
# API access configuration
    token: Optional[str] = None
max_retries: int = 5
backoff_factor: int = 2
per_page: int = 100 # Max allowed by GitHub
timeout: int = 30
# Retry status codes
retry_status_codes: Set[int] = field(default_factory=lambda: {
403, 429, 500, 502, 503, 504
})
# Permission types
collaborator_permission_types: List[str] = field(default_factory=lambda: [
"admin", "push", "pull", "maintain", "triage"
])
# File classification
code_extensions: List[str] = field(default_factory=lambda: [
".py", ".js", ".java", ".c", ".cpp", ".cs", ".go", ".php", ".rb",
".swift", ".kt", ".ts", ".rs", ".scala", ".lua", ".m", ".mm",
".h", ".hpp", ".cc", ".hh", ".f", ".f90", ".f95", ".f03", ".f08",
".for", ".f77", ".jl", ".pl", ".pm", ".t", ".r", ".dart", ".groovy",
".v", ".vhd", ".vhdl", ".erl", ".hrl", ".hs", ".lhs", ".ex", ".exs", ".hx"
])
markup_extensions: List[str] = field(default_factory=lambda: [
".md", ".html", ".htm", ".xml", ".json", ".yaml", ".yml", ".txt",
".rst", ".tex", ".adoc", ".csv", ".tsv", ".toml", ".ini", ".cfg"
])
script_extensions: List[str] = field(default_factory=lambda: [
".sh", ".bash", ".zsh", ".ps1", ".bat", ".cmd"
])
notebook_extensions: List[str] = field(default_factory=lambda: [
".ipynb"
])
data_extensions: List[str] = field(default_factory=lambda: [
".csv", ".tsv", ".json", ".xml", ".xls", ".xlsx", ".hdf5",
".parquet", ".feather", ".pkl", ".sav", ".dta", ".arff"
])
config_extensions: List[str] = field(default_factory=lambda: [
".yml", ".yaml", ".json", ".toml", ".ini", ".cfg", ".conf"
])
other_extensions: List[str] = field(default_factory=lambda: [
".txt", ".log", ".svg", ".png", ".jpg", ".jpeg"
])
# Data collection limits (set to None for no limit)
max_contributors: Optional[int] = 50
max_issues: Optional[int] = 100
max_commits: Optional[int] = 200
max_search_results: Optional[int] = 50
max_pull_requests: Optional[int] = 100
max_collaborators: Optional[int] = 30
# Output configuration
output_dir: str = "/tmp/github_data"
generate_visualizations: bool = True
def __post_init__(self):
"""Ensure output directory exists"""
os.makedirs(self.output_dir, exist_ok=True)
def all_code_extensions(self) -> List[str]:
"""Return all code-related file extensions"""
return list(set(
self.code_extensions +
self.script_extensions +
self.config_extensions
))
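# A minimal usage sketch (assumptions: a GITHUB_TOKEN environment variable and the
# placeholder repository path "octocat/Hello-World"); the real orchestration lives in
# the classes defined below.
#
#   config = GitHubAPIConfig(token=os.environ.get("GITHUB_TOKEN"), max_commits=100)
#   client = GithubClient(config)
#   repo = client.get_repo("octocat/Hello-World")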
class GithubClient:
"""
A robust GitHub client that handles rate limiting, retries, and provides
consistent error handling.
"""
def __init__(self, config: GitHubAPIConfig):
"""Initialize the GitHub client with configuration."""
self.config = config
self.github = Github(
config.token,
per_page=config.per_page,
timeout=config.timeout,
retry=config.max_retries
)
self.cache = {} # Simple in-memory cache
def get_repo(self, repo_path: str):
"""Get a repository by owner/name with caching."""
cache_key = f"repo:{repo_path}"
if cache_key in self.cache:
return self.cache[cache_key]
repo = self.github.get_repo(repo_path)
self.cache[cache_key] = repo
return repo
def _handle_exception(self, e: GithubException, retry_count: int) -> bool:
"""
Handle GitHub exceptions with proper retries and backoff strategy.
Args:
e: The exception to handle
retry_count: Current retry count
Returns:
bool: True if retry should be attempted, False otherwise
"""
if retry_count >= self.config.max_retries:
logger.error(f"Max retries ({self.config.max_retries}) exceeded.")
return False
if isinstance(e, RateLimitExceededException):
# Handle primary rate limit
rate_limit = self.github.get_rate_limit()
reset_time = rate_limit.core.reset.timestamp() if hasattr(rate_limit, 'core') else time.time() + 3600
sleep_time = max(0, int(reset_time - time.time())) + 1
logger.warning(f"Rate limit exceeded. Waiting for {sleep_time} seconds...")
time.sleep(sleep_time)
return True
elif e.status in self.config.retry_status_codes:
# Handle secondary rate limits and server errors
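            # Exponential backoff: with the default backoff_factor of 2 this waits 1, 2, 4, 8, ... seconds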
sleep_time = self.config.backoff_factor ** retry_count
logger.warning(
f"Temporary error (status {e.status}). Retrying in {sleep_time} seconds. "
f"Attempt {retry_count+1}/{self.config.max_retries}."
)
time.sleep(sleep_time)
return True
# Non-recoverable error
logger.error(f"Non-recoverable GitHub API error: {e}")
return False
def _paginated_request(self, method, *args, **kwargs):
"""
Execute a paginated GitHub API request with retry logic.
Args:
method: The PyGithub method to call
Returns:
List of results or None on non-recoverable error
"""
        results = []
        retry_count = 0
        max_results = kwargs.pop('max_results', None)
        while retry_count <= self.config.max_retries:
            # Start each attempt with a clean list so a retried request cannot duplicate items
            results = []
            try:
                paginated_list = method(*args, **kwargs)
                # Process items
                for item in paginated_list:
                    results.append(item)
                    if max_results and len(results) >= max_results:
                        return results
                # Full pagination completed successfully
                return results
            except GithubException as e:
                if self._handle_exception(e, retry_count):
                    retry_count += 1
                else:
                    return None
        return results
def _execute_request(self, method, *args, **kwargs):
"""
Execute a single GitHub API request with retry logic.
Args:
method: The PyGithub method to call
Returns:
Result of the API call or None on non-recoverable error
"""
retry_count = 0
while retry_count <= self.config.max_retries:
try:
result = method(*args, **kwargs)
return result
except GithubException as e:
# Special case for 404 errors - file not found
if e.status == 404:
logger.info(f"Resource not found: {e}")
return None
if self._handle_exception(e, retry_count):
retry_count += 1
else:
return None
return None
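# Sketch of the two request helpers (assumes `repo` is any PyGithub Repository object
# obtained via client.get_repo above):
#
#   readme = client._execute_request(repo.get_readme)                  # single call with retries
#   issues = client._paginated_request(repo.get_issues, state="open",
#                                      max_results=10)                 # paginated call, capped at 10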
class GitHubRepoAnalyzer:
"""
Main class for analyzing GitHub repositories and generating insights.
"""
def __init__(self, config: GitHubAPIConfig):
"""Initialize the analyzer with configuration."""
self.config = config
self.client = GithubClient(config)
def get_repo_details(self, repo) -> Dict[str, Any]:
"""Get comprehensive repository metadata."""
logger.info(f"Fetching repository details for {repo.full_name}")
return {
"name": repo.name,
"full_name": repo.full_name,
"description": repo.description,
"html_url": repo.html_url,
"stargazers_count": repo.stargazers_count,
"watchers_count": repo.watchers_count,
"forks_count": repo.forks_count,
"open_issues_count": repo.open_issues_count,
"language": repo.language,
"default_branch": repo.default_branch,
"created_at": repo.created_at.isoformat() if repo.created_at else None,
"updated_at": repo.updated_at.isoformat() if repo.updated_at else None,
"pushed_at": repo.pushed_at.isoformat() if repo.pushed_at else None,
"license": repo.license.name if repo.license else None,
"topics": list(repo.get_topics()),
"archived": repo.archived,
"disabled": repo.disabled,
"visibility": repo.visibility,
"has_wiki": repo.has_wiki,
"has_pages": repo.has_pages,
"has_projects": repo.has_projects,
"has_issues": repo.has_issues,
"has_discussions": repo.has_discussions if hasattr(repo, 'has_discussions') else None,
"size": repo.size, # Size in KB
"network_count": repo.network_count,
"subscribers_count": repo.subscribers_count,
"organization": repo.organization.login if repo.organization else None,
"parent": repo.parent.full_name if hasattr(repo, 'parent') and repo.parent else None,
"fork": repo.fork,
}
def get_contributors(self, repo) -> List[Dict[str, Any]]:
"""Get repository contributors with detailed information."""
logger.info(f"Fetching contributors for {repo.full_name}")
contributors = self.client._paginated_request(
repo.get_contributors,
max_results=self.config.max_contributors
)
if contributors is None:
return []
return [
{
"login": c.login,
"id": c.id,
"contributions": c.contributions,
"type": c.type,
"html_url": c.html_url,
"followers": c.followers,
"following": c.following,
"public_repos": c.public_repos if hasattr(c, 'public_repos') else None,
"bio": c.bio if hasattr(c, 'bio') else None,
"location": c.location if hasattr(c, 'location') else None,
"company": c.company if hasattr(c, 'company') else None,
"email": c.email if hasattr(c, 'email') else None,
"avatar_url": c.avatar_url if hasattr(c, 'avatar_url') else None,
}
for c in contributors
]
def get_languages(self, repo) -> Dict[str, int]:
"""Get languages used in the repository."""
logger.info(f"Fetching languages for {repo.full_name}")
languages = self.client._execute_request(repo.get_languages)
return languages or {}
def get_issues(self, repo, state: str = "all") -> List[Dict[str, Any]]:
"""Get repository issues."""
logger.info(f"Fetching issues for {repo.full_name} with state={state}")
issues = self.client._paginated_request(
repo.get_issues,
state=state,
max_results=self.config.max_issues
)
if issues is None:
return []
return [
{
"id": issue.id,
"number": issue.number,
"title": issue.title,
"body": issue.body,
"state": issue.state,
"user_login": issue.user.login if issue.user else None,
"labels": [label.name for label in issue.labels],
"comments": issue.comments,
"created_at": issue.created_at.isoformat() if issue.created_at else None,
"updated_at": issue.updated_at.isoformat() if issue.updated_at else None,
"closed_at": issue.closed_at.isoformat() if issue.closed_at else None,
"pull_request": issue.pull_request is not None,
"milestone": issue.milestone.title if issue.milestone else None,
"assignees": [user.login for user in issue.assignees] if issue.assignees else [],
}
for issue in issues
]
def get_commits(self, repo) -> List[Dict[str, Any]]:
"""Get repository commits."""
logger.info(f"Fetching commits for {repo.full_name}")
commits = self.client._paginated_request(
repo.get_commits,
max_results=self.config.max_commits
)
if commits is None:
return []
return [
{
"sha": commit.sha,
"commit_message": commit.commit.message,
"author_login": commit.author.login if commit.author else None,
"author_name": commit.commit.author.name if commit.commit and commit.commit.author else None,
"author_email": commit.commit.author.email if commit.commit and commit.commit.author else None,
"committer_login": commit.committer.login if commit.committer else None,
"committer_name": commit.commit.committer.name if commit.commit and commit.commit.committer else None,
"date": commit.commit.author.date.isoformat() if commit.commit and commit.commit.author else None,
"html_url": commit.html_url,
"stats": {
"additions": commit.stats.additions if hasattr(commit, 'stats') else None,
"deletions": commit.stats.deletions if hasattr(commit, 'stats') else None,
"total": commit.stats.total if hasattr(commit, 'stats') else None,
},
"files_changed": [
{"filename": f.filename, "additions": f.additions, "deletions": f.deletions, "status": f.status}
for f in commit.files
] if hasattr(commit, 'files') else [],
}
for commit in commits
]
def get_readme(self, repo) -> str:
"""Get repository README content."""
logger.info(f"Fetching README for {repo.full_name}")
readme = self.client._execute_request(repo.get_readme)
if readme is None:
return ""
try:
return readme.decoded_content.decode('utf-8')
except UnicodeDecodeError:
logger.warning(f"Could not decode README content for {repo.full_name}")
return ""
def get_pull_requests(self, repo, state: str = "all") -> List[Dict[str, Any]]:
"""Get repository pull requests."""
logger.info(f"Fetching pull requests for {repo.full_name} with state={state}")
pulls = self.client._paginated_request(
repo.get_pulls,
state=state,
max_results=self.config.max_pull_requests
)
if pulls is None:
return []
return [
{
"id": pull.id,
"number": pull.number,
"title": pull.title,
"body": pull.body,
"state": pull.state,
"user_login": pull.user.login if pull.user else None,
"created_at": pull.created_at.isoformat() if pull.created_at else None,
"updated_at": pull.updated_at.isoformat() if pull.updated_at else None,
"closed_at": pull.closed_at.isoformat() if pull.closed_at else None,
"merged_at": pull.merged_at.isoformat() if pull.merged_at else None,
"draft": pull.draft if hasattr(pull, 'draft') else None,
"mergeable": pull.mergeable if hasattr(pull, 'mergeable') else None,
"mergeable_state": pull.mergeable_state if hasattr(pull, 'mergeable_state') else None,
"merged": pull.merged if hasattr(pull, 'merged') else None,
"merge_commit_sha": pull.merge_commit_sha if hasattr(pull, 'merge_commit_sha') else None,
"comments": pull.comments if hasattr(pull, 'comments') else 0,
"review_comments": pull.review_comments if hasattr(pull, 'review_comments') else 0,
"commits": pull.commits if hasattr(pull, 'commits') else 0,
"additions": pull.additions if hasattr(pull, 'additions') else 0,
"deletions": pull.deletions if hasattr(pull, 'deletions') else 0,
"changed_files": pull.changed_files if hasattr(pull, 'changed_files') else 0,
"head_ref": pull.head.ref if hasattr(pull, 'head') and pull.head else None,
"base_ref": pull.base.ref if hasattr(pull, 'base') and pull.base else None,
"labels": [label.name for label in pull.labels] if hasattr(pull, 'labels') else [],
"assignees": [user.login for user in pull.assignees] if hasattr(pull, 'assignees') else [],
"requested_reviewers": [user.login for user in pull.requested_reviewers] if hasattr(pull, 'requested_reviewers') else [],
}
for pull in pulls
]
def get_collaborators(self, repo, affiliation: str = "all") -> List[Dict[str, Any]]:
"""Get repository collaborators."""
logger.info(f"Fetching collaborators for {repo.full_name} with affiliation={affiliation}")
collaborators = self.client._paginated_request(
repo.get_collaborators,
affiliation=affiliation,
max_results=self.config.max_collaborators
)
if collaborators is None:
return []
return [
{
"login": c.login,
"id": c.id,
"type": c.type,
"url": c.url,
"site_admin": c.site_admin if hasattr(c, 'site_admin') else None,
"role_name": self._get_permission_level(repo, c.login),
"avatar_url": c.avatar_url if hasattr(c, 'avatar_url') else None,
}
for c in collaborators
]
def _get_permission_level(self, repo, username: str) -> str:
"""Get permission level for a collaborator."""
try:
return repo.get_collaborator_permission(username)
except GithubException:
return "unknown"
def get_file_distribution(self, repo) -> Dict[str, int]:
"""Analyze file types distribution in the repository."""
logger.info(f"Analyzing file distribution for {repo.full_name}")
# Get all files in the repo (only feasible for smaller repos)
try:
contents = self.client._execute_request(repo.get_contents, "")
if not contents:
return {}
file_types = defaultdict(int)
directories = []
# Process initial contents
for item in contents:
if item.type == "dir":
directories.append(item.path)
elif item.type == "file":
ext = os.path.splitext(item.name)[1].lower()
file_types[ext if ext else "no_extension"] += 1
# Process directories (up to a reasonable depth to avoid API rate limits)
max_depth = 3
for depth in range(max_depth):
if not directories:
break
next_level = []
for directory in directories[:100]: # Limit to avoid excessive API calls
dir_contents = self.client._execute_request(repo.get_contents, directory)
if not dir_contents:
continue
for item in dir_contents:
if item.type == "dir":
next_level.append(item.path)
elif item.type == "file":
ext = os.path.splitext(item.name)[1].lower()
file_types[ext if ext else "no_extension"] += 1
directories = next_level
return dict(file_types)
except GithubException:
logger.warning(f"Could not get file distribution for {repo.full_name}")
return {}
def search_code(self, repo, query_terms: List[str]) -> List[Dict[str, Any]]:
"""Search for specific terms in the repository code."""
logger.info(f"Searching code in {repo.full_name} for terms: {query_terms}")
results = []
for term in query_terms:
query = f"repo:{repo.full_name} {term}"
search_results = self.client._paginated_request(
self.client.github.search_code,
query,
max_results=self.config.max_search_results
)
if search_results:
results.extend([
{
"term": term,
"name": result.name,
"path": result.path,
"sha": result.sha,
"url": result.html_url,
"repository": result.repository.full_name,
}
for result in search_results
if result.repository.full_name == repo.full_name
])
return results
def get_branches(self, repo) -> List[Dict[str, Any]]:
"""Get repository branches."""
logger.info(f"Fetching branches for {repo.full_name}")
branches = self.client._paginated_request(repo.get_branches)
if branches is None:
return []
return [
{
"name": branch.name,
"protected": branch.protected,
"commit_sha": branch.commit.sha if branch.commit else None,
}
for branch in branches
]
def get_releases(self, repo) -> List[Dict[str, Any]]:
"""Get repository releases."""
logger.info(f"Fetching releases for {repo.full_name}")
releases = self.client._paginated_request(repo.get_releases)
if releases is None:
return []
return [
{
"id": release.id,
"tag_name": release.tag_name,
"name": release.title,
"body": release.body,
"draft": release.draft,
"prerelease": release.prerelease,
"created_at": release.created_at.isoformat() if release.created_at else None,
"published_at": release.published_at.isoformat() if release.published_at else None,
"author_login": release.author.login if release.author else None,
"html_url": release.html_url,
"assets": [
{
"name": asset.name,
"label": asset.label,
"content_type": asset.content_type,
"size": asset.size,
"download_count": asset.download_count,
"browser_download_url": asset.browser_download_url,
}
for asset in release.get_assets()
],
}
for release in releases
]
def get_workflows(self, repo) -> List[Dict[str, Any]]:
"""Get repository GitHub Actions workflows."""
logger.info(f"Fetching workflows for {repo.full_name}")
try:
workflows = self.client._paginated_request(repo.get_workflows)
if workflows is None:
return []
return [
{
"id": workflow.id,
"name": workflow.name,
"path": workflow.path,
"state": workflow.state,
"created_at": workflow.created_at.isoformat() if workflow.created_at else None,
"updated_at": workflow.updated_at.isoformat() if workflow.updated_at else None,
}
for workflow in workflows
]
except (GithubException, AttributeError):
# Older PyGithub versions or repositories without workflows
return []
def analyze_commit_activity(self, repo) -> Dict[str, Any]:
"""Analyze commit activity patterns."""
logger.info(f"Analyzing commit activity for {repo.full_name}")
# Get stats commit activity
stats = self.client._execute_request(repo.get_stats_commit_activity)
if not stats:
return {}
weekly_commits = []
        for week in stats:
            if hasattr(week, 'week') and hasattr(week, 'total'):
                # PyGithub exposes `week` as a datetime rather than a raw epoch timestamp
                date = week.week.strftime('%Y-%m-%d')
                weekly_commits.append({
                    "week": date,
                    "total": week.total,
                    "days": week.days if hasattr(week, 'days') else [],
                })
# Get code frequency
code_freq = self.client._execute_request(repo.get_stats_code_frequency)
if not code_freq:
code_frequency = []
else:
code_frequency = []
            for item in code_freq:
                # Each StatsCodeFrequency entry exposes week (datetime), additions, and deletions
                date = item.week.strftime('%Y-%m-%d')
                code_frequency.append({
                    "week": date,
                    "additions": item.additions,
                    "deletions": -item.deletions,  # The API reports deletions as negative; flip for readability
                })
return {
"weekly_commits": weekly_commits,
"code_frequency": code_frequency,
}
def analyze_contributor_activity(self, repo) -> Dict[str, Any]:
"""Analyze contributor activity patterns."""
logger.info(f"Analyzing contributor activity for {repo.full_name}")
# Get contributor stats
stats = self.client._execute_request(repo.get_stats_contributors)
if not stats:
return {}
contributor_stats = []
for stat in stats:
if not hasattr(stat, 'author') or not stat.author:
continue
weeks_data = []
            for week in stat.weeks:
                if hasattr(week, 'w'):
                    # PyGithub exposes `w` as a datetime for each contributor-stats week
                    date = week.w.strftime('%Y-%m-%d')
                    weeks_data.append({
                        "week": date,
                        "additions": week.a,
                        "deletions": week.d,
                        "commits": week.c,
                    })
contributor_stats.append({
"author": stat.author.login,
"total_commits": stat.total,
"weeks": weeks_data,
})
return {
"contributor_stats": contributor_stats,
}
def analyze_issue_distribution(self, issues: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze distribution of issues by various metrics."""
if not issues:
return {}
# Convert to DataFrame for easier analysis
df = pd.DataFrame(issues)
# Issues by state
state_counts = df['state'].value_counts().to_dict() if 'state' in df else {}
# Issues by user
user_counts = df['user_login'].value_counts().head(10).to_dict() if 'user_login' in df else {}
# Pull requests vs regular issues
is_pr_counts = df['pull_request'].value_counts().to_dict() if 'pull_request' in df else {}
# Issues by labels (flattening the labels list)
labels = []
if 'labels' in df:
for label_list in df['labels']:
if label_list:
labels.extend(label_list)
label_counts = Counter(labels)
top_labels = dict(label_counts.most_common(10))
# Time analysis
if 'created_at' in df:
df['created_date'] = pd.to_datetime(df['created_at'])
df['month_year'] = df['created_date'].dt.strftime('%Y-%m')
issues_by_month = df.groupby('month_year').size().to_dict()
else:
issues_by_month = {}
# Calculate resolution time for closed issues
resolution_times = []
if 'created_at' in df and 'closed_at' in df:
for _, issue in df.iterrows():
if pd.notna(issue.get('closed_at')) and pd.notna(issue.get('created_at')):
created = pd.to_datetime(issue['created_at'])
closed = pd.to_datetime(issue['closed_at'])
resolution_time = (closed - created).total_seconds() / 3600 # hours
resolution_times.append(resolution_time)
resolution_stats = {}
if resolution_times:
resolution_stats = {
"mean_hours": sum(resolution_times) / len(resolution_times),
"median_hours": sorted(resolution_times)[len(resolution_times) // 2],
"min_hours": min(resolution_times),
"max_hours": max(resolution_times),
}
return {
"by_state": state_counts,
"by_user": user_counts,
"pr_vs_issue": is_pr_counts,
"by_label": top_labels,
"by_month": issues_by_month,
"resolution_time": resolution_stats,
}
def generate_insights(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
"""Generate higher-level insights from the collected repository data."""
insights = {}
# Repository activity and health
if "repo_details" in repo_data:
repo_details = repo_data["repo_details"]
insights["repository_age_days"] = self._calculate_age_days(repo_details.get("created_at"))
insights["freshness_days"] = self._calculate_freshness_days(repo_details.get("pushed_at"))
# Popularity metrics
insights["popularity"] = {
"stars": repo_details.get("stargazers_count", 0),
"forks": repo_details.get("forks_count", 0),
"watchers": repo_details.get("watchers_count", 0),
"star_fork_ratio": self._calculate_ratio(
repo_details.get("stargazers_count", 0),
repo_details.get("forks_count", 0)
),
}
# Language distribution
if "languages" in repo_data:
languages = repo_data["languages"]
total_bytes = sum(languages.values()) if languages else 0
if total_bytes > 0:
language_percentages = {
lang: (bytes_count / total_bytes) * 100
for lang, bytes_count in languages.items()
}
insights["language_distribution"] = {
"primary_language": max(languages.items(), key=lambda x: x[1])[0] if languages else None,
"language_count": len(languages),
"percentages": language_percentages,
}
# Contributor insights
if "contributors" in repo_data:
contributors = repo_data["contributors"]
if contributors:
total_contributions = sum(c.get("contributions", 0) for c in contributors)
insights["contributor_insights"] = {
"contributor_count": len(contributors),
"total_contributions": total_contributions,
"avg_contributions_per_contributor": total_contributions / len(contributors) if len(contributors) > 0 else 0,
"contribution_distribution": self._analyze_contribution_distribution(contributors),
}
# Issue and PR dynamics
if "issues" in repo_data:
issues = repo_data["issues"]
insights["issue_insights"] = self.analyze_issue_distribution(issues)
if "pull_requests" in repo_data:
prs = repo_data["pull_requests"]
insights["pr_insights"] = self.analyze_issue_distribution(prs) # Reuse the same analysis
# Additional PR-specific metrics
if prs:
insights["pr_code_change_stats"] = self._analyze_pr_code_changes(prs)
# Commit patterns
if "commits" in repo_data:
commits = repo_data["commits"]
insights["commit_insights"] = self._analyze_commit_patterns(commits)
# Check for CI/CD presence
insights["ci_cd_presence"] = self._detect_ci_cd(repo_data)
# Documentation quality
if "readme" in repo_data:
readme = repo_data["readme"]
insights["documentation_quality"] = self._assess_documentation_quality(readme)
# Project Activity Level
insights["activity_level"] = self._calculate_activity_level(repo_data)
# Code complexity analysis
insights["code_complexity"] = self._analyze_code_complexity(repo_data)
# Community health analysis
insights["community_health"] = self._analyze_community_health(repo_data)
return insights
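    # Sketch of how the collected pieces feed generate_insights (keys match the get_*
    # methods above; "octocat/Hello-World" is just a placeholder path):
    #
    #   analyzer = GitHubRepoAnalyzer(config)
    #   repo = analyzer.client.get_repo("octocat/Hello-World")
    #   repo_data = {
    #       "repo_details": analyzer.get_repo_details(repo),
    #       "languages": analyzer.get_languages(repo),
    #       "contributors": analyzer.get_contributors(repo),
    #   }
    #   insights = analyzer.generate_insights(repo_data)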
def _calculate_age_days(self, created_at_iso: str) -> float:
"""Calculate repository age in days."""
if not created_at_iso:
return 0
try:
created_at = datetime.datetime.fromisoformat(created_at_iso.replace('Z', '+00:00'))
now = datetime.datetime.now(datetime.timezone.utc)
return (now - created_at).total_seconds() / (24 * 3600)
except ValueError:
return 0
def _calculate_freshness_days(self, pushed_at_iso: str) -> float:
"""Calculate days since last push."""
if not pushed_at_iso:
return float('inf')
try:
pushed_at = datetime.datetime.fromisoformat(pushed_at_iso.replace('Z', '+00:00'))
now = datetime.datetime.now(datetime.timezone.utc)
return (now - pushed_at).total_seconds() / (24 * 3600)
except ValueError:
return float('inf')
def _calculate_ratio(self, numerator: int, denominator: int) -> float:
"""Calculate ratio with handling for zero denominator."""
return numerator / denominator if denominator and denominator > 0 else float('inf')
def _analyze_contribution_distribution(self, contributors: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze the distribution of contributions among contributors."""
if not contributors:
return {}
# Sort contributors by number of contributions
sorted_contributors = sorted(contributors, key=lambda c: c.get("contributions", 0), reverse=True)
# Calculate percentiles
total_contributions = sum(c.get("contributions", 0) for c in contributors)
cumulative_contributions = 0
percentile_20 = 0
percentile_50 = 0
percentile_80 = 0
for i, contributor in enumerate(sorted_contributors):
contributions = contributor.get("contributions", 0)
cumulative_contributions += contributions
percentage = (cumulative_contributions / total_contributions) * 100
if percentage >= 20 and percentile_20 == 0:
percentile_20 = i + 1
if percentage >= 50 and percentile_50 == 0:
percentile_50 = i + 1
if percentage >= 80 and percentile_80 == 0:
percentile_80 = i + 1
# Calculate Gini coefficient to measure inequality
gini = self._calculate_gini([c.get("contributions", 0) for c in contributors])
return {
"contributors_for_20_percent": percentile_20,
"contributors_for_50_percent": percentile_50,
"contributors_for_80_percent": percentile_80,
"gini_coefficient": gini,
"top_contributor_percentage": (sorted_contributors[0].get("contributions", 0) / total_contributions) * 100 if sorted_contributors else 0,
}
    def _calculate_gini(self, values: List[int]) -> float:
        """Calculate the Gini coefficient of a distribution (0 = perfect equality, ~1 = maximal inequality)."""
        if not values or sum(values) == 0:
            return 0
        values = sorted(values)
        n = len(values)
        cumulative = []
        running = 0
        for value in values:
            running += value
            cumulative.append(running)
        # With cumulative sums B_i of the sorted values: G = (n + 1)/n - 2 * sum(B_i) / (n * B_n)
        return (n + 1) / n - (2 * sum(cumulative)) / (n * cumulative[-1])
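    # Worked example for _calculate_gini: [1, 1, 1, 1] -> 0.0 (perfect equality),
    # while [0, 0, 0, 10] -> 0.75 (one contributor does all the work).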
def _analyze_pr_code_changes(self, prs: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze code changes across pull requests."""
if not prs:
return {}
# Extract metrics
additions = [pr.get("additions", 0) for pr in prs if pr.get("additions") is not None]
deletions = [pr.get("deletions", 0) for pr in prs if pr.get("deletions") is not None]
changed_files = [pr.get("changed_files", 0) for pr in prs if pr.get("changed_files") is not None]
# Calculate stats
stats = {}
if additions:
stats["additions"] = {
"mean": sum(additions) / len(additions),
"median": sorted(additions)[len(additions) // 2],
"max": max(additions),
"total": sum(additions),
}
if deletions:
stats["deletions"] = {
"mean": sum(deletions) / len(deletions),
"median": sorted(deletions)[len(deletions) // 2],
"max": max(deletions),
"total": sum(deletions),
}
if changed_files:
stats["changed_files"] = {
"mean": sum(changed_files) / len(changed_files),
"median": sorted(changed_files)[len(changed_files) // 2],
"max": max(changed_files),
"total": sum(changed_files),
}
return stats
def _analyze_commit_patterns(self, commits: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze patterns in commit data."""
if not commits:
return {}
# Count by author
commit_counts = Counter(
commit.get("author_login", "Unknown")
for commit in commits
if commit.get("author_login")
)
# Analyze message patterns
message_lengths = [
len(commit.get("commit_message", ""))
for commit in commits
if commit.get("commit_message")
]
# Extract dates for time-based analysis
dates = []
for commit in commits:
date_str = commit.get("date")
if date_str:
try:
date = datetime.datetime.fromisoformat(date_str.replace('Z', '+00:00'))
dates.append(date)
except ValueError:
pass
# Analyze times of day
hours = [date.hour for date in dates]
hour_counts = Counter(hours)
# Analyze days of week
weekdays = [date.weekday() for date in dates]
weekday_counts = Counter(weekdays)
weekday_names = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
weekday_data = {weekday_names[day]: count for day, count in weekday_counts.items()}
# Analyze frequency of commits over time
commit_frequency = {}
if dates:
dates_sorted = sorted(dates)
first_date = dates_sorted[0]
last_date = dates_sorted[-1]
# Calculate commit frequency by month
current_date = first_date.replace(day=1)
while current_date <= last_date:
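                # Advance to the next month: day 28 plus 4 days always lands in the
                # following month, then snap back to its first day.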
next_month = current_date.replace(day=28) + datetime.timedelta(days=4)
next_month = next_month.replace(day=1)
month_key = current_date.strftime('%Y-%m')
commit_frequency[month_key] = sum(
1 for date in dates
if date.year == current_date.year and date.month == current_date.month
)
current_date = next_month
return {
"top_contributors": dict(commit_counts.most_common(5)),
"message_length": {
"mean": sum(message_lengths) / len(message_lengths) if message_lengths else 0,
"max": max(message_lengths) if message_lengths else 0,
"min": min(message_lengths) if message_lengths else 0,
},
"commit_time_patterns": {
"by_hour": dict(sorted(hour_counts.items())),
"by_weekday": weekday_data,
},
"commit_frequency": commit_frequency,
}
def _detect_ci_cd(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
"""Detect CI/CD presence and configuration in the repository."""
ci_cd_indicators = {
"github_actions": False,
"travis": False,
"circle_ci": False,
"jenkins": False,
"gitlab_ci": False,
"azure_pipelines": False,
}
# Check workflows
if "workflows" in repo_data and repo_data["workflows"]:
ci_cd_indicators["github_actions"] = True
# Check for CI configuration files
if "file_distribution" in repo_data:
files = repo_data.get("file_distribution", {})
if ".travis.yml" in files:
ci_cd_indicators["travis"] = True
if ".circleci/config.yml" in files or "circle.yml" in files:
ci_cd_indicators["circle_ci"] = True
if "Jenkinsfile" in files:
ci_cd_indicators["jenkins"] = True
if ".gitlab-ci.yml" in files:
ci_cd_indicators["gitlab_ci"] = True
if "azure-pipelines.yml" in files:
ci_cd_indicators["azure_pipelines"] = True
return {
"has_ci_cd": any(ci_cd_indicators.values()),
"ci_cd_systems": ci_cd_indicators,
}
def _assess_documentation_quality(self, readme: str) -> Dict[str, Any]:
"""Assess the quality of documentation based on the README."""
if not readme:
return {
"has_readme": False,
"readme_length": 0,
"score": 0,
"sections": {},
}
# Analyze the README content
lines = readme.strip().split('\n')
word_count = len(readme.split())
sections = {}
# Check for common README sections
section_keywords = {
"introduction": ["introduction", "overview", "about"],
"installation": ["installation", "install", "setup", "getting started"],
"usage": ["usage", "using", "example", "examples"],
"api": ["api", "reference", "documentation"],
"contributing": ["contributing", "contribute", "development"],
"license": ["license", "licensing"],
"code_of_conduct": ["code of conduct"],
}
for section, keywords in section_keywords.items():
sections[section] = any(
any(keyword.lower() in line.lower() for keyword in keywords)
for line in lines
)
# Count images/diagrams (markdown format)
image_count = readme.count("![")
# Count code examples
code_block_count = readme.count("```")
# Calculate a simple score
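        # Weighting: section coverage 50%, presence of images 20%, code examples 20%, README length 10%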
section_score = sum(1 for present in sections.values() if present) / len(sections)
has_images = image_count > 0
has_code = code_block_count > 0
length_score = min(1.0, word_count / 1000) # Normalize to 0-1, with 1000+ words being "complete"
score = (section_score * 0.5) + (has_images * 0.2) + (has_code * 0.2) + (length_score * 0.1)
return {
"has_readme": True,
"readme_length": word_count,
"score": score,
"sections": sections,
"has_images": has_images,
"image_count": image_count,
"has_code_examples": has_code,
"code_block_count": code_block_count // 2, # Each block has opening and closing ```
}
def _calculate_activity_level(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate repository activity level based on commits, PRs, and issues."""
activity_score = 0
activity_details = {}
# Get repository age in months
if "repo_details" in repo_data:
age_days = self._calculate_age_days(repo_data["repo_details"].get("created_at"))
age_months = age_days / 30.5 # Approximate
if age_months < 1:
age_months = 1 # Avoid division by zero
activity_details["age_months"] = age_months
else:
age_months = 1
# Check recent commits (last 3 months)
recent_commits = 0
if "commits" in repo_data:
commits = repo_data["commits"]
three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3)
for commit in commits:
if commit.get("date"):
commit_date = datetime.datetime.fromisoformat(commit["date"].replace('Z', '+00:00'))
if commit_date >= three_months_ago:
recent_commits += 1
activity_details["recent_commits"] = recent_commits
activity_score += min(10, recent_commits / 10) # Up to 10 points for recent commits
# Check recent PRs and issues (last 3 months)
recent_prs = 0
if "pull_requests" in repo_data:
prs = repo_data["pull_requests"]
three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3)
for pr in prs:
if pr.get("created_at"):
pr_date = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00'))
if pr_date >= three_months_ago:
recent_prs += 1
activity_details["recent_prs"] = recent_prs
activity_score += min(5, recent_prs / 5) # Up to 5 points for recent PRs
recent_issues = 0
if "issues" in repo_data:
issues = [issue for issue in repo_data["issues"] if not issue.get("pull_request")]
three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3)
for issue in issues:
if issue.get("created_at"):
issue_date = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00'))
if issue_date >= three_months_ago:
recent_issues += 1
activity_details["recent_issues"] = recent_issues
activity_score += min(5, recent_issues / 5) # Up to 5 points for recent issues
# Check release frequency
if "releases" in repo_data:
releases = repo_data["releases"]
release_count = len(releases)
# Calculate releases per month
releases_per_month = release_count / max(1, age_months)
activity_details["releases_per_month"] = releases_per_month
activity_score += min(5, releases_per_month * 2.5) # Up to 5 points for regular releases
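        # Maximum possible activity score is 25: up to 10 for commits, 5 for PRs, 5 for issues, 5 for releases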
# Determine activity level
activity_level = "None"
if activity_score >= 20:
activity_level = "Very High"
elif activity_score >= 15:
activity_level = "High"
elif activity_score >= 10:
activity_level = "Medium"
elif activity_score >= 5:
activity_level = "Low"
elif activity_score > 0:
activity_level = "Very Low"
return {
"score": activity_score,
"level": activity_level,
"details": activity_details,
}
def _analyze_code_complexity(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
"""Estimate code complexity based on available metrics."""
complexity = {}
# Analyze file distribution
if "file_distribution" in repo_data:
file_types = repo_data["file_distribution"]
total_files = sum(file_types.values())
code_files = sum(
count for ext, count in file_types.items()
if ext in self.config.all_code_extensions()
)
complexity["file_counts"] = {
"total_files": total_files,
"code_files": code_files,
}
# Analyze PR complexity
if "pull_requests" in repo_data:
prs = repo_data["pull_requests"]
# Get average changes per PR
additions = [pr.get("additions", 0) for pr in prs if pr.get("additions") is not None]
deletions = [pr.get("deletions", 0) for pr in prs if pr.get("deletions") is not None]
changed_files = [pr.get("changed_files", 0) for pr in prs if pr.get("changed_files") is not None]
if additions and deletions and changed_files:
avg_additions = sum(additions) / len(additions)
avg_deletions = sum(deletions) / len(deletions)
avg_changed_files = sum(changed_files) / len(changed_files)
complexity["pr_complexity"] = {
"avg_additions": avg_additions,
"avg_deletions": avg_deletions,
"avg_changed_files": avg_changed_files,
}
# Estimate complexity score
pr_complexity_score = min(10, (avg_additions + avg_deletions) / 100)
complexity["pr_complexity_score"] = pr_complexity_score
# Check dependency complexity
        dependency_complexity_score = 0
        if repo_data.get("commits"):
commit_messages = [
commit.get("commit_message", "").lower()
for commit in repo_data.get("commits", [])
]
# Check for dependency-related keywords
dependency_keywords = ["dependency", "dependencies", "upgrade", "update", "version", "package"]
dependency_commits = sum(
1 for message in commit_messages
if any(keyword in message for keyword in dependency_keywords)
)
dependency_ratio = dependency_commits / len(commit_messages) if commit_messages else 0
dependency_complexity_score = min(5, dependency_ratio * 20) # Up to 5 points
complexity["dependency_complexity"] = {
"dependency_commits": dependency_commits,
"dependency_ratio": dependency_ratio,
"score": dependency_complexity_score,
}
# Overall complexity score
overall_score = 0
contributors = len(repo_data.get("contributors", []))
if contributors > 0:
contributor_score = min(5, contributors / 10) # Up to 5 points
overall_score += contributor_score
if "pr_complexity_score" in complexity:
overall_score += complexity["pr_complexity_score"]
overall_score += dependency_complexity_score
# Code size complexity
if "languages" in repo_data:
languages = repo_data["languages"]
total_bytes = sum(languages.values()) if languages else 0
# Size points based on code size in MB
size_mb = total_bytes / (1024 * 1024)
size_score = min(10, size_mb / 5) # Up to 10 points for large codebases
overall_score += size_score
complexity["code_size"] = {
"total_bytes": total_bytes,
"size_mb": size_mb,
"score": size_score,
}
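        # Maximum possible complexity score is 30: up to 5 for contributors, 10 for PR size,
        # 5 for dependency churn, and 10 for code size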
# Determine complexity level
complexity_level = "Low"
if overall_score >= 25:
complexity_level = "Very High"
elif overall_score >= 20:
complexity_level = "High"
elif overall_score >= 15:
complexity_level = "Medium-High"
elif overall_score >= 10:
complexity_level = "Medium"
elif overall_score >= 5:
complexity_level = "Low-Medium"
complexity["overall"] = {
"score": overall_score,
"level": complexity_level,
}
return complexity
def _analyze_community_health(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze the community health of the repository."""
health = {}
# Calculate issue responsiveness
if "issues" in repo_data:
issues = repo_data["issues"]
closed_issues = [issue for issue in issues if issue.get("state") == "closed"]
if issues:
closure_rate = len(closed_issues) / len(issues)
health["issue_closure_rate"] = closure_rate
# Calculate average time to close
resolution_times = []
for issue in closed_issues:
if issue.get("created_at") and issue.get("closed_at"):
created = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00'))
closed = datetime.datetime.fromisoformat(issue["closed_at"].replace('Z', '+00:00'))
resolution_time = (closed - created).total_seconds() / 3600 # hours
resolution_times.append(resolution_time)
if resolution_times:
avg_resolution_time = sum(resolution_times) / len(resolution_times)
health["avg_issue_resolution_time_hours"] = avg_resolution_time
# Calculate PR review responsiveness
if "pull_requests" in repo_data:
prs = repo_data["pull_requests"]
merged_prs = [pr for pr in prs if pr.get("merged")]
if prs:
merge_rate = len(merged_prs) / len(prs)
health["pr_merge_rate"] = merge_rate
# Calculate average time to merge
merge_times = []
for pr in merged_prs:
if pr.get("created_at") and pr.get("merged_at"):
created = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00'))
merged = datetime.datetime.fromisoformat(pr["merged_at"].replace('Z', '+00:00'))
merge_time = (merged - created).total_seconds() / 3600 # hours
merge_times.append(merge_time)
if merge_times:
avg_merge_time = sum(merge_times) / len(merge_times)
health["avg_pr_merge_time_hours"] = avg_merge_time
# Check for community guidelines
community_files = [
"CONTRIBUTING.md",
"CODE_OF_CONDUCT.md",
"SECURITY.md",
"SUPPORT.md",
"GOVERNANCE.md",
]
community_file_presence = {}
if "file_distribution" in repo_data:
file_paths = []
for item in repo_data.get("file_distribution", {}):
file_paths.append(item)
for community_file in community_files:
present = any(community_file.lower() in path.lower() for path in file_paths)
community_file_presence[community_file] = present
health["community_guidelines"] = community_file_presence
# Calculate contributor diversity
if "contributors" in repo_data:
contributors = repo_data["contributors"]
if contributors:
# Calculate Gini coefficient for contribution distribution
gini = self._calculate_gini([c.get("contributions", 0) for c in contributors])
health["contributor_gini"] = gini
# Interpret Gini coefficient
if gini < 0.4:
diversity_level = "High"
elif gini < 0.6:
diversity_level = "Medium"
else:
diversity_level = "Low"
health["contributor_diversity"] = diversity_level
# Calculate overall health score
health_score = 0
# Points for issue responsiveness
if "issue_closure_rate" in health:
health_score += health["issue_closure_rate"] * 10 # Up to 10 points
# Points for PR responsiveness
if "pr_merge_rate" in health:
health_score += health["pr_merge_rate"] * 10 # Up to 10 points
# Points for community guidelines
guideline_count = sum(1 for present in community_file_presence.values() if present)
health_score += guideline_count * 2 # Up to 10 points
# Points for contributor diversity
if "contributor_gini" in health:
diversity_score = 10 * (1 - health["contributor_gini"]) # Up to 10 points
health_score += diversity_score
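        # Maximum possible health score is 40: up to 10 each for issue closure, PR merges,
        # community guidelines, and contributor diversity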
# Determine health level
health_level = "Poor"
if health_score >= 30:
health_level = "Excellent"
elif health_score >= 25:
health_level = "Very Good"
elif health_score >= 20:
health_level = "Good"
elif health_score >= 15:
health_level = "Fair"
elif health_score >= 10:
health_level = "Needs Improvement"
health["overall"] = {
"score": health_score,
"level": health_level,
}
return health
def generate_visualizations(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]:
"""
Generate visualizations of repository data.
Returns:
Dict of visualization figures
"""
if not self.config.generate_visualizations:
return {}
figures = {}
# Create visualizations
lang_fig = self._visualize_language_distribution(repo_data)
if lang_fig:
figures["language_distribution"] = lang_fig
commit_figs = self._visualize_commit_activity(repo_data, insights)
figures.update(commit_figs)
contrib_figs = self._visualize_contributor_activity(repo_data, insights)
figures.update(contrib_figs)
issue_figs = self._visualize_issues_and_prs(repo_data, insights)
figures.update(issue_figs)
# Add interactive visualizations with Plotly
plotly_figs = self._generate_plotly_visualizations(repo_data, insights)
figures.update(plotly_figs)
# Generate collaboration network
collab_fig = self._visualize_collaboration_network(repo_data, insights)
if collab_fig:
figures["collaboration_network"] = collab_fig
return figures
def _visualize_language_distribution(self, repo_data: Dict[str, Any]) -> Optional[plt.Figure]:
"""Create a visualization of language distribution."""
languages = repo_data.get("languages", {})
if not languages:
return None
# Create a pie chart of language distribution
fig, ax = plt.subplots(figsize=(10, 6))
total = sum(languages.values())
# Filter out small languages for better visualization
threshold = total * 0.01 # 1% threshold
other_sum = sum(size for lang, size in languages.items() if size < threshold)
filtered_languages = {lang: size for lang, size in languages.items() if size >= threshold}
if other_sum > 0:
filtered_languages["Other"] = other_sum
sizes = list(filtered_languages.values())
labels = list(filtered_languages.keys())
wedges, texts, autotexts = ax.pie(
sizes,
labels=labels,
autopct='%1.1f%%',
startangle=90,
shadow=False,
textprops={'fontsize': 9}, # Smaller font for better fit
wedgeprops={'linewidth': 1, 'edgecolor': 'white'} # Add white edge
)
# Make the percentage labels more readable
for autotext in autotexts:
autotext.set_color('white')
autotext.set_fontweight('bold')
ax.axis('equal')
        plt.title("Language Distribution", fontsize=16)
plt.tight_layout()
return fig
def _visualize_commit_activity(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]:
"""Create visualizations of commit activity."""
figures = {}
commit_activity = repo_data.get("commit_activity", {})
weekly_commits = commit_activity.get("weekly_commits", [])
if weekly_commits:
# Extract weeks and commit counts
weeks = [item["week"] for item in weekly_commits]
commits = [item["total"] for item in weekly_commits]
# Create a time series plot
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(weeks, commits, marker='o', linestyle='-', color='blue', alpha=0.7)
# Add trend line
z = np.polyfit(range(len(weeks)), commits, 1)
p = np.poly1d(z)
ax.plot(weeks, p(range(len(weeks))), "r--", alpha=0.7)
ax.set_title("Weekly Commit Activity", fontsize=16)
ax.set_xlabel("Week")
ax.set_ylabel("Number of Commits")
plt.xticks(rotation=45)
ax.grid(True, linestyle='--', alpha=0.7)
# Show only some x-axis labels to avoid crowding
if len(weeks) > 20:
every_nth = len(weeks) // 10
for n, label in enumerate(ax.xaxis.get_ticklabels()):
if n % every_nth != 0:
label.set_visible(False)
plt.tight_layout()
figures["weekly_commits"] = fig
# Visualize code frequency if available
code_frequency = commit_activity.get("code_frequency", [])
if code_frequency:
weeks = [item["week"] for item in code_frequency]
additions = [item["additions"] for item in code_frequency]
deletions = [item["deletions"] for item in code_frequency]
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(weeks, additions, marker='o', linestyle='-', color='green', label='Additions')
ax.plot(weeks, deletions, marker='o', linestyle='-', color='red', label='Deletions')
ax.set_title("Code Frequency", fontsize=16)
ax.set_xlabel("Week")
ax.set_ylabel("Lines Changed")
plt.xticks(rotation=45)
ax.legend()
ax.grid(True, linestyle='--', alpha=0.7)
# Show only some x-axis labels to avoid crowding
if len(weeks) > 20:
every_nth = len(weeks) // 10
for n, label in enumerate(ax.xaxis.get_ticklabels()):
if n % every_nth != 0:
label.set_visible(False)
plt.tight_layout()
figures["code_frequency"] = fig
# Commits by weekday
if "commit_insights" in insights:
commit_insights = insights["commit_insights"]
by_weekday = commit_insights.get("commit_time_patterns", {}).get("by_weekday", {})
if by_weekday:
fig, ax = plt.subplots(figsize=(10, 6))
weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
counts = [by_weekday.get(day, 0) for day in weekdays]
# Create gradient colors based on commit counts
colors = plt.cm.Blues(np.array(counts) / max(counts))
ax.bar(weekdays, counts, color=colors)
ax.set_title("Commits by Day of Week", fontsize=16)
ax.set_xlabel("Day of Week")
ax.set_ylabel("Number of Commits")
ax.grid(True, axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
figures["commits_by_weekday"] = fig
# Commits by hour
by_hour = commit_insights.get("commit_time_patterns", {}).get("by_hour", {})
if by_hour:
fig, ax = plt.subplots(figsize=(12, 6))
hours = sorted(by_hour.keys())
counts = [by_hour[hour] for hour in hours]
# Create gradient colors based on commit counts
colors = plt.cm.Greens(np.array(counts) / max(counts))
ax.bar(hours, counts, color=colors)
ax.set_title("Commits by Hour of Day (UTC)", fontsize=16)
ax.set_xlabel("Hour")
ax.set_ylabel("Number of Commits")
ax.set_xticks(range(0, 24, 2))
ax.grid(True, axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
figures["commits_by_hour"] = fig
return figures
def _visualize_contributor_activity(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]:
"""Create visualizations of contributor activity."""
figures = {}
contributors = repo_data.get("contributors", [])
if contributors:
# Create a bar chart of top contributors
contributors_sorted = sorted(contributors, key=lambda x: x.get("contributions", 0), reverse=True)
top_n = min(10, len(contributors_sorted))
fig, ax = plt.subplots(figsize=(12, 6))
names = [c.get("login", "Unknown") for c in contributors_sorted[:top_n]]
contributions = [c.get("contributions", 0) for c in contributors_sorted[:top_n]]
# Create gradient colors based on contribution counts
colors = plt.cm.viridis(np.array(contributions) / max(contributions))
bars = ax.bar(names, contributions, color=colors)
ax.set_title("Top Contributors by Commit Count", fontsize=16)
ax.set_xlabel("Contributor")
ax.set_ylabel("Number of Commits")
plt.xticks(rotation=45, ha='right')
ax.grid(True, axis='y', linestyle='--', alpha=0.7)
# Add value labels on top of bars
for bar in bars:
height = bar.get_height()
ax.annotate(f'{height}',
xy=(bar.get_x() + bar.get_width() / 2, height),
xytext=(0, 3), # 3 points vertical offset
textcoords="offset points",
ha='center', va='bottom')
plt.tight_layout()
figures["top_contributors"] = fig
# Visualize contribution distribution if insights available
if "contributor_insights" in insights:
contributor_insights = insights["contributor_insights"]
distribution = contributor_insights.get("contribution_distribution", {})
if distribution:
# Create a pie chart showing contributor concentration
fig, ax = plt.subplots(figsize=(10, 6))
percentiles = [
distribution.get("contributors_for_20_percent", 0),
distribution.get("contributors_for_50_percent", 0) - distribution.get("contributors_for_20_percent", 0),
distribution.get("contributors_for_80_percent", 0) - distribution.get("contributors_for_50_percent", 0),
len(contributors) - distribution.get("contributors_for_80_percent", 0)
]
labels = [
f"Top {percentiles[0]} contributors (0-20%)",
f"Next {percentiles[1]} contributors (20-50%)",
f"Next {percentiles[2]} contributors (50-80%)",
f"Remaining {percentiles[3]} contributors (80-100%)"
]
wedges, texts, autotexts = ax.pie(
[20, 30, 30, 20], # Fixed percentages for visualization
labels=labels,
autopct='%1.1f%%',
startangle=90,
shadow=False,
explode=(0.1, 0, 0, 0), # Emphasize the top contributors
wedgeprops={'linewidth': 1, 'edgecolor': 'white'} # Add white edge
)
# Make the percentage labels more readable
for autotext in autotexts:
autotext.set_color('white')
autotext.set_fontweight('bold')
ax.axis('equal')
ax.set_title("Contribution Distribution", fontsize=16)
plt.tight_layout()
figures["contribution_distribution"] = fig
return figures
def _visualize_issues_and_prs(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]:
"""Create visualizations of issues and pull requests."""
figures = {}
# Visualize issue distribution if available
if "issue_insights" in insights:
issue_insights = insights["issue_insights"]
# Issues by state
by_state = issue_insights.get("by_state", {})
if by_state:
fig, ax = plt.subplots(figsize=(8, 6))
states = list(by_state.keys())
counts = list(by_state.values())
colors = ['red' if state.lower() == 'open' else 'green' for state in states]
ax.bar(states, counts, color=colors)
ax.set_title("Issues by State", fontsize=16)
ax.set_xlabel("State")
ax.set_ylabel("Count")
# Add count labels on top of bars
for i, v in enumerate(counts):
ax.text(i, v + 0.5, str(v), ha='center')
ax.grid(True, axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
figures["issues_by_state"] = fig
# Issues by month
by_month = issue_insights.get("by_month", {})
if by_month:
fig, ax = plt.subplots(figsize=(12, 6))
months = sorted(by_month.keys())
counts = [by_month[month] for month in months]
ax.plot(months, counts, marker='o', linestyle='-', color='blue')
# Add trend line
z = np.polyfit(range(len(months)), counts, 1)
p = np.poly1d(z)
ax.plot(months, p(range(len(months))), "r--", alpha=0.7)
ax.set_title("Issues Created by Month", fontsize=16)
ax.set_xlabel("Month")
ax.set_ylabel("Number of Issues")
plt.xticks(rotation=45)
ax.grid(True, linestyle='--', alpha=0.7)
# Show only some x-axis labels to avoid crowding
if len(months) > 12:
every_nth = max(1, len(months) // 12)
for n, label in enumerate(ax.xaxis.get_ticklabels()):
if n % every_nth != 0:
label.set_visible(False)
plt.tight_layout()
figures["issues_by_month"] = fig
# Issues by label
by_label = issue_insights.get("by_label", {})
if by_label and len(by_label) > 1:
fig, ax = plt.subplots(figsize=(12, 6))
labels = list(by_label.keys())
counts = list(by_label.values())
# Sort by count
sorted_indices = np.argsort(counts)[::-1]
labels = [labels[i] for i in sorted_indices]
counts = [counts[i] for i in sorted_indices]
# Limit to top 10
if len(labels) > 10:
labels = labels[:10]
counts = counts[:10]
# Create gradient colors
colors = plt.cm.tab10(np.linspace(0, 1, len(labels)))
bars = ax.barh(labels, counts, color=colors)
ax.set_title("Top Issue Labels", fontsize=16)
ax.set_xlabel("Count")
ax.set_ylabel("Label")
# Add count labels
for bar in bars:
width = bar.get_width()
ax.annotate(f'{int(width)}',
xy=(width, bar.get_y() + bar.get_height() / 2),
xytext=(3, 0), # 3 points horizontal offset
textcoords="offset points",
ha='left', va='center')
ax.grid(True, axis='x', linestyle='--', alpha=0.7)
plt.tight_layout()
figures["issues_by_label"] = fig
# Visualize PR insights if available
if "pr_insights" in insights and "pr_code_change_stats" in insights:
pr_code_stats = insights["pr_code_change_stats"]
# Additions and deletions by PR
if "additions" in pr_code_stats and "deletions" in pr_code_stats:
fig, ax = plt.subplots(figsize=(10, 6))
categories = ["Mean", "Median", "Max"]
additions = [
pr_code_stats["additions"].get("mean", 0),
pr_code_stats["additions"].get("median", 0),
pr_code_stats["additions"].get("max", 0) / 10 # Scale down for visibility
]
deletions = [
pr_code_stats["deletions"].get("mean", 0),
pr_code_stats["deletions"].get("median", 0),
pr_code_stats["deletions"].get("max", 0) / 10 # Scale down for visibility
]
x = range(len(categories))
width = 0.35
addition_bars = ax.bar([i - width/2 for i in x], additions, width, label='Additions', color='green')
deletion_bars = ax.bar([i + width/2 for i in x], deletions, width, label='Deletions', color='red')
ax.set_xlabel('Metric')
ax.set_ylabel('Lines of Code')
ax.set_title('PR Code Change Statistics')
plt.xticks(x, categories)
ax.legend()
# Add value labels
for bars in [addition_bars, deletion_bars]:
for bar in bars:
height = bar.get_height()
ax.annotate(f'{int(height)}',
xy=(bar.get_x() + bar.get_width() / 2, height),
xytext=(0, 3), # 3 points vertical offset
textcoords="offset points",
ha='center', va='bottom')
if "max" in pr_code_stats["additions"]:
plt.annotate(f"Max: {int(pr_code_stats['additions']['max'])}",
(2 - width/2, additions[2] + 5),
textcoords="offset points",
xytext=(0,10),
ha='center')
if "max" in pr_code_stats["deletions"]:
plt.annotate(f"Max: {int(pr_code_stats['deletions']['max'])}",
(2 + width/2, deletions[2] + 5),
textcoords="offset points",
xytext=(0,10),
ha='center')
plt.tight_layout()
figures["pr_code_changes"] = fig
return figures
def _generate_plotly_visualizations(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, Any]:
"""Generate interactive Plotly visualizations."""
plotly_figures = {}
# Activity heatmap (commits by day and hour)
if "commits" in repo_data:
commits = repo_data["commits"]
dates = []
for commit in commits:
date_str = commit.get("date")
if date_str:
try:
date = datetime.datetime.fromisoformat(date_str.replace('Z', '+00:00'))
dates.append(date)
except ValueError:
pass
if dates:
# Group by day of week and hour
day_hour_counts = defaultdict(int)
for date in dates:
day_hour_counts[(date.weekday(), date.hour)] += 1
# Create 2D array for heatmap
days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
hours = list(range(24))
z = np.zeros((7, 24))
for (day, hour), count in day_hour_counts.items():
z[day][hour] = count
# Create heatmap
fig = go.Figure(data=go.Heatmap(
z=z,
x=hours,
y=days,
colorscale='Viridis',
hoverongaps=False,
                    hovertemplate='Day: %{y}<br>Hour: %{x}<br>Commits: %{z}',