File size: 3,133 Bytes
dc69696
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# -*- coding: utf-8 -*-
"""tools.py

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1Srm2oS1q5iYwlo4PoMx6TBc2uNoSVao3
"""

from typing import Dict, Any, List
from langchain.tools import Tool

"""**It's purely deterministic Python, no LLM inside the tool, so the reasoning stays transparent.**"""

def modernize_stack(legacy_stack: Dict[str, Any]) -> List[str]:
    """Return concrete modernization actions based on a given legacy stack."""
    actions: List[str] = []

    scheduler = str(legacy_stack.get("scheduler", "")).lower()
    language = str(legacy_stack.get("language", "")).lower()
    architecture = str(legacy_stack.get("architecture", "")).lower()
    dependencies = [d.lower() for d in legacy_stack.get("dependencies", [])]

    if "autosys" in scheduler:
        actions.append("Replace Autosys with Apache Airflow or AWS Step Functions.")

    if "java 8" in language:
        actions.append("Upgrade Java 8 → Java 17 and containerize the application.")

    if "monolith" in architecture:
        actions.append("Refactor monolith into microservices and add an API Gateway.")

    if any("ftp" in dep for dep in dependencies):
        actions.append("Replace FTP with S3 and event-driven ingestion via Lambda.")

    actions.append("Adopt Terraform for Infrastructure-as-Code.")

    return actions

"""**This tool takes both the legacy_stack and an outage_scenario string to generate resilience test ideas.**"""

def generate_resilience_plan(legacy_stack: Dict[str, Any], outage_scenario: str) -> List[str]:
    """Return a resilience test plan for a given outage scenario."""
    plan: List[str] = []

    if "oracle" in outage_scenario.lower():
        plan.append("Test RDS Multi-AZ failover for Oracle DB.")
        plan.append("Implement circuit breaker around DB calls.")

    if "kafka" in outage_scenario.lower():
        plan.append("Simulate Kafka broker outage and validate consumer recovery.")

    plan.append("Perform chaos engineering tests for network partition.")
    plan.append("Run DR simulation and validate RTO/RPO.")

    return plan

"""**This step is critical, it's how the agent knows these Python functions exist and what they do.**

- Deterministic: No AI hallucinations in the tools, pure logic.

- Composable: We can add more tools later (cost analysis, migration path scorer).

- Agent-Controlled: The agent can decide when to call each tool based on the request.
"""

def get_tools() -> List[Tool]:
    """Return a list of LangChain Tools for the agent."""
    return [
        Tool(
            name="modernize_stack",
            func=lambda x: "\n".join(modernize_stack(x)),
            description="Recommend modernization actions for a given legacy stack."
        ),
        Tool(
            name="generate_resilience_plan",
            func=lambda x: "\n".join(generate_resilience_plan(
                x.get("legacy_stack", {}),
                x.get("outage_scenario", "")
            )),
            description="Generate a resilience test plan for the given legacy stack and outage scenario."
        )
    ]