|
|
|
"""main.py |
|
|
|
Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1dfhQA9-peYN1-9q7ueAUF974jYXx3bKQ
|
""" |
|
|
|
from typing import Dict, Any, Optional, List |
|
from fastapi import FastAPI |
|
from pydantic import BaseModel, Field |
|
from tools import get_tools |
|
from agent import ( |
|
create_agent, |
|
run_infra_resilience_agent, |
|
ReasoningTrace |
|
) |
|
|
|
# FastAPI application object for this service. The keyword arguments are the
# API metadata (title, version, description) supplied at construction time.
app = FastAPI(
    description="Modernization + Resilience planning agent (LangChain + MCP + Tools).",
    version="0.1.0",
    title="InfraResilience Agent API",
)
|
|
|
|
|
class AnalyzeStackRequest(BaseModel):
    """Request body schema for the /analyze-stack endpoint.

    Attributes:
        legacy_stack: Required mapping with string keys and arbitrary values
            describing the legacy stack.
        outage_scenario: Optional free-text failure scenario; defaults to
            ``None`` when the caller omits it.
    """

    # Ellipsis as the default marks the field as required.
    legacy_stack: Dict[str, Any] = Field(
        default=...,
        description="Legacy stack as a dict (scheduler, language, architecture, dependencies, etc.)",
    )

    # Left as None when no scenario is supplied.
    outage_scenario: Optional[str] = Field(
        default=None,
        description="Optional failure scenario to plan resilience tests (e.g., 'Oracle DB down 30 min').",
    )
|
|
|
class AnalyzeStackResponse(BaseModel):
    """Response body schema for the /analyze-stack endpoint.

    All four fields are required; none declares a default.
    """

    # NOTE(review): field meanings below are inferred from the names only;
    # confirm against whatever run_infra_resilience_agent actually returns.

    # Presumably the proposed modernization steps, one string per item.
    modernization_plan: List[str]

    # Presumably the resilience recommendations, one string per item.
    resilience_strategy: List[str]

    # Single string summarizing the reasoning.
    reasoning_summary: str

    # List of reasoning-trace entries as strings.
    reasoning_trace: List[str]
|
|
|
|
|
# Both objects are created once, when this module is imported. AGENT is the
# executor that the /analyze-stack handler passes to the agent runner.
TOOLS = get_tools()
AGENT = create_agent(TOOLS)
|
|
|
|
|
@app.post("/analyze-stack", response_model=AnalyzeStackResponse)
def analyze_stack(payload: AnalyzeStackRequest) -> AnalyzeStackResponse:
    """Run the module-level agent on the submitted stack and return its output.

    Args:
        payload: Request body carrying ``legacy_stack`` and the optional
            ``outage_scenario``.

    Returns:
        An ``AnalyzeStackResponse`` built by unpacking the agent runner's
        result as keyword arguments.
    """
    # A new trace is created for each call and seeded with one entry before
    # it is handed to the agent runner.
    reasoning_log = ReasoningTrace()
    reasoning_log.add("API request accepted. Dispatching to agent.")

    agent_inputs = {
        "agent_executor": AGENT,
        "legacy_stack": payload.legacy_stack,
        "outage_scenario": payload.outage_scenario,
        "trace": reasoning_log,
    }
    agent_output = run_infra_resilience_agent(**agent_inputs)

    # NOTE(review): assumes the runner returns a mapping whose keys match the
    # AnalyzeStackResponse fields — confirm against agent.py.
    return AnalyzeStackResponse(**agent_output)
|
|
|
|
|
|
|
@app.get("/healthz")
def health() -> Dict[str, str]:
    """Liveness probe endpoint (K8s/ECS-friendly); returns a fixed status body."""
    liveness_body: Dict[str, str] = {"status": "ok"}
    return liveness_body