-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rudimentary bin cover solver using the user propagator
Signed-off-by: Nikolaj Bjorner <nbjorner@microsoft.com>
- Loading branch information
1 parent
68a2c08
commit b09c237
Showing
1 changed file
with
286 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,286 @@ | ||
from z3 import * | ||
import math | ||
|
||
# Rudimentary bin cover solver using the UserPropagator feature. | ||
# It supports the most basic propagation for bin covering. | ||
# - each bin has a propositional variable set to true if the bin is covered | ||
# - each item has a bit-vector recording the assigned bin | ||
# It searches for a locally optimal solution. | ||
|
||
class Bin: | ||
""" | ||
Each bin carries values: | ||
- min_bound - the lower bound required to be added to bin | ||
- weight - the sum of weight of items currently added to bin | ||
- slack - the difference between the maximal possible assignment and the assignments to other bin2bound. | ||
- var - is propagated to true/false if the bin gets filled/cannot be filled. | ||
""" | ||
def __init__(self, min_bound, index): | ||
assert min_bound > 0 | ||
assert index >= 0 | ||
self.index = index | ||
self.min_bound = min_bound | ||
self.weight = 0 | ||
self.slack = 0 | ||
self.added = [] | ||
self.var = Bool(f"bin-{index}") | ||
|
||
def set_slack(self, slack): | ||
self.slack = slack | ||
|
||
def set_fill(self, fill): | ||
self.weight = fill | ||
|
||
def __repr__(self): | ||
return f"{self.var}:bound-{self.min_bound}" | ||
|
||
|
||
class Item: | ||
def __init__(self, weight, index): | ||
self.weight = weight | ||
self.index = index | ||
self.var = None | ||
|
||
def set_var(self, num_bits): | ||
self.var = BitVec(f"binof-{self.index}", num_bits) | ||
|
||
def __repr__(self): | ||
return f"binof-{self.index}:weight-{self.weight}" | ||
|
||
|
||
class BinCoverSolver(UserPropagateBase): | ||
"""Represent a bin-covering problem by associating each bin with a variable | ||
For each item i associate a bit-vector | ||
- bin-of-i that carries the bin identifier where an item is assigned. | ||
""" | ||
|
||
def __init__(self, s=None, ctx=None): | ||
UserPropagateBase.__init__(self, s, ctx) | ||
self.bins = [] | ||
self.items = [] | ||
self.item2index = {} | ||
self.trail = [] # Undo stack | ||
self.lim = [] | ||
self.solver = s | ||
self.initialized = False | ||
self.add_fixed(lambda x, v : self._fixed(x, v)) | ||
|
||
|
||
# Initialize bit-vector variables for items. | ||
# Register the bit-vector variables with the user propagator to get callbacks | ||
# Ensure the bit-vector variables are assigned to a valid bin. | ||
# Initialize the slack of each bin. | ||
def init(self): | ||
print(self.bins, len(self.bins)) | ||
print(self.items) | ||
assert not self.initialized | ||
self.initialized = True | ||
powerof2, num_bits = self._num_bits() | ||
for item in self.items: | ||
item.set_var(num_bits) | ||
self.item2index[item.var.get_id()] = item.index | ||
self.add(item.var) | ||
if not powerof2: | ||
bound = BitVecVal(len(self.bins), num_bits) | ||
ineq = ULT(item.var, bound) | ||
self.solver.add(ineq) | ||
total_weight = sum(item.weight for item in self.items) | ||
for i in range(len(self.bins)): | ||
self.bins[i].slack = total_weight | ||
|
||
def add_bin(self, min_bound): | ||
assert not self.initialized | ||
index = len(self.bins) | ||
self.bins += [Bin(min_bound, index)] | ||
return index | ||
|
||
def add_item(self, weight): | ||
assert not self.initialized | ||
assert weight > 0 | ||
index = len(self.items) | ||
self.items += [Item(weight, index)] | ||
return index | ||
|
||
def num_items(self): | ||
return len(self.items) | ||
|
||
def num_bins(self): | ||
return len(self.bins) | ||
|
||
def _num_bits(self): | ||
log = math.log2(self.num_bins()) | ||
if log.is_integer(): | ||
return True, int(log) | ||
else: | ||
return False, int(log) + 1 | ||
|
||
def _set_slack(self, bin, slack_value): | ||
bin.slack = slack_value | ||
|
||
def _set_fill(self, bin, fill_value): | ||
bin.weight = fill_value | ||
bin.added.pop() | ||
|
||
def _itemvar2item(self, v): | ||
index = self.item2index[v.get_id()] | ||
if index >= len(self.items): | ||
return None | ||
return self.items[index] | ||
|
||
def _value2bin(self, value): | ||
assert isinstance(value, BitVecNumRef) | ||
bin_index = value.as_long() | ||
if bin_index >= len(self.bins): | ||
return NOne | ||
return self.bins[bin_index] | ||
|
||
def _add_item2bin(self, item, bin): | ||
# print("add", item, "to", bin) | ||
old_weight = bin.weight | ||
bin.weight += item.weight | ||
bin.added += [item] | ||
self.trail += [lambda : self._set_fill(bin, old_weight)] | ||
if old_weight < bin.min_bound and old_weight + item.weight >= bin.min_bound: | ||
self._propagate_filled(bin) | ||
|
||
# This item can never go into bin | ||
def _exclude_item2bin(self, item, bin): | ||
# print("exclude", item, "from", bin) | ||
# Check if bin has already been blocked | ||
if bin.slack < bin.weight: | ||
return | ||
if bin.weight >= bin.min_bound: | ||
return | ||
old_slack = bin.slack | ||
new_slack = old_slack - item.weight | ||
bin.slack = new_slack | ||
self.trail += [lambda : self._set_slack(bin, old_slack)] | ||
# If the new slack does not permit the bin to be filled, propagate | ||
if new_slack < bin.min_bound: | ||
self._propagate_slack(bin) | ||
|
||
|
||
# Callback from Z3 when an item gets fixed. | ||
def _fixed(self, _item, value): | ||
item = self._itemvar2item(_item) | ||
if item is None: | ||
print("no item for ", _item) | ||
return | ||
bin = self._value2bin(value) | ||
if bin is None: | ||
print("no bin for ", value) | ||
return | ||
self._add_item2bin(item, bin) | ||
for idx in range(len(self.bins)): | ||
if idx == bin.index: | ||
continue | ||
other_bin = self.bins[idx] | ||
self._exclude_item2bin(item, other_bin) | ||
|
||
def _propagate_filled(self, bin): | ||
"""Propagate that bin_index is filled justified by the set of | ||
items that have been added | ||
""" | ||
justification = [i.var for i in bin.added] | ||
self.propagate(bin.var, justification) | ||
|
||
def _propagate_slack(self, bin): | ||
"""Propagate that bin_index cannot be filled""" | ||
justification = [] | ||
for other_bin in self.bins: | ||
if other_bin.index == bin.index: | ||
continue | ||
justification += other_bin.added | ||
justification = [item.var for item in justification] | ||
self.propagate(Not(bin.var), justification) | ||
|
||
def push(self): | ||
self.lim += [len(self.trail)] | ||
|
||
def pop(self, n): | ||
head = self.lim[len(self.lim) - n] | ||
while len(self.trail) > head: | ||
self.trail[-1]() | ||
self.trail.pop(-1) | ||
self.lim = self.lim[0:len(self.lim)-n] | ||
|
||
# Find a first maximally satisfying subset | ||
class MaximalSatisfyingSubset: | ||
def __init__(self, s): | ||
self.s = s | ||
self.model = None | ||
|
||
def tt(self, f): | ||
return is_true(self.model.eval(f)) | ||
|
||
def get_mss(self, ps): | ||
s = self.s | ||
if sat != s.check(): | ||
return [] | ||
self.model = s.model() | ||
mss = { q for q in ps if self.tt(q) } | ||
return self._get_mss(mss, ps) | ||
|
||
def _get_mss(self, mss, ps): | ||
ps = set(ps) - mss | ||
backbones = set([]) | ||
s = self.s | ||
while len(ps) > 0: | ||
p = ps.pop() | ||
if sat == s.check(mss | backbones | { p }): | ||
self.model = s.model() | ||
mss = mss | { p } | { q for q in ps if self.tt(q) } | ||
ps = ps - mss | ||
else: | ||
backbones = backbones | { Not(p) } | ||
return mss | ||
|
||
|
||
class OptimizeBinCoverSolver: | ||
def __init__(self): | ||
self.solver = Solver() | ||
self.bin_solver = BinCoverSolver(self.solver) | ||
self.mss_solver = MaximalSatisfyingSubset(self.solver) | ||
|
||
# Facilities to set up solver | ||
def init(self): | ||
self.bin_solver.init() | ||
|
||
def add_item(self, weight): | ||
return self.bin_solver.add_item(weight) | ||
|
||
def add_bin(self, min_bound): | ||
return self.bin_solver.add_bin(min_bound) | ||
|
||
def item_index2item(self, index): | ||
return self.bin_solver.items[index] | ||
|
||
def bin_index2bin(self, index): | ||
return self.bin_solver.bins[index] | ||
|
||
def optimize(self): | ||
self.init() | ||
mss = self.mss_solver.get_mss([bin.var for bin in self.bin_solver.bins]) | ||
print(self.mss_solver.model) | ||
print("filled bins", mss) | ||
print("bin contents") | ||
for bin in self.bin_solver.bins: | ||
print(bin, bin.added) | ||
|
||
|
||
|
||
def example1(): | ||
s = OptimizeBinCoverSolver() | ||
i1 = s.add_item(2) | ||
i2 = s.add_item(4) | ||
i3 = s.add_item(5) | ||
i4 = s.add_item(2) | ||
b1 = s.add_bin(3) | ||
b2 = s.add_bin(6) | ||
b3 = s.add_bin(1) | ||
s.optimize() | ||
|
||
example1() | ||
|
||
|