Skip to content

Commit

Permalink
Dynamic programming solver implementation
Browse files Browse the repository at this point in the history
-adding didppy models available for many DO problems
-implemented examples, tests for each implemented problem
  • Loading branch information
g-poveda committed Oct 11, 2024
1 parent dda37b4 commit e4e676f
Show file tree
Hide file tree
Showing 31 changed files with 3,446 additions and 1 deletion.
251 changes: 251 additions & 0 deletions discrete_optimization/coloring/solvers/did_coloring_solver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
# Copyright (c) 2024 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import re
from collections.abc import Iterator
from enum import Enum
from typing import Any

import didppy as dp
import networkx as nx

from discrete_optimization.coloring.coloring_model import (
ColoringProblem,
ColoringSolution,
)
from discrete_optimization.coloring.solvers.coloring_solver import SolverColoring
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.do_solver import WarmstartMixin
from discrete_optimization.generic_tools.dyn_prog_tools import DidSolver
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
CategoricalHyperparameter,
EnumHyperparameter,
SubBrickHyperparameter,
)


class DidColoringModeling(Enum):
COLOR_TRANSITION = 0
COLOR_NODE_TRANSITION = 1


def bfs_iterator(g: nx.Graph, source: Any) -> Iterator[Any]:
visited = set()
queue = [source]
visited.add(source)

while queue:
node = queue.pop(0)
yield node

for neighbor in g.neighbors(node):
if neighbor not in visited:
visited.add(neighbor)
queue.append(neighbor)


class DidColoringSolver(DidSolver, SolverColoring):
hyperparameters = DidSolver.hyperparameters + [
EnumHyperparameter(
name="modeling",
enum=DidColoringModeling,
default=DidColoringModeling.COLOR_TRANSITION,
),
CategoricalHyperparameter(
name="dual_bound", choices=[True, False], default=True
),
]
transitions: dict
nodes_reordering: list
modeling: DidColoringModeling

def init_model(self, **kwargs):
kwargs = self.complete_with_default_hyperparameters(kwargs)
modeling: DidColoringModeling = kwargs["modeling"]
if modeling == DidColoringModeling.COLOR_TRANSITION:
self.init_model_color(**kwargs)
if modeling == DidColoringModeling.COLOR_NODE_TRANSITION:
self.init_model_color_and_node(**kwargs)
self.modeling = modeling

def init_model_color_and_node(self, **kwargs: Any) -> None:
kwargs = self.complete_with_default_hyperparameters(kwargs)

graph = self.problem.graph.graph_nx
nb_colors = kwargs.get("nb_colors", 50)
degrees = {x[0]: x[1] for x in nx.degree(graph)}
most_neighbor = max(degrees, key=lambda x: degrees[x])
nodes_order = list(bfs_iterator(graph, source=most_neighbor))
while len(nodes_order) < self.problem.number_of_nodes:
n = next(x for x in self.problem.nodes_name if x not in nodes_order)
nodes_order += list(bfs_iterator(graph, source=n))
nodes_order = sorted(degrees, key=lambda x: degrees[x], reverse=True)
nodes_index = [self.problem.index_nodes_name[n] for n in nodes_order]
index_problem_to_model = {nodes_index[i]: i for i in range(len(nodes_index))}
self.nodes_reordering = nodes_index
model = dp.Model()
colors = [model.add_int_var(target=0)] + [
model.add_int_var(target=5 * i)
for i in range(1, self.problem.number_of_nodes)
]
node = model.add_object_type(number=self.problem.number_of_nodes)
cur_color = model.add_int_var(target=0)
uncolored = model.add_set_var(
object_type=node, target=range(1, self.problem.number_of_nodes)
)
model.add_base_case([uncolored.is_empty()])
self.transitions = {}
for i in range(1, self.problem.number_of_nodes):
cur_node = nodes_index[i]
neigh = self.problem.graph.get_neighbors(self.problem.nodes_name[cur_node])
neighs = [
index_problem_to_model[self.problem.index_nodes_name[n]] for n in neigh
]
for c in range(nb_colors):
color = dp.Transition(
name=f"{i,c}",
cost=dp.max(c, cur_color) - cur_color + dp.IntExpr.state_cost(),
effects=[
(uncolored, uncolored.remove(i)),
(cur_color, dp.max(c, cur_color)),
(colors[i], c),
],
preconditions=[
c <= cur_color + 1,
uncolored.contains(i),
~uncolored.contains(i - 1),
]
+ [colors[n] != c for n in neighs],
)
model.add_transition(color)
self.model = model

def init_model_color(self, **kwargs: Any) -> None:
graph = self.problem.graph.graph_nx
nb_colors = kwargs.get("nb_colors", 50)
nb_nodes = self.problem.number_of_nodes

degrees = {x[0]: x[1] for x in nx.degree(graph)}
most_neighbor = max(degrees, key=lambda x: degrees[x])
nodes_order = list(bfs_iterator(graph, source=most_neighbor))
while len(nodes_order) < self.problem.number_of_nodes:
n = next(x for x in self.problem.nodes_name if x not in nodes_order)
nodes_order += list(bfs_iterator(graph, source=n))
nodes_order = sorted(degrees, key=lambda x: degrees[x], reverse=True)
nodes_index = [self.problem.index_nodes_name[n] for n in nodes_order]
index_problem_to_model = {nodes_index[i]: i for i in range(len(nodes_index))}
self.nodes_reordering = nodes_index
model = dp.Model()
node_type = model.add_object_type(number=nb_nodes)
node_allocated_per_color = [
model.add_set_var(object_type=node_type, target=set())
for _ in range(nb_colors)
]
current_node = model.add_element_var(object_type=node_type, target=0)
neighbors = [
{
index_problem_to_model[self.problem.index_nodes_name[n]]
for n in self.problem.graph.get_neighbors(
self.problem.index_to_nodes_name[nodes_index[i]]
)
}
for i in range(nb_nodes)
]
neighbors_tab = model.add_set_table(neighbors, object_type=node_type)
nb_color_used = model.add_int_resource_var(target=0)
for c in range(nb_colors):
if c == 0:
alloc = dp.Transition(
name=f"alloc_{c}",
cost=1 + dp.IntExpr.state_cost(),
effects=[
(
node_allocated_per_color[c],
node_allocated_per_color[c].add(current_node),
),
(current_node, current_node + 1),
(nb_color_used, nb_color_used + 1),
],
preconditions=[
current_node < nb_nodes,
node_allocated_per_color[c]
.intersection(neighbors_tab[current_node])
.is_empty(),
node_allocated_per_color[c].is_empty(),
],
)
model.add_transition(alloc)
else:
alloc = dp.Transition(
name=f"alloc_{c}",
cost=1 + dp.IntExpr.state_cost(),
effects=[
(
node_allocated_per_color[c],
node_allocated_per_color[c].add(current_node),
),
(current_node, current_node + 1),
(nb_color_used, nb_color_used + 1),
],
preconditions=[
current_node < nb_nodes,
node_allocated_per_color[c]
.intersection(neighbors_tab[current_node])
.is_empty(),
node_allocated_per_color[c].is_empty(),
~node_allocated_per_color[c - 1].is_empty(),
],
)
model.add_transition(alloc)
alloc_no_new = dp.Transition(
name=f"alloc_{c}_no_new",
cost=dp.IntExpr.state_cost(),
effects=[
(
node_allocated_per_color[c],
node_allocated_per_color[c].add(current_node),
),
(current_node, current_node + 1),
],
preconditions=[
current_node < nb_nodes,
node_allocated_per_color[c]
.intersection(neighbors_tab[current_node])
.is_empty(),
~node_allocated_per_color[c].is_empty(),
],
)
model.add_transition(alloc_no_new)
model.add_base_case([current_node == nb_nodes])
if kwargs["dual_bound"]:
model.add_dual_bound(0)
self.model = model

def retrieve_solution(self, sol: dp.Solution) -> Solution:
if self.modeling == DidColoringModeling.COLOR_TRANSITION:
return self.retrieve_solution_color(sol)
if self.modeling == DidColoringModeling.COLOR_NODE_TRANSITION:
return self.retrieve_solution_color_and_node(sol)

def retrieve_solution_color_and_node(self, sol: dp.Solution) -> Solution:
def extract_numbers(s):
return tuple(int(num) for num in re.findall(r"\d+", s))

colors = [None for _ in range(self.problem.number_of_nodes)]
colors[self.nodes_reordering[0]] = 0
for t in sol.transitions:
n, c = extract_numbers(t.name)
colors[self.nodes_reordering[n]] = c
return ColoringSolution(problem=self.problem, colors=colors)

def retrieve_solution_color(self, sol: dp.Solution) -> Solution:
def extract_numbers(s):
return tuple(int(num) for num in re.findall(r"\d+", s))

colors = [None for _ in range(self.problem.number_of_nodes)]
ind = 0
for t in sol.transitions:
c = extract_numbers(t.name)[0]
colors[self.nodes_reordering[ind]] = c
ind += 1
return ColoringSolution(problem=self.problem, colors=colors)
17 changes: 17 additions & 0 deletions discrete_optimization/facility/facility_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright (c) 2024 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import numpy as np

from discrete_optimization.facility.facility_model import FacilityProblem


def compute_matrix_distance_facility_problem(problem: FacilityProblem):
matrix_distance = np.zeros((problem.customer_count, problem.facility_count))
for k in range(problem.customer_count):
for j in range(problem.facility_count):
matrix_distance[k, j] = problem.evaluate_customer_facility(
facility=problem.facilities[j], customer=problem.customers[k]
)
return matrix_distance
Loading

0 comments on commit e4e676f

Please sign in to comment.