from tavily import TavilyClient
import subprocess, tempfile, time, os
from pathlib import Path
from typing import Dict, Any, List, Literal, Optional
import shutil, zipfile
from uuid import uuid4
from smolagents import tool, CodeAgent, InferenceClientModel, ToolCallingAgent
# For Data Analysis Agent
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, r2_score, mean_squared_error
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
import joblib
import io
# Initialize Tavily client for web search.
# Never hard-code API keys in source; read them from the environment instead.
tavily_client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])
# HF_TOKEN must also be set in the environment (e.g., as a Space secret) before running.
# ----------------- Tools -----------------
@tool
def internet_search(
    query: str,
    max_results: int = 5,
    topic: Literal["general", "news", "finance", "science", "technology", "economy"] = "general",
    include_raw_content: bool = False,
) -> Dict[str, Any]:
    """
    Tool to perform an internet search using the Tavily API.

    This tool allows the agent to gather information from the web
    based on a query and a specified topic. It returns the Tavily
    response, whose "results" key holds the list of search hits,
    optionally including the raw content of the webpages.

    Args:
        query (str): The search query or keywords to look up on the web.
        max_results (int, optional): Maximum number of search results to return.
            Defaults to 5.
        topic (Literal["general", "news", "finance", "science", "technology", "economy"], optional):
            Category of the search to prioritize relevant content. Defaults to "general".
        include_raw_content (bool, optional): If True, include the full raw content of the results;
            otherwise, only metadata is returned. Defaults to False.

    Returns:
        Dict[str, Any]: The Tavily response; its "results" key contains a list of hits,
            each with a title, URL, snippet, and optionally raw content.
    """
    response = tavily_client.search(
        query,
        max_results=max_results,
        include_raw_content=include_raw_content,
        topic=topic,
    )
    return response
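
# Hedged usage sketch (not part of the agent wiring): how internet_search can be
# called directly for a smoke test. The query string is an illustrative placeholder
# and TAVILY_API_KEY must be set; adjust both to your setup.
def _demo_internet_search() -> None:
    response = internet_search("agentic AI frameworks", max_results=3, topic="technology")
    for hit in response.get("results", []):
        print(hit.get("title"), "->", hit.get("url"))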
@tool
def code_executor(
    image: str,
    cmds: List[str],
    mounts: Optional[Dict[str, str]] = None,
    host_workspace: Optional[str] = None,
    container_workdir: str = "/workspace",
    timeout: int = 60,
    allow_network: bool = False,
) -> Dict[str, Any]:
    """
    Executes a sequence of shell commands inside a Docker container.

    This tool allows safe and isolated execution of code or scripts
    using a specified Docker image. It supports mounting host directories,
    custom working directories, timeout handling, and optional network access.

    Args:
        image (str): The Docker image to use for execution (e.g., "python:3.11-slim").
        cmds (List[str]): A list of shell commands to run inside the container.
        mounts (Dict[str, str], optional): Dictionary mapping host paths to container paths
            for volume mounting. Defaults to None.
        host_workspace (str, optional): Path on the host machine to use as workspace.
            If None, a temporary directory is created. Defaults to None.
        container_workdir (str, optional): Working directory inside the container. Defaults to "/workspace".
        timeout (int, optional): Maximum execution time in seconds before terminating the process. Defaults to 60.
        allow_network (bool, optional): Whether to allow network access inside the container.
            Defaults to False (safe default).

    Returns:
        Dict[str, Any]: A dictionary containing execution results:
            - stdout (str): Standard output from the container.
            - stderr (str): Standard error output.
            - exit_code (int): Exit code of the executed commands.
            - runtime_s (float): Execution time in seconds.
            - files (List[str]): List of files created in the host workspace (relative paths).
            - host_workspace (str): Path to the host workspace used for execution.

    Notes:
        - Ensures that the host workspace is always mounted into the container.
        - Normalizes Windows paths for Docker volume mounting.
        - Safely handles subprocess timeouts and captures output.
    """
    if host_workspace is None:
        host_workspace = tempfile.mkdtemp(prefix="mini_manus_ws_")
    # Ensure mounts include host_workspace -> container_workdir
    mounts = dict(mounts or {})
    if host_workspace not in mounts:
        mounts[host_workspace] = container_workdir
    docker_cmd = ["docker", "run", "--rm", "--memory", "512m", "--cpus", "1"]
    if not allow_network:
        docker_cmd += ["--network", "none"]

    # Normalize Windows backslashes -> forward slashes for docker -v on some setups
    def _norm(p: str) -> str:
        return p.replace("\\", "/")

    for host, cont in mounts.items():
        docker_cmd += ["-v", f"{_norm(host)}:{cont}"]
    docker_cmd += ["-w", container_workdir, image]
    joined = " && ".join(cmds) if cmds else "echo 'No commands provided'"
    docker_cmd += ["sh", "-lc", joined]
    start = time.time()
    try:
        proc = subprocess.run(docker_cmd, capture_output=True, text=True, timeout=timeout)
        runtime = time.time() - start
        # Gather files from the host workspace (NOT the container path)
        files = []
        try:
            for p in Path(host_workspace).rglob("*"):
                if p.is_file():
                    files.append(str(p.relative_to(host_workspace)))
        except Exception:
            files = []
        return {
            "stdout": proc.stdout,
            "stderr": proc.stderr,
            "exit_code": proc.returncode,
            "runtime_s": round(runtime, 3),
            "files": files,
            "host_workspace": host_workspace,
        }
    except subprocess.TimeoutExpired as te:
        return {
            "stdout": te.stdout or "",
            "stderr": (te.stderr or "") + f"\n[Timed out after {timeout}s]",
            "exit_code": -1,
            "runtime_s": round(time.time() - start, 3),
            "files": [],
            "host_workspace": host_workspace,
        }
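
# Hedged usage sketch: running a trivial command through code_executor. This assumes
# a local Docker daemon and that the "python:3.11-slim" image is available (it is
# pulled otherwise); a smoke test, not part of the agent pipeline.
def _demo_code_executor() -> None:
    res = code_executor(
        image="python:3.11-slim",
        cmds=["python -c \"print('hello from the sandbox')\""],
    )
    print(res["exit_code"], res["stdout"].strip())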
@tool
def save_files(manifest_files: List[Dict[str, str]], workspace: Optional[str] = None) -> str:
    """
    Saves a list of files to a host workspace directory.

    This tool creates the specified files with their content on the host system.
    Each file is defined by a dictionary containing a relative path and content.
    If no workspace path is provided, a temporary directory is created automatically.

    Args:
        manifest_files (List[Dict[str, str]]): A list of file descriptors,
            where each descriptor is a dictionary with:
            - "path" (str): Relative file path (e.g., "app.py" or "src/module.py").
            - "content" (str): The content to write into the file.
        workspace (str, optional): Path to the host directory where files should be saved.
            If None, a temporary directory is created. Defaults to None.

    Returns:
        str: The path to the host workspace directory where the files were saved.

    Notes:
        - Automatically creates parent directories if they do not exist.
        - Overwrites files if they already exist at the same path.
        - Useful for preparing workspaces for code execution in sandboxed environments.
    """
    if workspace is None:
        workspace = tempfile.mkdtemp(prefix="mini_manus_ws_")
    ws = Path(workspace)
    ws.mkdir(parents=True, exist_ok=True)
    for f in manifest_files:
        p = ws / f["path"]
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(f["content"], encoding="utf-8")
    return str(ws)
# 2) List files in a workspace (relative)
@tool
def list_workspace_files(workspace: str) -> List[str]:
    """
    Recursively list all files in a given workspace directory.

    This tool traverses the workspace directory and collects all file paths,
    returning them relative to the workspace root. It is useful for inspecting
    the contents of a workspace, packaging artifacts, or tracking generated files.

    Args:
        workspace (str): Path to the workspace directory to list.

    Returns:
        List[str]: A list of file paths relative to the workspace root.

    Notes:
        - Only files are included; directories themselves are ignored.
        - If the workspace path is invalid or an error occurs during traversal,
          an empty list is returned.
        - Paths are returned as strings using the host OS path separator.
    """
    files = []
    try:
        for p in Path(workspace).rglob("*"):
            if p.is_file():
                files.append(str(p.relative_to(workspace)))
    except Exception:
        pass
    return files
# 3) Package artifact (zip) and return path
@tool
def package_artifact(workspace: str, out_dir: Optional[str] = None) -> str:
    """
    Package the contents of a workspace directory into a ZIP archive.

    This tool collects all files within a given workspace and compresses
    them into a single ZIP file, which can be used as an artifact for
    deployment, sharing, or backup purposes.

    Args:
        workspace (str): Path to the workspace directory to package.
        out_dir (str, optional): Directory to save the generated ZIP file.
            If None, a temporary directory will be created.

    Returns:
        str: Absolute file path of the created ZIP archive.

    Notes:
        - Only files are included in the ZIP archive; directories themselves
          are not stored.
        - The ZIP filename is automatically generated using a UUID to ensure
          uniqueness.
        - If `out_dir` does not exist, it will be created.
        - Useful for packaging code, data, or other artifacts generated
          during automated workflows.
    """
    if out_dir is None:
        out_dir = tempfile.mkdtemp(prefix="mini_manus_artifacts_")
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    zip_name = Path(out_dir) / f"artifact_{uuid4().hex}.zip"
    with zipfile.ZipFile(zip_name, "w", zipfile.ZIP_DEFLATED) as z:
        for p in Path(workspace).rglob("*"):
            if p.is_file():
                z.write(p, p.relative_to(workspace))
    return str(zip_name)
# 4) Cleanup workspace
@tool
def cleanup_workspace(workspace: str, keep: bool = False) -> None:
    """
    Safely removes a workspace directory and all its contents.

    This tool is used to clean up temporary directories created during
    code execution, testing, or file manipulation. It ensures that the
    workspace is deleted unless explicitly preserved.

    Args:
        workspace (str): Path to the workspace directory to delete.
        keep (bool, optional): If True, the workspace will not be deleted.
            Defaults to False.

    Returns:
        None

    Notes:
        - Any errors during deletion (e.g., non-existent directory, permission issues)
          are silently ignored.
        - Use `keep=True` to preserve the workspace, for example, when artifacts
          need to be inspected after execution.
        - Intended for host-side cleanup of temporary directories used in containerized
          or local code execution workflows.
    """
    if keep:
        return
    try:
        shutil.rmtree(workspace)
    except Exception:
        pass
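
# Hedged usage sketch: the host-side workspace lifecycle end to end — save files,
# list them, zip them into an artifact, then clean up. File names and contents are
# illustrative placeholders.
def _demo_workspace_lifecycle() -> None:
    ws = save_files([{"path": "hello.py", "content": "print('hi')\n"}])
    print("files:", list_workspace_files(ws))
    print("artifact:", package_artifact(ws))
    cleanup_workspace(ws)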
# 5) Run a manifest end-to-end using code_executor (Docker image + run_commands)
@tool
def run_manifest(manifest: Dict[str, Any], base_image: str = "python:3.11-slim", timeout: int = 120, keep_workspace: bool = False) -> Dict[str, Any]:
    """
    Executes a manifest of files and commands inside a Docker container and optionally packages the workspace.

    This tool automates the process of:
    1. Saving provided files to a host workspace.
    2. Installing dependencies (if a `requirements.txt` is present or if `install_libs` is specified).
    3. Running commands and optional test commands inside a Docker container.
       - Commands referencing workspace files are automatically adjusted to point to the container workspace.
    4. Collecting outputs, listing files, and optionally packaging the workspace into a ZIP artifact.
    5. Cleaning up the workspace unless `keep_workspace=True`.

    Args:
        manifest (Dict[str, Any]): A dictionary describing the manifest, with the following keys:
            - "files" (List[Dict[str, str]]): List of files to save, each with "path" and "content".
            - "run_commands" (List[str], optional): Commands to execute inside the container.
            - "test_command" (str, optional): A command for testing/verifying the execution.
            - "install_libs" (List[str], optional): A list of Python packages to install dynamically
              (e.g., ["crewai", "transformers"]). Installed before any run/test commands.
        base_image (str, optional): Docker image to use for execution. Defaults to "python:3.11-slim".
        timeout (int, optional): Maximum time in seconds for container execution. Defaults to 120.
        keep_workspace (bool, optional): If True, preserves the host workspace after execution. Defaults to False.

    Returns:
        Dict[str, Any]: A dictionary containing execution results and metadata:
            - "stdout" (str): Standard output from the execution.
            - "stderr" (str): Standard error from the execution.
            - "exit_code" (int): Exit code of the executed commands.
            - "runtime_s" (float): Total runtime in seconds.
            - "files" (List[str]): List of files present in the workspace after execution.
            - "artifact" (str or None): Path to a ZIP file of the workspace, if packaging succeeded.
            - "workspace" (str): Path to the host workspace.

    Notes:
        - If `requirements.txt` exists, dependencies are installed automatically inside the container.
        - If `install_libs` is provided, those packages are installed dynamically via pip.
        - Network access is enabled only while a dependency-install step is present.
        - Commands are executed sequentially inside the container.
        - Workspace cleanup is automatic unless `keep_workspace=True`.
        - Useful for safely running and testing code in isolated, reproducible environments.
    """
    files = manifest.get("files", [])
    run_cmds = manifest.get("run_commands", [])
    test_cmd = manifest.get("test_command")
    install_libs = manifest.get("install_libs", [])
    host_workspace = save_files(files)  # returns a host path
    # Map host workspace -> container path
    mounts = {host_workspace: "/workspace"}
    # Pre-install step for dynamic packages and/or requirements.txt
    install_cmds = []
    if install_libs:
        # Install arbitrary packages inside the container
        libs = " ".join(install_libs)
        install_cmds.append(f"pip install {libs}")
    if (Path(host_workspace) / "requirements.txt").exists():
        install_cmds.append("pip install -r requirements.txt")

    def fix_file_paths(cmds: List[str]) -> List[str]:
        """Rewrite `python <script>` invocations to use absolute container paths."""
        fixed = []
        for c in cmds:
            parts = c.split()
            if parts and parts[0] == "python" and len(parts) > 1:
                parts[1] = f"/workspace/{parts[1]}"
            fixed.append(" ".join(parts))
        return fixed

    run_cmds = fix_file_paths(run_cmds)
    if test_cmd:
        test_cmd = fix_file_paths([test_cmd])[0]
    # Build the full command sequence (installs first if present)
    cmds = install_cmds + [f"cd /workspace && {c}" for c in run_cmds]
    if test_cmd:
        cmds.append(f"cd /workspace && {test_cmd}")
    if not cmds:
        cmds = ["cd /workspace && echo 'No commands provided'"]
    # If we're installing dependencies, allow network access briefly
    allow_network = bool(install_cmds)
    exec_res = code_executor(
        image=base_image,
        cmds=cmds,
        mounts=mounts,
        host_workspace=host_workspace,
        container_workdir="/workspace",
        timeout=timeout,
        allow_network=allow_network,
    )
    # Gather the host-side file list (relative paths)
    files_list = list_workspace_files(host_workspace)
    # Package an artifact (optional; failures are non-fatal)
    artifact = None
    try:
        artifact = package_artifact(host_workspace)
    except Exception:
        artifact = None
    result = {
        "stdout": exec_res.get("stdout", ""),
        "stderr": exec_res.get("stderr", ""),
        "exit_code": exec_res.get("exit_code", 1),
        "runtime_s": exec_res.get("runtime_s", None),
        "files": files_list,
        "artifact": artifact,
        "workspace": host_workspace,
    }
    # Decide whether to clean up the workspace
    cleanup_workspace(host_workspace, keep=keep_workspace)
    return result
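
# Hedged usage sketch: a minimal manifest run end to end. Assumes a local Docker
# daemon; the file content and commands are illustrative placeholders.
def _demo_run_manifest() -> None:
    manifest = {
        "files": [{"path": "main.py", "content": "print('manifest works')\n"}],
        "run_commands": ["python main.py"],
    }
    res = run_manifest(manifest, keep_workspace=False)
    print(res["exit_code"], res["stdout"].strip())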
def detect_target_column(df: pd.DataFrame) -> Optional[str]:
    """
    Heuristically detect the most likely target column based on naming, cardinality, and type.
    """
    if df.empty or len(df.columns) < 2:
        return None
    scores = {}
    for col in df.columns:
        score = 0.0
        name_lower = col.lower()
        # Rule 1: Name matches common target keywords
        keywords = ["target", "label", "class", "outcome", "result", "y", "output", "flag", "status", "churn", "survived", "price", "sale"]
        if any(kw in name_lower for kw in keywords):
            score += 3.0
        if name_lower in ["target", "label", "class", "y"]:
            score += 2.0
        # Rule 2: Binary or low-cardinality categorical → likely classification
        nunique = df[col].nunique()
        total = len(df)
        unique_ratio = nunique / total
        if nunique == 2 and df[col].dtype in ["int64", "object", "category"]:
            score += 4.0  # Strong signal
        elif nunique <= 20 and df[col].dtype in ["int64", "object", "category"]:
            score += 3.0
        # Rule 3: High unique ratio + numeric → likely regression target
        if unique_ratio > 0.8 and df[col].dtype in ["int64", "float64"]:
            score += 2.5
        # Rule 4: Avoid ID-like or high-cardinality text columns
        id_keywords = ["id", "name", "email", "phone", "address", "username", "url", "link"]
        if any(kw in name_lower for kw in id_keywords):
            score -= 10.0
        if nunique == total and df[col].dtype == "object":
            score -= 10.0  # Likely a unique identifier
        scores[col] = score
    # Return the best candidate only if its score is positive
    best_col = max(scores, key=scores.get)
    return best_col if scores[best_col] > 0 else None
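
# Hedged usage sketch: the heuristic on a tiny synthetic frame. "churn" should win
# on both its keyword name and its binary cardinality, while "customer_id" is
# penalized as an identifier.
def _demo_detect_target_column() -> None:
    df = pd.DataFrame({
        "customer_id": ["a", "b", "c", "d"],
        "age": [22, 35, 47, 51],
        "churn": [0, 1, 0, 1],
    })
    print(detect_target_column(df))  # expected: "churn"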
# ————————————————————————————————
# 🛠️ Tool 1: LoadData
# ————————————————————————————————
@tool
def LoadData(filepath: str) -> dict:
    """
    Loads data from a CSV file and returns it as a dictionary.

    Args:
        filepath (str): Path to the CSV file.

    Returns:
        dict: Data as a dictionary (from DataFrame.to_dict()).
    """
    df = pd.read_csv(filepath)
    return df.to_dict()
# ————————————————————————————————
# 🛠️ Tool 2: CleanData (Enhanced)
# ————————————————————————————————
@tool
def CleanData(data: dict, handle_outliers: bool = True, impute_strategy: str = "median_mode") -> dict:
    """
    Cleans a dataset with smart imputation, encoding, and optional outlier removal.

    Args:
        data (dict): Dataset in dictionary format.
        handle_outliers (bool): Whether to remove outliers using the IQR rule.
        impute_strategy (str): "median_mode" or "mean_mode".

    Returns:
        dict: Cleaned dataset in dictionary format (from DataFrame.to_dict()),
            ready to be passed on to EDA or AutoML.
    """
    df = pd.DataFrame.from_dict(data)
    # Drop duplicates
    df = df.drop_duplicates().reset_index(drop=True)
    # Handle missing values
    for col in df.columns:
        if df[col].dtype in ["int64", "float64"]:
            if impute_strategy == "median_mode" or df[col].skew() > 1:
                fill_val = df[col].median()
            else:
                fill_val = df[col].mean()
            df[col] = df[col].fillna(fill_val)
        else:
            mode = df[col].mode()
            fill_val = mode[0] if len(mode) > 0 else "Unknown"
            df[col] = df[col].fillna(fill_val)
    # Parse datetime columns
    for col in df.columns:
        if "date" in col.lower() or "time" in col.lower():
            try:
                df[col] = pd.to_datetime(df[col], errors="coerce")
            except Exception:
                pass
    # Encode categorical variables (only if not too many unique values)
    for col in df.select_dtypes(include="object").columns:
        if df[col].nunique() / len(df) < 0.5:
            df[col] = df[col].astype("category").cat.codes
        # else: leave as object (e.g., free text)
    # Outlier removal (optional)
    if handle_outliers:
        for col in df.select_dtypes(include=["float64", "int64"]).columns:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
            df = df[(df[col] >= lower) & (df[col] <= upper)]
            if len(df) == 0:
                # Avoid an empty DataFrame: revert to the original data
                df = pd.DataFrame.from_dict(data)
                break
    return df.reset_index(drop=True).to_dict()
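
# Hedged usage sketch: cleaning a tiny synthetic frame with a missing value and a
# duplicate row, then rebuilding a DataFrame from the returned dict.
def _demo_clean_data() -> None:
    raw = pd.DataFrame({
        "age": [22.0, None, 22.0, 35.0],
        "city": ["Paris", "Lyon", "Paris", None],
    }).to_dict()
    cleaned = CleanData(raw, handle_outliers=False)
    print(pd.DataFrame.from_dict(cleaned))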
# ————————————————————————————————
# 📊 Tool 3: EDA (Enhanced)
# ————————————————————————————————
@tool
def EDA(data: dict, max_cat_plots: int = 3, max_num_plots: int = 3) -> dict:
    """
    Performs advanced EDA with smart visualizations and insights.

    Args:
        data (dict): Dataset in dictionary format.
        max_cat_plots (int): Max number of categorical distribution plots.
        max_num_plots (int): Max number of numeric-vs-target plots.

    Returns:
        dict: EDA results including text summaries, PIL image plots, and recommendations.
    """
    df = pd.DataFrame.from_dict(data)
    results = {}
    # 1. Summary stats
    results["summary"] = df.describe(include="all").to_string()
    # 2. Missing values
    missing = df.isnull().sum()
    results["missing_values"] = missing[missing > 0].to_dict()
    # Missingness heatmap
    if missing.sum() > 0:
        plt.figure(figsize=(8, 4))
        sns.heatmap(df.isnull(), cbar=True, cmap="viridis", yticklabels=False)
        buf = io.BytesIO()
        plt.savefig(buf, format="png", bbox_inches="tight")
        plt.close()
        buf.seek(0)
        results["missingness_plot"] = Image.open(buf)
    # 3. Correlation heatmap
    corr = df.corr(numeric_only=True)
    if not corr.empty and len(corr.columns) > 1:
        plt.figure(figsize=(8, 6))
        sns.heatmap(corr, annot=True, fmt=".2f", cmap="coolwarm", square=True)
        buf = io.BytesIO()
        plt.savefig(buf, format="png", bbox_inches="tight")
        plt.close()
        buf.seek(0)
        results["correlation_plot"] = Image.open(buf)
        # Top 5 absolute correlations (excluding self-correlations)
        unstacked = corr.abs().unstack()
        unstacked = unstacked[unstacked < 1.0]
        top_corr = unstacked.sort_values(ascending=False).head(5).to_dict()
        results["top_correlations"] = top_corr
    # 4. Skewness & kurtosis
    numeric_cols = df.select_dtypes(include=["float64", "int64"]).columns
    skew_kurt = {}
    for col in numeric_cols:
        skew_kurt[col] = {"skew": df[col].skew(), "kurtosis": df[col].kurtosis()}
    results["skew_kurtosis"] = skew_kurt
    # 5. Numeric distributions
    if len(numeric_cols) > 0:
        df[numeric_cols].hist(bins=20, figsize=(12, 8), layout=(2, -1))
        buf = io.BytesIO()
        plt.savefig(buf, format="png", bbox_inches="tight")
        plt.close()
        buf.seek(0)
        results["numeric_distributions"] = Image.open(buf)
    # 6. Categorical distributions
    cat_cols = df.select_dtypes(include=["object", "category"]).columns
    for col in cat_cols[:max_cat_plots]:
        plt.figure(figsize=(6, 4))
        top_vals = df[col].value_counts().head(10)
        sns.barplot(x=top_vals.index, y=top_vals.values)
        plt.xticks(rotation=45)
        buf = io.BytesIO()
        plt.savefig(buf, format="png", bbox_inches="tight")
        plt.close()
        buf.seek(0)
        results[f"dist_{col}"] = Image.open(buf)
    # 7. Target relationships
    target_col = detect_target_column(df)
    if target_col:
        results["detected_target"] = target_col
        for col in numeric_cols[:max_num_plots]:
            plt.figure(figsize=(6, 4))
            if df[target_col].nunique() <= 20:
                sns.boxplot(data=df, x=target_col, y=col)
            else:
                sns.scatterplot(data=df, x=col, y=target_col)
            buf = io.BytesIO()
            plt.savefig(buf, format="png", bbox_inches="tight")
            plt.close()
            buf.seek(0)
            results[f"{col}_vs_{target_col}"] = Image.open(buf)
    # 8. Recommendations
    recs = []
    for col, sk in skew_kurt.items():
        if abs(sk["skew"]) > 1:
            recs.append(f"Feature '{col}' is skewed ({sk['skew']:.2f}) → consider a log transform.")
    if results["missing_values"]:
        recs.append("Missing data detected → consider KNN or iterative imputation.")
    if results.get("top_correlations"):
        recs.append("High correlations found → consider PCA or feature selection.")
    if target_col:
        recs.append(f"Target variable '{target_col}' detected automatically.")
    results["recommendations"] = recs
    return results
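
# Hedged usage sketch: EDA on a toy dataset. The returned dict mixes text summaries
# with PIL images; here we only print the text-valued keys.
def _demo_eda() -> None:
    data = pd.DataFrame({
        "age": [22, 35, 47, 51, 29, 41],
        "income": [30_000, 52_000, 61_000, 58_000, 39_000, 55_000],
        "churn": [0, 1, 0, 1, 0, 1],
    }).to_dict()
    report = EDA(data)
    for key, value in report.items():
        if not isinstance(value, Image.Image):
            print(key, "->", value)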
# ————————————————————————————————
# 🤖 Tool 4: AutoML (Enhanced)
# ————————————————————————————————
@tool
def AutoML(data: dict, task_hint: Optional[str] = None) -> dict:
    """
    Enhanced AutoML with multiple models and robust evaluation.

    Args:
        data (dict): Cleaned dataset.
        task_hint (str, optional): "classification", "regression", or None to auto-detect.

    Returns:
        dict: Model results and metrics.
    """
    df = pd.DataFrame.from_dict(data)
    results = {}
    target_col = detect_target_column(df)
    if not target_col:
        results["note"] = "No target column detected. Check column names and data."
        return results
    X = df.drop(columns=[target_col])
    y = df[target_col]
    # One-hot encode X
    X = pd.get_dummies(X, drop_first=True)
    if X.shape[1] == 0:
        results["error"] = "No valid features after encoding."
        return results
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Detect task
    if task_hint:
        task = task_hint
    elif y.dtype in ["object", "category"] or y.nunique() <= 20:
        task = "classification"
    else:
        task = "regression"
    best_model = None
    try:
        if task == "classification":
            models = {
                "RandomForest": RandomForestClassifier(n_estimators=100, random_state=42),
                "LogisticRegression": LogisticRegression(max_iter=1000, random_state=42),
            }
            results["task"] = "classification"
            best_acc = 0.0
            for name, model in models.items():
                model.fit(X_train, y_train)
                preds = model.predict(X_test)
                acc = accuracy_score(y_test, preds)
                if acc > best_acc:
                    best_acc = acc
                    best_model = model  # Keep the best model for saving
                    results["accuracy"] = acc
                    results["best_model"] = name
                    results["report"] = classification_report(y_test, preds, zero_division=0)
                    if hasattr(model, "feature_importances_"):
                        results["feature_importance"] = dict(zip(X.columns, model.feature_importances_))
        else:
            models = {
                "RandomForest": RandomForestRegressor(n_estimators=100, random_state=42),
                "LinearRegression": LinearRegression(),
            }
            results["task"] = "regression"
            best_r2 = -float("inf")
            for name, model in models.items():
                model.fit(X_train, y_train)
                preds = model.predict(X_test)
                r2 = r2_score(y_test, preds)
                if r2 > best_r2:
                    best_r2 = r2
                    best_model = model  # Keep the best model for saving
                    results["r2_score"] = r2
                    results["mse"] = mean_squared_error(y_test, preds)
                    results["best_model"] = name
                    if hasattr(model, "feature_importances_"):
                        results["feature_importance"] = dict(zip(X.columns, model.feature_importances_))
        # ✅ Save the best model to a temporary file
        if best_model is not None:
            model_dir = tempfile.mkdtemp()
            model_path = os.path.join(model_dir, f"trained_model_{task}.pkl")
            joblib.dump({
                "model": best_model,
                "task": task,
                "target_column": target_col,
                "features": X.columns.tolist(),
            }, model_path)
            results["model_download_path"] = model_path
            results["model_info"] = f"Best model: {results['best_model']} | Task: {task} | Target: {target_col}"
    except Exception as e:
        results["error"] = f"Model training failed: {str(e)}"
    return results
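
# Hedged usage sketch: AutoML on the same style of toy data. With so few rows the
# metrics are meaningless; this only exercises the control flow and model saving.
def _demo_automl() -> None:
    data = pd.DataFrame({
        "age": [22, 35, 47, 51, 29, 41, 38, 26, 44, 33],
        "income": [30, 52, 61, 58, 39, 55, 48, 31, 60, 45],
        "churn": [0, 1, 1, 1, 0, 1, 0, 0, 1, 0],
    }).to_dict()
    results = AutoML(data, task_hint="classification")
    print(results.get("best_model"), results.get("accuracy"))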
model = InferenceClientModel(
    model_id="Qwen/Qwen2.5-Coder-32B-Instruct",
    token=os.environ["HF_TOKEN"],
    provider="together",
    max_tokens=8048,
)
# Planner agent
planner = ToolCallingAgent(
    tools=[],
    model=model,
    name="PlannerAgent",
    max_steps=10,
    planning_interval=5,
    description="Breaks down complex tasks and orchestrates tools for execution",
)
# Research agent
researcher = ToolCallingAgent(
    tools=[internet_search],
    model=model,
    name="ResearchAgent",
    max_steps=10,
    description="Conducts deep research using internet_search",
)
# Coding agent
coder = CodeAgent(
    tools=[
        code_executor,
        save_files,
        list_workspace_files,
        package_artifact,
        cleanup_workspace,
        run_manifest,
    ],
    model=model,
    name="CodingAgent",
    max_steps=20,
    additional_authorized_imports=[
        "subprocess", "tempfile", "time", "os", "pathlib", "typing", "shutil", "zipfile", "uuid"
    ],
    description="Executes Python code safely in a sandboxed Docker container. "
    "If a library is missing, add it to install_libs in run_manifest.",
)
# Data science agent
analyst = CodeAgent(
    tools=[LoadData, CleanData, EDA, AutoML],
    model=model,
    max_steps=20,
    name="DataScienceAgent",
    additional_authorized_imports=[
        "pandas", "matplotlib.pyplot", "seaborn", "PIL", "sklearn", "io", "os", "joblib", "tempfile"
    ],
    description="Loads datasets, cleans and preprocesses data, performs exploratory data analysis (EDA) with visualizations, and builds predictive models when a target variable is specified.",
)
# Manager agent that routes work to the managed agents
manager_agent = ToolCallingAgent(
    tools=[],
    model=model,
    managed_agents=[planner, researcher, coder, analyst],
    max_steps=20,
    description="Routes user queries to the right agent (Planner, Researcher, Coder or Data Scientist) and assembles results",
)
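
# Hedged usage sketch: driving the whole hierarchy through the manager. The task
# string is an illustrative placeholder; manager_agent.run() is the standard
# smolagents entry point and requires HF_TOKEN (and TAVILY_API_KEY) to be set.
if __name__ == "__main__":
    answer = manager_agent.run(
        "Summarize current best practices for sandboxed code execution."
    )
    print(answer)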