diff --git a/discrete_optimization/coloring/solvers/coloring_quantum.py b/discrete_optimization/coloring/solvers/coloring_quantum.py new file mode 100644 index 000000000..4b181ed1f --- /dev/null +++ b/discrete_optimization/coloring/solvers/coloring_quantum.py @@ -0,0 +1,309 @@ +# Copyright (c) 2024 AIRBUS and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +import logging +from typing import Optional, Union + +import numpy as np + +from discrete_optimization.coloring.coloring_model import ( + ColoringProblem, + ColoringSolution, +) +from discrete_optimization.coloring.solvers.coloring_solver import SolverColoring +from discrete_optimization.generic_tools.do_problem import ( + ParamsObjectiveFunction, + Solution, +) +from discrete_optimization.generic_tools.qiskit_tools import ( + QiskitQAOASolver, + QiskitVQESolver, + qiskit_available, +) + +logger = logging.getLogger(__name__) + +if qiskit_available: + from qiskit_optimization import QuadraticProgram + from qiskit_optimization.algorithms import OptimizationResult + from qiskit_optimization.applications import OptimizationApplication +else: + msg = ( + "ColoringQiskit_MinimizeNbColor, QAOAColoringSolver_MinimizeNbColor, VQEColoringSolver_MinimizeNbColor, " + "ColoringQiskit_FeasibleNbColor, QAOAColoringSolver_FeasibleNbColor and VQEColoringSolver_FeasibleNbColor, " + "need qiskit, qiskit_aer, qiskit_algorithms, qiskit_ibm_runtime, " + "and qiskit_optimization to be installed." + "You can use the command `pip install discrete-optimization[quantum]` to install them." + ) + logger.warning(msg) + OptimizationApplication = object + OptimizationResult = object + QuadraticProgram = object + + +class ColoringQiskit_MinimizeNbColor(OptimizationApplication): + def __init__(self, problem: ColoringProblem, nb_max_color=None) -> None: + """ + Args: + problem : the coloring problem instance + """ + self.problem = problem + if nb_max_color is None: + nb_max_color = self.problem.number_of_nodes + self.nb_max_color = nb_max_color + self.nb_variable = ( + self.problem.number_of_nodes * self.nb_max_color + self.nb_max_color + ) + + def to_quadratic_program(self) -> QuadraticProgram: + quadratic_program = QuadraticProgram() + + # X_i,j == 1 si le noeud i prend la couleur j + # C_j == 1 si la couleur j est choisit au moins une fois + + p = self.nb_max_color + + var_names = {} + for i in range(0, self.nb_max_color): + for j in range(0, self.problem.number_of_nodes): + x_new = quadratic_program.binary_var("x" + str(j) + str(i)) + var_names[(j, i)] = x_new.name + color_new = quadratic_program.binary_var("color" + str(i)) + var_names[i] = color_new.name + + # on cherche à minimiser le nombre de couleurs utilisées + + constant = 0 + linear = {} + quadratic = {} + + for i in range(0, self.nb_max_color): + quadratic[var_names[i], var_names[i]] = 1 + + """ + On va ici intégrer sous forme de pénalité les différentes contraintes afin d'avoir directement une formulation QUBO + x <= y devient P(x-xy) + x1 + ... + xi = 1 devient P(-x1 + ... + -xi + 2x1x2 + ... + 2x1xi + 2x2x3 + .... + 2x2xi + ... + 2x(i-1)xi) + x + y <= 1 devient P(xy) + où P est un scalaire qui doit idéalement être ni trop petit, ni trop grand (ici on prend le nombre de couleur max autorisé) + """ + + # si une couleur j est attribué à un noeud, la contrainte C_j doit valoir 1 + for i in range(0, self.problem.number_of_nodes): + for j in range(0, self.nb_max_color): + quadratic[var_names[(i, j)], var_names[(i, j)]] = p + quadratic[var_names[(i, j)], var_names[j]] = -p + + # chaque noeud doit avoir une unique couleur + for i in range(0, self.problem.number_of_nodes): + for j in range(0, self.nb_max_color): + quadratic[var_names[(i, j)], var_names[(i, j)]] += -p + for k in range(j + 1, self.nb_max_color): + quadratic[var_names[(i, j)], var_names[(i, k)]] = 2 * p + constant += p + + # deux noeuds adjacents ne peuvent avoir la même couleur + for edge in self.problem.graph.graph_nx.edges(): + for j in range(0, self.nb_max_color): + quadratic[ + var_names[(self.problem.index_nodes_name[edge[0]], j)], + var_names[(self.problem.index_nodes_name[edge[1]], j)], + ] = p + + quadratic_program.minimize(constant, linear, quadratic) + + return quadratic_program + + def interpret(self, result: Union[OptimizationResult, np.ndarray]): + + x = self._result_to_x(result) + + colors = [0] * self.problem.number_of_nodes + nb_color = 0 + + for node in range(0, self.problem.number_of_nodes): + color_find = False + color = 0 + while not color_find and color < self.nb_max_color: + if x[self.problem.number_of_nodes * color + node + color] == 1: + colors[node] = color + color_find = True + color += 1 + + # TODO think about what we want to do when a node has no color + + for color in range(0, self.nb_max_color): + if ( + x[ + self.problem.number_of_nodes * color + + self.problem.number_of_nodes + + color + ] + == 1 + ): + nb_color += 1 + + sol = ColoringSolution(self.problem, colors=colors, nb_color=nb_color) + + return sol + + +class QAOAColoringSolver_MinimizeNbColor(SolverColoring, QiskitQAOASolver): + def __init__( + self, + problem: ColoringProblem, + nb_max_color=None, + params_objective_function: Optional[ParamsObjectiveFunction] = None, + ): + super().__init__(problem, params_objective_function) + self.coloring_qiskit = ColoringQiskit_MinimizeNbColor( + problem, nb_max_color=nb_max_color + ) + + def init_model(self): + self.quadratic_programm = self.coloring_qiskit.to_quadratic_program() + + def retrieve_current_solution(self, result) -> Solution: + return self.coloring_qiskit.interpret(result) + + +class VQEColoringSolver_MinimizeNbColor(SolverColoring, QiskitVQESolver): + def __init__( + self, + problem: ColoringProblem, + nb_max_color=None, + params_objective_function: Optional[ParamsObjectiveFunction] = None, + ): + super().__init__(problem, params_objective_function) + self.coloring_qiskit = ColoringQiskit_MinimizeNbColor( + problem, nb_max_color=nb_max_color + ) + + def init_model(self): + self.quadratic_programm = self.coloring_qiskit.to_quadratic_program() + self.nb_variable = self.coloring_qiskit.nb_variable + + def retrieve_current_solution(self, result) -> Solution: + return self.coloring_qiskit.interpret(result) + + +class ColoringQiskit_FeasibleNbColor(OptimizationApplication): + def __init__(self, problem: ColoringProblem, nb_color=None) -> None: + """ + Args: + problem : the coloring problem instance + """ + self.problem = problem + if nb_color is None: + nb_color = self.problem.number_of_nodes + self.nb_color = nb_color + self.nb_variable = self.problem.number_of_nodes * self.nb_color + + def to_quadratic_program(self) -> QuadraticProgram: + quadratic_program = QuadraticProgram() + + # C_j == 1 si la couleur j est choisit au moins une fois + + var_names = {} + for i in range(0, self.nb_color): + for j in range(0, self.problem.number_of_nodes): + x_new = quadratic_program.binary_var("x" + str(j) + str(i)) + var_names[(j, i)] = x_new.name + + # on cherche à savoir si il est possible de satisfaire le problème de coloring avec ce nombre de couleur + + constant = 0 + linear = {} + quadratic = {} + + """ + On va ici intégrer sous forme de pénalité les différentes contraintes afin d'avoir directement une formulation QUBO + x1 + ... + xi = 1 devient P(-x1 + ... + -xi + 2x1x2 + ... + 2x1xi + 2x2x3 + .... + 2x2xi + ... + 2x(i-1)xi) + x + y <= 1 devient P(xy) + où P est un scalaire qui doit idéalement être ni trop petit, ni trop grand (ici on prend le nombre de couleur max autorisé) + """ + + p = self.nb_color + + # chaque noeud doit avoir une unique couleur + for i in range(0, self.problem.number_of_nodes): + for j in range(0, self.nb_color): + quadratic[var_names[(i, j)], var_names[(i, j)]] = -p + for k in range(j + 1, self.nb_color): + quadratic[var_names[(i, j)], var_names[(i, k)]] = 2 * p + + # deux noeuds adjacents ne peuvent avoir la même couleur + for edge in self.problem.graph.graph_nx.edges(): + for j in range(0, self.nb_color): + quadratic[ + var_names[(self.problem.index_nodes_name[edge[0]], j)], + var_names[(self.problem.index_nodes_name[edge[1]], j)], + ] = p + + quadratic_program.minimize(constant, linear, quadratic) + + return quadratic_program + + def interpret(self, result: Union[OptimizationResult, np.ndarray]): + + x = self._result_to_x(result) + + colors = [0] * self.problem.number_of_nodes + + color_used = set() + + for node in range(0, self.problem.number_of_nodes): + color_find = False + color = 0 + while not color_find and color < self.nb_color: + if x[self.problem.number_of_nodes * color + node] == 1: + colors[node] = color + color_find = True + color_used.add(color) + color += 1 + + # TODO think about what we want to do when a node has no color + + sol = ColoringSolution(self.problem, colors=colors, nb_color=len(color_used)) + + return sol + + +class QAOAColoringSolver_FeasibleNbColor(SolverColoring, QiskitQAOASolver): + def __init__( + self, + problem: ColoringProblem, + nb_color=None, + params_objective_function: Optional[ParamsObjectiveFunction] = None, + ): + super().__init__(problem, params_objective_function) + self.coloring_qiskit = ColoringQiskit_FeasibleNbColor( + problem, nb_color=nb_color + ) + + def init_model(self): + self.quadratic_programm = self.coloring_qiskit.to_quadratic_program() + + def retrieve_current_solution(self, result) -> Solution: + return self.coloring_qiskit.interpret(result) + + +class VQEColoringSolver_FeasibleNbColor(SolverColoring, QiskitVQESolver): + def __init__( + self, + problem: ColoringProblem, + nb_color=None, + params_objective_function: Optional[ParamsObjectiveFunction] = None, + ): + super().__init__(problem, params_objective_function) + self.coloring_qiskit = ColoringQiskit_FeasibleNbColor( + problem, nb_color=nb_color + ) + + def init_model(self): + self.quadratic_programm = self.coloring_qiskit.to_quadratic_program() + self.nb_variable = self.coloring_qiskit.nb_variable + + def retrieve_current_solution(self, result) -> Solution: + return self.coloring_qiskit.interpret(result) diff --git a/discrete_optimization/generic_tools/qiskit_tools.py b/discrete_optimization/generic_tools/qiskit_tools.py index 20b45eaea..fcfcffbc7 100644 --- a/discrete_optimization/generic_tools/qiskit_tools.py +++ b/discrete_optimization/generic_tools/qiskit_tools.py @@ -12,6 +12,14 @@ Solution, ) from discrete_optimization.generic_tools.do_solver import SolverDO +from discrete_optimization.generic_tools.hyperparameters.hyperparameter import ( + CategoricalHyperparameter, + FloatHyperparameter, + IntegerHyperparameter, +) +from discrete_optimization.generic_tools.hyperparameters.hyperparametrizable import ( + Hyperparametrizable, +) from discrete_optimization.generic_tools.result_storage.result_storage import ( ResultStorage, ) @@ -102,23 +110,27 @@ def cost_func(params, ansatz, hamiltonian, estimator, callback_dict): def execute_ansatz_with_Hamiltonian( - backend, ansatz, hamiltonian, **kwargs + backend, ansatz, hamiltonian, use_session: Optional[bool] = False, **kwargs ) -> np.ndarray: """ @param backend: the backend use to run the circuit (simulator or real device) @param ansatz: the quantum circuit @param hamiltonian: the hamiltonian corresponding to the problem - @param kwargs: a list of parameters who can be specified + @param use_session: boolean to set to True for use a session + @param kwargs: a list of hyperparameters who can be specified @return: the qubit's value the must often chose """ if backend is None: backend = AerSimulator() - with_session = kwargs.get("with_session", False) - optimization_level = kwargs.get("optimization_level", 3) - nb_shots = kwargs.get("nb_shot", 10000) - maxiter = kwargs.get("maxiter", None) - disp = kwargs.get("disp", False) + """ + if use_session: + print("To use a session you need to use a real device not a simulator") + use_session = False + """ + + optimization_level = kwargs["optimization_level"] + nb_shots = kwargs["nb_shots"] # transpile and optimize the quantum circuit depending on the device who are going to use # there are four level_optimization, to 0 to 3, 3 is the better but also the longest @@ -126,11 +138,11 @@ def execute_ansatz_with_Hamiltonian( pm = generate_preset_pass_manager( target=target, optimization_level=optimization_level ) - ansatz = pm.run(ansatz) - hamiltonian = hamiltonian.apply_layout(ansatz.layout) + new_ansatz = pm.run(ansatz) + hamiltonian = hamiltonian.apply_layout(new_ansatz.layout) - # open a session - if with_session: + # open a session if desired + if use_session: session = Session(backend=backend, max_time="2h") else: session = None @@ -149,17 +161,51 @@ def execute_ansatz_with_Hamiltonian( "cost_history": [], } # step of minimization - res = minimize( - cost_func, - validate_initial_point(point=None, circuit=ansatz), - args=(ansatz, hamiltonian, estimator, callback_dict), - method="COBYLA", - bounds=validate_bounds(ansatz), - options={"maxiter": maxiter, "disp": disp}, - ) + if kwargs.get("optimizer"): + + def fun(x): + pub = (new_ansatz, [hamiltonian], [x]) + result = estimator.run(pubs=[pub]).result() + cost = result[0].data.evs[0] + callback_dict["iters"] += 1 + callback_dict["prev_vector"] = x + callback_dict["cost_history"].append(cost) + print(f"Iters. done: {callback_dict['iters']} [Current cost: {cost}]") + return cost + + optimizer = kwargs["optimizer"] + res = optimizer.minimize( + fun, + validate_initial_point(point=None, circuit=ansatz), + bounds=validate_bounds(ansatz), + ) + + else: + + method = kwargs["method"] + if kwargs.get("options"): + options = kwargs["options"] + else: + if method == "COBYLA": + options = { + "maxiter": kwargs["maxiter"], + "rhobeg": kwargs["rhobeg"], + "tol": kwargs["tol"], + } + else: + options = {} + + res = minimize( + cost_func, + validate_initial_point(point=None, circuit=ansatz), + args=(new_ansatz, hamiltonian, estimator, callback_dict), + method=method, + bounds=validate_bounds(ansatz), + options=options, + ) # Assign solution parameters to our ansatz - qc = ansatz.assign_parameters(res.x) + qc = new_ansatz.assign_parameters(res.x) # Add measurements to our circuit qc.measure_all() # transpile our circuit @@ -167,22 +213,45 @@ def execute_ansatz_with_Hamiltonian( # run our circuit with optimal parameters find at the minimization step results = sampler.run([qc]).result() # extract a dictionnary of results, key is binary values of variable and value is number of time of these values has been found - result = get_result_from_dict_result(results[0].data.meas.get_counts()) + best_result = get_result_from_dict_result(results[0].data.meas.get_counts()) # Close the session since we are now done with it - if with_session: + if use_session: # with_session: session.close() - return result + return best_result -class QiskitQAOASolver(SolverDO): +class QiskitSolver(SolverDO): + def __init__( + self, + problem: Problem, + params_objective_function: Optional[ParamsObjectiveFunction] = None, + ): + super().__init__(problem, params_objective_function) + + +class QiskitQAOASolver(QiskitSolver, Hyperparametrizable): + + hyperparameters = [ + IntegerHyperparameter(name="reps", low=1, high=6, default=2), + IntegerHyperparameter(name="optimization_level", low=0, high=3, default=1), + CategoricalHyperparameter(name="method", choices=["COBYLA"], default="COBYLA"), + IntegerHyperparameter( + name="nb_shots", low=10000, high=100000, step=10000, default=10000 + ), + IntegerHyperparameter(name="maxiter", low=100, high=1000, step=50, default=300), + FloatHyperparameter(name="rhobeg", low=0.5, high=1.5, default=1.0), + CategoricalHyperparameter(name="tol", choices=[1e-1, 1e-2, 1e-3], default=1e-2), + # TODO rajouter initial_point et initial_bound dans les hyperparams ? + # TODO add mixer_operator comme hyperparam + ] + def __init__( self, problem: Problem, params_objective_function: Optional[ParamsObjectiveFunction] = None, backend: Optional = None, - **kwargs: Any, ): super().__init__(problem, params_objective_function) self.quadratic_programm = None @@ -192,10 +261,13 @@ def solve( self, callbacks: Optional[List[Callback]] = None, backend: Optional = None, + use_session: Optional[bool] = False, **kwargs: Any, ) -> ResultStorage: - reps = kwargs.get("reps", 2) + kwargs = self.complete_with_default_hyperparameters(kwargs) + + reps = kwargs["reps"] if backend is not None: self.backend = backend @@ -211,9 +283,19 @@ def solve( qubo = conv.convert(self.quadratic_programm) hamiltonian, offset = qubo.to_ising() ansatz = QAOAAnsatz(hamiltonian, reps=reps) + """ + by default only hyperparameters of the mixer operator are initialized + but for some optimizer we need to initialize also hyperparameters of the cost operator + """ + bounds = [] + for hp in ansatz.parameter_bounds: + if hp == (None, None): + hp = (0, np.pi) + bounds.append(hp) + ansatz.parameter_bounds = bounds result = execute_ansatz_with_Hamiltonian( - self.backend, ansatz, hamiltonian, **kwargs + self.backend, ansatz, hamiltonian, use_session, **kwargs ) result = conv.interpret(result) @@ -241,13 +323,25 @@ def retrieve_current_solution(self, result) -> Solution: ... -class QiskitVQESolver(SolverDO): +class QiskitVQESolver(QiskitSolver): + + hyperparameters = [ + IntegerHyperparameter(name="optimization_level", low=0, high=3, default=1), + CategoricalHyperparameter(name="method", choices=["COBYLA"], default="COBYLA"), + IntegerHyperparameter( + name="nb_shots", low=10000, high=100000, step=10000, default=10000 + ), + IntegerHyperparameter(name="maxiter", low=100, high=2000, step=50, default=300), + FloatHyperparameter(name="rhobeg", low=0.5, high=1.5, default=1.0), + CategoricalHyperparameter(name="tol", choices=[1e-1, 1e-2, 1e-3], default=1e-2), + # TODO rajouter initial_point et initial_bound dans les hyperparams ? + ] + def __init__( self, problem: Problem, params_objective_function: Optional[ParamsObjectiveFunction] = None, backend: Optional = None, - **kwargs: Any, ): super().__init__(problem, params_objective_function) self.quadratic_programm = None @@ -258,9 +352,12 @@ def solve( self, callbacks: Optional[List[Callback]] = None, backend: Optional = None, + use_session: Optional[bool] = False, **kwargs: Any, ) -> ResultStorage: + kwargs = self.complete_with_default_hyperparameters(kwargs) + if backend is not None: self.backend = backend @@ -281,7 +378,7 @@ def solve( ansatz = EfficientSU2(hamiltonian.num_qubits) result = execute_ansatz_with_Hamiltonian( - self.backend, ansatz, hamiltonian, **kwargs + self.backend, ansatz, hamiltonian, use_session, **kwargs ) result = conv.interpret(result) diff --git a/discrete_optimization/generic_tools/quantum_solvers.py b/discrete_optimization/generic_tools/quantum_solvers.py new file mode 100644 index 000000000..bd6bed15b --- /dev/null +++ b/discrete_optimization/generic_tools/quantum_solvers.py @@ -0,0 +1,118 @@ +"""Utility module to launch different solvers on the coloring problem.""" + +# Copyright (c) 2022 AIRBUS and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +from typing import Any, Dict, List, Tuple, Type, Union + +from discrete_optimization.coloring.solvers.coloring_quantum import ( + QAOAColoringSolver_FeasibleNbColor, + QAOAColoringSolver_MinimizeNbColor, + VQEColoringSolver_FeasibleNbColor, + VQEColoringSolver_MinimizeNbColor, +) +from discrete_optimization.coloring.solvers.greedy_coloring import ColoringProblem +from discrete_optimization.generic_tools.qiskit_tools import QiskitSolver +from discrete_optimization.generic_tools.result_storage.result_storage import ( + ResultStorage, +) +from discrete_optimization.maximum_independent_set.mis_model import MisProblem +from discrete_optimization.maximum_independent_set.solvers.mis_quantum import ( + QAOAMisSolver, + VQEMisSolver, +) + +solvers_coloring: Dict[str, List[Tuple[Type[QiskitSolver], Dict[str, Any]]]] = { + "qaoa": [ + ( + QAOAColoringSolver_MinimizeNbColor, + {}, + ), + ( + QAOAColoringSolver_FeasibleNbColor, + {}, + ), + ], + "vqe": [ + ( + VQEColoringSolver_MinimizeNbColor, + {}, + ), + ( + VQEColoringSolver_FeasibleNbColor, + {}, + ), + ], +} + +solvers_map_coloring = {} +for key in solvers_coloring: + for solver, param in solvers_coloring[key]: + solvers_map_coloring[solver] = (key, param) + + +solvers_mis: Dict[str, List[Tuple[Type[QiskitSolver], Dict[str, Any]]]] = { + "qaoa": [ + ( + QAOAMisSolver, + {}, + ), + ], + "vqe": [ + ( + VQEMisSolver, + {}, + ), + ], +} + +solvers_map_mis = {} +for key in solvers_mis: + for solver, param in solvers_mis[key]: + solvers_map_mis[solver] = (key, param) + + +def solve( + method: Type[QiskitSolver], problem: MisProblem, **kwargs: Any +) -> ResultStorage: + """Solve a problem instance with a given class of solver. + + Args: + method: class of the solver to use + problem: problem instance + **args: specific options of the solver + + Returns: a ResultsStorage objecting obtained by the solver. + + """ + solver_ = method(problem, **kwargs) + try: + + solver_.init_model() + except AttributeError: + pass + return solver_.solve(**kwargs) + + +def solve_coloring( + method: Type[QiskitSolver], problem: ColoringProblem, nb_color, **kwargs: Any +) -> ResultStorage: + """Solve a problem instance with a given class of solver. + + Args: + method: class of the solver to use + problem: problem instance + nb_color: the number of colors or the max number of colors + **args: specific options of the solver + + Returns: a ResultsStorage objecting obtained by the solver. + + """ + solver_ = method(problem, nb_color) + try: + + solver_.init_model() + except AttributeError: + pass + return solver_.solve(**kwargs) diff --git a/discrete_optimization/maximum_independent_set/mis_plot.py b/discrete_optimization/maximum_independent_set/mis_plot.py index 62fea533d..b36d6c5c5 100644 --- a/discrete_optimization/maximum_independent_set/mis_plot.py +++ b/discrete_optimization/maximum_independent_set/mis_plot.py @@ -15,14 +15,20 @@ def plot_mis_solution(solution: MisSolution, name_figure: str = ""): problem: MisProblem = solution.problem graph_nx = problem.graph_nx color_map = [] + labels = {} + ind = 0 for node in solution.chosen: if node == 1: color_map.append("red") + labels[problem.index_to_nodes[ind]] = str(problem.index_to_nodes[ind]) else: color_map.append("blue") + labels[problem.index_to_nodes[ind]] = str(problem.index_to_nodes[ind]) + ind += 1 pos = nx.kamada_kawai_layout(graph_nx) fig, ax = plt.subplots(1) nx.draw_networkx_nodes(graph_nx, pos=pos, ax=ax, node_color=color_map) + nx.draw_networkx_labels(graph_nx, pos=pos, ax=ax, labels=labels) nx.draw_networkx_edges(graph_nx, pos=pos, ax=ax) ax.set_title(name_figure) plt.show() @@ -31,13 +37,10 @@ def plot_mis_solution(solution: MisSolution, name_figure: str = ""): def plot_mis_graph(problem: MisProblem, name_figure: str = ""): graph_nx = problem.graph_nx pos = nx.kamada_kawai_layout(graph_nx) + labels = {n: str(n) for n in problem.graph_nx.nodes()} fig, ax = plt.subplots(1) - nx.draw_networkx_nodes( - graph_nx, - pos=pos, - nodelist=problem.graph_nx.nodes(), - ax=ax, - ) + nx.draw_networkx_nodes(graph_nx, pos=pos, nodelist=problem.graph_nx.nodes(), ax=ax) + nx.draw_networkx_labels(graph_nx, pos=pos, ax=ax, labels=labels) nx.draw_networkx_edges(graph_nx, pos=pos, ax=ax) ax.set_title(name_figure) plt.show() diff --git a/discrete_optimization/maximum_independent_set/solvers/mis_quantum.py b/discrete_optimization/maximum_independent_set/solvers/mis_quantum.py index b12316eb7..571479ae9 100644 --- a/discrete_optimization/maximum_independent_set/solvers/mis_quantum.py +++ b/discrete_optimization/maximum_independent_set/solvers/mis_quantum.py @@ -40,6 +40,8 @@ ) logger.warning(msg) OptimizationApplication = object + OptimizationResult = object + QuadraticProgram = object class MisQiskit(OptimizationApplication): diff --git a/examples/qiskit_examples/coloring_example.py b/examples/qiskit_examples/coloring_example.py new file mode 100644 index 000000000..e06c1f96b --- /dev/null +++ b/examples/qiskit_examples/coloring_example.py @@ -0,0 +1,49 @@ +from qiskit_aer import AerSimulator + +from discrete_optimization.coloring.coloring_model import ColoringProblem +from discrete_optimization.coloring.solvers.coloring_quantum import ( + QAOAColoringSolver_FeasibleNbColor, +) +from discrete_optimization.generic_tools.graph_api import Graph + + +def quantum_coloring(): + + """ + in this example we solve a small coloring problem using a quantum hybrid algorithm : QAOA + this algorithm is an approximate algorithm and it's not deterministic + """ + + # We construct a graph with 4 nodes and three edges, two colors are sufficiant + nodes = [(1, {}), (2, {}), (3, {}), (4, {})] + edges = [(1, 2, {}), (1, 3, {}), (2, 4, {})] + + # can make the problem unsat + the number of variable depend on this parameter + nb_color = 2 + + # we create an instance of ColoringProblem + coloringProblem = ColoringProblem(Graph(nodes=nodes, edges=edges)) + # we create an instance of a QAOAMisSolver + coloringSolver = QAOAColoringSolver_FeasibleNbColor( + coloringProblem, nb_color=nb_color + ) + # we initialize the solver, in fact this step transform the problem in a QUBO formulation + coloringSolver.init_model() + # we solve the mis problem + """ + by default we use a quantum simulator to solve the problem, a AerSimulator() but it's possible to use + any backend (the same simulator with defined parameters, an other simulator or any real quantum device we can + use as a qiskit backend) + for this you have just to define your own backend and then pass it at the creation of the solver or + when you use the solve function of the solver + """ + backend = AerSimulator() + kwargs = {"reps": 4, "optimization_level": 1, "maxiter": 300} + res = coloringSolver.solve(backend=backend, **kwargs) + + sol, fit = res.get_best_solution_fit() + print(sol) + print( + "Two nodes connected by an edge have never the same color : ", + coloringProblem.satisfy(sol), + ) diff --git a/examples/qiskit_examples/qiskit_optuna_coloring.py b/examples/qiskit_examples/qiskit_optuna_coloring.py new file mode 100644 index 000000000..d0cdc469e --- /dev/null +++ b/examples/qiskit_examples/qiskit_optuna_coloring.py @@ -0,0 +1,195 @@ +# Copyright (c) 2024 AIRBUS and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. +"""Example using OPTUNA to choose a solving method and tune its hyperparameters for quantum solvers. + +Results can be viewed on optuna-dashboard with: + + optuna-dashboard optuna-journal.log + +""" + +import os + +os.environ["DO_SKIP_MZN_CHECK"] = "1" + +import logging +import time +from typing import Any, Dict, List, Tuple, Type + +import optuna +from optuna import Trial +from optuna.storages import JournalFileStorage, JournalStorage +from optuna.trial import TrialState + +from discrete_optimization.coloring.coloring_model import ColoringProblem +from discrete_optimization.coloring.solvers.coloring_quantum import ( + QAOAColoringSolver_MinimizeNbColor, + VQEColoringSolver_MinimizeNbColor, +) +from discrete_optimization.generic_tools.callbacks.loggers import ObjectiveLogger +from discrete_optimization.generic_tools.callbacks.optuna import OptunaCallback +from discrete_optimization.generic_tools.do_problem import ModeOptim +from discrete_optimization.generic_tools.do_solver import SolverDO +from discrete_optimization.generic_tools.graph_api import Graph +from discrete_optimization.generic_tools.optuna.timed_percentile_pruner import ( + TimedPercentilePruner, +) +from discrete_optimization.generic_tools.qiskit_tools import QiskitSolver + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format="%(asctime)s:%(levelname)s:%(message)s") + +seed = 42 +optuna_nb_trials = 100 + +create_another_study = True # avoid relaunching the same study, keep the previous ones +max_time_per_solver = 60 # max duration (s) +min_time_per_solver = 5 # min duration before pruning (s) + +nodes = [(1, {}), (2, {}), (3, {}), (4, {})] +edges = [(1, 2, {}), (1, 3, {}), (2, 4, {})] +nb_max_color = 2 + +# we create an instance of ColoringProblem +problem = ColoringProblem(Graph(nodes=nodes, edges=edges)) + +modelfilename = "coloring_quantum_solvers" + +suffix = f"-{time.time()}" if create_another_study else "" +study_name = f"{modelfilename}{suffix}" +storage_path = "./optuna-journal.log" # NFS path for distributed optimization +elapsed_time_attr = "elapsed_time" # name of the user attribute used to store duration of trials (updated during intermediate reports) + + +solvers: Dict[str, List[Tuple[Type[QiskitSolver], Dict[str, Any]]]] = { + "qaoa": [ + ( + QAOAColoringSolver_MinimizeNbColor, + {}, + ), + ], + "vqe": [ + ( + VQEColoringSolver_MinimizeNbColor, + {}, + ), + ], +} + +solvers_map = {} +for key in solvers: + for solver, param in solvers[key]: + solvers_map[solver] = (key, param) + +solvers_to_test: List[Type[SolverDO]] = [s for s in solvers_map] + +# we need to map the classes to a unique string, to be seen as a categorical hyperparameter by optuna +# by default, we use the class name, but if there are identical names, f"{cls.__module__}.{cls.__name__}" could be used. +solvers_by_name: Dict[str, Type[SolverDO]] = { + cls.__name__: cls for cls in solvers_to_test +} + +# sense of optimization +objective_register = problem.get_objective_register() +if objective_register.objective_sense == ModeOptim.MINIMIZATION: + direction = "minimize" +else: + direction = "maximize" + + +def objective(trial: Trial): + # hyperparameters to test + + # first parameter: solver choice + solver_name: str = trial.suggest_categorical("solver", choices=solvers_by_name) + solver_class = solvers_by_name[solver_name] + + # hyperparameters for the chosen solver + suggested_hyperparameters_kwargs = solver_class.suggest_hyperparameters_with_optuna( + trial=trial, prefix=solver_name + "." + ) + + # use existing value if corresponding to a previous complete trial + """ + states_to_consider = (TrialState.COMPLETE,) + trials_to_consider = trial.study.get_trials( + deepcopy=False, states=states_to_consider + ) + for t in reversed(trials_to_consider): + if trial.params == t.params: + logger.warning( + "Trial with same hyperparameters as a previous complete trial: returning previous fit." + ) + return t.value + """ + # prune if corresponding to a previous failed trial + states_to_consider = (TrialState.FAIL,) + trials_to_consider = trial.study.get_trials( + deepcopy=False, states=states_to_consider + ) + for t in reversed(trials_to_consider): + if trial.params == t.params: + raise optuna.TrialPruned( + "Pruning trial identical to a previous failed trial." + ) + + logger.info(f"Launching trial {trial.number} with parameters: {trial.params}") + + # construct kwargs for __init__, init_model, and solve + kwargs = {} + kwargs.update(suggested_hyperparameters_kwargs) + print(kwargs) + + # solver init + solver = solver_class(problem=problem, nb_max_color=nb_max_color) + solver.init_model() + + # init timer + starting_time = time.perf_counter() + + # solve + res = solver.solve( + callbacks=[ + OptunaCallback( + trial=trial, + starting_time=starting_time, + elapsed_time_attr=elapsed_time_attr, + report_time=True, + ), + ObjectiveLogger( + step_verbosity_level=logging.INFO, end_verbosity_level=logging.INFO + ), + ], + **kwargs, + ) + + # store elapsed time + elapsed_time = time.perf_counter() - starting_time + trial.set_user_attr(elapsed_time_attr, elapsed_time) + + if len(res.list_solution_fits) != 0: + sol, fit = res.get_best_solution_fit() + trial.set_user_attr("satisfy", problem.satisfy(sol)) + trial.set_user_attr("color by node", sol.colors) + trial.set_user_attr("nb_colors", sol.nb_color) + return fit + else: + raise optuna.TrialPruned("Pruned because failed") + + +# create study + database to store it +storage = "sqlite:///example.db" +study = optuna.create_study( + study_name=study_name, + direction=direction, + sampler=optuna.samplers.TPESampler(seed=seed), + pruner=TimedPercentilePruner( # intermediate values interpolated at same "step" + percentile=50, # median pruner + n_warmup_steps=min_time_per_solver, # no pruning during first seconds + ), + storage=storage, + load_if_exists=True, +) +study.set_metric_names(["nb_color"]) +study.optimize(objective, n_trials=optuna_nb_trials) diff --git a/examples/qiskit_examples/qiskit_optuna_mis.py b/examples/qiskit_examples/qiskit_optuna_mis.py new file mode 100644 index 000000000..97484e48e --- /dev/null +++ b/examples/qiskit_examples/qiskit_optuna_mis.py @@ -0,0 +1,200 @@ +# Copyright (c) 2024 AIRBUS and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. +"""Example using OPTUNA to choose a solving method and tune its hyperparameters for quantum solvers. + +Results can be viewed on optuna-dashboard with: + + optuna-dashboard optuna-journal.log + +""" + +import os + +os.environ["DO_SKIP_MZN_CHECK"] = "1" + +import logging +import time +from typing import Any, Dict, List, Tuple, Type + +import networkx as nx +import optuna +from optuna import Trial +from optuna.storages import JournalFileStorage, JournalStorage +from optuna.trial import TrialState + +from discrete_optimization.generic_tools.callbacks.loggers import ObjectiveLogger +from discrete_optimization.generic_tools.callbacks.optuna import OptunaCallback +from discrete_optimization.generic_tools.do_problem import ModeOptim +from discrete_optimization.generic_tools.do_solver import SolverDO +from discrete_optimization.generic_tools.optuna.timed_percentile_pruner import ( + TimedPercentilePruner, +) +from discrete_optimization.generic_tools.qiskit_tools import QiskitSolver +from discrete_optimization.maximum_independent_set.mis_model import MisProblem +from discrete_optimization.maximum_independent_set.solvers.mis_quantum import ( + QAOAMisSolver, + VQEMisSolver, +) + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format="%(asctime)s:%(levelname)s:%(message)s") + +seed = 42 +optuna_nb_trials = 100 + +create_another_study = True # avoid relaunching the same study, keep the previous ones +max_time_per_solver = 60 # max duration (s) +min_time_per_solver = 5 # min duration before pruning (s) + +graph = nx.Graph() + +graph.add_edge(1, 2) +graph.add_edge(1, 3) +graph.add_edge(2, 4) +graph.add_edge(2, 6) +graph.add_edge(3, 4) +graph.add_edge(3, 5) +graph.add_edge(4, 5) +graph.add_edge(4, 6) + +# problem definition +problem = MisProblem(graph) + +modelfilename = "MIS_quantum_solvers" + +suffix = f"-{time.time()}" if create_another_study else "" +study_name = f"{modelfilename}{suffix}" +storage_path = "./optuna-journal.log" # NFS path for distributed optimization +elapsed_time_attr = "elapsed_time" # name of the user attribute used to store duration of trials (updated during intermediate reports) + + +solvers: Dict[str, List[Tuple[Type[QiskitSolver], Dict[str, Any]]]] = { + "qaoa": [ + ( + QAOAMisSolver, + {}, + ), + ], + "vqe": [ + ( + VQEMisSolver, + {}, + ), + ], +} + +solvers_map = {} +for key in solvers: + for solver, param in solvers[key]: + solvers_map[solver] = (key, param) + +solvers_to_test: List[Type[SolverDO]] = [s for s in solvers_map] + +# we need to map the classes to a unique string, to be seen as a categorical hyperparameter by optuna +# by default, we use the class name, but if there are identical names, f"{cls.__module__}.{cls.__name__}" could be used. +solvers_by_name: Dict[str, Type[SolverDO]] = { + cls.__name__: cls for cls in solvers_to_test +} + +# sense of optimization +objective_register = problem.get_objective_register() +if objective_register.objective_sense == ModeOptim.MINIMIZATION: + direction = "minimize" +else: + direction = "maximize" + + +def objective(trial: Trial): + # hyperparameters to test + + # first parameter: solver choice + solver_name: str = trial.suggest_categorical("solver", choices=solvers_by_name) + solver_class = solvers_by_name[solver_name] + + # hyperparameters for the chosen solver + suggested_hyperparameters_kwargs = solver_class.suggest_hyperparameters_with_optuna( + trial=trial, prefix=solver_name + "." + ) + + # use existing value if corresponding to a previous complete trial + """ + states_to_consider = (TrialState.COMPLETE,) + trials_to_consider = trial.study.get_trials( + deepcopy=False, states=states_to_consider + ) + for t in reversed(trials_to_consider): + if trial.params == t.params: + logger.warning( + "Trial with same hyperparameters as a previous complete trial: returning previous fit." + ) + return t.value + """ + # prune if corresponding to a previous failed trial + states_to_consider = (TrialState.FAIL,) + trials_to_consider = trial.study.get_trials( + deepcopy=False, states=states_to_consider + ) + for t in reversed(trials_to_consider): + if trial.params == t.params: + raise optuna.TrialPruned( + "Pruning trial identical to a previous failed trial." + ) + + logger.info(f"Launching trial {trial.number} with parameters: {trial.params}") + + # construct kwargs for __init__, init_model, and solve + kwargs = {} + kwargs.update(suggested_hyperparameters_kwargs) + + # solver init + solver = solver_class(problem=problem) + solver.init_model() + + # init timer + starting_time = time.perf_counter() + + # solve + res = solver.solve( + callbacks=[ + OptunaCallback( + trial=trial, + starting_time=starting_time, + elapsed_time_attr=elapsed_time_attr, + report_time=True, + ), + ObjectiveLogger( + step_verbosity_level=logging.INFO, end_verbosity_level=logging.INFO + ), + ], + **kwargs, + ) + + # store elapsed time + elapsed_time = time.perf_counter() - starting_time + trial.set_user_attr(elapsed_time_attr, elapsed_time) + + if len(res.list_solution_fits) != 0: + sol, fit = res.get_best_solution_fit() + trial.set_user_attr("satisfy", problem.satisfy(sol)) + trial.set_user_attr("nodes chose", list(sol.chosen)) + return fit + else: + raise optuna.TrialPruned("Pruned because failed") + + +# create study + database to store it +storage = "sqlite:///example.db" +study = optuna.create_study( + study_name=study_name, + direction=direction, + sampler=optuna.samplers.TPESampler(seed=seed), + pruner=TimedPercentilePruner( # intermediate values interpolated at same "step" + percentile=50, # median pruner + n_warmup_steps=min_time_per_solver, # no pruning during first seconds + ), + storage=storage, + load_if_exists=True, +) +study.set_metric_names(["nb_nodes"]) +study.optimize(objective, n_trials=optuna_nb_trials) diff --git a/tests/quantum/quantum.py b/tests/quantum/quantum.py new file mode 100644 index 000000000..db0bc5bad --- /dev/null +++ b/tests/quantum/quantum.py @@ -0,0 +1,70 @@ +# Copyright (c) 2024 AIRBUS and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +import os + +os.environ["DO_SKIP_MZN_CHECK"] = "1" + +import logging + +import networkx as nx +import pytest + +from discrete_optimization.coloring.coloring_model import ColoringProblem +from discrete_optimization.generic_tools.graph_api import Graph +from discrete_optimization.generic_tools.qiskit_tools import qiskit_available +from discrete_optimization.generic_tools.quantum_solvers import ( + solve, + solve_coloring, + solvers_map_coloring, + solvers_map_mis, +) +from discrete_optimization.maximum_independent_set.mis_model import MisProblem + +logger = logging.getLogger(__name__) + + +@pytest.mark.skipif( + not qiskit_available, reason="You need Qiskit modules to this test." +) +@pytest.mark.parametrize("solver_class", solvers_map_coloring) +def test_solvers_coloring(solver_class): + nodes = [(1, {}), (2, {}), (3, {}), (4, {})] + edges = [(1, 2, {}), (1, 3, {}), (2, 4, {})] + nb_colors = 2 + coloring_model: ColoringProblem = ColoringProblem(Graph(nodes=nodes, edges=edges)) + results = solve_coloring( + method=solver_class, + problem=coloring_model, + nb_color=nb_colors, + **solvers_map_coloring[solver_class][1] + ) + sol, fit = results.get_best_solution_fit() + + +@pytest.mark.skipif( + not qiskit_available, reason="You need Qiskit modules to this test." +) +@pytest.mark.parametrize("solver_class", solvers_map_mis) +def test_solvers_mis(solver_class): + graph = nx.Graph() + + graph.add_edge(1, 2) + graph.add_edge(1, 3) + graph.add_edge(2, 4) + graph.add_edge(2, 6) + graph.add_edge(3, 4) + graph.add_edge(3, 5) + graph.add_edge(4, 5) + graph.add_edge(4, 6) + mis_model: MisProblem = MisProblem(graph) + results = solve( + method=solver_class, problem=mis_model, **solvers_map_mis[solver_class][1] + ) + sol, fit = results.get_best_solution_fit() + + +if __name__ == "__main__": + test_solvers_coloring() + test_solvers_mis()