diff --git a/CHANGELOG.md b/CHANGELOG.md index 567f88c8..f2ba3f2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Features +- [#413](https://github.com/pybop-team/PyBOP/pull/413) - Adds `DesignCost` functionality to `WeightedCost` class with additional tests. - [#357](https://github.com/pybop-team/PyBOP/pull/357) - Adds `Transformation()` class with `LogTransformation()`, `IdentityTransformation()`, and `ScaledTransformation()`, `ComposedTransformation()` implementations with corresponding examples and tests. - [#427](https://github.com/pybop-team/PyBOP/issues/427) - Adds the nbstripout pre-commit hook to remove unnecessary metadata from notebooks. - [#327](https://github.com/pybop-team/PyBOP/issues/327) - Adds the `WeightedCost` subclass, defines when to evaluate a problem and adds the `spm_weighted_cost` example script. diff --git a/examples/scripts/spm_weighted_cost.py b/examples/scripts/spm_weighted_cost.py index 74c43a33..43c5cd00 100644 --- a/examples/scripts/spm_weighted_cost.py +++ b/examples/scripts/spm_weighted_cost.py @@ -24,24 +24,37 @@ # Generate data sigma = 0.001 -t_eval = np.arange(0, 900, 3) -values = model.predict(t_eval=t_eval) -corrupt_values = values["Voltage [V]"].data + np.random.normal(0, sigma, len(t_eval)) +init_soc = 0.5 +experiment = pybop.Experiment( + [ + ( + "Discharge at 0.5C for 3 minutes (3 second period)", + "Charge at 0.5C for 3 minutes (3 second period)", + ), + ] + * 2 +) +values = model.predict(experiment=experiment, init_soc=init_soc) + + +def noise(sigma): + return np.random.normal(0, sigma, len(values["Voltage [V]"].data)) + # Form dataset dataset = pybop.Dataset( { - "Time [s]": t_eval, + "Time [s]": values["Time [s]"].data, "Current function [A]": values["Current [A]"].data, - "Voltage [V]": corrupt_values, + "Voltage [V]": values["Voltage [V]"].data + noise(sigma), } ) # Generate problem, cost function, and optimisation class -problem = pybop.FittingProblem(model, parameters, dataset) +problem = pybop.FittingProblem(model, parameters, dataset, init_soc=init_soc) cost1 = pybop.SumSquaredError(problem) cost2 = pybop.RootMeanSquaredError(problem) -weighted_cost = pybop.WeightedCost(cost1, cost2, weights=[1, 100]) +weighted_cost = pybop.WeightedCost(cost1, cost2, weights=[0.1, 1]) for cost in [weighted_cost, cost1, cost2]: optim = pybop.IRPropMin(cost, max_iterations=60) diff --git a/examples/scripts/spme_max_energy.py b/examples/scripts/spme_max_energy.py index f5b7c827..8542b4ad 100644 --- a/examples/scripts/spme_max_energy.py +++ b/examples/scripts/spme_max_energy.py @@ -9,12 +9,6 @@ # electrode widths, particle radii, volume fractions and # separator width. -# NOTE: This script can be easily adjusted to consider the volumetric -# (instead of gravimetric) energy density by changing the line which -# defines the cost and changing the output to: -# print(f"Initial volumetric energy density: {cost(optim.x0):.2f} Wh.m-3") -# print(f"Optimised volumetric energy density: {final_cost:.2f} Wh.m-3") - # Define parameter set and model parameter_set = pybop.ParameterSet.pybamm("Chen2020", formation_concentrations=True) model = pybop.lithium_ion.SPMe(parameter_set=parameter_set) @@ -45,17 +39,21 @@ model, parameters, experiment, signal=signal, init_soc=init_soc ) -# Generate cost function and optimisation class: -cost = pybop.GravimetricEnergyDensity(problem) +# Generate multiple cost functions and combine them. +cost1 = pybop.GravimetricEnergyDensity(problem, update_capacity=True) +cost2 = pybop.VolumetricEnergyDensity(problem, update_capacity=True) +cost = pybop.WeightedCost(cost1, cost2, weights=[1, 1]) + +# Run optimisation optim = pybop.PSO( cost, verbose=True, allow_infeasible_solutions=False, max_iterations=15 ) - -# Run optimisation x, final_cost = optim.run() print("Estimated parameters:", x) -print(f"Initial gravimetric energy density: {cost(optim.x0):.2f} Wh.kg-1") -print(f"Optimised gravimetric energy density: {final_cost:.2f} Wh.kg-1") +print(f"Initial gravimetric energy density: {cost1(optim.x0):.2f} Wh.kg-1") +print(f"Optimised gravimetric energy density: {cost1(x):.2f} Wh.kg-1") +print(f"Initial volumetric energy density: {cost2(optim.x0):.2f} Wh.m-3") +print(f"Optimised volumetric energy density: {cost2(x):.2f} Wh.m-3") # Plot the timeseries output if cost.update_capacity: diff --git a/examples/standalone/problem.py b/examples/standalone/problem.py index 18bf1f7d..5fd33e0f 100644 --- a/examples/standalone/problem.py +++ b/examples/standalone/problem.py @@ -42,7 +42,7 @@ def __init__( ) self._target = {signal: self._dataset[signal] for signal in self.signal} - def evaluate(self, inputs): + def evaluate(self, inputs, **kwargs): """ Evaluate the model with the given parameters and return the signal. diff --git a/pybop/costs/_likelihoods.py b/pybop/costs/_likelihoods.py index 3b0a3631..87ed9381 100644 --- a/pybop/costs/_likelihoods.py +++ b/pybop/costs/_likelihoods.py @@ -43,7 +43,7 @@ def _evaluate(self, inputs: Inputs) -> float: """ Evaluates the Gaussian log-likelihood for the given parameters with known sigma. """ - if not self.verify_prediction(self._current_prediction): + if not self.verify_prediction(self.y): return -np.inf e = np.asarray( @@ -51,9 +51,7 @@ def _evaluate(self, inputs: Inputs) -> float: np.sum( self._offset + self._multip - * np.sum( - (self._target[signal] - self._current_prediction[signal]) ** 2.0 - ) + * np.sum((self._target[signal] - self.y[signal]) ** 2.0) ) for signal in self.signal ] @@ -65,20 +63,15 @@ def _evaluateS1(self, inputs: Inputs) -> tuple[float, np.ndarray]: """ Calculates the log-likelihood and gradient. """ - if not self.verify_prediction(self._current_prediction): + if not self.verify_prediction(self.y): return -np.inf, -self._de * np.ones(self.n_parameters) likelihood = self._evaluate(inputs) r = np.asarray( - [ - self._target[signal] - self._current_prediction[signal] - for signal in self.signal - ] - ) - dl = np.sum( - (np.sum((r * self._current_sensitivities.T), axis=2) / self.sigma2), axis=1 + [self._target[signal] - self.y[signal] for signal in self.signal] ) + dl = np.sum((np.sum((r * self.dy.T), axis=2) / self.sigma2), axis=1) return likelihood, dl @@ -123,7 +116,7 @@ def __init__( super().__init__(problem) self._dsigma_scale = dsigma_scale self._logpi = -0.5 * self.n_time_data * np.log(2 * np.pi) - self._fixed_problem = False # keep problem evaluation within _evaluate + self._has_separable_problem = False self.sigma = Parameters() self._add_sigma_parameters(sigma0) @@ -196,10 +189,8 @@ def _evaluate(self, inputs: Inputs) -> float: if np.any(sigma <= 0): return -np.inf - self._current_prediction = self.problem.evaluate( - self.problem.parameters.as_dict() - ) - if not self.verify_prediction(self._current_prediction): + self.y = self.problem.evaluate(self.problem.parameters.as_dict()) + if not self.verify_prediction(self.y): return -np.inf e = np.asarray( @@ -207,9 +198,7 @@ def _evaluate(self, inputs: Inputs) -> float: np.sum( self._logpi - self.n_time_data * np.log(sigma) - - np.sum( - (self._target[signal] - self._current_prediction[signal]) ** 2.0 - ) + - np.sum((self._target[signal] - self.y[signal]) ** 2.0) / (2.0 * sigma**2.0) ) for signal in self.signal @@ -238,23 +227,16 @@ def _evaluateS1(self, inputs: Inputs) -> tuple[float, np.ndarray]: if np.any(sigma <= 0): return -np.inf, -self._de * np.ones(self.n_parameters) - self._current_prediction, self._current_sensitivities = self.problem.evaluateS1( - self.problem.parameters.as_dict() - ) - if not self.verify_prediction(self._current_prediction): + self.y, self.dy = self.problem.evaluateS1(self.problem.parameters.as_dict()) + if not self.verify_prediction(self.y): return -np.inf, -self._de * np.ones(self.n_parameters) likelihood = self._evaluate(inputs) r = np.asarray( - [ - self._target[signal] - self._current_prediction[signal] - for signal in self.signal - ] - ) - dl = np.sum( - (np.sum((r * self._current_sensitivities.T), axis=2) / (sigma**2.0)), axis=1 + [self._target[signal] - self.y[signal] for signal in self.signal] ) + dl = np.sum((np.sum((r * self.dy.T), axis=2) / (sigma**2.0)), axis=1) dsigma = ( -self.n_time_data / sigma + np.sum(r**2.0, axis=1) / (sigma**3.0) ) / self._dsigma_scale @@ -296,6 +278,9 @@ def __init__(self, problem, likelihood, sigma0=None, gradient_step=1e-3): ): raise ValueError(f"{self.likelihood} must be a subclass of BaseLikelihood") + self.parameters = self.likelihood.parameters + self._has_separable_problem = self.likelihood._has_separable_problem + def _evaluate(self, inputs: Inputs) -> float: """ Calculate the maximum a posteriori cost for a given set of parameters. @@ -317,9 +302,9 @@ def _evaluate(self, inputs: Inputs) -> float: if not np.isfinite(log_prior).any(): return -np.inf - if self._fixed_problem: - self.likelihood._current_prediction = self._current_prediction - log_likelihood = self.likelihood._evaluate(inputs) + if self._has_separable_problem: + self.likelihood.y = self.y + log_likelihood = self.likelihood.evaluate(inputs) posterior = log_likelihood + log_prior return posterior @@ -351,12 +336,9 @@ def _evaluateS1(self, inputs: Inputs) -> tuple[float, np.ndarray]: if not np.isfinite(log_prior).any(): return -np.inf, -self._de * np.ones(self.n_parameters) - if self._fixed_problem: - ( - self.likelihood._current_prediction, - self.likelihood._current_sensitivities, - ) = self._current_prediction, self._current_sensitivities - log_likelihood, dl = self.likelihood._evaluateS1(inputs) + if self._has_separable_problem: + self.likelihood.y, self.likelihood.dy = (self.y, self.dy) + log_likelihood, dl = self.likelihood.evaluateS1(inputs) # Compute a finite difference approximation of the gradient of the log prior delta = self.parameters.initial_value() * self.gradient_step diff --git a/pybop/costs/_weighted_cost.py b/pybop/costs/_weighted_cost.py index 216e0303..ac4d9002 100644 --- a/pybop/costs/_weighted_cost.py +++ b/pybop/costs/_weighted_cost.py @@ -1,8 +1,9 @@ +import warnings from typing import Optional import numpy as np -from pybop import BaseCost +from pybop import BaseCost, BaseLikelihood, DesignCost from pybop.parameters.parameter import Inputs @@ -13,53 +14,72 @@ class WeightedCost(BaseCost): Inherits all parameters and attributes from ``BaseCost``. - Additional Attributes + Attributes --------------------- costs : list[pybop.BaseCost] A list of PyBOP cost objects. weights : list[float] A list of values with which to weight the cost values. - _different_problems : bool - If True, the problem for each cost is evaluated independently during - each evaluation of the cost (default: False). + _has_identical_problems : bool + If True, the shared problem will be evaluated once and saved before the + self._evaluate() method of each cost is called (default: False). + _has_separable_problem: bool + If True, the shared problem is seperable from the cost function and + will be evaluated for each problem before the cost evaluation is + called (default: False). This attribute is used for sub-cost objects; + however, the top-level WeightedCost attribute is not used (i.e. == False). """ - def __init__(self, *args, weights: Optional[list[float]] = None): - self.costs = [] - for cost in args: - if not isinstance(cost, BaseCost): - raise TypeError(f"Received {type(cost)} instead of cost object.") - self.costs.append(cost) - self.weights = weights - self._different_problems = False - - if self.weights is None: + def __init__(self, *costs, weights: Optional[list[float]] = None): + if not all(isinstance(cost, BaseCost) for cost in costs): + raise TypeError("All costs must be instances of BaseCost.") + self.costs = [cost for cost in costs] + if len(set(type(cost.problem) for cost in self.costs)) > 1: + raise TypeError("All problems must be of the same class type.") + self.minimising = not any( + isinstance(cost, (BaseLikelihood, DesignCost)) for cost in self.costs + ) + + # Check if weights are provided + if weights is not None: + try: + self.weights = np.asarray(weights, dtype=float) + except ValueError: + raise ValueError("Weights must be numeric values.") from None + + if self.weights.size != len(self.costs): + raise ValueError("Number of weights must match number of costs.") + else: self.weights = np.ones(len(self.costs)) - elif isinstance(self.weights, list): - self.weights = np.array(self.weights) - if not isinstance(self.weights, np.ndarray): - raise TypeError( - "Expected a list or array of weights the same length as costs." - ) - if not len(self.weights) == len(self.costs): - raise ValueError( - "Expected a list or array of weights the same length as costs." - ) # Check if all costs depend on the same problem - for cost in self.costs: - if hasattr(cost, "problem") and cost.problem is not self.costs[0].problem: - self._different_problems = True + self._has_identical_problems = all( + cost._has_separable_problem and cost.problem is self.costs[0].problem + for cost in self.costs + ) - if not self._different_problems: + if self._has_identical_problems: super().__init__(self.costs[0].problem) - self._fixed_problem = self.costs[0]._fixed_problem else: super().__init__() - self._fixed_problem = False for cost in self.costs: self.parameters.join(cost.parameters) + # Check if any cost function requires capacity update + self.update_capacity = False + if any(cost.update_capacity for cost in self.costs): + self.update_capacity = True + + warnings.warn( + "WeightedCost doesn't currently support DesignCosts with different `update_capacity` attributes,\n" + f"Using global `DesignCost.update_capacity` attribute as: {self.update_capacity}", + UserWarning, + stacklevel=2, + ) + + # Weighted costs do not use this functionality + self._has_separable_problem = False + def _evaluate(self, inputs: Inputs): """ Calculate the weighted cost for a given set of parameters. @@ -74,19 +94,21 @@ def _evaluate(self, inputs: Inputs): float The weighted cost value. """ - e = np.empty_like(self.costs) + self.parameters.update(values=list(inputs.values())) - if not self._fixed_problem and self._different_problems: - self.parameters.update(values=list(inputs.values())) - elif not self._fixed_problem: - self._current_prediction = self.problem.evaluate(inputs) + if self._has_identical_problems: + self.y = self.problem.evaluate(inputs, update_capacity=self.update_capacity) + + e = np.empty_like(self.costs) for i, cost in enumerate(self.costs): - if not self._fixed_problem and self._different_problems: - inputs = cost.parameters.as_dict() - cost._current_prediction = cost.problem.evaluate(inputs) - else: - cost._current_prediction = self._current_prediction + inputs = cost.parameters.as_dict() + if self._has_identical_problems: + cost.y = self.y + elif cost._has_separable_problem: + cost.y = cost.problem.evaluate( + inputs, update_capacity=self.update_capacity + ) e[i] = cost._evaluate(inputs) return np.dot(e, self.weights) @@ -106,30 +128,27 @@ def _evaluateS1(self, inputs: Inputs): A tuple containing the cost and the gradient. The cost is a float, and the gradient is an array-like of the same length as `x`. """ + self.parameters.update(values=list(inputs.values())) + + if self._has_identical_problems: + self.y, self.dy = self.problem.evaluateS1(inputs) + e = np.empty_like(self.costs) de = np.empty((len(self.parameters), len(self.costs))) - if not self._fixed_problem and self._different_problems: - self.parameters.update(values=list(inputs.values())) - elif not self._fixed_problem: - self._current_prediction, self._current_sensitivities = ( - self.problem.evaluateS1(inputs) - ) - for i, cost in enumerate(self.costs): - if not self._fixed_problem and self._different_problems: - inputs = cost.parameters.as_dict() - cost._current_prediction, cost._current_sensitivities = ( - cost.problem.evaluateS1(inputs) - ) - else: - cost._current_prediction, cost._current_sensitivities = ( - self._current_prediction, - self._current_sensitivities, - ) + inputs = cost.parameters.as_dict() + if self._has_identical_problems: + cost.y, cost.dy = (self.y, self.dy) + elif cost._has_separable_problem: + cost.y, cost.dy = cost.problem.evaluateS1(inputs) e[i], de[:, i] = cost._evaluateS1(inputs) e = np.dot(e, self.weights) de = np.dot(de, self.weights) return e, de + + @property + def has_identical_problems(self): + return self._has_identical_problems diff --git a/pybop/costs/base_cost.py b/pybop/costs/base_cost.py index 116104b2..168a4dee 100644 --- a/pybop/costs/base_cost.py +++ b/pybop/costs/base_cost.py @@ -22,19 +22,20 @@ class BaseCost: An array containing the target data to fit. n_outputs : int The number of outputs in the model. - - Additional Attributes - --------------------- - _fixed_problem : bool - If True, the problem does not need to be rebuilt before the cost is - calculated (default: False). + _has_separable_problem : bool + If True, the problem is separable from the cost function and will be + evaluated in advance of the call to self._evaluate() (default: False). """ def __init__(self, problem: Optional[BaseProblem] = None): self.parameters = Parameters() self.transformation = None self.problem = problem - self._fixed_problem = False + self.verbose = False + self._has_separable_problem = False + self.update_capacity = False + self.y = None + self.dy = None self.set_fail_gradient() if isinstance(self.problem, BaseProblem): self._target = self.problem._target @@ -42,12 +43,16 @@ def __init__(self, problem: Optional[BaseProblem] = None): self.n_outputs = self.problem.n_outputs self.signal = self.problem.signal self.transformation = self.parameters.construct_transformation() - self._fixed_problem = True + self._has_separable_problem = True @property def n_parameters(self): return len(self.parameters) + @property + def has_separable_problem(self): + return self._has_separable_problem + def __call__(self, inputs: Union[Inputs, list]): """ Call the evaluate function for a given set of parameters. @@ -78,8 +83,10 @@ def evaluate(self, inputs: Union[Inputs, list]): inputs = self.parameters.verify(p if self.transformation else inputs) try: - if self._fixed_problem: - self._current_prediction = self.problem.evaluate(inputs) + if self._has_separable_problem: + self.y = self.problem.evaluate( + inputs, update_capacity=self.update_capacity + ) return self._evaluate(inputs) @@ -137,10 +144,8 @@ def evaluateS1(self, inputs: Union[Inputs, list]): inputs = self.parameters.verify(p if self.transformation else inputs) try: - if self._fixed_problem: - self._current_prediction, self._current_sensitivities = ( - self.problem.evaluateS1(inputs) - ) + if self._has_separable_problem: + self.y, self.dy = self.problem.evaluateS1(inputs) return self._evaluateS1(inputs) diff --git a/pybop/costs/design_costs.py b/pybop/costs/design_costs.py index 1931bd33..ce00ad9e 100644 --- a/pybop/costs/design_costs.py +++ b/pybop/costs/design_costs.py @@ -65,24 +65,6 @@ def update_simulation_data(self, inputs: Inputs): self.problem._target = {key: solution[key] for key in self.problem.signal} self.dt = solution["Time [s]"][1] - solution["Time [s]"][0] - def _evaluate(self, inputs: Inputs): - """ - Computes the value of the cost function. - - This method must be implemented by subclasses. - - Parameters - ---------- - inputs : Inputs - The parameters for which to compute the cost. - - Raises - ------ - NotImplementedError - If the method has not been implemented by the subclass. - """ - raise NotImplementedError - class GravimetricEnergyDensity(DesignCost): """ @@ -112,31 +94,15 @@ def _evaluate(self, inputs: Inputs): float The gravimetric energy density or -infinity in case of infeasible parameters. """ - try: - with warnings.catch_warnings(): - # Convert UserWarning to an exception - warnings.filterwarnings("error", category=UserWarning) - - if self.update_capacity: - self.problem.model.approximate_capacity(inputs) - solution = self.problem.evaluate(inputs) - - voltage, current = solution["Voltage [V]"], solution["Current [A]"] - energy_density = np.trapz(voltage * current, dx=self.dt) / ( - 3600 * self.problem.model.cell_mass(self.parameter_set) - ) - - return energy_density - - # Catch infeasible solutions and return infinity - except UserWarning as e: - print(f"Ignoring this sample due to: {e}") + if not any(np.isfinite(self.y[signal][0]) for signal in self.signal): return -np.inf - # Catch any other exception and return infinity - except Exception as e: - print(f"An error occurred during the evaluation: {e}") - return -np.inf + voltage, current = self.y["Voltage [V]"], self.y["Current [A]"] + energy_density = np.trapz(voltage * current, dx=self.dt) / ( + 3600 * self.problem.model.cell_mass(self.parameter_set) + ) + + return energy_density class VolumetricEnergyDensity(DesignCost): @@ -151,7 +117,6 @@ class VolumetricEnergyDensity(DesignCost): def __init__(self, problem, update_capacity=False): super().__init__(problem, update_capacity) - self._fixed_problem = False # keep problem evaluation within _evaluate def _evaluate(self, inputs: Inputs): """ @@ -167,28 +132,12 @@ def _evaluate(self, inputs: Inputs): float The volumetric energy density or -infinity in case of infeasible parameters. """ - try: - with warnings.catch_warnings(): - # Convert UserWarning to an exception - warnings.filterwarnings("error", category=UserWarning) - - if self.update_capacity: - self.problem.model.approximate_capacity(inputs) - solution = self.problem.evaluate(inputs) - - voltage, current = solution["Voltage [V]"], solution["Current [A]"] - energy_density = np.trapz(voltage * current, dx=self.dt) / ( - 3600 * self.problem.model.cell_volume(self.parameter_set) - ) - - return energy_density - - # Catch infeasible solutions and return infinity - except UserWarning as e: - print(f"Ignoring this sample due to: {e}") + if not any(np.isfinite(self.y[signal][0]) for signal in self.signal): return -np.inf - # Catch any other exception and return infinity - except Exception as e: - print(f"An error occurred during the evaluation: {e}") - return -np.inf + voltage, current = self.y["Voltage [V]"], self.y["Current [A]"] + energy_density = np.trapz(voltage * current, dx=self.dt) / ( + 3600 * self.problem.model.cell_volume(self.parameter_set) + ) + + return energy_density diff --git a/pybop/costs/fitting_costs.py b/pybop/costs/fitting_costs.py index ce68a9ca..f3690189 100644 --- a/pybop/costs/fitting_costs.py +++ b/pybop/costs/fitting_costs.py @@ -38,16 +38,12 @@ def _evaluate(self, inputs: Inputs): The root mean square error. """ - if not self.verify_prediction(self._current_prediction): + if not self.verify_prediction(self.y): return np.inf e = np.asarray( [ - np.sqrt( - np.mean( - (self._current_prediction[signal] - self._target[signal]) ** 2 - ) - ) + np.sqrt(np.mean((self.y[signal] - self._target[signal]) ** 2)) for signal in self.signal ] ) @@ -74,19 +70,14 @@ def _evaluateS1(self, inputs: Inputs): ValueError If an error occurs during the calculation of the cost or gradient. """ - if not self.verify_prediction(self._current_prediction): + if not self.verify_prediction(self.y): return np.inf, self._de * np.ones(self.n_parameters) r = np.asarray( - [ - self._current_prediction[signal] - self._target[signal] - for signal in self.signal - ] + [self.y[signal] - self._target[signal] for signal in self.signal] ) e = np.sqrt(np.mean(r**2, axis=1)) - de = np.mean((r * self._current_sensitivities.T), axis=2) / ( - e + np.finfo(float).eps - ) + de = np.mean((r * self.dy.T), axis=2) / (e + np.finfo(float).eps) if self.n_outputs == 1: return e.item(), de.flatten() @@ -129,12 +120,12 @@ def _evaluate(self, inputs: Inputs): float The Sum of Squared Error. """ - if not self.verify_prediction(self._current_prediction): + if not self.verify_prediction(self.y): return np.inf e = np.asarray( [ - np.sum((self._current_prediction[signal] - self._target[signal]) ** 2) + np.sum((self.y[signal] - self._target[signal]) ** 2) for signal in self.signal ] ) @@ -161,17 +152,14 @@ def _evaluateS1(self, inputs: Inputs): ValueError If an error occurs during the calculation of the cost or gradient. """ - if not self.verify_prediction(self._current_prediction): + if not self.verify_prediction(self.y): return np.inf, self._de * np.ones(self.n_parameters) r = np.asarray( - [ - self._current_prediction[signal] - self._target[signal] - for signal in self.signal - ] + [self.y[signal] - self._target[signal] for signal in self.signal] ) e = np.sum(np.sum(r**2, axis=0), axis=0) - de = 2 * np.sum(np.sum((r * self._current_sensitivities.T), axis=2), axis=1) + de = 2 * np.sum(np.sum((r * self.dy.T), axis=2), axis=1) return e, de @@ -231,15 +219,12 @@ def _evaluate(self, inputs: Inputs, grad=None): float The Minkowski cost. """ - if not self.verify_prediction(self._current_prediction): + if not self.verify_prediction(self.y): return np.inf e = np.asarray( [ - np.sum( - np.abs(self._current_prediction[signal] - self._target[signal]) - ** self.p - ) + np.sum(np.abs(self.y[signal] - self._target[signal]) ** self.p) ** (1 / self.p) for signal in self.signal ] @@ -267,27 +252,21 @@ def _evaluateS1(self, inputs): ValueError If an error occurs during the calculation of the cost or gradient. """ - if not self.verify_prediction(self._current_prediction): + if not self.verify_prediction(self.y): return np.inf, self._de * np.ones(self.n_parameters) r = np.asarray( - [ - self._current_prediction[signal] - self._target[signal] - for signal in self.signal - ] + [self.y[signal] - self._target[signal] for signal in self.signal] ) e = np.asarray( [ - np.sum( - np.abs(self._current_prediction[signal] - self._target[signal]) - ** self.p - ) + np.sum(np.abs(self.y[signal] - self._target[signal]) ** self.p) ** (1 / self.p) for signal in self.signal ] ) de = np.sum( - np.sum(r ** (self.p - 1) * self._current_sensitivities.T, axis=2) + np.sum(r ** (self.p - 1) * self.dy.T, axis=2) / (e ** (self.p - 1) + np.finfo(float).eps), axis=1, ) @@ -347,15 +326,12 @@ def _evaluate(self, inputs: Inputs, grad=None): float The Sum of Power cost. """ - if not self.verify_prediction(self._current_prediction): + if not self.verify_prediction(self.y): return np.inf e = np.asarray( [ - np.sum( - np.abs(self._current_prediction[signal] - self._target[signal]) - ** self.p - ) + np.sum(np.abs(self.y[signal] - self._target[signal]) ** self.p) for signal in self.signal ] ) @@ -382,19 +358,14 @@ def _evaluateS1(self, inputs): ValueError If an error occurs during the calculation of the cost or gradient. """ - if not self.verify_prediction(self._current_prediction): + if not self.verify_prediction(self.y): return np.inf, self._de * np.ones(self.n_parameters) r = np.asarray( - [ - self._current_prediction[signal] - self._target[signal] - for signal in self.signal - ] + [self.y[signal] - self._target[signal] for signal in self.signal] ) e = np.sum(np.sum(np.abs(r) ** self.p)) - de = self.p * np.sum( - np.sum(r ** (self.p - 1) * self._current_sensitivities.T, axis=2), axis=1 - ) + de = self.p * np.sum(np.sum(r ** (self.p - 1) * self.dy.T, axis=2), axis=1) return e, de @@ -413,7 +384,7 @@ class ObserverCost(BaseCost): def __init__(self, observer: Observer): super().__init__(problem=observer) self._observer = observer - self._fixed_problem = False # keep problem evaluation within _evaluate + self._has_separable_problem = False def _evaluate(self, inputs: Inputs): """ diff --git a/pybop/optimisers/base_optimiser.py b/pybop/optimisers/base_optimiser.py index ba749a0e..c24b081d 100644 --- a/pybop/optimisers/base_optimiser.py +++ b/pybop/optimisers/base_optimiser.py @@ -3,7 +3,14 @@ import numpy as np -from pybop import BaseCost, BaseLikelihood, DesignCost, Parameter, Parameters +from pybop import ( + BaseCost, + BaseLikelihood, + DesignCost, + Parameter, + Parameters, + WeightedCost, +) class BaseOptimiser: @@ -69,6 +76,8 @@ def __init__( self.transformation = self.cost.transformation self.parameters.join(cost.parameters) self.set_allow_infeasible_solutions() + if isinstance(cost, WeightedCost): + self.minimising = cost.minimising if isinstance(cost, (BaseLikelihood, DesignCost)): self.minimising = False diff --git a/pybop/problems/base_problem.py b/pybop/problems/base_problem.py index ee36a6bb..cb3bc597 100644 --- a/pybop/problems/base_problem.py +++ b/pybop/problems/base_problem.py @@ -64,6 +64,7 @@ def __init__( self.n_outputs = len(self.signal) self._time_data = None self._target = None + self.verbose = False if isinstance(model, BaseModel): self.additional_variables = additional_variables diff --git a/pybop/problems/design_problem.py b/pybop/problems/design_problem.py index 30e2d9c3..74b8ae71 100644 --- a/pybop/problems/design_problem.py +++ b/pybop/problems/design_problem.py @@ -1,3 +1,5 @@ +import warnings + import numpy as np from pybop import BaseProblem @@ -45,7 +47,10 @@ def __init__( signal = ["Voltage [V]"] additional_variables.extend(["Time [s]", "Current [A]"]) additional_variables = list(set(additional_variables)) - + self.warning_patterns = [ + "Ah is greater than", + "Non-physical point encountered", + ] super().__init__( parameters, model, @@ -75,7 +80,7 @@ def __init__( self._target = {key: sol[key] for key in self.signal} self._dataset = None - def evaluate(self, inputs: Inputs): + def evaluate(self, inputs: Inputs, update_capacity=False): """ Evaluate the model with the given parameters and return the signal. @@ -90,19 +95,33 @@ def evaluate(self, inputs: Inputs): The model output y(t) simulated with inputs. """ inputs = self.parameters.verify(inputs) - - sol = self._model.predict( - inputs=inputs, - experiment=self.experiment, - init_soc=self.init_soc, - ) - - if sol == [np.inf]: - return sol - - else: - predictions = {} - for signal in self.signal + self.additional_variables: - predictions[signal] = sol[signal].data + if update_capacity: + self.model.approximate_capacity(inputs) + + try: + with warnings.catch_warnings(): + for pattern in self.warning_patterns: + warnings.filterwarnings( + "error", category=UserWarning, message=pattern + ) + + sol = self._model.predict( + inputs=inputs, + experiment=self.experiment, + init_soc=self.init_soc, + ) + + # Catch infeasible solutions and return infinity + except (UserWarning, Exception) as e: + if self.verbose: + print(f"Ignoring this sample due to: {e}") + return { + signal: np.asarray(np.ones(2) * -np.inf) + for signal in [*self.signal, *self.additional_variables] + } + + predictions = {} + for signal in self.signal + self.additional_variables: + predictions[signal] = sol[signal].data return predictions diff --git a/pybop/problems/fitting_problem.py b/pybop/problems/fitting_problem.py index 58d59e9e..0ad636ce 100644 --- a/pybop/problems/fitting_problem.py +++ b/pybop/problems/fitting_problem.py @@ -79,7 +79,7 @@ def __init__( init_soc=self.init_soc, ) - def evaluate(self, inputs: Inputs): + def evaluate(self, inputs: Inputs, update_capacity=False): """ Evaluate the model with the given parameters and return the signal. diff --git a/tests/integration/test_weighted_cost.py b/tests/integration/test_weighted_cost.py new file mode 100644 index 00000000..e964e970 --- /dev/null +++ b/tests/integration/test_weighted_cost.py @@ -0,0 +1,185 @@ +import numpy as np +import pytest + +import pybop + + +class TestWeightedCost: + """ + A class to test the weighted cost function. + """ + + @pytest.fixture(autouse=True) + def setup(self): + self.sigma0 = 0.002 + self.ground_truth = np.asarray([0.55, 0.55]) + np.random.normal( + loc=0.0, scale=0.05, size=2 + ) + + @pytest.fixture + def model(self): + parameter_set = pybop.ParameterSet.pybamm("Chen2020") + return pybop.lithium_ion.SPM(parameter_set=parameter_set) + + @pytest.fixture + def parameters(self): + return pybop.Parameters( + pybop.Parameter( + "Negative electrode active material volume fraction", + prior=pybop.Uniform(0.4, 0.75), + bounds=[0.375, 0.75], + ), + pybop.Parameter( + "Positive electrode active material volume fraction", + prior=pybop.Uniform(0.4, 0.75), + # no bounds + ), + ) + + @pytest.fixture(params=[0.4]) + def init_soc(self, request): + return request.param + + @pytest.fixture( + params=[ + ( + pybop.GaussianLogLikelihoodKnownSigma, + pybop.RootMeanSquaredError, + pybop.SumSquaredError, + pybop.MAP, + ) + ] + ) + def cost_class(self, request): + return request.param + + def noise(self, sigma, values): + return np.random.normal(0, sigma, values) + + @pytest.fixture + def weighted_fitting_cost(self, model, parameters, cost_class, init_soc): + # Form dataset + solution = self.get_data(model, parameters, self.ground_truth, init_soc) + dataset = pybop.Dataset( + { + "Time [s]": solution["Time [s]"].data, + "Current function [A]": solution["Current [A]"].data, + "Voltage [V]": solution["Voltage [V]"].data + + self.noise(self.sigma0, len(solution["Time [s]"].data)), + } + ) + + # Define the cost to optimise + problem = pybop.FittingProblem(model, parameters, dataset, init_soc=init_soc) + costs = [] + for cost in cost_class: + if issubclass(cost, pybop.MAP): + costs.append( + cost( + problem, + pybop.GaussianLogLikelihoodKnownSigma, + sigma0=self.sigma0, + ) + ) + elif issubclass(cost, pybop.BaseLikelihood): + costs.append(cost(problem, sigma0=self.sigma0)) + else: + costs.append(cost(problem)) + + return pybop.WeightedCost(*costs, weights=[0.1, 1, 0.5, 0.6]) + + @pytest.mark.integration + def test_fitting_costs(self, weighted_fitting_cost): + x0 = weighted_fitting_cost.parameters.initial_value() + optim = pybop.CuckooSearch( + cost=weighted_fitting_cost, + sigma0=0.03, + max_iterations=250, + max_unchanged_iterations=35, + ) + + initial_cost = optim.cost(optim.parameters.initial_value()) + x, final_cost = optim.run() + + # Assertions + if not np.allclose(x0, self.ground_truth, atol=1e-5): + if optim.minimising: + assert initial_cost > final_cost + else: + assert initial_cost < final_cost + np.testing.assert_allclose(x, self.ground_truth, atol=1.5e-2) + + @pytest.fixture( + params=[ + ( + pybop.GravimetricEnergyDensity, + pybop.VolumetricEnergyDensity, + ) + ] + ) + def design_cost(self, request): + return request.param + + @pytest.fixture + def weighted_design_cost(self, model, design_cost): + init_soc = 1.0 + parameters = pybop.Parameters( + pybop.Parameter( + "Positive electrode thickness [m]", + prior=pybop.Gaussian(5e-05, 5e-06), + bounds=[2e-06, 10e-05], + ), + pybop.Parameter( + "Negative electrode thickness [m]", + prior=pybop.Gaussian(5e-05, 5e-06), + bounds=[2e-06, 10e-05], + ), + ) + experiment = pybop.Experiment( + ["Discharge at 1C until 3.5 V (5 seconds period)"], + ) + + problem = pybop.DesignProblem( + model, parameters, experiment=experiment, init_soc=init_soc + ) + costs_update_capacity = [] + costs = [] + for cost in design_cost: + costs_update_capacity.append(cost(problem, update_capacity=True)) + costs.append(cost(problem)) + + return [ + pybop.WeightedCost(*costs, weights=[1.0, 0.1]), + pybop.WeightedCost(*costs_update_capacity, weights=[0.1, 1.0]), + ] + + @pytest.mark.integration + @pytest.mark.parametrize("cost_index", [0, 1]) + def test_design_costs(self, weighted_design_cost, cost_index): + cost = weighted_design_cost[cost_index] + optim = pybop.CuckooSearch( + cost, + max_iterations=15, + allow_infeasible_solutions=False, + ) + initial_values = optim.parameters.initial_value() + initial_cost = optim.cost(initial_values) + x, final_cost = optim.run() + + # Assertions + assert initial_cost < final_cost + for i, _ in enumerate(x): + assert x[i] > initial_values[i] + + def get_data(self, model, parameters, x, init_soc): + model.classify_and_update_parameters(parameters) + experiment = pybop.Experiment( + [ + ( + "Discharge at 0.5C for 3 minutes (4 second period)", + "Charge at 0.5C for 3 minutes (4 second period)", + ), + ] + ) + sim = model.predict(init_soc=init_soc, experiment=experiment, inputs=x) + return sim diff --git a/tests/unit/test_cost.py b/tests/unit/test_cost.py index b67f5ab9..309b7e74 100644 --- a/tests/unit/test_cost.py +++ b/tests/unit/test_cost.py @@ -297,7 +297,7 @@ def test_design_costs(self, cost_class, design_problem): # Compute after updating nominal capacity cost = cost_class(design_problem, update_capacity=True) - assert np.isfinite(cost([0.4])) + cost([0.4]) @pytest.mark.unit def test_weighted_fitting_cost(self, problem): @@ -313,25 +313,25 @@ def test_weighted_fitting_cost(self, problem): np.testing.assert_array_equal(weighted_cost.weights, np.ones(2)) with pytest.raises( TypeError, - match=r"Received instead of cost object.", + match="All costs must be instances of BaseCost.", ): - weighted_cost = pybop.WeightedCost("Invalid string") + weighted_cost = pybop.WeightedCost(cost1.problem) with pytest.raises( - TypeError, - match="Expected a list or array of weights the same length as costs.", + ValueError, + match="Weights must be numeric values.", ): weighted_cost = pybop.WeightedCost(cost1, cost2, weights="Invalid string") with pytest.raises( ValueError, - match="Expected a list or array of weights the same length as costs.", + match="Number of weights must match number of costs.", ): weighted_cost = pybop.WeightedCost(cost1, cost2, weights=[1]) - # Test with and without different problems + # Test with identical problems weight = 100 weighted_cost_2 = pybop.WeightedCost(cost1, cost2, weights=[1, weight]) - assert weighted_cost_2._different_problems is False - assert weighted_cost_2._fixed_problem is True + assert weighted_cost_2.has_identical_problems is True + assert weighted_cost_2.has_separable_problem is False assert weighted_cost_2.problem is problem assert weighted_cost_2([0.5]) >= 0 np.testing.assert_allclose( @@ -340,10 +340,11 @@ def test_weighted_fitting_cost(self, problem): atol=1e-5, ) + # Test with different problems cost3 = pybop.RootMeanSquaredError(copy(problem)) weighted_cost_3 = pybop.WeightedCost(cost1, cost3, weights=[1, weight]) - assert weighted_cost_3._different_problems is True - assert weighted_cost_3._fixed_problem is False + assert weighted_cost_3.has_identical_problems is False + assert weighted_cost_3.has_separable_problem is False assert weighted_cost_3.problem is None assert weighted_cost_3([0.5]) >= 0 np.testing.assert_allclose( @@ -357,15 +358,69 @@ def test_weighted_fitting_cost(self, problem): np.testing.assert_allclose(errors_2, errors_3, atol=1e-5) np.testing.assert_allclose(sensitivities_2, sensitivities_3, atol=1e-5) + # Test MAP explicitly + cost4 = pybop.MAP(problem, pybop.GaussianLogLikelihood) + weighted_cost_4 = pybop.WeightedCost(cost1, cost4, weights=[1, weight]) + assert weighted_cost_4.has_identical_problems is False + assert weighted_cost_4.has_separable_problem is False + sigma = 0.05 + assert weighted_cost_4([0.5, sigma]) <= 0 + np.testing.assert_allclose( + weighted_cost_4.evaluate([0.6, sigma]), + cost1([0.6, sigma]) + weight * cost4([0.6, sigma]), + atol=1e-5, + ) + @pytest.mark.unit def test_weighted_design_cost(self, design_problem): cost1 = pybop.GravimetricEnergyDensity(design_problem) - cost2 = pybop.RootMeanSquaredError(design_problem) + cost2 = pybop.VolumetricEnergyDensity(design_problem) - # Test with and without weights + # Test DesignCosts with identical problems weighted_cost = pybop.WeightedCost(cost1, cost2) - assert weighted_cost._different_problems is False - assert weighted_cost._fixed_problem is False + assert weighted_cost.has_identical_problems is True + assert weighted_cost.has_separable_problem is False + assert weighted_cost.problem is design_problem + assert weighted_cost([0.5]) >= 0 + np.testing.assert_allclose( + weighted_cost.evaluate([0.6]), + cost1([0.6]) + cost2([0.6]), + atol=1e-5, + ) + + # Test DesignCosts with different problems + cost3 = pybop.VolumetricEnergyDensity(copy(design_problem)) + weighted_cost = pybop.WeightedCost(cost1, cost3) + assert weighted_cost.has_identical_problems is False + assert weighted_cost.has_separable_problem is False + for i, _ in enumerate(weighted_cost.costs): + assert isinstance(weighted_cost.costs[i].problem, pybop.DesignProblem) + + # Ensure attributes are set correctly and not modified via side-effects + assert weighted_cost.update_capacity is False + assert weighted_cost.costs[0].update_capacity is False + assert weighted_cost.costs[1].update_capacity is False + + assert weighted_cost([0.5]) >= 0 + np.testing.assert_allclose( + weighted_cost.evaluate([0.6]), + cost1([0.6]) + cost2([0.6]), + atol=1e-5, + ) + + @pytest.mark.unit + def test_weighted_design_cost_with_update_capacity(self, design_problem): + cost1 = pybop.GravimetricEnergyDensity(design_problem, update_capacity=True) + cost2 = pybop.VolumetricEnergyDensity(design_problem) + weighted_cost = pybop.WeightedCost(cost1, cost2, weights=[1, 1]) + + # Ensure attributes are set correctly and not modified via side-effects + assert weighted_cost.update_capacity is True + assert weighted_cost.costs[0].update_capacity is True + assert weighted_cost.costs[1].update_capacity is False + + assert weighted_cost.has_identical_problems is True + assert weighted_cost.has_separable_problem is False assert weighted_cost.problem is design_problem assert weighted_cost([0.5]) >= 0 np.testing.assert_allclose( @@ -373,3 +428,13 @@ def test_weighted_design_cost(self, design_problem): cost1([0.6]) + cost2([0.6]), atol=1e-5, ) + + @pytest.mark.unit + def test_mixed_problem_classes(self, problem, design_problem): + cost1 = pybop.SumSquaredError(problem) + cost2 = pybop.GravimetricEnergyDensity(design_problem) + with pytest.raises( + TypeError, + match="All problems must be of the same class type.", + ): + pybop.WeightedCost(cost1, cost2)