# COMP3620/6320 Artificial Intelligence
# The Australian National University – 2021
# Authors: COMP3620 team
“”” !!!!!!!!!!!!!!!!!!!!! STOP !!!!!!!!!!!!!!!!!!!!!!!!!!!!
This file is big and messy and deals with the translation from PDDL (with ADL)
to some nice internal data structures.
To look here is to take the first step down the path to madness.
Look at strips_problem.py instead.
import itertools
import strips_problem
POST_COND, PRE_COND, ProblemException, cond_prefix,
default_type_name, grounding_error_code,
inequality_prefix, lower_var_alphabet)
class Object:
“”” A PDDL object “””
def __init__(self, name, otype, constant):
“”” Make a new Object
(Object, str, Type, bool) -> None
self.name = name
self.otype = otype
self.constant = constant
def __dump__(self):
“”” Return a full string representation of the object
(Object) -> None
return self.name + ” – ” + str(self.otype)
def __str__(self):
“”” Return a short string representation of the object
(Object) -> str
return self.name
class Type:
“”” A PDDL type “””
def __init__(self, name, parent):
“”” Make a new Type
(Type, str, Type) -> None
self.name = name
self.parent = parent
self.objects = []
def __dump__(self):
“”” Return a full string representation of the type
(Type) -> None
if self.parent:
return self.name + ” – ” + self.parent.name
return self.name
def __str__(self):
“”” Return a short string representation of the type
(Type) -> str
return self.name
def add_object(self, obj):
“”” Add the object to the type and to all of its ancestors.
(Type, str) -> None
if obj not in self.objects:
if self.parent:
class Function:
“”” A PDDL numeric function “””
def __init__(self, name, variables, types):
“”” Make a new Function
(Function, str, [str], [Type]) -> None
self.name = name
self.variables = variables
self.types = types
self.values = {}
def __dump__(self):
“”” Return a full string representation of the function
(Function) -> None
return “( ” + self.name + ” ” + ” “.join([x + ” – ” + str(y)\
for x, y in zip(self.variables, self.types)]) + ” ) – numeric”
def __str__(self):
“”” Return a short string representation of the function
(Function) -> str
return “( ” + self.name + ” ” + ” “.join(self.variables) + ” )”
class Predicate:
“”” A domain predicate “””
def __init__(self, name, variables, types):
“”” Make a new Predicate
(Predicate, str, [str], [Type]) -> None
self.name = name
self.variables = variables
self.types = types
self.groundings = []
self.neg_groundings = []
def __dump__(self):
“”” Return a full string representation of the predicate
(Predicate) -> None
return self.__str__()
def __str__(self):
“”” Return a short string representation of the predicate
(Predicate) -> str
return “( ” + self.name + ” ” + ” “.join([x + ” – ” + str(y)\
for x, y in zip(self.variables, self.types)]) + ” )”
def __lt__(self, other) :
return str(self) < str(other)
class Condition:
""" A formula used as a precondition or head of an axiom or conditional effect. """
def __init__(self):
""" Make a new condition.
(Condition) -> None
self.groundings = []
self.desc = None
def substitute_derived_predicates(self, derived_predicates, seen_preds):
“”” Replace the derived predicates in the condition with the
conditions of the derived predicates.
(Condition, { Predicate : [DerivedPredicate] }, [Predicate]) -> Condition
return self
def has_quant(self):
“”” Return true iff this condition or one of its descendants is a quantifier
(Condition) -> bool
return False
def flatten(self):
“”” Flatten conjunctions and disjunctions.
(Condition) -> None
return self
def collect_neg_precs(self, neg_precs, param_types, collect = True):
“”” Recurse through the condition tree looking for negative preconditions
(Condition, { str : Predicate }, { str : Type }, bool) -> None
def detect_statics(self, candidates, neg_candidates):
“”” Remove entries from candidates which are deleted somewhere in this
condition. Default to doing nothing.
(Condition, set([(Predicate, [str]), set([(Predicate, [str])]) -> None
def get_encode_conds(self, encode_conds, do_encode):
“”” Populate encode_conds with conditions that actually need to be
properly encoded for invariants preprocessing and in SAT.
(Condition, set([Condition]), bool) -> None
class PredicateCondition(Condition):
“”” A formula consisting of just a Predicate “””
def __init__(self, pred, variables):
“”” Make a new PredicateCondition.
(PredicateCondition, Predicate, [str]) -> None
super(PredicateCondition, self).__init__()
self.pred = pred
self.variables = variables
self.sign = True
self.ground_conditions = {}
def __dump__(self):
“”” Return a full string representation of the condition
(PrecdicateCondition) -> None
return self.__str__()
def __str__(self):
“”” Return a short string representation of the condition
(PredicateCondition) -> str
if self.sign:
return “(” + self.pred.name + ” ” + ” “.join(self.variables) + “)”
return “(not (” + self.pred.name + ” ” + ” “.join(self.variables) + “))”
def substitute_vars(self, substitutions):
“”” Replace the variables in this condition as specified in substitutions.
Return a copy of the condition.
(PredicateCondition, { str : str }) -> PredicateCondition
return PredicateCondition(self.pred, [substitutions[v] for v in self.variables])
def substitute_derived_predicates(self, derived_predicates, seen_preds):
“”” Replace the derived predicates in the condition with the conditions
of the derived predicates.
(PredicateCondition, { Predicate : [DerivedPredicate] }, [Predicate]) -> Condition
if self.pred in derived_predicates:
if self.pred in seen_preds:
raise ProblemException(“Error cycle in derived predicate definitions: ” +\
self.pred.name, grounding_error_code)
sp_copy = list(seen_preds)
dp_copies = []
for dp in derived_predicates[self.pred]:
dp.precondition = dp.precondition.substitute_derived_predicates(derived_predicates, sp_copy)
dp_copies.append(dp.precondition.substitute_vars(dict(list(zip(dp.param_names, self.variables)))))
if len(dp_copies) > 1:
return OrCondition(dp_copies)
return dp_copies[0]
return self
def nnf(self, negation = False):
“”” Convert the condition to negation normal form.
(PredicateCondition, bool) -> Condition
if negation:
self.sign = False
return self
def collect_neg_precs(self, neg_precs, param_types, collect = True):
“”” Recurse through the condition tree looking for negative preconditions
(Condition, { str : Predicate }, { str : Type } ) -> None
if not self.sign and collect:
if self.pred.name not in neg_precs:
neg_precs[self.pred.name] = self.pred
def compute_relevant_vars(self, var_types):
“”” Compute, store, and return the relevant variables to this condition.
(PredicateCondition, { str : Type }) -> [str]
self.relevant_vars = []
self.var_types = {}
self.var_indices = {} #Each element here is a list as there might be dups
for vid, var in enumerate(self.variables):
if var[0] == “?”:
assert var in var_types
self.var_types[var] = var_types[var]
if var not in self.var_indices:
self.var_indices[var] = []
return self.relevant_vars
def assign_cond_code(self, cond_index, code_conds):
“”” Make a condition code for this condition.
(PredicateCondition, [int], { str : Condition }) -> None
self.cond_code = cond_prefix + str(cond_index[0])
code_conds[self.cond_code] = self
cond_index[0] += 1
def detect_statics(self, candidates, neg_candidates):
“”” Remove entries from candidates which are deleted somewhere in this effect.
(PredicateCondition, set([(Predicate, [str])], set([(Predicate, [str])) -> None
if self.sign:
for grounding in self.groundings:
neg_candidates.discard((self.pred, grounding))
for grounding in self.groundings:
candidates.discard((self.pred, grounding))
def link_groundings(self, static_preds, neg_static_preds):
“”” Link the groundings in this condition to the groundings in its
predicate. Also conditions from this action which have no groundings,
which refer only to static predicates, equalities, etc.
(PredicateCondition, set([(Predicate, (str, …))]),
set([(Predicate, (str, …))])) -> Condition
new_groundings = []
for grounding in self.groundings:
t_grounding = list(self.variables)
for vid, var in enumerate(self.relevant_vars):
for vi in self.var_indices[var]:
t_grounding[vi] = grounding[vid]
t_grounding = tuple(t_grounding)
pg_pair = (self.pred, t_grounding)
if (not self.sign and pg_pair not in neg_static_preds) or\
(self.sign and pg_pair not in static_preds):
self.ground_conditions[grounding] = tuple(t_grounding)
self.groundings = new_groundings
if not self.groundings:
return None
return self
def make_flat_effects(self, ground_effects, grounding):
“”” Make the flat list of ground effects
(PredicateCondition, [(Predicate, bool, (str, …))], (str, …)) -> None
ground_effects.append((self, grounding, self.sign))
def make_flat_preconditions(self, ground_precs, grounding):
“”” Make the flat list of ground preconditions
(PredicateCondition, [(Predicate, bool, (str, …))], (str, …)) -> None
ground_precs.append((self, grounding, self.sign))
class NotCondition(Condition):
“”” A formula consisting of just negation – Instances of this class are
removed in the conversion to NNF.
def __init__(self, condition):
“”” Make a new NotCondition.
(NotCondition, Condition) -> None
super(NotCondition, self).__init__()
self.condition = condition
self.desc = NOT_CONDITION
def __dump__(self):
“”” Return a full string representation of the condition
(NotCondition) -> str
return “(not ” + self.condition.__dump__() + “)”
def __str__(self):
“”” Return a short string representation of the condition
(NotCondition) -> str
return “(not …)”
def substitute_vars(self, substitutions):
“”” Replace the variables in this condition as specified in substitutions.
Return a copy of the condition.
(NotCondition, { str : str }) -> NotCondition
return NotCondition(self.condition.substitute_vars(substitutions))
def substitute_derived_predicates(self, derived_predicates, seen_preds):
“”” Replace the derived predicates in the condition with the
conditions of the derived predicates.
(NotCondition, { Predicate : [DerivedPredicate] }, [Predicate]) -> NotCondition
self.condition = self.condition.substitute_derived_predicates(derived_predicates,
return self
def nnf(self, negation = False):
“”” Convert the condition to negation normal form.
(NotCondition, bool) -> Condition
return self.condition.nnf(not negation)
class AndCondition(Condition):
“”” A formula consisting of a conjunction “””
def __init__(self, conditions):
“”” Make a new AndCondition.
(AndCondition, [Condition]) -> None
super(AndCondition, self).__init__()
self.conditions = conditions
self.ground_conditions = []
self.desc = AND_CONDITION
def __dump__(self):
“”” Return a full string representation of the condition
(AndCondition) -> str
return “(and ” + ” “.join([x.__dump__() for x in self.conditions]) + “)”
def __str__(self):
“”” Return a short string representation of the condition
(AndCondition) -> str
return “(and …)”
def substitute_vars(self, substitutions):
“”” Replace the variables in this condition as specified in substitutions.
Return a copy of the condition.
(AndCondition, { str : str }) -> AndCondition
return AndCondition([cond.substitute_vars(substitutions) for cond in self.conditions])
def substitute_derived_predicates(self, derived_predicates, seen_preds):
“”” Replace the derived predicates in the condition with the
conditions of the derived predicates.
(AndCondition, { Predicate : [DerivedPredicate] }, [Predicate]) -> AndCondition
for cid, cond in enumerate(self.conditions):
self.conditions[cid] = cond.substitute_derived_predicates(derived_predicates, seen_preds)
return self
def nnf(self, negation = False):
“”” Convert the condition to negation normal form.
(AndCondition, bool) -> Condition
for cid, cond in enumerate(self.conditions):
self.conditions[cid] = cond.nnf(negation)
if negation:
return OrCondition(self.conditions)
return self
def flatten(self):
“”” Flatten conjunctions and disjunctions.
(AndCondition) -> None
child_conjs = [cond for cond in self.conditions\
if isinstance(cond, AndCondition)]
for cond in child_conjs:
for cond in self.conditions:
def has_quant(self):
“”” Return true iff this condition or one of its descendants is a quantifier
(AndCondition) -> bool
for cond in self.conditions:
if cond.has_quant():
return True
return False
def collect_neg_precs(self, neg_precs, param_types, collect = True):
“”” Recurse through the condition tree looking for negative preconditions
(AndCondition, { str : Predicate }, { str : Type }, bool) -> None
for cond in self.conditions:
cond.collect_neg_precs(neg_precs, param_types, collect)
def compute_relevant_vars(self, var_types):
“”” Compute, store, and return the relevant variables to this condition.
(AndCondition, { str : Type }) -> [str]
self.relevant_vars = []
for condition in self.conditions:
for rv in condition.compute_relevant_vars(var_types):
if rv not in self.relevant_vars:
self.var_types = {}
for rv in self.relevant_vars:
assert rv in var_types
self.var_types[rv] = var_types[rv]
self.var_indices = dict([(v, [i]) for i, v in enumerate(self.relevant_vars)])
return self.relevant_vars
def assign_cond_code(self, cond_index, code_conds):
“”” Assign a code to this condition.
(AndCondition, [int], { str : Condition }) -> None
self.cond_code = cond_prefix + str(cond_index[0])
code_conds[self.cond_code] = self
cond_index[0] += 1
for condition in self.conditions:
condition.assign_cond_code(cond_index, code_conds)
def detect_statics(self, candidates, neg_candidates):
“”” Remove entries from candidates which are deleted somewhere in this cond.
(AndCondition, set([(Predicate, [str])], set([(Predicate, [str])) -> None
for condition in self.conditions:
condition.detect_statics(candidates, neg_candidates)
def link_groundings(self, static_preds, neg_static_preds):
“”” Link the groundings in this condition to the groundings in its
sub_conditions and prune statics, and conditions without groundings.
(AndCondition, set([(Predicate, (str, …))]),
set([(Predicate, (str, …))])) -> Condition
new_conditions = []
for condition in self.conditions:
condition = condition.link_groundings(static_preds, neg_static_preds)
if isinstance(condition, AndCondition):
for cond in condition.conditions:
elif condition is not None:
sub_indices = [self.var_indices[v][0] for v in condition.relevant_vars]
ground_conditions = {}
for grounding in self.groundings:
sub_grounding = tuple([grounding[i] for i in sub_indices])
if sub_grounding in condition.groundings:
ground_conditions[grounding] = sub_grounding
if ground_conditions:
self.conditions = new_conditions
new_groundings = []
for grounding in self.groundings:
for ground_conditions in self.ground_conditions:
if grounding in ground_conditions:
self.groundings = new_groundings
if not self.groundings:
return None
if len(self.conditions) == 1:
return self.conditions[0]
return self
def make_flat_effects(self, ground_effects, grounding):
“”” Make the flat list of ground effects
(AndCondition, [(Predicate, bool, (str, …))], (str, …)) -> None
for cid, condition in enumerate(self.conditions):
condition.make_flat_effects(ground_effects, self.ground_conditions[cid][grounding])
def make_flat_preconditions(self, ground_precs, grounding):
“”” Make the flat list of ground preconditions
(AndCondition, [(Predicate, bool, (str, …))], (str, …)) -> None
for cid, condition in enumerate(self.conditions):
if grounding in self.ground_conditions[cid]:
condition.make_flat_preconditions(ground_precs, self.ground_conditions[cid][grounding])
def get_encode_conds(self, encode_conds, do_encode):
“”” Populate encode_conds with conditions that actually need to be
properly encoded for invariants preprocessing and in SAT.
(AndCondition, set([Condition])) -> None
if do_encode:
for condition in self.conditions:
condition.get_encode_conds(encode_conds, do_encode)
class OrCondition(Condition):
“”” A formula consisting of a disjunction “””
def __init__(self, conditions):
“”” Make a new OrCondition.
(OrCondition, [Condition]) -> None
super(OrCondition, self).__init__()
self.conditions = conditions
self.ground_conditions = []
self.desc = OR_CONDITION
def __dump__(self):
“”” Return a full string representation of the condition
(OrCondition) -> str
return “(or ” + ” “.join([x.__dump__() for x in self.conditions]) + “)”
def __str__(self):
“”” Return a short string representation of the condition
(OrCondition) -> str
return “(or …)”
def substitute_vars(self, substitutions):
“”” Replace the variables in this condition as specified in substitutions.
Return a copy of the condition.
(OrCondition, { str : str }) -> OrCondition
return OrCondition([cond.substitute_vars(substitutions) for cond in self.conditions])
def substitute_derived_predicates(self, derived_predicates, seen_preds):
“”” Replace the derived predicates in the condition with the
conditions of the derived predicates.
(OrCondition, { Predicate : [DerivedPredicate] }, [Predicate]) -> OrCondition
for cid, cond in enumerate(self.conditions):
self.conditions[cid] = cond.substitute_derived_predicates(derived_predicates,
return self
def nnf(self, negation = False):
“”” Convert the condition to negation normal form.
(OrCondition, bool) -> Condition
for cid, cond in enumerate(self.conditions):
self.conditions[cid] = cond.nnf(negation)
if negation:
return AndCondition(self.conditions)
return self
def flatten(self):
“”” Flatten conjunctions and disjunctions.
(OrCondition) -> None
child_conjs = [cond for cond in self.conditions if isinstance(cond, OrCondition)]
for cond in child_conjs:
for cond in self.conditions:
def has_quant(self):
“”” Return true iff this condition or one of its descendants is a quantifier
(OrCondition) -> bool
for cond in self.conditions:
if cond.has_quant():
return True
return False
def collect_neg_precs(self, neg_precs, param_types, collect = True):
“”” Recurse through the condition tree looking for negative preconditions
(OrCondition, { str : Predicate }, { str : Type }, bool) -> None
for cond in self.conditions:
cond.collect_neg_precs(neg_precs, param_types, collect)
def compute_relevant_vars(self, var_types):
“”” Compute, store, and return the relevant variables to this condition
along with their types.
(OrCondition, { str : Type }) -> [str]
self.relevant_vars = []
for condition in self.conditions:
for rv in condition.compute_relevant_vars(var_types):
if rv not in self.relevant_vars:
self.var_types = {}
for rv in self.relevant_vars:
assert rv in var_types
self.var_types[rv] = var_types[rv]
self.var_indices = dict([(v, [i]) for i, v in enumerate(self.relevant_vars)])
return self.relevant_vars
def assign_cond_code(self, cond_index, code_conds):
“”” Assign a code to this condition.
(OrCondition, [int], { str : Condition }) -> None
self.cond_code = cond_prefix + str(cond_index[0])
code_conds[self.cond_code] = self
cond_index[0] += 1
for condition in self.conditions:
condition.assign_cond_code(cond_index, code_conds)
def detect_statics(self, candidates, neg_candidates):
“”” Remove entries from candidates which are deleted somewhere in this cond.
(OrCondition, set([(Predicate, [str])], set([(Predicate, [str])) -> None
for condition in self.conditions:
condition.detect_statics(candidates, neg_candidates)
def link_groundings(self, static_preds, neg_static_preds):
“”” Link the groundings in this condition to the groundings in its
sub_conditions and prune statics, and conditions without groundings.
(OrCondition, set([(Predicate, (str, …))],
set([(Predicate, (str, …))])) -> Condition
new_conditions = []
for condition in self.conditions:
condition = condition.link_groundings(static_preds, neg_static_preds)
if isinstance(condition, OrCondition):
for cond in condition.conditions:
elif condition is not None:
ground_conditions = {}
sub_indices = [self.var_indices[v] for v in condition.relevant_vars]
for grounding in self.groundings:
sub_grounding = tuple([grounding[i] for i in sub_indices])
if sub_grounding in condition.groundings:
ground_conditions[grounding] = sub_grounding
if ground_conditions:
self.conditions = new_conditions
new_groundings = []
for grounding in self.groundings:
for ground_conditions in self.ground_conditions:
if grounding in ground_conditions:
self.groundings = new_groundings
if not self.groundings:
return None
if len(self.conditions) == 1:
return self.conditions[0]
return self
def make_flat_preconditions(self, ground_precs, grounding):
“”” Make the flat list of ground preconditions
(OrCondition, [(Predicate, bool, (str, …))], (str, …)) -> None
ground_precs.append((self, grounding))
def get_encode_conds(self, encode_conds, do_encode):
“”” Populate encode_conds with conditions that actually need to be
properly encoded for invariants preprocessing and in SAT.
(OrCondition, set([Condition])) -> None
for condition in self.conditions:
condition.get_encode_conds(encode_conds, True)
class ForAllCondition(Condition):
“”” A formula consisting of a forall quantifier, a variable, and a condition. “””
def __init__(self, var, v_type, condition):
“”” Make a new ForAllCondition.
(ForAllCondition, str, Type, Condition) -> None
super(ForAllCondition, self).__init__()
self.var = var
self.v_type = v_type
self.condition = condition
self.ground_conditions = {}
def __dump__(self):
“”” Return a full string representation of the condition
(ForAllCondition) -> str
out_str = “(forall (” + self.var
if self.v_type:
out_str += ” – ” + str(self.v_type)
return out_str + “) ” + self.condition.__dump__() + “)”
def __str__(self):
“”” Return a short string representation of the condition
(ForAllCondition) -> str
out_str = “(forall ” + self.var
if self.v_type:
out_str += ” – ” + str(self.v_type)
return out_str + ” … )”
def substitute_vars(self, substitutions):
“”” Replace the variables in this condition as specified in substitutions.
Return a copy of the condition. We might need to also rename the
quantifier variable.
(ForAllCondition, { str : str }) -> ForAllCondition
subs = dict(substitutions)
if self.var in list(substitutions.values()):
for v in lower_var_alphabet:
if v not in list(substitutions.values()):
subs[self.var] = v
assert self.var in subs
subs[self.var] = self.var
return ForAllCondition(self.var, self.v_type, self.condition.substitute_vars(subs))
def substitute_derived_predicates(self, derived_predicates, seen_preds):
“”” Replace the derived predicates in the condition with the
conditions of the derived predicates.
(ForAllCondition, { Predicate : [DerivedPredicate] }, [Predicate]) -> ForAllCondition
self.condition = self.condition.substitute_derived_predicates(derived_predicates,
return self
def nnf(self, negation = False):
“”” Convert the condition to negation normal form.
(ForAllCondition, bool) -> Condition
self.condition = self.condition.nnf(negation)
if negation:
return ExistsCondition(self.var, self.v_type, self.condition)
return self
def flatten(self):
“”” Flatten conjunctions and disjunctions.
(ForAllCondition) -> None
def has_quant(self):
“”” Return true iff this condition or one of its descendants is a quantifier
(ForAllCondition) -> bool
return True
def collect_neg_precs(self, neg_precs, param_types, collect = True):
“”” Recurse through the condition tree looking for negative preconditions
(ForAllCondition, { str : Predicate }, { str : Type }, bool) -> None
param_types = dict(param_types)
param_types[self.var] = self.v_type
self.condition.collect_neg_precs(neg_precs, param_types, collect)
def compute_relevant_vars(self, var_types):
“”” Compute, store, and return the relevant variables to this condition.
(ForAllCondition, { str : Type } ) -> [str]
var_types = dict(var_types)
assert self.var not in var_types
var_types[self.var] = self.v_type
self.relevant_vars = list(self.condition.compute_relevant_vars(var_types))
if self.var in self.relevant_vars:
print(“Warning: problem contains a useless forall quantifier”)
self.var_types = {}
for rv in self.relevant_vars:
assert rv in var_types
self.var_types[rv] = var_types[rv]
self.var_indices = dict([(v, [i]) for i, v in enumerate(self.relevant_vars)])
self.var_indices[self.var] = [len(self.relevant_vars)]
return self.relevant_vars
def assign_cond_code(self, cond_index, code_conds):
“”” Assign a code to this condition.
(ForAllCondition, [int], { str : Condition }) -> None
self.cond_code = cond_prefix + str(cond_index[0])
code_conds[self.cond_code] = self
cond_index[0] += 1
self.condition.assign_cond_code(cond_index, code_conds)
def detect_statics(self, candidates, neg_candidates):
“”” Remove entries from candidates which are deleted somewhere in this cond.
(ForAllCondition, set([(Predicate, [str])], set([(Predicate, [str])) -> None
self.condition.detect_statics(candidates, neg_candidates)
def link_groundings(self, static_preds, neg_static_preds):
“”” Link the groundings in this condition to the groundings in its
We do the quantifiers backwards (working from their childrens
groundings to theirs. This is so we can deal with the quantifiers
introduced variable.
(ForAllCondition, set([(Predicate, (str, …))],
set([(Predicate, (str, …))])) -> Condition
self.condition = self.condition.link_groundings(static_preds, neg_static_preds)
if self.condition is None:
return None
for cgrounding in self.condition.groundings:
grounding = [None]*len(self.relevant_vars)
for vid, var in enumerate(self.condition.relevant_vars):
index = self.var_indices[var][0]
if index >= len(self.relevant_vars):
var_val = cgrounding[vid]
grounding[index] = cgrounding[vid]
grounding = tuple(grounding)
if grounding not in self.ground_conditions:
self.ground_conditions[grounding] = {}
self.ground_conditions[grounding][var_val] = cgrounding
new_groundings = []
for grounding in self.groundings:
if grounding in self.ground_conditions:
self.groundings = new_groundings
if not self.groundings:
return None
return self
def make_flat_effects(self, ground_effects, grounding):
“”” Make the flat list of ground effects
(ForAllCondition, [(Predicate, bool, (str, …))], (str, …)) -> None
for cgrounding in list(self.ground_conditions[grounding].values()):
self.condition.make_flat_effects(ground_effects, cgrounding)
def make_flat_preconditions(self, ground_precs, grounding):
“”” Make the flat list of ground preconditions
(ForAllCondition, [(Predicate, bool, (str, …))], (str, …)) -> None
for cgrounding in list(self.ground_conditions[grounding].values()):
self.condition.make_flat_preconditions(ground_precs, cgrounding)
def get_encode_conds(self, encode_conds, do_encode):
“”” Populate encode_conds with conditions that actually need to be
properly encoded for invariants preprocessing and in SAT.
(ForAllCondition, set([Condition])) -> None
if do_encode:
self.condition.get_encode_conds(encode_conds, do_encode)
class ExistsCondition(Condition):
“”” A formula consisting of an exists quantifier, a variable, and a condition.”””
def __init__(self, var, v_type, condition):
“”” Make a new ExistsCondition.
(ExistsCondition, str, Type, Condition) -> None
super(ExistsCondition, self).__init__()
self.var = var
self.v_type = v_type
self.condition = condition
self.ground_conditions = {}
def __dump__(self):
“”” Return a full string representation of the condition
(ExistsCondition) -> str
out_str = “(exists (” + self.var
if self.v_type:
out_str += ” – ” + str(self.v_type)
return out_str + “) ” + self.condition.__dump__() + “)”
def __str__(self):
“”” Return a short string representation of the condition
(ExistsCondition) -> str
out_str = “(exists ” + self.var
if self.v_type:
out_str += ” – ” + str(self.v_type)
return out_str + ” … )”
def substitute_vars(self, substitutions):
“”” Replace the variables in this condition as specified in substitutions.
Return a copy of the condition. We might need to also rename the
quantifier variable.
(ExistsCondition, { str : str }) -> ExistsCondition
subs = dict(substitutions)
if self.var in list(substitutions.values()):
for v in lower_var_alphabet:
if v not in list(substitutions.values()):
subs[self.var] = v
assert self.var in subs
subs[self.var] = self.var
return ExistsCondition(self.var, self.v_type, self.condition.substitute_vars(subs))
def substitute_derived_predicates(self, derived_predicates, seen_preds):
“”” Replace the derived predicates in the condition with the
conditions of the derived predicates.
(ExistsCondition, { Predicate : [DerivedPredicate] }, [Predicate]) -> ExistsCondition
self.condition = self.condition.substitute_derived_predicates(derived_predicates,
return self
def nnf(self, negation = False):
“”” Convert the condition to negation normal form.
(ExistsCondition, bool) -> Condition
self.condition = self.condition.nnf(negation)
if negation:
return ForAllCondition(self.var, self.v_type, self.condition)
return self
def flatten(self):
“”” Flatten conjunctions and disjunctions.
(ExistsCondition) -> None
def has_quant(self):
“”” Return true iff this condition or one of its descendants is a quantifier
(ExistsCondition) -> bool
return True
def collect_neg_precs(self, neg_precs, param_types, collect = True):
“”” Recurse through the condition tree looking for negative preconditions
(ExistsCondition, { str : Predicate }, { str : Type }, bool) -> None
param_types = dict(param_types)
param_types[self.var] = self.v_type
self.condition.collect_neg_precs(neg_precs, param_types, collect)
def compute_relevant_vars(self, var_types):
“”” Compute, store, and return the relevant variables to this condition.
(ExistsCondition, { str : Type }) -> [str]
var_types = dict(var_types)
assert self.var not in var_types
var_types[self.var] = self.v_type
self.relevant_vars = list(self.condition.compute_relevant_vars(var_types))
if self.var in self.relevant_vars:
print(“Warning: problem contains a useless exists quantifier”)
self.var_types = {}
for rv in self.relevant_vars:
assert rv in var_types
self.var_types[rv] = var_types[rv]
self.var_indices = dict([(v, [i]) for i, v in enumerate(self.relevant_vars)])
self.var_indices[self.var] = [len(self.relevant_vars)]
return self.relevant_vars
def assign_cond_code(self, cond_index, code_conds):
“”” Assign a code to this condition.
(ExistsCondition, [int], { str : Condition }) -> None
self.cond_code = cond_prefix + str(cond_index[0])
code_conds[self.cond_code] = self
cond_index[0] += 1
self.condition.assign_cond_code(cond_index, code_conds)
def detect_statics(self, candidates, neg_candidates):
“”” Remove entries from candidates which are deleted somewhere in this cond.
(ExistsCondition, set([(Predicate, [str])], set([(Predicate, [str])) -> None
self.condition.detect_statics(candidates, neg_candidates)
def link_groundings(self, static_preds, neg_static_preds):
“”” Link the groundings in this condition to the groundings in its
We do the quantifiers backwards (working from their childrens
groundings to theirs. This is so we can deal with the quantifiers
introduced variable.
(ExistsCondition, set([(Predicate, (str, …))],
set([(Predicate, (str, …))])) -> Condition
self.condition = self.condition.link_groundings(static_preds, neg_static_preds)
if self.condition is None:
return None
for cgrounding in self.condition.groundings:
grounding = [None]*len(self.relevant_vars)
for vid, var in enumerate(self.condition.relevant_vars):
index = self.var_indices[var][0]
if index >= len(self.relevant_vars):
var_val = cgrounding[vid]
grounding[index] = cgrounding[vid]
grounding = tuple(grounding)
if grounding not in self.ground_conditions:
self.ground_conditions[grounding] = {}
self.ground_conditions[grounding][var_val] = cgrounding
new_groundings = []
for grounding in self.groundings:
if grounding in self.ground_conditions:
self.groundings = new_groundings
if not self.groundings:
return None
return self
def make_flat_preconditions(self, ground_precs, grounding):
“”” Make the flat list of ground preconditions
(ExistsCondition, [(Predicate, bool, (str, …))], (str, …)) -> None
ground_precs.append((self, grounding))
def get_encode_conds(self, encode_conds, do_encode):
“”” Populate encode_conds with conditions that actually need to be
properly encoded for invariants preprocessing and in SAT.
(ForAllCondition, set([Condition])) -> None
self.condition.get_encode_conds(encode_conds, True)
class IncreaseCondition(Condition):
“”” A formula consisting of an increase condition, a variable, and a value.
def __init__(self, var, value, value_args):
“”” Make a new IncreaseCondition.
(IncreaseCondition, Function, int/Function, [str]) -> None
super(IncreaseCondition, self).__init__()
self.var = var
self.value = value
self.value_args = value_args
self.ground_conditions = {}
def __dump__(self):
“”” Return a full string representation of the condition
(IncreaseCondition) -> str
return self.__str__()
def __str__(self):
“”” Return a short string representation of the condition
(IncreaseCondition) -> str
if isinstance(self.value, int):
return “(increase ” + str(self.var) + ” ” + str(self.value) + “)”
return “(increase ” + str(self.var) + ” (” + self.value.name + ” ” +\
” “.join(self.value_args) + “))”
def substitute_vars(self, substitutions):
“”” Replace the variables in this condition as specified in substitutions.
Return a copy of the condition.
(IncreaseCondition, { str : str }) -> IncreaseCondition
return IncreaseCondition(self.var, self.value, [substitutions[v] for v in self.value_args])
def nnf(self, negation = False):
“”” Convert the condition to negation normal form.
(IncreaseCondition, bool) -> Condition
assert not negation
return self
def compute_relevant_vars(self, var_types):
“”” Compute, store, and return the relevant variables to this condition.
(IncreaseCondition, { str : Type } ) -> [str]
self.relevant_vars = list(self.value_args)
self.var_types = {}
self.var_indices = {}
for vid, rv in enumerate(self.relevant_vars):
assert rv in var_types
self.var_types[rv] = var_types[rv]
if rv not in self.var_indices:
self.var_indices[rv] = []
return self.relevant_vars
def assign_cond_code(self, cond_index, code_conds):
“”” Assign a code to this condition.
(IncreaseCondition, [int], { str : Condition }) -> None
self.cond_code = cond_prefix + str(cond_index[0])
code_conds[self.cond_code] = self
cond_index[0] += 1
def link_groundings(self, static_preds, neg_static_preds):
“”” If the value is not a number then we need to link the groundings
to their values.
(IncreaseCondition, set([(Predicate, (str, …))],
set([(Predicate, (str, …))])) -> Condition
sub_indices = [self.var_indices[v] for v in self.value_args]
for grounding in self.groundings:
f_args = tuple([grounding[ii] for i in sub_indices for ii in i])
if isinstance(self.value, Function):
if f_args not in self.value.values:
raise ProblemException(“Unknown increase function args: ” +
“, “.join(f_args), grounding_error_code)
self.ground_conditions[grounding] = self.value.values[f_args]
self.ground_conditions[grounding] = self.value
return self
def make_flat_effects(self, ground_effects, grounding):
“”” Make the flat list of ground effects
(IncreaseCondition, [(Predicate, bool, (str, …))], (str, …)) -> None
ground_effects.append((self, grounding))
class EqualsCondition(Condition):
“”” A formula consisting of an equlity constraint “””
def __init__(self, variables):
“”” Make a new EqualsCondition.
(EqualsCondition, [int/Constant]) -> None
super(EqualsCondition, self).__init__()
self.variables = variables
self.sign = True
def __dump__(self):
“”” Return a full string representation of the condition
(EqualsCondition) -> str
return self.__str__()
def __str__(self):
“”” Return a short string representation of the condition
(EqualsCondition) -> str
if self.sign:
return “(= ” + self.variables[0] + ” ” + self.variables[1] + “)”
return “(!= ” + self.variables[0] + ” ” + self.variables[1] + “)”
def substitute_vars(self, substitutions):
“”” Replace the variables in this condition as specified in substitutions.
Return a copy of the condition.
(EqualsCondition, { str : str }) -> EqualsCondition
return EqualsCondition([substitutions[v] for v in self.variables])
def nnf(self, negation = False):
“”” Convert the condition to negation normal form.
(EqualsCondition, bool) -> Condition
if negation:
self.sign = False
return self
def collect_neg_precs(self, neg_precs, param_types, collect = True):
“”” Recurse through the condition tree looking for negative preconditions
(EqualsCondition, { str : Predicate }, { str : Type }, bool) -> None
if not self.sign and collect:
if inequality_prefix not in neg_precs:
neg_precs[inequality_prefix] = []
for v in self.variables:
assert v in param_types
neg_precs[inequality_prefix].append([param_types[v] for v in self.variables])
def compute_relevant_vars(self, var_types):
“”” Compute, store, and return the relevant variables to this condition.
(EqualsCondition, { str : Type }) -> [str]
self.relevant_vars = []
self.var_types = {}
self.var_indices = {}
for vid, rv in enumerate(self.variables):
if rv[0] == “?”:
assert rv in var_types
self.var_types[rv] = var_types[rv]
if rv not in self.var_indices:
self.var_indices[rv] = []
return self.relevant_vars
def assign_cond_code(self, cond_index, code_conds):
“”” Assign a code to this condition.
(EqualsCondition, [int], { str : Condition }) -> None
self.cond_code = cond_prefix + str(cond_index[0])
code_conds[self.cond_code] = self
cond_index[0] += 1
def link_groundings(self, static_preds, neg_static_preds):
“”” This condition will be removed.
(EqualsCondition, set([(Predicate, (str, …))],
set([(Predicate, (str, …))])) -> None
return None
class ConditionalEffect(Condition):
“”” A conditional effect. “””
def __init__(self, condition, effect):
“”” Make a new conditional effect.
(ConditionalEffect, Condition, Condition) -> None
super(ConditionalEffect, self).__init__()
self.condition = condition
self.effect = effect
self.ground_preconditions = {}
self.ground_effects = {}
self.flat_ground_effects = {}
self.flat_ground_preconditions = {}
def __dump__(self):
“”” Write a string representation of the effect.
(ConditionalEffect) -> None
return “(when ” + self.condition.__dump__() + ” ” +\
self.effect.__dump__() + “)”
def __str__(self):
“””Return a short string representation of the effect
(ConditionalEffect) -> str
return “(when …)”
def substitute_derived_predicates(self, derived_predicates, seen_preds):
“”” Replace the derived predicates in the conditional effect with the
conditions of the derived predicates.
(ConditionalEffect, { Predicate : [DerivedPredicate] }, [Predicate]) -> ConditionalEffect
self.condition = self.condition.substitute_derived_predicates(derived_predicates, seen_preds)
return self
def nnf(self, negation = False):
“”” Convert the conditions of this conditional effect into NNF.
(ConditionalEffect, bool) -> Condition
assert not negation
self.condition = self.condition.nnf(negation)
self.effect = self.effect.nnf(negation)
return self
def flatten(self):
“”” Flatten conjunctions and disjunctions.
(ConditionalEffect) -> None
def collect_neg_precs(self, neg_precs, param_types, collect = True):
“”” Recurse through the condition tree looking for negative preconditions
(ConditionalEffect, { str : Predicate }, { str : Type } ) -> None
self.condition.collect_neg_precs(neg_precs, param_types, True)
def compute_relevant_vars(self, var_types):
“”” Compute, store, and return the relevant variables to this effect.
(ConditionalEffect, { str : Type }) -> [str]
self.relevant_vars = list(self.condition.compute_relevant_vars(var_types))
eff_rv = self.effect.compute_relevant_vars(var_types)
for rv in eff_rv:
if rv not in self.relevant_vars:
self.var_types = {}
for rv in self.relevant_vars:
assert rv in var_types
self.var_types[rv] = var_types[rv]
self.var_indices = dict([(v, [i]) for i, v in enumerate(self.relevant_vars)])
return self.relevant_vars
def assign_cond_code(self, cond_index, code_conds):
“”” Assign a code to this condition.
(ConditionalEffect, [int], { str : Condition }) -> None
self.cond_code = cond_prefix + str(cond_index[0])
code_conds[self.cond_code] = self
cond_index[0] += 1
self.condition.assign_cond_code(cond_index, code_conds)
self.effect.assign_cond_code(cond_index, code_conds)
def detect_statics(self, candidates, neg_candidates):
“”” Remove entries from candidates which are deleted somewhere in this effect.
(ConditionalEffect, set([(Predicate, [str])], set([(Predicate, [str])) -> None
self.effect.detect_statics(candidates, neg_candidates)
def link_groundings(self, static_preds, neg_static_preds):
“”” Link the groundings in this condition to the groundings in its
sub_conditions and remove condtions with no groundings or that are
(ConditionalEffect, set([(Predicate, (str, …))],
set([(Predicate, (str, …))])) -> Condition
self.effect = self.effect.link_groundings(static_preds, neg_static_preds)
if self.effect is None:
return None
self.condition = self.condition.link_groundings(static_preds, neg_static_preds)
if self.condition is None:
return self.effect
pre_sub_indices = [self.var_indices[v][0] for v in self.condition.relevant_vars]
pre_groundings = set(self.condition.groundings)
eff_sub_indices = [self.var_indices[v][0] for v in self.effect.relevant_vars]
eff_groundings = set(self.effect.groundings)
new_groundings = []
for grounding in self.groundings:
eff_sub_grounding = tuple([grounding[i] for i in eff_sub_indices])
if eff_sub_grounding in eff_groundings:
self.ground_effects[grounding] = eff_sub_grounding
pre_sub_grounding = tuple([grounding[i] for i in pre_sub_indices])
if pre_sub_grounding in pre_groundings:
self.ground_preconditions[grounding] = pre_sub_grounding
self.groundings = new_groundings
if not self.groundings:
return None
return self
def make_flat_effects(self, ground_effects, grounding):
“”” Make the flat list of ground effects
(ConditionalEffect, [(Predicate, bool, (str, …))], (str, …)) -> None
ground_effects.append((self, grounding))
t_effects = []
self.effect.make_flat_effects(t_effects, self.ground_effects[grounding])
self.flat_ground_effects[grounding] = t_effects
t_precs = []
self.condition.make_flat_preconditions(t_precs, self.ground_preconditions[grounding])
self.flat_ground_preconditions[grounding] = t_precs
def get_encode_conds(self, encode_conds, do_encode):
“”” Populate encode_conds with conditions that actually need to be
properly encoded for invariants preprocessing and in SAT.
(ForAllCondition, set([Condition])) -> None
self.condition.get_encode_conds(encode_conds, False)
self.effect.get_encode_conds(encode_conds, False)
class Action:
“”” A non-grounded planning action “””
def __init__(self, name, parameters, types, precondition, effect, is_derived):
“”” Make a new a action.
(Action, str, [str], [Type], Condition, Condition, bool) -> None
self.name = name
self.param_names = parameters
self.parameters = list(zip(parameters, types))
self.param_types = dict([(p, t) for (p, t) in self.parameters])
self.precondition = precondition
self.effect = effect
self.is_derived = is_derived
self.is_noop = False
self.groundings = []
self.ground_preconditions = {}
self.ground_effects = {}
#These will be lists of PredicateConditions or Conditional Effects
self.flat_ground_effects = {}
self.flat_ground_preconditions = {}
def __dump__(self):
“”” Write a string representation of the operator to the given file in
the fast-downward output format.
(Action, file) -> None
if self.is_derived:
return “(:derived ” + self.effect.__dump__() + “\n ” +\
self.precondition.__dump__() + “\n)\n”
out_str = “(:action ” + self.name + “\n”
out_str += ” :parameters (” + ” “.join([p + ” – ” + str(t)
for p,t in self.parameters]) + “)\n”
out_str += ” :precondition (and\n”
if isinstance(self.precondition, PredicateCondition):
out_str += ” ” + self.precondition.__dump__()
assert isinstance(self.precondition, AndCondition)
for cond in self.precondition.conditions:
out_str += ” ” + cond.__dump__() + “\n”
out_str += ” )\n”
out_str += ” :effect (and\n”
if isinstance(self.effect, PredicateCondition):
out_str += ” ” +self.effect.__dump__()
assert isinstance(self.effect, AndCondition)
for cond in self.effect.conditions:
out_str += ” ” + cond.__dump__() + “\n”
out_str += ” )\n”
out_str += “)\n”
return out_str
def __str__(self):
“”” Return a short string representation of the operator.
(Action) -> str
if self.is_derived:
return “(:derived ” + self.pred_cond.__dump__() + ” …)”
return “( ” + self.name + ” … )”
def substitute_derived_predicates(self, derived_predicates):
“”” Replace the derived predicates in the actions precondition and
the conditions of its conditional effects, with the conditions
of the derived predicates.
(Action, { (Predicate) : [DerivedPredicate] }) -> None
if self.precondition:
self.precondition = self.precondition.substitute_derived_predicates(derived_predicates, [])
self.effect = self.effect.substitute_derived_predicates(derived_predicates, [])
def nnf(self):
“”” Convert the conditions of this conditional effect into NNF.
(Action, bool) -> None
if self.precondition:
self.precondition = self.precondition.nnf()
self.effect = self.effect.nnf()
def flatten(self):
“”” Flatten conjunctions and disjunctions.
(Action) -> None
if self.precondition:
def collect_neg_precs(self, neg_precs, param_types):
“”” Recurse through the condition tree looking for negative preconditions
(Action, { str : Predicate }, { str : Type } ) -> None
if self.precondition:
self.precondition.collect_neg_precs(neg_precs, param_types, True)
self.effect.collect_neg_precs(neg_precs, param_types, False)
def compute_relevant_vars(self):
“”” Compute, store, and return the relevant variables to this dp.
(Action) -> set([str])
self.relevant_vars = list(self.param_names)
if self.precondition:
self.var_indices = dict([(v, i) for i, v in enumerate(self.relevant_vars)])
return self.relevant_vars
def detect_statics(self, candidates, neg_candidates):
“”” Remove entries from candidates which are deleted somewhere in this
(Action, set([(Predicate, [str])], set([(Predicate, [str])) -> None
self.effect.detect_statics(candidates, neg_candidates)
def link_groundings(self, static_preds, neg_static_preds):
“”” Link the groundings in this action to the groundings in its
pre and post conditions. And do the same for the conditions in these.
Remove conditions from this action which have no groundings,
which refer only to static predicates, equalities, etc.
(Action, set([(Predicate, (str, …))]), set([(Predicate, (str, …))])) -> None
self.effect = self.effect.link_groundings(static_preds, neg_static_preds)
if self.effect is None:
self.groundings = []
eff_groundings = set(self.effect.groundings)
if self.precondition:
self.precondition = self.precondition.link_groundings(static_preds, neg_static_preds)
pre_groundings = set(self.precondition.groundings)
new_groundings = []
for grounding in self.groundings:
eff_sub_grounding = tuple([grounding[self.var_indices[v]]\
for v in self.effect.relevant_vars])
if eff_sub_grounding in eff_groundings:
self.ground_effects[grounding] = eff_sub_grounding
if self.precondition:
pre_sub_grounding = tuple([grounding[self.var_indices[v]]\
for v in self.precondition.relevant_vars])
if pre_sub_grounding in pre_groundings:
self.ground_preconditions[grounding] = pre_sub_grounding
self.groundings = new_groundings
def make_flat_effects(self):
“”” Make the flat list of ground effects
(Action) -> None
for grounding in self.groundings:
flat_effs = []
self.effect.make_flat_effects(flat_effs, self.ground_effects[grounding])
self.flat_ground_effects[grounding] = flat_effs
def make_flat_preconditions(self):
“”” Make the flat list of ground preconditions
(Action) -> None
for grounding in self.groundings:
flat_precs = []
if self.precondition:
self.flat_ground_preconditions[grounding] = flat_precs
def get_encode_conds(self, encode_conds):
“”” Populate encode_conds with conditions that actually need to be
properly encoded for invariants preprocessing and in SAT.
(Action, set([Condition])) -> None
self.precondition.get_encode_conds(encode_conds, False)
self.effect.get_encode_conds(encode_conds, False)
def make_strips_conditions(self):
“”” Make pre and post-conditions for each grounding which link directly
to the predicates involved. Assumes strips with negaitve precs.
(Action) -> None
self.strips_preconditions = {}
self.strips_effects = {}
for grounding in self.groundings:
precs = set()
for prec in self.flat_ground_preconditions[grounding]:
assert isinstance(prec[0], PredicateCondition)
precs.add((prec[0].pred, prec[0].ground_conditions[prec[1]], prec[2]))
self.strips_preconditions[grounding] = precs
effs = set()
for eff in self.flat_ground_effects[grounding]:
if isinstance(eff[0], PredicateCondition):
effs.add((eff[0].pred, eff[0].ground_conditions[eff[1]], eff[2]))
self.strips_effects[grounding] = effs
def __lt__(self, other) :
return str(self) < str(other)
class Problem:
""" An instance of a ADL planning problem. After parsing this class also
represents a PDDL planning problem. """
default_type = Type(default_type_name, None)
def __init__(self, name):
""" Make a new problem instance with the given name.
(Problem, str) -> None
self.name = name
self.problem = None
self.requirements = []
self.types = { self.default_type.name : self.default_type }
self.predicates = {}
self.functions = {}
self.actions = {}
self.derived_predicates = {}
self.objects = {}
self.initial_state = []
self.initial_state_set = None
self.goal = None
self.inequalities = []
self.inequality_set = None
self.neg_initial_state = []
self.neg_initial_state_set = None
self.neg_precs = set()
self.conditions = []
self.conditional_effects = []
#A map from condition codes to conditions
self.code_conds = {}
self.state_mutexes = None
self.first_layer_fluents = None
self.first_layer_actions = None
def __str__(self):
“”” Return a short string representation of the problem
(Problem) -> str
return self.name
def simplify(self):
“”” Convert to NNF and flatten conjunctions and disjunctions.
(Problem) -> None
neg_precs = {}
for action in list(self.actions.values()):
if self.derived_predicates:
action.collect_neg_precs(neg_precs, action.param_types)
if self.derived_predicates:
self.goal = self.goal.substitute_derived_predicates(self.derived_predicates, [])
self.derived_predicates = None
self.goal = self.goal.nnf()
self.goal.collect_neg_precs(neg_precs, {})
#Make lists of the negative fluents and inequalities
for pred in list(neg_precs.values()):
if isinstance(pred, list):
for t1, t2 in pred:
for v1, v2 in itertools.product(t1.objects, t2.objects):
if v1 != v2:
self.inequalities.append((v1.name, v2.name))
if not pred.types:
if (pred, ()) not in self.initial_state_set:
self.neg_initial_state.append((pred, ()))
for tvars in itertools.product(*[[o.name for o in t.objects] for t in pred.types]):
tvars = tuple(tvars)
if (pred, tvars) not in self.initial_state_set:
self.neg_initial_state.append((pred, tvars))
self.inequality_set = set(self.inequalities)
self.neg_initial_state_set = set(self.neg_initial_state)
def assign_cond_codes(self):
“”” Assign condition codes to the conditions in the problem.
(Problem) -> None
cond_index = [0]
self.goal.assign_cond_code(cond_index, self.code_conds)
for action in list(self.actions.values()):
if action.precondition:
action.precondition.assign_cond_code(cond_index, self.code_conds)
action.effect.assign_cond_code(cond_index, self.code_conds)
def compute_static_preds(self):
“”” Compute the static predicates.
(Problem) -> None
self.static_preds = set(self.initial_state_set)
self.neg_static_preds = set(self.neg_initial_state_set)
for action in list(self.actions.values()):
if self.static_preds or self.neg_static_preds:
action.detect_statics(self.static_preds, self.neg_static_preds)
else: break
def link_groundings(self):
“”” Link the groundings between all of the conditions and actions and
predicates. Also do the goal.
(Problem) -> None
#Remove statics from the initial state
if self.static_preds:
new_initial = []
for init in self.initial_state:
if init not in self.static_preds:
self.initial_state = new_initial
self.initial_state_set = set(self.initial_state)
if self.neg_static_preds:
#This should never occur – basically negative statics just dont exist
assert False
new_neg_initial = []
for init in self.neg_initial_state:
if init not in self.neg_static_preds:
self.neg_initial_state = new_neg_initial
self.neg_initial_state_set = set(self.neg_initial_state)
#Remove static groundings from predicates
for pred in list(self.predicates.values()):
new_groundings = []
for grounding in pred.groundings:
if (pred, grounding) not in self.static_preds:
pred.groundings = new_groundings
new_actions = []
for action in list(self.actions.values()):
action.link_groundings(self.static_preds, self.neg_static_preds)
if action.groundings:
self.actions = dict([(a.name, a) for a in new_actions])
self.goal = self.goal.link_groundings(self.static_preds, self.neg_static_preds)
def make_flat_effects(self):
“”” Make the flat list of ground effects
(Problem) -> None
for action in list(self.actions.values()):
def make_flat_preconditions(self):
“”” Make the flat list of ground preconditions (we only flatten up
until we hit disjunctions or existential quantifiers.
(Problem) -> None
for action in list(self.actions.values()):
flat_goal_precs = []
self.goal.make_flat_preconditions(flat_goal_precs, ())
self.flat_ground_goal_preconditions = flat_goal_precs
def get_encode_conds(self):
“”” Populate encode_conds with conditions that actually need to be
properly encoded for invariants preprocessing and in SAT.
(Problem, set([Condition])) -> None
self.encode_conds = set()
for action in list(self.actions.values()):
self.goal.get_encode_conds(self.encode_conds, False)
def make_cond_and_cond_eff_lists(self):
“”” Make seperate lists of conditions and conditional effects that will
actually be encoded.
(Problem) -> None
for cond in list(self.code_conds.values()):
if cond in self.encode_conds:
if isinstance(cond, ConditionalEffect):
def link_conditions_to_actions(self):
“”” Make links from the groundings of conditions to the groundings
of actions, where these groundings represent the direct pre- and
post conditions of actions. We use this for mutex and SSAs.
(Problem) -> None
for cond in itertools.chain(self.conditions, self.conditional_effects):
if cond.groundings:
cond.ground_precs = {}
cond.ground_nprecs = {}
cond.ground_adds = {}
cond.ground_dels = {}
for grounding in cond.groundings:
cond.ground_precs[grounding] = set()
cond.ground_nprecs[grounding] = set()
cond.ground_adds[grounding] = set()
cond.ground_dels[grounding] = set()
for pred in list(self.predicates.values()):
if pred.groundings:
pred.ground_precs = {}
pred.ground_nprecs = {}
pred.ground_adds = {}
pred.ground_dels = {}
for grounding in pred.groundings:
pred.ground_precs[grounding] = set()
pred.ground_nprecs[grounding] = set()
pred.ground_adds[grounding] = set()
pred.ground_dels[grounding] = set()
for action in list(self.actions.values()):
for grounding in action.groundings:
ag_pair = (action, grounding)
for prec in action.flat_ground_preconditions[grounding]:
if isinstance(prec[0], PredicateCondition):
if prec[2]:
for eff in action.flat_ground_effects[grounding]:
if isinstance(eff[0], PredicateCondition):
if eff[2]:
elif not isinstance(eff[0], IncreaseCondition):
def make_strips_conditions(self):
“”” Make pre and post-conditions for each grounding which link directly
to the predicates involved. Assumes strips with negaitve precs.
(Problem) -> None
for action in list(self.actions.values()):
def compute_conflict_mutex(self):
“”” Compute conflict mutex relationships.
(Problem) -> None
#Conflict mutex:
self.conflicts = {}
self.eff_eff_conflicts = {}
self.pre_eff_conflicts = {}
for action in list(self.actions.values()):
for grounding in action.groundings:
ag_pair = (action, grounding)
self.conflicts[ag_pair] = set()
self.eff_eff_conflicts[ag_pair] = set()
self.pre_eff_conflicts[ag_pair] = set()
for action1 in list(self.actions.values()):
for grounding1 in action1.groundings:
ag_pair1 = (action1, grounding1)
for eff in action1.flat_ground_effects[grounding1]:
if isinstance(eff[0], IncreaseCondition): continue
#Assuming STRIPS
if eff[2]:
e_list = eff[0].pred.ground_dels[eff[0].ground_conditions[eff[1]]]
p_list = eff[0].pred.ground_nprecs[eff[0].ground_conditions[eff[1]]]
e_list = eff[0].pred.ground_adds[eff[0].ground_conditions[eff[1]]]
p_list = eff[0].pred.ground_precs[eff[0].ground_conditions[eff[1]]]
#Eff – Eff
for ag_pair2 in e_list:
if ag_pair1 != ag_pair2:
#Pre – Eff
for ag_pair2 in p_list:
if ag_pair1 != ag_pair2:
def simulate_plan(self, plan):
“”” Simulate the suplied plan and return its validity and cost
(Problem, [[(Action, (str,))]]) -> bool, cost
########### ASSUMING STRIPS (with n-precs)
state = set(self.initial_state_set)
cost = 0
#The grounding might not just be enough to do this stuff because it
#does not take into account constants – we need to look
#inside the PredicateCondtions a bit deeper.
for sid, step in enumerate(plan):
step_pre = set()
step_npre = set()
step_add = set()
step_del = set()
for action, grounding in step:
action_add = set()
action_del = set()
for eff in action.flat_ground_effects[grounding]:
if isinstance(eff[0], IncreaseCondition):
cost += eff[0].ground_conditions[eff[1]]
assert isinstance(eff[0], PredicateCondition)
eff_pair = (eff[0].pred, eff[0].ground_conditions[eff[1]])
if eff[2]:
if eff_pair in action_del:
if eff_pair in step_npre or eff_pair in step_del:
print(“Error at step”, sid, “mutex conflict on:”,
eff_pair[0].name, eff_pair[1])
return False, 0
if eff_pair in action_add: continue
if eff_pair in step_pre or eff_pair in step_add:
print(“Error at step”, sid, “mutex conflict on:”,
eff_pair[0].name, eff_pair[1])
return False, 0
for pre in action.flat_ground_preconditions[grounding]:
assert isinstance(pre[0], PredicateCondition)
pre_pair = (pre[0].pred, pre[0].ground_conditions[pre[1]])
if pre[2]:
if pre_pair not in state:
print(“Error at step”, sid, “invalid precondition:”,
pre_pair[0].name, pre_pair[1], “of action:”, action.name, grounding)
return False, 0
if pre_pair in state:
print(“Error at step”, sid, “invalid neg precondition:”,
pre_pair[0].name, pre_pair[1], “of action:”, action.name, grounding)
return False, 0
for eff_pair in step_del:
for eff_pair in step_add:
#check goal
for pre in self.flat_ground_goal_preconditions:
assert isinstance(pre[0], PredicateCondition)
t_grounding = list(pre[0].variables)
for vid, var in enumerate(pre[0].relevant_vars):
for vi in pre[0].var_indices[var]:
t_grounding[vi] = pre[1][vid]
t_grounding = tuple(t_grounding)
pre_pair = (pre[0].pred, t_grounding)
if pre[2]:
if pre_pair not in state:
print(“Unsatisfied goal:”, pre_pair[0].name, ” “.join(pre_pair[1]))
return False, 0
if pre_pair in state:
print(“Unsatisfied negative goal:”, pre_pair[0].name, ” “.join(pre_pair[1]))
return False, 0
return True, cost
def make_strips_problem(self):
“”” Return an instance of strips_problem.Problem made from this problem.
(Problem) -> strips_problem.Problem
not_prefix = “not_”
problem = strips_problem.Problem()
problem.objects = dict(self.objects)
propositions = {}
for predicate in list(self.predicates.values()):
for grounding in predicate.groundings:
prop = strips_problem.Proposition(predicate.name, list(grounding))
propositions[(predicate, grounding)] = prop
neg_propositions = {}
actions = {}
for action in list(self.actions.values()):
for grounding in action.groundings:
strips_action = strips_problem.Action(action.name, list(grounding))
actions[(action, grounding)] = strips_action
for prec in action.flat_ground_preconditions[grounding]:
if not isinstance(prec[0], PredicateCondition):
raise ProblemException(“Error: complex precondition detected in action:” +
action.name, “\nThis system does not allow disjunction or existential ” +
“quantifiers in preconditions.”)
fg_pair = (prec[0].pred, prec[0].ground_conditions[prec[1]])
prop = propositions[fg_pair]
if not prec[2]:
if fg_pair not in neg_propositions:
neg_pred = not_prefix+fg_pair[0].name
prop = strips_problem.Proposition(neg_pred, list(fg_pair[1]))
propositions[(neg_pred, grounding)] = prop
neg_propositions[fg_pair] = prop
prop = neg_propositions[fg_pair]
for eff in action.flat_ground_effects[grounding]:
if isinstance(eff[0], ConditionalEffect):
raise ProblemException(“Error: this system does not support ” +
“conditional effects.”)
if isinstance(eff[0], PredicateCondition):
prop = propositions[(eff[0].pred, eff[0].ground_conditions[eff[1]])]
if eff[2]:
#Prune delete effects that are added by the same action
for action in problem.actions:
for prop in action.pos_effects:
if prop in action.neg_effects:
#Initial State
for predicate in list(self.predicates.values()):
for grounding in predicate.groundings:
prop = (predicate, grounding)
if prop in self.initial_state:
for prec in self.flat_ground_goal_preconditions:
if not isinstance(prec[0], PredicateCondition):
raise ProblemException(“Error: this system does not support ” +
“non-conjunctive goals.”)
fg_pair = (prec[0].pred, prec[0].ground_conditions[prec[1]])
prop = propositions[fg_pair]
if not prec[2]:
if fg_pair not in neg_propositions:
neg_pred = not_prefix+fg_pair[0].name
prop = strips_problem.Proposition(neg_pred, list(fg_pair[1]))
propositions[(neg_pred, grounding)] = prop
neg_propositions[fg_pair] = prop
prop = neg_propositions[fg_pair]
#If there are any negative preconditions, augment the effects of
#actions to deal with them
for (predicate, grounding), prop in list(neg_propositions.items()):
for action in predicate.ground_adds[grounding]:
strips_action = actions[action]
for action in predicate.ground_dels[grounding]:
strips_action = actions[action]
#Initial State
if (predicate, grounding) in self.initial_state:
#Prune useless actions that don’t add anything more than their precondititions
actions_to_remove = [action for action in problem.actions if not \
[prop for prop in action.pos_effects if prop not in action.preconditions]]
for action in actions_to_remove:
for prop in action.preconditions:
for prop in action.pos_effects:
for prop in action.neg_effects:
#First layers
if self.first_layer_actions is not None:
problem.action_first_step = {}
for action, step in list(self.first_layer_actions.items()):
if action in actions and actions[action] in problem.actions:
problem.action_first_step[actions[action]] = step
#State mutexes
if self.state_mutexes is not None:
problem.fluent_mutex = {}
for step, mutexes in list(self.state_mutexes.items()):
problem.fluent_mutex[step] = []
for (prop1, grounding1, sign1), (prop2, grounding2, sign2) in mutexes:
if not sign1 or not sign2: continue
problem.fluent_mutex[step].append((propositions[(prop1, grounding1)],
propositions[(prop2, grounding2)]))
return problem
def make_plan_from_strips(self, plan):
“”” Return a plan made for this type of Problem from a plan made for
(Problem, [[strips_problem.Action]]) -> [[(problem.Action, grounding)]]
new_plan = []
for actions in plan:
new_plan.append([(self.actions[a.name], tuple(a.parameters)) for a in actions])
return new_plan
def simulate_strips_plan(self, problem, plan):
“”” Simulate the suplied plan and return its validity and cost
(Problem, [[(Action, (str,))]]) -> bool, cost
state = set(problem.pos_initial_state)
success = True
for sid, step in enumerate(plan):
step_pre = set()
step_add = set()
step_del = set()
for action in step:
for pre in action.preconditions:
if pre not in state:
print(“Error at step”, sid, “unsatisifed preconditon”, pre, “of action”, action)
success = False
if pre in step_del:
print(“Error at step”, sid, “mutex conflict on:”, pre)
success = False
for eff in action.pos_effects:
if eff in step_del:
print(“Error at step”, sid, “mutex conflict on:”, eff)
success = False
for eff in action.neg_effects:
if eff in step_pre or eff in step_add:
print(“Error at step”, sid, “mutex conflict on:”, eff)
success = False
for prop in step_del:
for prop in step_add:
for prop in problem.goal:
if prop not in state:
print(“Unsatisfied goal:”, prop)
success = False
return success, 0