"""
MCP (Model Context Protocol) Server for Laban Movement Analysis.

Provides tools for video movement analysis accessible to AI agents.
"""

import asyncio
import json
import os
import shutil
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse

import aiofiles
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
    ToolParameterType,
    ToolResponse,
    ToolResult,
    ToolError,
)

# Put this file's directory first on the import path before importing the
# local analysis package below.
sys.path.insert(0, str(Path(__file__).parent))

from gradio_labanmovementanalysis import LabanMovementAnalysis


class LabanMCPServer:
    """MCP server that exposes Laban Movement Analysis tools to AI agents.

    Tools registered on the underlying server:

    * ``analyze_video`` - analyze one video (path or URL) and cache the result.
    * ``get_analysis_summary`` - human-readable summary of a cached analysis.
    * ``list_available_models`` - static description of the pose models.
    * ``batch_analyze`` - analyze several videos, optionally concurrently.
    * ``compare_movements`` - compare two cached analyses.

    Attributes:
        server: The MCP ``Server`` instance the tools are registered on.
        analyzer: ``LabanMovementAnalysis`` instance that processes videos.
        analysis_cache: Maps analysis IDs to the stored result of
            ``analyze_video`` (JSON output, visualization path, timestamp).
        temp_dir: Scratch directory that receives downloaded videos; it is
            removed when ``run`` returns.

    NOTE(review): this class relies on ``Server.tool()`` and on
    ``ToolResult`` / ``ToolError`` from ``mcp.types``; confirm that the
    installed ``mcp`` SDK version actually provides them.
    """

    # Prefixes that mark a video source as remote (to be downloaded first).
    _URL_PREFIXES = ("http://", "https://")
    # File name used when the URL path does not yield a usable one.
    _DEFAULT_FILENAME = "video.mp4"
    # Summary keys whose "dominant" value is reported and compared.
    _COMPARED_METRICS = ("direction", "intensity", "speed")
    # Number of movement segments listed in a text summary.
    _MAX_SUMMARY_SEGMENTS = 3

    def __init__(self):
        """Create server, analyzer and scratch state, then register tools."""
        self.server = Server("laban-movement-analysis")
        self.analyzer = LabanMovementAnalysis()
        self.analysis_cache = {}
        self.temp_dir = tempfile.mkdtemp(prefix="laban_mcp_")

        self._register_tools()

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _success(text: str) -> "ToolResult":
        """Wrap ``text`` in a successful ``ToolResult``."""
        return ToolResult(success=True, content=[TextContent(text=text)])

    @staticmethod
    def _failure(message: str) -> "ToolResult":
        """Wrap ``message`` in a failed ``ToolResult``."""
        return ToolResult(success=False, error=ToolError(message=message))

    @staticmethod
    def _as_mapping(value: Any) -> Dict[str, Any]:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _as_number(value: Any) -> float:
        """Return ``value`` as a float, or ``0.0`` if it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    async def _resolve_video(self, source: str) -> str:
        """Return a local path for ``source``, downloading it if it is a URL."""
        if source.startswith(self._URL_PREFIXES):
            return await self._download_video(source)
        return source

    # ------------------------------------------------------------------
    # Tool registration
    # ------------------------------------------------------------------

    def _register_tools(self):
        """Register all available tools on ``self.server``.

        Each tool is a coroutine defined here as a closure over ``self`` and
        decorated with ``self.server.tool()``.
        """

        @self.server.tool()
        async def analyze_video(
            video_path: str,
            model: str = "mediapipe",
            enable_visualization: bool = False,
            include_keypoints: bool = False,
        ) -> ToolResult:
            """
            Analyze movement in a video file using Laban Movement Analysis.

            Args:
                video_path: Path or URL to video file
                model: Pose estimation model ('mediapipe', 'movenet', 'yolo')
                enable_visualization: Generate annotated video output
                include_keypoints: Include raw keypoint data in JSON

            Returns:
                Movement analysis results and optional visualization
            """
            try:
                local_path = await self._resolve_video(video_path)

                # process_video is a blocking call; run it in a worker thread
                # so the event loop stays responsive.
                json_output, viz_video = await asyncio.to_thread(
                    self.analyzer.process_video,
                    local_path,
                    model=model,
                    enable_visualization=enable_visualization,
                    include_keypoints=include_keypoints,
                )

                timestamp = datetime.now().isoformat()
                analysis_id = f"{Path(local_path).stem}_{timestamp}"
                self.analysis_cache[analysis_id] = {
                    "json_output": json_output,
                    "viz_video": viz_video,
                    "timestamp": timestamp,
                }

                response_data = {
                    "analysis_id": analysis_id,
                    "analysis": json_output,
                    "visualization_path": viz_video or None,
                }
                return self._success(json.dumps(response_data, indent=2))

            except Exception as e:
                # Tool boundary: report the failure to the caller instead of
                # letting it propagate into the server loop.
                return self._failure(f"Analysis failed: {e}")

        @self.server.tool()
        async def get_analysis_summary(
            analysis_id: str
        ) -> ToolResult:
            """
            Get a human-readable summary of a previous analysis.

            Args:
                analysis_id: ID of the analysis to summarize

            Returns:
                Summary of movement analysis
            """
            try:
                entry = self.analysis_cache.get(analysis_id)
                if entry is None:
                    return self._failure(
                        f"Analysis ID '{analysis_id}' not found"
                    )

                summary = self._generate_summary(entry["json_output"])
                return self._success(summary)

            except Exception as e:
                return self._failure(f"Summary generation failed: {e}")

        @self.server.tool()
        async def list_available_models() -> ToolResult:
            """
            List available pose estimation models with their characteristics.

            Returns:
                Information about available models
            """
            models_info = {
                "mediapipe": {
                    "name": "MediaPipe Pose",
                    "keypoints": 33,
                    "dimensions": "3D",
                    "optimization": "CPU",
                    "best_for": "Single person, detailed analysis",
                    "speed": "Fast",
                },
                "movenet": {
                    "name": "MoveNet",
                    "keypoints": 17,
                    "dimensions": "2D",
                    "optimization": "Mobile/Edge",
                    "best_for": "Real-time applications, mobile devices",
                    "speed": "Very Fast",
                },
                "yolo": {
                    "name": "YOLO Pose",
                    "keypoints": 17,
                    "dimensions": "2D",
                    "optimization": "GPU",
                    "best_for": "Multi-person detection",
                    "speed": "Fast (with GPU)",
                },
            }

            return self._success(json.dumps(models_info, indent=2))

        @self.server.tool()
        async def batch_analyze(
            video_paths: List[str],
            model: str = "mediapipe",
            parallel: bool = True,
        ) -> ToolResult:
            """
            Analyze multiple videos in batch.

            Args:
                video_paths: List of video paths or URLs
                model: Pose estimation model to use
                parallel: Process videos in parallel

            Returns:
                Batch analysis results
            """
            try:
                if parallel:
                    # _analyze_single_video reports its own errors in its
                    # return value, so one failure does not cancel the rest.
                    analyses = await asyncio.gather(
                        *(
                            self._analyze_single_video(path, model)
                            for path in video_paths
                        )
                    )
                else:
                    analyses = [
                        await self._analyze_single_video(path, model)
                        for path in video_paths
                    ]

                # Keyed by the path as supplied; a repeated path keeps the
                # result of its last occurrence.
                results = dict(zip(video_paths, analyses))
                return self._success(json.dumps(results, indent=2))

            except Exception as e:
                return self._failure(f"Batch analysis failed: {e}")

        @self.server.tool()
        async def compare_movements(
            analysis_id1: str,
            analysis_id2: str,
        ) -> ToolResult:
            """
            Compare movement patterns between two analyzed videos.

            Args:
                analysis_id1: First analysis ID
                analysis_id2: Second analysis ID

            Returns:
                Comparison of movement metrics
            """
            try:
                for candidate in (analysis_id1, analysis_id2):
                    if candidate not in self.analysis_cache:
                        return self._failure(
                            f"Analysis ID '{candidate}' not found"
                        )

                comparison = self._compare_analyses(
                    self.analysis_cache[analysis_id1]["json_output"],
                    self.analysis_cache[analysis_id2]["json_output"],
                )
                return self._success(json.dumps(comparison, indent=2))

            except Exception as e:
                return self._failure(f"Comparison failed: {e}")

    # ------------------------------------------------------------------
    # Video acquisition and analysis
    # ------------------------------------------------------------------

    async def _download_video(self, url: str) -> str:
        """Download the video at ``url`` into the scratch directory.

        Every download gets its own sub-directory, so two URLs that share a
        file name (likely in a parallel batch) cannot overwrite each other
        while the file still keeps the name taken from the URL. The body is
        streamed to disk in chunks rather than held in memory, and redirects
        are followed.

        Args:
            url: HTTP(S) URL of the video.

        Returns:
            Path of the downloaded file.

        Raises:
            httpx.HTTPError: If the request fails or the final response has
                an error status.
        """
        filename = Path(urlparse(url).path).name
        if filename in ("", ".", ".."):
            # No usable file name in the URL path.
            filename = self._DEFAULT_FILENAME

        # The scratch directory may have been removed by an earlier run().
        os.makedirs(self.temp_dir, exist_ok=True)
        download_dir = tempfile.mkdtemp(dir=self.temp_dir)
        temp_path = os.path.join(download_dir, filename)

        try:
            async with httpx.AsyncClient(follow_redirects=True) as client:
                async with client.stream("GET", url) as response:
                    response.raise_for_status()
                    async with aiofiles.open(temp_path, "wb") as f:
                        async for chunk in response.aiter_bytes():
                            await f.write(chunk)
        except BaseException:
            # Do not leave a partial download behind (also on cancellation).
            shutil.rmtree(download_dir, ignore_errors=True)
            raise

        return temp_path

    async def _analyze_single_video(
        self, path: str, model: str
    ) -> Dict[str, Any]:
        """Analyze one video without visualization, never raising.

        Args:
            path: Local path or HTTP(S) URL of the video.
            model: Pose estimation model passed to the analyzer.

        Returns:
            ``{"status": "success", "analysis": ...}`` on success, or
            ``{"status": "error", "error": "<message>"}`` if anything fails.
        """
        try:
            local_path = await self._resolve_video(path)
            json_output, _ = await asyncio.to_thread(
                self.analyzer.process_video,
                local_path,
                model=model,
                enable_visualization=False,
            )
        except Exception as e:
            return {"status": "error", "error": str(e)}

        return {"status": "success", "analysis": json_output}

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def _movement_summary(self, analysis: Any) -> Dict[str, Any]:
        """Return ``analysis["movement_analysis"]["summary"]`` or ``{}``."""
        movement = self._as_mapping(analysis).get("movement_analysis")
        return self._as_mapping(self._as_mapping(movement).get("summary"))

    def _generate_summary(self, analysis_data: Dict[str, Any]) -> str:
        """Generate a human-readable summary from analysis data.

        Missing sections fall back to ``0`` / ``"unknown"``; numeric fields
        that are present but not numeric (for example ``None``) are shown as
        ``0.0`` instead of raising.

        Args:
            analysis_data: Analysis JSON with optional ``video_info`` and
                ``movement_analysis.summary`` sections.

        Returns:
            Multi-line text summary.
        """
        video_info = self._as_mapping(
            self._as_mapping(analysis_data).get("video_info")
        )
        movement_summary = self._movement_summary(analysis_data)

        duration = self._as_number(video_info.get("duration_seconds"))
        width = video_info.get("width", 0)
        height = video_info.get("height", 0)
        summary_parts = [
            "Video Analysis Summary",
            f"Duration: {duration:.1f} seconds",
            f"Resolution: {width}x{height}",
            "",
        ]

        labelled_metrics = (
            ("Dominant Movement Direction", "direction"),
            ("Movement Intensity", "intensity"),
            ("Movement Speed", "speed"),
        )
        for label, metric in labelled_metrics:
            metric_data = self._as_mapping(movement_summary.get(metric))
            dominant = metric_data.get("dominant", "unknown")
            summary_parts.append(f"{label}: {dominant}")

        segments = movement_summary.get("movement_segments") or []
        if segments:
            summary_parts.append(f"\nMovement Segments: {len(segments)}")
            shown = segments[:self._MAX_SUMMARY_SEGMENTS]
            for number, segment in enumerate(shown, start=1):
                segment = self._as_mapping(segment)
                start_time = self._as_number(segment.get("start_time"))
                end_time = self._as_number(segment.get("end_time"))
                movement_type = segment.get("movement_type", "unknown")
                summary_parts.append(
                    f" Segment {number}: {movement_type} "
                    f"({start_time:.1f}s - {end_time:.1f}s)"
                )

        return "\n".join(summary_parts)

    def _compare_analyses(
        self, analysis1: Dict, analysis2: Dict
    ) -> Dict[str, Any]:
        """Compare the dominant direction, intensity and speed of two analyses.

        Args:
            analysis1: Analysis JSON of the first video.
            analysis2: Analysis JSON of the second video.

        Returns:
            Dict with ``video1_info``, ``video2_info`` and a
            ``metric_comparison`` entry per metric holding both dominant
            values (``"unknown"`` when absent) and a ``match`` flag. Two
            absent values count as a match.
        """
        summary1 = self._movement_summary(analysis1)
        summary2 = self._movement_summary(analysis2)

        metric_comparison = {}
        for metric in self._COMPARED_METRICS:
            first = self._as_mapping(summary1.get(metric))
            second = self._as_mapping(summary2.get(metric))
            metric_comparison[metric] = {
                "video1_dominant": first.get("dominant", "unknown"),
                "video2_dominant": second.get("dominant", "unknown"),
                # Compared on the raw values, before the "unknown" fallback.
                "match": first.get("dominant") == second.get("dominant"),
            }

        return {
            "video1_info": self._as_mapping(analysis1).get("video_info", {}),
            "video2_info": self._as_mapping(analysis2).get("video_info", {}),
            "metric_comparison": metric_comparison,
        }

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def run(self):
        """Run the MCP server over stdio and clean up the scratch directory.

        The scratch directory holding downloaded videos is removed once the
        server stops, whether it stopped normally or with an error.
        """
        try:
            async with stdio_server() as (read_stream, write_stream):
                await self.server.run(read_stream, write_stream)
        finally:
            shutil.rmtree(self.temp_dir, ignore_errors=True)


async def main():
    """Build a ``LabanMCPServer`` and await its ``run()`` coroutine."""
    await LabanMCPServer().run()


if __name__ == "__main__":
    # Executed as a script: drive the async entry point on a new event loop.
    asyncio.run(main())