# Copyright (C) 2010 Thomas Leonard # Copyright (C) 2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # The design of this solver is very heavily based on the one described in # the MiniSat paper "An Extensible SAT-solver [extended version 1.2]" # http://minisat.se/Papers.html # # The main differences are: # # - We care about which solution we find (not just "satisfiable" or "not"). # - We take care to be deterministic (always select the same versions given # the same input). We do not do random restarts, etc. # - We add an _AtMostOneClause (the paper suggests this in the Excercises, and # it's very useful for our purposes). import logging from collections import deque from sugar_network.toolkit import enforce _logger = logging.getLogger('sat') def solve(clauses, at_most_one_clauses, decide): if not clauses: _logger.info('No clauses') return None max_var = 0 for clause in clauses: max_var = max(max_var, max([abs(i) for i in clause])) for clause in at_most_one_clauses.values(): max_var = max(max_var, max([abs(i) for i in clause])) problem = _Problem(max_var) for clause in clauses: if problem.add_clause(clause) is False: _logger.info('The %r clause is unresolved', clause) return None clauses = {} for name, clause in at_most_one_clauses.items(): clauses[name] = problem.at_most_one(clause) if not problem.run_solver(lambda: decide(clauses)): return None result = {} for name, clause in clauses.items(): if clause.current is not None: result[name] = clause.current return result class _AtMostOneClause(object): def __init__(self, problem, varset): self.problem = problem self.varset = varset # The single literal from our set that is True. # We store this explicitly because the decider needs to know quickly. self.current = None def propagate(self, v): # Re-add ourselves to the watch list. # (we we won't get any more notifications unless we backtrack, # in which case we'd need to get back on the list anyway) self.problem.watch(v, self) # value[v] has just become True assert self.problem.value(v) is True # If we already propagated successfully when the first # one was set then we set all the others to False and # anyone trying to set one True will get rejected. And # if we didn't propagate yet, current will still be # None, even if we now have a conflict (which we'll # detect below). assert self.current is None self.current = v # If we later backtrace, call our undo function to unset current self.problem.undo[abs(v)].append(self) for v_ in self.varset: value = self.problem.value(v_) if value is True and v_ != v: # Due to queuing, we might get called with current = None # and two versions already selected. _logger.trace('CONFLICT already selected %s', v_) return False if value is None: # Since one of our varset is already true, all unknown ones # can be set to False. if not self.problem.enqueue(-v_, self): _logger.trace('CONFLICT enqueue failed for %s', v_) return False # Conflict; abort return True def undo(self, v): _logger.trace('Undo %s', v) assert v == self.current self.current = None # Why is v True? # Or, why are we causing a conflict (if v is None)? def calc_reason(self, v): if v is None: # Find two True literals trues = [] for v_ in self.varset: if self.problem.value(v_) is True: trues.append(v_) if len(trues) == 2: return trues else: for v_ in self.varset: if v_ != v and self.problem.value(v_) is True: return [v_] # Find one True literal assert 0 # don't know why! def best_undecided(self): for v in self.varset: if self.problem.value(v) is None: _logger.trace('Best undecided %r from %r', v, self.varset) return v return None def __repr__(self): return "" % self.varset class _UnionClause(object): def __init__(self, problem, varset): self.problem = problem self.varset = varset # Try to infer new facts. # We can do this only when all of our literals are False except one, # which is undecided. That is, # False... or X or False... = True => X = True # # To get notified when this happens, we tell the solver to # watch two of our undecided literals. Watching two undecided # literals is sufficient. When one changes we check the state # again. If we still have two or more undecided then we switch # to watching them, otherwise we propagate. # # Returns False on conflict. def propagate(self, v): # value[get(v)] has just become False # For simplicity, only handle the case where self.varset[1] # is the one that just got set to False, so that: # - value[varset[0]] = None | True # - value[varset[1]] = False # If it's the other way around, just swap them before we start. if self.varset[0] == -v: self.varset[0], self.varset[1] = self.varset[1], self.varset[0] if self.problem.value(self.varset[0]) is True: # We're already satisfied. Do nothing. self.problem.watch(v, self) return True assert self.problem.value(self.varset[1]) is False # Find a new literal to watch now that varset[1] is resolved, # swap it with varset[1], and start watching it. for i in range(2, len(self.varset)): value = self.problem.value(self.varset[i]) if value is not False: # Could be None or True. If it's True then we've already done # our job, so this means we don't get notified # unless we backtrack, which is fine. self.varset[1], self.varset[i] = self.varset[i], self.varset[1] self.problem.watch(-self.varset[1], self) return True # Only varset[0], is now undefined. self.problem.watch(v, self) return self.problem.enqueue(self.varset[0], self) def undo(self, v): pass # Why is v True? # Or, why are we causing a conflict (if v is None)? def calc_reason(self, v): assert v is None or v == self.varset[0] # The cause is everything except v. return [-v_ for v_ in self.varset if v_ != v] def __repr__(self): return "" % self.varset class _Problem(object): def __init__(self, max_var): # True/False/None self._vars = [None] # constraints to check when _vars[i] becomes True self._pos_watch = [None] # constraints to check when _vars[i] becomes False self._neg_watch = [None] # Constraints to update if we become unbound for every _vars[i] self.undo = [None] # The decision level at which we got a value for _vars[i] self._levels = [None] # The constraint that implied _vars, if True or False self._reasons = [None] # order of assignments self._trail = [] # decision levels (len(_trail) at each decision) self._trail_lim = [] # propagation queue self._propQ = deque() count = max_var - len(self._vars) + 1 self._vars.extend([None] * count) self._pos_watch.extend([[] for __ in range(count)]) self._neg_watch.extend([[] for __ in range(count)]) self.undo.extend([[] for __ in range(count)]) self._levels.extend([-1] * count) self._reasons.extend([None] * count) def watch(self, v, cb): (self._neg_watch[-v] if v < 0 else self._pos_watch[v]).append(cb) def value(self, v): assert v if v > 0: return self._vars[v] else: value = self._vars[-v] if value is None: return None else: return not value def add_clause(self, varset): _logger.trace('Add clause %r', varset) if any(self.value(v) is True for v in varset): # Trivially true already. return True varset_ = set(varset) for v in varset: if -v in varset_: # X or not(X) is always True. return True # Remove duplicates and values known to be False varset = [v for v in varset_ if self.value(v) is not False] return self._add_clause(varset, learnt=False, reason="input fact") def at_most_one(self, varset): _logger.trace('At most one of %r', varset) # If we have zero or one literals then we're trivially true # and not really needed for the solve. However, Zero Install # monitors these objects to find out what was selected, so # keep even trivial ones around for that. # #if len(varset) < 2: # return True # Trivially true # Ensure no duplicates assert len(set(varset)) == len(varset), varset # Ignore any literals already known to be False. # If any are True then they're enqueued and we'll process them # soon. varset = [v for v in varset if self.value(v) is not False] clause = _AtMostOneClause(self, varset) for v in varset: self.watch(v, clause) return clause def run_solver(self, decide): """@rtype: bool""" while True: # Use logical deduction to simplify the clauses # and assign literals where there is only one possibility. conflicting_clause = self._propagate() if not conflicting_clause: _logger.trace('New state: %r', self._vars[1:]) if all(i is not None for i in self._vars[1:]): # Everything is assigned without conflicts _logger.trace('SUCCESS!') return True else: # Pick a variable and try assigning it one way. # If it leads to a conflict, we'll backtrack and # try it the other way. v = decide() assert v is not None, "decide function returned None!" assert self.value(v) is None self._trail_lim.append(len(self._trail)) r = self.enqueue(v, reason="considering") assert r is True else: if self._decision_level == 0: _logger.trace('FAIL: conflict found at top level') return False else: # Figure out the root cause of this failure. learnt, backtrack_level = self._analyse(conflicting_clause) self._cancel_until(backtrack_level) clause = self._add_clause(learnt, learnt=True, reason=conflicting_clause) if clause is not True: # Everything except the first literal in learnt # is known to be False, so the first must be True. enforce(self.enqueue(learnt[0], clause) is True) # v is now True # Returns False if this immediately causes a conflict. def enqueue(self, v, reason): _logger.trace('Enqueue %r', v) old_value = self.value(v) if old_value is not None: if old_value is False: # Conflict return False else: # Already set (shouldn't happen) return True v_abs = abs(v) self._vars[v_abs] = v > 0 self._levels[v_abs] = self._decision_level self._reasons[v_abs] = reason self._trail.append(v) self._propQ.append(v) return True @property def _decision_level(self): return len(self._trail_lim) # Pop most recent assignment from self._trail def _undo_one(self): v = self._trail[-1] _logger.trace('Pop %s', v) v_abs = abs(v) self._vars[v_abs] = None self._levels[v_abs] = -1 self._reasons[v_abs] = None self._trail.pop() undo = self.undo[v_abs] while undo: undo.pop().undo(v) def _cancel_until(self, level): while self._decision_level > level: n_this_level = len(self._trail) - self._trail_lim[-1] _logger.trace('Cancel at level %d (%d assignments)', self._decision_level, n_this_level) while n_this_level != 0: self._undo_one() n_this_level -= 1 self._trail_lim.pop() # Process the propQ. # Returns None when done, or the clause that caused a conflict. def _propagate(self): while self._propQ: watches_ = [] v = self._propQ.popleft() if v < 0: watches, self._neg_watch[-v] = self._neg_watch[-v], watches_ else: watches, self._pos_watch[v] = self._pos_watch[v], watches_ _logger.trace('%s -> True : watches: %r', v, watches) for i, clause in enumerate(watches): if clause.propagate(v): continue # Conflict, re-add remaining watches watches_.extend(watches[i + 1:]) # No point processing the rest of the queue as # we'll have to backtrack now. self._propQ.clear() return clause return None # Returns the new clause if one was added, True if none was added # because this clause is trivially True, or False if the clause is # False. def _add_clause(self, varset, learnt, reason): if len(varset) == 1: # A clause with only a single literal is represented # as an assignment rather than as a clause. return self.enqueue(varset[0], reason) clause = _UnionClause(self, varset) if learnt: # varset[0] is None because we just backtracked. # Start watching the next literal that we will # backtrack over. best_level = -1 best_i = 1 for i in range(1, len(varset)): level = self._levels[abs(varset[i])] if level > best_level: best_level = level best_i = i varset[1], varset[best_i] = varset[best_i], varset[1] # Watch the first two literals in the clause (both must be # undefined at this point). for v in varset[:2]: self.watch(-v, clause) return clause def _analyse(self, cause): # After trying some assignments, we've discovered a conflict. # e.g. # - we selected A then B then C # - from A, B, C we got X, Y # - we have a rule: not(A) or not(X) or not(Y) # # The simplest thing to do would be: # 1. add the rule "not(A) or not(B) or not(C)" # 2. unassign C # # Then we we'd deduce not(C) and we could try something else. # However, that would be inefficient. We want to learn a more # general rule that will help us with the rest of the problem. # # We take the clause that caused the conflict ("cause") and # ask it for its cause. In this case: # # A and X and Y => conflict # # Since X and Y followed logically from A, B, C there's no # point learning this rule; we need to know to avoid A, B, C # *before* choosing C. We ask the two varset deduced at the # current level (X and Y) what caused them, and work backwards. # e.g. # # X: A and C => X # Y: C => Y # # Combining these, we get the cause of the conflict in terms of # things we knew before the current decision level: # # A and X and Y => conflict # A and (A and C) and (C) => conflict # A and C => conflict # # We can then learn (record) the more general rule: # # not(A) or not(C) # # Then, in future, whenever A is selected we can remove C and # everything that depends on it from consideration. # The general rule we're learning learnt = [None] # The deepest decision in learnt learnt_level = 0 # The literal we want to expand now p = None # The varset involved in the conflict seen = [False] * len(self._vars) counter = 0 while True: # cause is the reason why p is True (i.e. it enqueued it). # The first time, p is None, which requests the reason # why it is conflicting. if p is None: p_reason = cause.calc_reason(p) _logger.trace('%s failed because of %r', cause, p_reason) else: p_reason = cause.calc_reason(p) _logger.trace('%s => %s because of %r', cause, p, p_reason) # p_reason is in the form (A and B and ...) # p_reason => p # Check each of the varset in p_reason that we haven't # already considered: # - if the variable was assigned at the current level, # mark it for expansion # - otherwise, add it to learnt for v in p_reason: v_abs = abs(v) if seen[v_abs]: continue seen[v_abs] = True level = self._levels[v_abs] if level == self._decision_level: # We deduced this var since the last decision. # It must be in self._trail, so we'll get to it # soon. Remember not to stop until we've processed it. counter += 1 elif level > 0: # We won't expand v, just remember it. # (we could expand it if it's not a decision, but # apparently not doing so is useful) learnt.append(-v) learnt_level = max(learnt_level, level) # At this point, counter is the number of assigned # varset in self._trail at the current decision level that # we've seen. That is, the number left to process. Pop # the next one off self._trail (as well as any unrelated # varset before it; everything up to the previous # decision has to go anyway). # On the first time round the loop, we must find the # conflict depends on at least one assignment at the # current level. Otherwise, simply setting the decision # variable caused a clause to conflict, in which case # the clause should have asserted not(decision-variable) # before we ever made the decision. # On later times round the loop, counter was already > # 0 before we started iterating over p_reason. assert counter > 0 while True: p = self._trail[-1] cause = self._reasons[abs(p)] self._undo_one() if seen[abs(p)]: break _logger.trace('(irrelevant)') counter -= 1 if counter <= 0: assert counter == 0 # If counter = 0 then we still have one more # literal (p) at the current level that we # could expand. However, apparently it's best # to leave this unprocessed (says the minisat # paper). break # p is the literal we decided to stop processing on. It's either # a derived variable at the current level, or the decision that # led to this level. Since we're not going to expand it, add it # directly to the learnt clause. learnt[0] = -p return learnt, learnt_level