BiotechU2 / app.py
C2MV's picture
Update app.py
8681abe verified
raw
history blame
76.2 kB
#import os
from pydantic import BaseModel, ConfigDict
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.integrate import odeint
from scipy.optimize import curve_fit
from sklearn.metrics import mean_squared_error
import gradio as gr
import io
from PIL import Image
import tempfile
class YourModel(BaseModel):
    """Pydantic model whose fields may use arbitrary (non-pydantic) types."""

    # Pydantic v2 style configuration. The file already imports ``ConfigDict``
    # (a v2-only name), and the nested ``class Config`` form is deprecated
    # there, so the setting is expressed through ``model_config`` instead.
    model_config = ConfigDict(arbitrary_types_allowed=True)
class BioprocessModel:
def __init__(self, model_type='logistic', maxfev=50000):
    """Create an empty container for data, fitted parameters and metrics.

    Args:
        model_type: Name of the biomass growth model. The other methods of
            this class branch on 'logistic', 'gompertz' and 'moser'.
        maxfev: Stored on the instance and forwarded to ``curve_fit`` as its
            ``maxfev`` argument by the fitting methods.
    """
    # Fitting configuration.
    self.model_type = model_type
    self.maxfev = maxfev

    # Fitted parameters and goodness-of-fit values; the fitting methods store
    # entries under the keys 'biomass', 'substrate' and 'product'.
    self.params, self.r2, self.rmse = {}, {}, {}

    # Per-experiment stores filled by ``process_data``: raw replicate
    # arrays, their means (suffix ``p``) and standard deviations (``_std``).
    self.datax, self.datas, self.datap = [], [], []
    self.dataxp, self.datasp, self.datapp = [], [], []
    self.datax_std, self.datas_std, self.datap_std = [], [], []

    # Callables selected later by ``fit_model``.
    self.biomass_model = None
    self.biomass_diff = None

    # Time vector, assigned by ``process_data``.
    self.time = None
@staticmethod
def logistic(time, xo, xm, um):
    """Evaluate the closed-form logistic growth curve.

    Computes ``xo * exp(um * t) / (1 - (xo / xm) * (1 - exp(um * t)))``.

    Args:
        time: Scalar or NumPy array of time values.
        xo: Biomass at ``t = 0``.
        xm: Maximum biomass.
        um: Growth-rate constant.

    Returns:
        Biomass evaluated at ``time``. When ``xm`` is zero, or when
        ``xo == xm`` while ``um * time`` is positive somewhere, a float
        NaN array shaped like ``time`` is returned instead.
    """
    if xm == 0 or (xo / xm == 1 and np.any(um * time > 0)):
        # dtype=float is required: without it an integer ``time`` would give
        # an integer result that cannot hold NaN, and callers' isnan checks
        # would miss the failure.
        return np.full_like(time, np.nan, dtype=float)
    growth = np.exp(um * time)
    denominator = 1 - (xo / xm) * (1 - growth)
    # An exactly-zero denominator is nudged to a tiny value to avoid a
    # division by zero.
    denominator = np.where(denominator == 0, 1e-9, denominator)
    return (xo * growth) / denominator
@staticmethod
def gompertz(time, xm, um, lag):
    """Evaluate the closed-form Gompertz growth curve.

    Computes ``xm * exp(-exp((um * e / xm) * (lag - t) + 1))``.

    Args:
        time: Scalar or NumPy array of time values.
        xm: Maximum biomass.
        um: Growth-rate constant.
        lag: Lag time.

    Returns:
        Biomass evaluated at ``time``; a float NaN array shaped like
        ``time`` when ``xm`` is zero.
    """
    if xm == 0:
        # dtype=float so the NaN sentinel survives integer input, including
        # the scalar call ``gompertz(0, ...)`` used elsewhere in the class.
        return np.full_like(time, np.nan, dtype=float)
    rate = um * np.e / xm
    return xm * np.exp(-np.exp(rate * (lag - time) + 1))
@staticmethod
def moser(time, Xm, um, Ks):
    """Evaluate the saturating growth curve used for the 'moser' model.

    Computes ``Xm * (1 - exp(-um * (time - Ks)))``.

    Args:
        time: Scalar or NumPy array of time values.
        Xm: Asymptotic biomass.
        um: Rate constant.
        Ks: Time offset subtracted from ``time``.

    Returns:
        Biomass evaluated at ``time``.
    """
    elapsed = time - Ks
    saturation = 1 - np.exp(-um * elapsed)
    return Xm * saturation
@staticmethod
def logistic_diff(X, t, params):
    """Return dX/dt for logistic growth: ``um * X * (1 - X / xm)``.

    Args:
        X: Current biomass.
        t: Current time (not used by this rate law).
        params: Sequence ``(xo, xm, um)``; ``xo`` is unpacked but unused.

    Returns:
        The growth rate, or ``0`` when ``xm`` is zero.
    """
    _, capacity, rate = params
    if capacity == 0:
        # Guard against dividing by a zero carrying capacity.
        return 0
    return rate * X * (1 - X / capacity)
@staticmethod
def gompertz_diff(X, t, params):
    """Return dX/dt for Gompertz growth.

    With ``k = um * e / xm`` the rate is ``X * k * exp(k * (lag - t) + 1)``.

    Args:
        X: Current biomass.
        t: Current time.
        params: Sequence ``(xm, um, lag)``.

    Returns:
        The growth rate, or ``0`` when ``xm`` is zero.
    """
    ceiling, rate, delay = params
    if ceiling == 0:
        # Guard against dividing by a zero maximum biomass.
        return 0
    k = rate * np.e / ceiling
    return X * k * np.exp(k * (delay - t) + 1)
@staticmethod
def moser_diff(X, t, params):
    """Return dX/dt for the 'moser' model: ``um * (Xm - X)``.

    Args:
        X: Current biomass.
        t: Current time (not used by this rate law).
        params: Sequence ``(Xm, um, Ks)``; ``Ks`` is unpacked but unused.

    Returns:
        The growth rate.
    """
    ceiling, rate, _ = params
    remaining = ceiling - X
    return rate * remaining
def substrate(self, time, so, p, q, biomass_params):
    """Evaluate the closed-form substrate curve driven by the biomass model.

    Computes ``so - p * (X(t) - X0) - q * integral(X)``, where ``X`` is
    ``self.biomass_model`` evaluated with ``biomass_params`` and the integral
    runs from the first time point.

    Args:
        time: Array-like of time values (any numeric dtype).
        so: Substrate offset term of the formula.
        p: Coefficient multiplying the biomass increase.
        q: Coefficient multiplying the biomass integral.
        biomass_params: Sequence of biomass-model parameters, in the order
            expected by ``self.biomass_model``.

    Returns:
        Float array shaped like ``time``. It is all NaN when no biomass
        model or parameters are available, or when the biomass curve
        contains NaN.
    """
    # Work in float so NaN sentinels are representable even for integer time.
    time = np.asarray(time, dtype=float)
    # Explicit None/len test: ``not biomass_params`` is ambiguous for arrays.
    if self.biomass_model is None or biomass_params is None or len(biomass_params) == 0:
        return np.full_like(time, np.nan)
    X_t = np.asarray(self.biomass_model(time, *biomass_params), dtype=float)
    if np.any(np.isnan(X_t)):
        return np.full_like(time, np.nan)

    # Cumulative trapezoidal integral of X. It is exactly zero at the first
    # time point and handles non-uniform spacing, so the curve starts at
    # ``so - p * (X(t[0]) - X0)`` as the formula requires.
    integral_X = np.zeros_like(X_t)
    if time.size > 1:
        integral_X[1:] = np.cumsum(0.5 * (X_t[1:] + X_t[:-1]) * np.diff(time))

    # Reference biomass X0 is the model value at t = 0.
    if self.model_type == 'logistic':
        X0 = biomass_params[0]  # xo is X(0) by definition of the model
    elif self.model_type == 'gompertz':
        X0 = self.gompertz(0, *biomass_params)
    elif self.model_type == 'moser':
        X0 = self.moser(0, *biomass_params)
    else:
        X0 = np.ravel(X_t)[0]  # fallback: first evaluated biomass value
    return so - p * (X_t - X0) - q * integral_X
def product(self, time, po, alpha, beta, biomass_params):
    """Evaluate the closed-form product curve driven by the biomass model.

    Computes ``po + alpha * (X(t) - X0) + beta * integral(X)``, where ``X``
    is ``self.biomass_model`` evaluated with ``biomass_params`` and the
    integral runs from the first time point.

    Args:
        time: Array-like of time values (any numeric dtype).
        po: Product offset term of the formula.
        alpha: Coefficient multiplying the biomass increase.
        beta: Coefficient multiplying the biomass integral.
        biomass_params: Sequence of biomass-model parameters, in the order
            expected by ``self.biomass_model``.

    Returns:
        Float array shaped like ``time``. It is all NaN when no biomass
        model or parameters are available, or when the biomass curve
        contains NaN.
    """
    # Work in float so NaN sentinels are representable even for integer time.
    time = np.asarray(time, dtype=float)
    # Explicit None/len test: ``not biomass_params`` is ambiguous for arrays.
    if self.biomass_model is None or biomass_params is None or len(biomass_params) == 0:
        return np.full_like(time, np.nan)
    X_t = np.asarray(self.biomass_model(time, *biomass_params), dtype=float)
    if np.any(np.isnan(X_t)):
        return np.full_like(time, np.nan)

    # Cumulative trapezoidal integral of X. It is exactly zero at the first
    # time point and handles non-uniform spacing.
    integral_X = np.zeros_like(X_t)
    if time.size > 1:
        integral_X[1:] = np.cumsum(0.5 * (X_t[1:] + X_t[:-1]) * np.diff(time))

    # Reference biomass X0 is the model value at t = 0.
    if self.model_type == 'logistic':
        X0 = biomass_params[0]  # xo is X(0) by definition of the model
    elif self.model_type == 'gompertz':
        X0 = self.gompertz(0, *biomass_params)
    elif self.model_type == 'moser':
        X0 = self.moser(0, *biomass_params)
    else:
        X0 = np.ravel(X_t)[0]  # fallback: first evaluated biomass value
    return po + alpha * (X_t - X0) + beta * integral_X
def process_data(self, df):
    """Extract time and replicate measurements from one experiment's table.

    Columns are selected by the second element of each column key
    (``col[1]``), so keys are presumably tuples from a two-level column
    index; the labels matched are 'Tiempo', 'Biomasa', 'Sustrato' and
    'Producto'.

    For each variable the raw replicates (one row per matching column), their
    mean and their sample standard deviation (``ddof=1``) are appended to the
    corresponding instance lists; empty arrays are appended when a variable
    has no columns. The time values are stored in ``self.time``.

    Args:
        df: DataFrame holding the experiment.

    Raises:
        ValueError: If no column is labelled 'Tiempo'.
    """
    def columns_labelled(label):
        return [col for col in df.columns if col[1] == label]

    time_cols = columns_labelled('Tiempo')
    if not time_cols:
        raise ValueError("La columna 'Tiempo' no se encuentra en el DataFrame.")
    time = df[time_cols[0]].values

    # (column label, raw store, mean store, std store) for each variable.
    destinations = (
        ('Biomasa', self.datax, self.dataxp, self.datax_std),
        ('Sustrato', self.datas, self.datasp, self.datas_std),
        ('Producto', self.datap, self.datapp, self.datap_std),
    )
    for label, raw_store, mean_store, std_store in destinations:
        cols = columns_labelled(label)
        if cols:
            replicates = np.array([df[col].values for col in cols])
            raw_store.append(replicates)
            mean_store.append(np.mean(replicates, axis=0))
            std_store.append(np.std(replicates, axis=0, ddof=1))
        else:
            # Keep the three lists aligned even when the variable is absent.
            raw_store.append(np.array([]))
            mean_store.append(np.array([]))
            std_store.append(np.array([]))

    self.time = time
def fit_model(self):
    """Select the biomass curve and rate functions for ``self.model_type``.

    Sets ``self.biomass_model`` and ``self.biomass_diff`` to the methods
    matching 'logistic', 'gompertz' or 'moser'. For any other model type
    both attributes are left untouched.
    """
    method_names = {
        'logistic': ('logistic', 'logistic_diff'),
        'gompertz': ('gompertz', 'gompertz_diff'),
        'moser': ('moser', 'moser_diff'),
    }
    selected = method_names.get(self.model_type)
    if selected is None:
        return
    curve_name, rate_name = selected
    self.biomass_model = getattr(self, curve_name)
    self.biomass_diff = getattr(self, rate_name)
def fit_biomass(self, time, biomass):
    """Fit the selected biomass growth model to experimental data.

    On success the fitted parameters are stored in ``self.params['biomass']``
    and the R² / RMSE of the fit in ``self.r2['biomass']`` and
    ``self.rmse['biomass']``.

    Args:
        time: Array-like of time values.
        biomass: Array-like of biomass values, same length as ``time``.

    Returns:
        The model prediction at ``time``, or ``None`` when the data is
        constant, the model type is unknown, the prediction is not finite,
        or the optimizer fails. Failures are reported with ``print`` rather
        than raised.
    """
    def mark_failed(clear_params):
        # Record a failed fit. Parameters are only cleared when the optimizer
        # itself raised; a non-finite prediction keeps the fitted parameters.
        if clear_params:
            self.params['biomass'] = {}
        self.r2['biomass'] = np.nan
        self.rmse['biomass'] = np.nan
        return None

    try:
        # Accept lists and integer arrays; the boolean masks below need arrays.
        time = np.asarray(time, dtype=float)
        biomass = np.asarray(biomass, dtype=float)

        # A constant series carries no growth information to fit.
        if len(np.unique(biomass)) < 2:
            print(f"Biomasa constante para {self.model_type}, no se puede ajustar el modelo.")
            return None

        peak = biomass.max()
        t_max = time.max() if time.size > 0 else 100
        fit_options = {'maxfev': self.maxfev, 'ftol': 1e-9, 'xtol': 1e-9}

        if self.model_type == 'logistic':
            # xo guess: first clearly positive reading; xm guess: slightly
            # above the observed maximum and always greater than xo.
            positive = biomass[biomass > 1e-6]
            xo_guess = positive[0] if positive.size > 0 else 1e-3
            xm_guess = peak * 1.1 if peak > xo_guess else xo_guess * 2
            if xm_guess <= xo_guess:
                xm_guess = xo_guess + 1e-3
            p0 = [xo_guess, xm_guess, 0.1]
            bounds = ([1e-9, 1e-9, 1e-9], [np.inf, np.inf, np.inf])
            popt, _ = curve_fit(self.logistic, time, biomass, p0=p0, bounds=bounds, **fit_options)
            if popt[1] <= popt[0]:
                print(f"Advertencia: En modelo logístico, Xm ({popt[1]:.2f}) no es mayor que Xo ({popt[0]:.2f}). Ajuste puede no ser válido.")
            self.params['biomass'] = {'xo': popt[0], 'xm': popt[1], 'um': popt[2]}
            y_pred = self.logistic(time, *popt)
        elif self.model_type == 'gompertz':
            xm_guess = peak if peak > 0 else 1.0
            um_guess = 0.1
            # Rough lag estimate: time of the steepest increase, if any.
            slope = np.gradient(biomass) if biomass.size > 1 else None
            if slope is not None and np.any(slope > 1e-6):
                lag_guess = time[np.argmax(slope)]
            else:
                lag_guess = time[0]
            p0 = [xm_guess, um_guess, lag_guess]
            bounds = ([1e-9, 1e-9, 0], [np.inf, np.inf, t_max])  # lag is kept non-negative
            popt, _ = curve_fit(self.gompertz, time, biomass, p0=p0, bounds=bounds, **fit_options)
            self.params['biomass'] = {'xm': popt[0], 'um': popt[1], 'lag': popt[2]}
            y_pred = self.gompertz(time, *popt)
        elif self.model_type == 'moser':
            Xm_guess = peak if peak > 0 else 1.0
            um_guess = 0.1
            Ks_guess = time[0]  # Ks acts as a time shift in ``moser``
            p0 = [Xm_guess, um_guess, Ks_guess]
            # Ks has no lower bound, so the shift may be negative.
            bounds = ([1e-9, 1e-9, -np.inf], [np.inf, np.inf, t_max])
            popt, _ = curve_fit(self.moser, time, biomass, p0=p0, bounds=bounds, **fit_options)
            self.params['biomass'] = {'Xm': popt[0], 'um': popt[1], 'Ks': popt[2]}
            y_pred = self.moser(time, *popt)
        else:
            return None

        if np.any(np.isnan(y_pred)) or np.any(np.isinf(y_pred)):
            print(f"Predicción de biomasa contiene NaN/Inf para {self.model_type}. Ajuste fallido.")
            return mark_failed(clear_params=False)

        ss_res = np.sum((biomass - y_pred) ** 2)
        ss_tot = np.sum((biomass - np.mean(biomass)) ** 2)
        if ss_tot == 0:
            # Constant data: perfect only when the residuals vanish too.
            self.r2['biomass'] = 1.0 if ss_res == 0 else 0.0
        else:
            self.r2['biomass'] = 1 - (ss_res / ss_tot)
        # RMSE is the root of the mean squared residual, reusing ss_res.
        self.rmse['biomass'] = np.sqrt(ss_res / biomass.size)
        return y_pred
    except RuntimeError as e:
        print(f"Error de Runtime en fit_biomass_{self.model_type} (probablemente no se pudo ajustar): {e}")
        return mark_failed(clear_params=True)
    except Exception as e:
        print(f"Error general en fit_biomass_{self.model_type}: {e}")
        return mark_failed(clear_params=True)
def fit_substrate(self, time, substrate, biomass_params_dict):
    """Fit ``so``, ``p`` and ``q`` of the substrate curve to data.

    The biomass parameters are held fixed; only the three substrate
    parameters are optimized, each constrained to be non-negative. On success
    they are stored in ``self.params['substrate']`` together with R² and RMSE
    in ``self.r2['substrate']`` / ``self.rmse['substrate']``.

    Args:
        time: Array-like of time values.
        substrate: Array-like of substrate values, same length as ``time``.
        biomass_params_dict: Fitted biomass parameters keyed by name, as
            stored by ``fit_biomass``.

    Returns:
        The model prediction at ``time``, or ``None`` when biomass parameters
        are missing, the model type is unknown, the prediction is not finite,
        or the optimizer fails. Failures are reported with ``print``.
    """
    if not biomass_params_dict:
        print(f"Error en fit_substrate_{self.model_type}: Parámetros de biomasa no disponibles.")
        return None

    def mark_failed(clear_params):
        # Record a failed fit; parameters are cleared only on exceptions.
        if clear_params:
            self.params['substrate'] = {}
        self.r2['substrate'] = np.nan
        self.rmse['substrate'] = np.nan
        return None

    try:
        # Positional order in which each biomass model expects its parameters.
        param_order = {
            'logistic': ('xo', 'xm', 'um'),
            'gompertz': ('xm', 'um', 'lag'),
            'moser': ('Xm', 'um', 'Ks'),
        }.get(self.model_type)
        if param_order is None:
            return None
        biomass_params_values = [biomass_params_dict[key] for key in param_order]

        time = np.asarray(time, dtype=float)
        substrate = np.asarray(substrate, dtype=float)

        so_guess = substrate[0] if substrate.size > 0 else 1.0
        # Keep the starting point inside the bounds: a negative first reading
        # would otherwise make curve_fit reject p0 as infeasible.
        so_guess = max(so_guess, 0.0)
        p0 = [so_guess, 0.1, 0.01]
        bounds = ([0, 0, 0], [np.inf, np.inf, np.inf])

        popt, _ = curve_fit(
            lambda t, so, p, q: self.substrate(t, so, p, q, biomass_params_values),
            time, substrate, p0=p0, maxfev=self.maxfev, bounds=bounds, ftol=1e-9, xtol=1e-9
        )
        self.params['substrate'] = {'so': popt[0], 'p': popt[1], 'q': popt[2]}
        y_pred = self.substrate(time, *popt, biomass_params_values)

        if np.any(np.isnan(y_pred)) or np.any(np.isinf(y_pred)):
            print(f"Predicción de sustrato contiene NaN/Inf para {self.model_type}. Ajuste fallido.")
            return mark_failed(clear_params=False)

        ss_res = np.sum((substrate - y_pred) ** 2)
        ss_tot = np.sum((substrate - np.mean(substrate)) ** 2)
        if ss_tot == 0:
            self.r2['substrate'] = 1.0 if ss_res == 0 else 0.0
        else:
            self.r2['substrate'] = 1 - (ss_res / ss_tot)
        # RMSE is the root of the mean squared residual, reusing ss_res.
        self.rmse['substrate'] = np.sqrt(ss_res / substrate.size)
        return y_pred
    except RuntimeError as e:
        print(f"Error de Runtime en fit_substrate_{self.model_type} (probablemente no se pudo ajustar): {e}")
        return mark_failed(clear_params=True)
    except Exception as e:
        print(f"Error general en fit_substrate_{self.model_type}: {e}")
        return mark_failed(clear_params=True)
def fit_product(self, time, product, biomass_params_dict):
    """Fit ``po``, ``alpha`` and ``beta`` of the product curve to data.

    The biomass parameters are held fixed; only the three product parameters
    are optimized, each constrained to be non-negative. On success they are
    stored in ``self.params['product']`` together with R² and RMSE in
    ``self.r2['product']`` / ``self.rmse['product']``.

    Args:
        time: Array-like of time values.
        product: Array-like of product values, same length as ``time``.
        biomass_params_dict: Fitted biomass parameters keyed by name, as
            stored by ``fit_biomass``.

    Returns:
        The model prediction at ``time``, or ``None`` when biomass parameters
        are missing, the model type is unknown, the prediction is not finite,
        or the optimizer fails. Failures are reported with ``print``.
    """
    if not biomass_params_dict:
        print(f"Error en fit_product_{self.model_type}: Parámetros de biomasa no disponibles.")
        return None

    def mark_failed(clear_params):
        # Record a failed fit; parameters are cleared only on exceptions.
        if clear_params:
            self.params['product'] = {}
        self.r2['product'] = np.nan
        self.rmse['product'] = np.nan
        return None

    try:
        # Positional order in which each biomass model expects its parameters.
        param_order = {
            'logistic': ('xo', 'xm', 'um'),
            'gompertz': ('xm', 'um', 'lag'),
            'moser': ('Xm', 'um', 'Ks'),
        }.get(self.model_type)
        if param_order is None:
            return None
        biomass_params_values = [biomass_params_dict[key] for key in param_order]

        time = np.asarray(time, dtype=float)
        product = np.asarray(product, dtype=float)

        po_guess = product[0] if product.size > 0 else 0.0
        # Keep the starting point inside the bounds: a slightly negative first
        # reading would otherwise make curve_fit reject p0 as infeasible.
        po_guess = max(po_guess, 0.0)
        p0 = [po_guess, 0.1, 0.01]
        bounds = ([0, 0, 0], [np.inf, np.inf, np.inf])

        popt, _ = curve_fit(
            lambda t, po, alpha, beta: self.product(t, po, alpha, beta, biomass_params_values),
            time, product, p0=p0, maxfev=self.maxfev, bounds=bounds, ftol=1e-9, xtol=1e-9
        )
        self.params['product'] = {'po': popt[0], 'alpha': popt[1], 'beta': popt[2]}
        y_pred = self.product(time, *popt, biomass_params_values)

        if np.any(np.isnan(y_pred)) or np.any(np.isinf(y_pred)):
            print(f"Predicción de producto contiene NaN/Inf para {self.model_type}. Ajuste fallido.")
            return mark_failed(clear_params=False)

        ss_res = np.sum((product - y_pred) ** 2)
        ss_tot = np.sum((product - np.mean(product)) ** 2)
        if ss_tot == 0:
            self.r2['product'] = 1.0 if ss_res == 0 else 0.0
        else:
            self.r2['product'] = 1 - (ss_res / ss_tot)
        # RMSE is the root of the mean squared residual, reusing ss_res.
        self.rmse['product'] = np.sqrt(ss_res / product.size)
        return y_pred
    except RuntimeError as e:
        print(f"Error de Runtime en fit_product_{self.model_type} (probablemente no se pudo ajustar): {e}")
        return mark_failed(clear_params=True)
    except Exception as e:
        print(f"Error general en fit_product_{self.model_type}: {e}")
        return mark_failed(clear_params=True)
def generate_fine_time_grid(self, time, num_points=500):
    """Return an evenly spaced grid spanning the range of ``time``.

    Args:
        time: Array-like of time values, or ``None``.
        num_points: Number of grid points (defaults to 500).

    Returns:
        ``np.linspace(min(time), max(time), num_points)``, or
        ``np.array([0])`` when ``time`` is ``None`` or empty.
    """
    if time is None:
        return np.array([0])
    # Accept lists and tuples as well as arrays.
    time = np.asarray(time, dtype=float)
    if time.size == 0:
        return np.array([0])
    return np.linspace(time.min(), time.max(), num_points)
def system(self, y, t, biomass_params_list, substrate_params_list, product_params_list, model_type):
    """Right-hand side of the coupled biomass / substrate / product ODEs.

    The biomass rate comes from the ``*_diff`` method matching ``model_type``
    and drives the other two equations:

        dS/dt = -p * dX/dt - q * X
        dP/dt = alpha * dX/dt + beta * X

    Args:
        y: Current state ``(X, S, P)``.
        t: Current time.
        biomass_params_list: Full fitted parameter list of the biomass model,
            passed unchanged to the rate function.
        substrate_params_list: ``[so, p, q]``; only ``p`` and ``q`` are used.
        product_params_list: ``[po, alpha, beta]``; only ``alpha`` and
            ``beta`` are used.
        model_type: 'logistic', 'gompertz' or 'moser'; any other value gives
            a zero biomass rate.

    Returns:
        ``[dXdt, dSdt, dPdt]``.
    """
    X, _S, _P = y

    def coefficient(values, position):
        # Missing trailing entries count as zero.
        return values[position] if len(values) > position else 0

    if model_type == 'logistic':
        rate_fn = self.logistic_diff
    elif model_type == 'gompertz':
        rate_fn = self.gompertz_diff
    elif model_type == 'moser':
        rate_fn = self.moser_diff
    else:
        rate_fn = None
    dXdt = rate_fn(X, t, biomass_params_list) if rate_fn is not None else 0.0

    # Index 0 of each list holds the initial value (so / po), which belongs
    # to the initial conditions rather than to the rate equations.
    p_val = coefficient(substrate_params_list, 1)
    q_val = coefficient(substrate_params_list, 2)
    alpha_val = coefficient(product_params_list, 1)
    beta_val = coefficient(product_params_list, 2)

    dSdt = -p_val * dXdt - q_val * X
    dPdt = alpha_val * dXdt + beta_val * X
    return [dXdt, dSdt, dPdt]
def get_initial_conditions(self, time, biomass, substrate, product):
    """Build the ODE initial state ``[X0, S0, P0]`` at the first time point.

    Fitted parameters are preferred and experimental first readings are the
    fallback. The state is evaluated at ``min(time)``, which is where
    ``generate_fine_time_grid`` starts the integration grid. When that point
    is ``t = 0`` (or ``time`` is missing) the result is simply ``X(0)``,
    ``so`` and ``po``. Otherwise the biomass model is evaluated at the start
    time and ``so`` / ``po`` are shifted by the growth-associated terms so
    that the state matches the closed-form ``substrate`` / ``product``
    curves at that time.

    Args:
        time: Array-like of time values, or ``None``.
        biomass: Experimental biomass values, or ``None``.
        substrate: Experimental substrate values, or ``None``.
        product: Experimental product values, or ``None``.

    Returns:
        List ``[X0, S0, P0]`` with NaN entries replaced by ``0.0``.
    """
    def first_or_zero(values):
        # Missing (None) or empty data contributes a zero initial value.
        if values is None or len(values) == 0:
            return 0
        return values[0]

    def as_scalar(value):
        # The model functions may return 0-d arrays for scalar time input.
        return float(np.ravel(value)[0])

    X0_exp = first_or_zero(biomass)
    S0_exp = first_or_zero(substrate)
    P0_exp = first_or_zero(product)

    # Start of the integration grid; falls back to 0 when unknown.
    t0 = 0.0
    if time is not None and np.size(time) > 0:
        candidate = float(np.min(time))
        if np.isfinite(candidate):
            t0 = candidate

    # X(t0) - X(0): biomass already grown before the grid starts.
    growth_since_zero = 0.0

    biomass_fit = self.params.get('biomass')
    if biomass_fit:
        curve = None
        if self.model_type == 'logistic':
            # xo is X(0) by definition of the logistic model.
            X_at_zero = biomass_fit.get('xo', X0_exp)
            if 'xm' in biomass_fit and 'um' in biomass_fit:
                curve = lambda t: self.logistic(t, X_at_zero, biomass_fit['xm'], biomass_fit['um'])
        elif self.model_type == 'gompertz':
            gompertz_args = (biomass_fit.get('xm', 1), biomass_fit.get('um', 0.1), biomass_fit.get('lag', 0))
            curve = lambda t: self.gompertz(t, *gompertz_args)
            X_at_zero = as_scalar(curve(0))
        elif self.model_type == 'moser':
            moser_args = (biomass_fit.get('Xm', 1), biomass_fit.get('um', 0.1), biomass_fit.get('Ks', 0))
            curve = lambda t: self.moser(t, *moser_args)
            X_at_zero = as_scalar(curve(0))
        else:
            X_at_zero = X0_exp  # unknown model type

        X0 = X_at_zero
        if curve is not None and self.model_type != 'logistic' and np.isnan(X0):
            X0 = X0_exp  # model evaluation failed; use the data instead
        elif curve is not None and t0 != 0 and np.isfinite(X_at_zero):
            shifted = as_scalar(curve(t0))
            if np.isfinite(shifted):
                X0 = shifted
                growth_since_zero = shifted - X_at_zero
    else:
        X0 = X0_exp

    substrate_fit = self.params.get('substrate')
    if substrate_fit:
        S0 = substrate_fit.get('so', S0_exp)
        if growth_since_zero and 'so' in substrate_fit:
            S0 = S0 - substrate_fit.get('p', 0) * growth_since_zero
    else:
        S0 = S0_exp

    product_fit = self.params.get('product')
    if product_fit:
        P0 = product_fit.get('po', P0_exp)
        if growth_since_zero and 'po' in product_fit:
            P0 = P0 + product_fit.get('alpha', 0) * growth_since_zero
    else:
        P0 = P0_exp

    # The ODE solver cannot start from NaN.
    X0 = X0 if not np.isnan(X0) else 0.0
    S0 = S0 if not np.isnan(S0) else 0.0
    P0 = P0 if not np.isnan(P0) else 0.0
    return [X0, S0, P0]
def solve_differential_equations(self, time, biomass, substrate, product):
    """Integrate the coupled ODE system on a fine time grid.

    Uses the fitted parameters in ``self.params``; missing substrate or
    product parameters default to zero. Initial conditions come from
    ``get_initial_conditions`` and the grid from ``generate_fine_time_grid``.

    Args:
        time: Array-like of experimental time values.
        biomass: Experimental biomass values (used for initial conditions).
        substrate: Experimental substrate values (used for initial conditions).
        product: Experimental product values (used for initial conditions).

    Returns:
        Tuple ``(X, S, P, time_grid)``. ``X``, ``S`` and ``P`` are ``None``
        when biomass parameters are missing, ``time`` is empty, the model
        type is unknown, or integration fails; problems are reported with
        ``print`` rather than raised.
    """
    biomass_fit = self.params.get('biomass')
    if not biomass_fit:
        print("No hay parámetros de biomasa, no se pueden resolver las EDO.")
        return None, None, None, time
    if time is None or len(time) == 0:
        print("Tiempo no válido para resolver EDOs.")
        return None, None, None, np.array([])

    # The rate functions take the full fitted parameter set, in this order.
    param_order = {
        'logistic': ('xo', 'xm', 'um'),
        'gompertz': ('xm', 'um', 'lag'),
        'moser': ('Xm', 'um', 'Ks'),
    }.get(self.model_type)
    if param_order is None:
        print(f"Tipo de modelo de biomasa desconocido: {self.model_type}")
        return None, None, None, time
    biomass_params_list = [biomass_fit[key] for key in param_order]

    # so / po are initial values; the ODE system itself only uses p, q and
    # alpha, beta, but the lists keep the fitted layout.
    substrate_fit = self.params.get('substrate', {})
    substrate_params_list = [substrate_fit.get(key, 0) for key in ('so', 'p', 'q')]
    product_fit = self.params.get('product', {})
    product_params_list = [product_fit.get(key, 0) for key in ('po', 'alpha', 'beta')]

    initial_conditions = self.get_initial_conditions(time, biomass, substrate, product)
    time_fine = self.generate_fine_time_grid(time)
    if len(time_fine) == 0:
        print("No se pudo generar la malla de tiempo fina.")
        return None, None, None, time

    ode_args = (biomass_params_list, substrate_params_list, product_params_list, self.model_type)
    try:
        sol = odeint(self.system, initial_conditions, time_fine,
                     args=ode_args, rtol=1e-6, atol=1e-6)
    except Exception as e:
        print(f"Error al resolver EDOs con odeint: {e}")
        # odeint has no ``method`` argument, so the retry goes through
        # solve_ivp, which does accept method='LSODA'.
        try:
            from scipy.integrate import solve_ivp

            print("Intentando con método 'lsoda'...")
            result = solve_ivp(
                lambda t, state: self.system(state, t, *ode_args),
                (time_fine[0], time_fine[-1]), initial_conditions,
                method='LSODA', t_eval=time_fine, rtol=1e-6, atol=1e-6)
            if not result.success:
                raise RuntimeError(result.message)
            sol = result.y.T  # solve_ivp returns (n_states, n_times)
        except Exception as e_lsoda:
            print(f"Error al resolver EDOs con odeint (método lsoda): {e_lsoda}")
            return None, None, None, time_fine

    X = sol[:, 0]
    S = sol[:, 1]
    P = sol[:, 2]
    return X, S, P, time_fine
def plot_results(self, time, biomass, substrate, product,
                 y_pred_biomass, y_pred_substrate, y_pred_product,
                 biomass_std=None, substrate_std=None, product_std=None,
                 experiment_name='', legend_position='best', params_position='upper right',
                 show_legend=True, show_params=True,
                 style='whitegrid',
                 line_color='#0000FF', point_color='#000000', line_style='-', marker_style='o',
                 use_differential=False, axis_labels=None):
    """Render biomass, substrate and product panels as one RGB image.

    Each of the three stacked panels shows the experimental points (with
    error bars when standard deviations are supplied), the model curve, and
    optionally a box with the fitted parameters, R² and RMSE.

    Args:
        time: Experimental time values.
        biomass, substrate, product: Experimental values (may be ``None``).
        y_pred_biomass, y_pred_substrate, y_pred_product: Model predictions
            on ``time``; replaced by fine-grid curves when those can be
            computed from ``self.params``.
        biomass_std, substrate_std, product_std: Optional standard deviations
            used as error bars.
        experiment_name: Used in the figure title and in log messages.
        legend_position: Matplotlib legend location.
        params_position: Corner for the parameter box ('upper'/'lower' and
            'left'/'right' are looked up in the string), or 'outside right'.
        show_legend: Whether to draw legends.
        show_params: Whether to draw the parameter boxes.
        style: Seaborn style name.
        line_color, point_color, line_style, marker_style: Matplotlib styling
            for the model line and the data markers.
        use_differential: Plot the ODE solution instead of the closed-form
            curves when biomass parameters are available.
        axis_labels: Optional dict with keys 'x_label', 'biomass_label',
            'substrate_label' and 'product_label'.

    Returns:
        A PIL image in RGB mode, or ``None`` when there is no biomass
        prediction and the ODE solution was not requested.
    """
    has_biomass_params = bool(self.params.get('biomass'))

    if y_pred_biomass is None and not use_differential:
        print(f"No se pudo ajustar biomasa para {experiment_name} con {self.model_type} y no se usan EDO. Omitiendo figura.")
        return None
    if use_differential and not has_biomass_params:
        print(f"Se solicitó usar EDO pero no hay parámetros de biomasa para {experiment_name}. Omitiendo EDO.")
        use_differential = False  # fall back to the curve_fit results, if any

    if axis_labels is None:
        axis_labels = {
            'x_label': 'Tiempo',
            'biomass_label': 'Biomasa',
            'substrate_label': 'Sustrato',
            'product_label': 'Producto'
        }
    sns.set_style(style)

    time_to_plot = time  # grid on which the model curves are defined
    if use_differential and has_biomass_params:
        X_ode, S_ode, P_ode, time_fine_ode = self.solve_differential_equations(time, biomass, substrate, product)
        if X_ode is not None:
            y_pred_biomass, y_pred_substrate, y_pred_product = X_ode, S_ode, P_ode
            time_to_plot = time_fine_ode
        else:
            # Keep the curve_fit predictions on the original grid.
            print(f"Fallo al resolver EDOs para {experiment_name}, usando resultados de curve_fit si existen.")
            time_to_plot = time
    elif self.biomass_model and has_biomass_params:
        # Re-evaluate the closed-form curves on a fine grid for smooth lines.
        time_fine_curvefit = self.generate_fine_time_grid(time)
        if time_fine_curvefit is not None and len(time_fine_curvefit) > 0:
            biomass_params_values = list(self.params['biomass'].values())
            y_pred_biomass_fine = self.biomass_model(time_fine_curvefit, *biomass_params_values)
            if self.params.get('substrate'):
                substrate_params_values = list(self.params['substrate'].values())
                y_pred_substrate_fine = self.substrate(time_fine_curvefit, *substrate_params_values, biomass_params_values)
            else:
                y_pred_substrate_fine = np.full_like(time_fine_curvefit, np.nan)
            if self.params.get('product'):
                product_params_values = list(self.params['product'].values())
                y_pred_product_fine = self.product(time_fine_curvefit, *product_params_values, biomass_params_values)
            else:
                y_pred_product_fine = np.full_like(time_fine_curvefit, np.nan)

            # Only adopt a fine curve when it contains usable values.
            if not np.all(np.isnan(y_pred_biomass_fine)):
                y_pred_biomass = y_pred_biomass_fine
                time_to_plot = time_fine_curvefit
            if not np.all(np.isnan(y_pred_substrate_fine)):
                y_pred_substrate = y_pred_substrate_fine
            if not np.all(np.isnan(y_pred_product_fine)):
                y_pred_product = y_pred_product_fine

    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(10, 15))
    # try/finally guarantees the figure is closed even if drawing or saving
    # raises, so repeated failures do not accumulate open figures.
    try:
        fig.suptitle(f'{experiment_name} ({self.model_type.capitalize()})', fontsize=16)
        plots_config = [
            (ax1, biomass, y_pred_biomass, biomass_std, axis_labels['biomass_label'], 'Modelo', self.params.get('biomass', {}),
             self.r2.get('biomass', np.nan), self.rmse.get('biomass', np.nan)),
            (ax2, substrate, y_pred_substrate, substrate_std, axis_labels['substrate_label'], 'Modelo', self.params.get('substrate', {}),
             self.r2.get('substrate', np.nan), self.rmse.get('substrate', np.nan)),
            (ax3, product, y_pred_product, product_std, axis_labels['product_label'], 'Modelo', self.params.get('product', {}),
             self.r2.get('product', np.nan), self.rmse.get('product', np.nan))
        ]
        for idx, (ax, data_exp, y_pred_model, data_std_exp, ylabel, model_name_legend, params_dict, r2_val, rmse_val) in enumerate(plots_config):
            # Experimental data.
            if data_exp is not None and len(data_exp) > 0 and not np.all(np.isnan(data_exp)):
                if data_std_exp is not None and len(data_std_exp) == len(data_exp) and not np.all(np.isnan(data_std_exp)):
                    ax.errorbar(time, data_exp, yerr=data_std_exp, fmt=marker_style, color=point_color,
                                label='Datos experimentales', capsize=5, elinewidth=1, markeredgewidth=1)
                else:
                    ax.plot(time, data_exp, marker=marker_style, linestyle='', color=point_color,
                            label='Datos experimentales')
            else:
                ax.text(0.5, 0.5, 'No hay datos experimentales para mostrar.',
                        horizontalalignment='center', verticalalignment='center',
                        transform=ax.transAxes, fontsize=10, color='gray')

            # Model curve. A curve may still live on the coarse grid while
            # the others were refined, so pick the x grid by length.
            x_model = None
            if y_pred_model is not None and len(y_pred_model) > 0 and not np.all(np.isnan(y_pred_model)):
                if len(y_pred_model) == len(time_to_plot):
                    x_model = time_to_plot
                elif len(y_pred_model) == len(time):
                    x_model = time
            if x_model is not None:
                ax.plot(x_model, y_pred_model, linestyle=line_style, color=line_color, label=model_name_legend)
            elif idx == 0 and y_pred_biomass is None:
                ax.text(0.5, 0.6, 'Modelo de biomasa no ajustado.',
                        horizontalalignment='center', verticalalignment='center',
                        transform=ax.transAxes, fontsize=10, color='red')
            elif (idx == 1 and y_pred_substrate is None) or (idx == 2 and y_pred_product is None):
                if not has_biomass_params:
                    ax.text(0.5, 0.4, 'Modelo no ajustado (depende de biomasa).',
                            horizontalalignment='center', verticalalignment='center',
                            transform=ax.transAxes, fontsize=10, color='orange')
                elif y_pred_model is None:
                    ax.text(0.5, 0.4, 'Modelo no ajustado.',
                            horizontalalignment='center', verticalalignment='center',
                            transform=ax.transAxes, fontsize=10, color='orange')

            ax.set_xlabel(axis_labels['x_label'])
            ax.set_ylabel(ylabel)
            if show_legend:
                ax.legend(loc=legend_position)
            ax.set_title(f'{ylabel}')

            # Parameter box: only when every value is a finite number.
            if show_params and params_dict and all(isinstance(v, (int, float)) and np.isfinite(v) for v in params_dict.values()):
                param_text = '\n'.join([f"{k} = {v:.3g}" for k, v in params_dict.items()])
                r2_display = f"{r2_val:.3f}" if np.isfinite(r2_val) else "N/A"
                rmse_display = f"{rmse_val:.3f}" if np.isfinite(rmse_val) else "N/A"
                text = f"{param_text}\nR² = {r2_display}\nRMSE = {rmse_display}"
                if params_position == 'outside right':
                    bbox_props = dict(boxstyle='round,pad=0.3', facecolor='wheat', alpha=0.5)
                    fig.subplots_adjust(right=0.75)  # make room beside the axes
                    ax.annotate(text, xy=(1.05, 0.5), xycoords='axes fraction',
                                xytext=(10, 0), textcoords='offset points',
                                verticalalignment='center', horizontalalignment='left',
                                bbox=bbox_props)
                else:
                    text_x, ha = (0.95, 'right') if 'right' in params_position else (0.05, 'left')
                    text_y, va = (0.95, 'top') if 'upper' in params_position else (0.05, 'bottom')
                    ax.text(text_x, text_y, text, transform=ax.transAxes,
                            verticalalignment=va, horizontalalignment=ha,
                            bbox={'boxstyle': 'round,pad=0.3', 'facecolor': 'wheat', 'alpha': 0.5})
            elif show_params and not params_dict:
                ax.text(0.5, 0.3, 'Parámetros no disponibles.',
                        horizontalalignment='center', verticalalignment='center',
                        transform=ax.transAxes, fontsize=9, color='grey')

        fig.tight_layout(rect=[0, 0.03, 1, 0.95])  # leave room for the suptitle
        buf = io.BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight')
        buf.seek(0)
        image = Image.open(buf).convert("RGB")
    finally:
        plt.close(fig)
    return image
def plot_combined_results(self, time, biomass, substrate, product,
                          y_pred_biomass, y_pred_substrate, y_pred_product,
                          biomass_std=None, substrate_std=None, product_std=None,
                          experiment_name='', legend_position='best', params_position='upper right',
                          show_legend=True, show_params=True,
                          style='whitegrid',
                          line_color='#0000FF', point_color='#000000', line_style='-', marker_style='o',
                          use_differential=False, axis_labels=None):
    """Render biomass, substrate and product on one figure with three Y axes.

    Args:
        time: Sample times of the observed data (X axis for the data points).
        biomass, substrate, product: Observed series; a series that is None,
            empty or all-NaN is not drawn.
        y_pred_biomass, y_pred_substrate, y_pred_product: Model predictions at
            ``time``. They are replaced by finer curves when the ODE solution
            or a fine time grid is available.
        biomass_std, substrate_std, product_std: Optional error-bar values;
            used only when their length matches the corresponding series.
        experiment_name: Text used in the figure title and in log messages.
        legend_position: ``loc`` passed to ``Axes.legend``.
        params_position: Corner name (e.g. ``'upper right'``) or
            ``'outside right'`` for the parameter box.
        show_legend, show_params: Toggle the legend / parameter box.
        style: Seaborn style name passed to ``sns.set_style``.
        line_color, point_color: Accepted for signature parity with the
            other plotting method; this method uses a fixed palette per
            series and does not read them.
        line_style, marker_style: Matplotlib line style / marker.
        use_differential: If true, try to draw the curves from
            ``self.solve_differential_equations``.
        axis_labels: Dict with keys ``x_label``, ``biomass_label``,
            ``substrate_label`` and ``product_label``; defaults are used
            when None.

    Returns:
        A PIL RGB image of the figure, or None when there is no biomass
        prediction and the ODE path was not requested.
    """
    if y_pred_biomass is None and not use_differential:
        print(f"No se pudo ajustar biomasa para {experiment_name} con {self.model_type} (combinado). Omitiendo figura.")
        return None
    if use_differential and ('biomass' not in self.params or not self.params['biomass']):
        print(f"Se solicitó usar EDO (combinado) pero no hay parámetros de biomasa para {experiment_name}. Omitiendo EDO.")
        use_differential = False
    if axis_labels is None:
        axis_labels = {
            'x_label': 'Tiempo',
            'biomass_label': 'Biomasa',
            'substrate_label': 'Sustrato',
            'product_label': 'Producto'
        }
    sns.set_style(style)

    biomass_params = self.params.get('biomass')
    time_to_plot = time  # X values for the model curves; swapped for a finer grid below
    if use_differential and biomass_params:
        X_ode, S_ode, P_ode, time_fine_ode = self.solve_differential_equations(time, biomass, substrate, product)
        if X_ode is not None:
            y_pred_biomass, y_pred_substrate, y_pred_product = X_ode, S_ode, P_ode
            time_to_plot = time_fine_ode
        else:
            print(f"Fallo al resolver EDOs para {experiment_name} (combinado), usando resultados de curve_fit si existen.")
    elif not use_differential and self.biomass_model and biomass_params:
        # Re-evaluate the fitted closed forms on a finer grid for smoother curves.
        time_fine_curvefit = self.generate_fine_time_grid(time)
        if time_fine_curvefit is not None and len(time_fine_curvefit) > 0:
            biomass_params_values = list(biomass_params.values())
            y_pred_biomass_fine = self.biomass_model(time_fine_curvefit, *biomass_params_values)
            substrate_params = self.params.get('substrate')
            if substrate_params:
                y_pred_substrate_fine = self.substrate(
                    time_fine_curvefit, *list(substrate_params.values()), biomass_params_values)
            else:
                y_pred_substrate_fine = np.full_like(time_fine_curvefit, np.nan)
            product_params = self.params.get('product')
            if product_params:
                y_pred_product_fine = self.product(
                    time_fine_curvefit, *list(product_params.values()), biomass_params_values)
            else:
                y_pred_product_fine = np.full_like(time_fine_curvefit, np.nan)
            # Only switch to the fine grid when the biomass curve is usable;
            # substrate/product follow it so every curve matches time_to_plot.
            if not np.all(np.isnan(y_pred_biomass_fine)):
                y_pred_biomass = y_pred_biomass_fine
                time_to_plot = time_fine_curvefit
                if not np.all(np.isnan(y_pred_substrate_fine)):
                    y_pred_substrate = y_pred_substrate_fine
                if not np.all(np.isnan(y_pred_product_fine)):
                    y_pred_product = y_pred_product_fine

    colors = {'Biomasa': 'blue', 'Sustrato': 'green', 'Producto': 'red'}
    data_colors = {'Biomasa': 'darkblue', 'Sustrato': 'darkgreen', 'Producto': 'darkred'}
    model_colors = {'Biomasa': 'cornflowerblue', 'Sustrato': 'limegreen', 'Producto': 'salmon'}

    def draw_series(ax, series_key, label, observed, observed_std, predicted):
        """Draw observed points (with optional error bars) and the model curve on *ax*."""
        ax.set_ylabel(label, color=colors[series_key])
        if observed is not None and len(observed) > 0 and not np.all(np.isnan(observed)):
            has_std = (observed_std is not None
                       and len(observed_std) == len(observed)
                       and not np.all(np.isnan(observed_std)))
            if has_std:
                ax.errorbar(time, observed, yerr=observed_std, fmt=marker_style,
                            color=data_colors[series_key],
                            label=f'{label} (Datos)', capsize=3, elinewidth=1, markersize=5)
            else:
                ax.plot(time, observed, marker=marker_style, linestyle='',
                        color=data_colors[series_key],
                        label=f'{label} (Datos)', markersize=5)
        if predicted is not None and len(predicted) > 0 and not np.all(np.isnan(predicted)):
            ax.plot(time_to_plot, predicted, linestyle=line_style,
                    color=model_colors[series_key],
                    label=f'{label} (Modelo)')
        ax.tick_params(axis='y', labelcolor=colors[series_key])

    fig, ax1 = plt.subplots(figsize=(12, 7))  # wide enough for a parameter box outside the axes
    try:
        fig.suptitle(f'{experiment_name} ({self.model_type.capitalize()})', fontsize=16)
        ax1.set_xlabel(axis_labels['x_label'])
        draw_series(ax1, 'Biomasa', axis_labels['biomass_label'], biomass, biomass_std, y_pred_biomass)

        ax2 = ax1.twinx()
        draw_series(ax2, 'Sustrato', axis_labels['substrate_label'], substrate, substrate_std, y_pred_substrate)

        ax3 = ax1.twinx()
        # Push the third Y axis outwards so it does not sit on top of ax2's spine.
        ax3.spines["right"].set_position(("axes", 1.15))
        ax3.set_frame_on(True)
        ax3.patch.set_visible(False)
        draw_series(ax3, 'Producto', axis_labels['product_label'], product, product_std, y_pred_product)

        # Merge the legend entries of all three axes into one legend on ax1.
        # Duplicate labels collapse to a single entry (first position, last handle).
        handles_by_label = {}
        for ax_current in (ax1, ax2, ax3):
            handles, labels = ax_current.get_legend_handles_labels()
            handles_by_label.update(zip(labels, handles))
        if show_legend and handles_by_label:
            ax1.legend(list(handles_by_label.values()), list(handles_by_label.keys()), loc=legend_position)

        if show_params:
            texts_to_display = []
            for label, key in ((axis_labels['biomass_label'], 'biomass'),
                               (axis_labels['substrate_label'], 'substrate'),
                               (axis_labels['product_label'], 'product')):
                params_dict = self.params.get(key, {})
                r2_val = self.r2.get(key, np.nan)
                rmse_val = self.rmse.get(key, np.nan)
                if params_dict and all(isinstance(v, (int, float)) and np.isfinite(v) for v in params_dict.values()):
                    param_text = '\n'.join([f"  {k} = {v:.3g}" for k, v in params_dict.items()])
                    r2_display = f"{r2_val:.3f}" if np.isfinite(r2_val) else "N/A"
                    rmse_display = f"{rmse_val:.3f}" if np.isfinite(rmse_val) else "N/A"
                    texts_to_display.append(f"{label}:\n{param_text}\n  R² = {r2_display}\n  RMSE = {rmse_display}")
                elif params_dict:
                    # Parameters exist but at least one is non-numeric or non-finite.
                    texts_to_display.append(f"{label}:\n  Parámetros no válidos o N/A")
            total_text = "\n\n".join(texts_to_display)
            if total_text:
                if params_position == 'outside right':
                    fig.subplots_adjust(right=0.70)
                    bbox_props = dict(boxstyle='round,pad=0.3', facecolor='wheat', alpha=0.7)
                    # Anchored in figure coordinates so the box sits outside the axes.
                    fig.text(0.72, 0.5, total_text, transform=fig.transFigure,
                             verticalalignment='center', horizontalalignment='left',
                             bbox=bbox_props, fontsize=8)
                else:
                    text_x, ha = (0.95, 'right') if 'right' in params_position else (0.05, 'left')
                    text_y, va = (0.95, 'top') if 'upper' in params_position else (0.05, 'bottom')
                    ax1.text(text_x, text_y, total_text, transform=ax1.transAxes,
                             verticalalignment=va, horizontalalignment=ha,
                             bbox={'boxstyle': 'round,pad=0.3', 'facecolor': 'wheat', 'alpha': 0.7}, fontsize=8)

        # Use the figure's own method: pyplot's "current figure" is global state
        # and is not guaranteed to be this figure.
        fig.tight_layout(rect=[0, 0.03, 1, 0.95])
        # tight_layout resets the subplot margins, so re-apply the right margin.
        if params_position == 'outside right':
            fig.subplots_adjust(right=0.70)

        buf = io.BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight')
        buf.seek(0)
        return Image.open(buf).convert("RGB")
    finally:
        # Always release the figure, even if drawing or saving failed;
        # pyplot keeps every unclosed figure alive.
        plt.close(fig)
def process_all_data(file, legend_position, params_position, model_types_selected, experiment_names_str,
                     lower_bounds_str, upper_bounds_str,
                     mode, style, line_color, point_color, line_style, marker_style,
                     show_legend, show_params, use_differential, maxfev_val,
                     axis_labels_dict):
    """Fit the selected biomass models to every sheet of an Excel workbook.

    Args:
        file: Uploaded file object exposing ``.name``, or a plain path.
        legend_position, params_position, style, line_color, point_color,
            line_style, marker_style, show_legend, show_params,
            use_differential, axis_labels_dict: Plot options forwarded
            unchanged to the model's plotting methods.
        model_types_selected: Iterable of biomass model names to fit.
        experiment_names_str: Optional newline-separated display names, one
            per sheet; sheets without a name fall back to ``Hoja '<sheet>'``.
        lower_bounds_str, upper_bounds_str: Accepted but not used by this
            function.
        mode: ``'independent'`` fits each top-level column group separately;
            ``'average'`` and ``'combinado'`` fit the per-sheet averaged data
            (``'combinado'`` plots with ``plot_combined_results``).
        maxfev_val: Passed to ``BioprocessModel`` as ``maxfev``.

    Returns:
        ``(figures, comparison_df, message)``: the list of generated images,
        a DataFrame of R²/RMSE per experiment and model, and a status message
        that includes any per-sheet warnings.
    """
    if file is None:
        return [], pd.DataFrame(), "Por favor, sube un archivo Excel."

    try:
        # The upload is either a wrapper exposing ``.name`` or already a path.
        xls = pd.ExcelFile(getattr(file, 'name', file))
        sheet_names = xls.sheet_names
    except Exception as e:
        return [], pd.DataFrame(), f"Error al leer el archivo Excel: {e}"

    figures = []
    comparison_data = []
    all_plot_messages = []
    names_text = (experiment_names_str or '').strip()
    # Strip each entry so stray whitespace / '\r' does not end up in titles.
    experiment_names_list = [name.strip() for name in names_text.split('\n')] if names_text else []

    def comparison_row(experiment_name, model_type, r2=None, rmse=None):
        """Build one table row; metrics absent from r2/rmse are NaN.

        A single builder keeps the key order identical for every row, so the
        resulting DataFrame column order does not depend on which row is first.
        """
        r2 = r2 or {}
        rmse = rmse or {}
        row = {'Experimento': experiment_name, 'Modelo': model_type.capitalize()}
        for column_suffix, metric_key in (('Biomasa', 'biomass'),
                                          ('Sustrato', 'substrate'),
                                          ('Producto', 'product')):
            row[f'R² {column_suffix}'] = r2.get(metric_key, np.nan)
            row[f'RMSE {column_suffix}'] = rmse.get(metric_key, np.nan)
        return row

    def fit_models_and_plot(exp_name, time_values, biomass, substrate, product,
                            biomass_std, substrate_std, product_std,
                            combined, failure_prefix):
        """Fit every selected model to one data set, record metrics and plot it."""
        for model_type_iter in model_types_selected:
            model_instance = BioprocessModel(model_type=model_type_iter, maxfev=maxfev_val)
            model_instance.fit_model()
            y_pred_biomass = model_instance.fit_biomass(time_values, biomass)
            y_pred_substrate, y_pred_product = None, None
            if y_pred_biomass is not None and model_instance.params.get('biomass'):
                # Substrate and product fits depend on the fitted biomass parameters.
                if len(substrate) > 0:
                    y_pred_substrate = model_instance.fit_substrate(
                        time_values, substrate, model_instance.params['biomass'])
                if len(product) > 0:
                    y_pred_product = model_instance.fit_product(
                        time_values, product, model_instance.params['biomass'])
            else:
                all_plot_messages.append(f"{failure_prefix} falló para {exp_name} con modelo {model_type_iter}.")
            comparison_data.append(
                comparison_row(exp_name, model_type_iter, model_instance.r2, model_instance.rmse))
            plot_func = model_instance.plot_combined_results if combined else model_instance.plot_results
            fig = plot_func(
                time_values, biomass, substrate, product,
                y_pred_biomass, y_pred_substrate, y_pred_product,
                biomass_std, substrate_std, product_std,
                exp_name, legend_position, params_position,
                show_legend, show_params, style,
                line_color, point_color, line_style, marker_style,
                use_differential, axis_labels_dict
            )
            if fig:
                figures.append(fig)

    def numeric_column(frame, column):
        """Return *column* of *frame* as a float array without NaNs, or an empty array."""
        if column in frame:
            return frame[column].dropna().astype(float).values
        return np.array([])

    # ``with`` guarantees the workbook handle is closed once all sheets are read.
    with xls:
        if not sheet_names:
            return [], pd.DataFrame(), "El archivo Excel está vacío o no contiene hojas."

        for sheet_name_idx, sheet_name in enumerate(sheet_names):
            if sheet_name_idx < len(experiment_names_list) and experiment_names_list[sheet_name_idx]:
                current_experiment_name_base = experiment_names_list[sheet_name_idx]
            else:
                current_experiment_name_base = f"Hoja '{sheet_name}'"

            try:
                df = pd.read_excel(xls, sheet_name=sheet_name, header=[0, 1])
                if df.empty:
                    all_plot_messages.append(f"Hoja '{sheet_name}' está vacía.")
                    continue
                # The second header level must contain a 'Tiempo' sub-column.
                if not any(col_level2 == 'Tiempo' for _, col_level2 in df.columns):
                    all_plot_messages.append(f"Hoja '{sheet_name}' no contiene la subcolumna 'Tiempo'. Saltando hoja.")
                    continue
            except Exception as e:
                all_plot_messages.append(f"Error al leer la hoja '{sheet_name}': {e}. Saltando hoja.")
                continue

            # Throw-away model instance used only to parse/aggregate this sheet.
            model_dummy_for_sheet = BioprocessModel()
            try:
                model_dummy_for_sheet.process_data(df)
            except ValueError as e:
                all_plot_messages.append(f"Error procesando datos de la hoja '{sheet_name}': {e}. Saltando hoja.")
                continue

            if mode == 'independent':
                # One experiment per distinct top-level column name.
                grouped_cols = df.columns.get_level_values(0).unique()
                for exp_idx, exp_col_name in enumerate(grouped_cols):
                    current_experiment_name = f"{current_experiment_name_base} - Exp {exp_idx + 1} ({exp_col_name})"
                    exp_df = df[exp_col_name]
                    try:
                        time_exp = exp_df['Tiempo'].dropna().values
                        biomass_exp = numeric_column(exp_df, 'Biomasa')
                        substrate_exp = numeric_column(exp_df, 'Sustrato')
                        product_exp = numeric_column(exp_df, 'Producto')
                    except KeyError as e:
                        all_plot_messages.append(f"Faltan columnas (Tiempo, Biomasa, Sustrato, Producto) en '{current_experiment_name}': {e}. Saltando.")
                        continue
                    except Exception as e_data:
                        all_plot_messages.append(f"Error extrayendo datos para '{current_experiment_name}': {e_data}. Saltando.")
                        continue

                    if len(time_exp) == 0:
                        all_plot_messages.append(f"No hay datos de tiempo para {current_experiment_name}. Saltando.")
                        continue
                    if len(biomass_exp) == 0:
                        # Without biomass nothing can be fitted; still list the experiment as NaN rows.
                        all_plot_messages.append(f"No hay datos de biomasa para {current_experiment_name}. Saltando modelos para este experimento.")
                        comparison_data.extend(
                            comparison_row(current_experiment_name, model_type_iter)
                            for model_type_iter in model_types_selected)
                        continue

                    # No standard deviations are computed in independent mode.
                    fit_models_and_plot(
                        current_experiment_name, time_exp, biomass_exp, substrate_exp, product_exp,
                        None, None, None,
                        combined=False, failure_prefix="Ajuste de biomasa")

            elif mode in ['average', 'combinado']:
                current_experiment_name = f"{current_experiment_name_base} - Promedio"
                dummy = model_dummy_for_sheet
                time_avg = dummy.time
                # The aggregated series are lists; the last entry belongs to this sheet.
                biomass_avg = dummy.dataxp[-1] if dummy.dataxp else np.array([])
                substrate_avg = dummy.datasp[-1] if dummy.datasp else np.array([])
                product_avg = dummy.datapp[-1] if dummy.datapp else np.array([])
                biomass_std_avg = dummy.datax_std[-1] if dummy.datax_std and len(dummy.datax_std[-1]) == len(biomass_avg) else None
                substrate_std_avg = dummy.datas_std[-1] if dummy.datas_std and len(dummy.datas_std[-1]) == len(substrate_avg) else None
                product_std_avg = dummy.datap_std[-1] if dummy.datap_std and len(dummy.datap_std[-1]) == len(product_avg) else None

                # ``time`` starts out as None on a fresh model, so guard before len().
                if time_avg is None or len(time_avg) == 0:
                    all_plot_messages.append(f"No hay datos de tiempo para el promedio de '{sheet_name}'. Saltando.")
                    continue
                if len(biomass_avg) == 0:
                    all_plot_messages.append(f"No hay datos de biomasa promedio para '{sheet_name}'. Saltando modelos.")
                    comparison_data.extend(
                        comparison_row(current_experiment_name, model_type_iter)
                        for model_type_iter in model_types_selected)
                    continue

                fit_models_and_plot(
                    current_experiment_name, time_avg, biomass_avg, substrate_avg, product_avg,
                    biomass_std_avg, substrate_std_avg, product_std_avg,
                    combined=(mode == 'combinado'), failure_prefix="Ajuste de biomasa promedio")

    comparison_df = pd.DataFrame(comparison_data)
    if not comparison_df.empty:
        # Coerce metric columns to numbers so sorting is well defined.
        for col in ['R² Biomasa', 'RMSE Biomasa', 'R² Sustrato', 'RMSE Sustrato', 'R² Producto', 'RMSE Producto']:
            if col in comparison_df.columns:
                comparison_df[col] = pd.to_numeric(comparison_df[col], errors='coerce')
        comparison_df_sorted = comparison_df.sort_values(
            by=['Experimento', 'Modelo', 'R² Biomasa', 'R² Sustrato', 'R² Producto', 'RMSE Biomasa', 'RMSE Sustrato', 'RMSE Producto'],
            ascending=[True, True, False, False, False, True, True, True]  # R² descending, RMSE ascending
        ).reset_index(drop=True)
    else:
        # Keep the expected columns even when there is nothing to report.
        comparison_df_sorted = pd.DataFrame(columns=[
            'Experimento', 'Modelo', 'R² Biomasa', 'RMSE Biomasa',
            'R² Sustrato', 'RMSE Sustrato', 'R² Producto', 'RMSE Producto'
        ])

    final_message = "Procesamiento completado."
    if all_plot_messages:
        final_message += " Mensajes:\n" + "\n".join(all_plot_messages)
    if not figures and not comparison_df_sorted.empty:
        final_message += "\nNo se generaron gráficos, pero hay datos en la tabla."
    elif not figures and comparison_df_sorted.empty:
        final_message += "\nNo se generaron gráficos ni datos para la tabla."
    return figures, comparison_df_sorted, final_message
def create_interface():
    """Build the Gradio Blocks UI for the bioprocess kinetic-model app.

    The layout contains the instructions and equations (Markdown), the file
    upload and analysis-mode selector, model/plot option accordions, a
    "simulate" button wired to ``process_all_data``, a results gallery and
    table, and an export button that writes the last results table to Excel.

    Returns:
        The ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Modelos Cinéticos de Bioprocesos")
        gr.Markdown(r"""
Análisis y visualización de datos de bioprocesos utilizando modelos cinéticos como Logístico, Gompertz y Moser para el crecimiento de biomasa,
y el modelo de Luedeking-Piret para el consumo de sustrato y la formación de producto.
**Instrucciones:**
1. Sube un archivo Excel. El archivo debe tener una estructura de MultiIndex en las columnas:
- Nivel 0: Nombre del experimento/tratamiento (ej: "Control", "Tratamiento A")
- Nivel 1: Tipo de dato ("Tiempo", "Biomasa", "Sustrato", "Producto")
- Si hay réplicas, deben estar como columnas separadas bajo el mismo nombre de experimento (Nivel 0) y tipo de dato (Nivel 1).
Ejemplo: (Control, Biomasa, Rep1), (Control, Biomasa, Rep2). El código promediará estas réplicas para los modos "average" y "combinado".
Para el modo "independent", se asume una sola serie de datos por (Experimento, TipoDato).
2. Selecciona el/los tipo(s) de modelo(s) de biomasa a ajustar.
3. Elige el modo de análisis:
- `independent`: Analiza cada experimento (columna de Nivel 0) individualmente.
- `average`: Promedia los datos de todos los experimentos dentro de una hoja y ajusta los modelos a estos promedios. Se grafica en subplots separados.
- `combinado`: Similar a `average`, pero grafica Biomasa, Sustrato y Producto en un solo gráfico con múltiples ejes Y.
4. Configura las opciones de graficación (leyenda, parámetros, estilos, colores, etc.).
5. (Opcional) Personaliza los nombres de los experimentos y los títulos de los ejes.
6. Haz clic en "Simular" para generar los gráficos y la tabla comparativa.
7. Puedes exportar la tabla de resultados a Excel.
""")
        gr.Markdown(r"""
## Ecuaciones Diferenciales Utilizadas
**Biomasa:**
- Logístico:
$$
\frac{dX}{dt} = \mu_m X\left(1 - \frac{X}{X_m}\right)
$$
Solución integral: $X(t) = \frac{X_0 \exp(\mu_m t)}{1 - (X_0/X_m)(1 - \exp(\mu_m t))}$
- Gompertz (Modificado):
$$
X(t) = X_m \exp\left(-\exp\left(\left(\frac{\mu_m e}{X_m}\right)(\text{lag}-t)+1\right)\right)
$$
Ecuación diferencial:
$$
\frac{dX}{dt} = X(t)\left(\frac{\mu_m e}{X_m}\right)\exp\left(\left(\frac{\mu_m e}{X_m}\right)(\text{lag}-t)+1\right)
$$
- Moser (simplificado, asumiendo $S \gg K_s$ o crecimiento no limitado por sustrato modelado explícitamente aquí):
$$
X(t)=X_m(1-e^{-\mu_m(t-K_s)})
$$
Ecuación diferencial (forma simplificada, no estándar de Moser que depende de S):
$$
\frac{dX}{dt}=\mu_m(X_m - X)
$$
**Sustrato y Producto (Luedeking-Piret):**
$$
\frac{dS}{dt} = -p \frac{dX}{dt} - q X \quad \Rightarrow \quad S(t) = S_0 - p(X(t)-X_0) - q \int_0^t X(\tau)d\tau
$$
$$
\frac{dP}{dt} = \alpha \frac{dX}{dt} + \beta X \quad \Rightarrow \quad P(t) = P_0 + \alpha(X(t)-X_0) + \beta \int_0^t X(\tau)d\tau
$$
Donde $X_0, S_0, P_0$ son las concentraciones iniciales.
Parámetros:
- $X_m$: Máxima concentración de biomasa.
- $\mu_m$: Máxima tasa de crecimiento específico.
- $X_0$: Concentración inicial de biomasa.
- $\text{lag}$: Duración de la fase de latencia.
- $K_s$: Constante de afinidad (en el modelo de Moser simplificado, actúa como un tiempo de retardo).
- $p$: Coeficiente de rendimiento de biomasa a partir de sustrato (asociado al crecimiento). $1/Y_{X/S}^{crecimiento}$.
- $q$: Coeficiente de mantenimiento. $m_S$.
- $\alpha$: Coeficiente de formación de producto asociado al crecimiento. $Y_{P/X}^{crecimiento}$.
- $\beta$: Coeficiente de formación de producto no asociado al crecimiento. $m_P$.
""")
        with gr.Row():
            file_input = gr.File(label="Subir archivo Excel (.xlsx)", file_types=['.xlsx'])
            mode = gr.Radio(["independent", "average", "combinado"], label="Modo de Análisis", value="independent",
                            info="Independent: cada experimento. Average/Combinado: promedio de la hoja.")

        with gr.Accordion("Configuración de Modelos y Simulación", open=False):
            model_types_selected = gr.CheckboxGroup(
                choices=["logistic", "gompertz", "moser"],
                label="Tipo(s) de Modelo de Biomasa",
                value=["logistic"]
            )
            use_differential = gr.Checkbox(label="Usar Ecuaciones Diferenciales para Graficar (experimental)", value=False,
                                           info="Si se marca, las curvas se generan resolviendo las EDOs. Si no, por ajuste directo de las formas integradas.")
            maxfev_input = gr.Number(label="maxfev (Máx. evaluaciones para el ajuste)", value=50000, minimum=1000, step=1000)
            experiment_names_str = gr.Textbox(
                label="Nombres de los experimentos/hojas (uno por línea, opcional)",
                placeholder="Nombre para Hoja 1\nNombre para Hoja 2\n...",
                lines=3,
                info="Si se deja vacío, se usarán los nombres de las hojas o 'Exp X'."
            )

        with gr.Accordion("Configuración de Gráficos", open=False):
            with gr.Row():
                with gr.Column(scale=1):
                    legend_position = gr.Radio(
                        choices=["upper left", "upper right", "lower left", "lower right", "best"],
                        label="Posición de Leyenda", value="best"
                    )
                    show_legend = gr.Checkbox(label="Mostrar Leyenda", value=True)
                with gr.Column(scale=1):
                    params_position = gr.Radio(
                        choices=["upper left", "upper right", "lower left", "lower right", "outside right"],
                        label="Posición de Parámetros", value="upper right"
                    )
                    show_params = gr.Checkbox(label="Mostrar Parámetros", value=True)
            with gr.Row():
                style_dropdown = gr.Dropdown(choices=['white', 'dark', 'whitegrid', 'darkgrid', 'ticks'],
                                             label="Estilo de Gráfico (Seaborn)", value='whitegrid')
                line_color_picker = gr.ColorPicker(label="Color de Línea (Modelo)", value='#0072B2')
                point_color_picker = gr.ColorPicker(label="Color de Puntos (Datos)", value='#D55E00')
            with gr.Row():
                line_style_dropdown = gr.Dropdown(choices=['-', '--', '-.', ':'], label="Estilo de Línea", value='-')
                marker_style_dropdown = gr.Dropdown(choices=['o', 's', '^', 'v', 'D', 'x', '+', '*'],
                                                    label="Estilo de Marcador (Puntos)", value='o')
            with gr.Row():
                x_axis_label_input = gr.Textbox(label="Título Eje X", value="Tiempo (h)", placeholder="Tiempo (unidades)")
                biomass_axis_label_input = gr.Textbox(label="Título Eje Y (Biomasa)", value="Biomasa (g/L)", placeholder="Biomasa (unidades)")
            with gr.Row():
                substrate_axis_label_input = gr.Textbox(label="Título Eje Y (Sustrato)", value="Sustrato (g/L)", placeholder="Sustrato (unidades)")
                product_axis_label_input = gr.Textbox(label="Título Eje Y (Producto)", value="Producto (g/L)", placeholder="Producto (unidades)")

        # The bounds are collected and forwarded, but the fitting code does not use them yet.
        with gr.Accordion("Configuración Avanzada de Ajuste (No implementado aún)", open=False):
            with gr.Row():
                lower_bounds_str = gr.Textbox(label="Lower Bounds (no usado actualmente)", lines=3)
                upper_bounds_str = gr.Textbox(label="Upper Bounds (no usado actualmente)", lines=3)

        simulate_btn = gr.Button("Simular y Graficar", variant="primary")
        status_message = gr.Textbox(label="Estado del Procesamiento", interactive=False)
        output_gallery = gr.Gallery(label="Resultados Gráficos", columns=[2, 1], height='auto', object_fit="contain")
        output_table = gr.Dataframe(
            label="Tabla Comparativa de Modelos (Ordenada por R² Biomasa Descendente)",
            headers=["Experimento", "Modelo", "R² Biomasa", "RMSE Biomasa",
                     "R² Sustrato", "RMSE Sustrato", "R² Producto", "RMSE Producto"],
            interactive=False, wrap=True
        )
        # Holds the last results table so the export button can reuse it.
        state_df = gr.State(pd.DataFrame())

        def run_simulation_interface(file, legend_pos, params_pos, models_sel, analysis_mode, exp_names,
                                     low_bounds, up_bounds, plot_style,
                                     line_col, point_col, line_sty, marker_sty,
                                     show_leg, show_par, use_diff, maxfev,
                                     x_label, biomass_label, substrate_label, product_label):
            """Validate the UI inputs and run ``process_all_data``.

            Always returns four values (gallery, table, status, state), one
            per component listed in the click handler's ``outputs``.
            """
            if file is None:
                empty = pd.DataFrame()
                return [], empty, "Error: Por favor, sube un archivo Excel.", empty
            if not models_sel:
                empty = pd.DataFrame()
                return [], empty, "Error: Por favor, selecciona al menos un tipo de modelo de biomasa.", empty
            axis_labels = {
                'x_label': x_label if x_label else 'Tiempo',
                'biomass_label': biomass_label if biomass_label else 'Biomasa',
                'substrate_label': substrate_label if substrate_label else 'Sustrato',
                'product_label': product_label if product_label else 'Producto'
            }
            # A cleared number box arrives as None; fall back to the UI default.
            maxfev_value = int(maxfev) if maxfev else 50000
            figures, comparison_df, message = process_all_data(
                file, legend_pos, params_pos, models_sel, exp_names,
                low_bounds, up_bounds, analysis_mode, plot_style,
                line_col, point_col, line_sty, marker_sty,
                show_leg, show_par, use_diff, maxfev_value,
                axis_labels
            )
            # The table is returned twice: once for display, once for the export state.
            return figures, comparison_df, message, comparison_df

        simulate_btn.click(
            fn=run_simulation_interface,
            inputs=[
                file_input, legend_position, params_position, model_types_selected, mode, experiment_names_str,
                lower_bounds_str, upper_bounds_str, style_dropdown,
                line_color_picker, point_color_picker, line_style_dropdown, marker_style_dropdown,
                show_legend, show_params, use_differential, maxfev_input,
                x_axis_label_input, biomass_axis_label_input, substrate_axis_label_input, product_axis_label_input
            ],
            outputs=[output_gallery, output_table, status_message, state_df]
        )

        def export_excel_interface(df_to_export):
            """Write the stored results table to a temporary .xlsx and return its path.

            When there is nothing to export, or the export fails, the path of
            a small .txt file containing the explanation is returned instead.
            """
            if df_to_export is None or df_to_export.empty:
                with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as tmp:
                    tmp.write(b"No hay datos para exportar.")
                return tmp.name
            try:
                # Reserve a path, then close the handle before pandas reopens it
                # by name; an open handle blocks that on some platforms.
                with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp:
                    export_path = tmp.name
                df_to_export.to_excel(export_path, index=False)
                return export_path
            except Exception as e:
                with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as tmp:
                    tmp.write(f"Error al exportar a Excel: {e}".encode())
                return tmp.name

        export_btn = gr.Button("Exportar Tabla a Excel")
        download_file_output = gr.File(label="Descargar archivo Excel", interactive=False)
        export_btn.click(
            fn=export_excel_interface,
            inputs=state_df,
            outputs=download_file_output
        )

        gr.Examples(
            examples=[
                [None, "best", "upper right", ["logistic"], "independent", "Exp A\nExp B", "", "", "whitegrid", "#0072B2", "#D55E00", "-", "o", True, True, False, 50000, "Tiempo (días)", "Células (millones/mL)", "Glucosa (mM)", "Anticuerpo (mg/L)"]
            ],
            inputs=[
                file_input, legend_position, params_position, model_types_selected, mode, experiment_names_str,
                lower_bounds_str, upper_bounds_str, style_dropdown,
                line_color_picker, point_color_picker, line_style_dropdown, marker_style_dropdown,
                show_legend, show_params, use_differential, maxfev_input,
                x_axis_label_input, biomass_axis_label_input, substrate_axis_label_input, product_axis_label_input
            ],
            label="Ejemplo de Configuración (subir archivo manualmente)"
        )
    return demo
if __name__ == '__main__':
    # Probe for Google Colab. Only a failed import means "not in Colab";
    # a bare ``except`` would also swallow KeyboardInterrupt/SystemExit.
    try:
        import google.colab  # noqa: F401
        IN_COLAB = True
    except ImportError:
        IN_COLAB = False
    demo_instance = create_interface()
    # IN_COLAB is not passed to launch(); sharing is forced on for testing.
    # Alternative: demo_instance.launch(share=IN_COLAB)
    demo_instance.launch(share=True)