diff --git a/discrete_optimization/knapsack/knapsack_model.py b/discrete_optimization/knapsack/knapsack_model.py index ab19fab58..5068002ff 100644 --- a/discrete_optimization/knapsack/knapsack_model.py +++ b/discrete_optimization/knapsack/knapsack_model.py @@ -4,7 +4,7 @@ from copy import deepcopy from dataclasses import dataclass -from typing import Dict, List, Optional, Sequence, Type, cast +from typing import Any, Dict, List, Optional, Sequence, Type, Union, cast import numpy as np @@ -29,7 +29,7 @@ class Item: value: float weight: float - def __str__(self): + def __str__(self) -> str: return ( "ind: " + str(self.index) @@ -53,7 +53,7 @@ def __init__( self.weight = weight self.list_taken = list_taken - def copy(self): + def copy(self) -> "KnapsackSolution": return KnapsackSolution( problem=self.problem, value=self.value, @@ -61,7 +61,7 @@ def copy(self): list_taken=list(self.list_taken), ) - def lazy_copy(self): + def lazy_copy(self) -> "KnapsackSolution": return KnapsackSolution( problem=self.problem, value=self.value, @@ -69,23 +69,25 @@ def lazy_copy(self): list_taken=self.list_taken, ) - def change_problem(self, new_problem: Problem): + def change_problem(self, new_problem: Problem) -> None: if not isinstance(new_problem, KnapsackModel): raise ValueError("new_problem must a KnapsackModel for a KnapsackSolution.") self.problem = new_problem self.list_taken = list(self.list_taken) - def __str__(self): + def __str__(self) -> str: s = "Value=" + str(self.value) + "\n" s += "Weight=" + str(self.weight) + "\n" s += "Taken : " + str(self.list_taken) return s - def __hash__(self): + def __hash__(self) -> int: return hash(str(self)) - def __eq__(self, other): - return self.list_taken == other.list_taken + def __eq__(self, other: object) -> bool: + return ( + isinstance(other, KnapsackSolution) and self.list_taken == other.list_taken + ) class KnapsackModel(Problem): @@ -178,7 +180,7 @@ def satisfy(self, knapsack_solution: KnapsackSolution) -> bool: # type: ignore self.evaluate(knapsack_solution) return knapsack_solution.weight <= self.max_capacity # type: ignore # avoid is None check for efficiency - def __str__(self): + def __str__(self) -> str: s = ( "Knapsack model with " + str(self.nb_items) @@ -200,7 +202,7 @@ def get_solution_type(self) -> Type[Solution]: class KnapsackModel_Mobj(KnapsackModel): @staticmethod - def from_knapsack(knapsack_model: KnapsackModel): + def from_knapsack(knapsack_model: KnapsackModel) -> "KnapsackModel_Mobj": return KnapsackModel_Mobj( list_items=knapsack_model.list_items, max_capacity=knapsack_model.max_capacity, @@ -245,17 +247,21 @@ def evaluate_mobj(self, solution: KnapsackSolution) -> TupleFitness: # type: ig class KnapsackSolutionMultidimensional(Solution): - value: float - weights: List[float] - list_taken: List[bool] - - def __init__(self, problem, list_taken, value=None, weights=None): + def __init__( + self, + problem: Union[ + "MultidimensionalKnapsack", "MultiScenarioMultidimensionalKnapsack" + ], + list_taken: List[int], + value: Optional[float] = None, + weights: Optional[List[float]] = None, + ): self.problem = problem self.value = value self.weights = weights self.list_taken = list_taken - def copy(self): + def copy(self) -> "KnapsackSolutionMultidimensional": return KnapsackSolutionMultidimensional( problem=self.problem, value=self.value, @@ -263,7 +269,7 @@ def copy(self): list_taken=list(self.list_taken), ) - def lazy_copy(self): + def lazy_copy(self) -> "KnapsackSolutionMultidimensional": return KnapsackSolutionMultidimensional( problem=self.problem, value=self.value, @@ -271,25 +277,28 @@ def lazy_copy(self): list_taken=self.list_taken, ) - def change_problem(self, new_problem): - self.__init__( - problem=new_problem, - value=self.value, - weights=self.weights, - list_taken=list(self.list_taken), - ) + def change_problem(self, new_problem: Problem) -> None: + if not isinstance(new_problem, MultidimensionalKnapsack): + raise ValueError( + "new_problem must a MultidimensionalKnapsack for a KnapsackSolutionMultidimensional." + ) + self.problem = new_problem + self.list_taken = list(self.list_taken) - def __str__(self): + def __str__(self) -> str: s = "Value=" + str(self.value) + "\n" s += "Weights=" + str(self.weights) + "\n" s += "Taken : " + str(self.list_taken) return s - def __hash__(self): + def __hash__(self) -> int: return hash(str(self)) - def __eq__(self, other): - return self.list_taken == other.list_taken + def __eq__(self, other: object) -> bool: + return ( + isinstance(other, KnapsackSolutionMultidimensional) + and self.list_taken == other.list_taken + ) @dataclass(frozen=True) @@ -298,7 +307,7 @@ class ItemMultidimensional: value: float weights: List[float] - def __str__(self): + def __str__(self) -> str: return ( "ind: " + str(self.index) @@ -351,14 +360,16 @@ def get_objective_register(self) -> ObjectiveRegister: dict_objective_to_doc=dict_objective, ) - def evaluate_from_encoding(self, int_vector, encoding_name) -> Dict[str, float]: + def evaluate_from_encoding( + self, int_vector: List[int], encoding_name: str + ) -> Dict[str, float]: if encoding_name == "list_taken": kp_sol = KnapsackSolutionMultidimensional( problem=self, list_taken=int_vector ) elif encoding_name == "custom": - kwargs = {encoding_name: int_vector, "problem": self} - kp_sol = KnapsackSolutionMultidimensional(**kwargs) + kwargs: Dict[str, Any] = {encoding_name: int_vector} + kp_sol = KnapsackSolutionMultidimensional(problem=self, **kwargs) else: raise NotImplementedError("encoding_name must be 'list_taken' or 'custom'") objectives = self.evaluate(kp_sol) @@ -397,6 +408,10 @@ def evaluate_value( def evaluate_weight_violation( self, knapsack_solution: KnapsackSolutionMultidimensional ) -> float: + if knapsack_solution.weights is None: + raise RuntimeError( + "knapsack_solution.weights should not be None when calling evaluate_weight_violation." + ) return sum( [ max(0.0, knapsack_solution.weights[j] - self.max_capacities[j]) @@ -405,14 +420,18 @@ def evaluate_weight_violation( ) def satisfy(self, knapsack_solution: KnapsackSolutionMultidimensional) -> bool: # type: ignore # avoid isinstance checks for efficiency - if knapsack_solution.value is None: + if knapsack_solution.value is None or knapsack_solution.weights is None: self.evaluate(knapsack_solution) + if knapsack_solution.value is None or knapsack_solution.weights is None: + raise RuntimeError( + "knapsack_solution.value and knapsack_solution.weights should not be None now." + ) return all( knapsack_solution.weights[j] <= self.max_capacities[j] for j in range(len(self.max_capacities)) ) - def __str__(self): + def __str__(self) -> str: s = ( "Knapsack model with " + str(self.nb_items) @@ -433,7 +452,7 @@ def get_dummy_solution(self) -> KnapsackSolutionMultidimensional: def get_solution_type(self) -> Type[Solution]: return KnapsackSolutionMultidimensional - def copy(self): + def copy(self) -> "MultidimensionalKnapsack": return MultidimensionalKnapsack( list_items=[deepcopy(x) for x in self.list_items], max_capacities=list(self.max_capacities), diff --git a/discrete_optimization/knapsack/knapsack_parser.py b/discrete_optimization/knapsack/knapsack_parser.py index 38be30b64..a882e64d4 100644 --- a/discrete_optimization/knapsack/knapsack_parser.py +++ b/discrete_optimization/knapsack/knapsack_parser.py @@ -3,7 +3,7 @@ # LICENSE file in the root directory of this source tree. import os -from typing import Optional +from typing import List, Optional from discrete_optimization.datasets import get_data_home from discrete_optimization.knapsack.knapsack_model import Item, KnapsackModel @@ -11,7 +11,7 @@ def get_data_available( data_folder: Optional[str] = None, data_home: Optional[str] = None -): +) -> List[str]: """Get datasets available for knapsack. Params: @@ -35,7 +35,9 @@ def get_data_available( return datasets -def parse_input_data(input_data, force_recompute_values: bool = False) -> KnapsackModel: +def parse_input_data( + input_data: str, force_recompute_values: bool = False +) -> KnapsackModel: """ Parse a string of the following form : item_count max_capacity @@ -59,7 +61,7 @@ def parse_input_data(input_data, force_recompute_values: bool = False) -> Knapsa ) -def parse_file(file_path, force_recompute_values=False) -> KnapsackModel: +def parse_file(file_path: str, force_recompute_values: bool = False) -> KnapsackModel: with open(file_path, "r", encoding="utf-8") as input_data_file: input_data = input_data_file.read() knapsack_model = parse_input_data( diff --git a/discrete_optimization/knapsack/knapsack_solvers.py b/discrete_optimization/knapsack/knapsack_solvers.py index cd7d15045..56ca89c2e 100644 --- a/discrete_optimization/knapsack/knapsack_solvers.py +++ b/discrete_optimization/knapsack/knapsack_solvers.py @@ -4,7 +4,11 @@ from typing import Any, Dict, List, Tuple, Type +from discrete_optimization.generic_tools.do_problem import Problem from discrete_optimization.generic_tools.lp_tools import ParametersMilp +from discrete_optimization.generic_tools.result_storage.result_storage import ( + ResultStorage, +) from discrete_optimization.knapsack.knapsack_model import KnapsackModel from discrete_optimization.knapsack.solvers.cp_solvers import ( CPKnapsackMZN, @@ -13,6 +17,7 @@ ) from discrete_optimization.knapsack.solvers.dyn_prog_knapsack import KnapsackDynProg from discrete_optimization.knapsack.solvers.greedy_solvers import GreedyBest +from discrete_optimization.knapsack.solvers.knapsack_solver import SolverKnapsack from discrete_optimization.knapsack.solvers.lp_solvers import ( KnapsackORTools, LPKnapsack, @@ -21,7 +26,7 @@ MilpSolverName, ) -solvers: Dict[str, List[Tuple[Type, Dict[str, Any]]]] = { +solvers: Dict[str, List[Tuple[Type[SolverKnapsack], Dict[str, Any]]]] = { "lp": [ (KnapsackORTools, {}), (LPKnapsackCBC, {}), @@ -57,18 +62,20 @@ for solver, param in solvers[key]: solvers_map[solver] = (key, param) -solvers_compatibility = {} +solvers_compatibility: Dict[Type[SolverKnapsack], List[Type[Problem]]] = {} for x in solvers: for y in solvers[x]: solvers_compatibility[y[0]] = [KnapsackModel] -def look_for_solver(domain): +def look_for_solver(domain: KnapsackModel) -> List[Type[SolverKnapsack]]: class_domain = domain.__class__ return look_for_solver_class(class_domain) -def look_for_solver_class(class_domain): +def look_for_solver_class( + class_domain: Type[KnapsackModel], +) -> List[Type[SolverKnapsack]]: available = [] for solver in solvers_compatibility: if class_domain in solvers_compatibility[solver]: @@ -76,7 +83,9 @@ def look_for_solver_class(class_domain): return available -def solve(method, knapsack_model: KnapsackModel, **args): +def solve( + method: Type[SolverKnapsack], knapsack_model: KnapsackModel, **args: Any +) -> ResultStorage: solver = method(knapsack_model) solver.init_model(**args) return solver.solve(**args) diff --git a/discrete_optimization/knapsack/mutation/mutation_knapsack.py b/discrete_optimization/knapsack/mutation/mutation_knapsack.py index a458e738d..8992ccceb 100644 --- a/discrete_optimization/knapsack/mutation/mutation_knapsack.py +++ b/discrete_optimization/knapsack/mutation/mutation_knapsack.py @@ -131,7 +131,7 @@ def __init__(self, knapsack_model: KnapsackModel, attribute: Optional[str] = Non self.attribute = attribute def switch_on( - self, variable: KnapsackSolution, come_from_outside=False + self, variable: KnapsackSolution, come_from_outside: bool = False ) -> Tuple[KnapsackSolution, LocalMove, Dict[str, float]]: if variable.weight is None or variable.value is None: raise RuntimeError( @@ -182,7 +182,7 @@ def switch_on( return self.switch_off(variable, True) def switch_off( - self, variable: KnapsackSolution, come_from_outside=False + self, variable: KnapsackSolution, come_from_outside: bool = False ) -> Tuple[KnapsackSolution, LocalMove, Dict[str, float]]: if variable.weight is None or variable.value is None: raise RuntimeError( diff --git a/discrete_optimization/knapsack/solvers/cp_solvers.py b/discrete_optimization/knapsack/solvers/cp_solvers.py index 50d425fc7..9ffa4a99d 100644 --- a/discrete_optimization/knapsack/solvers/cp_solvers.py +++ b/discrete_optimization/knapsack/solvers/cp_solvers.py @@ -6,9 +6,9 @@ import os import random from datetime import timedelta -from typing import Any, Iterable, List, Optional, Tuple, Union +from typing import Any, Iterable, List, Optional, Sequence, Tuple, Union -from minizinc import Instance, Model, Solver +from minizinc import Instance, Model, Result, Solver from discrete_optimization.generic_tools.cp_tools import ( CPSolver, @@ -34,6 +34,7 @@ MultidimensionalKnapsack, MultiScenarioMultidimensionalKnapsack, ) +from discrete_optimization.knapsack.solvers.knapsack_solver import SolverKnapsack logger = logging.getLogger(__name__) this_path = os.path.dirname(os.path.abspath(__file__)) @@ -43,7 +44,7 @@ class KnapsackSol: objective: int __output_item: Optional[str] = None - def __init__(self, objective, _output_item, **kwargs): + def __init__(self, objective: int, _output_item: Optional[str], **kwargs: Any): self.objective = objective self.dict = kwargs logger.debug(f"One solution {self.objective}") @@ -53,17 +54,17 @@ def check(self) -> bool: return True -class CPKnapsackMZN(MinizincCPSolver): +class CPKnapsackMZN(MinizincCPSolver, SolverKnapsack): def __init__( self, knapsack_model: KnapsackModel, cp_solver_name: CPSolverName = CPSolverName.CHUFFED, - params_objective_function: ParamsObjectiveFunction = None, + params_objective_function: Optional[ParamsObjectiveFunction] = None, silent_solve_error: bool = False, - **args, + **kwargs: Any, ): + SolverKnapsack.__init__(self, knapsack_model=knapsack_model) self.silent_solve_error = silent_solve_error - self.knapsack_model = knapsack_model self.cp_solver_name = cp_solver_name self.key_decision_variable = ["list_items"] ( @@ -75,7 +76,9 @@ def __init__( params_objective_function=params_objective_function, ) - def retrieve_solutions(self, result, parameters_cp: ParametersCP) -> ResultStorage: + def retrieve_solutions( + self, result: Result, parameters_cp: ParametersCP + ) -> ResultStorage: intermediate_solutions = parameters_cp.intermediate_solution l_items = [] objectives = [] @@ -110,7 +113,7 @@ def retrieve_solutions(self, result, parameters_cp: ParametersCP) -> ResultStora mode_optim=self.params_objective_function.sense_function, ) - def init_model(self, **args): + def init_model(self, **kwargs: Any) -> None: # Load n-Queens model from file model = Model(os.path.join(this_path, "../minizinc/knapsack_mzn.mzn")) # Find the MiniZinc solver configuration for Gecode @@ -130,17 +133,17 @@ def init_model(self, **args): self.instance = instance -class CPKnapsackMZN2(MinizincCPSolver): +class CPKnapsackMZN2(MinizincCPSolver, SolverKnapsack): def __init__( self, knapsack_model: KnapsackModel, cp_solver_name: CPSolverName = CPSolverName.CHUFFED, - params_objective_function: ParamsObjectiveFunction = None, + params_objective_function: Optional[ParamsObjectiveFunction] = None, silent_solve_error: bool = False, - **args, + **kwargs: Any, ): + SolverKnapsack.__init__(self, knapsack_model=knapsack_model) self.silent_solve_error = silent_solve_error - self.knapsack_model = knapsack_model self.cp_solver_name = cp_solver_name ( self.aggreg_sol, @@ -151,7 +154,7 @@ def __init__( params_objective_function=params_objective_function, ) - def init_model(self, **args): + def init_model(self, **kwargs: Any) -> None: # Load n-Queens model from file model = Model(os.path.join(this_path, "../minizinc/knapsack_global.mzn")) # Find the MiniZinc solver configuration for Gecode @@ -170,7 +173,9 @@ def init_model(self, **args): instance["max_capacity"] = self.knapsack_model.max_capacity self.instance = instance - def retrieve_solutions(self, result, parameters_cp: ParametersCP) -> ResultStorage: + def retrieve_solutions( + self, result: Result, parameters_cp: ParametersCP + ) -> ResultStorage: l_items_taken = [] intermediate_solution = parameters_cp.intermediate_solution if intermediate_solution: @@ -202,10 +207,10 @@ def retrieve_solutions(self, result, parameters_cp: ParametersCP) -> ResultStora mode_optim=self.params_objective_function.sense_function, ) - def retrieve(self, items_taken): + def retrieve(self, items_taken: List[int]) -> List[KnapsackSolution]: taken = [0] * self.knapsack_model.nb_items - weight = 0 - value = 0 + weight = 0.0 + value = 0.0 for i in range(len(items_taken)): if items_taken[i] != 0: taken[self.knapsack_model.list_items[i].index] = 1 @@ -226,12 +231,12 @@ def __init__( self, knapsack_model: MultidimensionalKnapsack, cp_solver_name: CPSolverName = CPSolverName.CHUFFED, - params_objective_function: ParamsObjectiveFunction = None, + params_objective_function: Optional[ParamsObjectiveFunction] = None, silent_solve_error: bool = False, - **args, + **kwargs: Any, ): - self.silent_solve_error = silent_solve_error self.knapsack_model = knapsack_model + self.silent_solve_error = silent_solve_error self.cp_solver_name = cp_solver_name self.key_decision_variable = ["list_items"] ( @@ -244,12 +249,12 @@ def __init__( ) self.custom_output_type = False - def init_model(self, **args): + def init_model(self, **kwargs: Any) -> None: model = Model( os.path.join(this_path, "../minizinc/multidimension_knapsack.mzn") ) solver = Solver.lookup(map_cp_solver_name[self.cp_solver_name]) - custom_output_type = args.get("output_type", False) + custom_output_type = kwargs.get("output_type", False) if custom_output_type: model.output_type = KnapsackSol self.custom_output_type = True @@ -270,7 +275,9 @@ def init_model(self, **args): instance["max_capacity"] = self.knapsack_model.max_capacities self.instance = instance - def retrieve_solutions(self, result, parameters_cp: ParametersCP) -> ResultStorage: + def retrieve_solutions( + self, result: Result, parameters_cp: ParametersCP + ) -> ResultStorage: intermediate_solutions = parameters_cp.intermediate_solution l_taken = [] objectives = [] @@ -308,9 +315,9 @@ def __init__( self, knapsack_model: MultiScenarioMultidimensionalKnapsack, cp_solver_name: CPSolverName = CPSolverName.CHUFFED, - params_objective_function: ParamsObjectiveFunction = None, + params_objective_function: Optional[ParamsObjectiveFunction] = None, silent_solve_error: bool = False, - **args, + **kwargs: Any, ): self.silent_solve_error = silent_solve_error self.knapsack_model = knapsack_model @@ -326,17 +333,19 @@ def __init__( ) self.custom_output_type = False - def init_model(self, **args): + def init_model(self, **kwargs: Any) -> None: model = Model( os.path.join(this_path, "../minizinc/multidim_multiscenario_knapsack.mzn") ) solver = Solver.lookup(map_cp_solver_name[self.cp_solver_name]) - custom_output_type = args.get("output_type", False) + custom_output_type = kwargs.get("output_type", False) if custom_output_type: model.output_type = KnapsackSol self.custom_output_type = True instance = Instance(solver, model) - list_problems: List[MultidimensionalKnapsack] = self.knapsack_model.list_problem + list_problems: Sequence[ + MultidimensionalKnapsack + ] = self.knapsack_model.list_problem instance["nb_items"] = list_problems[0].nb_items instance["nb_dimension"] = len(list_problems[0].max_capacities) instance["nb_scenario"] = len(list_problems) @@ -363,7 +372,9 @@ def init_model(self, **args): ] self.instance = instance - def retrieve_solutions(self, result, parameters_cp: ParametersCP) -> ResultStorage: + def retrieve_solutions( + self, result: Result, parameters_cp: ParametersCP + ) -> ResultStorage: intermediate_solutions = parameters_cp.intermediate_solution l_taken = [] objectives = [] @@ -418,6 +429,13 @@ def adding_constraint_from_results_store( range_item = range(nb_item) subpart_item = set(random.sample(range_item, int(self.fraction_fix * nb_item))) current_best_solution = last_result_store.get_last_best_solution()[0] + if not isinstance( + current_best_solution, (KnapsackSolution, KnapsackSolutionMultidimensional) + ): + raise RuntimeError( + "current_best_solution must be a KnapsackSolution " + "or a KnapsackSolutionMultidimensional." + ) for i in range_item: if i in subpart_item: strings += [ @@ -433,9 +451,9 @@ def adding_constraint_from_results_store( def remove_constraints_from_previous_iteration( self, cp_solver: CPSolver, - child_instance, + child_instance: Instance, previous_constraints: Iterable[Any], - ): + ) -> None: if not isinstance(cp_solver, CPMultidimensionalMultiScenarioSolver): raise ValueError( "cp_solver must a CPMultidimensionalMultiScenarioSolver for this constraint." diff --git a/discrete_optimization/knapsack/solvers/dyn_prog_knapsack.py b/discrete_optimization/knapsack/solvers/dyn_prog_knapsack.py index 60a5280bf..6f74b4c43 100644 --- a/discrete_optimization/knapsack/solvers/dyn_prog_knapsack.py +++ b/discrete_optimization/knapsack/solvers/dyn_prog_knapsack.py @@ -4,7 +4,7 @@ import logging import time -from typing import Optional +from typing import Any, Optional import numpy as np @@ -20,17 +20,18 @@ KnapsackModel, KnapsackSolution, ) +from discrete_optimization.knapsack.solvers.knapsack_solver import SolverKnapsack logger = logging.getLogger(__name__) -class KnapsackDynProg(SolverDO): +class KnapsackDynProg(SolverKnapsack): def __init__( self, knapsack_model: KnapsackModel, params_objective_function: Optional[ParamsObjectiveFunction] = None, ): - self.knapsack_model = knapsack_model + SolverKnapsack.__init__(self, knapsack_model=knapsack_model) self.nb_items = self.knapsack_model.nb_items capacity = int(self.knapsack_model.max_capacity) if capacity != self.knapsack_model.max_capacity: @@ -49,14 +50,11 @@ def __init__( params_objective_function=params_objective_function, ) - def init_model(self, **args): - pass - - def solve(self, **args) -> ResultStorage: - start_by_most_promising = args.get("greedy_start", False) - max_items = args.get("max_items", self.knapsack_model.nb_items + 1) + def solve(self, **kwargs: Any) -> ResultStorage: + start_by_most_promising = kwargs.get("greedy_start", False) + max_items = kwargs.get("max_items", self.knapsack_model.nb_items + 1) max_items = min(self.knapsack_model.nb_items + 1, max_items) - max_time_seconds = args.get("max_time_seconds", None) + max_time_seconds = kwargs.get("max_time_seconds", None) if max_time_seconds is None: do_time = False else: @@ -130,10 +128,10 @@ def solve(self, **args) -> ResultStorage: mode_optim=self.params_objective_function.sense_function, ) - def solve_np(self, **args) -> ResultStorage: - start_by_most_promising = args.get("greedy_start", False) - max_items = args.get("max_items", self.knapsack_model.nb_items + 1) - max_time_seconds = args.get("max_time_seconds", None) + def solve_np(self, **kwargs: Any) -> ResultStorage: + start_by_most_promising = kwargs.get("greedy_start", False) + max_items = kwargs.get("max_items", self.knapsack_model.nb_items + 1) + max_time_seconds = kwargs.get("max_time_seconds", None) if max_time_seconds is None: do_time = False else: diff --git a/discrete_optimization/knapsack/solvers/gphh_knapsack.py b/discrete_optimization/knapsack/solvers/gphh_knapsack.py index c329312a2..87885a409 100644 --- a/discrete_optimization/knapsack/solvers/gphh_knapsack.py +++ b/discrete_optimization/knapsack/solvers/gphh_knapsack.py @@ -4,11 +4,12 @@ import operator from enum import Enum -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional, Union import matplotlib.pyplot as plt import networkx as nx import numpy as np +import numpy.typing as npt from deap import algorithms, creator, gp, tools from deap.base import Fitness, Toolbox from deap.gp import ( @@ -26,6 +27,13 @@ build_aggreg_function_and_params_objective, ) from discrete_optimization.generic_tools.do_solver import SolverDO +from discrete_optimization.generic_tools.ghh_tools import ( + max_operator, + max_operator_list, + min_operator, + min_operator_list, + protected_div, +) from discrete_optimization.generic_tools.result_storage.result_storage import ( ResultStorage, ) @@ -35,29 +43,6 @@ ) -def protected_div(left, right): - if right != 0.0: - return left / right - else: - return 1.0 - - -def max_operator(left, right): - return max(left, right) - - -def min_operator(left, right): - return min(left, right) - - -def max_operator_list(list_): - return max(list_) - - -def min_operator_list(list_): - return min(list_) - - class FeatureEnum(Enum): PROFIT = "profit" CAPACITIES = "capacities" @@ -65,22 +50,24 @@ class FeatureEnum(Enum): AVG_RES_CONSUMPTION_DELTA_CAPACITY = "avg_res_consumption_delta_capacity" -def get_profit(problem: MultidimensionalKnapsack, item_index: int, **kwargs) -> float: +def get_profit( + problem: MultidimensionalKnapsack, item_index: int, **kwargs: Any +) -> float: return problem.list_items[item_index].value -def get_capacities(problem: MultidimensionalKnapsack, **kwargs) -> List[float]: +def get_capacities(problem: MultidimensionalKnapsack, **kwargs: Any) -> List[float]: return problem.max_capacities def get_res_consumption( - problem: MultidimensionalKnapsack, item_index, **kwargs + problem: MultidimensionalKnapsack, item_index: int, **kwargs: Any ) -> List[float]: return problem.list_items[item_index].weights def get_avg_res_consumption_delta_capacity( - problem: MultidimensionalKnapsack, item_index: int, **kwargs + problem: MultidimensionalKnapsack, item_index: int, **kwargs: Any ) -> float: return sum( [ @@ -91,7 +78,7 @@ def get_avg_res_consumption_delta_capacity( ) / len(problem.max_capacities) -feature_function_map: Dict[FeatureEnum, Callable[..., Any]] = { +feature_function_map: Dict[FeatureEnum, Callable[..., Union[float, List[float]]]] = { FeatureEnum.PROFIT: get_profit, FeatureEnum.CAPACITIES: get_capacities, FeatureEnum.RES_CONSUMPTION_ARRAY: get_res_consumption, @@ -125,7 +112,7 @@ def __init__( self.deap_verbose = deap_verbose @staticmethod - def default(): + def default() -> "ParametersGPHH": list_feature = [ FeatureEnum.PROFIT, FeatureEnum.CAPACITIES, @@ -178,7 +165,7 @@ def __init__( domain_model: MultidimensionalKnapsack, weight: int = 1, params_gphh: Optional[ParametersGPHH] = None, - params_objective_function: ParamsObjectiveFunction = None, + params_objective_function: Optional[ParamsObjectiveFunction] = None, ): self.training_domains = training_domains self.domain_model = domain_model @@ -188,7 +175,9 @@ def __init__( self.params_gphh = params_gphh self.list_feature = self.params_gphh.list_feature self.list_feature_names = [feature.value for feature in self.list_feature] - self.pset: PrimitiveSet = self.init_primitives(self.params_gphh.set_primitves) + self.pset: PrimitiveSetTyped = self.init_primitives( + self.params_gphh.set_primitves + ) self.weight = weight ( self.aggreg_from_sol, @@ -199,7 +188,7 @@ def __init__( params_objective_function=params_objective_function, ) - def init_model(self): + def init_model(self) -> None: tournament_ratio = self.params_gphh.tournament_ratio pop_size = self.params_gphh.pop_size min_tree_depth = self.params_gphh.min_tree_depth @@ -249,7 +238,7 @@ def init_model(self): mstats.register("min", np.min) mstats.register("max", np.max) - def solve(self, **kwargs) -> ResultStorage: + def solve(self, **kwargs: Any) -> ResultStorage: stats_fit = tools.Statistics(lambda ind: ind.fitness.values) stats_size = tools.Statistics(len) mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size) @@ -281,7 +270,9 @@ def solve(self, **kwargs) -> ResultStorage: mode_optim=self.params_objective_function.sense_function, ) - def build_result_storage_for_domain(self, domain) -> ResultStorage: + def build_result_storage_for_domain( + self, domain: MultidimensionalKnapsack + ) -> ResultStorage: solution = self.build_solution( domain=domain, func_heuristic=self.func_heuristic ) @@ -292,18 +283,21 @@ def build_result_storage_for_domain(self, domain) -> ResultStorage: mode_optim=self.params_objective_function.sense_function, ) - def init_primitives(self, pset) -> PrimitiveSet: + def init_primitives(self, pset: PrimitiveSetTyped) -> PrimitiveSetTyped: for i in range(len(self.list_feature)): pset.renameArguments(**{"ARG" + str(i): self.list_feature[i].value}) return pset def build_solution( - self, domain: MultidimensionalKnapsack, individual=None, func_heuristic=None - ): + self, + domain: MultidimensionalKnapsack, + individual: Optional[Any] = None, + func_heuristic: Optional[Callable[..., float]] = None, + ) -> KnapsackSolutionMultidimensional: if func_heuristic is None: func_heuristic = self.toolbox.compile(expr=individual) d: MultidimensionalKnapsack = domain - raw_values = [] + raw_values: List[float] = [] for j in range(len(d.list_items)): input_features = [ feature_function_map[lf](problem=domain, item_index=j) @@ -343,7 +337,7 @@ def build_solution( return solution def evaluate_heuristic( - self, individual, domains: List[MultidimensionalKnapsack] + self, individual: Any, domains: List[MultidimensionalKnapsack] ) -> List[float]: vals = [] func_heuristic = self.toolbox.compile(expr=individual) @@ -351,12 +345,12 @@ def evaluate_heuristic( solution = self.build_solution( individual=individual, domain=domain, func_heuristic=func_heuristic ) - value = self.aggreg_dict(domain.evaluate(solution)) + value: float = self.aggreg_dict(domain.evaluate(solution)) # type: ignore # could also be TupleFitness vals.append(value) fitness = [np.mean(vals)] return [fitness[0] - 10 * self.evaluate_complexity(individual)] - def evaluate_complexity(self, individual): + def evaluate_complexity(self, individual: Any) -> float: all_primitives_list = [] all_features_list = [] for i in range(len(individual)): @@ -369,7 +363,7 @@ def evaluate_complexity(self, individual): val = 1.0 * n_operators + 1.0 * n_features return val - def plot_solution(self, show=True): + def plot_solution(self, show: bool = True) -> None: nodes, edges, labels = gp.graph(self.best_heuristic) g = nx.Graph() g.add_nodes_from(nodes) diff --git a/discrete_optimization/knapsack/solvers/greedy_solvers.py b/discrete_optimization/knapsack/solvers/greedy_solvers.py index c3ed04422..7e563d989 100644 --- a/discrete_optimization/knapsack/solvers/greedy_solvers.py +++ b/discrete_optimization/knapsack/solvers/greedy_solvers.py @@ -2,18 +2,22 @@ # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. +from typing import Any, Callable, List, Optional + from discrete_optimization.generic_tools.do_problem import ( ParamsObjectiveFunction, build_aggreg_function_and_params_objective, ) from discrete_optimization.generic_tools.do_solver import ResultStorage, SolverDO from discrete_optimization.knapsack.knapsack_model import ( + Item, KnapsackModel, KnapsackSolution, ) +from discrete_optimization.knapsack.solvers.knapsack_solver import SolverKnapsack -def compute_density(knapsack_model: KnapsackModel): +def compute_density(knapsack_model: KnapsackModel) -> List[Item]: dd = sorted( [ l @@ -26,7 +30,7 @@ def compute_density(knapsack_model: KnapsackModel): return dd -def compute_density_and_penalty(knapsack_model: KnapsackModel): +def compute_density_and_penalty(knapsack_model: KnapsackModel) -> List[Item]: dd = sorted( [ l @@ -40,12 +44,13 @@ def compute_density_and_penalty(knapsack_model: KnapsackModel): def greedy_using_queue( - knapsack_model: KnapsackModel, method_queue=None + knapsack_model: KnapsackModel, + method_queue: Optional[Callable[[KnapsackModel], List[Item]]] = None, ) -> KnapsackSolution: if method_queue is None: method_queue = compute_density - value = 0 - weight = 0 + value = 0.0 + weight = 0.0 taken = [0] * knapsack_model.nb_items sorted_per_density = method_queue(knapsack_model) for i in range(len(sorted_per_density)): @@ -70,13 +75,13 @@ def best_of_greedy(knapsack_model: KnapsackModel) -> KnapsackSolution: return result1 if result1.value > result2.value else result2 -class GreedyBest(SolverDO): +class GreedyBest(SolverKnapsack): def __init__( self, knapsack_model: KnapsackModel, - params_objective_function: ParamsObjectiveFunction = None, + params_objective_function: Optional[ParamsObjectiveFunction] = None, ): - self.knapsack_model = knapsack_model + SolverKnapsack.__init__(self, knapsack_model=knapsack_model) ( self.aggreg_sol, self.aggreg_dict, @@ -86,10 +91,7 @@ def __init__( params_objective_function=params_objective_function, ) - def init_model(self, *args): - pass - - def solve(self): + def solve(self, **kwargs: Any) -> ResultStorage: res = best_of_greedy(self.knapsack_model) fit = self.aggreg_sol(res) return ResultStorage( @@ -99,13 +101,13 @@ def solve(self): ) -class GreedyDummy(SolverDO): +class GreedyDummy(SolverKnapsack): def __init__( self, knapsack_model: KnapsackModel, - params_objective_function: ParamsObjectiveFunction = None, + params_objective_function: Optional[ParamsObjectiveFunction] = None, ): - self.knapsack_model = knapsack_model + SolverKnapsack.__init__(self, knapsack_model=knapsack_model) ( self.aggreg_sol, self.aggreg_dict, @@ -115,10 +117,7 @@ def __init__( params_objective_function=params_objective_function, ) - def init_model(self, *args): - pass - - def solve(self): + def solve(self, **kwargs: Any) -> ResultStorage: sol = KnapsackSolution( problem=self.knapsack_model, value=0, diff --git a/discrete_optimization/knapsack/solvers/knapsack_cpmpy.py b/discrete_optimization/knapsack/solvers/knapsack_cpmpy.py index 370747715..292bf34c5 100644 --- a/discrete_optimization/knapsack/solvers/knapsack_cpmpy.py +++ b/discrete_optimization/knapsack/solvers/knapsack_cpmpy.py @@ -19,40 +19,43 @@ KnapsackModel, KnapsackSolution, ) +from discrete_optimization.knapsack.solvers.knapsack_solver import SolverKnapsack -class CPMPYKnapsackSolver(SolverDO): +class CPMPYKnapsackSolver(SolverKnapsack): def __init__( self, - problem: KnapsackModel, + knapsack_model: KnapsackModel, params_objective_function: Optional[ParamsObjectiveFunction] = None, ): - self.problem = problem + SolverKnapsack.__init__(self, knapsack_model=knapsack_model) ( self.aggreg_sol, self.aggreg_from_dict_values, self.params_objective_function, ) = build_aggreg_function_and_params_objective( - self.problem, params_objective_function=params_objective_function + self.knapsack_model, params_objective_function=params_objective_function ) self.model: Optional[Model] = None self.variables: Dict[str, Any] = {} - def init_model(self): + def init_model(self, **kwargs: Any) -> None: values = [ - self.problem.list_items[i].value for i in range(self.problem.nb_items) + self.knapsack_model.list_items[i].value + for i in range(self.knapsack_model.nb_items) ] weights = [ - self.problem.list_items[i].weight for i in range(self.problem.nb_items) + self.knapsack_model.list_items[i].weight + for i in range(self.knapsack_model.nb_items) ] - capacity = self.problem.max_capacity + capacity = self.knapsack_model.max_capacity # Construct the model. - x = boolvar(shape=self.problem.nb_items, name="x") + x = boolvar(shape=self.knapsack_model.nb_items, name="x") self.model = Model(sum(x * weights) <= capacity, maximize=sum(x * values)) self.variables["x"] = x def solve( - self, parameters_cp: Optional[ParametersCP] = None, **kwargs + self, parameters_cp: Optional[ParametersCP] = None, **kwargs: Any ) -> ResultStorage: if parameters_cp is None: parameters_cp = ParametersCP.default() @@ -66,7 +69,7 @@ def solve( kwargs.get("solver", "ortools"), time_limit=parameters_cp.time_limit ) list_taken = self.variables["x"].value() - sol = KnapsackSolution(problem=self.problem, list_taken=list_taken) + sol = KnapsackSolution(problem=self.knapsack_model, list_taken=list_taken) fit = self.aggreg_sol(sol) return ResultStorage( [(sol, fit)], mode_optim=self.params_objective_function.sense_function diff --git a/discrete_optimization/knapsack/solvers/knapsack_lns_cp_solver.py b/discrete_optimization/knapsack/solvers/knapsack_lns_cp_solver.py index 595fdd283..c136444e6 100644 --- a/discrete_optimization/knapsack/solvers/knapsack_lns_cp_solver.py +++ b/discrete_optimization/knapsack/solvers/knapsack_lns_cp_solver.py @@ -38,7 +38,15 @@ def adding_constraint_from_results_store( int(self.fraction_to_fix * self.problem.nb_items), ) ) - current_solution: KnapsackSolution = result_storage.get_best_solution_fit()[0] + current_solution = result_storage.get_best_solution() + if current_solution is None: + raise ValueError( + "result_storage.get_best_solution() " "should not be None." + ) + if not isinstance(current_solution, KnapsackSolution): + raise ValueError( + "result_storage.get_best_solution() " "should be a KnapsackSolution." + ) list_strings = [] for item in subpart_item: list_strings += [ @@ -54,9 +62,9 @@ def adding_constraint_from_results_store( def remove_constraints_from_previous_iteration( self, cp_solver: CPSolver, - child_instance, + child_instance: Instance, previous_constraints: Iterable[Any], - ): + ) -> None: if not isinstance(cp_solver, CPKnapsackMZN2): raise ValueError("cp_solver must a CPKnapsackMZN2 for this constraint.") pass diff --git a/discrete_optimization/knapsack/solvers/knapsack_lns_solver.py b/discrete_optimization/knapsack/solvers/knapsack_lns_solver.py index 38d40cfe5..2badd8404 100644 --- a/discrete_optimization/knapsack/solvers/knapsack_lns_solver.py +++ b/discrete_optimization/knapsack/solvers/knapsack_lns_solver.py @@ -48,7 +48,7 @@ def __init__( problem=self.problem, params_objective_function=params_objective_function ) - def get_starting_solution(self): + def get_starting_solution(self) -> ResultStorage: if self.initial_method == InitialKnapsackMethod.GREEDY: greedy_solver = GreedyBest( self.problem, params_objective_function=self.params_objective_function @@ -87,7 +87,15 @@ def adding_constraint_from_results_store( int(self.fraction_to_fix * self.problem.nb_items), ) ) - current_solution: KnapsackSolution = result_storage.get_best_solution_fit()[0] + current_solution = result_storage.get_best_solution() + if current_solution is None: + raise ValueError( + "result_storage.get_best_solution() " "should not be None." + ) + if not isinstance(current_solution, KnapsackSolution): + raise ValueError( + "result_storage.get_best_solution() " "should be a KnapsackSolution." + ) dict_f_fixed = {} dict_f_start = {} start = [] @@ -110,7 +118,7 @@ def adding_constraint_from_results_store( def remove_constraints_from_previous_iteration( self, milp_solver: MilpSolver, previous_constraints: Mapping[Hashable, Any] - ): + ) -> None: if not isinstance(milp_solver, LPKnapsack): raise ValueError("milp_solver must a ColoringLP for this constraint.") if milp_solver.model is None: diff --git a/discrete_optimization/knapsack/solvers/knapsack_solver.py b/discrete_optimization/knapsack/solvers/knapsack_solver.py new file mode 100644 index 000000000..8d3866db1 --- /dev/null +++ b/discrete_optimization/knapsack/solvers/knapsack_solver.py @@ -0,0 +1,16 @@ +# Copyright (c) 2023 AIRBUS and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +from typing import Any + +from discrete_optimization.generic_tools.do_solver import SolverDO +from discrete_optimization.knapsack.knapsack_model import KnapsackModel + + +class SolverKnapsack(SolverDO): + def __init__(self, knapsack_model: KnapsackModel, **kwargs: Any): + self.knapsack_model = knapsack_model + + def init_model(self, **kwargs: Any) -> None: + pass diff --git a/discrete_optimization/knapsack/solvers/lp_solvers.py b/discrete_optimization/knapsack/solvers/lp_solvers.py index 0fdcb0435..6278baf7a 100644 --- a/discrete_optimization/knapsack/solvers/lp_solvers.py +++ b/discrete_optimization/knapsack/solvers/lp_solvers.py @@ -30,6 +30,7 @@ KnapsackModel, KnapsackSolution, ) +from discrete_optimization.knapsack.solvers.knapsack_solver import SolverKnapsack try: import gurobipy @@ -43,16 +44,16 @@ logger = logging.getLogger(__name__) -class _BaseLPKnapsack(MilpSolver): +class _BaseLPKnapsack(MilpSolver, SolverKnapsack): """Base class for Knapsack LP solvers.""" def __init__( self, knapsack_model: KnapsackModel, params_objective_function: Optional[ParamsObjectiveFunction] = None, - **args, + **kwargs: Any, ): - self.knapsack_model = knapsack_model + SolverKnapsack.__init__(self, knapsack_model=knapsack_model) self.variable_decision: Dict[str, Dict[int, Union["Var", mip.Var]]] = {} self.constraints_dict: Dict[ str, Union["Constr", "QConstr", "MConstr", "GenConstr", "mip.Constr"] @@ -68,7 +69,7 @@ def __init__( params_objective_function=params_objective_function, ) - def retrieve_solutions(self, parameters_milp: ParametersMilp): + def retrieve_solutions(self, parameters_milp: ParametersMilp) -> ResultStorage: if parameters_milp.retrieve_all_solution: n_solutions = min(parameters_milp.n_solutions_max, self.nb_solutions) else: @@ -104,8 +105,8 @@ def retrieve_solutions(self, parameters_milp: ParametersMilp): class LPKnapsackGurobi(GurobiMilpSolver, _BaseLPKnapsack): - def init_model(self, **args): - warm_start = args.get("warm_start", {}) + def init_model(self, **kwargs: Any) -> None: + warm_start = kwargs.get("warm_start", {}) self.model = Model("Knapsack") self.variable_decision = {"x": {}} self.description_variable_description = { @@ -150,14 +151,14 @@ def retrieve_solutions(self, parameters_milp: ParametersMilp) -> ResultStorage: return _BaseLPKnapsack.retrieve_solutions(self, parameters_milp=parameters_milp) -class LPKnapsackCBC(SolverDO): +class LPKnapsackCBC(SolverKnapsack): def __init__( self, knapsack_model: KnapsackModel, params_objective_function: Optional[ParamsObjectiveFunction] = None, - **args, + **kwargs: Any, ): - self.knapsack_model = knapsack_model + SolverKnapsack.__init__(self, knapsack_model=knapsack_model) self.model: Optional[pywraplp.Solver] = None self.variable_decision: Dict[str, Dict[int, Any]] = {} self.constraints_dict: Dict[str, Any] = {} @@ -172,7 +173,9 @@ def __init__( params_objective_function=params_objective_function, ) - def init_model(self, warm_start: Optional[Dict[int, int]] = None): + def init_model( + self, warm_start: Optional[Dict[int, int]] = None, **kwargs: Any + ) -> None: if warm_start is None: warm_start = {} self.description_variable_description = { @@ -208,7 +211,7 @@ def init_model(self, warm_start: Optional[Dict[int, int]] = None): self.model = S self.variable_decision["x"] = x - def solve(self, **kwargs): + def solve(self, **kwargs: Any) -> ResultStorage: if self.model is None: self.init_model() if self.model is None: # for mypy @@ -230,7 +233,7 @@ def solve(self, **kwargs): objective = self.model.Objective().Value() xs = {} x = self.variable_decision["x"] - weight = 0 + weight = 0.0 for i in x: sv = x[i].solution_value() if sv >= 0.5: @@ -258,19 +261,19 @@ def __init__( knapsack_model: KnapsackModel, milp_solver_name: MilpSolverName, params_objective_function: Optional[ParamsObjectiveFunction] = None, - **args, + **kwargs: Any, ): super().__init__( knapsack_model=knapsack_model, params_objective_function=params_objective_function, - **args, + **kwargs, ) self.milp_solver_name = milp_solver_name self.solver_name = map_solver[milp_solver_name] - def init_model(self, **args): - warm_start = args.get("warm_start", {}) - solver_name = args.get("solver_name", self.solver_name) + def init_model(self, **kwargs: Any) -> None: + warm_start = kwargs.get("warm_start", {}) + solver_name = kwargs.get("solver_name", self.solver_name) self.model = MyModelMilp("Knapsack", solver_name=solver_name, sense=MAXIMIZE) self.variable_decision = {"x": {}} self.description_variable_description = { @@ -311,14 +314,14 @@ def retrieve_solutions(self, parameters_milp: ParametersMilp) -> ResultStorage: return _BaseLPKnapsack.retrieve_solutions(self, parameters_milp=parameters_milp) -class KnapsackORTools(SolverDO): +class KnapsackORTools(SolverKnapsack): def __init__( self, knapsack_model: KnapsackModel, params_objective_function: Optional[ParamsObjectiveFunction] = None, - **args, + **kwargs: Any, ): - self.knapsack_model = knapsack_model + SolverKnapsack.__init__(self, knapsack_model=knapsack_model) self.model = None ( self.aggreg_sol, @@ -329,7 +332,7 @@ def __init__( params_objective_function=params_objective_function, ) - def init_model(self, **kwargs): + def init_model(self, **kwargs: Any) -> None: solver = pywrapknapsack_solver.KnapsackSolver( pywrapknapsack_solver.KnapsackSolver.KNAPSACK_MULTIDIMENSION_BRANCH_AND_BOUND_SOLVER, "KnapsackExample", @@ -342,9 +345,13 @@ def init_model(self, **kwargs): solver.Init(values, weights, capacities) self.model = solver - def solve(self, **kwargs): + def solve(self, **kwargs: Any) -> ResultStorage: if self.model is None: self.init_model(**kwargs) + if self.model is None: + raise RuntimeError( + "self.model must not be None after self.init_model()." + ) computed_value = self.model.Solve() logger.debug(f"Total value = {computed_value}") xs = {} diff --git a/examples/knapsack/knapsack_cpmyp.py b/examples/knapsack/knapsack_cpmyp.py index 46fa2deb5..45a11713a 100644 --- a/examples/knapsack/knapsack_cpmyp.py +++ b/examples/knapsack/knapsack_cpmyp.py @@ -30,7 +30,7 @@ def run(): # sol_lp = res.get_best_solution() a = SolverLookup.base_solvers() print(SolverLookup.base_solvers()) - solver = CPMPYKnapsackSolver(problem=knapsack_model) + solver = CPMPYKnapsackSolver(knapsack_model=knapsack_model) solver.init_model() parameters_cp = ParametersCP.default() parameters_cp.time_limit = 20 diff --git a/tests/knapsack/test_knapsack_cpmyp.py b/tests/knapsack/test_knapsack_cpmyp.py index 7de4c984c..56545d9f5 100644 --- a/tests/knapsack/test_knapsack_cpmyp.py +++ b/tests/knapsack/test_knapsack_cpmyp.py @@ -14,7 +14,7 @@ def test_knapsack_cpmyp(): file = [f for f in get_data_available() if "ks_30_0" in f][0] knapsack_model = parse_file(file) - solver = CPMPYKnapsackSolver(problem=knapsack_model) + solver = CPMPYKnapsackSolver(knapsack_model=knapsack_model) solver.init_model() parameters_cp = ParametersCP.default() parameters_cp.time_limit = 20