|
import gradio as gr |
|
import os |
|
import json |
|
import time |
|
import subprocess |
|
import tempfile |
|
import shutil |
|
from pathlib import Path |
|
from typing import Dict, List, Any, Tuple, Optional, Iterator |
|
import traceback |
|
from dotenv import load_dotenv |
|
import plotly.graph_objects as go |
|
import plotly.express as px |
|
import pandas as pd |
|
import numpy as np |
|
import re |
|
from collections import Counter, defaultdict |
|
import statistics |
|
from datetime import datetime |
|
from threading import Lock |
|
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception |
|
import google.generativeai as genai |
|
import requests |
|
|
|
|
|
|
|
|
|
|
|
# Maps a lower-case file suffix to the display name of the language it denotes.
LANGUAGE_EXTENSIONS = {
    ".py": "Python",
    ".js": "JavaScript",
    ".ts": "TypeScript",
    ".jsx": "React",
    ".tsx": "React TypeScript",
    ".java": "Java",
    ".cpp": "C++",
    ".c": "C",
    ".h": "C/C++ Header",
    ".hpp": "C++ Header",
    ".rb": "Ruby",
    ".php": "PHP",
    ".go": "Go",
    ".rs": "Rust",
    ".swift": "Swift",
    ".kt": "Kotlin",
    ".kts": "Kotlin Script",
    ".scala": "Scala",
    ".pl": "Perl",
    ".pm": "Perl Module",
    ".r": "R",
    ".sh": "Shell",
    ".bat": "Batch",
    ".ps1": "PowerShell",
    ".lua": "Lua",
    ".sql": "SQL",
    ".html": "HTML",
    ".css": "CSS",
    ".xml": "XML",
    ".json": "JSON",
    ".yaml": "YAML",
    ".yml": "YAML",
    ".md": "Markdown",
    ".ipynb": "Jupyter Notebook",
    ".m": "MATLAB/Objective-C",
    ".mm": "Objective-C++",
    ".vb": "Visual Basic",
    ".cs": "C#",
    ".fs": "F#",
    ".fsx": "F# Script",
    ".erl": "Erlang",
    ".hrl": "Erlang Header",
    ".ex": "Elixir",
    ".exs": "Elixir Script",
    ".dart": "Dart",
    ".groovy": "Groovy",
    ".jl": "Julia",
    ".clj": "Clojure",
    ".cljs": "ClojureScript",
    ".coffee": "CoffeeScript",
    ".litcoffee": "Literate CoffeeScript",
    ".rkt": "Racket",
    ".hs": "Haskell",
    ".lhs": "Literate Haskell",
    ".ml": "OCaml",
    ".mli": "OCaml Interface",
    ".nim": "Nim",
    ".cr": "Crystal",
    ".nimble": "Nimble",
    ".hx": "Haxe",
    ".sol": "Solidity",
    ".vy": "Vyper",
}

# Suffixes of files considered worth analysing. This is exactly the set of
# suffixes that have a language name above, so it is derived rather than
# listed a second time.
RELEVANT_EXTENSIONS = set(LANGUAGE_EXTENSIONS)

# Maps well-known build/dependency file names to the tool they belong to.
# Several file names share one tool label (e.g. "go.mod" and "go.sum").
PACKAGE_FILES = {
    "package.json": "npm",
    "requirements.txt": "pip",
    "setup.py": "python",
    "pom.xml": "maven",
    "build.gradle": "gradle",
    "Gemfile": "bundler",
    "Cargo.toml": "cargo",
    "go.mod": "go",
    "go.sum": "go",
    "composer.json": "composer",
    "pubspec.yaml": "dart",
    "Project.toml": "julia",
    "mix.exs": "elixir",
    "Makefile": "make",
    "CMakeLists.txt": "cmake",
    "SConstruct": "scons",
    "build.xml": "ant",
    "Rakefile": "rake",
    "shard.yml": "crystal",
    "nim.cfg": "nim",
    "default.nix": "nix",
    "stack.yaml": "haskell",
    "rebar.config": "erlang",
    "rebar.lock": "erlang",
    "project.clj": "leiningen",
    "deps.edn": "clojure",
    "build.boot": "boot",
    "build.sbt": "sbt",
    "Brewfile": "homebrew",
    "Vagrantfile": "vagrant",
    "Dockerfile": "docker",
    "docker-compose.yml": "docker-compose",
    "Procfile": "heroku",
    "tox.ini": "tox",
    "pyproject.toml": "poetry",
    "Pipfile": "pipenv",
    "Pipfile.lock": "pipenv",
    "environment.yml": "conda",
    "meta.yaml": "conda",
}

# System instruction handed to the Gemini model; the text is sent verbatim.
SYSTEM_PROMPT = (
    "You are an experienced software engineer and data analyst tasked with "
    "building a report on developer's coding style, technical background, "
    "approach to problem solving, architectural thinking, technology choices, "
    "re-used frameworks etc,. There will be a set of prompts, divided into "
    "CODE STYLE ANALYSIS, TEMPORAL ANALYSIS, PROJECT PREFERENCES ANALYSIS and "
    "IDENTITY CONFIDENCE CALCULATION together with data samples provided to you. "
    "You'll summarize your findings from all of the modules in a single "
    "comprehensive IDENTITY CALCULATION CONFIDENCE output. Output a valid JSON, "
    "avoid including to many strings into the list objects! Follow the "
    "instructions provided for this section:"
)
|
|
|
|
|
|
|
|
|
|
|
def _should_retry_error(exception: Exception) -> bool: |
|
"""Check if the exception is one we should retry""" |
|
error_str = str(exception).lower() |
|
return any( |
|
msg in error_str |
|
for msg in [ |
|
"resource exhaust", |
|
"429", |
|
"too many requests", |
|
"quota exceeded", |
|
"rate limit", |
|
] |
|
) |
|
|
|
class RateLimiter:
    """Token bucket rate limiter implementation.

    The bucket starts full with ``rate`` tokens and is refilled continuously
    at ``rate / per`` tokens per second, never holding more than ``rate``.
    All bucket state is read and written while holding ``self.lock``.
    """

    def __init__(self, rate: int, per: int):
        """Create a full bucket.

        Args:
            rate: Bucket capacity, i.e. how many acquisitions are allowed
                per ``per`` seconds.
            per: Length in seconds of the window ``rate`` refers to.

        Raises:
            ValueError: If ``rate`` or ``per`` is not positive; either would
                otherwise surface later as a division by zero.
        """
        if rate <= 0 or per <= 0:
            raise ValueError("rate and per must both be positive")

        self.rate = rate
        self.per = per
        self.tokens = rate
        # A monotonic clock is used for elapsed-time measurement so that
        # system clock adjustments cannot stall or inflate the refill.
        self.last_update = time.monotonic()
        self.lock = Lock()

    def _add_tokens(self):
        """Credit the tokens earned since the last refill, capped at ``rate``.

        Must be called with ``self.lock`` held.
        """
        now = time.monotonic()
        elapsed = now - self.last_update
        if elapsed > 0:
            refill = elapsed * (self.rate / self.per)
            self.tokens = min(self.rate, self.tokens + refill)
            self.last_update = now

    def acquire(self) -> float:
        """Try to take one token without blocking.

        Returns:
            ``0.0`` when a token was taken. Otherwise the number of seconds
            after which one full token will be available; no token is
            reserved in that case, so the caller has to call again.
        """
        with self.lock:
            self._add_tokens()

            if self.tokens >= 1:
                self.tokens -= 1
                return 0.0

            # Time needed to refill the missing fraction of one token.
            return (1 - self.tokens) * (self.per / self.rate)
|
|
|
class PromptAnalyzer:
    """Handles LLM prompting for code analysis tasks.

    Wraps a Gemini ``GenerativeModel`` configured with ``SYSTEM_PROMPT``,
    throttles requests through a ``RateLimiter`` and keeps running totals of
    prompts issued (``prompt_count``) and tokens consumed (``token_count``).
    """

    def __init__(self, api_key: Optional[str] = None):
        """Initialize Gemini handler with API key.

        Args:
            api_key: Gemini API key. When omitted, the ``GEMINI_API_KEY``
                environment variable is used.

        Raises:
            ValueError: If no key is passed and the variable is unset/empty.
        """
        self.api_key = api_key or os.getenv("GEMINI_API_KEY")
        if not self.api_key:
            raise ValueError(
                "Gemini API key must be provided or set in GEMINI_API_KEY environment variable"
            )

        genai.configure(api_key=self.api_key)
        self.model = genai.GenerativeModel(
            model_name="gemini-1.5-flash-001", system_instruction=SYSTEM_PROMPT
        )
        self.token_count = 0
        self.prompt_count = 0
        self.rate_limiter = RateLimiter(rate=5, per=60)

    def count_tokens(self, text: str) -> int:
        """Count tokens in a text string.

        Falls back to a rough ``len(text) // 4`` estimate (after printing a
        warning) when the model's token counter raises.
        """
        try:
            return self.model.count_tokens(text).total_tokens
        except Exception as e:
            print(f"Warning: Error counting tokens: {str(e)}")
            return len(text) // 4

    def _clean_json_response(self, response_text: str) -> str:
        """Clean up response text to extract JSON content.

        If the text contains a fenced block (optionally tagged ``json``), the
        content of the first such block is returned; otherwise the whole
        text. The result is stripped of surrounding whitespace either way.
        """
        if "```" in response_text:
            match = re.search(r"```(?:json)?\n(.*?)```", response_text, re.DOTALL)
            if match:
                return match.group(1).strip()
        return response_text.strip()

    @retry(
        retry=retry_if_exception(_should_retry_error),
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=2, min=4, max=60),
        before_sleep=lambda retry_state: print(
            f"Retrying due to rate limit/resource exhaustion... (attempt {retry_state.attempt_number})"
        ),
    )
    def _rate_limited_generate(self, prompt: str) -> Any:
        """Handle rate-limited generation with waiting and resource exhaustion.

        Sleeps until the local rate limiter grants a token, then calls the
        model once. Any exception is logged and re-raised; the ``@retry``
        decorator decides (via ``_should_retry_error``) whether to try again.
        """
        while True:
            wait_time = self.rate_limiter.acquire()
            if wait_time > 0:
                print(f"Rate limit reached. Waiting {wait_time:.2f} seconds...")
                time.sleep(wait_time)
                continue

            try:
                return self.model.generate_content(prompt)
            except Exception as e:
                if _should_retry_error(e):
                    print(f"Rate limit/resource exhaustion error, will retry: {str(e)}")
                else:
                    print(f"Non-retryable error occurred: {str(e)}")
                raise

    @retry(
        stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def generate_json_response(self, prompt: str) -> Dict[str, Any]:
        """Generate and parse JSON response with robust error handling.

        The prompt is sent up to three times; after a response that is not
        valid JSON, the next attempt sends the original prompt prefixed with
        feedback describing the parse error. Token usage reported by the
        model is added to ``self.token_count``.

        Args:
            prompt: The prompt text to send.

        Returns:
            The parsed JSON value of the model's response.

        Raises:
            Exception: Any failure is logged and re-raised; the ``@retry``
                decorator then re-runs the whole method (three attempts).
        """
        # Defined before the try block so the error handler can always read it.
        last_response = None
        try:
            self.prompt_count += 1
            print(f"\nπ Processing prompt #{self.prompt_count}...")

            input_tokens = self.count_tokens(prompt)
            print(f"π Sending prompt with {input_tokens:,} tokens...")

            # Kept separately so retry feedback is always prepended to the
            # original prompt instead of piling up on each attempt.
            original_prompt = prompt
            max_json_retries = 3

            for attempt in range(max_json_retries):
                try:
                    start_time = time.time()
                    response = self._rate_limited_generate(prompt)
                    elapsed_time = time.time() - start_time

                    # usage_metadata.total_token_count already includes the
                    # prompt, so the output figure must come from
                    # candidates_token_count to avoid counting input twice.
                    usage = response.usage_metadata
                    sent_tokens = usage.prompt_token_count
                    output_token_count = usage.candidates_token_count
                    prompt_total_tokens = sent_tokens + output_token_count
                    self.token_count += prompt_total_tokens

                    print(f"β Response received in {elapsed_time:.2f} seconds")
                    print(f"π Prompt #{self.prompt_count} token usage:")
                    print(f" - Input tokens: {sent_tokens:,}")
                    print(f" - Output tokens: {output_token_count:,}")
                    print(f" - Total tokens: {prompt_total_tokens:,}")
                    print(f"π Cumulative token usage: {self.token_count:,}")

                    last_response = response.text
                    return json.loads(self._clean_json_response(last_response))

                except json.JSONDecodeError as e:
                    if attempt == max_json_retries - 1:
                        print(f"β Failed to parse JSON after {max_json_retries} attempts")
                        print("Last response received:")
                        print(last_response)
                        print(f"Last error: {str(e)}")
                        raise

                    print(f"β οΈ Attempt {attempt + 1}/{max_json_retries}: JSON parsing failed, retrying with feedback...")

                    error_feedback = (
                        "Your previous response could not be parsed as valid JSON. "
                        f"The specific error was: {str(e)}\n"
                        "\n"
                        "IMPORTANT: You must provide a response that:\n"
                        "1. Contains ONLY valid JSON\n"
                        "2. Has NO markdown code blocks\n"
                        "3. Has NO explanatory text\n"
                        "4. Follows the exact schema requested\n"
                        "5. Uses proper JSON syntax (quotes, commas, brackets)\n"
                        "6. AVOID falling into recursive loops when retrieving data from the prompt\n"
                        "\n"
                        "Here is the original prompt again:\n"
                    )
                    prompt = error_feedback + original_prompt

        except Exception as e:
            print(f"β Error in generate_json_response: {str(e)}")
            print("Stack trace:")
            print(traceback.format_exc())
            if last_response is not None:
                print("\nLast response received:")
                print(last_response)
            raise
|
|
|
def create_handler(api_key: Optional[str] = None) -> PromptAnalyzer:
    """Build and return a new PromptAnalyzer.

    Args:
        api_key: Passed through to ``PromptAnalyzer``; may be None.
    """
    analyzer = PromptAnalyzer(api_key)
    return analyzer
|
|
|
|
|
|
|
|
|
|
|
def analyze_repository_structure(repo_names: List[str], user_path: Path) -> Dict[str, Any]:
    """Processes source code from repositories to build LLM-friendly structure.

    Each repository is expected at ``user_path / "<user>_<repo>.git"``, where
    ``<user>`` is the last component of ``user_path``. Repositories that are
    not on disk are skipped.

    Args:
        repo_names: Names of the repositories to process.
        user_path: Directory holding the user's repository checkouts.

    Returns:
        A dict keyed by repository name. Each entry holds "structure",
        "file_stats", "documentation" and "languages" (a comma-separated
        string, most frequent language first); ``_extract_code_samples`` is
        then run over the whole dict. Empty when no repository was found.
    """
    result = {}
    username = user_path.name

    for repo_name in repo_names:
        repo_path = user_path / f"{username}_{repo_name}.git"
        print("processing,", repo_name, "path:", repo_path)

        if not repo_path.exists():
            print("skipping")
            continue

        structure = _build_tree_structure(repo_path)

        # Count files per language; suffixes without a known language are ignored.
        extensions = (info["extension"].lower() for info in _get_source_files(structure))
        language_counts = Counter(
            LANGUAGE_EXTENSIONS[ext] for ext in extensions if ext in LANGUAGE_EXTENSIONS
        )

        # Most frequent first, ties broken alphabetically.
        ranked = sorted(language_counts.items(), key=lambda pair: (-pair[1], pair[0]))

        result[repo_name] = {
            "structure": structure,
            "file_stats": _analyze_file_statistics(repo_path),
            "documentation": _extract_documentation(repo_path),
            "languages": ", ".join(language for language, _ in ranked),
        }

    # Nothing was found on disk: do not spend an LLM request on empty input.
    if not result:
        return result

    _extract_code_samples(result, user_path)
    return result
|
|
|
def _build_tree_structure(repo_path: Path, files_per_dir: int = 20, max_depth: int = 3) -> Dict[str, Any]: |
|
""" |
|
Builds a tree representation of repository structure with limits. |
|
|
|
Args: |
|
repo_path: Repository path |
|
files_per_dir: Maximum number of files to include per directory (default: 20) |
|
max_depth: Maximum depth for nested directories (default: 3) |
|
""" |
|
def create_tree(path: Path, current_depth: int = 0) -> Dict[str, Any]: |
|
tree = { |
|
"type": "directory", |
|
"name": path.name, |
|
"path": str(path.relative_to(repo_path)), |
|
"children": [], |
|
} |
|
|
|
|
|
if current_depth >= max_depth: |
|
tree["children"] = [{ |
|
"type": "note", |
|
"message": f"Directory depth limit ({max_depth}) reached" |
|
}] |
|
return tree |
|
|
|
try: |
|
items = list(path.iterdir()) |
|
|
|
|
|
if path.name in { |
|
".git", |
|
"node_modules", |
|
"__pycache__", |
|
"build", |
|
"dist", |
|
}: |
|
return tree |
|
|
|
|
|
files = [ |
|
item for item in items |
|
if item.is_file() and item.suffix.lower() in RELEVANT_EXTENSIONS |
|
] |
|
if files: |
|
files = files[:files_per_dir] |
|
for item in files: |
|
tree["children"].append({ |
|
"type": "file", |
|
"name": item.name, |
|
"path": str(item.relative_to(repo_path)), |
|
"extension": item.suffix.lower(), |
|
"size": item.stat().st_size, |
|
}) |
|
|
|
|
|
dirs = [item for item in items if item.is_dir()] |
|
for item in dirs: |
|
subtree = create_tree(item, current_depth + 1) |
|
if subtree["children"]: |
|
tree["children"].append(subtree) |
|
|
|
except PermissionError: |
|
pass |
|
|
|
return tree |
|
|
|
return create_tree(repo_path) |
|
|
|
def _analyze_file_statistics(repo_path: Path) -> Dict[str, Any]: |
|
"""Analyzes file statistics for the repository""" |
|
|
|
file_count = 0 |
|
total_loc = 0 |
|
|
|
for ext in LANGUAGE_EXTENSIONS: |
|
for file_path in repo_path.rglob(f"*{ext}"): |
|
if not any(p in str(file_path) for p in RELEVANT_EXTENSIONS): |
|
continue |
|
|
|
try: |
|
with open(file_path, "r", encoding="utf-8") as f: |
|
content = f.read() |
|
loc = len([l for l in content.splitlines() if l.strip()]) |
|
total_loc += loc |
|
file_count += 1 |
|
except (UnicodeDecodeError, PermissionError): |
|
continue |
|
|
|
return { |
|
"file_count": file_count, |
|
"total_loc": total_loc, |
|
} |
|
|
|
def _extract_documentation(repo_path: Path) -> Dict[str, Any]:
    """Extracts documentation and metadata from repository.

    Args:
        repo_path: Root directory of the repository.

    Returns:
        A dict with:
          * "readme": text of the first (by name) ``README*`` file in the
            root, or None if it cannot be read; the key is absent when no
            such file exists.
          * "package_info": tool label -> content of the first readable file
            listed for that label in ``PACKAGE_FILES``.
    """
    docs: Dict[str, Any] = {}

    # Sorted for a deterministic pick; directories named README* are ignored.
    readme_candidates = sorted(p for p in repo_path.glob("README*") if p.is_file())
    if readme_candidates:
        try:
            docs["readme"] = readme_candidates[0].read_text(encoding="utf-8")
        except (UnicodeDecodeError, OSError):
            docs["readme"] = None

    package_info: Dict[str, str] = {}
    for filename, pkg_type in PACKAGE_FILES.items():
        # Several file names share one label; keep the first one found so a
        # lock/checksum file (go.sum, Pipfile.lock, ...) does not replace
        # the manifest listed before it.
        if pkg_type in package_info:
            continue

        pkg_path = repo_path / filename
        if not pkg_path.is_file():
            continue

        try:
            package_info[pkg_type] = pkg_path.read_text(encoding="utf-8")
        except (UnicodeDecodeError, OSError):
            continue

    docs["package_info"] = package_info
    return docs
|
|
|
def _extract_code_samples(sources_data: Dict[str, Any], user_path: Path, max_file_size: int = 100000) -> Dict[str, Any]:
    """
    Extracts code samples for files identified as relevant by Gemini.
    Filters out files larger than max_file_size bytes.

    The repository structures (minus oversized files) are sent to the model,
    which is asked to name the most relevant files per repository. For every
    repository a "samples" dict with the buckets "core_files",
    "utility_files" and "config_files" is stored in ``sources_data``; each
    bucket maps a file path to its source text. Files the model lists under
    "secondary_files" (the key the prompt asks for) or "utility_files" both
    land in the "utility_files" bucket.

    Args:
        sources_data: Repository name -> repository data containing at least
            a "structure" tree. Gains a "samples" entry per repository.
        user_path: Directory holding the user's repository checkouts.
        max_file_size: Largest file size, in bytes, eligible for sampling.

    Returns:
        ``sources_data``. Errors after handler creation are printed and
        leave the data as far as it was filled in.
    """
    handler = create_handler()

    def without_large_files(node):
        """Return a pruned copy of a directory node; the input is not modified."""
        if node.get("type") != "directory":
            return node
        kept = []
        for child in node.get("children", []):
            kind = child.get("type")
            if kind == "directory":
                kept.append(without_large_files(child))
            elif kind == "file" and child.get("size", 0) <= max_file_size:
                kept.append(child)
        return {**node, "children": kept}

    # Sample bucket -> keys under which the model may report files for it.
    response_keys = {
        "core_files": ("core_files",),
        "utility_files": ("utility_files", "secondary_files"),
        "config_files": ("config_files",),
    }

    try:
        filtered_structures = {
            repo_name: without_large_files(repo_data["structure"])
            for repo_name, repo_data in sources_data.items()
        }

        prompt = f"""
Analyze the repository structures and identify the most relevant files for codebase analysis.

Focus on files that would reveal:
1. Core functionality and architecture
2. Main business logic
3. Key utilities and helpers
4. Configuration and setup

Results will be used for further code analysis. Remember to include ALL relevant files, especially for fullstack applications. Be thorough but concise. Avoid including non-original code, e.g., dependencies or libraries code. AVOID INCLUDING MORE THAN 50 FILES PER REPOSITORY!!! TRY TO INCLUDE LESS THAN 20 IF POSSIBLE. CORE_FILES ARE THE PRIORITY, YOU CAN OMITT THE REST IF IT EXCEEDS THE LIMIT.

Return a JSON object with these categories:

{{
"repositories": {{ // MANDATORY highest level key
"repo_name": {{ // MANDATORY name of the repository you are analyzing
"core_files": ["list of most important files"], // MAX 20 files!
"secondary_files": ["list of supporting files"], // MAX 20 files!
"config_files": ["list of relevant config files"] // MAX 10 files!
}},
"repo_name": {{...}},
}}
}}

CRITICAL REQUIREMENTS:

Limit each list of most important files to a maximum of 20 files!!!

Avoid including binary files or large data files. Only include files that are essential for understanding the codebase. Avoid including too many files, focus on the most important ones. Avoid including files that user did not write, e.g., dependencies or libraries code. Avoid including utility files that are not essential for understanding the codebase. Focus on including only source code, some repositories may have a lot of files, but only a few are essential for understanding the codebase. Do not include long .json files or other artifact type of files - notice "size" of the file in the structure.

Repository structures:
{json.dumps(filtered_structures, indent=2)}

Only include files that exist in the structure. Return valid JSON format.
DO NOT wrap the JSON in markdown code blocks.
"""

        file_categories = handler.generate_json_response(prompt)

        if not file_categories:
            print("Skipping due to API error")
            return sources_data

        selections = file_categories.get("repositories") or {}

        for repo_name, repo_data in sources_data.items():
            samples = {
                "core_files": {},
                "utility_files": {},
                "config_files": {}
            }
            repo_data["samples"] = samples

            # Only paths present in the scanned structure (and small enough)
            # are read, so paths invented by the model are ignored.
            sampleable = {
                file_info["path"]
                for file_info in _get_source_files(repo_data["structure"])
                if file_info.get("size", 0) <= max_file_size
            }

            picks = selections.get(repo_name) or {}
            for bucket, keys in response_keys.items():
                for key in keys:
                    for file_path in picks.get(key) or []:
                        if file_path not in sampleable:
                            continue

                        source_code = _read_source_file(user_path, repo_name, file_path)
                        if source_code:
                            samples[bucket][file_path] = source_code

    except Exception as e:
        print(f"Error processing code samples: {str(e)}")

    return sources_data
|
|
|
def _get_source_files(structure: Dict[str, Any]) -> List[Dict[str, Any]]: |
|
"""Helper to recursively extract source files from tree structure""" |
|
files = [] |
|
|
|
def traverse(node: Dict[str, Any]): |
|
if not isinstance(node, dict): |
|
return |
|
|
|
|
|
if node.get("type") == "file": |
|
files.append(node) |
|
|
|
|
|
elif node.get("type") == "directory" and "children" in node: |
|
for child in node.get("children", []): |
|
traverse(child) |
|
|
|
|
|
for value in node.values(): |
|
if isinstance(value, dict): |
|
traverse(value) |
|
elif isinstance(value, list): |
|
for item in value: |
|
if isinstance(item, dict): |
|
traverse(item) |
|
|
|
traverse(structure) |
|
|
|
|
|
return sorted(files, key=lambda x: x["path"]) |
|
|
|
def _read_source_file(user_path: Path, repo_name: str, file_path: str) -> Optional[str]: |
|
"""Reads source code from file with proper error handling""" |
|
try: |
|
|
|
full_path = user_path / f"{user_path.name}_{repo_name}.git" / file_path |
|
|
|
|
|
if not full_path.is_file(): |
|
return None |
|
|
|
|
|
if full_path.suffix.lower() not in RELEVANT_EXTENSIONS: |
|
return None |
|
|
|
|
|
encodings = ["utf-8", "latin-1", "cp1252"] |
|
|
|
for encoding in encodings: |
|
try: |
|
with open(full_path, "r", encoding=encoding) as f: |
|
content = f.read() |
|
|
|
|
|
if "\0" in content: |
|
return None |
|
|
|
return content |
|
except UnicodeDecodeError: |
|
continue |
|
except Exception as e: |
|
print(f"Error reading {full_path}: {str(e)}") |
|
return None |
|
|
|
return None |
|
|
|
except Exception as e: |
|
print(f"Error accessing {file_path}: {str(e)}") |
|
return None |
|
|
|
|
|
|
|
|
|
|
|
class RepositorySelector:
    """Handles intelligent repository selection and authorship analysis.

    Repositories are expected on disk at
    ``base_path / username / "<username>_<repo>.git"``.
    """

    def __init__(self, base_path: str, username: str):
        """Store the user name and derive the user's checkout directory.

        Args:
            base_path: Directory containing one sub-directory per user.
            username: Name used for the sub-directory, the repository
                directory prefix and the authorship comparison.
        """
        self.base_path = Path(base_path)
        self.username = username
        self.user_path = self.base_path / username

    def select_repositories(self, report_data: Dict) -> List[str]:
        """
        Main entry point for repository selection.
        Returns a list of repository names to analyze, including both best-scored repos
        and single-contributor repos.

        As a side effect, ``self.report_data`` is set and
        ``self.repo_metadata`` is filled with "contribution_files" and
        "stats" for every returned repository that exists on disk.

        Args:
            report_data: Report dict; the keys "contributors" and "commits"
                are read.

        Returns:
            The selected repository names, sorted alphabetically.
        """
        self.report_data = report_data

        repositories = self._analyze_repositories(report_data)
        print(f"Found {len(repositories)} repositories with activity")

        selected_repos = self._select_best_repositories(repositories)
        selected_repo_names = {repo["name"] for repo in selected_repos}

        single_contributor_repos = self._get_only_owner_sources()
        all_repo_names = selected_repo_names.union(single_contributor_repos)

        print(f"Added {len(all_repo_names) - len(selected_repo_names)} single-contributor repositories")
        print(f"Total repositories to analyze: {len(all_repo_names)}")

        self.repo_metadata = {
            repo["name"]: {
                "contribution_files": repo["contribution_files"],
                "stats": repo["stats"],
            }
            for repo in selected_repos
        }

        # Single-contributor repositories that were not scored still need metadata.
        commits_by_repo = report_data.get("commits", {})
        for repo_name in single_contributor_repos:
            if repo_name in self.repo_metadata:
                continue
            repo_path = self.user_path / f"{self.username}_{repo_name}.git"
            if not repo_path.exists():
                continue

            stats = self._get_repository_stats(repo_path, commits_by_repo.get(repo_name, []))
            contribution_files = self._analyze_contribution_files(repo_path)
            self.repo_metadata[repo_name] = {
                "contribution_files": contribution_files,
                "stats": stats or {},
            }

        return sorted(all_repo_names)

    def _get_only_owner_sources(self) -> List[str]:
        """Gets list of repositories to analyze. Only single-contributor repos are considered.

        Returns:
            The "repo" value of every entry in
            ``self.report_data["contributors"]`` whose contributor list is
            exactly ``[self.username]``. Entries with an empty or missing
            contributor list are ignored.
        """
        return [
            obj["repo"]
            for obj in self.report_data.get("contributors", [])
            if list(obj.get("contributors") or []) == [self.username]
        ]

    def _analyze_repositories(self, report_data: Dict) -> List[Dict[str, Any]]:
        """Analyzes all repositories the user has contributed to.

        Candidates are the repositories listing the user as contributor plus
        every repository with an entry under ``report_data["commits"]``.
        Candidates missing on disk or without any statistics are dropped.

        Returns:
            A list of dicts with "name", "stats" and "contribution_files".
        """
        repositories = []

        contributed_repos = [
            obj["repo"] for obj in report_data.get("contributors", [])
            if self.username in obj["contributors"]
        ]
        commits_by_repo = report_data.get("commits", {})
        all_repos = set(contributed_repos) | set(commits_by_repo)

        print(f"Analyzing {len(all_repos)} repositories...")

        for repo_name in all_repos:
            repo_path = self.user_path / f"{self.username}_{repo_name}.git"
            if not repo_path.exists():
                continue

            repo_stats = self._get_repository_stats(repo_path, commits_by_repo.get(repo_name, []))
            if not repo_stats:
                continue

            contribution_files = self._analyze_contribution_files(repo_path)

            if repo_stats["commit_count"] > 0 or contribution_files:
                repositories.append({
                    "name": repo_name,
                    "stats": repo_stats,
                    "contribution_files": contribution_files
                })

        return repositories

    def _analyze_contribution_files(self, repo_path: Path) -> List[Dict[str, Any]]:
        """Identifies files with user contributions, with more flexible criteria.

        A file qualifies when ``git blame`` attributes at least 20% of its
        lines to an author named exactly ``self.username``.

        Returns:
            A list of ``{"path": str, "contribution_percentage": float}``.
        """
        contribution_files = []

        for file_path in repo_path.rglob('*'):
            relative_path = str(file_path.relative_to(repo_path))

            if not self._is_analyzable_file(relative_path):
                continue
            # Directories with source-like names cannot be blamed.
            if not file_path.is_file():
                continue

            try:
                author_stats = self._get_file_author_stats(repo_path, relative_path)

                share = author_stats.get(self.username)
                if share is not None and share >= 20:
                    contribution_files.append({
                        "path": relative_path,
                        "contribution_percentage": share
                    })

            except Exception as e:
                print(f"Error analyzing {relative_path}: {str(e)}")
                continue

        return contribution_files

    def _get_repository_stats(self, repo_path: Path, repo_commits: Optional[List] = None) -> Dict[str, Any]:
        """Analyzes repository activity metrics with both git log and commits data.

        Args:
            repo_path: Repository working directory.
            repo_commits: Optional commit records; each must provide an ISO
                date at ``commit["commit"]["author"]["date"]``.

        Returns:
            A dict with "first_commit", "last_commit" (ISO strings),
            "commit_count", "commits_per_day" and "active_days", or an empty
            dict when git fails, no timestamp is found, or an error occurs.
        """
        try:
            # Argument list instead of a shell string: no shell is involved.
            result = subprocess.run(
                ['git', 'log', '--format=%at'],
                cwd=repo_path,
                capture_output=True,
                text=True
            )

            if result.returncode != 0:
                return {}

            timestamps = [int(ts) for ts in result.stdout.strip().split('\n') if ts]

            # NOTE(review): commits present in both git log and repo_commits
            # are counted twice here — confirm whether that is intended.
            for commit in repo_commits or []:
                commit_date = datetime.fromisoformat(
                    commit["commit"]["author"]["date"].replace("Z", "+00:00")
                )
                timestamps.append(int(commit_date.timestamp()))

            if not timestamps:
                return {}

            first_commit = datetime.fromtimestamp(min(timestamps))
            last_commit = datetime.fromtimestamp(max(timestamps))
            commit_count = len(timestamps)
            time_period = (last_commit - first_commit).days + 1

            return {
                "first_commit": first_commit.isoformat(),
                "last_commit": last_commit.isoformat(),
                "commit_count": commit_count,
                "commits_per_day": commit_count / max(time_period, 1),
                "active_days": time_period
            }

        except Exception as e:
            print(f"Error analyzing repository stats: {str(e)}")
            return {}

    def _get_file_author_stats(self, repo_path: Path, file_path: str) -> Dict[str, float]:
        """Analyzes file authorship percentages.

        Args:
            repo_path: Repository working directory.
            file_path: File path relative to ``repo_path``.

        Returns:
            Author name -> percentage (0-100) of the file's lines attributed
            to that author; empty when git fails or reports no lines.
        """
        try:
            # --line-porcelain repeats the author header for every line;
            # plain --porcelain prints it only once per commit, which would
            # count commits instead of lines.
            result = subprocess.run(
                ['git', 'blame', '--line-porcelain', '--', file_path],
                cwd=repo_path,
                capture_output=True,
                text=True
            )

            if result.returncode != 0:
                return {}

            author_lines = defaultdict(int)
            total_lines = 0

            for line in result.stdout.split('\n'):
                if line.startswith('author '):
                    author_lines[line.replace('author ', '', 1)] += 1
                    total_lines += 1

            if total_lines == 0:
                return {}

            return {
                author: (count / total_lines * 100)
                for author, count in author_lines.items()
            }

        except Exception as e:
            print(f"Error getting authorship stats for {file_path}: {str(e)}")
            return {}

    def _select_best_repositories(self, repositories: List[Dict[str, Any]],
                                  max_repos: int = 15) -> List[Dict[str, Any]]:
        """Selects optimal repositories using more balanced scoring.

        Each repository dict gets an "analysis_score" made of a recency part
        (up to 35), a commit-activity part (up to 35) and either a
        contribution part (up to 30) or, without contribution files, a
        commit-count part (up to 15). The order of ``repositories`` itself
        is left untouched.

        Args:
            repositories: Dicts with "name", "stats" and "contribution_files".
            max_repos: Maximum number of repositories to return.

        Returns:
            The ``max_repos`` highest-scoring repositories, best first.
        """
        if not repositories:
            return []

        for repo in repositories:
            stats = repo["stats"]

            # Recency: one point lost per 30 days since the last commit.
            last_commit = datetime.fromisoformat(stats["last_commit"])
            days_since_last_commit = (datetime.now() - last_commit).days
            score = max(0, 35 - (days_since_last_commit / 30))

            # Activity: commit volume and frequency.
            score += min(35, (stats["commit_count"] * 2) + (stats["commits_per_day"] * 10))

            contribution_files = repo["contribution_files"]
            if contribution_files:
                file_count = len(contribution_files)
                avg_contribution = sum(f["contribution_percentage"] for f in contribution_files) / file_count
                score += min(30, (file_count * 2) + (avg_contribution / 5))
            else:
                score += min(15, stats["commit_count"] / 2)

            repo["analysis_score"] = score

        ranked = sorted(repositories, key=lambda x: x["analysis_score"], reverse=True)
        selected = ranked[:max_repos]

        print(f"\nSelected {len(selected)} repositories:")
        for repo in selected:
            print(f"- {repo['name']} (score: {repo['analysis_score']:.2f})")

        return selected

    def _is_analyzable_file(self, file_path: str) -> bool:
        """Determines if a file should be included in analysis.

        Returns:
            False if any path component is an excluded directory name or the
            path has no suffix; otherwise whether the lower-cased suffix is
            in ``RELEVANT_EXTENSIONS``.
        """
        path = Path(file_path)

        excluded_paths = {
            'node_modules', '__pycache__', 'build', 'dist', '.git',
            'vendor', 'third_party', 'external'
        }

        if any(part in excluded_paths for part in path.parts):
            return False

        ext = path.suffix.lower()
        if not ext:
            return False

        return ext in RELEVANT_EXTENSIONS
|
|
|
|
|
|
|
|
|
|
|
def _build_code_style_prompt(repo_name: str, repo_data: Any) -> str:
    """Render the CODE STYLE ANALYSIS prompt for one repository.

    ``repo_data`` is embedded as indented JSON, so it must be JSON-serializable.
    """
    return f"""

CODE STYLE ANALYSIS

You are an expert in code stylometry and developer behavior analysis. Analyze this repository to create a detailed profile of the developer's coding patterns, preferences, and habits.

Repository: {repo_name}

Code samples and structure:
{json.dumps(repo_data, indent=2)}

Focus on identifying unique, individual coding patterns that could distinguish this developer's style. Analyze how they:
- Structure their code and control flow
- Handle data and state
- Approach problem-solving
- Maintain code quality
- Handle edge cases and errors

IMPORTANT CONSTRAINTS:
- Maximum 10 patterns per list category
- No repeating similar patterns
- Use "Unknown" if pattern cannot be determined
- Focus on distinctive, personal coding traits

Generate a JSON profile with this EXACT structure:

{{
"code_organization": {{
"file_structure": {{
"preferred_file_size": number, // Average lines per file
"module_organization": string, // e.g. "feature-based", "layer-based", "domain-based"
"separation_patterns": [string] // Common ways they separate concerns
}},
"code_layout": {{
"indentation": {{ "type": string, "width": number }},
"line_length": {{ "average": number, "max_observed": number }},
"spacing_style": {{
"around_operators": string,
"after_commas": boolean,
"around_blocks": string
}}
}}
}},
"naming_patterns": {{
"variables": {{
"primary_style": string, // e.g. "snake_case", "camelCase"
"consistency_score": number, // 0-100
"length_preference": {{ "average": number, "range": [number, number] }},
"semantic_patterns": [string] // How they choose names, e.g. "verb_noun_pairs", "hungarian_notation"
}},
"functions": {{
"primary_style": string,
"common_prefixes": [string],
"common_patterns": [string],
"length_preference": {{ "average": number, "range": [number, number] }}
}}
}},
"coding_patterns": {{
"control_flow": {{
"preferred_loop_type": string, // e.g. "for", "while", "comprehension"
"nesting_depth": {{ "average": number, "max_observed": number }},
"branching_patterns": [string], // e.g. "early returns", "guard clauses"
"condition_complexity": {{ "average": number, "max_observed": number }}
}},
"data_handling": {{
"preferred_structures": [string], // Favorite data structures
"mutation_patterns": {{
"prefers_immutable": boolean,
"common_patterns": [string]
}},
"state_management": {{
"approach": string, // e.g. "functional", "stateful", "mixed"
"patterns": [string]
}}
}}
}},
"error_handling": {{
"strategy": string, // e.g. "defensive", "fail-fast", "hybrid"
"patterns": [string], // Common error handling patterns
"error_checking": {{
"input_validation": boolean,
"null_checking": boolean,
"type_checking": boolean
}}
}},
"code_quality": {{
"documentation": {{
"style": string, // e.g. "detailed", "minimal", "moderate"
"coverage_ratio": number, // 0-100
"preferred_formats": [string]
}},
"testing": {{
"approach": string, // e.g. "unit-heavy", "integration-focused", "minimal"
"patterns": [string]
}},
"complexity_metrics": {{
"cyclomatic_complexity": {{ "average": number, "max_observed": number }},
"cognitive_complexity": {{ "average": number, "max_observed": number }}
}}
}},
"distinctive_traits": {{
"unique_patterns": [string], // Highly individual coding patterns
"favored_techniques": [string], // Preferred coding approaches
"consistent_habits": [string] // Reliable behavioral patterns
}}
}}

Critical requirements:
1. OUTPUT ONLY VALID JSON
2. NO markdown, NO comments, NO explanations
3. Use EXACT key names shown
4. All arrays MAXIMUM 10 items
5. Use numbers for metrics where specified
6. Use "Unknown" for undeterminable values
"""


def analyze_code_style(sources_data: Dict[str, Any]) -> Dict[str, Any]:
    """Ask the model for a coding-style profile of every repository.

    One prompt is sent per entry of ``sources_data`` through a handler
    obtained from ``create_handler()``.

    Args:
        sources_data: Repository name -> repository data; the data is
            embedded in the prompt as JSON.

    Returns:
        Repository name -> parsed model response. A repository whose request
        raised maps to ``{"error": <message>}``; one whose response was
        falsy is left out.
    """
    handler = create_handler()
    profiles = {}

    for repo_name, repo_data in sources_data.items():
        print(f"\nAnalyzing repository: {repo_name}")

        prompt = _build_code_style_prompt(repo_name, repo_data)

        try:
            profile = handler.generate_json_response(prompt)
        except Exception as e:
            print(f"Error analyzing {repo_name}: {str(e)}")
            profiles[repo_name] = {"error": str(e)}
        else:
            if profile:
                profiles[repo_name] = profile

    return profiles
|
|
|
|
|
|
|
|
|
|
|
def analyze_temporal_patterns(
    sources_data: Dict[str, Any], report_data: Dict[str, Any]
) -> Dict[str, Any]:
    """Combine LLM-based and statistical analysis of a developer's commit history.

    Args:
        sources_data: Per-repository analysis data keyed by repository name.
        report_data: Report dict; its "commits" entry maps repository names
            to lists of commit records.

    Returns:
        A dict with "commit_style_metrics" (per-repository LLM results, or an
        {"error": ...} entry when the LLM call raised) and "activity_patterns"
        (timing statistics computed over every commit in the report).
    """
    commits_by_repo = report_data.get("commits", {})

    llm = create_handler()
    llm_results = {}

    # Parse every commit timestamp; a trailing "Z" is rewritten to an explicit
    # UTC offset so datetime.fromisoformat accepts it.
    timestamps = []
    for repo_commits in commits_by_repo.values():
        for entry in repo_commits:
            raw_date = entry["commit"]["author"]["date"]
            timestamps.append(datetime.fromisoformat(raw_date.replace("Z", "+00:00")))

    targets = _select_best_targets(sources_data, commits_by_repo)
    contents = _get_commit_contents(targets, sources_data)

    # Persist what is about to be sent to the LLM so it can be inspected later.
    dump_path = Path("out") / "temporal_analysis_contents.json"
    try:
        with open(dump_path, "w", encoding="utf-8") as fh:
            json.dump(
                {"temporal_targets": targets, "commit_contents": contents},
                fh,
                indent=2,
            )
        print(f"Saved temporal analysis data to {dump_path}")
    except Exception as e:
        print(f"Error saving inspection data: {str(e)}")

    for repo_name in sources_data:
        if repo_name not in targets:
            continue

        print(f"\nAnalyzing temporal patterns for repository: {repo_name}")

        repo_changes = contents.get(repo_name, [])
        if not repo_changes:
            continue

        # Prompt text is kept verbatim (flush-left) because it is runtime data.
        prompt = f"""

TEMPORAL ANALYSIS

Analyze the temporal evolution of this codebase with focus on developer behavior patterns and code evolution.

Repository: {repo_name}

Code Evolution Data:
{json.dumps(repo_changes, indent=2)}

Generate detailed temporal analysis JSON:
{{
"evolution_patterns": {{
"code_quality": {{
"progression": string,
"refactoring_patterns": [
{{
"pattern": string,
"frequency": string,
"motivation": string
}}
],
"complexity_trends": {{
"direction": string,
"significant_changes": [string],
"trigger_patterns": [string]
}}
}},
"development_cycles": {{
"commit_patterns": {{
"frequency": {{
"pattern": string,
"active_hours": [string],
"timezone_confidence": {{
"zone": string,
"confidence": number,
"evidence": [string]
}}
}},
"burst_patterns": [
{{
"pattern": string,
"typical_duration": string,
"characteristics": [string]
}}
]
}},
"feature_development": {{
"typical_cycle": string,
"iteration_patterns": [string],
"testing_integration": string
}}
}},
"communication_patterns": {{
"pr_characteristics": {{
"detail_level": string,
"discussion_style": string,
"iteration_patterns": string
}},
"documentation_evolution": {{
"frequency": string,
"detail_trends": string,
"update_patterns": string
}}
}}
}},
"architectural_evolution": {{
"major_changes": [
{{
"change": string,
"motivation": string,
"impact": string
}}
],
"improvement_patterns": {{
"refactoring_types": [string],
"optimization_focus": [string],
"maintenance_patterns": string
}},
"technical_debt": {{
"accumulation_patterns": [string],
"resolution_approaches": string,
"prevention_strategies": string
}}
}}
}}

Requirements:
1. Focus on developer behavior patterns
2. Track evolution of coding style
3. Identify clear timezone patterns
4. Detail burst activity characteristics
5. Analyze code quality progression
"""

        try:
            response = llm.generate_json_response(prompt)
            if response:
                llm_results[repo_name] = response
        except Exception as e:
            print(f"Error analyze_temporal_patterns {repo_name}: {str(e)}")
            llm_results[repo_name] = {"error": str(e)}

    return {
        "commit_style_metrics": llm_results,
        "activity_patterns": _analyze_activity_patterns(timestamps),
    }
|
|
|
def _clean_diff(diff_output: str) -> str: |
|
"""Clean up diff output to focus on actual changes""" |
|
lines = diff_output.split("\n") |
|
cleaned_lines = [] |
|
skip_next = False |
|
|
|
for line in lines: |
|
|
|
if ( |
|
line.startswith("diff --git") |
|
or line.startswith("index ") |
|
or line.startswith("new file mode ") |
|
or line.startswith("deleted file mode ") |
|
): |
|
continue |
|
|
|
|
|
if line.startswith("--- ") or line.startswith("+++ "): |
|
|
|
if "/dev/null" in line: |
|
continue |
|
|
|
cleaned_lines.append(line.split("/")[-1]) |
|
continue |
|
|
|
|
|
if ( |
|
line.startswith("@@ ") |
|
or line.startswith("+") |
|
or line.startswith("-") |
|
or line.startswith(" ") |
|
): |
|
cleaned_lines.append(line) |
|
|
|
return "\n".join(cleaned_lines) |
|
|
|
def _get_commit_contents( |
|
target_repos: List[str], sources_data: Dict[str, Any], max_diff_lines: int = 100 |
|
) -> Dict[str, List[Dict[str, Any]]]: |
|
""" |
|
Retrieves commit contents focusing on core files and limiting diff sizes. |
|
Now with cleaner diff output. |
|
""" |
|
commit_contents = {} |
|
|
|
|
|
username = None |
|
for repo in sources_data.values(): |
|
if repo.get('structure', {}).get('name', ''): |
|
|
|
username = repo['structure']['name'].split('_')[0] |
|
break |
|
|
|
if not username: |
|
raise ValueError("Could not determine username from repository structure") |
|
|
|
for repo_name in target_repos: |
|
|
|
repo_path_name = sources_data[repo_name]['structure'].get('name', '') |
|
|
|
if not repo_path_name: |
|
print(f"Warning: No path found for repository {repo_name}") |
|
continue |
|
|
|
|
|
repo_path = f"out/{username}/{repo_path_name}" |
|
|
|
|
|
core_files = sources_data[repo_name].get("samples", {}).get("core_files", {}) |
|
if not core_files: |
|
continue |
|
|
|
try: |
|
commits = [] |
|
for file_path, _ in core_files.items(): |
|
try: |
|
|
|
commit_history = subprocess.check_output( |
|
[ |
|
"git", |
|
"log", |
|
"--format=%H %ad", |
|
"--date=iso", |
|
"--reverse", |
|
"--", |
|
file_path, |
|
], |
|
cwd=repo_path, |
|
text=True, |
|
).splitlines() |
|
|
|
|
|
commits_to_process = [] |
|
if len(commit_history) > 0: |
|
commits_to_process.append(commit_history[0]) |
|
if len(commit_history) > 4: |
|
|
|
middle_idx = len(commit_history) // 2 |
|
commits_to_process.append(commit_history[middle_idx]) |
|
if len(commit_history) > 1: |
|
commits_to_process.append(commit_history[-1]) |
|
|
|
prev_content = None |
|
for commit_info in commits_to_process: |
|
sha, date = commit_info.split(" ", 1) |
|
try: |
|
|
|
diff_output = subprocess.check_output( |
|
["git", "show", "--format=", sha, "--", file_path], |
|
cwd=repo_path, |
|
text=True, |
|
stderr=subprocess.PIPE, |
|
) |
|
|
|
|
|
diff_lines = diff_output.splitlines() |
|
if len(diff_lines) > max_diff_lines: |
|
continue |
|
|
|
|
|
clean_diff = _clean_diff(diff_output) |
|
if not clean_diff.strip(): |
|
continue |
|
|
|
|
|
if prev_content is None: |
|
file_content = subprocess.check_output( |
|
["git", "show", f"{sha}:{file_path}"], |
|
cwd=repo_path, |
|
text=True, |
|
stderr=subprocess.PIPE, |
|
) |
|
prev_content = file_content |
|
elif commit_info == commits_to_process[-1]: |
|
file_content = subprocess.check_output( |
|
["git", "show", f"{sha}:{file_path}"], |
|
cwd=repo_path, |
|
text=True, |
|
stderr=subprocess.PIPE, |
|
) |
|
else: |
|
file_content = None |
|
|
|
commit_data = { |
|
"sha": sha, |
|
"date": date, |
|
"file": file_path, |
|
"changes": clean_diff, |
|
} |
|
|
|
if file_content: |
|
commit_data["content"] = file_content |
|
|
|
commits.append(commit_data) |
|
|
|
except subprocess.CalledProcessError: |
|
continue |
|
|
|
except subprocess.CalledProcessError: |
|
continue |
|
|
|
if commits: |
|
|
|
commits.sort(key=lambda x: x["date"]) |
|
|
|
|
|
files_commits = {} |
|
for commit in commits: |
|
file_path = commit["file"] |
|
if file_path not in files_commits: |
|
files_commits[file_path] = [] |
|
files_commits[file_path].append(commit) |
|
|
|
commit_contents[repo_name] = { |
|
"core_files": list(core_files.keys()), |
|
"evolution": { |
|
"commit_count": len(commits), |
|
"commits_by_file": files_commits, |
|
}, |
|
} |
|
|
|
print(f"Processed {len(commits)} commits for {repo_name} core files") |
|
|
|
except Exception as e: |
|
print(f"Error analyzing repository {repo_name}: {str(e)}") |
|
continue |
|
|
|
return commit_contents |
|
|
|
def _select_best_targets( |
|
sources_data: Dict[str, Any], commits: Dict[str, Any] |
|
) -> List[str]: |
|
"""Selects repositories with sufficient history for analysis""" |
|
targets = [] |
|
|
|
for repo_name, repo_data in sources_data.items(): |
|
if ( |
|
len(commits.get(repo_name, [])) < 5 |
|
or repo_data["file_stats"]["file_count"] < 10 |
|
): |
|
continue |
|
targets.append(repo_name) |
|
|
|
return targets |
|
|
|
|
|
def _analyze_activity_patterns(commit_times: List[datetime]) -> Dict[str, Any]: |
|
"""Analyzes commit timing patterns""" |
|
if not commit_times: |
|
return { |
|
"frequency": { |
|
"commits_per_day": 0, |
|
"active_hours": [], |
|
"timezone_hint": "unknown", |
|
}, |
|
"burst_patterns": { |
|
"intensity": "low", |
|
"average_duration": "n/a", |
|
"frequency": "sporadic", |
|
}, |
|
} |
|
|
|
|
|
commit_times.sort() |
|
|
|
|
|
days_span = (commit_times[-1] - commit_times[0]).days or 1 |
|
commits_per_day = round(len(commit_times) / days_span, 2) |
|
|
|
|
|
hours = Counter([t.hour for t in commit_times]) |
|
active_hours = [ |
|
f"{h:02d}-{(h+1):02d}" |
|
for h, c in hours.most_common(3) |
|
if c > len(commit_times) * 0.1 |
|
] |
|
|
|
|
|
|
|
peak_hour = max(hours.items(), key=lambda x: x[1])[0] |
|
if 4 <= peak_hour <= 8: |
|
tz_hint = "UTC+8 to UTC+10" |
|
elif 8 <= peak_hour <= 12: |
|
tz_hint = "UTC+0 to UTC+2" |
|
elif 12 <= peak_hour <= 16: |
|
tz_hint = "UTC-6 to UTC-4" |
|
elif 16 <= peak_hour <= 20: |
|
tz_hint = "UTC-12 to UTC-8" |
|
else: |
|
tz_hint = "unclear" |
|
|
|
|
|
time_diffs = [] |
|
for i in range(1, len(commit_times)): |
|
diff = (commit_times[i] - commit_times[i - 1]).total_seconds() / 3600 |
|
time_diffs.append(diff) |
|
|
|
if time_diffs: |
|
avg_diff = statistics.mean(time_diffs) |
|
if avg_diff < 1: |
|
intensity = "high" |
|
elif avg_diff < 4: |
|
intensity = "moderate" |
|
else: |
|
intensity = "low" |
|
|
|
burst_duration = ( |
|
"few hours" |
|
if avg_diff < 4 |
|
else "day-length" if avg_diff < 24 else "multi-day" |
|
) |
|
burst_frequency = ( |
|
"frequent" |
|
if commits_per_day > 3 |
|
else "regular" if commits_per_day > 1 else "sporadic" |
|
) |
|
else: |
|
intensity = "low" |
|
burst_duration = "n/a" |
|
burst_frequency = "sporadic" |
|
|
|
return { |
|
"frequency": { |
|
"commits_per_day": commits_per_day, |
|
"active_hours": active_hours, |
|
"timezone_hint": tz_hint, |
|
}, |
|
"burst_patterns": { |
|
"intensity": intensity, |
|
"average_duration": burst_duration, |
|
"frequency": burst_frequency, |
|
}, |
|
} |
|
|
|
|
|
|
|
|
|
|
|
def analyze_project_preferences(sources_data: Dict[str, Any]) -> Dict[str, Any]:
    """Ask the LLM to profile technology and project preferences per repository.

    Args:
        sources_data: Per-repository data keyed by repository name; reads
            ``languages``, ``structure``, ``config_files`` and ``samples``.

    Returns:
        Mapping of repository name to the parsed LLM response, or to
        ``{"error": message}`` when the request raised. Repositories whose
        response is empty are omitted.
    """
    llm = create_handler()
    preferences = {}

    for repo_name, repo_data in sources_data.items():
        print(f"\nAnalyzing project preferences for repository: {repo_name}")

        samples = repo_data.get('samples', {})

        # Prompt text is kept verbatim (flush-left) because it is runtime data.
        prompt = f"""

PROJECT PREFERENCES ANALYSIS

You are an expert in developer profiling and technical background analysis. Study this repository to build a comprehensive profile of the developer's technical preferences and knowledge domains.

Repository: {repo_name}
Languages: {repo_data.get('languages', 'Unknown')}

Project Structure:
{json.dumps(repo_data.get('structure', {}), indent=2)}

Configuration Files:
{json.dumps(repo_data.get('config_files', []), indent=2)}

Core Files:
{json.dumps(samples.get('core_files', {}), indent=2)}

Dependencies:
{json.dumps(samples.get('package_files', {}), indent=2)}

Analyze deeply to infer:
1. Technical background and expertise level
2. Problem-solving approaches and mathematical foundations
3. Security awareness and defensive programming practices
4. Development environment preferences

Generate detailed JSON analysis:
{{
"developer_profile": {{
"expertise_domains": [
{{
"domain": string, // e.g. "security", "data_science", "web_development"
"confidence": number, // 0-100
"evidence": [string]
}}
],
"knowledge_patterns": {{
"mathematical_foundations": [
{{
"area": string, // e.g. "graph_theory", "linear_algebra"
"usage_examples": [string],
"proficiency_level": string // "basic", "intermediate", "advanced"
}}
],
"algorithmic_preferences": {{
"common_approaches": [string],
"complexity_awareness": string,
"optimization_patterns": [string]
}},
"security_awareness": {{
"level": string, // "low", "medium", "high"
"defensive_patterns": [string],
"security_considerations": [string]
}}
}}
}},
"technical_choices": {{
"primary_languages": [
{{
"language": string,
"proficiency_indicators": [string],
"usage_patterns": [string]
}}
],
"frameworks": [
{{
"name": string,
"purpose": string,
"usage_patterns": [string],
"implementation_depth": string // "basic", "intermediate", "advanced"
}}
],
"development_environment": {{
"likely_editor": string,
"confidence": number,
"tooling_preferences": [string],
"evidence": [string]
}},
"testing_approach": {{
"methodology": string,
"frameworks": [string],
"coverage_patterns": string
}}
}},
"project_organization": {{
"architecture_style": {{
"pattern": string,
"consistency": number,
"key_characteristics": [string]
}},
"code_quality": {{
"standards_adherence": string,
"documentation_level": string,
"maintainability_indicators": [string]
}},
"deployment_patterns": {{
"infrastructure_preferences": [string],
"containerization_approach": string,
"ci_cd_sophistication": string
}}
}}
}}

Important:
1. Base all inferences on concrete evidence in the code
2. Indicate confidence levels where uncertain
3. Provide specific examples supporting each conclusion
4. Focus on unique/distinctive patterns
"""

        try:
            response = llm.generate_json_response(prompt)
            if response:
                preferences[repo_name] = response
        except Exception as e:
            print(f"Error analyzing {repo_name}: {str(e)}")
            preferences[repo_name] = {"error": str(e)}

    return preferences
|
|
|
|
|
|
|
|
|
|
|
def calculate_identity_confidence(
    sources_data: Dict[str, Any],
    code_style_results: Dict[str, Any],
    project_preferences: Dict[str, Any],
    temporal_patterns: Dict[str, Any]
) -> Dict[str, Any]:
    """Merge all earlier analyses into one LLM-generated developer identity profile.

    Args:
        sources_data: Per-repository structural data.
        code_style_results: Output of the code style analysis.
        project_preferences: Output of the project preferences analysis.
        temporal_patterns: Output of the temporal analysis.

    Returns:
        The parsed LLM response, or ``{"error": message}`` when the request
        raised an exception.
    """
    llm = create_handler()

    # Everything is serialized into the prompt as a single JSON document.
    analysis_data = {
        "repositories": sources_data,
        "code_style_analysis": code_style_results,
        "project_preferences": project_preferences,
        "temporal_patterns": temporal_patterns
    }

    # Prompt text is kept verbatim (flush-left) because it is runtime data.
    prompt = f"""

IDENTITY CONFIDENCE CALCULATION

You are an expert in developer profiling and behavioral analysis. Synthesize all provided analysis data to create a comprehensive profile of the developer's identity, expertise, and behavioral patterns.

Analysis Data:
{json.dumps(analysis_data, indent=2)}

Based on all provided repository data and previous analyses, create a detailed developer profile focusing on:
1. Technical expertise and knowledge domains
2. Problem-solving patterns and approaches
3. Development philosophy and practices
4. Unique identifiers and consistent traits

Generate a single comprehensive identity profile JSON:

{{
"developer_profile": {{
"expertise": {{
"primary_domains": [
{{
"domain": string,
"proficiency_level": string, // "beginner", "intermediate", "expert"
"evidence": [string],
"confidence": number // 0-100
}}
],
"technical_depth": {{
"languages": [
{{
"name": string,
"mastery_level": string,
"usage_patterns": [string],
"notable_practices": [string]
}}
],
"frameworks": [
{{
"name": string,
"usage_sophistication": string,
"implementation_patterns": [string]
}}
],
"specialized_knowledge": [
{{
"area": string, // e.g. "cryptography", "distributed systems"
"depth": string,
"application_examples": [string]
}}
]
}}
}},
"work_patterns": {{
"development_style": {{
"code_organization": string,
"problem_solving_approach": string,
"quality_focus": string,
"distinctive_habits": [string]
}},
"workflow_characteristics": {{
"development_cycle": string,
"testing_approach": string,
"refactoring_patterns": string,
"documentation_style": string
}},
"communication_style": {{
"code_commenting": string,
"commit_messages": string,
"documentation_quality": string
}}
}},
"behavioral_traits": {{
"strengths": [
{{
"trait": string,
"evidence": [string],
"consistency": number // 0-100
}}
],
"areas_for_improvement": [
{{
"area": string,
"indicators": [string]
}}
],
"unique_characteristics": [
{{
"trait": string,
"significance": string,
"supporting_patterns": [string]
}}
]
}},
"knowledge_breadth": {{
"technical_stack": {{
"preferred_technologies": [string],
"experience_indicators": [string],
"adoption_patterns": string
}},
"domain_knowledge": {{
"primary_domains": [string],
"depth_indicators": [string],
"application_examples": [string]
}},
"architectural_understanding": {{
"preferred_patterns": [string],
"complexity_handling": string,
"scalability_awareness": string
}}
}},
"identity_confidence": {{
"overall_score": number, // 0-100
"distinguishing_factors": [
{{
"factor": string,
"significance": string,
"supporting_evidence": [string]
}}
],
"consistency_metrics": {{
"coding_style": number, // 0-100
"problem_solving": number, // 0-100
"quality_standards": number // 0-100
}},
"pattern_reliability": {{
"stable_patterns": [string],
"variable_patterns": [string],
"context_dependencies": [string]
}}
}}
}}
}}

Critical Analysis Requirements:
1. Base all conclusions on concrete evidence from the provided data
2. Focus on patterns that appear consistently across repositories
3. Highlight unique traits that distinguish this developer
4. Note any evolution in skills or practices
5. Indicate confidence levels for all major conclusions
6. Consider both technical and behavioral aspects
7. Identify any potential biases or limitations in the analysis
"""

    try:
        return llm.generate_json_response(prompt)
    except Exception as e:
        print(f"Error analyzing: {str(e)}")
        return {"error": str(e)}
|
|
|
|
|
|
|
|
|
|
|
class ProfileVisualizer:
    """Builds Plotly figures and an HTML summary from a developer profile dict.

    The profile is the "developer_profile" object produced by the identity
    confidence step. Its values originate from an LLM, so every field is
    treated as optional and untrusted: numeric fields are type-checked before
    plotting and all text is HTML-escaped before being embedded in markup.
    """

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for real numeric values (booleans excluded)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _esc(value: Any) -> str:
        """Return *value* as text that is safe to embed in HTML."""
        from html import escape

        return escape(str(value))

    def create_radar_chart(self, profile: Dict[str, Any]) -> "go.Figure":
        """Create a radar chart from consistency metrics and domain confidences.

        Returns a placeholder figure when the profile is missing, contains no
        numeric metrics, or chart construction fails.
        """
        if not profile or "identity_confidence" not in profile:
            return self._empty_chart("No profile data available")

        try:
            metrics = {}

            identity = profile.get("identity_confidence") or {}
            for key, value in (identity.get("consistency_metrics") or {}).items():
                if self._is_number(value):
                    metrics[key.replace("_", " ").title()] = value

            expertise = profile.get("expertise") or {}
            for domain in expertise.get("primary_domains") or []:
                # Skip placeholders such as "Unknown": they cannot be plotted.
                if "domain" in domain and self._is_number(domain.get("confidence")):
                    metrics[domain["domain"]] = domain["confidence"]

            if not metrics:
                return self._empty_chart("No metrics found in profile data")

            fig = go.Figure()
            fig.add_trace(go.Scatterpolar(
                r=list(metrics.values()),
                theta=list(metrics.keys()),
                fill='toself',
                name='Developer Profile',
                line_color='rgb(31, 119, 180)',
                fillcolor='rgba(31, 119, 180, 0.3)'
            ))
            fig.update_layout(
                polar=dict(
                    radialaxis=dict(
                        visible=True,
                        range=[0, 100]
                    )
                ),
                showlegend=False,
                title="Developer Profile Metrics",
                height=500
            )
            return fig

        except Exception as e:
            return self._empty_chart(f"Error creating chart: {str(e)}")

    def create_language_bar_chart(self, profile: Dict[str, Any]) -> "go.Figure":
        """Create a bar chart of language mastery on a 0-10 scale.

        Returns a placeholder figure when no language data is available or
        chart construction fails.
        """
        if not profile or "expertise" not in profile:
            return self._empty_chart("No profile data available")

        try:
            expertise = profile.get("expertise") or {}
            technical_depth = expertise.get("technical_depth") or {}
            languages = [
                {
                    "Language": lang["name"],
                    "Mastery": self._mastery_to_number(lang["mastery_level"]),
                }
                for lang in technical_depth.get("languages") or []
                if "name" in lang and "mastery_level" in lang
            ]

            if not languages:
                return self._empty_chart("No language data found in profile")

            df = pd.DataFrame(languages)

            fig = px.bar(
                df,
                x="Language",
                y="Mastery",
                color="Mastery",
                color_continuous_scale="viridis",
                title="Programming Language Proficiency"
            )
            fig.update_layout(
                xaxis_title="Language",
                yaxis_title="Proficiency Level (0-10)",
                height=400
            )
            return fig

        except Exception as e:
            return self._empty_chart(f"Error creating chart: {str(e)}")

    def create_strengths_chart(self, profile: Dict[str, Any]) -> "go.Figure":
        """Create a horizontal bar chart of strengths ordered by consistency.

        Returns a placeholder figure when no strengths with a numeric
        consistency are available or chart construction fails.
        """
        if not profile or "behavioral_traits" not in profile:
            return self._empty_chart("No profile data available")

        try:
            traits = profile.get("behavioral_traits") or {}
            strengths = [
                {"Trait": item["trait"], "Consistency": item["consistency"]}
                for item in traits.get("strengths") or []
                # Non-numeric consistency values would break sorting below.
                if "trait" in item and self._is_number(item.get("consistency"))
            ]

            if not strengths:
                return self._empty_chart("No strengths data found in profile")

            df = pd.DataFrame(strengths).sort_values("Consistency", ascending=True)

            fig = px.bar(
                df,
                y="Trait",
                x="Consistency",
                orientation='h',
                color="Consistency",
                color_continuous_scale="greens",
                title="Developer Strengths"
            )
            fig.update_layout(
                xaxis_title="Consistency (%)",
                yaxis_title=None,
                height=400
            )
            return fig

        except Exception as e:
            return self._empty_chart(f"Error creating chart: {str(e)}")

    def create_html_summary(self, profile: Dict[str, Any]) -> str:
        """Render the key profile insights as an HTML fragment.

        All profile values are HTML-escaped because they come from LLM output
        derived from arbitrary repository content.
        """
        if not profile:
            return "<p>No profile data available</p>"

        esc = self._esc
        try:
            parts = []

            identity = profile.get("identity_confidence") or {}
            if "overall_score" in identity:
                parts.append(
                    '<div style="text-align: center; margin-bottom: 20px;">'
                    '<div style="font-size: 48px; font-weight: bold; color: #1f77b4;">'
                    f'{esc(identity["overall_score"])}%</div>'
                    '<div style="font-size: 16px; color: #666;">'
                    'Identity Confidence Score</div>'
                    '</div>'
                )

            expertise = profile.get("expertise") or {}
            if "primary_domains" in expertise:
                parts.append("<h3>Primary Expertise Domains</h3>")
                parts.append("<ul>")
                for domain in expertise["primary_domains"] or []:
                    if "domain" in domain and "proficiency_level" in domain:
                        item = (
                            f"<strong>{esc(domain['domain'])}</strong> "
                            f"({esc(domain['proficiency_level'])})"
                        )
                        evidence = domain.get("evidence")
                        if isinstance(evidence, str):
                            evidence = [evidence]
                        if evidence:
                            # Evidence belongs inside the list item; bare text
                            # directly under <ul> is invalid HTML.
                            item += " - Evidence: " + ", ".join(
                                esc(entry) for entry in evidence[:3]
                            )
                        parts.append(f"<li>{item}</li>")
                parts.append("</ul>")

            technical_depth = expertise.get("technical_depth") or {}
            if "languages" in technical_depth:
                parts.append("<h3>Languages</h3>")
                parts.append("<ul>")
                for lang in technical_depth["languages"] or []:
                    parts.append(
                        f"<li><strong>{esc(lang.get('name', 'Unknown'))}</strong> "
                        f"({esc(lang.get('mastery_level', 'Unknown'))})</li>"
                    )
                parts.append("</ul>")

            if "work_patterns" in profile:
                parts.append("<h3>Work Patterns</h3>")
                work_patterns = profile["work_patterns"] or {}
                if "development_style" in work_patterns:
                    dev_style = work_patterns["development_style"] or {}
                    parts.append("<ul>")
                    parts.append(
                        "<li><strong>Code Organization</strong>: "
                        f"{esc(dev_style.get('code_organization', 'Unknown'))}</li>"
                    )
                    parts.append(
                        "<li><strong>Problem Solving</strong>: "
                        f"{esc(dev_style.get('problem_solving_approach', 'Unknown'))}</li>"
                    )
                    parts.append("</ul>")

            if "behavioral_traits" in profile:
                parts.append("<h3>Behavioral Traits</h3>")
                traits = profile["behavioral_traits"] or {}
                if "strengths" in traits:
                    parts.append("<h4>Strengths</h4>")
                    parts.append("<ul>")
                    for strength in (traits["strengths"] or [])[:3]:
                        parts.append(
                            f"<li><strong>{esc(strength.get('trait', 'Unknown'))}</strong> "
                            f"(Consistency: {esc(strength.get('consistency', 0))}%)</li>"
                        )
                    parts.append("</ul>")

            if "identity_confidence" in profile:
                parts.append("<h3>Identity Confidence</h3>")
                parts.append("<ul>")
                if "consistency_metrics" in identity:
                    metrics = identity["consistency_metrics"] or {}
                    parts.append(
                        "<li><strong>Coding Style</strong>: "
                        f"{esc(metrics.get('coding_style', 0))}%</li>"
                    )
                    parts.append(
                        "<li><strong>Problem Solving</strong>: "
                        f"{esc(metrics.get('problem_solving', 0))}%</li>"
                    )
                    parts.append(
                        "<li><strong>Quality Standards</strong>: "
                        f"{esc(metrics.get('quality_standards', 0))}%</li>"
                    )
                parts.append("</ul>")

            return "".join(parts)

        except Exception as e:
            return f"<p>Error creating summary: {esc(e)}</p>"

    def _mastery_to_number(self, mastery: str) -> float:
        """Map a free-text mastery level to a score on a 0-10 scale.

        Unrecognized or non-string values map to the neutral score 5.0.
        """
        text = str(mastery).lower()

        # Ordered: the first group with a matching keyword wins.
        scale = (
            (("expert", "advanced"), 9.0),
            (("proficient", "strong"), 7.5),
            (("intermediate", "moderate"), 5.0),
            (("basic", "beginner"), 3.0),
            (("novice", "limited"), 1.5),
        )
        for keywords, score in scale:
            if any(keyword in text for keyword in keywords):
                return score
        return 5.0

    def _empty_chart(self, message: str) -> "go.Figure":
        """Create an empty figure that shows *message* in its center."""
        fig = go.Figure()
        fig.add_annotation(
            x=0.5,
            y=0.5,
            xref="paper",
            yref="paper",
            text=message,
            showarrow=False,
            font=dict(
                size=14,
                color="#666"
            )
        )
        fig.update_layout(
            height=400,
            xaxis=dict(showticklabels=False, showgrid=False),
            yaxis=dict(showticklabels=False, showgrid=False)
        )
        return fig

    def visualize_profile(self, profile_json: Dict[str, Any]) -> List[Any]:
        """Generate all visualizations for an analysis result.

        Args:
            profile_json: Analysis result whose
                ``identity_confidence.developer_profile`` entry holds the
                profile. ``None`` or any other shape yields placeholders.

        Returns:
            ``[radar_chart, language_chart, strengths_chart, html_summary]``.
        """
        try:
            profile = None
            if isinstance(profile_json, dict):
                identity = profile_json.get("identity_confidence")
                if isinstance(identity, dict):
                    profile = identity.get("developer_profile")

            if not profile:
                return [
                    self._empty_chart("No developer profile data available"),
                    self._empty_chart("No developer profile data available"),
                    self._empty_chart("No developer profile data available"),
                    "<p>No developer profile data available</p>"
                ]

            return [
                self.create_radar_chart(profile),
                self.create_language_bar_chart(profile),
                self.create_strengths_chart(profile),
                self.create_html_summary(profile),
            ]

        except Exception as e:
            error_msg = f"Error visualizing profile: {str(e)}"
            return [
                self._empty_chart(error_msg),
                self._empty_chart(error_msg),
                self._empty_chart(error_msg),
                f"<p>{self._esc(error_msg)}</p>"
            ]
|
|
|
|
|
|
|
|
|
|
|
class StyleAnalyzerApp:
    """Handles repository analysis and stylometric profiling with Gradio UI."""

    # Accepts only letters, digits and hyphens (max 39 characters, starting
    # with a letter or digit). This keeps path separators, ".." and leading
    # "-" (option injection) out of filesystem paths and subprocess arguments.
    _USERNAME_PATTERN = re.compile(r"[A-Za-z0-9][A-Za-z0-9-]{0,38}")

    def __init__(self):
        # All fetched data and generated reports live below this directory.
        self.base_path = Path("out")
        os.makedirs(self.base_path, exist_ok=True)

    def analyze_github_user(
        self,
        username: str,
        repository_selection: str,
        github_token: str,
        gemini_api_key: str,
        progress=gr.Progress()
    ) -> Tuple[str, Dict, str]:
        """Run the full analysis pipeline; called from the Gradio interface.

        Args:
            username: GitHub login to analyze.
            repository_selection: "Smart Selection" to use RepositorySelector;
                any other value analyzes repositories whose only contributor
                is ``username``.
            github_token: GitHub API token, exported as ``GH_TOKEN``.
            gemini_api_key: Gemini API key, exported as ``GEMINI_API_KEY``.
            progress: Gradio progress tracker.

        Returns:
            ``(status, analysis_result, log)`` where status is "Success" or
            "Error"; on error the result is ``{}`` and the log holds the message.
        """
        username = (username or "").strip()
        if not self._USERNAME_PATTERN.fullmatch(username):
            return "Error", {}, f"Invalid GitHub username: {username!r}"

        os.environ["GH_TOKEN"] = github_token
        os.environ["GEMINI_API_KEY"] = gemini_api_key

        # NOTE(review): credentials are persisted in plain text in the working
        # directory; presumably so that code loading ".env" can pick them up.
        # Confirm this is required.
        with open(".env", "w", encoding="utf-8") as f:
            f.write(f"GH_TOKEN={github_token}\n")
            f.write(f"GEMINI_API_KEY={gemini_api_key}\n")

        user_path = self.base_path / username
        report_path = user_path / "report.json"

        # Fetch GitHub data only when no cached report exists for this user.
        if not report_path.exists():
            progress(0, desc="Fetching GitHub data...")
            try:
                result = subprocess.run(
                    ["gh-analyze", username],
                    check=True,
                    capture_output=True,
                    text=True
                )
                progress(0.2, desc="GitHub data fetched successfully")
                log_output = f"GitHub data fetched successfully:\n{result.stdout}"
            except subprocess.CalledProcessError as e:
                error_msg = f"Error fetching GitHub data: {e.stderr}"
                return "Error", {}, error_msg
            except FileNotFoundError:
                error_msg = (
                    "Error fetching GitHub data: "
                    "'gh-analyze' executable not found on PATH"
                )
                return "Error", {}, error_msg
        else:
            progress(0.2, desc="Using existing GitHub data")
            log_output = "Using existing GitHub data\n"

        try:
            progress(0.25, desc="Loading report data...")
            with open(report_path, encoding="utf-8") as f:
                report_data = json.load(f)
            log_output += "Report data loaded successfully\n"

            progress(0.3, desc="Identifying repositories to analyze...")
            if repository_selection == "Smart Selection":
                repo_selector = RepositorySelector(str(self.base_path), username)
                sources_to_analyze = repo_selector.select_repositories(report_data)
            else:
                # Repositories whose sole contributor is the user. Comparing
                # whole lists also copes with empty contributor lists.
                sources_to_analyze = [
                    obj["repo"]
                    for obj in report_data.get("contributors", [])
                    if obj.get("contributors") == [username]
                ]

            repo_list = ", ".join(sources_to_analyze)
            log_output += f"Found {len(sources_to_analyze)} repositories to analyze: {repo_list}\n"

            progress(0.4, desc="Analyzing repository structure...")
            sources_data = analyze_repository_structure(sources_to_analyze, user_path)
            log_output += "Repository structure analysis complete\n"

            progress(0.5, desc="Analyzing code style patterns...")
            code_style = analyze_code_style(sources_data)
            log_output += "Code style analysis complete\n"

            progress(0.6, desc="Analyzing temporal patterns...")
            temporal_patterns = analyze_temporal_patterns(sources_data, report_data)
            log_output += "Temporal patterns analysis complete\n"

            progress(0.7, desc="Analyzing project preferences...")
            project_preferences = analyze_project_preferences(sources_data)
            log_output += "Project preferences analysis complete\n"

            progress(0.8, desc="Calculating identity confidence...")
            identity_confidence = calculate_identity_confidence(
                sources_data,
                code_style,
                project_preferences,
                temporal_patterns
            )
            log_output += "Identity confidence calculation complete\n"

            progress(0.9, desc="Generating final report...")
            analysis_result = {
                "code_style_metrics": code_style,
                "temporal_patterns": temporal_patterns,
                "project_preferences": project_preferences,
                "identity_confidence": identity_confidence,
            }

            output_path = user_path / "stylometry_profile.json"
            with open(output_path, "w", encoding="utf-8") as f:
                json.dump({"stylometric_profile": analysis_result}, f, indent=2)

            log_output += f"Report generated successfully and saved to {output_path}\n"
            progress(1.0, desc="Analysis complete!")

            return "Success", analysis_result, log_output

        except Exception as e:
            # Top-level boundary for the UI: report the failure with its
            # traceback instead of raising.
            error_trace = traceback.format_exc()
            error_msg = f"Error during analysis: {str(e)}\n{error_trace}"
            return "Error", {}, error_msg
|
|
|
|
|
|
|
|
|
|
|
def add_visualization_tab(app, profile_output):
    """Build the "Visualizations" tab and bind it to the profile component.

    Must be called while a ``gr.Blocks`` context is active, because the tab
    and its components are created inline.

    Args:
        app: The Gradio app being assembled; it is returned untouched.
        profile_output: Component whose ``change`` event triggers a redraw
            and whose value is passed to ``visualize_profile``.

    Returns:
        The same ``app`` object that was passed in.
    """
    profile_visualizer = ProfileVisualizer()

    with gr.Tab("Visualizations"):
        # Upper row: radar chart beside the HTML summary.
        with gr.Row():
            with gr.Column():
                gr.Markdown("### Developer Profile Metrics")
                skills_plot = gr.Plot(label="Skills Radar")

            with gr.Column():
                gr.Markdown("### Technical Summary")
                summary_panel = gr.HTML(label="Profile Summary")

        # Lower row: language chart beside the strengths chart.
        with gr.Row():
            with gr.Column():
                gr.Markdown("### Programming Languages")
                languages_plot = gr.Plot(label="Language Proficiency")

            with gr.Column():
                gr.Markdown("### Developer Strengths")
                strengths_plot = gr.Plot(label="Strengths Analysis")

        # Output order is part of the contract with visualize_profile:
        # radar, languages, strengths, then the HTML summary.
        redraw_targets = [skills_plot, languages_plot, strengths_plot, summary_panel]
        profile_output.change(
            fn=profile_visualizer.visualize_profile,
            inputs=[profile_output],
            outputs=redraw_targets,
        )

    return app
|
|
|
def create_gradio_interface():
    """Assemble the Gradio Blocks UI for the stylometry analyzer.

    The left column collects the GitHub username, the repository selection
    method and the two API credentials (typed in or loaded from ``.env``).
    The right column holds the status/profile tab, the log tab and the
    visualization tab. The "Analyze" button is wired to
    ``StyleAnalyzerApp.analyze_github_user``.

    Returns:
        The configured ``gr.Blocks`` app; launching it is left to the caller.
    """
    style_analyzer = StyleAnalyzerApp()

    def load_from_env():
        """Return ``(GH_TOKEN, GEMINI_API_KEY)`` from the environment/.env file."""
        # NOTE(review): the returned values are written into client-side
        # textboxes; confirm this is acceptable when the app is shared publicly.
        load_dotenv()
        return os.getenv("GH_TOKEN", ""), os.getenv("GEMINI_API_KEY", "")

    with gr.Blocks(title="GitHub Stylometry Analyzer") as app:
        gr.Markdown("# GitHub Stylometry Analyzer")
        gr.Markdown("""
        This tool analyzes a GitHub user's repositories to build a developer profile based on coding style,
        temporal patterns, project preferences, and calculated identity confidence.

        The analysis process takes 10-15 minutes for standard accounts.
        """)

        with gr.Row():
            # Left column: user inputs and credentials.
            with gr.Column(scale=1):
                username_box = gr.Textbox(
                    label="GitHub Username",
                    placeholder="Enter GitHub username",
                )
                selection_mode = gr.Radio(
                    choices=["Smart Selection", "Owner Repositories Only"],
                    label="Repository Selection Method",
                    value="Smart Selection",
                )

                token_box = gr.Textbox(
                    label="GitHub API Token",
                    placeholder="Enter your GitHub API token",
                    type="password",
                )

                gemini_key_box = gr.Textbox(
                    label="Google Gemini API Key",
                    placeholder="Enter your Gemini API key",
                    type="password",
                )

                run_button = gr.Button("Analyze", variant="primary")

                with gr.Accordion("Load configuration from file", open=False):
                    gr.Markdown("""
                    You can load your GitHub token and Gemini API key from the .env file if present.
                    This is useful if you don't want to enter them manually each time.
                    """)

                    env_button = gr.Button("Load from .env", variant="secondary")
                    env_button.click(
                        fn=load_from_env,
                        inputs=[],
                        outputs=[token_box, gemini_key_box],
                    )

            # Right column: results, logs and charts.
            with gr.Column(scale=2):
                with gr.Tab("Profile Summary"):
                    status_box = gr.Textbox(label="Status", value="Ready")
                    profile_json = gr.JSON(label="Developer Profile")

                with gr.Tab("Logs"):
                    log_box = gr.Textbox(label="Analysis Logs", lines=20)

                # The charts redraw whenever the profile JSON changes.
                app = add_visualization_tab(app, profile_json)

        run_button.click(
            fn=style_analyzer.analyze_github_user,
            inputs=[username_box, selection_mode, token_box, gemini_key_box],
            outputs=[status_box, profile_json, log_box],
        )

    return app
|
|
|
|
|
|
|
|
|
|
|
def check_requirements():
    """Check that the third-party packages the app depends on are importable.

    Each requirement is imported by its full dotted module name, so a
    namespace package such as ``google`` being present does not hide a
    missing ``google.generativeai``. Missing requirements are reported by
    their pip distribution name so the printed install command is usable
    as-is.

    Returns:
        bool: True when every required module imports, False otherwise
        (after printing the list of missing packages and a pip command).
    """
    # Local import: the module header is not guaranteed to import importlib.
    import importlib

    # Maps import name -> pip distribution name (they differ for Gemini).
    required_packages = {
        "gradio": "gradio",
        "google.generativeai": "google-generativeai",
        "plotly": "plotly",
    }

    missing_packages = []
    for module_name, pip_name in required_packages.items():
        try:
            importlib.import_module(module_name)
        except ImportError:
            missing_packages.append(pip_name)

    if missing_packages:
        print("! Missing required packages: " + ", ".join(missing_packages))
        print("Please install required packages with:")
        print(f"pip install {' '.join(missing_packages)}")
        return False

    print("✓ Required packages already installed")
    return True
|
|
|
def check_environment():
    """Ensure a ``.env`` file exists in the current working directory.

    When the file is absent, a template with empty ``GH_TOKEN`` and
    ``GEMINI_API_KEY`` entries is written. An existing file is never
    modified.
    """
    env_file = Path(".env")
    try:
        # Mode "x" creates the file exclusively, so an existing .env (even one
        # that appears between a check and the write) is never truncated.
        with open(env_file, "x", encoding="utf-8") as f:
            f.write("GH_TOKEN=\nGEMINI_API_KEY=\n")
    except FileExistsError:
        print("✓ .env file already exists")
        return

    print("! Creating .env file")
    print("✓ Created .env file. You will need to provide API keys in the app.")
|
|
|
def create_output_dir():
    """Create the ``out`` directory in the current working directory if needed.

    Raises:
        FileExistsError: If ``out`` exists but is not a directory.
    """
    out_dir = Path("out")
    # is_dir() rather than exists(): a regular file named "out" must not be
    # reported as a usable output directory.
    if out_dir.is_dir():
        print("✓ Output directory already exists")
        return

    # exist_ok=True tolerates the directory appearing between check and mkdir.
    out_dir.mkdir(parents=True, exist_ok=True)
    print("✓ Created output directory")
|
|
|
def check_gh_analyze():
    """Report whether the ``gh-analyze`` command is available on ``PATH``.

    The executable is looked up rather than run, so the check cannot hang
    on the tool and does not fail on launch errors such as a permission
    problem.

    Returns:
        bool: True when an executable named ``gh-analyze`` is found,
        False otherwise (after printing install instructions).
    """
    if shutil.which("gh-analyze") is not None:
        print("✓ gh-analyze tool is installed")
        return True

    print("! gh-analyze tool is not installed")
    print("Please install gh-fake-analyzer with:")
    print("pip install gh-fake-analyzer")
    return False
|
|
|
def install_gh_analyze():
    """Install the ``gh-fake-analyzer`` package with pip.

    pip is invoked as ``<current interpreter> -m pip`` so the package lands
    in the environment that is running this application, rather than in
    whichever ``pip`` executable happens to be first on ``PATH``.

    Returns:
        bool: True when pip exits successfully, False when pip fails or
        cannot be started (the error is printed).
    """
    # Local import: the module header is not guaranteed to import sys.
    import sys

    install_cmd = [sys.executable, "-m", "pip", "install", "gh-fake-analyzer"]
    try:
        # text=True so captured stderr is a str, not a bytes repr, when printed.
        subprocess.run(install_cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        print(f"! Error installing gh-fake-analyzer: {e.stderr}")
        return False
    except OSError as e:
        # The interpreter could not be launched at all.
        print(f"! Error installing gh-fake-analyzer: {e}")
        return False

    print("✓ Installed gh-fake-analyzer")
    return True
|
|
|
def main():
    """Run the startup checks, then build and launch the Gradio app.

    Steps, in order: verify the required Python packages; make sure the
    ``gh-analyze`` tool is present (attempting an install when it is not);
    ensure ``.env`` and the output directory exist; launch the interface.
    Returns early, without launching, when a prerequisite cannot be met.
    """
    rule = "==========================================="

    def banner(title):
        """Print *title* framed by two rule lines, with blank lines around."""
        print(f"\n{rule}")
        print(title)
        print(f"{rule}\n")

    banner("GitHub Stylometry Analyzer Setup")

    if not check_requirements():
        print("\nPlease install the missing packages and run the application again.")
        return

    if not check_gh_analyze():
        print("\nAttempting to install gh-fake-analyzer...")
        if not install_gh_analyze():
            print("\nPlease install gh-fake-analyzer manually and run the application again.")
            return

    check_environment()
    create_output_dir()

    banner("Launching GitHub Stylometry Analyzer")

    interface = create_gradio_interface()
    interface.launch(share=True, debug=True)
|
|
|
# Run the setup checks and launch the UI only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()