import gradio as gr import os import json import time import subprocess import tempfile import shutil from pathlib import Path from typing import Dict, List, Any, Tuple, Optional, Iterator import traceback from dotenv import load_dotenv import plotly.graph_objects as go import plotly.express as px import pandas as pd import numpy as np import re from collections import Counter, defaultdict import statistics from datetime import datetime from threading import Lock from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception import google.generativeai as genai import requests ##################################################################### # Constants and Shared Variables ##################################################################### RELEVANT_EXTENSIONS = { ".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".cpp", ".c", ".h", ".hpp", ".rb", ".php", ".go", ".rs", ".swift", ".kt", ".kts", ".scala", ".pl", ".pm", ".r", ".sh", ".bat", ".ps1", ".lua", ".sql", ".html", ".css", ".xml", ".json", ".yaml", ".yml", ".md", ".ipynb", ".m", ".mm", ".vb", ".cs", ".fs", ".fsx", ".erl", ".hrl", ".ex", ".exs", ".dart", ".groovy", ".jl", ".clj", ".cljs", ".coffee", ".litcoffee", ".rkt", ".hs", ".lhs", ".ml", ".mli", ".nim", ".cr", ".nimble", ".hx", ".sol", ".vy" } LANGUAGE_EXTENSIONS = { ".py": "Python", ".js": "JavaScript", ".ts": "TypeScript", ".jsx": "React", ".tsx": "React TypeScript", ".java": "Java", ".cpp": "C++", ".c": "C", ".h": "C/C++ Header", ".hpp": "C++ Header", ".rb": "Ruby", ".php": "PHP", ".go": "Go", ".rs": "Rust", ".swift": "Swift", ".kt": "Kotlin", ".kts": "Kotlin Script", ".scala": "Scala", ".pl": "Perl", ".pm": "Perl Module", ".r": "R", ".sh": "Shell", ".bat": "Batch", ".ps1": "PowerShell", ".lua": "Lua", ".sql": "SQL", ".html": "HTML", ".css": "CSS", ".xml": "XML", ".json": "JSON", ".yaml": "YAML", ".yml": "YAML", ".md": "Markdown", ".ipynb": "Jupyter Notebook", ".m": "MATLAB/Objective-C", ".mm": "Objective-C++", ".vb": "Visual Basic", ".cs": "C#", ".fs": "F#", ".fsx": "F# Script", ".erl": "Erlang", ".hrl": "Erlang Header", ".ex": "Elixir", ".exs": "Elixir Script", ".dart": "Dart", ".groovy": "Groovy", ".jl": "Julia", ".clj": "Clojure", ".cljs": "ClojureScript", ".coffee": "CoffeeScript", ".litcoffee": "Literate CoffeeScript", ".rkt": "Racket", ".hs": "Haskell", ".lhs": "Literate Haskell", ".ml": "OCaml", ".mli": "OCaml Interface", ".nim": "Nim", ".cr": "Crystal", ".nimble": "Nimble", ".hx": "Haxe", ".sol": "Solidity", ".vy": "Vyper" } PACKAGE_FILES = { "package.json": "npm", "requirements.txt": "pip", "setup.py": "python", "pom.xml": "maven", "build.gradle": "gradle", "Gemfile": "bundler", "Cargo.toml": "cargo", "go.mod": "go", "go.sum": "go", "composer.json": "composer", "pubspec.yaml": "dart", "Project.toml": "julia", "mix.exs": "elixir", "Makefile": "make", "CMakeLists.txt": "cmake", "SConstruct": "scons", "build.xml": "ant", "Rakefile": "rake", "shard.yml": "crystal", "nim.cfg": "nim", "default.nix": "nix", "stack.yaml": "haskell", "rebar.config": "erlang", "rebar.lock": "erlang", "project.clj": "leiningen", "deps.edn": "clojure", "build.boot": "boot", "build.sbt": "sbt", "Brewfile": "homebrew", "Vagrantfile": "vagrant", "Dockerfile": "docker", "docker-compose.yml": "docker-compose", "Procfile": "heroku", "tox.ini": "tox", "pyproject.toml": "poetry", "Pipfile": "pipenv", "Pipfile.lock": "pipenv", "environment.yml": "conda", "meta.yaml": "conda" } SYSTEM_PROMPT = "You are an experienced software engineer and data analyst tasked with building a report on developer's coding style, technical background, approach to problem solving, architectural thinking, technology choices, re-used frameworks etc,. There will be a set of prompts, divided into CODE STYLE ANALYSIS, TEMPORAL ANALYSIS, PROJECT PREFERENCES ANALYSIS and IDENTITY CONFIDENCE CALCULATION together with data samples provided to you. You'll summarize your findings from all of the modules in a single comprehensive IDENTITY CALCULATION CONFIDENCE output. Output a valid JSON, avoid including to many strings into the list objects! Follow the instructions provided for this section:" ##################################################################### # Prompt Analyzer Module ##################################################################### def _should_retry_error(exception: Exception) -> bool: """Check if the exception is one we should retry""" error_str = str(exception).lower() return any( msg in error_str for msg in [ "resource exhaust", "429", "too many requests", "quota exceeded", "rate limit", ] ) class RateLimiter: """Token bucket rate limiter implementation""" def __init__(self, rate: int, per: int): self.rate = rate # Number of requests allowed per time period self.per = per # Time period in seconds self.tokens = rate # Current token count self.last_update = time.time() self.lock = Lock() def _add_tokens(self): """Add tokens based on time elapsed""" now = time.time() time_passed = now - self.last_update new_tokens = time_passed * (self.rate / self.per) if new_tokens > 0: self.tokens = min(self.rate, self.tokens + new_tokens) self.last_update = now def acquire(self) -> float: """ Try to acquire a token. Returns the time to wait if no token is available. """ with self.lock: self._add_tokens() if self.tokens >= 1: self.tokens -= 1 return 0.0 # Calculate wait time needed for next token wait_time = (1 - self.tokens) * (self.per / self.rate) return wait_time class PromptAnalyzer: """Handles LLM prompting for code analysis tasks""" def __init__(self, api_key: Optional[str] = None): """Initialize Gemini handler with API key""" self.api_key = api_key or os.getenv("GEMINI_API_KEY") if not self.api_key: raise ValueError( "Gemini API key must be provided or set in GEMINI_API_KEY environment variable" ) genai.configure(api_key=self.api_key) self.model = genai.GenerativeModel(model_name="gemini-1.5-flash-001", system_instruction=SYSTEM_PROMPT) self.token_count = 0 self.prompt_count = 0 self.rate_limiter = RateLimiter(rate=5, per=60) def count_tokens(self, text: str) -> int: """Count tokens in a text string""" try: token_count = self.model.count_tokens(text) return token_count.total_tokens except Exception as e: print(f"Warning: Error counting tokens: {str(e)}") # Fallback to approximate count if token counting fails return len(text) // 4 # Rough approximation def _clean_json_response(self, response_text: str) -> str: """Clean up response text to extract JSON content""" if "```" in response_text: match = re.search(r"```(?:json)?\n(.*?)```", response_text, re.DOTALL) if match: return match.group(1).strip() return response_text.strip() @retry( retry=retry_if_exception(_should_retry_error), stop=stop_after_attempt(5), wait=wait_exponential(multiplier=2, min=4, max=60), before_sleep=lambda retry_state: print( f"Retrying due to rate limit/resource exhaustion... (attempt {retry_state.attempt_number})" ), ) def _rate_limited_generate(self, prompt: str) -> Any: """Handle rate-limited generation with waiting and resource exhaustion""" while True: wait_time = self.rate_limiter.acquire() if wait_time == 0: try: # Direct call to generate_content instead of using chat return self.model.generate_content(prompt) except Exception as e: if _should_retry_error(e): print( f"Rate limit/resource exhaustion error, will retry: {str(e)}" ) raise # Let the retry decorator handle it else: print(f"Non-retryable error occurred: {str(e)}") raise print(f"Rate limit reached. Waiting {wait_time:.2f} seconds...") time.sleep(wait_time) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10) ) def generate_json_response(self, prompt: str) -> Dict[str, Any]: """Generate and parse JSON response with robust error handling""" try: self.prompt_count += 1 print(f"\nšŸ“ Processing prompt #{self.prompt_count}...") # Count input tokens token_count = self.model.count_tokens(prompt) input_tokens = token_count.total_tokens print(f"šŸ“Š Sending prompt with {input_tokens:,} tokens...") # Track retries for JSON parsing max_json_retries = 3 last_response = None last_error = None for attempt in range(max_json_retries): try: # Generate with rate limiting start_time = time.time() # Here's the actual model call response = self._rate_limited_generate(prompt) elapsed_time = time.time() - start_time # Track token usage output_token_count = response.usage_metadata.total_token_count prompt_total_tokens = input_tokens + output_token_count self.token_count += prompt_total_tokens print(f"āœ“ Response received in {elapsed_time:.2f} seconds") print(f"šŸ“Š Prompt #{self.prompt_count} token usage:") print(f" - Input tokens: {input_tokens:,}") print(f" - Output tokens: {output_token_count:,}") print(f" - Total tokens: {prompt_total_tokens:,}") print(f"šŸ“ˆ Cumulative token usage: {self.token_count:,}") # Try to parse JSON with advanced error recovery last_response = response.text result = self._clean_json_response(last_response) return json.loads(result) except json.JSONDecodeError as e: last_error = e if attempt < max_json_retries - 1: print(f"āš ļø Attempt {attempt + 1}/{max_json_retries}: JSON parsing failed, retrying with feedback...") # Add feedback about the JSON parsing failure and retry error_feedback = f"""Your previous response could not be parsed as valid JSON. The specific error was: {str(e)} IMPORTANT: You must provide a response that: 1. Contains ONLY valid JSON 2. Has NO markdown code blocks 3. Has NO explanatory text 4. Follows the exact schema requested 5. Uses proper JSON syntax (quotes, commas, brackets) 6. AVOID falling into recursive loops when retrieving data from the prompt Here is the original prompt again: """ # Combine feedback with original prompt prompt = error_feedback + prompt continue else: print(f"āŒ Failed to parse JSON after {max_json_retries} attempts") print("Last response received:") print(last_response) print(f"Last error: {str(last_error)}") raise except Exception as e: print(f"āŒ Error in generate_json_response: {str(e)}") print("Stack trace:") print(traceback.format_exc()) if "last_response" in locals(): print("\nLast response received:") print(last_response) raise def create_handler(api_key: Optional[str] = None) -> PromptAnalyzer: """ Factory function to create a PromptAnalyzer instance. """ return PromptAnalyzer(api_key) ##################################################################### # Repository Structure Analysis Module ##################################################################### def analyze_repository_structure(repo_names: List[str], user_path: Path) -> Dict[str, Any]: """Processes source code from repositories to build LLM-friendly structure""" result = {} for repo_name in repo_names: username = user_path.name repo_path = ( user_path / f"{username}_{repo_name}.git" ) print("processing,", repo_name, "path:", repo_path) if not repo_path.exists(): print("skipping") continue # Get the structure first structure = _build_tree_structure(repo_path) # Count language occurrences from the structure language_counts = {} for file_info in _get_source_files(structure): extension = file_info["extension"].lower() if extension in LANGUAGE_EXTENSIONS: language = LANGUAGE_EXTENSIONS[extension] language_counts[language] = language_counts.get(language, 0) + 1 # Sort languages by frequency, most common first languages = sorted( language_counts.items(), key=lambda x: (-x[1], x[0]) # Sort by count descending, then name ascending ) # Create the language string languages_str = ", ".join(lang for lang, _ in languages) result[repo_name] = { "structure": structure, "file_stats": _analyze_file_statistics(repo_path), "documentation": _extract_documentation(repo_path), "languages": languages_str } _extract_code_samples(result, user_path) return result def _build_tree_structure(repo_path: Path, files_per_dir: int = 20, max_depth: int = 3) -> Dict[str, Any]: """ Builds a tree representation of repository structure with limits. Args: repo_path: Repository path files_per_dir: Maximum number of files to include per directory (default: 20) max_depth: Maximum depth for nested directories (default: 3) """ def create_tree(path: Path, current_depth: int = 0) -> Dict[str, Any]: tree = { "type": "directory", "name": path.name, "path": str(path.relative_to(repo_path)), "children": [], } # Stop traversing if we hit max depth if current_depth >= max_depth: tree["children"] = [{ "type": "note", "message": f"Directory depth limit ({max_depth}) reached" }] return tree try: items = list(path.iterdir()) # Skip git directory and common build artifacts if path.name in { ".git", "node_modules", "__pycache__", "build", "dist", }: return tree # Process files with limit files = [ item for item in items if item.is_file() and item.suffix.lower() in RELEVANT_EXTENSIONS ] if files: files = files[:files_per_dir] # Limit number of files for item in files: tree["children"].append({ "type": "file", "name": item.name, "path": str(item.relative_to(repo_path)), "extension": item.suffix.lower(), "size": item.stat().st_size, }) # Process directories dirs = [item for item in items if item.is_dir()] for item in dirs: subtree = create_tree(item, current_depth + 1) if subtree["children"]: # Only add non-empty directories tree["children"].append(subtree) except PermissionError: pass return tree return create_tree(repo_path) def _analyze_file_statistics(repo_path: Path) -> Dict[str, Any]: """Analyzes file statistics for the repository""" file_count = 0 total_loc = 0 for ext in LANGUAGE_EXTENSIONS: for file_path in repo_path.rglob(f"*{ext}"): if not any(p in str(file_path) for p in RELEVANT_EXTENSIONS): continue try: with open(file_path, "r", encoding="utf-8") as f: content = f.read() loc = len([l for l in content.splitlines() if l.strip()]) total_loc += loc file_count += 1 except (UnicodeDecodeError, PermissionError): continue return { "file_count": file_count, "total_loc": total_loc, } def _extract_documentation(repo_path: Path) -> Dict[str, Any]: """Extracts documentation and metadata from repository""" docs = {} # Look for README readme_paths = list(repo_path.glob("README*")) if readme_paths: try: with open(readme_paths[0], "r", encoding="utf-8") as f: docs["readme"] = f.read() except (UnicodeDecodeError, PermissionError): docs["readme"] = None docs["package_info"] = {} for filename, pkg_type in PACKAGE_FILES.items(): pkg_path = repo_path / filename if pkg_path.exists(): try: with open(pkg_path, "r", encoding="utf-8") as f: docs["package_info"][pkg_type] = f.read() except (UnicodeDecodeError, PermissionError): continue return docs def _extract_code_samples(sources_data: Dict[str, Any], user_path: Path, max_file_size: int = 100000) -> Dict[str, Any]: """ Extracts code samples for files identified as relevant by Gemini. Filters out files larger than max_file_size bytes. """ handler = create_handler() try: # Preprocess to remove large files from consideration filtered_structures = {} for repo_name, repo_data in sources_data.items(): structure_copy = repo_data["structure"].copy() # Filter function to remove large files def filter_large_files(node): if node.get("type") == "directory": node["children"] = [ child for child in node.get("children", []) if child.get("type") == "directory" or (child.get("type") == "file" and child.get("size", 0) <= max_file_size) ] for child in node["children"]: if child.get("type") == "directory": filter_large_files(child) return node # Apply filter filtered_structures[repo_name] = filter_large_files(structure_copy) # Create a combined prompt for all repositories prompt = f""" Analyze the repository structures and identify the most relevant files for codebase analysis. Focus on files that would reveal: 1. Core functionality and architecture 2. Main business logic 3. Key utilities and helpers 4. Configuration and setup Results will be used for further code analysis. Remember to include ALL relevant files, especially for fullstack applications. Be thorough but concise. Avoid including non-original code, e.g., dependencies or libraries code. AVOID INCLUDING MORE THAN 50 FILES PER REPOSITORY!!! TRY TO INCLUDE LESS THAN 20 IF POSSIBLE. CORE_FILES ARE THE PRIORITY, YOU CAN OMITT THE REST IF IT EXCEEDS THE LIMIT. Return a JSON object with these categories: {{ "repositories": {{ // MANDATORY highest level key "repo_name": {{ // MANDATORY name of the repository you are analyzing "core_files": ["list of most important files"], // MAX 20 files! "secondary_files": ["list of supporting files"], // MAX 20 files! "config_files": ["list of relevant config files"] // MAX 10 files! }}, "repo_name": {{...}}, }} }} CRITICAL REQUIREMENTS: Limit each list of most important files to a maximum of 20 files!!! Avoid including binary files or large data files. Only include files that are essential for understanding the codebase. Avoid including too many files, focus on the most important ones. Avoid including files that user did not write, e.g., dependencies or libraries code. Avoid including utility files that are not essential for understanding the codebase. Focus on including only source code, some repositories may have a lot of files, but only a few are essential for understanding the codebase. Do not include long .json files or other artifact type of files - notice "size" of the file in the structure. Repository structures: {json.dumps(filtered_structures, indent=2)} Only include files that exist in the structure. Return valid JSON format. DO NOT wrap the JSON in markdown code blocks. """ # Get file categories for all repositories file_categories = handler.generate_json_response(prompt) if not file_categories: print("Skipping due to API error") return sources_data for repo_name, repo_data in sources_data.items(): repo_data["samples"] = { "core_files": {}, "utility_files": {}, "config_files": {} } # Filter out large files from consideration all_files = { file_info["path"]: file_info for file_info in _get_source_files(repo_data["structure"]) if file_info.get("size", 0) <= max_file_size } for category in ["core_files", "utility_files", "config_files"]: for file_path in file_categories["repositories"].get(repo_name, {}).get(category, []): if file_path not in all_files: continue source_code = _read_source_file(user_path, repo_name, file_path) if source_code: repo_data["samples"][category][file_path] = source_code except Exception as e: print(f"Error processing code samples: {str(e)}") return sources_data def _get_source_files(structure: Dict[str, Any]) -> List[Dict[str, Any]]: """Helper to recursively extract source files from tree structure""" files = [] def traverse(node: Dict[str, Any]): if not isinstance(node, dict): return # If it's a file, add it if node.get("type") == "file": files.append(node) # If it's a directory, traverse its children elif node.get("type") == "directory" and "children" in node: for child in node.get("children", []): traverse(child) # Also check any other dictionaries that might contain nested structures for value in node.values(): if isinstance(value, dict): traverse(value) elif isinstance(value, list): for item in value: if isinstance(item, dict): traverse(item) traverse(structure) # Sort files by path for consistent ordering return sorted(files, key=lambda x: x["path"]) def _read_source_file(user_path: Path, repo_name: str, file_path: str) -> Optional[str]: """Reads source code from file with proper error handling""" try: # Construct the full path to the source file full_path = user_path / f"{user_path.name}_{repo_name}.git" / file_path # Check if file exists and is readable if not full_path.is_file(): return None # Common binary file extensions to skip if full_path.suffix.lower() not in RELEVANT_EXTENSIONS: return None # Try to read the file with different encodings encodings = ["utf-8", "latin-1", "cp1252"] for encoding in encodings: try: with open(full_path, "r", encoding=encoding) as f: content = f.read() # Basic validation of text content if "\0" in content: # Binary file check return None return content except UnicodeDecodeError: continue except Exception as e: print(f"Error reading {full_path}: {str(e)}") return None return None except Exception as e: print(f"Error accessing {file_path}: {str(e)}") return None ##################################################################### # Repository Selector Module ##################################################################### class RepositorySelector: """Handles intelligent repository selection and authorship analysis""" def __init__(self, base_path: str, username: str): self.base_path = Path(base_path) self.username = username self.user_path = self.base_path / username def select_repositories(self, report_data: Dict) -> List[str]: """ Main entry point for repository selection. Returns a list of repository names to analyze, including both best-scored repos and single-contributor repos. """ # Store report data for use in other methods self.report_data = report_data # Get repositories with activity scores repositories = self._analyze_repositories(report_data) print(f"Found {len(repositories)} repositories with activity") # Get best scored repositories selected_repos = self._select_best_repositories(repositories) selected_repo_names = {repo["name"] for repo in selected_repos} # Get single-contributor repositories single_contributor_repos = self._get_only_owner_sources() # Combine both sets of repositories without duplicates all_repo_names = selected_repo_names.union(single_contributor_repos) print(f"Added {len(all_repo_names) - len(selected_repo_names)} single-contributor repositories") print(f"Total repositories to analyze: {len(all_repo_names)}") # Update metadata for all repositories self.repo_metadata = {} for repo in selected_repos: self.repo_metadata[repo["name"]] = { "contribution_files": repo["contribution_files"], "stats": repo["stats"] } # Add metadata for additional single-contributor repos if they weren't in selected_repos for repo_name in single_contributor_repos: if repo_name not in self.repo_metadata: repo_path = self.user_path / f"{self.username}_{repo_name}.git" if repo_path.exists(): stats = self._get_repository_stats(repo_path, report_data.get("commits", {}).get(repo_name, [])) contribution_files = self._analyze_contribution_files(repo_path) self.repo_metadata[repo_name] = { "contribution_files": contribution_files, "stats": stats or {} } return list(all_repo_names) def _get_only_owner_sources(self) -> List[str]: """Gets list of repositories to analyze. Only single-contributor repos are considered""" return [ obj["repo"] for obj in self.report_data.get("contributors", []) if obj["contributors"][0] == self.username and len(obj["contributors"]) == 1 ] def _analyze_repositories(self, report_data: Dict) -> List[Dict[str, Any]]: """Analyzes all repositories the user has contributed to""" repositories = [] # Get repos from contributors data contributed_repos = [ obj["repo"] for obj in report_data.get("contributors", []) if self.username in obj["contributors"] ] # Also get repos from commits data commit_repos = list(report_data.get("commits", {}).keys()) # Combine and deduplicate all_repos = list(set(contributed_repos + commit_repos)) print(f"Analyzing {len(all_repos)} repositories...") for repo_name in all_repos: repo_path = self.user_path / f"{self.username}_{repo_name}.git" if not repo_path.exists(): continue repo_stats = self._get_repository_stats(repo_path, report_data.get("commits", {}).get(repo_name, [])) if not repo_stats: continue contribution_files = self._analyze_contribution_files(repo_path) # Include repository if it has either commits or contribution files if repo_stats["commit_count"] > 0 or contribution_files: repositories.append({ "name": repo_name, "stats": repo_stats, "contribution_files": contribution_files }) return repositories def _analyze_contribution_files(self, repo_path: Path) -> List[Dict[str, Any]]: """Identifies files with user contributions, with more flexible criteria""" contribution_files = [] # List all files in repository for file_path in repo_path.rglob('*'): relative_path = str(file_path.relative_to(repo_path)) # Skip excluded paths and non-source files if not self._is_analyzable_file(relative_path): continue try: # Get authorship statistics author_stats = self._get_file_author_stats(repo_path, relative_path) # Include files where user has any meaningful contribution (>20%) if self.username in author_stats and author_stats[self.username] >= 20: contribution_files.append({ "path": relative_path, "contribution_percentage": author_stats[self.username] }) except Exception as e: print(f"Error analyzing {relative_path}: {str(e)}") continue return contribution_files def _get_repository_stats(self, repo_path: Path, repo_commits: List = None) -> Dict[str, Any]: """Analyzes repository activity metrics with both git log and commits data""" try: # Get commit timestamps from git log result = subprocess.run( 'git log --format=%at', cwd=repo_path, shell=True, capture_output=True, text=True ) if result.returncode != 0: return {} timestamps = [int(ts) for ts in result.stdout.strip().split('\n') if ts] # Also consider commits from report data if repo_commits: for commit in repo_commits: commit_date = datetime.fromisoformat( commit["commit"]["author"]["date"].replace("Z", "+00:00") ) timestamps.append(int(commit_date.timestamp())) if not timestamps: return {} first_commit = datetime.fromtimestamp(min(timestamps)) last_commit = datetime.fromtimestamp(max(timestamps)) commit_count = len(timestamps) time_period = (last_commit - first_commit).days + 1 return { "first_commit": first_commit.isoformat(), "last_commit": last_commit.isoformat(), "commit_count": commit_count, "commits_per_day": commit_count / max(time_period, 1), "active_days": time_period } except Exception as e: print(f"Error analyzing repository stats: {str(e)}") return {} def _get_file_author_stats(self, repo_path: Path, file_path: str) -> Dict[str, float]: """Analyzes file authorship percentages""" try: result = subprocess.run( ['git', 'blame', '--porcelain', file_path], cwd=repo_path, capture_output=True, text=True ) if result.returncode != 0: return {} author_lines = defaultdict(int) total_lines = 0 for line in result.stdout.split('\n'): if line.startswith('author '): author = line.replace('author ', '', 1) author_lines[author] += 1 total_lines += 1 if total_lines == 0: return {} return { author: (count / total_lines * 100) for author, count in author_lines.items() } except Exception as e: print(f"Error getting authorship stats for {file_path}: {str(e)}") return {} def _select_best_repositories(self, repositories: List[Dict[str, Any]], max_repos: int = 15) -> List[Dict[str, Any]]: """Selects optimal repositories using more balanced scoring""" if not repositories: return [] for repo in repositories: score = 0 stats = repo["stats"] # Recency score (max 35 points) last_commit = datetime.fromisoformat(stats["last_commit"]) days_since_last_commit = (datetime.now() - last_commit).days score += max(0, 35 - (days_since_last_commit / 30)) # Activity score (max 35 points) commit_score = min(35, (stats["commit_count"] * 2) + (stats["commits_per_day"] * 10)) score += commit_score # Contribution score (max 30 points) # Consider both number and quality of contributions contribution_files = repo["contribution_files"] if contribution_files: file_count = len(contribution_files) avg_contribution = sum(f["contribution_percentage"] for f in contribution_files) / file_count score += min(30, (file_count * 2) + (avg_contribution / 5)) else: # Still give some points for commits if no files detected score += min(15, stats["commit_count"] / 2) repo["analysis_score"] = score # Sort by score and return top repositories repositories.sort(key=lambda x: x["analysis_score"], reverse=True) selected = repositories[:max_repos] print(f"\nSelected {len(selected)} repositories:") for repo in selected: print(f"- {repo['name']} (score: {repo['analysis_score']:.2f})") return selected def _is_analyzable_file(self, file_path: str) -> bool: """Determines if a file should be included in analysis""" path = Path(file_path) # Skip excluded directories excluded_paths = { 'node_modules', '__pycache__', 'build', 'dist', '.git', 'vendor', 'third_party', 'external' } if any(part in excluded_paths for part in path.parts): return False # Get file extension (lowercase) ext = path.suffix.lower() if not ext: return False return ext in RELEVANT_EXTENSIONS ##################################################################### # Code Style Analysis Module ##################################################################### def analyze_code_style(sources_data: Dict[str, Any]) -> Dict[str, Any]: """Analyzes developer's coding style patterns for stylometric analysis""" handler = create_handler() combined_results = {} for repo_name, repo_data in sources_data.items(): print(f"\nAnalyzing repository: {repo_name}") prompt = f""" CODE STYLE ANALYSIS You are an expert in code stylometry and developer behavior analysis. Analyze this repository to create a detailed profile of the developer's coding patterns, preferences, and habits. Repository: {repo_name} Code samples and structure: {json.dumps(repo_data, indent=2)} Focus on identifying unique, individual coding patterns that could distinguish this developer's style. Analyze how they: - Structure their code and control flow - Handle data and state - Approach problem-solving - Maintain code quality - Handle edge cases and errors IMPORTANT CONSTRAINTS: - Maximum 10 patterns per list category - No repeating similar patterns - Use "Unknown" if pattern cannot be determined - Focus on distinctive, personal coding traits Generate a JSON profile with this EXACT structure: {{ "code_organization": {{ "file_structure": {{ "preferred_file_size": number, // Average lines per file "module_organization": string, // e.g. "feature-based", "layer-based", "domain-based" "separation_patterns": [string] // Common ways they separate concerns }}, "code_layout": {{ "indentation": {{ "type": string, "width": number }}, "line_length": {{ "average": number, "max_observed": number }}, "spacing_style": {{ "around_operators": string, "after_commas": boolean, "around_blocks": string }} }} }}, "naming_patterns": {{ "variables": {{ "primary_style": string, // e.g. "snake_case", "camelCase" "consistency_score": number, // 0-100 "length_preference": {{ "average": number, "range": [number, number] }}, "semantic_patterns": [string] // How they choose names, e.g. "verb_noun_pairs", "hungarian_notation" }}, "functions": {{ "primary_style": string, "common_prefixes": [string], "common_patterns": [string], "length_preference": {{ "average": number, "range": [number, number] }} }} }}, "coding_patterns": {{ "control_flow": {{ "preferred_loop_type": string, // e.g. "for", "while", "comprehension" "nesting_depth": {{ "average": number, "max_observed": number }}, "branching_patterns": [string], // e.g. "early returns", "guard clauses" "condition_complexity": {{ "average": number, "max_observed": number }} }}, "data_handling": {{ "preferred_structures": [string], // Favorite data structures "mutation_patterns": {{ "prefers_immutable": boolean, "common_patterns": [string] }}, "state_management": {{ "approach": string, // e.g. "functional", "stateful", "mixed" "patterns": [string] }} }} }}, "error_handling": {{ "strategy": string, // e.g. "defensive", "fail-fast", "hybrid" "patterns": [string], // Common error handling patterns "error_checking": {{ "input_validation": boolean, "null_checking": boolean, "type_checking": boolean }} }}, "code_quality": {{ "documentation": {{ "style": string, // e.g. "detailed", "minimal", "moderate" "coverage_ratio": number, // 0-100 "preferred_formats": [string] }}, "testing": {{ "approach": string, // e.g. "unit-heavy", "integration-focused", "minimal" "patterns": [string] }}, "complexity_metrics": {{ "cyclomatic_complexity": {{ "average": number, "max_observed": number }}, "cognitive_complexity": {{ "average": number, "max_observed": number }} }} }}, "distinctive_traits": {{ "unique_patterns": [string], // Highly individual coding patterns "favored_techniques": [string], // Preferred coding approaches "consistent_habits": [string] // Reliable behavioral patterns }} }} Critical requirements: 1. OUTPUT ONLY VALID JSON 2. NO markdown, NO comments, NO explanations 3. Use EXACT key names shown 4. All arrays MAXIMUM 10 items 5. Use numbers for metrics where specified 6. Use "Unknown" for undeterminable values """ try: result = handler.generate_json_response(prompt) if result: combined_results[repo_name] = result except Exception as e: print(f"Error analyzing {repo_name}: {str(e)}") combined_results[repo_name] = {"error": str(e)} return combined_results ##################################################################### # Temporal Patterns Analysis Module ##################################################################### def analyze_temporal_patterns( sources_data: Dict[str, Any], report_data: Dict[str, Any] ) -> Dict[str, Any]: """Analyzes temporal patterns using both LLM and statistical analysis""" commits = report_data.get("commits", {}) # Setup LLM Prompting handler = create_handler() combined_results = {} # Get commit timestamps for activity analysis commit_times = [ datetime.fromisoformat( commit["commit"]["author"]["date"].replace("Z", "+00:00") ) for repo_commits in commits.values() for commit in repo_commits ] # Get best targets and their commit contents temporal_best_targets = _select_best_targets(sources_data, commits) commit_contents = _get_commit_contents(temporal_best_targets, sources_data) # Save commit contents for inspection inspection_data = { "temporal_targets": temporal_best_targets, "commit_contents": commit_contents, } inspection_path = Path("out") / "temporal_analysis_contents.json" try: with open(inspection_path, "w", encoding="utf-8") as f: json.dump(inspection_data, f, indent=2) print(f"Saved temporal analysis data to {inspection_path}") except Exception as e: print(f"Error saving inspection data: {str(e)}") for repo_name, repo_data in sources_data.items(): if repo_name not in temporal_best_targets: continue print(f"\nAnalyzing temporal patterns for repository: {repo_name}") # Get code changes for this repository repo_changes = commit_contents.get(repo_name, []) if not repo_changes: continue # Analyze code style evolution using LLM with actual code changes prompt = f""" TEMPORAL ANALYSIS Analyze the temporal evolution of this codebase with focus on developer behavior patterns and code evolution. Repository: {repo_name} Code Evolution Data: {json.dumps(repo_changes, indent=2)} Generate detailed temporal analysis JSON: {{ "evolution_patterns": {{ "code_quality": {{ "progression": string, "refactoring_patterns": [ {{ "pattern": string, "frequency": string, "motivation": string }} ], "complexity_trends": {{ "direction": string, "significant_changes": [string], "trigger_patterns": [string] }} }}, "development_cycles": {{ "commit_patterns": {{ "frequency": {{ "pattern": string, "active_hours": [string], "timezone_confidence": {{ "zone": string, "confidence": number, "evidence": [string] }} }}, "burst_patterns": [ {{ "pattern": string, "typical_duration": string, "characteristics": [string] }} ] }}, "feature_development": {{ "typical_cycle": string, "iteration_patterns": [string], "testing_integration": string }} }}, "communication_patterns": {{ "pr_characteristics": {{ "detail_level": string, "discussion_style": string, "iteration_patterns": string }}, "documentation_evolution": {{ "frequency": string, "detail_trends": string, "update_patterns": string }} }} }}, "architectural_evolution": {{ "major_changes": [ {{ "change": string, "motivation": string, "impact": string }} ], "improvement_patterns": {{ "refactoring_types": [string], "optimization_focus": [string], "maintenance_patterns": string }}, "technical_debt": {{ "accumulation_patterns": [string], "resolution_approaches": string, "prevention_strategies": string }} }} }} Requirements: 1. Focus on developer behavior patterns 2. Track evolution of coding style 3. Identify clear timezone patterns 4. Detail burst activity characteristics 5. Analyze code quality progression """ try: result = handler.generate_json_response(prompt) if result: combined_results[repo_name] = result except Exception as e: print(f"Error analyze_temporal_patterns {repo_name}: {str(e)}") combined_results[repo_name] = {"error": str(e)} return { "commit_style_metrics": combined_results, "activity_patterns": _analyze_activity_patterns(commit_times), } def _clean_diff(diff_output: str) -> str: """Clean up diff output to focus on actual changes""" lines = diff_output.split("\n") cleaned_lines = [] skip_next = False for line in lines: # Skip git-specific headers if ( line.startswith("diff --git") or line.startswith("index ") or line.startswith("new file mode ") or line.startswith("deleted file mode ") ): continue # Keep file markers but clean them up if line.startswith("--- ") or line.startswith("+++ "): # Convert /dev/null to clearer marker if "/dev/null" in line: continue # Keep just the filename cleaned_lines.append(line.split("/")[-1]) continue # Keep actual diff content if ( line.startswith("@@ ") or line.startswith("+") or line.startswith("-") or line.startswith(" ") ): cleaned_lines.append(line) return "\n".join(cleaned_lines) def _get_commit_contents( target_repos: List[str], sources_data: Dict[str, Any], max_diff_lines: int = 100 ) -> Dict[str, List[Dict[str, Any]]]: """ Retrieves commit contents focusing on core files and limiting diff sizes. Now with cleaner diff output. """ commit_contents = {} # Extract username from the first repository's path structure username = None for repo in sources_data.values(): if repo.get('structure', {}).get('name', ''): # Extract username from the repository name (format: username_reponame.git) username = repo['structure']['name'].split('_')[0] break if not username: raise ValueError("Could not determine username from repository structure") for repo_name in target_repos: # Store the full repo path but don't overwrite repo_name repo_path_name = sources_data[repo_name]['structure'].get('name', '') if not repo_path_name: print(f"Warning: No path found for repository {repo_name}") continue # Construct correct path using extracted username repo_path = f"out/{username}/{repo_path_name}" # Get core files from sources_data using original repo_name core_files = sources_data[repo_name].get("samples", {}).get("core_files", {}) if not core_files: continue try: commits = [] for file_path, _ in core_files.items(): try: # Get commit history for this file commit_history = subprocess.check_output( [ "git", "log", "--format=%H %ad", "--date=iso", "--reverse", "--", file_path, ], cwd=repo_path, text=True, ).splitlines() # Process key commits commits_to_process = [] if len(commit_history) > 0: commits_to_process.append(commit_history[0]) # First commit if len(commit_history) > 4: # Add some middle commits, evenly spaced middle_idx = len(commit_history) // 2 commits_to_process.append(commit_history[middle_idx]) if len(commit_history) > 1: commits_to_process.append(commit_history[-1]) # Last commit prev_content = None for commit_info in commits_to_process: sha, date = commit_info.split(" ", 1) try: # Get the diff for this commit diff_output = subprocess.check_output( ["git", "show", "--format=", sha, "--", file_path], cwd=repo_path, text=True, stderr=subprocess.PIPE, ) # Skip if diff is too large diff_lines = diff_output.splitlines() if len(diff_lines) > max_diff_lines: continue # Clean up the diff clean_diff = _clean_diff(diff_output) if not clean_diff.strip(): continue # Get actual file content at this commit for first and last commit only if prev_content is None: # First commit file_content = subprocess.check_output( ["git", "show", f"{sha}:{file_path}"], cwd=repo_path, text=True, stderr=subprocess.PIPE, ) prev_content = file_content elif commit_info == commits_to_process[-1]: # Last commit file_content = subprocess.check_output( ["git", "show", f"{sha}:{file_path}"], cwd=repo_path, text=True, stderr=subprocess.PIPE, ) else: file_content = None commit_data = { "sha": sha, "date": date, "file": file_path, "changes": clean_diff, } if file_content: commit_data["content"] = file_content commits.append(commit_data) except subprocess.CalledProcessError: continue except subprocess.CalledProcessError: continue if commits: # Sort commits by date commits.sort(key=lambda x: x["date"]) # Group commits by file for better analysis files_commits = {} for commit in commits: file_path = commit["file"] if file_path not in files_commits: files_commits[file_path] = [] files_commits[file_path].append(commit) commit_contents[repo_name] = { "core_files": list(core_files.keys()), "evolution": { "commit_count": len(commits), "commits_by_file": files_commits, }, } print(f"Processed {len(commits)} commits for {repo_name} core files") except Exception as e: print(f"Error analyzing repository {repo_name}: {str(e)}") continue return commit_contents def _select_best_targets( sources_data: Dict[str, Any], commits: Dict[str, Any] ) -> List[str]: """Selects repositories with sufficient history for analysis""" targets = [] for repo_name, repo_data in sources_data.items(): if ( len(commits.get(repo_name, [])) < 5 or repo_data["file_stats"]["file_count"] < 10 ): continue targets.append(repo_name) return targets def _analyze_activity_patterns(commit_times: List[datetime]) -> Dict[str, Any]: """Analyzes commit timing patterns""" if not commit_times: return { "frequency": { "commits_per_day": 0, "active_hours": [], "timezone_hint": "unknown", }, "burst_patterns": { "intensity": "low", "average_duration": "n/a", "frequency": "sporadic", }, } # Sort commit times commit_times.sort() # Calculate commits per day days_span = (commit_times[-1] - commit_times[0]).days or 1 commits_per_day = round(len(commit_times) / days_span, 2) # Analyze active hours hours = Counter([t.hour for t in commit_times]) active_hours = [ f"{h:02d}-{(h+1):02d}" for h, c in hours.most_common(3) if c > len(commit_times) * 0.1 ] # Estimate timezone from most active hours # NOTE: Unclear should show the closest timezone peak_hour = max(hours.items(), key=lambda x: x[1])[0] if 4 <= peak_hour <= 8: tz_hint = "UTC+8 to UTC+10" elif 8 <= peak_hour <= 12: tz_hint = "UTC+0 to UTC+2" elif 12 <= peak_hour <= 16: tz_hint = "UTC-6 to UTC-4" elif 16 <= peak_hour <= 20: tz_hint = "UTC-12 to UTC-8" else: tz_hint = "unclear" # Analyze burst patterns time_diffs = [] for i in range(1, len(commit_times)): diff = (commit_times[i] - commit_times[i - 1]).total_seconds() / 3600 time_diffs.append(diff) if time_diffs: avg_diff = statistics.mean(time_diffs) if avg_diff < 1: intensity = "high" elif avg_diff < 4: intensity = "moderate" else: intensity = "low" burst_duration = ( "few hours" if avg_diff < 4 else "day-length" if avg_diff < 24 else "multi-day" ) burst_frequency = ( "frequent" if commits_per_day > 3 else "regular" if commits_per_day > 1 else "sporadic" ) else: intensity = "low" burst_duration = "n/a" burst_frequency = "sporadic" return { "frequency": { "commits_per_day": commits_per_day, "active_hours": active_hours, "timezone_hint": tz_hint, }, "burst_patterns": { "intensity": intensity, "average_duration": burst_duration, "frequency": burst_frequency, }, } ##################################################################### # Project Preferences Analysis Module ##################################################################### def analyze_project_preferences(sources_data: Dict[str, Any]) -> Dict[str, Any]: """Analyzes project preferences and technology choices using LLM""" handler = create_handler() combined_results = {} for repo_name, repo_data in sources_data.items(): print(f"\nAnalyzing project preferences for repository: {repo_name}") # Create repository-specific prompt prompt = f""" PROJECT PREFERENCES ANALYSIS You are an expert in developer profiling and technical background analysis. Study this repository to build a comprehensive profile of the developer's technical preferences and knowledge domains. Repository: {repo_name} Languages: {repo_data.get('languages', 'Unknown')} Project Structure: {json.dumps(repo_data.get('structure', {}), indent=2)} Configuration Files: {json.dumps(repo_data.get('config_files', []), indent=2)} Core Files: {json.dumps(repo_data.get('samples', {}).get('core_files', {}), indent=2)} Dependencies: {json.dumps(repo_data.get('samples', {}).get('package_files', {}), indent=2)} Analyze deeply to infer: 1. Technical background and expertise level 2. Problem-solving approaches and mathematical foundations 3. Security awareness and defensive programming practices 4. Development environment preferences Generate detailed JSON analysis: {{ "developer_profile": {{ "expertise_domains": [ {{ "domain": string, // e.g. "security", "data_science", "web_development" "confidence": number, // 0-100 "evidence": [string] }} ], "knowledge_patterns": {{ "mathematical_foundations": [ {{ "area": string, // e.g. "graph_theory", "linear_algebra" "usage_examples": [string], "proficiency_level": string // "basic", "intermediate", "advanced" }} ], "algorithmic_preferences": {{ "common_approaches": [string], "complexity_awareness": string, "optimization_patterns": [string] }}, "security_awareness": {{ "level": string, // "low", "medium", "high" "defensive_patterns": [string], "security_considerations": [string] }} }} }}, "technical_choices": {{ "primary_languages": [ {{ "language": string, "proficiency_indicators": [string], "usage_patterns": [string] }} ], "frameworks": [ {{ "name": string, "purpose": string, "usage_patterns": [string], "implementation_depth": string // "basic", "intermediate", "advanced" }} ], "development_environment": {{ "likely_editor": string, "confidence": number, "tooling_preferences": [string], "evidence": [string] }}, "testing_approach": {{ "methodology": string, "frameworks": [string], "coverage_patterns": string }} }}, "project_organization": {{ "architecture_style": {{ "pattern": string, "consistency": number, "key_characteristics": [string] }}, "code_quality": {{ "standards_adherence": string, "documentation_level": string, "maintainability_indicators": [string] }}, "deployment_patterns": {{ "infrastructure_preferences": [string], "containerization_approach": string, "ci_cd_sophistication": string }} }} }} Important: 1. Base all inferences on concrete evidence in the code 2. Indicate confidence levels where uncertain 3. Provide specific examples supporting each conclusion 4. Focus on unique/distinctive patterns """ try: result = handler.generate_json_response(prompt) if result: combined_results[repo_name] = result except Exception as e: print(f"Error analyzing {repo_name}: {str(e)}") combined_results[repo_name] = {"error": str(e)} return combined_results ##################################################################### # Identity Confidence Calculation Module ##################################################################### def calculate_identity_confidence( sources_data: Dict[str, Any], code_style_results: Dict[str, Any], project_preferences: Dict[str, Any], temporal_patterns: Dict[str, Any] ) -> Dict[str, Any]: """Synthesizes all analysis results into a comprehensive developer identity profile""" handler = create_handler() # Create consolidated analysis data for the prompt analysis_data = { "repositories": sources_data, "code_style_analysis": code_style_results, "project_preferences": project_preferences, "temporal_patterns": temporal_patterns } prompt = f""" IDENTITY CONFIDENCE CALCULATION You are an expert in developer profiling and behavioral analysis. Synthesize all provided analysis data to create a comprehensive profile of the developer's identity, expertise, and behavioral patterns. Analysis Data: {json.dumps(analysis_data, indent=2)} Based on all provided repository data and previous analyses, create a detailed developer profile focusing on: 1. Technical expertise and knowledge domains 2. Problem-solving patterns and approaches 3. Development philosophy and practices 4. Unique identifiers and consistent traits Generate a single comprehensive identity profile JSON: {{ "developer_profile": {{ "expertise": {{ "primary_domains": [ {{ "domain": string, "proficiency_level": string, // "beginner", "intermediate", "expert" "evidence": [string], "confidence": number // 0-100 }} ], "technical_depth": {{ "languages": [ {{ "name": string, "mastery_level": string, "usage_patterns": [string], "notable_practices": [string] }} ], "frameworks": [ {{ "name": string, "usage_sophistication": string, "implementation_patterns": [string] }} ], "specialized_knowledge": [ {{ "area": string, // e.g. "cryptography", "distributed systems" "depth": string, "application_examples": [string] }} ] }} }}, "work_patterns": {{ "development_style": {{ "code_organization": string, "problem_solving_approach": string, "quality_focus": string, "distinctive_habits": [string] }}, "workflow_characteristics": {{ "development_cycle": string, "testing_approach": string, "refactoring_patterns": string, "documentation_style": string }}, "communication_style": {{ "code_commenting": string, "commit_messages": string, "documentation_quality": string }} }}, "behavioral_traits": {{ "strengths": [ {{ "trait": string, "evidence": [string], "consistency": number // 0-100 }} ], "areas_for_improvement": [ {{ "area": string, "indicators": [string] }} ], "unique_characteristics": [ {{ "trait": string, "significance": string, "supporting_patterns": [string] }} ] }}, "knowledge_breadth": {{ "technical_stack": {{ "preferred_technologies": [string], "experience_indicators": [string], "adoption_patterns": string }}, "domain_knowledge": {{ "primary_domains": [string], "depth_indicators": [string], "application_examples": [string] }}, "architectural_understanding": {{ "preferred_patterns": [string], "complexity_handling": string, "scalability_awareness": string }} }}, "identity_confidence": {{ "overall_score": number, // 0-100 "distinguishing_factors": [ {{ "factor": string, "significance": string, "supporting_evidence": [string] }} ], "consistency_metrics": {{ "coding_style": number, // 0-100 "problem_solving": number, // 0-100 "quality_standards": number // 0-100 }}, "pattern_reliability": {{ "stable_patterns": [string], "variable_patterns": [string], "context_dependencies": [string] }} }} }} }} Critical Analysis Requirements: 1. Base all conclusions on concrete evidence from the provided data 2. Focus on patterns that appear consistently across repositories 3. Highlight unique traits that distinguish this developer 4. Note any evolution in skills or practices 5. Indicate confidence levels for all major conclusions 6. Consider both technical and behavioral aspects 7. Identify any potential biases or limitations in the analysis """ try: result = handler.generate_json_response(prompt) except Exception as e: print(f"Error analyzing: {str(e)}") result = {"error": str(e)} return result ##################################################################### # Profile Visualizer Component ##################################################################### class ProfileVisualizer: """Creates visualizations for the developer profile""" def __init__(self): pass def create_radar_chart(self, profile: Dict[str, Any]) -> go.Figure: """Create a radar chart for developer skills""" if not profile or "identity_confidence" not in profile: return self._empty_chart("No profile data available") try: # Extract metrics from profile metrics = {} # Get consistency metrics if "identity_confidence" in profile and "consistency_metrics" in profile["identity_confidence"]: consistency = profile["identity_confidence"]["consistency_metrics"] for key, value in consistency.items(): if isinstance(value, (int, float)): metrics[key.replace("_", " ").title()] = value # Get expertise domains confidence if "expertise" in profile and "primary_domains" in profile["expertise"]: for domain in profile["expertise"]["primary_domains"]: if "domain" in domain and "confidence" in domain: metrics[domain["domain"]] = domain["confidence"] # Create radar chart if not metrics: return self._empty_chart("No metrics found in profile data") categories = list(metrics.keys()) values = list(metrics.values()) fig = go.Figure() fig.add_trace(go.Scatterpolar( r=values, theta=categories, fill='toself', name='Developer Profile', line_color='rgb(31, 119, 180)', fillcolor='rgba(31, 119, 180, 0.3)' )) fig.update_layout( polar=dict( radialaxis=dict( visible=True, range=[0, 100] ) ), showlegend=False, title="Developer Profile Metrics", height=500 ) return fig except Exception as e: return self._empty_chart(f"Error creating chart: {str(e)}") def create_language_bar_chart(self, profile: Dict[str, Any]) -> go.Figure: """Create a bar chart for programming language proficiency""" if not profile or "expertise" not in profile: return self._empty_chart("No profile data available") try: languages = [] # Extract languages if "expertise" in profile and "technical_depth" in profile["expertise"]: if "languages" in profile["expertise"]["technical_depth"]: for lang in profile["expertise"]["technical_depth"]["languages"]: if "name" in lang and "mastery_level" in lang: # Convert mastery level to numeric value mastery_value = self._mastery_to_number(lang["mastery_level"]) languages.append({ "Language": lang["name"], "Mastery": mastery_value }) if not languages: return self._empty_chart("No language data found in profile") # Create DataFrame df = pd.DataFrame(languages) # Create bar chart fig = px.bar( df, x="Language", y="Mastery", color="Mastery", color_continuous_scale="viridis", title="Programming Language Proficiency" ) fig.update_layout( xaxis_title="Language", yaxis_title="Proficiency Level (0-10)", height=400 ) return fig except Exception as e: return self._empty_chart(f"Error creating chart: {str(e)}") def create_strengths_chart(self, profile: Dict[str, Any]) -> go.Figure: """Create a horizontal bar chart for developer strengths""" if not profile or "behavioral_traits" not in profile: return self._empty_chart("No profile data available") try: strengths = [] # Extract strengths if "behavioral_traits" in profile and "strengths" in profile["behavioral_traits"]: for strength in profile["behavioral_traits"]["strengths"]: if "trait" in strength and "consistency" in strength: strengths.append({ "Trait": strength["trait"], "Consistency": strength["consistency"] }) if not strengths: return self._empty_chart("No strengths data found in profile") # Create DataFrame df = pd.DataFrame(strengths) df = df.sort_values("Consistency", ascending=True) # Create horizontal bar chart fig = px.bar( df, y="Trait", x="Consistency", orientation='h', color="Consistency", color_continuous_scale="greens", title="Developer Strengths" ) fig.update_layout( xaxis_title="Consistency (%)", yaxis_title=None, height=400 ) return fig except Exception as e: return self._empty_chart(f"Error creating chart: {str(e)}") def create_html_summary(self, profile: Dict[str, Any]) -> str: """Create HTML summary with profile insights""" if not profile: return "

No profile data available

" try: html = [] # Overall score if "identity_confidence" in profile and "overall_score" in profile["identity_confidence"]: score = profile["identity_confidence"]["overall_score"] html.append(f"""
{score}%
Identity Confidence Score
""") # Primary domains if "expertise" in profile and "primary_domains" in profile["expertise"]: html.append("

Primary Expertise Domains

") html.append("") # Languages if "expertise" in profile and "technical_depth" in profile["expertise"] and "languages" in profile["expertise"]["technical_depth"]: html.append("

Languages

") html.append("") # Add work patterns if "work_patterns" in profile: html.append("

Work Patterns

") if "development_style" in profile["work_patterns"]: dev_style = profile["work_patterns"]["development_style"] html.append("") # Add behavioral traits if "behavioral_traits" in profile: html.append("

Behavioral Traits

") if "strengths" in profile["behavioral_traits"]: html.append("

Strengths

") html.append("") # Add identity confidence if "identity_confidence" in profile: html.append("

Identity Confidence

") conf = profile["identity_confidence"] html.append("") return "".join(html) except Exception as e: return f"

Error creating summary: {str(e)}

" def _mastery_to_number(self, mastery: str) -> float: """Convert mastery level text to a numeric value""" mastery = mastery.lower() if "expert" in mastery or "advanced" in mastery: return 9.0 elif "proficient" in mastery or "strong" in mastery: return 7.5 elif "intermediate" in mastery or "moderate" in mastery: return 5.0 elif "basic" in mastery or "beginner" in mastery: return 3.0 elif "novice" in mastery or "limited" in mastery: return 1.5 else: return 5.0 # Default moderate level def _empty_chart(self, message: str) -> go.Figure: """Create an empty chart with an error message""" fig = go.Figure() fig.add_annotation( x=0.5, y=0.5, xref="paper", yref="paper", text=message, showarrow=False, font=dict( size=14, color="#666" ) ) fig.update_layout( height=400, xaxis=dict(showticklabels=False, showgrid=False), yaxis=dict(showticklabels=False, showgrid=False) ) return fig def visualize_profile(self, profile_json: Dict[str, Any]) -> List[Any]: """Main method to generate all visualizations""" try: # Extract the developer profile if "identity_confidence" in profile_json and "developer_profile" in profile_json["identity_confidence"]: profile = profile_json["identity_confidence"]["developer_profile"] else: profile = None if not profile: return [ self._empty_chart("No developer profile data available"), self._empty_chart("No developer profile data available"), self._empty_chart("No developer profile data available"), "

No developer profile data available

" ] # Create visualizations radar_chart = self.create_radar_chart(profile) language_chart = self.create_language_bar_chart(profile) strengths_chart = self.create_strengths_chart(profile) html_summary = self.create_html_summary(profile) return [radar_chart, language_chart, strengths_chart, html_summary] except Exception as e: error_msg = f"Error visualizing profile: {str(e)}" return [ self._empty_chart(error_msg), self._empty_chart(error_msg), self._empty_chart(error_msg), f"

{error_msg}

" ] ##################################################################### # Main Application Class ##################################################################### class StyleAnalyzerApp: """Handles repository analysis and stylometric profiling with Gradio UI""" def __init__(self): self.base_path = Path("out") # Ensure the output directory exists os.makedirs(self.base_path, exist_ok=True) def analyze_github_user( self, username: str, repository_selection: str, github_token: str, gemini_api_key: str, progress=gr.Progress() ) -> Tuple[str, Dict, str]: """Main analysis function that will be called from the Gradio interface""" # Save API keys to environment variables or .env file os.environ["GH_TOKEN"] = github_token os.environ["GEMINI_API_KEY"] = gemini_api_key # Update .env file with open(".env", "w") as f: f.write(f"GH_TOKEN={github_token}\n") f.write(f"GEMINI_API_KEY={gemini_api_key}\n") # Create user path user_path = self.base_path / username report_path = user_path / "report.json" # Check if user data exists, if not, fetch it if not report_path.exists(): progress(0, desc="Fetching GitHub data...") try: result = subprocess.run( ["gh-analyze", username], check=True, capture_output=True, text=True ) progress(0.2, desc="GitHub data fetched successfully") log_output = f"GitHub data fetched successfully:\n{result.stdout}" except subprocess.CalledProcessError as e: error_msg = f"Error fetching GitHub data: {e.stderr}" return "Error", {}, error_msg else: progress(0.2, desc="Using existing GitHub data") log_output = "Using existing GitHub data\n" try: # Load report data progress(0.25, desc="Loading report data...") with open(report_path) as f: report_data = json.load(f) log_output += "Report data loaded successfully\n" # Select repositories to analyze progress(0.3, desc="Identifying repositories to analyze...") if repository_selection == "Smart Selection": repo_selector = RepositorySelector(str(self.base_path), username) sources_to_analyze = repo_selector.select_repositories(report_data) else: # Only single-contributor (owner) repos sources_to_analyze = [ obj["repo"] for obj in report_data.get("contributors", []) if obj["contributors"][0] == username and len(obj["contributors"]) == 1 ] repo_list = ", ".join(sources_to_analyze) log_output += f"Found {len(sources_to_analyze)} repositories to analyze: {repo_list}\n" # Analyze repository structure progress(0.4, desc="Analyzing repository structure...") sources_data = analyze_repository_structure(sources_to_analyze, user_path) log_output += "Repository structure analysis complete\n" # Analyze code style progress(0.5, desc="Analyzing code style patterns...") code_style = analyze_code_style(sources_data) log_output += "Code style analysis complete\n" # Analyze temporal patterns progress(0.6, desc="Analyzing temporal patterns...") temporal_patterns = analyze_temporal_patterns(sources_data, report_data) log_output += "Temporal patterns analysis complete\n" # Analyze project preferences progress(0.7, desc="Analyzing project preferences...") project_preferences = analyze_project_preferences(sources_data) log_output += "Project preferences analysis complete\n" # Calculate identity confidence progress(0.8, desc="Calculating identity confidence...") identity_confidence = calculate_identity_confidence( sources_data, code_style, project_preferences, temporal_patterns ) log_output += "Identity confidence calculation complete\n" # Generate final report progress(0.9, desc="Generating final report...") analysis_result = { "code_style_metrics": code_style, "temporal_patterns": temporal_patterns, "project_preferences": project_preferences, "identity_confidence": identity_confidence, } output_path = user_path / "stylometry_profile.json" with open(output_path, "w") as f: json.dump({"stylometric_profile": analysis_result}, f, indent=2) log_output += f"Report generated successfully and saved to {output_path}\n" progress(1.0, desc="Analysis complete!") return "Success", analysis_result, log_output except Exception as e: error_trace = traceback.format_exc() error_msg = f"Error during analysis: {str(e)}\n{error_trace}" return "Error", {}, error_msg ##################################################################### # Gradio Interface Setup ##################################################################### def add_visualization_tab(app, profile_output): """Add visualization tab to the main Gradio app""" visualizer = ProfileVisualizer() with gr.Tab("Visualizations"): with gr.Row(): with gr.Column(): gr.Markdown("### Developer Profile Metrics") radar_chart = gr.Plot(label="Skills Radar") with gr.Column(): gr.Markdown("### Technical Summary") html_summary = gr.HTML(label="Profile Summary") with gr.Row(): with gr.Column(): gr.Markdown("### Programming Languages") language_chart = gr.Plot(label="Language Proficiency") with gr.Column(): gr.Markdown("### Developer Strengths") strengths_chart = gr.Plot(label="Strengths Analysis") # Connect the profile output to the visualization components profile_output.change( fn=visualizer.visualize_profile, inputs=[profile_output], outputs=[radar_chart, language_chart, strengths_chart, html_summary] ) return app def create_gradio_interface(): """Create and configure the Gradio interface""" analyzer = StyleAnalyzerApp() with gr.Blocks(title="GitHub Stylometry Analyzer") as app: gr.Markdown("# GitHub Stylometry Analyzer") gr.Markdown(""" This tool analyzes a GitHub user's repositories to build a developer profile based on coding style, temporal patterns, project preferences, and calculated identity confidence. The analysis process takes 10-15 minutes for standard accounts. """) with gr.Row(): with gr.Column(scale=1): username_input = gr.Textbox(label="GitHub Username", placeholder="Enter GitHub username") repo_selection = gr.Radio( choices=["Smart Selection", "Owner Repositories Only"], label="Repository Selection Method", value="Smart Selection" ) github_token = gr.Textbox( label="GitHub API Token", placeholder="Enter your GitHub API token", type="password" ) gemini_api_key = gr.Textbox( label="Google Gemini API Key", placeholder="Enter your Gemini API key", type="password" ) analyze_button = gr.Button("Analyze", variant="primary") with gr.Accordion("Load configuration from file", open=False): gr.Markdown(""" You can load your GitHub token and Gemini API key from the .env file if present. This is useful if you don't want to enter them manually each time. """) load_config_button = gr.Button("Load from .env", variant="secondary") def load_from_env(): load_dotenv() gh_token = os.getenv("GH_TOKEN", "") gemini_key = os.getenv("GEMINI_API_KEY", "") return gh_token, gemini_key load_config_button.click( fn=load_from_env, inputs=[], outputs=[github_token, gemini_api_key] ) with gr.Column(scale=2): with gr.Tab("Profile Summary"): status_output = gr.Textbox(label="Status", value="Ready") profile_output = gr.JSON(label="Developer Profile") with gr.Tab("Logs"): log_output = gr.Textbox(label="Analysis Logs", lines=20) # Add the visualizations tab app = add_visualization_tab(app, profile_output) analyze_button.click( fn=analyzer.analyze_github_user, inputs=[username_input, repo_selection, github_token, gemini_api_key], outputs=[status_output, profile_output, log_output] ) return app ##################################################################### # Entry Point ##################################################################### def check_requirements(): """Check if required packages are installed""" required_packages = ["gradio", "google.generativeai", "plotly"] missing_packages = [] for package in required_packages: try: __import__(package.split(".")[0]) except ImportError: missing_packages.append(package.split(".")[0]) if missing_packages: print("! Missing required packages: " + ", ".join(missing_packages)) print("Please install required packages with:") print(f"pip install {' '.join(missing_packages)}") return False print("āœ“ Required packages already installed") return True def check_environment(): """Check if .env file exists and create it if needed""" env_file = Path(".env") if not env_file.exists(): print("! Creating .env file") with open(env_file, "w") as f: f.write("GH_TOKEN=\nGEMINI_API_KEY=\n") print("āœ“ Created .env file. You will need to provide API keys in the app.") else: print("āœ“ .env file already exists") def create_output_dir(): """Create output directory if it doesn't exist""" out_dir = Path("out") if not out_dir.exists(): out_dir.mkdir() print("āœ“ Created output directory") else: print("āœ“ Output directory already exists") def check_gh_analyze(): """Check if gh-analyze tool is installed""" try: subprocess.run(["gh-analyze", "--help"], capture_output=True, text=True) print("āœ“ gh-analyze tool is installed") return True except FileNotFoundError: print("! gh-analyze tool is not installed") print("Please install gh-fake-analyzer with:") print("pip install gh-fake-analyzer") return False def install_gh_analyze(): """Install gh-analyze tool if not present""" try: subprocess.run(["pip", "install", "gh-fake-analyzer"], check=True, capture_output=True) print("āœ“ Installed gh-fake-analyzer") return True except subprocess.CalledProcessError as e: print(f"! Error installing gh-fake-analyzer: {e.stderr}") return False def main(): """Main entry point for the application""" print("\n===========================================") print("GitHub Stylometry Analyzer Setup") print("===========================================\n") # Check and install requirements all_requirements_met = check_requirements() if not all_requirements_met: print("\nPlease install the missing packages and run the application again.") return # Check if gh-analyze is installed gh_analyze_installed = check_gh_analyze() if not gh_analyze_installed: print("\nAttempting to install gh-fake-analyzer...") install_success = install_gh_analyze() if not install_success: print("\nPlease install gh-fake-analyzer manually and run the application again.") return # Setup environment check_environment() create_output_dir() print("\n===========================================") print("Launching GitHub Stylometry Analyzer") print("===========================================\n") # Create and launch the Gradio interface app = create_gradio_interface() app.launch(share=True, debug=True) if __name__ == "__main__": main()