# Multi-Agent / MiniManus.py
from tavily import TavilyClient
import subprocess, tempfile, time, os
from pathlib import Path
from typing import Dict, Any, List, Literal, Optional
import shutil, zipfile
from uuid import uuid4
from smolagents import tool, CodeAgent, InferenceClientModel, ToolCallingAgent
# For Data Analysis Agent
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, r2_score, mean_squared_error
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
import joblib
import io
# Initialize Tavily client for web search.
# Credentials are read from the environment; never hard-code API keys in source.
# TAVILY_API_KEY and HF_TOKEN must be set before running this module.
tavily_client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])
# ----------------- Tools -----------------
@tool
def internet_search(
    query: str,
    max_results: int = 5,
    topic: Literal["general", "news", "finance", "science", "technology", "economy"] = "general",
    include_raw_content: bool = False,
) -> Dict[str, Any]:
"""
Tool to perform an internet search using the Tavily API.
This tool allows the agent to gather information from the web
based on a query and a specified topic. It returns a list of
search results, optionally including the raw content of the
webpages.
Args:
query (str): The search query or keywords to look up on the web.
max_results (int, optional): Maximum number of search results to return.
Defaults to 5.
topic (Literal["general", "news", "finance", "science", "technology", "economy"], optional):
Category of the search to prioritize relevant content. Defaults to "general".
include_raw_content (bool, optional): If True, include the full raw content of the results;
otherwise, only metadata is returned. Defaults to False.
    Returns:
        Dict[str, Any]: The Tavily response; its "results" list contains items with fields
            such as title, URL, content snippet, and optionally raw content.
"""
result1 = tavily_client.search(
query,
max_results=max_results,
include_raw_content=include_raw_content,
topic=topic,
)
return result1
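# Minimal usage sketch for internet_search. The query string and the MINI_MANUS_DEMO_SEARCH
# gate are illustrative assumptions; the gate keeps the live API call opt-in so importing
# this module never triggers a web search. Requires TAVILY_API_KEY to be set.
if __name__ == "__main__" and os.environ.get("MINI_MANUS_DEMO_SEARCH") == "1":
    demo_search = internet_search("open-source agent frameworks", max_results=3, topic="news")
    print(demo_search)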
@tool
def code_executor(
image: str,
cmds: List[str],
mounts: Dict[str, str] = None,
host_workspace: str = None,
container_workdir: str = "/workspace",
timeout: int = 60,
allow_network: bool = False,
) -> Dict[str, Any]:
"""
Executes a sequence of shell commands inside a Docker container.
This tool allows safe and isolated execution of code or scripts
using a specified Docker image. It supports mounting host directories,
custom working directories, timeout handling, and optional network access.
Args:
image (str): The Docker image to use for execution (e.g., "python:3.11-slim").
cmds (List[str]): A list of shell commands to run inside the container.
mounts (Dict[str, str], optional): Dictionary mapping host paths to container paths
for volume mounting. Defaults to None.
host_workspace (str, optional): Path on the host machine to use as workspace.
If None, a temporary directory is created. Defaults to None.
container_workdir (str, optional): Working directory inside the container. Defaults to "/workspace".
timeout (int, optional): Maximum execution time in seconds before terminating the process. Defaults to 60.
allow_network (bool, optional): Whether to allow network access inside the container.
Defaults to False (safe default).
Returns:
Dict[str, Any]: A dictionary containing execution results:
- stdout (str): Standard output from the container.
- stderr (str): Standard error output.
- exit_code (int): Exit code of the executed commands.
- runtime_s (float): Execution time in seconds.
- files (List[str]): List of files created in the host workspace (relative paths).
- host_workspace (str): Path to the host workspace used for execution.
Notes:
- Ensures that the host workspace is always mounted to the container.
- Normalizes Windows paths for Docker volume mounting.
- Safely handles subprocess timeouts and captures output.
"""
if host_workspace is None:
host_workspace = tempfile.mkdtemp(prefix="mini_manus_ws_")
# Ensure mounts include host_workspace -> container_workdir
mounts = dict(mounts or {})
if host_workspace not in mounts:
mounts[host_workspace] = container_workdir
docker_cmd = ["docker", "run", "--rm", "--memory", "512m", "--cpus", "1"]
if not allow_network:
docker_cmd += ["--network", "none"]
# Normalize Windows backslashes -> forward slashes for docker -v on some setups
def _norm(p: str) -> str:
return p.replace("\\", "/")
for host, cont in mounts.items():
docker_cmd += ["-v", f"{_norm(host)}:{cont}"]
docker_cmd += ["-w", container_workdir, image]
joined = " && ".join(cmds) if cmds else "echo 'No commands provided'"
docker_cmd += ["sh", "-lc", joined]
start = time.time()
try:
proc = subprocess.run(docker_cmd, capture_output=True, text=True, timeout=timeout)
runtime = time.time() - start
# Gather files from the host workspace (NOT container path)
files = []
try:
for p in Path(host_workspace).rglob("*"):
if p.is_file():
files.append(str(p.relative_to(host_workspace)))
except Exception:
files = []
return {
"stdout": proc.stdout,
"stderr": proc.stderr,
"exit_code": proc.returncode,
"runtime_s": round(runtime, 3),
"files": files,
"host_workspace": host_workspace,
}
except subprocess.TimeoutExpired as te:
return {
"stdout": te.stdout or "",
"stderr": (te.stderr or "") + f"\n[Timed out after {timeout}s]",
"exit_code": -1,
"runtime_s": round(time.time() - start, 3),
"files": [],
"host_workspace": host_workspace,
}
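# Usage sketch for code_executor, assuming a local Docker daemon and that the
# "python:3.11-slim" image can be pulled. The MINI_MANUS_DEMO_DOCKER gate is an
# assumption added here so the container run stays opt-in.
if __name__ == "__main__" and os.environ.get("MINI_MANUS_DEMO_DOCKER") == "1":
    demo_exec = code_executor(
        image="python:3.11-slim",
        cmds=["python -c \"print('hello from the sandbox')\""],
        timeout=30,
    )
    print(demo_exec["stdout"], demo_exec["exit_code"])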
@tool
def save_files(manifest_files: List[Dict[str,str]], workspace: str = None) -> str:
"""
Saves a list of files to a host workspace directory.
This tool creates the specified files with their content on the host system.
Each file is defined by a dictionary containing a relative path and content.
If no workspace path is provided, a temporary directory is created automatically.
Args:
manifest_files (List[Dict[str, str]]): A list of file descriptors,
where each descriptor is a dictionary with:
- "path" (str): Relative file path (e.g., "app.py" or "src/module.py").
- "content" (str): The content to write into the file.
workspace (str, optional): Path to the host directory where files should be saved.
If None, a temporary directory is created. Defaults to None.
Returns:
str: The path to the host workspace directory where the files were saved.
Notes:
- Automatically creates parent directories if they do not exist.
- Overwrites files if they already exist at the same path.
- Useful for preparing workspaces for code execution in sandboxed environments.
"""
if workspace is None:
workspace = tempfile.mkdtemp(prefix="mini_manus_ws_")
ws = Path(workspace)
ws.mkdir(parents=True, exist_ok=True)
for f in manifest_files:
p = ws / f["path"]
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(f["content"], encoding="utf-8")
return str(ws)
# 2) List files in a workspace (relative)
@tool
def list_workspace_files(workspace: str) -> List[str]:
"""
Recursively list all files in a given workspace directory.
This tool traverses the workspace directory and collects all file paths,
returning them relative to the workspace root. It is useful for inspecting
the contents of a workspace, packaging artifacts, or tracking generated files.
Args:
workspace (str): Path to the workspace directory to list.
Returns:
List[str]: A list of file paths relative to the workspace root.
Notes:
- Only files are included; directories themselves are ignored.
- If the workspace path is invalid or an error occurs during traversal,
an empty list is returned.
- Paths are returned as strings using forward slashes.
"""
files = []
try:
for p in Path(workspace).rglob("*"):
if p.is_file():
files.append(str(p.relative_to(workspace)))
except Exception:
pass
return files
# 3) Package artifact (zip) and return path
@tool
def package_artifact(workspace: str, out_dir: str = None) -> str:
"""
Package the contents of a workspace directory into a ZIP archive.
This tool collects all files within a given workspace and compresses
them into a single ZIP file, which can be used as an artifact for
deployment, sharing, or backup purposes.
Args:
workspace (str): Path to the workspace directory to package.
out_dir (str, optional): Directory to save the generated ZIP file.
If None, a temporary directory will be created.
Returns:
str: Absolute file path of the created ZIP archive.
Notes:
- Only files are included in the ZIP archive; directories themselves
are not stored.
- The ZIP filename is automatically generated using a UUID to ensure
uniqueness.
- If `out_dir` does not exist, it will be created.
- Useful for packaging code, data, or other artifacts generated
during automated workflows.
"""
if out_dir is None:
out_dir = tempfile.mkdtemp(prefix="mini_manus_artifacts_")
Path(out_dir).mkdir(parents=True, exist_ok=True)
zip_name = Path(out_dir) / f"artifact_{uuid4().hex}.zip"
with zipfile.ZipFile(zip_name, "w", zipfile.ZIP_DEFLATED) as z:
for p in Path(workspace).rglob("*"):
if p.is_file():
z.write(p, p.relative_to(workspace))
return str(zip_name)
# 4) Cleanup workspace
@tool
def cleanup_workspace(workspace: str, keep: bool = False) -> None:
"""
Safely removes a workspace directory and all its contents.
This tool is used to clean up temporary directories created during
code execution, testing, or file manipulation. It ensures that the
workspace is deleted unless explicitly preserved.
Args:
workspace (str): Path to the workspace directory to delete.
keep (bool, optional): If True, the workspace will not be deleted.
Defaults to False.
Returns:
None
Notes:
- Any errors during deletion (e.g., non-existent directory, permission issues)
are silently ignored.
- Use `keep=True` to preserve the workspace, for example, when artifacts
need to be inspected after execution.
- Intended for host-side cleanup of temporary directories used in containerized
or local code execution workflows.
"""
if keep:
return
try:
shutil.rmtree(workspace)
except Exception:
pass
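# End-to-end sketch of the workspace helpers: save a couple of files, list them, zip them
# into an artifact, then clean up. File names and contents are illustrative, and the
# MINI_MANUS_DEMO_WORKSPACE gate is an assumption so nothing runs on plain import.
if __name__ == "__main__" and os.environ.get("MINI_MANUS_DEMO_WORKSPACE") == "1":
    demo_ws = save_files([
        {"path": "app.py", "content": "print('hello')\n"},
        {"path": "requirements.txt", "content": "requests\n"},
    ])
    print("files:", list_workspace_files(demo_ws))
    print("artifact:", package_artifact(demo_ws))
    cleanup_workspace(demo_ws)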
# 5) Run a manifest end-to-end using your code_executor (uses Docker image + run_commands)
@tool
def run_manifest(manifest: Dict[str, Any], base_image: str = "python:3.11-slim", timeout: int = 120, keep_workspace: bool = False) -> Dict[str, Any]:
"""
Executes a manifest of files and commands inside a Docker container and optionally packages the workspace.
This tool automates the process of:
1. Saving provided files to a host workspace.
2. Installing dependencies (if a `requirements.txt` is present or if `install_libs` is specified).
3. Running commands and optional test commands inside a Docker container.
- Commands referencing workspace files are automatically adjusted to point to the container workspace.
4. Collecting outputs, listing files, and optionally packaging the workspace into a ZIP artifact.
5. Cleaning up the workspace unless `keep_workspace=True`.
Args:
manifest (Dict[str, Any]): A dictionary describing the manifest, with the following keys:
- "files" (List[Dict[str,str]]): List of files to save, each with "path" and "content".
- "run_commands" (List[str], optional): Commands to execute inside the container.
- "test_command" (str, optional): A command for testing/verifying the execution.
- "install_libs" (List[str], optional): A list of Python packages to install dynamically
(e.g., ["crewai", "transformers"]). Installed before any run/test commands.
base_image (str, optional): Docker image to use for execution. Defaults to "python:3.11-slim".
timeout (int, optional): Maximum time in seconds for container execution. Defaults to 120.
keep_workspace (bool, optional): If True, preserves the host workspace after execution. Defaults to False.
Returns:
Dict[str, Any]: A dictionary containing execution results and metadata:
- "stdout" (str): Standard output from the execution.
- "stderr" (str): Standard error from the execution.
- "exit_code" (int): Exit code of the executed commands.
- "runtime_s" (float): Total runtime in seconds.
- "files" (List[str]): List of files present in the workspace after execution.
- "artifact" (str or None): Path to a ZIP file of the workspace, if packaging succeeded.
- "workspace" (str): Path to the host workspace.
Notes:
- If `requirements.txt` exists, dependencies are installed automatically inside the container.
- If `install_libs` is provided, those packages are installed dynamically via pip.
- Commands that reference workspace files are automatically adjusted to point to the container workspace.
- Network access is enabled briefly during dependency installation.
- Commands are executed sequentially inside the container.
- Workspace cleanup is automatic unless `keep_workspace=True`.
- Useful for safely running and testing code in isolated, reproducible environments.
"""
files = manifest.get("files", [])
run_cmds = manifest.get("run_commands", [])
test_cmd = manifest.get("test_command")
    install_libs = manifest.get("install_libs", [])  # optional extra packages to pip-install in the container
host_workspace = save_files(files) # this returns a host path
# Map host workspace -> container path
mounts = {host_workspace: "/workspace"}
# Pre-install step if requirements.txt exists
install_cmds = []
if install_libs:
# install arbitrary packages inside container
libs = " ".join(install_libs)
install_cmds.append(f"pip install {libs}")
if (Path(host_workspace) / "requirements.txt").exists():
install_cmds.append("pip install -r requirements.txt")
    # Rewrite "python <script>" commands so the script path points at the container workspace
    def fix_file_paths(cmds: List[str]) -> List[str]:
        fixed = []
        for c in cmds:
            parts = c.split()
            if parts and parts[0] == "python" and len(parts) > 1:
                parts[1] = f"/workspace/{parts[1]}"
            fixed.append(" ".join(parts))
        return fixed
# Build the full command sequence (run installs first if present)
run_cmds = fix_file_paths(run_cmds)
if test_cmd:
test_cmd = fix_file_paths([test_cmd])[0]
# Build full command list
cmds = install_cmds + [f"cd /workspace && {c}" for c in run_cmds]
if test_cmd:
cmds.append(f"cd /workspace && {test_cmd}")
if not cmds:
cmds = ["cd /workspace && echo 'No commands provided'"]
# If we're installing requirements, allow network briefly (set allow_network=True)
allow_network = bool(install_cmds)
exec_res = code_executor(
image=base_image,
cmds=cmds,
mounts=mounts,
host_workspace=host_workspace,
container_workdir="/workspace",
timeout=timeout,
allow_network=allow_network,
)
# gather host-side file list (relative)
files_list = list_workspace_files(host_workspace)
# package artifact (optional)
artifact = None
try:
artifact = package_artifact(host_workspace)
except Exception:
artifact = None
result = {
"stdout": exec_res.get("stdout", ""),
"stderr": exec_res.get("stderr", ""),
"exit_code": exec_res.get("exit_code", 1),
"runtime_s": exec_res.get("runtime_s", None),
"files": files_list,
"artifact": artifact,
"workspace": host_workspace,
}
# decide whether to cleanup workspace
cleanup_workspace(host_workspace, keep=keep_workspace)
return result
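# Manifest sketch for run_manifest (requires Docker; the script content and the
# MINI_MANUS_DEMO_MANIFEST gate are illustrative assumptions). The manifest mirrors the
# keys documented above: "files", "run_commands", and optionally "test_command"/"install_libs".
if __name__ == "__main__" and os.environ.get("MINI_MANUS_DEMO_MANIFEST") == "1":
    demo_manifest = {
        "files": [{"path": "main.py", "content": "print(2 + 2)\n"}],
        "run_commands": ["python main.py"],
    }
    demo_run = run_manifest(demo_manifest, timeout=90, keep_workspace=False)
    print(demo_run["stdout"], demo_run["exit_code"])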
def detect_target_column(df: pd.DataFrame) -> Optional[str]:
"""
Heuristically detect the most likely target column based on naming, cardinality, and type.
"""
if df.empty or len(df.columns) < 2:
return None
scores = {}
for col in df.columns:
score = 0.0
name_lower = col.lower()
# Rule 1: Name matches common target keywords
keywords = ["target", "label", "class", "outcome", "result", "y", "output", "flag", "status", "churn", "survived", "price", "sale"]
if any(kw in name_lower for kw in keywords):
score += 3.0
if name_lower in ["target", "label", "class", "y"]:
score += 2.0
# Rule 2: Binary or low-cardinality categorical → likely classification
nunique = df[col].nunique()
total = len(df)
unique_ratio = nunique / total
if nunique == 2 and df[col].dtype in ["int64", "object", "category"]:
score += 4.0 # Strong signal
elif nunique <= 20 and df[col].dtype in ["int64", "object", "category"]:
score += 3.0
# Rule 3: High unique ratio + numeric → likely regression target
if unique_ratio > 0.8 and df[col].dtype in ["int64", "float64"]:
score += 2.5
# Rule 4: Avoid ID-like or high-cardinality text
id_keywords = ["id", "name", "email", "phone", "address", "username", "url", "link"]
if any(kw in name_lower for kw in id_keywords):
score -= 10.0
if nunique == total and df[col].dtype == "object":
score -= 10.0 # Likely unique identifier
scores[col] = score
# Return best candidate if score > 0
best_col = max(scores, key=scores.get)
return best_col if scores[best_col] > 0 else None
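# Quick sanity check for the target heuristic on a synthetic frame (illustrative only):
# a binary "churn" column should outscore an ID-like column and the numeric features.
# The MINI_MANUS_DEMO_TARGET gate is an assumption so the check stays opt-in.
if __name__ == "__main__" and os.environ.get("MINI_MANUS_DEMO_TARGET") == "1":
    demo_df = pd.DataFrame({
        "customer_id": ["a1", "b2", "c3", "d4"],
        "age": [25, 40, 31, 58],
        "monthly_spend": [20.5, 75.0, 44.2, 60.1],
        "churn": [0, 1, 0, 1],
    })
    print(detect_target_column(demo_df))  # expected: "churn"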
# ————————————————————————————————
# 🛠️ Tool 1: LoadData
# ————————————————————————————————
@tool
def LoadData(filepath: str) -> dict:
"""
Loads data from a CSV file and returns it as a dictionary.
Args:
filepath (str): Path to the CSV file.
Returns:
dict: Data as dictionary (from DataFrame.to_dict()).
"""
df = pd.read_csv(filepath)
return df.to_dict()
# ————————————————————————————————
# 🛠️ Tool 2: CleanData (Enhanced)
# ————————————————————————————————
@tool
def CleanData(data: dict, handle_outliers: bool = True, impute_strategy: str = "median_mode") -> pd.DataFrame:
"""
Cleans dataset with smart imputation, encoding, and optional outlier removal.
Args:
data (dict): Dataset in dictionary format.
handle_outliers (bool): Whether to remove outliers using IQR.
impute_strategy (str): "median_mode" or "mean_mode"
Returns:
pd.DataFrame: Cleaned dataset.
"""
df = pd.DataFrame.from_dict(data)
# Drop duplicates
df = df.drop_duplicates().reset_index(drop=True)
# Handle missing values
for col in df.columns:
if df[col].dtype in ["int64", "float64"]:
if impute_strategy == "median_mode" or df[col].skew() > 1:
fill_val = df[col].median()
else:
fill_val = df[col].mean()
df[col] = df[col].fillna(fill_val)
else:
mode = df[col].mode()
fill_val = mode[0] if len(mode) > 0 else "Unknown"
df[col] = df[col].fillna(fill_val)
# Parse datetime
for col in df.columns:
if "date" in col.lower() or "time" in col.lower():
            try:
                df[col] = pd.to_datetime(df[col], errors="coerce")
            except Exception:
                pass
# Encode categorical variables (only if not too many unique values)
for col in df.select_dtypes(include="object").columns:
if df[col].nunique() / len(df) < 0.5:
df[col] = df[col].astype("category").cat.codes
# else: leave as object (e.g., free text)
# Outlier removal (optional)
if handle_outliers:
for col in df.select_dtypes(include=["float64", "int64"]).columns:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
count_before = len(df)
df = df[(df[col] >= lower) & (df[col] <= upper)]
if len(df) == 0:
# Avoid empty df
df = pd.DataFrame.from_dict(data) # Revert
break
return df.reset_index(drop=True)
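# CleanData sketch on a tiny synthetic dict (values are illustrative). With so few rows the
# IQR filter is aggressive, so outlier handling is disabled here. The MINI_MANUS_DEMO_CLEAN
# gate is an assumption added so the demo only runs when explicitly requested.
if __name__ == "__main__" and os.environ.get("MINI_MANUS_DEMO_CLEAN") == "1":
    raw = pd.DataFrame({
        "age": [25, None, 31, 58],
        "city": ["Baku", "Baku", None, "Ganja"],
    }).to_dict()
    print(CleanData(raw, handle_outliers=False))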
# ————————————————————————————————
# 📊 Tool 3: EDA (Enhanced)
# ————————————————————————————————
@tool
def EDA(data: dict, max_cat_plots: int = 3, max_num_plots: int = 3) -> dict:
"""
Performs advanced EDA with smart visualizations and insights.
Args:
data (dict): Dataset in dictionary format.
max_cat_plots (int): Max number of categorical distribution plots.
max_num_plots (int): Max number of numeric vs target plots.
Returns:
dict: EDA results including text, plots, and recommendations.
"""
df = pd.DataFrame.from_dict(data)
results = {}
# 1. Summary Stats
results["summary"] = df.describe(include="all").to_string()
# 2. Missing Values
missing = df.isnull().sum()
results["missing_values"] = missing[missing > 0].to_dict()
# Missingness heatmap
if missing.sum() > 0:
plt.figure(figsize=(8, 4))
sns.heatmap(df.isnull(), cbar=True, cmap="viridis", yticklabels=False)
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight")
plt.close()
buf.seek(0)
img = Image.open(buf)
results["missingness_plot"] = img #buf
# 3. Correlation Heatmap
corr = df.corr(numeric_only=True)
if not corr.empty and len(corr.columns) > 1:
plt.figure(figsize=(8, 6))
sns.heatmap(corr, annot=True, fmt=".2f", cmap="coolwarm", square=True)
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight")
plt.close()
buf.seek(0)
img = Image.open(buf)
results["correlation_plot"] = img #buf
# Top 5 absolute correlations
unstacked = corr.abs().unstack()
unstacked = unstacked[unstacked < 1.0]
top_corr = unstacked.sort_values(ascending=False).head(5).to_dict()
results["top_correlations"] = top_corr
# 4. Skewness & Kurtosis
numeric_cols = df.select_dtypes(include=["float64", "int64"]).columns
skew_kurt = {}
for col in numeric_cols:
skew_kurt[col] = {"skew": df[col].skew(), "kurtosis": df[col].kurtosis()}
results["skew_kurtosis"] = skew_kurt
# 5. Numeric Distributions
if len(numeric_cols) > 0:
df[numeric_cols].hist(bins=20, figsize=(12, 8), layout=(2, -1))
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight")
plt.close()
buf.seek(0)
img = Image.open(buf)
results["numeric_distributions"] = img #buf
# 6. Categorical Distributions
cat_cols = df.select_dtypes(include=["object", "category"]).columns
for col in cat_cols[:max_cat_plots]:
plt.figure(figsize=(6, 4))
top_vals = df[col].value_counts().head(10)
sns.barplot(x=top_vals.index, y=top_vals.values)
plt.xticks(rotation=45)
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight")
plt.close()
buf.seek(0)
img = Image.open(buf)
results[f"dist_{col}"] = img #buf
# 7. Target Relationships
target_col = detect_target_column(df)
if target_col:
results["detected_target"] = target_col
for col in numeric_cols[:max_num_plots]:
plt.figure(figsize=(6, 4))
if df[target_col].nunique() <= 20:
sns.boxplot(data=df, x=target_col, y=col)
else:
sns.scatterplot(data=df, x=col, y=target_col)
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight")
plt.close()
buf.seek(0)
img = Image.open(buf)
results[f"{col}_vs_{target_col}"] = img #buf
# 8. Recommendations
recs = []
for col, sk in skew_kurt.items():
if abs(sk["skew"]) > 1:
recs.append(f"Feature '{col}' is skewed ({sk['skew']:.2f}) → consider log transform.")
if results["missing_values"]:
recs.append("Missing data detected → consider KNN or iterative imputation.")
if results.get("top_correlations"):
recs.append("High correlations found → consider PCA or feature selection.")
if target_col:
recs.append(f"Target variable '{target_col}' detected automatically.")
results["recommendations"] = recs
return results
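# EDA sketch: feed a small synthetic dataset and inspect the textual outputs (plots come
# back as PIL images). The dataset and the MINI_MANUS_DEMO_EDA gate are illustrative
# assumptions; nothing runs on import.
if __name__ == "__main__" and os.environ.get("MINI_MANUS_DEMO_EDA") == "1":
    demo_data = pd.DataFrame({
        "age": [25, 40, 31, 58, 22, 47],
        "income": [30_000, 72_000, 45_000, 90_000, 28_000, 66_000],
        "churn": [0, 1, 0, 1, 0, 1],
    }).to_dict()
    eda_out = EDA(demo_data)
    print(eda_out["summary"])
    print(eda_out.get("detected_target"), eda_out.get("recommendations"))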
# ————————————————————————————————
# 🤖 Tool 4: AutoML (Enhanced)
# ————————————————————————————————
@tool
def AutoML(data: dict, task_hint: str = None) -> dict:
"""
Enhanced AutoML with multiple models and robust evaluation.
Args:
data (dict): Cleaned dataset.
task_hint (str): "classification", "regression", or None.
Returns:
dict: Model results and metrics.
"""
df = pd.DataFrame.from_dict(data)
results = {}
target_col = detect_target_column(df)
if not target_col:
results["note"] = "No target column detected. Check column names and data."
return results
X = df.drop(columns=[target_col])
y = df[target_col]
# One-hot encode X
X = pd.get_dummies(X, drop_first=True)
if X.shape[1] == 0:
results["error"] = "No valid features after encoding."
return results
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Detect task
if task_hint:
task = task_hint
elif y.dtype in ["object", "category"] or y.nunique() <= 20:
task = "classification"
else:
task = "regression"
    best_model = None
    try:
        if task == "classification":
            models = {
                "RandomForest": RandomForestClassifier(n_estimators=100, random_state=42),
                "LogisticRegression": LogisticRegression(max_iter=1000, random_state=42)
            }
            results["task"] = "classification"
            best_acc = -1.0
            for name, model in models.items():
                model.fit(X_train, y_train)
                preds = model.predict(X_test)
                acc = accuracy_score(y_test, preds)
                if acc > best_acc:
                    best_acc = acc
                    best_model = model  # keep the best classifier so it can be saved below
                    results["accuracy"] = acc
                    results["best_model"] = name
                    results["report"] = classification_report(y_test, preds, zero_division=0)
                    if hasattr(model, "feature_importances_"):
                        results["feature_importance"] = dict(zip(X.columns, model.feature_importances_))
else:
models = {
"RandomForest": RandomForestRegressor(n_estimators=100, random_state=42),
"LinearRegression": LinearRegression()
}
results["task"] = "regression"
best_r2 = -float("inf")
for name, model in models.items():
model.fit(X_train, y_train)
preds = model.predict(X_test)
r2 = r2_score(y_test, preds)
if r2 > best_r2:
best_r2 = r2
results["r2_score"] = r2
results["mse"] = mean_squared_error(y_test, preds)
results["best_model"] = name
best_model = model # Keep best model
if hasattr(model, "feature_importances_"):
results["feature_importance"] = dict(zip(X.columns, model.feature_importances_))
        # Save the best model to a temporary file
        if best_model is not None:
            model_dir = tempfile.mkdtemp()
            model_path = os.path.join(model_dir, f"trained_model_{task}.pkl")
            joblib.dump({
                "model": best_model,
                "task": task,
                "target_column": target_col,
                "features": X.columns.tolist()
            }, model_path)
            results["model_download_path"] = model_path
            results["model_info"] = f"Best model: {results['best_model']} | Task: {task} | Target: {target_col}"
except Exception as e:
results["error"] = f"Model training failed: {str(e)}"
return results
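# AutoML sketch on a tiny synthetic classification set (purely illustrative; real datasets
# should be far larger). The MINI_MANUS_DEMO_AUTOML gate is an assumption that keeps the
# training run opt-in. Prints the chosen model and its headline metric.
if __name__ == "__main__" and os.environ.get("MINI_MANUS_DEMO_AUTOML") == "1":
    demo_data = pd.DataFrame({
        "age": [25, 40, 31, 58, 22, 47, 36, 29, 51, 44],
        "income": [30, 72, 45, 90, 28, 66, 52, 38, 81, 70],
        "churn": [0, 1, 0, 1, 0, 1, 1, 0, 1, 1],
    }).to_dict()
    automl_out = AutoML(demo_data, task_hint="classification")
    print(automl_out.get("best_model"), automl_out.get("accuracy"))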
model = InferenceClientModel(
model_id="Qwen/Qwen2.5-Coder-32B-Instruct",
token=os.environ["HF_TOKEN"],
provider="together",
max_tokens=8048
)
planner = ToolCallingAgent(
tools=[],
model=model,
name="PlannerAgent",
max_steps=10,
planning_interval=5,
description= "Breaks down complex tasks and orchestrates tools for execution",
)
# Research agent
researcher = ToolCallingAgent(
tools=[internet_search],
model=model,
name="ResearchAgent",
max_steps=10,
description = "Conducts deep research using internet_search",
)
# Coding agent
coder = CodeAgent(
tools=[
code_executor,
save_files,
list_workspace_files,
package_artifact,
cleanup_workspace,
run_manifest,
],
model=model,
name="CodingAgent",
max_steps=20,
additional_authorized_imports=[
"subprocess", "tempfile", "time", "os", "pathlib", "typing","shutil", "zipfile","uuid"
],
description = "Executes Python code safely in a sandboxed Docker container."
"If a library is missing, add it to install_libs in run_manifest."
)
analyst = CodeAgent(
tools=[LoadData, CleanData, EDA, AutoML],
model=model,
max_steps=20,
name="DataScienceAgent",
additional_authorized_imports=[
"pandas", "matplotlib.pyplot", "seaborn", "PIL", "sklearn", "io", "os","joblib","tempfile"
],
description = "Loads datasets, cleans and preprocesses data, performs exploratory data analysis (EDA) with visualizations, and builds predictive models when a target variable is specified."
)
manager_agent = ToolCallingAgent(
tools=[],
model=model,
managed_agents=[planner, researcher, coder, analyst],
max_steps=20,
description= "Routes user queries to the right agent (Planner, Researcher, Coder or Data Scientist) and assembles results",
)
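# Entry-point sketch: hand a task to the manager agent, which routes it to the managed
# agents and assembles the results. The prompt and the MINI_MANUS_DEMO_RUN gate are
# illustrative assumptions; running this requires valid HF_TOKEN / TAVILY_API_KEY
# credentials and, for coding tasks, a local Docker daemon.
if __name__ == "__main__" and os.environ.get("MINI_MANUS_DEMO_RUN") == "1":
    answer = manager_agent.run(
        "Research current best practices for packaging a small Python CLI, then write and "
        "run a script that prints today's date."
    )
    print(answer)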