import gradio as gr
import os
import json
import time
import subprocess
import tempfile
import shutil
import copy
from pathlib import Path
from typing import Dict, List, Any, Tuple, Optional, Iterator
import traceback
from dotenv import load_dotenv
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
import numpy as np
import re
from collections import Counter, defaultdict
import statistics
from datetime import datetime
from threading import Lock
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception
import google.generativeai as genai
import requests
#####################################################################
# Constants and Shared Variables
#####################################################################
RELEVANT_EXTENSIONS = {
".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".cpp", ".c", ".h", ".hpp", ".rb",
".php", ".go", ".rs", ".swift", ".kt", ".kts", ".scala", ".pl", ".pm", ".r",
".sh", ".bat", ".ps1", ".lua", ".sql", ".html", ".css", ".xml", ".json", ".yaml",
".yml", ".md", ".ipynb", ".m", ".mm", ".vb", ".cs", ".fs", ".fsx", ".erl", ".hrl",
".ex", ".exs", ".dart", ".groovy", ".jl", ".clj", ".cljs", ".coffee", ".litcoffee",
".rkt", ".hs", ".lhs", ".ml", ".mli", ".nim", ".cr", ".nimble", ".hx", ".sol", ".vy"
}
LANGUAGE_EXTENSIONS = {
".py": "Python", ".js": "JavaScript", ".ts": "TypeScript", ".jsx": "React",
".tsx": "React TypeScript", ".java": "Java", ".cpp": "C++", ".c": "C",
".h": "C/C++ Header", ".hpp": "C++ Header", ".rb": "Ruby", ".php": "PHP",
".go": "Go", ".rs": "Rust", ".swift": "Swift", ".kt": "Kotlin",
".kts": "Kotlin Script", ".scala": "Scala", ".pl": "Perl", ".pm": "Perl Module",
".r": "R", ".sh": "Shell", ".bat": "Batch", ".ps1": "PowerShell", ".lua": "Lua",
".sql": "SQL", ".html": "HTML", ".css": "CSS", ".xml": "XML", ".json": "JSON",
".yaml": "YAML", ".yml": "YAML", ".md": "Markdown", ".ipynb": "Jupyter Notebook",
".m": "MATLAB/Objective-C", ".mm": "Objective-C++", ".vb": "Visual Basic",
".cs": "C#", ".fs": "F#", ".fsx": "F# Script", ".erl": "Erlang",
".hrl": "Erlang Header", ".ex": "Elixir", ".exs": "Elixir Script", ".dart": "Dart",
".groovy": "Groovy", ".jl": "Julia", ".clj": "Clojure", ".cljs": "ClojureScript",
".coffee": "CoffeeScript", ".litcoffee": "Literate CoffeeScript", ".rkt": "Racket",
".hs": "Haskell", ".lhs": "Literate Haskell", ".ml": "OCaml", ".mli": "OCaml Interface",
".nim": "Nim", ".cr": "Crystal", ".nimble": "Nimble", ".hx": "Haxe",
".sol": "Solidity", ".vy": "Vyper"
}
PACKAGE_FILES = {
"package.json": "npm", "requirements.txt": "pip", "setup.py": "python",
"pom.xml": "maven", "build.gradle": "gradle", "Gemfile": "bundler",
"Cargo.toml": "cargo", "go.mod": "go", "go.sum": "go", "composer.json": "composer",
"pubspec.yaml": "dart", "Project.toml": "julia", "mix.exs": "elixir",
"Makefile": "make", "CMakeLists.txt": "cmake", "SConstruct": "scons",
"build.xml": "ant", "Rakefile": "rake", "shard.yml": "crystal",
"nim.cfg": "nim", "default.nix": "nix", "stack.yaml": "haskell",
"rebar.config": "erlang", "rebar.lock": "erlang", "project.clj": "leiningen",
"deps.edn": "clojure", "build.boot": "boot", "build.sbt": "sbt",
"Brewfile": "homebrew", "Vagrantfile": "vagrant", "Dockerfile": "docker",
"docker-compose.yml": "docker-compose", "Procfile": "heroku",
"tox.ini": "tox", "pyproject.toml": "poetry", "Pipfile": "pipenv",
"Pipfile.lock": "pipenv", "environment.yml": "conda", "meta.yaml": "conda"
}
SYSTEM_PROMPT = "You are an experienced software engineer and data analyst tasked with building a report on a developer's coding style, technical background, approach to problem solving, architectural thinking, technology choices, re-used frameworks, etc. There will be a set of prompts, divided into CODE STYLE ANALYSIS, TEMPORAL ANALYSIS, PROJECT PREFERENCES ANALYSIS and IDENTITY CONFIDENCE CALCULATION, together with data samples provided to you. You'll summarize your findings from all of the modules in a single comprehensive IDENTITY CONFIDENCE CALCULATION output. Output valid JSON and avoid including too many strings in the list objects! Follow the instructions provided for this section:"
#####################################################################
# Prompt Analyzer Module
#####################################################################
def _should_retry_error(exception: Exception) -> bool:
"""Check if the exception is one we should retry"""
error_str = str(exception).lower()
return any(
msg in error_str
for msg in [
"resource exhaust",
"429",
"too many requests",
"quota exceeded",
"rate limit",
]
)
class RateLimiter:
"""Token bucket rate limiter implementation"""
def __init__(self, rate: int, per: int):
self.rate = rate # Number of requests allowed per time period
self.per = per # Time period in seconds
self.tokens = rate # Current token count
self.last_update = time.time()
self.lock = Lock()
def _add_tokens(self):
"""Add tokens based on time elapsed"""
now = time.time()
time_passed = now - self.last_update
new_tokens = time_passed * (self.rate / self.per)
if new_tokens > 0:
self.tokens = min(self.rate, self.tokens + new_tokens)
self.last_update = now
def acquire(self) -> float:
"""
Try to acquire a token. Returns the time to wait if no token is available.
"""
with self.lock:
self._add_tokens()
if self.tokens >= 1:
self.tokens -= 1
return 0.0
# Calculate wait time needed for next token
wait_time = (1 - self.tokens) * (self.per / self.rate)
return wait_time
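# Illustrative usage sketch for the token bucket above (defined but never
# called): poll acquire() and sleep for the returned wait time until a token
# frees up. _rate_limited_generate below follows the same loop shape.
def _demo_rate_limiter_usage() -> None:
    limiter = RateLimiter(rate=5, per=60)  # at most 5 acquisitions per minute
    for _ in range(3):
        while True:
            wait_time = limiter.acquire()
            if wait_time == 0:
                break  # token acquired; safe to issue the call
            time.sleep(wait_time)
        # ... the rate-limited work would happen here ...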
class PromptAnalyzer:
"""Handles LLM prompting for code analysis tasks"""
def __init__(self, api_key: Optional[str] = None):
"""Initialize Gemini handler with API key"""
self.api_key = api_key or os.getenv("GEMINI_API_KEY")
if not self.api_key:
raise ValueError(
"Gemini API key must be provided or set in GEMINI_API_KEY environment variable"
)
genai.configure(api_key=self.api_key)
self.model = genai.GenerativeModel(model_name="gemini-1.5-flash-001", system_instruction=SYSTEM_PROMPT)
self.token_count = 0
self.prompt_count = 0
self.rate_limiter = RateLimiter(rate=5, per=60)
def count_tokens(self, text: str) -> int:
"""Count tokens in a text string"""
try:
token_count = self.model.count_tokens(text)
return token_count.total_tokens
except Exception as e:
print(f"Warning: Error counting tokens: {str(e)}")
# Fallback to approximate count if token counting fails
return len(text) // 4 # Rough approximation
def _clean_json_response(self, response_text: str) -> str:
"""Clean up response text to extract JSON content"""
if "```" in response_text:
match = re.search(r"```(?:json)?\n(.*?)```", response_text, re.DOTALL)
if match:
return match.group(1).strip()
return response_text.strip()
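    # Example of what _clean_json_response recovers (illustrative):
    #   '```json\n{"x": 1}\n```'  ->  '{"x": 1}'
    #   'plain {"x": 1} text'     ->  'plain {"x": 1} text' (stripped only)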
@retry(
retry=retry_if_exception(_should_retry_error),
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=2, min=4, max=60),
before_sleep=lambda retry_state: print(
f"Retrying due to rate limit/resource exhaustion... (attempt {retry_state.attempt_number})"
),
)
def _rate_limited_generate(self, prompt: str) -> Any:
"""Handle rate-limited generation with waiting and resource exhaustion"""
while True:
wait_time = self.rate_limiter.acquire()
if wait_time == 0:
try:
# Direct call to generate_content instead of using chat
return self.model.generate_content(prompt)
except Exception as e:
if _should_retry_error(e):
print(
f"Rate limit/resource exhaustion error, will retry: {str(e)}"
)
raise # Let the retry decorator handle it
else:
print(f"Non-retryable error occurred: {str(e)}")
raise
print(f"Rate limit reached. Waiting {wait_time:.2f} seconds...")
time.sleep(wait_time)
@retry(
stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)
)
def generate_json_response(self, prompt: str) -> Dict[str, Any]:
"""Generate and parse JSON response with robust error handling"""
try:
self.prompt_count += 1
print(f"\nπŸ“ Processing prompt #{self.prompt_count}...")
# Count input tokens
            input_tokens = self.count_tokens(prompt)  # fallback-aware helper
print(f"πŸ“Š Sending prompt with {input_tokens:,} tokens...")
# Track retries for JSON parsing
max_json_retries = 3
last_response = None
last_error = None
for attempt in range(max_json_retries):
try:
# Generate with rate limiting
start_time = time.time()
# Here's the actual model call
response = self._rate_limited_generate(prompt)
elapsed_time = time.time() - start_time
# Track token usage
                    # usage_metadata.total_token_count already includes the
                    # prompt, so use candidates_token_count to avoid
                    # double-counting the input tokens
                    output_token_count = response.usage_metadata.candidates_token_count
                    prompt_total_tokens = input_tokens + output_token_count
self.token_count += prompt_total_tokens
print(f"βœ“ Response received in {elapsed_time:.2f} seconds")
print(f"πŸ“Š Prompt #{self.prompt_count} token usage:")
print(f" - Input tokens: {input_tokens:,}")
print(f" - Output tokens: {output_token_count:,}")
print(f" - Total tokens: {prompt_total_tokens:,}")
print(f"πŸ“ˆ Cumulative token usage: {self.token_count:,}")
# Try to parse JSON with advanced error recovery
last_response = response.text
result = self._clean_json_response(last_response)
return json.loads(result)
except json.JSONDecodeError as e:
last_error = e
if attempt < max_json_retries - 1:
print(f"⚠️ Attempt {attempt + 1}/{max_json_retries}: JSON parsing failed, retrying with feedback...")
# Add feedback about the JSON parsing failure and retry
error_feedback = f"""Your previous response could not be parsed as valid JSON. The specific error was: {str(e)}
IMPORTANT: You must provide a response that:
1. Contains ONLY valid JSON
2. Has NO markdown code blocks
3. Has NO explanatory text
4. Follows the exact schema requested
5. Uses proper JSON syntax (quotes, commas, brackets)
6. AVOID falling into recursive loops when retrieving data from the prompt
Here is the original prompt again:
"""
# Combine feedback with original prompt
prompt = error_feedback + prompt
continue
else:
print(f"❌ Failed to parse JSON after {max_json_retries} attempts")
print("Last response received:")
print(last_response)
print(f"Last error: {str(last_error)}")
raise
except Exception as e:
print(f"❌ Error in generate_json_response: {str(e)}")
print("Stack trace:")
print(traceback.format_exc())
if "last_response" in locals():
print("\nLast response received:")
print(last_response)
raise
def create_handler(api_key: Optional[str] = None) -> PromptAnalyzer:
"""
Factory function to create a PromptAnalyzer instance.
"""
return PromptAnalyzer(api_key)
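# Typical flow (illustrative; assumes GEMINI_API_KEY is set, e.g. via .env):
#   handler = create_handler()
#   data = handler.generate_json_response("Return {\"ok\": true} as JSON")
#   print(data["ok"], handler.token_count)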
#####################################################################
# Repository Structure Analysis Module
#####################################################################
def analyze_repository_structure(repo_names: List[str], user_path: Path) -> Dict[str, Any]:
"""Processes source code from repositories to build LLM-friendly structure"""
result = {}
for repo_name in repo_names:
username = user_path.name
repo_path = (
user_path / f"{username}_{repo_name}.git"
)
print("processing,", repo_name, "path:", repo_path)
if not repo_path.exists():
print("skipping")
continue
# Get the structure first
structure = _build_tree_structure(repo_path)
# Count language occurrences from the structure
language_counts = {}
for file_info in _get_source_files(structure):
extension = file_info["extension"].lower()
if extension in LANGUAGE_EXTENSIONS:
language = LANGUAGE_EXTENSIONS[extension]
language_counts[language] = language_counts.get(language, 0) + 1
# Sort languages by frequency, most common first
languages = sorted(
language_counts.items(),
key=lambda x: (-x[1], x[0]) # Sort by count descending, then name ascending
)
# Create the language string
languages_str = ", ".join(lang for lang, _ in languages)
result[repo_name] = {
"structure": structure,
"file_stats": _analyze_file_statistics(repo_path),
"documentation": _extract_documentation(repo_path),
"languages": languages_str
}
_extract_code_samples(result, user_path)
return result
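# Illustrative call (hypothetical names), assuming repos are cloned under
# out/<username>/<username>_<repo>.git as the rest of this module expects:
#   sources = analyze_repository_structure(["my-repo"], Path("out") / "octocat")
#   print(sources["my-repo"]["languages"])  # e.g. "Python, Markdown"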
def _build_tree_structure(repo_path: Path, files_per_dir: int = 20, max_depth: int = 3) -> Dict[str, Any]:
"""
Builds a tree representation of repository structure with limits.
Args:
repo_path: Repository path
files_per_dir: Maximum number of files to include per directory (default: 20)
max_depth: Maximum depth for nested directories (default: 3)
"""
def create_tree(path: Path, current_depth: int = 0) -> Dict[str, Any]:
tree = {
"type": "directory",
"name": path.name,
"path": str(path.relative_to(repo_path)),
"children": [],
}
# Stop traversing if we hit max depth
if current_depth >= max_depth:
tree["children"] = [{
"type": "note",
"message": f"Directory depth limit ({max_depth}) reached"
}]
return tree
try:
items = list(path.iterdir())
# Skip git directory and common build artifacts
if path.name in {
".git",
"node_modules",
"__pycache__",
"build",
"dist",
}:
return tree
# Process files with limit
files = [
item for item in items
if item.is_file() and item.suffix.lower() in RELEVANT_EXTENSIONS
]
if files:
files = files[:files_per_dir] # Limit number of files
for item in files:
tree["children"].append({
"type": "file",
"name": item.name,
"path": str(item.relative_to(repo_path)),
"extension": item.suffix.lower(),
"size": item.stat().st_size,
})
# Process directories
dirs = [item for item in items if item.is_dir()]
for item in dirs:
subtree = create_tree(item, current_depth + 1)
if subtree["children"]: # Only add non-empty directories
tree["children"].append(subtree)
except PermissionError:
pass
return tree
return create_tree(repo_path)
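# Illustrative shape of the tree returned above (keys match create_tree):
#   {"type": "directory", "name": "repo", "path": ".", "children": [
#       {"type": "file", "name": "app.py", "path": "app.py",
#        "extension": ".py", "size": 1234},
#       {"type": "directory", "name": "src", "path": "src", "children": [...]}]}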
def _analyze_file_statistics(repo_path: Path) -> Dict[str, Any]:
"""Analyzes file statistics for the repository"""
file_count = 0
total_loc = 0
for ext in LANGUAGE_EXTENSIONS:
for file_path in repo_path.rglob(f"*{ext}"):
            # The old extension check here was a no-op (rglob already filters
            # by suffix); skip vendored and git-internal paths instead
            if any(part in {".git", "node_modules", "__pycache__", "build", "dist"}
                   for part in file_path.parts):
                continue
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
loc = len([l for l in content.splitlines() if l.strip()])
total_loc += loc
file_count += 1
except (UnicodeDecodeError, PermissionError):
continue
return {
"file_count": file_count,
"total_loc": total_loc,
}
def _extract_documentation(repo_path: Path) -> Dict[str, Any]:
"""Extracts documentation and metadata from repository"""
docs = {}
# Look for README
readme_paths = list(repo_path.glob("README*"))
if readme_paths:
try:
with open(readme_paths[0], "r", encoding="utf-8") as f:
docs["readme"] = f.read()
except (UnicodeDecodeError, PermissionError):
docs["readme"] = None
docs["package_info"] = {}
for filename, pkg_type in PACKAGE_FILES.items():
pkg_path = repo_path / filename
if pkg_path.exists():
try:
with open(pkg_path, "r", encoding="utf-8") as f:
docs["package_info"][pkg_type] = f.read()
except (UnicodeDecodeError, PermissionError):
continue
return docs
def _extract_code_samples(sources_data: Dict[str, Any], user_path: Path, max_file_size: int = 100000) -> Dict[str, Any]:
"""
Extracts code samples for files identified as relevant by Gemini.
Filters out files larger than max_file_size bytes.
"""
handler = create_handler()
try:
# Preprocess to remove large files from consideration
filtered_structures = {}
for repo_name, repo_data in sources_data.items():
            # Deep copy so the size filter below cannot mutate the original tree
            structure_copy = copy.deepcopy(repo_data["structure"])
# Filter function to remove large files
def filter_large_files(node):
if node.get("type") == "directory":
node["children"] = [
child for child in node.get("children", [])
if child.get("type") == "directory"
or (child.get("type") == "file" and child.get("size", 0) <= max_file_size)
]
for child in node["children"]:
if child.get("type") == "directory":
filter_large_files(child)
return node
# Apply filter
filtered_structures[repo_name] = filter_large_files(structure_copy)
# Create a combined prompt for all repositories
prompt = f"""
Analyze the repository structures and identify the most relevant files for codebase analysis.
Focus on files that would reveal:
1. Core functionality and architecture
2. Main business logic
3. Key utilities and helpers
4. Configuration and setup
Results will be used for further code analysis. Remember to include ALL relevant files, especially for fullstack applications. Be thorough but concise. Avoid including non-original code, e.g., dependency or library code. AVOID INCLUDING MORE THAN 50 FILES PER REPOSITORY! TRY TO INCLUDE FEWER THAN 20 IF POSSIBLE. CORE_FILES ARE THE PRIORITY; YOU CAN OMIT THE REST IF IT EXCEEDS THE LIMIT.
Return a JSON object with these categories:
{{
"repositories": {{ // MANDATORY highest level key
"repo_name": {{ // MANDATORY name of the repository you are analyzing
"core_files": ["list of most important files"], // MAX 20 files!
"secondary_files": ["list of supporting files"], // MAX 20 files!
"config_files": ["list of relevant config files"] // MAX 10 files!
}},
"repo_name": {{...}},
}}
}}
CRITICAL REQUIREMENTS:
Limit each list of most important files to a maximum of 20 files!!!
Avoid binary files and large data files. Include only files that are essential for understanding the codebase, focusing on the most important ones. Exclude files the user did not write, e.g., dependency or library code, and utility files that are not essential. Focus on source code: some repositories contain many files, but only a few are essential for understanding the codebase. Do not include long .json files or other artifact-type files; note the "size" field of each file in the structure.
Repository structures:
{json.dumps(filtered_structures, indent=2)}
Only include files that exist in the structure. Return valid JSON format.
DO NOT wrap the JSON in markdown code blocks.
"""
# Get file categories for all repositories
file_categories = handler.generate_json_response(prompt)
if not file_categories:
print("Skipping due to API error")
return sources_data
for repo_name, repo_data in sources_data.items():
repo_data["samples"] = {
"core_files": {},
"utility_files": {},
"config_files": {}
}
# Filter out large files from consideration
all_files = {
file_info["path"]: file_info
for file_info in _get_source_files(repo_data["structure"])
if file_info.get("size", 0) <= max_file_size
}
for category in ["core_files", "utility_files", "config_files"]:
for file_path in file_categories["repositories"].get(repo_name, {}).get(category, []):
if file_path not in all_files:
continue
source_code = _read_source_file(user_path, repo_name, file_path)
if source_code:
repo_data["samples"][category][file_path] = source_code
except Exception as e:
print(f"Error processing code samples: {str(e)}")
return sources_data
def _get_source_files(structure: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Helper to recursively extract source files from tree structure"""
files = []
    def traverse(node: Dict[str, Any]):
        if not isinstance(node, dict):
            return
        # If it's a file, add it
        if node.get("type") == "file":
            files.append(node)
        # If it's a directory, traverse its children
        elif node.get("type") == "directory" and "children" in node:
            for child in node.get("children", []):
                traverse(child)
        else:
            # Only fall back to a generic scan for wrapper dicts that are
            # neither files nor directories; doing this unconditionally would
            # visit directory children twice and duplicate every file
            for value in node.values():
                if isinstance(value, dict):
                    traverse(value)
                elif isinstance(value, list):
                    for item in value:
                        if isinstance(item, dict):
                            traverse(item)
traverse(structure)
# Sort files by path for consistent ordering
return sorted(files, key=lambda x: x["path"])
def _read_source_file(user_path: Path, repo_name: str, file_path: str) -> Optional[str]:
"""Reads source code from file with proper error handling"""
try:
# Construct the full path to the source file
full_path = user_path / f"{user_path.name}_{repo_name}.git" / file_path
# Check if file exists and is readable
if not full_path.is_file():
return None
        # Skip files whose extension is not in the relevant source set
if full_path.suffix.lower() not in RELEVANT_EXTENSIONS:
return None
# Try to read the file with different encodings
encodings = ["utf-8", "latin-1", "cp1252"]
for encoding in encodings:
try:
with open(full_path, "r", encoding=encoding) as f:
content = f.read()
# Basic validation of text content
if "\0" in content: # Binary file check
return None
return content
except UnicodeDecodeError:
continue
except Exception as e:
print(f"Error reading {full_path}: {str(e)}")
return None
return None
except Exception as e:
print(f"Error accessing {file_path}: {str(e)}")
return None
#####################################################################
# Repository Selector Module
#####################################################################
class RepositorySelector:
"""Handles intelligent repository selection and authorship analysis"""
def __init__(self, base_path: str, username: str):
self.base_path = Path(base_path)
self.username = username
self.user_path = self.base_path / username
def select_repositories(self, report_data: Dict) -> List[str]:
"""
Main entry point for repository selection.
Returns a list of repository names to analyze, including both best-scored repos
and single-contributor repos.
"""
# Store report data for use in other methods
self.report_data = report_data
# Get repositories with activity scores
repositories = self._analyze_repositories(report_data)
print(f"Found {len(repositories)} repositories with activity")
# Get best scored repositories
selected_repos = self._select_best_repositories(repositories)
selected_repo_names = {repo["name"] for repo in selected_repos}
# Get single-contributor repositories
single_contributor_repos = self._get_only_owner_sources()
# Combine both sets of repositories without duplicates
all_repo_names = selected_repo_names.union(single_contributor_repos)
print(f"Added {len(all_repo_names) - len(selected_repo_names)} single-contributor repositories")
print(f"Total repositories to analyze: {len(all_repo_names)}")
# Update metadata for all repositories
self.repo_metadata = {}
for repo in selected_repos:
self.repo_metadata[repo["name"]] = {
"contribution_files": repo["contribution_files"],
"stats": repo["stats"]
}
# Add metadata for additional single-contributor repos if they weren't in selected_repos
for repo_name in single_contributor_repos:
if repo_name not in self.repo_metadata:
repo_path = self.user_path / f"{self.username}_{repo_name}.git"
if repo_path.exists():
stats = self._get_repository_stats(repo_path, report_data.get("commits", {}).get(repo_name, []))
contribution_files = self._analyze_contribution_files(repo_path)
self.repo_metadata[repo_name] = {
"contribution_files": contribution_files,
"stats": stats or {}
}
return list(all_repo_names)
def _get_only_owner_sources(self) -> List[str]:
"""Gets list of repositories to analyze. Only single-contributor repos are considered"""
return [
obj["repo"]
for obj in self.report_data.get("contributors", [])
if obj["contributors"][0] == self.username and len(obj["contributors"]) == 1
]
def _analyze_repositories(self, report_data: Dict) -> List[Dict[str, Any]]:
"""Analyzes all repositories the user has contributed to"""
repositories = []
# Get repos from contributors data
contributed_repos = [
obj["repo"] for obj in report_data.get("contributors", [])
if self.username in obj["contributors"]
]
# Also get repos from commits data
commit_repos = list(report_data.get("commits", {}).keys())
# Combine and deduplicate
all_repos = list(set(contributed_repos + commit_repos))
print(f"Analyzing {len(all_repos)} repositories...")
for repo_name in all_repos:
repo_path = self.user_path / f"{self.username}_{repo_name}.git"
if not repo_path.exists():
continue
repo_stats = self._get_repository_stats(repo_path, report_data.get("commits", {}).get(repo_name, []))
if not repo_stats:
continue
contribution_files = self._analyze_contribution_files(repo_path)
# Include repository if it has either commits or contribution files
if repo_stats["commit_count"] > 0 or contribution_files:
repositories.append({
"name": repo_name,
"stats": repo_stats,
"contribution_files": contribution_files
})
return repositories
def _analyze_contribution_files(self, repo_path: Path) -> List[Dict[str, Any]]:
"""Identifies files with user contributions, with more flexible criteria"""
contribution_files = []
# List all files in repository
for file_path in repo_path.rglob('*'):
relative_path = str(file_path.relative_to(repo_path))
# Skip excluded paths and non-source files
if not self._is_analyzable_file(relative_path):
continue
try:
# Get authorship statistics
author_stats = self._get_file_author_stats(repo_path, relative_path)
# Include files where user has any meaningful contribution (>20%)
if self.username in author_stats and author_stats[self.username] >= 20:
contribution_files.append({
"path": relative_path,
"contribution_percentage": author_stats[self.username]
})
except Exception as e:
print(f"Error analyzing {relative_path}: {str(e)}")
continue
return contribution_files
def _get_repository_stats(self, repo_path: Path, repo_commits: List = None) -> Dict[str, Any]:
"""Analyzes repository activity metrics with both git log and commits data"""
try:
# Get commit timestamps from git log
            result = subprocess.run(
                ["git", "log", "--format=%at"],
                cwd=repo_path,
                capture_output=True,
                text=True,
            )
if result.returncode != 0:
return {}
timestamps = [int(ts) for ts in result.stdout.strip().split('\n') if ts]
# Also consider commits from report data
if repo_commits:
for commit in repo_commits:
commit_date = datetime.fromisoformat(
commit["commit"]["author"]["date"].replace("Z", "+00:00")
)
timestamps.append(int(commit_date.timestamp()))
if not timestamps:
return {}
first_commit = datetime.fromtimestamp(min(timestamps))
last_commit = datetime.fromtimestamp(max(timestamps))
commit_count = len(timestamps)
time_period = (last_commit - first_commit).days + 1
return {
"first_commit": first_commit.isoformat(),
"last_commit": last_commit.isoformat(),
"commit_count": commit_count,
"commits_per_day": commit_count / max(time_period, 1),
"active_days": time_period
}
except Exception as e:
print(f"Error analyzing repository stats: {str(e)}")
return {}
def _get_file_author_stats(self, repo_path: Path, file_path: str) -> Dict[str, float]:
"""Analyzes file authorship percentages"""
try:
            result = subprocess.run(
                # --line-porcelain repeats the full header (with the "author "
                # line) for every line; plain --porcelain emits it only once
                # per commit group, which would skew the per-line counts below
                ['git', 'blame', '--line-porcelain', file_path],
                cwd=repo_path,
                capture_output=True,
                text=True
            )
if result.returncode != 0:
return {}
author_lines = defaultdict(int)
total_lines = 0
for line in result.stdout.split('\n'):
if line.startswith('author '):
author = line.replace('author ', '', 1)
author_lines[author] += 1
total_lines += 1
if total_lines == 0:
return {}
return {
author: (count / total_lines * 100)
for author, count in author_lines.items()
}
except Exception as e:
print(f"Error getting authorship stats for {file_path}: {str(e)}")
return {}
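    # Illustrative --line-porcelain fragment consumed above: the full header,
    # including the "author " line, repeats for every source line, e.g.
    #   3b18e512dba79e4c8300dd08aeb37f8e728b8dad 1 1
    #   author Jane Doe
    #   author-mail <jane@example.com>
    #   ...
    #   \tactual source line
    # so counting "author " lines yields per-author line counts.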
def _select_best_repositories(self, repositories: List[Dict[str, Any]],
max_repos: int = 15) -> List[Dict[str, Any]]:
"""Selects optimal repositories using more balanced scoring"""
if not repositories:
return []
for repo in repositories:
score = 0
stats = repo["stats"]
# Recency score (max 35 points)
last_commit = datetime.fromisoformat(stats["last_commit"])
days_since_last_commit = (datetime.now() - last_commit).days
score += max(0, 35 - (days_since_last_commit / 30))
# Activity score (max 35 points)
commit_score = min(35, (stats["commit_count"] * 2) + (stats["commits_per_day"] * 10))
score += commit_score
# Contribution score (max 30 points)
# Consider both number and quality of contributions
contribution_files = repo["contribution_files"]
if contribution_files:
file_count = len(contribution_files)
avg_contribution = sum(f["contribution_percentage"] for f in contribution_files) / file_count
score += min(30, (file_count * 2) + (avg_contribution / 5))
else:
# Still give some points for commits if no files detected
score += min(15, stats["commit_count"] / 2)
repo["analysis_score"] = score
# Sort by score and return top repositories
repositories.sort(key=lambda x: x["analysis_score"], reverse=True)
selected = repositories[:max_repos]
print(f"\nSelected {len(selected)} repositories:")
for repo in selected:
print(f"- {repo['name']} (score: {repo['analysis_score']:.2f})")
return selected
def _is_analyzable_file(self, file_path: str) -> bool:
"""Determines if a file should be included in analysis"""
path = Path(file_path)
# Skip excluded directories
excluded_paths = {
'node_modules', '__pycache__', 'build', 'dist', '.git',
'vendor', 'third_party', 'external'
}
if any(part in excluded_paths for part in path.parts):
return False
# Get file extension (lowercase)
ext = path.suffix.lower()
if not ext:
return False
return ext in RELEVANT_EXTENSIONS
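# Illustrative wiring (hypothetical report_data with the "contributors" and
# "commits" keys read above):
#   selector = RepositorySelector(base_path="out", username="octocat")
#   repo_names = selector.select_repositories(report_data)
#   sources = analyze_repository_structure(repo_names, Path("out") / "octocat")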
#####################################################################
# Code Style Analysis Module
#####################################################################
def analyze_code_style(sources_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyzes developer's coding style patterns for stylometric analysis"""
handler = create_handler()
combined_results = {}
for repo_name, repo_data in sources_data.items():
print(f"\nAnalyzing repository: {repo_name}")
prompt = f"""
CODE STYLE ANALYSIS
You are an expert in code stylometry and developer behavior analysis. Analyze this repository to create a detailed profile of the developer's coding patterns, preferences, and habits.
Repository: {repo_name}
Code samples and structure:
{json.dumps(repo_data, indent=2)}
Focus on identifying unique, individual coding patterns that could distinguish this developer's style. Analyze how they:
- Structure their code and control flow
- Handle data and state
- Approach problem-solving
- Maintain code quality
- Handle edge cases and errors
IMPORTANT CONSTRAINTS:
- Maximum 10 patterns per list category
- No repeating similar patterns
- Use "Unknown" if pattern cannot be determined
- Focus on distinctive, personal coding traits
Generate a JSON profile with this EXACT structure:
{{
"code_organization": {{
"file_structure": {{
"preferred_file_size": number, // Average lines per file
"module_organization": string, // e.g. "feature-based", "layer-based", "domain-based"
"separation_patterns": [string] // Common ways they separate concerns
}},
"code_layout": {{
"indentation": {{ "type": string, "width": number }},
"line_length": {{ "average": number, "max_observed": number }},
"spacing_style": {{
"around_operators": string,
"after_commas": boolean,
"around_blocks": string
}}
}}
}},
"naming_patterns": {{
"variables": {{
"primary_style": string, // e.g. "snake_case", "camelCase"
"consistency_score": number, // 0-100
"length_preference": {{ "average": number, "range": [number, number] }},
"semantic_patterns": [string] // How they choose names, e.g. "verb_noun_pairs", "hungarian_notation"
}},
"functions": {{
"primary_style": string,
"common_prefixes": [string],
"common_patterns": [string],
"length_preference": {{ "average": number, "range": [number, number] }}
}}
}},
"coding_patterns": {{
"control_flow": {{
"preferred_loop_type": string, // e.g. "for", "while", "comprehension"
"nesting_depth": {{ "average": number, "max_observed": number }},
"branching_patterns": [string], // e.g. "early returns", "guard clauses"
"condition_complexity": {{ "average": number, "max_observed": number }}
}},
"data_handling": {{
"preferred_structures": [string], // Favorite data structures
"mutation_patterns": {{
"prefers_immutable": boolean,
"common_patterns": [string]
}},
"state_management": {{
"approach": string, // e.g. "functional", "stateful", "mixed"
"patterns": [string]
}}
}}
}},
"error_handling": {{
"strategy": string, // e.g. "defensive", "fail-fast", "hybrid"
"patterns": [string], // Common error handling patterns
"error_checking": {{
"input_validation": boolean,
"null_checking": boolean,
"type_checking": boolean
}}
}},
"code_quality": {{
"documentation": {{
"style": string, // e.g. "detailed", "minimal", "moderate"
"coverage_ratio": number, // 0-100
"preferred_formats": [string]
}},
"testing": {{
"approach": string, // e.g. "unit-heavy", "integration-focused", "minimal"
"patterns": [string]
}},
"complexity_metrics": {{
"cyclomatic_complexity": {{ "average": number, "max_observed": number }},
"cognitive_complexity": {{ "average": number, "max_observed": number }}
}}
}},
"distinctive_traits": {{
"unique_patterns": [string], // Highly individual coding patterns
"favored_techniques": [string], // Preferred coding approaches
"consistent_habits": [string] // Reliable behavioral patterns
}}
}}
Critical requirements:
1. OUTPUT ONLY VALID JSON
2. NO markdown, NO comments, NO explanations
3. Use EXACT key names shown
4. All arrays MAXIMUM 10 items
5. Use numbers for metrics where specified
6. Use "Unknown" for undeterminable values
"""
try:
result = handler.generate_json_response(prompt)
if result:
combined_results[repo_name] = result
except Exception as e:
print(f"Error analyzing {repo_name}: {str(e)}")
combined_results[repo_name] = {"error": str(e)}
return combined_results
#####################################################################
# Temporal Patterns Analysis Module
#####################################################################
def analyze_temporal_patterns(
sources_data: Dict[str, Any], report_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Analyzes temporal patterns using both LLM and statistical analysis"""
commits = report_data.get("commits", {})
# Setup LLM Prompting
handler = create_handler()
combined_results = {}
# Get commit timestamps for activity analysis
commit_times = [
datetime.fromisoformat(
commit["commit"]["author"]["date"].replace("Z", "+00:00")
)
for repo_commits in commits.values()
for commit in repo_commits
]
# Get best targets and their commit contents
temporal_best_targets = _select_best_targets(sources_data, commits)
commit_contents = _get_commit_contents(temporal_best_targets, sources_data)
# Save commit contents for inspection
inspection_data = {
"temporal_targets": temporal_best_targets,
"commit_contents": commit_contents,
}
inspection_path = Path("out") / "temporal_analysis_contents.json"
try:
with open(inspection_path, "w", encoding="utf-8") as f:
json.dump(inspection_data, f, indent=2)
print(f"Saved temporal analysis data to {inspection_path}")
except Exception as e:
print(f"Error saving inspection data: {str(e)}")
for repo_name, repo_data in sources_data.items():
if repo_name not in temporal_best_targets:
continue
print(f"\nAnalyzing temporal patterns for repository: {repo_name}")
# Get code changes for this repository
repo_changes = commit_contents.get(repo_name, [])
if not repo_changes:
continue
# Analyze code style evolution using LLM with actual code changes
prompt = f"""
TEMPORAL ANALYSIS
Analyze the temporal evolution of this codebase with focus on developer behavior patterns and code evolution.
Repository: {repo_name}
Code Evolution Data:
{json.dumps(repo_changes, indent=2)}
Generate detailed temporal analysis JSON:
{{
"evolution_patterns": {{
"code_quality": {{
"progression": string,
"refactoring_patterns": [
{{
"pattern": string,
"frequency": string,
"motivation": string
}}
],
"complexity_trends": {{
"direction": string,
"significant_changes": [string],
"trigger_patterns": [string]
}}
}},
"development_cycles": {{
"commit_patterns": {{
"frequency": {{
"pattern": string,
"active_hours": [string],
"timezone_confidence": {{
"zone": string,
"confidence": number,
"evidence": [string]
}}
}},
"burst_patterns": [
{{
"pattern": string,
"typical_duration": string,
"characteristics": [string]
}}
]
}},
"feature_development": {{
"typical_cycle": string,
"iteration_patterns": [string],
"testing_integration": string
}}
}},
"communication_patterns": {{
"pr_characteristics": {{
"detail_level": string,
"discussion_style": string,
"iteration_patterns": string
}},
"documentation_evolution": {{
"frequency": string,
"detail_trends": string,
"update_patterns": string
}}
}}
}},
"architectural_evolution": {{
"major_changes": [
{{
"change": string,
"motivation": string,
"impact": string
}}
],
"improvement_patterns": {{
"refactoring_types": [string],
"optimization_focus": [string],
"maintenance_patterns": string
}},
"technical_debt": {{
"accumulation_patterns": [string],
"resolution_approaches": string,
"prevention_strategies": string
}}
}}
}}
Requirements:
1. Focus on developer behavior patterns
2. Track evolution of coding style
3. Identify clear timezone patterns
4. Detail burst activity characteristics
5. Analyze code quality progression
"""
try:
result = handler.generate_json_response(prompt)
if result:
combined_results[repo_name] = result
except Exception as e:
print(f"Error analyze_temporal_patterns {repo_name}: {str(e)}")
combined_results[repo_name] = {"error": str(e)}
return {
"commit_style_metrics": combined_results,
"activity_patterns": _analyze_activity_patterns(commit_times),
}
def _clean_diff(diff_output: str) -> str:
"""Clean up diff output to focus on actual changes"""
lines = diff_output.split("\n")
cleaned_lines = []
skip_next = False
for line in lines:
# Skip git-specific headers
if (
line.startswith("diff --git")
or line.startswith("index ")
or line.startswith("new file mode ")
or line.startswith("deleted file mode ")
):
continue
# Keep file markers but clean them up
if line.startswith("--- ") or line.startswith("+++ "):
# Convert /dev/null to clearer marker
if "/dev/null" in line:
continue
# Keep just the filename
cleaned_lines.append(line.split("/")[-1])
continue
# Keep actual diff content
if (
line.startswith("@@ ")
or line.startswith("+")
or line.startswith("-")
or line.startswith(" ")
):
cleaned_lines.append(line)
return "\n".join(cleaned_lines)
def _get_commit_contents(
    target_repos: List[str], sources_data: Dict[str, Any], max_diff_lines: int = 100
) -> Dict[str, Dict[str, Any]]:
    """
    Retrieves commit contents focusing on core files and limiting diff sizes.
    Diffs are cleaned with _clean_diff to focus on the actual changes.
    """
commit_contents = {}
# Extract username from the first repository's path structure
username = None
for repo in sources_data.values():
if repo.get('structure', {}).get('name', ''):
# Extract username from the repository name (format: username_reponame.git)
username = repo['structure']['name'].split('_')[0]
break
if not username:
raise ValueError("Could not determine username from repository structure")
for repo_name in target_repos:
        # Directory name on disk (format: username_reponame.git); repo_name itself stays unchanged
        repo_path_name = sources_data[repo_name]['structure'].get('name', '')
if not repo_path_name:
print(f"Warning: No path found for repository {repo_name}")
continue
# Construct correct path using extracted username
repo_path = f"out/{username}/{repo_path_name}"
# Get core files from sources_data using original repo_name
core_files = sources_data[repo_name].get("samples", {}).get("core_files", {})
if not core_files:
continue
try:
commits = []
for file_path, _ in core_files.items():
try:
# Get commit history for this file
commit_history = subprocess.check_output(
[
"git",
"log",
"--format=%H %ad",
"--date=iso",
"--reverse",
"--",
file_path,
],
cwd=repo_path,
text=True,
).splitlines()
# Process key commits
commits_to_process = []
if len(commit_history) > 0:
commits_to_process.append(commit_history[0]) # First commit
if len(commit_history) > 4:
# Add some middle commits, evenly spaced
middle_idx = len(commit_history) // 2
commits_to_process.append(commit_history[middle_idx])
if len(commit_history) > 1:
commits_to_process.append(commit_history[-1]) # Last commit
prev_content = None
for commit_info in commits_to_process:
sha, date = commit_info.split(" ", 1)
try:
# Get the diff for this commit
diff_output = subprocess.check_output(
["git", "show", "--format=", sha, "--", file_path],
cwd=repo_path,
text=True,
stderr=subprocess.PIPE,
)
# Skip if diff is too large
diff_lines = diff_output.splitlines()
if len(diff_lines) > max_diff_lines:
continue
# Clean up the diff
clean_diff = _clean_diff(diff_output)
if not clean_diff.strip():
continue
# Get actual file content at this commit for first and last commit only
if prev_content is None: # First commit
file_content = subprocess.check_output(
["git", "show", f"{sha}:{file_path}"],
cwd=repo_path,
text=True,
stderr=subprocess.PIPE,
)
prev_content = file_content
elif commit_info == commits_to_process[-1]: # Last commit
file_content = subprocess.check_output(
["git", "show", f"{sha}:{file_path}"],
cwd=repo_path,
text=True,
stderr=subprocess.PIPE,
)
else:
file_content = None
commit_data = {
"sha": sha,
"date": date,
"file": file_path,
"changes": clean_diff,
}
if file_content:
commit_data["content"] = file_content
commits.append(commit_data)
except subprocess.CalledProcessError:
continue
except subprocess.CalledProcessError:
continue
if commits:
# Sort commits by date
commits.sort(key=lambda x: x["date"])
# Group commits by file for better analysis
files_commits = {}
for commit in commits:
file_path = commit["file"]
if file_path not in files_commits:
files_commits[file_path] = []
files_commits[file_path].append(commit)
commit_contents[repo_name] = {
"core_files": list(core_files.keys()),
"evolution": {
"commit_count": len(commits),
"commits_by_file": files_commits,
},
}
print(f"Processed {len(commits)} commits for {repo_name} core files")
except Exception as e:
print(f"Error analyzing repository {repo_name}: {str(e)}")
continue
return commit_contents
def _select_best_targets(
sources_data: Dict[str, Any], commits: Dict[str, Any]
) -> List[str]:
"""Selects repositories with sufficient history for analysis"""
targets = []
for repo_name, repo_data in sources_data.items():
if (
len(commits.get(repo_name, [])) < 5
or repo_data["file_stats"]["file_count"] < 10
):
continue
targets.append(repo_name)
return targets
def _analyze_activity_patterns(commit_times: List[datetime]) -> Dict[str, Any]:
"""Analyzes commit timing patterns"""
if not commit_times:
return {
"frequency": {
"commits_per_day": 0,
"active_hours": [],
"timezone_hint": "unknown",
},
"burst_patterns": {
"intensity": "low",
"average_duration": "n/a",
"frequency": "sporadic",
},
}
# Sort commit times
commit_times.sort()
# Calculate commits per day
days_span = (commit_times[-1] - commit_times[0]).days or 1
commits_per_day = round(len(commit_times) / days_span, 2)
# Analyze active hours
hours = Counter([t.hour for t in commit_times])
active_hours = [
f"{h:02d}-{(h+1):02d}"
for h, c in hours.most_common(3)
if c > len(commit_times) * 0.1
]
    # Estimate a rough timezone range from the single most active hour.
    # NOTE: heuristic only; ideally "unclear" would fall back to the closest range
    peak_hour = max(hours.items(), key=lambda x: x[1])[0]
    if 4 <= peak_hour < 8:
        tz_hint = "UTC+8 to UTC+10"
    elif 8 <= peak_hour < 12:
        tz_hint = "UTC+0 to UTC+2"
    elif 12 <= peak_hour < 16:
        tz_hint = "UTC-6 to UTC-4"
    elif 16 <= peak_hour < 20:
        tz_hint = "UTC-12 to UTC-8"
    else:
        tz_hint = "unclear"
# Analyze burst patterns
time_diffs = []
for i in range(1, len(commit_times)):
diff = (commit_times[i] - commit_times[i - 1]).total_seconds() / 3600
time_diffs.append(diff)
if time_diffs:
avg_diff = statistics.mean(time_diffs)
if avg_diff < 1:
intensity = "high"
elif avg_diff < 4:
intensity = "moderate"
else:
intensity = "low"
burst_duration = (
"few hours"
if avg_diff < 4
else "day-length" if avg_diff < 24 else "multi-day"
)
burst_frequency = (
"frequent"
if commits_per_day > 3
else "regular" if commits_per_day > 1 else "sporadic"
)
else:
intensity = "low"
burst_duration = "n/a"
burst_frequency = "sporadic"
return {
"frequency": {
"commits_per_day": commits_per_day,
"active_hours": active_hours,
"timezone_hint": tz_hint,
},
"burst_patterns": {
"intensity": intensity,
"average_duration": burst_duration,
"frequency": burst_frequency,
},
}
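# Illustrative return value for a developer committing mostly around 14:00 UTC:
#   {"frequency": {"commits_per_day": 2.5,
#                  "active_hours": ["14-15", "13-14", "15-16"],
#                  "timezone_hint": "UTC-6 to UTC-4"},
#    "burst_patterns": {"intensity": "moderate",
#                       "average_duration": "few hours",
#                       "frequency": "regular"}}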
#####################################################################
# Project Preferences Analysis Module
#####################################################################
def analyze_project_preferences(sources_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyzes project preferences and technology choices using LLM"""
handler = create_handler()
combined_results = {}
for repo_name, repo_data in sources_data.items():
print(f"\nAnalyzing project preferences for repository: {repo_name}")
# Create repository-specific prompt
prompt = f"""
PROJECT PREFERENCES ANALYSIS
You are an expert in developer profiling and technical background analysis. Study this repository to build a comprehensive profile of the developer's technical preferences and knowledge domains.
Repository: {repo_name}
Languages: {repo_data.get('languages', 'Unknown')}
Project Structure:
{json.dumps(repo_data.get('structure', {}), indent=2)}
Configuration Files:
{json.dumps(repo_data.get('samples', {}).get('config_files', {}), indent=2)}
Core Files:
{json.dumps(repo_data.get('samples', {}).get('core_files', {}), indent=2)}
Dependencies:
{json.dumps(repo_data.get('documentation', {}).get('package_info', {}), indent=2)}
Analyze deeply to infer:
1. Technical background and expertise level
2. Problem-solving approaches and mathematical foundations
3. Security awareness and defensive programming practices
4. Development environment preferences
Generate detailed JSON analysis:
{{
"developer_profile": {{
"expertise_domains": [
{{
"domain": string, // e.g. "security", "data_science", "web_development"
"confidence": number, // 0-100
"evidence": [string]
}}
],
"knowledge_patterns": {{
"mathematical_foundations": [
{{
"area": string, // e.g. "graph_theory", "linear_algebra"
"usage_examples": [string],
"proficiency_level": string // "basic", "intermediate", "advanced"
}}
],
"algorithmic_preferences": {{
"common_approaches": [string],
"complexity_awareness": string,
"optimization_patterns": [string]
}},
"security_awareness": {{
"level": string, // "low", "medium", "high"
"defensive_patterns": [string],
"security_considerations": [string]
}}
}}
}},
"technical_choices": {{
"primary_languages": [
{{
"language": string,
"proficiency_indicators": [string],
"usage_patterns": [string]
}}
],
"frameworks": [
{{
"name": string,
"purpose": string,
"usage_patterns": [string],
"implementation_depth": string // "basic", "intermediate", "advanced"
}}
],
"development_environment": {{
"likely_editor": string,
"confidence": number,
"tooling_preferences": [string],
"evidence": [string]
}},
"testing_approach": {{
"methodology": string,
"frameworks": [string],
"coverage_patterns": string
}}
}},
"project_organization": {{
"architecture_style": {{
"pattern": string,
"consistency": number,
"key_characteristics": [string]
}},
"code_quality": {{
"standards_adherence": string,
"documentation_level": string,
"maintainability_indicators": [string]
}},
"deployment_patterns": {{
"infrastructure_preferences": [string],
"containerization_approach": string,
"ci_cd_sophistication": string
}}
}}
}}
Important:
1. Base all inferences on concrete evidence in the code
2. Indicate confidence levels where uncertain
3. Provide specific examples supporting each conclusion
4. Focus on unique/distinctive patterns
"""
try:
result = handler.generate_json_response(prompt)
if result:
combined_results[repo_name] = result
except Exception as e:
print(f"Error analyzing {repo_name}: {str(e)}")
combined_results[repo_name] = {"error": str(e)}
return combined_results
#####################################################################
# Identity Confidence Calculation Module
#####################################################################
def calculate_identity_confidence(
sources_data: Dict[str, Any],
code_style_results: Dict[str, Any],
project_preferences: Dict[str, Any],
temporal_patterns: Dict[str, Any]
) -> Dict[str, Any]:
"""Synthesizes all analysis results into a comprehensive developer identity profile"""
handler = create_handler()
# Create consolidated analysis data for the prompt
analysis_data = {
"repositories": sources_data,
"code_style_analysis": code_style_results,
"project_preferences": project_preferences,
"temporal_patterns": temporal_patterns
}
prompt = f"""
IDENTITY CONFIDENCE CALCULATION
You are an expert in developer profiling and behavioral analysis. Synthesize all provided analysis data to create a comprehensive profile of the developer's identity, expertise, and behavioral patterns.
Analysis Data:
{json.dumps(analysis_data, indent=2)}
Based on all provided repository data and previous analyses, create a detailed developer profile focusing on:
1. Technical expertise and knowledge domains
2. Problem-solving patterns and approaches
3. Development philosophy and practices
4. Unique identifiers and consistent traits
Generate a single comprehensive identity profile JSON:
{{
"developer_profile": {{
"expertise": {{
"primary_domains": [
{{
"domain": string,
"proficiency_level": string, // "beginner", "intermediate", "expert"
"evidence": [string],
"confidence": number // 0-100
}}
],
"technical_depth": {{
"languages": [
{{
"name": string,
"mastery_level": string,
"usage_patterns": [string],
"notable_practices": [string]
}}
],
"frameworks": [
{{
"name": string,
"usage_sophistication": string,
"implementation_patterns": [string]
}}
],
"specialized_knowledge": [
{{
"area": string, // e.g. "cryptography", "distributed systems"
"depth": string,
"application_examples": [string]
}}
]
}}
}},
"work_patterns": {{
"development_style": {{
"code_organization": string,
"problem_solving_approach": string,
"quality_focus": string,
"distinctive_habits": [string]
}},
"workflow_characteristics": {{
"development_cycle": string,
"testing_approach": string,
"refactoring_patterns": string,
"documentation_style": string
}},
"communication_style": {{
"code_commenting": string,
"commit_messages": string,
"documentation_quality": string
}}
}},
"behavioral_traits": {{
"strengths": [
{{
"trait": string,
"evidence": [string],
"consistency": number // 0-100
}}
],
"areas_for_improvement": [
{{
"area": string,
"indicators": [string]
}}
],
"unique_characteristics": [
{{
"trait": string,
"significance": string,
"supporting_patterns": [string]
}}
]
}},
"knowledge_breadth": {{
"technical_stack": {{
"preferred_technologies": [string],
"experience_indicators": [string],
"adoption_patterns": string
}},
"domain_knowledge": {{
"primary_domains": [string],
"depth_indicators": [string],
"application_examples": [string]
}},
"architectural_understanding": {{
"preferred_patterns": [string],
"complexity_handling": string,
"scalability_awareness": string
}}
}},
"identity_confidence": {{
"overall_score": number, // 0-100
"distinguishing_factors": [
{{
"factor": string,
"significance": string,
"supporting_evidence": [string]
}}
],
"consistency_metrics": {{
"coding_style": number, // 0-100
"problem_solving": number, // 0-100
"quality_standards": number // 0-100
}},
"pattern_reliability": {{
"stable_patterns": [string],
"variable_patterns": [string],
"context_dependencies": [string]
}}
}}
}}
}}
Critical Analysis Requirements:
1. Base all conclusions on concrete evidence from the provided data
2. Focus on patterns that appear consistently across repositories
3. Highlight unique traits that distinguish this developer
4. Note any evolution in skills or practices
5. Indicate confidence levels for all major conclusions
6. Consider both technical and behavioral aspects
7. Identify any potential biases or limitations in the analysis
"""
try:
result = handler.generate_json_response(prompt)
except Exception as e:
print(f"Error analyzing: {str(e)}")
result = {"error": str(e)}
return result
#####################################################################
# Profile Visualizer Component
#####################################################################
class ProfileVisualizer:
"""Creates visualizations for the developer profile"""
def __init__(self):
pass
def create_radar_chart(self, profile: Dict[str, Any]) -> go.Figure:
"""Create a radar chart for developer skills"""
if not profile or "identity_confidence" not in profile:
return self._empty_chart("No profile data available")
try:
# Extract metrics from profile
metrics = {}
# Get consistency metrics
if "identity_confidence" in profile and "consistency_metrics" in profile["identity_confidence"]:
consistency = profile["identity_confidence"]["consistency_metrics"]
for key, value in consistency.items():
if isinstance(value, (int, float)):
metrics[key.replace("_", " ").title()] = value
# Get expertise domains confidence
if "expertise" in profile and "primary_domains" in profile["expertise"]:
for domain in profile["expertise"]["primary_domains"]:
if "domain" in domain and "confidence" in domain:
metrics[domain["domain"]] = domain["confidence"]
# Create radar chart
if not metrics:
return self._empty_chart("No metrics found in profile data")
categories = list(metrics.keys())
values = list(metrics.values())
fig = go.Figure()
fig.add_trace(go.Scatterpolar(
r=values,
theta=categories,
fill='toself',
name='Developer Profile',
line_color='rgb(31, 119, 180)',
fillcolor='rgba(31, 119, 180, 0.3)'
))
fig.update_layout(
polar=dict(
radialaxis=dict(
visible=True,
range=[0, 100]
)
),
showlegend=False,
title="Developer Profile Metrics",
height=500
)
return fig
except Exception as e:
return self._empty_chart(f"Error creating chart: {str(e)}")
def create_language_bar_chart(self, profile: Dict[str, Any]) -> go.Figure:
"""Create a bar chart for programming language proficiency"""
if not profile or "expertise" not in profile:
return self._empty_chart("No profile data available")
try:
languages = []
# Extract languages
if "expertise" in profile and "technical_depth" in profile["expertise"]:
if "languages" in profile["expertise"]["technical_depth"]:
for lang in profile["expertise"]["technical_depth"]["languages"]:
if "name" in lang and "mastery_level" in lang:
# Convert mastery level to numeric value
mastery_value = self._mastery_to_number(lang["mastery_level"])
languages.append({
"Language": lang["name"],
"Mastery": mastery_value
})
if not languages:
return self._empty_chart("No language data found in profile")
# Create DataFrame
df = pd.DataFrame(languages)
# Create bar chart
fig = px.bar(
df,
x="Language",
y="Mastery",
color="Mastery",
color_continuous_scale="viridis",
title="Programming Language Proficiency"
)
fig.update_layout(
xaxis_title="Language",
yaxis_title="Proficiency Level (0-10)",
height=400
)
return fig
except Exception as e:
return self._empty_chart(f"Error creating chart: {str(e)}")
def create_strengths_chart(self, profile: Dict[str, Any]) -> go.Figure:
"""Create a horizontal bar chart for developer strengths"""
if not profile or "behavioral_traits" not in profile:
return self._empty_chart("No profile data available")
try:
strengths = []
# Extract strengths
if "behavioral_traits" in profile and "strengths" in profile["behavioral_traits"]:
for strength in profile["behavioral_traits"]["strengths"]:
if "trait" in strength and "consistency" in strength:
strengths.append({
"Trait": strength["trait"],
"Consistency": strength["consistency"]
})
if not strengths:
return self._empty_chart("No strengths data found in profile")
# Create DataFrame
df = pd.DataFrame(strengths)
df = df.sort_values("Consistency", ascending=True)
# Create horizontal bar chart
fig = px.bar(
df,
y="Trait",
x="Consistency",
orientation='h',
color="Consistency",
color_continuous_scale="greens",
title="Developer Strengths"
)
fig.update_layout(
xaxis_title="Consistency (%)",
yaxis_title=None,
height=400
)
return fig
except Exception as e:
return self._empty_chart(f"Error creating chart: {str(e)}")
def create_html_summary(self, profile: Dict[str, Any]) -> str:
"""Create HTML summary with profile insights"""
if not profile:
return "<p>No profile data available</p>"
try:
html = []
# Overall score
if "identity_confidence" in profile and "overall_score" in profile["identity_confidence"]:
score = profile["identity_confidence"]["overall_score"]
html.append(f"""
<div style="text-align: center; margin-bottom: 20px;">
<div style="font-size: 48px; font-weight: bold; color: #1f77b4;">{score}%</div>
<div style="font-size: 16px; color: #666;">Identity Confidence Score</div>
</div>
""")
# Primary domains
if "expertise" in profile and "primary_domains" in profile["expertise"]:
html.append("<h3>Primary Expertise Domains</h3>")
html.append("<ul>")
for domain in profile["expertise"]["primary_domains"]:
if "domain" in domain and "proficiency_level" in domain:
html.append(f"<li><strong>{domain['domain']}</strong> ({domain['proficiency_level']})</li>")
if "evidence" in domain and domain["evidence"]:
html.append(" - Evidence: " + ", ".join(domain["evidence"][:3]))
html.append("</ul>")
# Languages
if "expertise" in profile and "technical_depth" in profile["expertise"] and "languages" in profile["expertise"]["technical_depth"]:
html.append("<h3>Languages</h3>")
html.append("<ul>")
for lang in profile["expertise"]["technical_depth"]["languages"]:
html.append(f"<li><strong>{lang.get('name', 'Unknown')}</strong> ({lang.get('mastery_level', 'Unknown')})</li>")
html.append("</ul>")
# Add work patterns
if "work_patterns" in profile:
html.append("<h3>Work Patterns</h3>")
if "development_style" in profile["work_patterns"]:
dev_style = profile["work_patterns"]["development_style"]
html.append("<ul>")
html.append(f"<li><strong>Code Organization</strong>: {dev_style.get('code_organization', 'Unknown')}</li>")
html.append(f"<li><strong>Problem Solving</strong>: {dev_style.get('problem_solving_approach', 'Unknown')}</li>")
html.append("</ul>")
# Add behavioral traits
if "behavioral_traits" in profile:
html.append("<h3>Behavioral Traits</h3>")
if "strengths" in profile["behavioral_traits"]:
html.append("<h4>Strengths</h4>")
html.append("<ul>")
for strength in profile["behavioral_traits"]["strengths"][:3]:
html.append(f"<li><strong>{strength.get('trait', 'Unknown')}</strong> (Consistency: {strength.get('consistency', 0)}%)</li>")
html.append("</ul>")
# Add identity confidence
if "identity_confidence" in profile:
html.append("<h3>Identity Confidence</h3>")
conf = profile["identity_confidence"]
html.append("<ul>")
if "consistency_metrics" in conf:
metrics = conf["consistency_metrics"]
html.append(f"<li><strong>Coding Style</strong>: {metrics.get('coding_style', 0)}%</li>")
html.append(f"<li><strong>Problem Solving</strong>: {metrics.get('problem_solving', 0)}%</li>")
html.append(f"<li><strong>Quality Standards</strong>: {metrics.get('quality_standards', 0)}%</li>")
html.append("</ul>")
return "".join(html)
except Exception as e:
return f"<p>Error creating summary: {str(e)}</p>"
def _mastery_to_number(self, mastery: str) -> float:
"""Convert mastery level text to a numeric value"""
# Coerce to str first so a missing or non-string value can't raise AttributeError
mastery = str(mastery).lower()
if "expert" in mastery or "advanced" in mastery:
return 9.0
elif "proficient" in mastery or "strong" in mastery:
return 7.5
elif "intermediate" in mastery or "moderate" in mastery:
return 5.0
elif "basic" in mastery or "beginner" in mastery:
return 3.0
elif "novice" in mastery or "limited" in mastery:
return 1.5
else:
return 5.0 # Default moderate level
def _empty_chart(self, message: str) -> go.Figure:
"""Create an empty chart with an error message"""
fig = go.Figure()
fig.add_annotation(
x=0.5,
y=0.5,
xref="paper",
yref="paper",
text=message,
showarrow=False,
font=dict(
size=14,
color="#666"
)
)
fig.update_layout(
height=400,
xaxis=dict(showticklabels=False, showgrid=False),
yaxis=dict(showticklabels=False, showgrid=False)
)
return fig
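# Entry point wired up in the Gradio UI below: takes the raw profile JSON and
# returns the three figures plus the HTML summary, in declared output order.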
def visualize_profile(self, profile_json: Dict[str, Any]) -> List[Any]:
"""Main method to generate all visualizations"""
try:
# Extract the developer profile, expected under identity_confidence.developer_profile
profile = (profile_json or {}).get("identity_confidence", {}).get("developer_profile")
if not profile:
    msg = "No developer profile data available"
    return [
        self._empty_chart(msg),
        self._empty_chart(msg),
        self._empty_chart(msg),
        f"<p>{msg}</p>"
    ]
# Create visualizations
radar_chart = self.create_radar_chart(profile)
language_chart = self.create_language_bar_chart(profile)
strengths_chart = self.create_strengths_chart(profile)
html_summary = self.create_html_summary(profile)
return [radar_chart, language_chart, strengths_chart, html_summary]
except Exception as e:
error_msg = f"Error visualizing profile: {str(e)}"
return [
self._empty_chart(error_msg),
self._empty_chart(error_msg),
self._empty_chart(error_msg),
f"<p>{error_msg}</p>"
]
#####################################################################
# Main Application Class
#####################################################################
class StyleAnalyzerApp:
"""Handles repository analysis and stylometric profiling with Gradio UI"""
def __init__(self):
self.base_path = Path("out")
# Ensure the output directory exists
os.makedirs(self.base_path, exist_ok=True)
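# Per-user artifacts live under out/<username>/; the gh-analyze CLI is
# expected to drop its report.json in the same location.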
def analyze_github_user(
self,
username: str,
repository_selection: str,
github_token: str,
gemini_api_key: str,
progress=gr.Progress()
) -> Tuple[str, Dict, str]:
"""Main analysis function that will be called from the Gradio interface"""
# Save API keys to environment variables or .env file
os.environ["GH_TOKEN"] = github_token
os.environ["GEMINI_API_KEY"] = gemini_api_key
# Update .env file
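# Note: keys are stored in plaintext so "Load from .env" can restore them on
# later runs; avoid committing this file to version control.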
with open(".env", "w") as f:
f.write(f"GH_TOKEN={github_token}\n")
f.write(f"GEMINI_API_KEY={gemini_api_key}\n")
# Create user path
user_path = self.base_path / username
report_path = user_path / "report.json"
# Check if user data exists, if not, fetch it
if not report_path.exists():
progress(0, desc="Fetching GitHub data...")
try:
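# gh-analyze (from the gh-fake-analyzer package) should write
# out/<username>/report.json, which the rest of the pipeline consumes.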
result = subprocess.run(
["gh-analyze", username],
check=True,
capture_output=True,
text=True
)
progress(0.2, desc="GitHub data fetched successfully")
log_output = f"GitHub data fetched successfully:\n{result.stdout}"
except subprocess.CalledProcessError as e:
error_msg = f"Error fetching GitHub data: {e.stderr}"
return "Error", {}, error_msg
else:
progress(0.2, desc="Using existing GitHub data")
log_output = "Using existing GitHub data\n"
try:
# Load report data
progress(0.25, desc="Loading report data...")
with open(report_path) as f:
report_data = json.load(f)
log_output += "Report data loaded successfully\n"
# Select repositories to analyze
progress(0.3, desc="Identifying repositories to analyze...")
if repository_selection == "Smart Selection":
repo_selector = RepositorySelector(str(self.base_path), username)
sources_to_analyze = repo_selector.select_repositories(report_data)
else:
# Only repos where the target user is the sole contributor; comparing
# the whole list also avoids an IndexError on empty contributor entries
sources_to_analyze = [
    obj["repo"]
    for obj in report_data.get("contributors", [])
    if obj.get("contributors") == [username]
]
repo_list = ", ".join(sources_to_analyze)
log_output += f"Found {len(sources_to_analyze)} repositories to analyze: {repo_list}\n"
# Analyze repository structure
progress(0.4, desc="Analyzing repository structure...")
sources_data = analyze_repository_structure(sources_to_analyze, user_path)
log_output += "Repository structure analysis complete\n"
# Analyze code style
progress(0.5, desc="Analyzing code style patterns...")
code_style = analyze_code_style(sources_data)
log_output += "Code style analysis complete\n"
# Analyze temporal patterns
progress(0.6, desc="Analyzing temporal patterns...")
temporal_patterns = analyze_temporal_patterns(sources_data, report_data)
log_output += "Temporal patterns analysis complete\n"
# Analyze project preferences
progress(0.7, desc="Analyzing project preferences...")
project_preferences = analyze_project_preferences(sources_data)
log_output += "Project preferences analysis complete\n"
# Calculate identity confidence
progress(0.8, desc="Calculating identity confidence...")
identity_confidence = calculate_identity_confidence(
sources_data,
code_style,
project_preferences,
temporal_patterns
)
log_output += "Identity confidence calculation complete\n"
# Generate final report
progress(0.9, desc="Generating final report...")
analysis_result = {
"code_style_metrics": code_style,
"temporal_patterns": temporal_patterns,
"project_preferences": project_preferences,
"identity_confidence": identity_confidence,
}
output_path = user_path / "stylometry_profile.json"
with open(output_path, "w") as f:
json.dump({"stylometric_profile": analysis_result}, f, indent=2)
log_output += f"Report generated successfully and saved to {output_path}\n"
progress(1.0, desc="Analysis complete!")
return "Success", analysis_result, log_output
except Exception as e:
error_trace = traceback.format_exc()
error_msg = f"Error during analysis: {str(e)}\n{error_trace}"
return "Error", {}, error_msg
#####################################################################
# Gradio Interface Setup
#####################################################################
def add_visualization_tab(app, profile_output):
"""Add visualization tab to the main Gradio app"""
visualizer = ProfileVisualizer()
with gr.Tab("Visualizations"):
with gr.Row():
with gr.Column():
gr.Markdown("### Developer Profile Metrics")
radar_chart = gr.Plot(label="Skills Radar")
with gr.Column():
gr.Markdown("### Technical Summary")
html_summary = gr.HTML(label="Profile Summary")
with gr.Row():
with gr.Column():
gr.Markdown("### Programming Languages")
language_chart = gr.Plot(label="Language Proficiency")
with gr.Column():
gr.Markdown("### Developer Strengths")
strengths_chart = gr.Plot(label="Strengths Analysis")
# Connect the profile output to the visualization components
profile_output.change(
fn=visualizer.visualize_profile,
inputs=[profile_output],
outputs=[radar_chart, language_chart, strengths_chart, html_summary]
)
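# gr.JSON emits .change when the analysis function writes a new profile value,
# so all four visualization outputs refresh without extra wiring.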
return app
def create_gradio_interface():
"""Create and configure the Gradio interface"""
analyzer = StyleAnalyzerApp()
with gr.Blocks(title="GitHub Stylometry Analyzer") as app:
gr.Markdown("# GitHub Stylometry Analyzer")
gr.Markdown("""
This tool analyzes a GitHub user's repositories to build a developer profile based on coding style,
temporal patterns, project preferences, and calculated identity confidence.
The analysis process takes 10-15 minutes for standard accounts.
""")
with gr.Row():
with gr.Column(scale=1):
username_input = gr.Textbox(label="GitHub Username", placeholder="Enter GitHub username")
repo_selection = gr.Radio(
choices=["Smart Selection", "Owner Repositories Only"],
label="Repository Selection Method",
value="Smart Selection"
)
github_token = gr.Textbox(
label="GitHub API Token",
placeholder="Enter your GitHub API token",
type="password"
)
gemini_api_key = gr.Textbox(
label="Google Gemini API Key",
placeholder="Enter your Gemini API key",
type="password"
)
analyze_button = gr.Button("Analyze", variant="primary")
with gr.Accordion("Load configuration from file", open=False):
gr.Markdown("""
You can load your GitHub token and Gemini API key from the .env file if present.
This is useful if you don't want to enter them manually each time.
""")
load_config_button = gr.Button("Load from .env", variant="secondary")
def load_from_env():
load_dotenv()
gh_token = os.getenv("GH_TOKEN", "")
gemini_key = os.getenv("GEMINI_API_KEY", "")
return gh_token, gemini_key
load_config_button.click(
fn=load_from_env,
inputs=[],
outputs=[github_token, gemini_api_key]
)
with gr.Column(scale=2):
with gr.Tab("Profile Summary"):
status_output = gr.Textbox(label="Status", value="Ready")
profile_output = gr.JSON(label="Developer Profile")
with gr.Tab("Logs"):
log_output = gr.Textbox(label="Analysis Logs", lines=20)
# Add the visualizations tab
app = add_visualization_tab(app, profile_output)
analyze_button.click(
fn=analyzer.analyze_github_user,
inputs=[username_input, repo_selection, github_token, gemini_api_key],
outputs=[status_output, profile_output, log_output]
)
return app
#####################################################################
# Entry Point
#####################################################################
def check_requirements():
    """Check if required packages are installed"""
    import importlib
    # Map importable module paths to their pip package names; importing only
    # the top-level "google" namespace would not prove generativeai is present
    required_packages = {
        "gradio": "gradio",
        "google.generativeai": "google-generativeai",
        "plotly": "plotly",
    }
    missing_packages = []
    for module_name, pip_name in required_packages.items():
        try:
            importlib.import_module(module_name)
        except ImportError:
            missing_packages.append(pip_name)
    if missing_packages:
        print("! Missing required packages: " + ", ".join(missing_packages))
        print("Please install required packages with:")
        print(f"pip install {' '.join(missing_packages)}")
        return False
    print("βœ“ Required packages already installed")
    return True
def check_environment():
"""Check if .env file exists and create it if needed"""
env_file = Path(".env")
if not env_file.exists():
print("! Creating .env file")
with open(env_file, "w") as f:
f.write("GH_TOKEN=\nGEMINI_API_KEY=\n")
print("βœ“ Created .env file. You will need to provide API keys in the app.")
else:
print("βœ“ .env file already exists")
def create_output_dir():
"""Create output directory if it doesn't exist"""
out_dir = Path("out")
if not out_dir.exists():
out_dir.mkdir()
print("βœ“ Created output directory")
else:
print("βœ“ Output directory already exists")
def check_gh_analyze():
    """Check if the gh-analyze tool is available on PATH"""
    # shutil.which avoids spawning a subprocess just to probe for the binary
    if shutil.which("gh-analyze"):
        print("βœ“ gh-analyze tool is installed")
        return True
    print("! gh-analyze tool is not installed")
    print("Please install gh-fake-analyzer with:")
    print("pip install gh-fake-analyzer")
    return False
def install_gh_analyze():
    """Install gh-analyze tool if not present"""
    import sys
    try:
        # Invoke pip through the current interpreter so the package lands in
        # the same environment this app is running from
        subprocess.run(
            [sys.executable, "-m", "pip", "install", "gh-fake-analyzer"],
            check=True,
            capture_output=True,
            text=True  # without text=True, e.stderr below would be raw bytes
        )
        print("βœ“ Installed gh-fake-analyzer")
        return True
    except subprocess.CalledProcessError as e:
        print(f"! Error installing gh-fake-analyzer: {e.stderr}")
        return False
def main():
"""Main entry point for the application"""
print("\n===========================================")
print("GitHub Stylometry Analyzer Setup")
print("===========================================\n")
# Check and install requirements
all_requirements_met = check_requirements()
if not all_requirements_met:
print("\nPlease install the missing packages and run the application again.")
return
# Check if gh-analyze is installed
gh_analyze_installed = check_gh_analyze()
if not gh_analyze_installed:
print("\nAttempting to install gh-fake-analyzer...")
install_success = install_gh_analyze()
if not install_success:
print("\nPlease install gh-fake-analyzer manually and run the application again.")
return
# Setup environment
check_environment()
create_output_dir()
print("\n===========================================")
print("Launching GitHub Stylometry Analyzer")
print("===========================================\n")
# Create and launch the Gradio interface
app = create_gradio_interface()
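# share=True publishes a temporary public gradio.live URL alongside the local
# one; debug=True keeps the process attached so errors surface in the console.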
app.launch(share=True, debug=True)
if __name__ == "__main__":
main()