"""
Graph analytics for Epic 2 Week 2.
This module provides analytics capabilities for knowledge graphs including
metrics collection, performance monitoring, and optional visualization
of graph structures and retrieval patterns.
"""
import logging
import time
from typing import List, Dict, Any, Optional, Set, Tuple
from dataclasses import dataclass, field
from collections import defaultdict, Counter
import json
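# Optional third-party dependencies: networkx/numpy are needed for graph metrics,
# Plotly for visualization; features degrade gracefully when either is missing.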
try:
import networkx as nx
import numpy as np
except ImportError:
nx = None
np = None
try:
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
PLOTLY_AVAILABLE = True
except ImportError:
PLOTLY_AVAILABLE = False
from .config.graph_config import GraphAnalyticsConfig
from .document_graph_builder import DocumentGraphBuilder
from .graph_retriever import GraphRetriever
logger = logging.getLogger(__name__)
@dataclass
class GraphMetrics:
"""Graph structure metrics."""
nodes: int = 0
edges: int = 0
density: float = 0.0
avg_degree: float = 0.0
connected_components: int = 0
diameter: Optional[int] = None
clustering_coefficient: float = 0.0
node_type_distribution: Dict[str, int] = field(default_factory=dict)
edge_type_distribution: Dict[str, int] = field(default_factory=dict)
@dataclass
class RetrievalMetrics:
"""Graph retrieval performance metrics."""
total_queries: int = 0
avg_latency_ms: float = 0.0
cache_hit_rate: float = 0.0
algorithm_usage: Dict[str, int] = field(default_factory=dict)
avg_results_per_query: float = 0.0
query_patterns: Dict[str, int] = field(default_factory=dict)
@dataclass
class AnalyticsSnapshot:
"""Complete analytics snapshot."""
timestamp: float
graph_metrics: GraphMetrics
retrieval_metrics: RetrievalMetrics
memory_usage_mb: float = 0.0
processing_stats: Dict[str, Any] = field(default_factory=dict)
class GraphAnalyticsError(Exception):
"""Raised when graph analytics operations fail."""
pass
class GraphAnalytics:
"""
Analytics and monitoring for graph-based retrieval.
This class provides comprehensive analytics capabilities including:
- Graph structure analysis and metrics
- Retrieval performance monitoring
- Query pattern analysis
- Optional visualization of graphs and metrics
- Time-series tracking of performance
Features:
- Real-time metrics collection
- Historical performance tracking
- Graph visualization (when Plotly is available)
- Performance trend analysis
- Memory usage monitoring
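Example:
    Illustrative sketch only; assumes pre-built config, graph_builder, and
    graph_retriever instances of the types imported above, and an arbitrary
    algorithm label:

        analytics = GraphAnalytics(config)
        snapshot = analytics.create_snapshot(graph_builder, graph_retriever)
        analytics.track_query(
            query="RISC-V vector extension encoding",
            results_count=5,
            latency_ms=12.4,
            algorithm_used="subgraph_expansion",  # hypothetical algorithm label
        )
        report = analytics.generate_report()
        dashboard_html = analytics.visualize_metrics()  # None unless Plotly is available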
"""
def __init__(self, config: GraphAnalyticsConfig):
"""
Initialize graph analytics.
Args:
config: Analytics configuration
"""
self.config = config
# Metrics storage
self.snapshots: List[AnalyticsSnapshot] = []
self.current_metrics = {
"graph": GraphMetrics(),
"retrieval": RetrievalMetrics()
}
# Query tracking
self.query_history: List[Dict[str, Any]] = []
self.performance_history: List[Dict[str, Any]] = []
# Statistics
self.stats = {
"analytics_started": time.time(),
"snapshots_created": 0,
"metrics_collected": 0,
"visualizations_generated": 0
}
logger.info(f"GraphAnalytics initialized (visualization: {PLOTLY_AVAILABLE})")
def collect_graph_metrics(self, graph_builder: DocumentGraphBuilder) -> GraphMetrics:
"""
Collect comprehensive graph structure metrics.
Args:
graph_builder: Document graph builder
Returns:
Graph metrics object
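Example (illustrative; assumes an existing GraphAnalytics instance and a populated graph_builder):
    metrics = analytics.collect_graph_metrics(graph_builder)
    print(metrics.nodes, metrics.edges, round(metrics.density, 3))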
"""
if not self.config.collect_graph_metrics:
return GraphMetrics()
try:
graph = graph_builder.get_graph()
if not graph or graph.number_of_nodes() == 0:
return GraphMetrics()
# Basic metrics
num_nodes = graph.number_of_nodes()
num_edges = graph.number_of_edges()
metrics = GraphMetrics(
nodes=num_nodes,
edges=num_edges
)
# Calculate density
if num_nodes > 1:
metrics.density = nx.density(graph)
# Calculate average degree
if num_nodes > 0:
degrees = dict(graph.degree())
metrics.avg_degree = sum(degrees.values()) / num_nodes
# Connected components
if graph.is_directed():
metrics.connected_components = nx.number_weakly_connected_components(graph)
else:
metrics.connected_components = nx.number_connected_components(graph)
# Diameter (for smaller graphs)
if num_nodes < 1000 and nx.is_connected(graph.to_undirected()):
try:
metrics.diameter = nx.diameter(graph.to_undirected())
except nx.NetworkXError:
metrics.diameter = None
# Clustering coefficient
if num_nodes > 2:
try:
metrics.clustering_coefficient = nx.average_clustering(graph.to_undirected())
except (nx.NetworkXError, ZeroDivisionError):
metrics.clustering_coefficient = 0.0
# Node type distribution
node_types = defaultdict(int)
for node_id in graph.nodes():
node_data = graph.nodes[node_id]
node_type = node_data.get("node_type", "unknown")
node_types[node_type] += 1
metrics.node_type_distribution = dict(node_types)
# Edge type distribution
edge_types = defaultdict(int)
for source, target, edge_data in graph.edges(data=True):
edge_type = edge_data.get("edge_type", "unknown")
edge_types[edge_type] += 1
metrics.edge_type_distribution = dict(edge_types)
self.current_metrics["graph"] = metrics
self.stats["metrics_collected"] += 1
logger.debug(f"Collected graph metrics: {num_nodes} nodes, {num_edges} edges")
return metrics
except Exception as e:
logger.error(f"Failed to collect graph metrics: {str(e)}")
return GraphMetrics()
def collect_retrieval_metrics(self, graph_retriever: GraphRetriever) -> RetrievalMetrics:
"""
Collect retrieval performance metrics.
Args:
graph_retriever: Graph retriever component
Returns:
Retrieval metrics object
"""
if not self.config.collect_retrieval_metrics:
return RetrievalMetrics()
try:
retriever_stats = graph_retriever.get_statistics()
metrics = RetrievalMetrics(
total_queries=retriever_stats.get("queries_processed", 0),
avg_latency_ms=retriever_stats.get("avg_search_time", 0.0) * 1000,
cache_hit_rate=retriever_stats.get("cache_hit_rate", 0.0),
algorithm_usage=dict(retriever_stats.get("algorithm_usage", {})),
avg_results_per_query=(
retriever_stats.get("total_results_returned", 0) /
max(retriever_stats.get("queries_processed", 1), 1)
)
)
self.current_metrics["retrieval"] = metrics
self.stats["metrics_collected"] += 1
return metrics
except Exception as e:
logger.error(f"Failed to collect retrieval metrics: {str(e)}")
return RetrievalMetrics()
def create_snapshot(self, graph_builder: DocumentGraphBuilder,
graph_retriever: GraphRetriever) -> AnalyticsSnapshot:
"""
Create a complete analytics snapshot.
Args:
graph_builder: Document graph builder
graph_retriever: Graph retriever component
Returns:
Analytics snapshot
"""
timestamp = time.time()
# Collect metrics
graph_metrics = self.collect_graph_metrics(graph_builder)
retrieval_metrics = self.collect_retrieval_metrics(graph_retriever)
# Get memory usage
memory_usage = self._estimate_memory_usage(graph_builder, graph_retriever)
# Get processing stats
processing_stats = {
"graph_builder": graph_builder.get_graph_statistics(),
"graph_retriever": graph_retriever.get_statistics()
}
snapshot = AnalyticsSnapshot(
timestamp=timestamp,
graph_metrics=graph_metrics,
retrieval_metrics=retrieval_metrics,
memory_usage_mb=memory_usage,
processing_stats=processing_stats
)
# Store snapshot
self.snapshots.append(snapshot)
self.stats["snapshots_created"] += 1
# Clean old snapshots based on retention policy
self._clean_old_snapshots()
logger.info(f"Created analytics snapshot ({len(self.snapshots)} total)")
return snapshot
def track_query(self, query: str, results_count: int, latency_ms: float,
algorithm_used: str, success: bool = True) -> None:
"""
Track an individual query for analysis.
Args:
query: Query string
results_count: Number of results returned
latency_ms: Query latency in milliseconds
algorithm_used: Algorithm used for retrieval
success: Whether query was successful
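Example (illustrative; the algorithm label here is arbitrary):
    analytics.track_query("RV32I base ISA overview", results_count=8,
                          latency_ms=23.1, algorithm_used="pagerank")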
"""
query_record = {
"timestamp": time.time(),
"query": query,
"results_count": results_count,
"latency_ms": latency_ms,
"algorithm": algorithm_used,
"success": success,
"query_length": len(query),
"query_words": len(query.split())
}
self.query_history.append(query_record)
# Update query patterns
if hasattr(self.current_metrics["retrieval"], "query_patterns"):
query_type = self._classify_query(query)
self.current_metrics["retrieval"].query_patterns[query_type] = (
self.current_metrics["retrieval"].query_patterns.get(query_type, 0) + 1
)
# Limit history size
max_history = 10000
if len(self.query_history) > max_history:
self.query_history = self.query_history[-max_history//2:]
def generate_report(self) -> Dict[str, Any]:
"""
Generate comprehensive analytics report.
Returns:
Dictionary with analytics report
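Example (illustrative):
    report = analytics.generate_report()
    if "error" not in report:
        print(report["graph_metrics"]["nodes"],
              report["retrieval_metrics"]["avg_latency_ms"])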
"""
if not self.snapshots:
return {"error": "No analytics data available"}
latest_snapshot = self.snapshots[-1]
# Basic metrics
report = {
"timestamp": latest_snapshot.timestamp,
"graph_metrics": {
"nodes": latest_snapshot.graph_metrics.nodes,
"edges": latest_snapshot.graph_metrics.edges,
"density": latest_snapshot.graph_metrics.density,
"avg_degree": latest_snapshot.graph_metrics.avg_degree,
"connected_components": latest_snapshot.graph_metrics.connected_components,
"clustering_coefficient": latest_snapshot.graph_metrics.clustering_coefficient,
"node_type_distribution": latest_snapshot.graph_metrics.node_type_distribution,
"edge_type_distribution": latest_snapshot.graph_metrics.edge_type_distribution
},
"retrieval_metrics": {
"total_queries": latest_snapshot.retrieval_metrics.total_queries,
"avg_latency_ms": latest_snapshot.retrieval_metrics.avg_latency_ms,
"cache_hit_rate": latest_snapshot.retrieval_metrics.cache_hit_rate,
"algorithm_usage": latest_snapshot.retrieval_metrics.algorithm_usage,
"avg_results_per_query": latest_snapshot.retrieval_metrics.avg_results_per_query
},
"performance": {
"memory_usage_mb": latest_snapshot.memory_usage_mb,
"snapshots_count": len(self.snapshots),
"queries_tracked": len(self.query_history)
}
}
# Historical trends
if len(self.snapshots) > 1:
report["trends"] = self._calculate_trends()
# Query analysis
if self.query_history:
report["query_analysis"] = self._analyze_queries()
return report
def visualize_graph(self, graph_builder: DocumentGraphBuilder,
layout: str = "spring", max_nodes: Optional[int] = None) -> Optional[str]:
"""
Generate graph visualization.
Args:
graph_builder: Document graph builder
layout: Layout algorithm ("spring", "circular", or "kamada_kawai"; unknown values fall back to "spring")
max_nodes: Maximum number of nodes to include (defaults to config.visualization_max_nodes)
Returns:
HTML string of visualization or None if disabled/failed
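Example (illustrative; requires Plotly and config.enable_visualization):
    html = analytics.visualize_graph(graph_builder, layout="kamada_kawai", max_nodes=200)
    if html:
        with open("graph.html", "w") as f:
            f.write(html)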
"""
if not self.config.enable_visualization or not PLOTLY_AVAILABLE:
return None
try:
graph = graph_builder.get_graph()
if not graph or graph.number_of_nodes() == 0:
return None
# Limit graph size for visualization
max_viz_nodes = max_nodes or self.config.visualization_max_nodes
if graph.number_of_nodes() > max_viz_nodes:
# Sample most connected nodes
node_degrees = dict(graph.degree())
top_nodes = sorted(node_degrees.items(), key=lambda x: x[1], reverse=True)
nodes_to_keep = [node for node, _ in top_nodes[:max_viz_nodes]]
graph = graph.subgraph(nodes_to_keep)
# Convert to undirected for layout
layout_graph = graph.to_undirected()
# Generate layout
if layout == "spring":
pos = nx.spring_layout(layout_graph)
elif layout == "circular":
pos = nx.circular_layout(layout_graph)
elif layout == "kamada_kawai":
pos = nx.kamada_kawai_layout(layout_graph)
else:
pos = nx.spring_layout(layout_graph)
# Create visualization
fig = self._create_plotly_graph(graph, pos)
self.stats["visualizations_generated"] += 1
return fig.to_html()
except Exception as e:
logger.error(f"Graph visualization failed: {str(e)}")
return None
def visualize_metrics(self) -> Optional[str]:
"""
Generate metrics visualization.
Returns:
HTML string of metrics visualization or None if disabled/failed
"""
if not self.config.enable_visualization or not PLOTLY_AVAILABLE or not self.snapshots:
return None
try:
# Create subplots
fig = make_subplots(
rows=2, cols=2,
subplot_titles=("Graph Growth", "Retrieval Latency", "Node Types", "Algorithm Usage"),
specs=[[{"secondary_y": True}, {"secondary_y": False}],
[{"type": "pie"}, {"type": "pie"}]]
)
# Extract time series data
timestamps = [s.timestamp for s in self.snapshots]
nodes = [s.graph_metrics.nodes for s in self.snapshots]
edges = [s.graph_metrics.edges for s in self.snapshots]
latencies = [s.retrieval_metrics.avg_latency_ms for s in self.snapshots]
# Graph growth
fig.add_trace(
go.Scatter(x=timestamps, y=nodes, name="Nodes", line=dict(color="blue")),
row=1, col=1
)
fig.add_trace(
go.Scatter(x=timestamps, y=edges, name="Edges", line=dict(color="red")),
row=1, col=1, secondary_y=True
)
# Retrieval latency
fig.add_trace(
go.Scatter(x=timestamps, y=latencies, name="Latency (ms)", line=dict(color="green")),
row=1, col=2
)
# Latest snapshot data for pie charts
latest = self.snapshots[-1]
# Node types pie chart
if latest.graph_metrics.node_type_distribution:
fig.add_trace(
go.Pie(
labels=list(latest.graph_metrics.node_type_distribution.keys()),
values=list(latest.graph_metrics.node_type_distribution.values()),
name="Node Types"
),
row=2, col=1
)
# Algorithm usage pie chart
if latest.retrieval_metrics.algorithm_usage:
fig.add_trace(
go.Pie(
labels=list(latest.retrieval_metrics.algorithm_usage.keys()),
values=list(latest.retrieval_metrics.algorithm_usage.values()),
name="Algorithm Usage"
),
row=2, col=2
)
fig.update_layout(
title="Graph Analytics Dashboard",
height=800
)
self.stats["visualizations_generated"] += 1
return fig.to_html()
except Exception as e:
logger.error(f"Metrics visualization failed: {str(e)}")
return None
def _estimate_memory_usage(self, graph_builder: DocumentGraphBuilder,
graph_retriever: GraphRetriever) -> float:
"""Estimate memory usage in MB."""
try:
# Get graph builder stats
builder_stats = graph_builder.get_graph_statistics()
builder_memory = builder_stats.get("memory_usage_mb", 0.0)
# Estimate retriever memory from its query cache size
cache_entries = len(graph_retriever.query_cache) if hasattr(graph_retriever, 'query_cache') else 0
retriever_memory = cache_entries * 0.01  # Rough estimate: 10KB per cache entry
# Analytics memory
analytics_memory = len(self.snapshots) * 0.001 # Rough estimate: 1KB per snapshot
return builder_memory + retriever_memory + analytics_memory
except Exception:
return 0.0
def _clean_old_snapshots(self) -> None:
"""Clean old snapshots based on retention policy."""
if not self.snapshots:
return
current_time = time.time()
retention_seconds = self.config.metrics_retention_hours * 3600
# Remove snapshots older than retention period
self.snapshots = [
s for s in self.snapshots
if current_time - s.timestamp <= retention_seconds
]
def _classify_query(self, query: str) -> str:
"""Classify query type for pattern analysis."""
query_lower = query.lower()
if any(word in query_lower for word in ["risc-v", "riscv", "isa"]):
return "architecture"
elif any(word in query_lower for word in ["extension", "implement", "support"]):
return "extension"
elif any(word in query_lower for word in ["protocol", "interface", "communication"]):
return "protocol"
elif len(query.split()) <= 2:
return "short"
elif len(query.split()) > 10:
return "long"
else:
return "general"
def _calculate_trends(self) -> Dict[str, Any]:
"""Calculate performance trends from historical data."""
if len(self.snapshots) < 2:
return {}
# Calculate growth rates
first = self.snapshots[0]
last = self.snapshots[-1]
time_diff = last.timestamp - first.timestamp
if time_diff == 0:
return {}
node_growth_rate = (last.graph_metrics.nodes - first.graph_metrics.nodes) / time_diff
edge_growth_rate = (last.graph_metrics.edges - first.graph_metrics.edges) / time_diff
# Average performance metrics
recent_snapshots = self.snapshots[-5:] # Last 5 snapshots
avg_latency = sum(s.retrieval_metrics.avg_latency_ms for s in recent_snapshots) / len(recent_snapshots)
avg_memory = sum(s.memory_usage_mb for s in recent_snapshots) / len(recent_snapshots)
return {
"node_growth_rate_per_second": node_growth_rate,
"edge_growth_rate_per_second": edge_growth_rate,
"avg_recent_latency_ms": avg_latency,
"avg_recent_memory_mb": avg_memory,
"total_time_span_hours": time_diff / 3600
}
def _analyze_queries(self) -> Dict[str, Any]:
"""Analyze query history for patterns."""
if not self.query_history:
return {}
# Query statistics
latencies = [q["latency_ms"] for q in self.query_history if q["success"]]
analysis = {
"total_queries": len(self.query_history),
"successful_queries": sum(1 for q in self.query_history if q["success"]),
"avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
"max_latency_ms": max(latencies) if latencies else 0,
"min_latency_ms": min(latencies) if latencies else 0
}
# Query length distribution
lengths = [q["query_length"] for q in self.query_history]
analysis["avg_query_length"] = sum(lengths) / len(lengths) if lengths else 0
# Algorithm usage
algorithms = [q["algorithm"] for q in self.query_history]
analysis["algorithm_distribution"] = dict(Counter(algorithms))
return analysis
def _create_plotly_graph(self, graph: nx.DiGraph, pos: Dict[str, Tuple[float, float]]) -> go.Figure:
"""Create Plotly graph visualization."""
# Extract edges
edge_x = []
edge_y = []
for edge in graph.edges():
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_x.extend([x0, x1, None])
edge_y.extend([y0, y1, None])
# Create edge trace
edge_trace = go.Scatter(
x=edge_x, y=edge_y,
line=dict(width=0.5, color='#888'),
hoverinfo='none',
mode='lines'
)
# Extract nodes
node_x = []
node_y = []
node_text = []
node_colors = []
color_map = {
"concept": "blue",
"protocol": "red",
"architecture": "green",
"extension": "purple"
}
for node in graph.nodes():
x, y = pos[node]
node_x.append(x)
node_y.append(y)
node_data = graph.nodes[node]
node_text.append(node_data.get("text", node))
node_type = node_data.get("node_type", "concept")
node_colors.append(color_map.get(node_type, "gray"))
# Create node trace
node_trace = go.Scatter(
x=node_x, y=node_y,
mode='markers+text',
hoverinfo='text',
text=node_text,
textposition="middle center",
marker=dict(
size=10,
color=node_colors,
line=dict(width=2, color="black")
)
)
# Create figure
fig = go.Figure(
data=[edge_trace, node_trace],
layout=go.Layout(
title="Knowledge Graph Visualization",
titlefont_size=16,
showlegend=False,
hovermode='closest',
margin=dict(b=20,l=5,r=5,t=40),
annotations=[ dict(
text="Graph Visualization",
showarrow=False,
xref="paper", yref="paper",
x=0.005, y=-0.002,
xanchor="left", yanchor="bottom",
font=dict(color="gray", size=12)
)],
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False)
)
)
return fig
def export_data(self, format: str = "json") -> str:
"""
Export analytics data.
Args:
format: Export format; only "json" is currently supported (other values return an error string)
Returns:
Exported data as string
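Example (illustrative):
    payload = analytics.export_data("json")
    with open("graph_analytics_export.json", "w") as f:
        f.write(payload)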
"""
if format == "json":
export_data = {
"snapshots": [
{
"timestamp": s.timestamp,
"graph_metrics": {
"nodes": s.graph_metrics.nodes,
"edges": s.graph_metrics.edges,
"density": s.graph_metrics.density,
"avg_degree": s.graph_metrics.avg_degree
},
"retrieval_metrics": {
"total_queries": s.retrieval_metrics.total_queries,
"avg_latency_ms": s.retrieval_metrics.avg_latency_ms,
"cache_hit_rate": s.retrieval_metrics.cache_hit_rate
},
"memory_usage_mb": s.memory_usage_mb
}
for s in self.snapshots
],
"query_history": self.query_history,
"stats": self.stats
}
return json.dumps(export_data, indent=2)
else:
return f"Unsupported export format: {format}"
def get_statistics(self) -> Dict[str, Any]:
"""Get analytics statistics."""
return {
**self.stats,
"snapshots_count": len(self.snapshots),
"queries_tracked": len(self.query_history),
"current_memory_estimate_mb": (
self.snapshots[-1].memory_usage_mb if self.snapshots else 0.0
)
}