From 3eed76b01f707450965c049a82fd6751c6f850b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20N=C3=BA=C3=B1ez=20Molina?= <41746312+TheAeryan@users.noreply.github.com> Date: Thu, 3 Nov 2022 02:22:06 +0100 Subject: [PATCH] Added source files --- src/lifted_pddl/__init__.py | 5 + src/lifted_pddl/__main__.py | 170 ++++++ src/lifted_pddl/data/blocksworld-domain.pddl | 53 ++ src/lifted_pddl/data/blocksworld-problem.pddl | 43 ++ src/lifted_pddl/data/logistics-domain.pddl | 44 ++ src/lifted_pddl/data/logistics-problem.pddl | 161 ++++++ src/lifted_pddl/parser.py | 482 ++++++++++++++++++ 7 files changed, 958 insertions(+) create mode 100644 src/lifted_pddl/__init__.py create mode 100644 src/lifted_pddl/__main__.py create mode 100644 src/lifted_pddl/data/blocksworld-domain.pddl create mode 100644 src/lifted_pddl/data/blocksworld-problem.pddl create mode 100644 src/lifted_pddl/data/logistics-domain.pddl create mode 100644 src/lifted_pddl/data/logistics-problem.pddl create mode 100644 src/lifted_pddl/parser.py diff --git a/src/lifted_pddl/__init__.py b/src/lifted_pddl/__init__.py new file mode 100644 index 0000000..b8f80ee --- /dev/null +++ b/src/lifted_pddl/__init__.py @@ -0,0 +1,5 @@ +# __init__.py + +__version__ = "1.0.1" + +from lifted_pddl.parser import Parser \ No newline at end of file diff --git a/src/lifted_pddl/__main__.py b/src/lifted_pddl/__main__.py new file mode 100644 index 0000000..493ab7f --- /dev/null +++ b/src/lifted_pddl/__main__.py @@ -0,0 +1,170 @@ +# __main__.py + +import argparse +import sys +import os + +from lifted_pddl.parser import Parser + +# Execute the following code if the script is called from the command-line, i.e., +# python -m lifted_parser +def main(): + # Parse command-line arguments with module argparse + # Different options of use: + # > help: we show how to use this script from the command-line + # > example: we show an example of use, for the logistics domain + # > print_planning_task domain_file problem_file: + # we print the information encoded in the PDDL domain and problem, according to the format used in Parser.__str__() + # problem_file is an optional parameter. If not used, we assume there is no PDDL problem. + # > get_applicable_actions domain_file problem_file: + # we print the list of applicable actions at the init state of the planning task given by (domain_file, problem_file) + # > is_action_applicable domain_file problem_file action: + # we print 'True' if action @action is applicable at the init state of the planning task given by (domain_file, problem_file). + # @action must correspond to a string in standard PDDL form (e.g., '(drive t1 l1 l2 c1)') + # > get_next_state domain_file problem_file action: + # we print the next state resulting from applying the action @action at the current state (i.e., the init state of the + # planning task given by (domain_file, problem_file)). The state is printed as a list of its atoms in PDDL format + # (e.g., (at p1 l2) (in-city l1 c5) ...) + + cli_parser = argparse.ArgumentParser(prog='Lifted Parser', + description='This program makes possible to parse PDDL domains and problems to obtain their information, \ + obtain the applicable actions in an efficient manner (without grounding), and obtain the state \ + resulting from applying an action at the current state (successor function).') + + cli_parser.add_argument('execution_mode', choices=('example', 'print_planning_task', 'get_applicable_actions', 'is_action_applicable', 'get_next_state'), + help="Execution mode, i.e., what this program will do. There are several options: example -> shows an example of use for the logistics domain, \ + print_planning_task -> prints the information encoded in the PDDL domain (and also the PDDL problem if it's passed as an argument), \ + get_applicable_actions -> prints the ground actions applicable at the init state of the problem passed as an argument, \ + is_action_applicable -> prints whether the action passed as an argument is applicable at the init state of the problem, \ + get_next_state -> prints the next state (as a list of atoms) obtained by applying the action to the init state of the problem.") + cli_parser.add_argument('-d', '--domain', required=False, help='Path to the domain file') + cli_parser.add_argument('-p', '--problem', required=False, help='Path to the problem file') + cli_parser.add_argument('-a', '--action', required=False, help='Action in PDDL format (e.g., "(unload t1 p11 l1)" )') + + args = cli_parser.parse_args() + execution_mode = args.execution_mode + domain_path = args.domain + problem_path = args.problem + action = args.action + + # --- Example of use --- + if execution_mode == 'example': + # Change working directory to this file's location + os.chdir(os.path.realpath(os.path.dirname(__file__))) + + # Parse logistics domain + parser = Parser() + parser.parse_domain('data/logistics-domain.pddl') + print("Parser information after parsing the domain\n", parser) + + # Parse logistics problem + parser.parse_problem('data/logistics-problem.pddl') + print("\nParser information after parsing the domain AND problem\n", parser) + + # Obtain actions applicable at the current state (given by logistics-problem.pddl) + print("\nApplicable actions:\n", parser.get_applicable_actions()) # Average execution time: 0.0007s + print("\nApplicable actions in PDDL format:\n", parser.encode_ground_actions_as_pddl(parser.get_applicable_actions(), 'str')) + + # Check if individual actions are applicable + print("\nIs ('drive', (44, 15, 12, 1)) applicable?:", parser.is_action_applicable('drive', (44, 15, 12, 1))) + print("Is ('drive', (44, 15, 18, 1)) applicable?:", parser.is_action_applicable('drive', (44, 15, 18, 1))) + print("Is ('unload', (37, 64, 8)) applicable?:", parser.is_action_applicable('unload', (37, 64, 8))) + + # Apply an action at the current state (given by logistics-problem.pddl) and obtain the next state + print("\nApply action ('drive', (44, 15, 12, 1)) and get next state:\n", parser.get_next_state('drive', (44, 15, 12, 1))) + print("\nApply action ('drive', (44, 15, 12, 1)) and get next state in PDDL format:\n", parser.encode_atoms_as_pddl(parser.get_next_state('drive', (44, 15, 12, 1)), 'str')) + + # --- Print the information of the PDDL domain (and problem if given as argument) --- + elif execution_mode == 'print_planning_task': + if domain_path is None: + raise Exception('No domain provided') + + parser = Parser() + parser.parse_domain(domain_path) + if problem_path is not None: + parser.parse_problem(problem_path) + + print(parser) + + # --- Print the ground actions which are applicable at the init state of the planning task --- + elif execution_mode == 'get_applicable_actions': + if domain_path is None: + raise Exception('No domain provided') + if problem_path is None: + raise Exception('No problem provided') + + parser = Parser() + parser.parse_domain(domain_path) + parser.parse_problem(problem_path) + + applicable_actions = parser.encode_ground_actions_as_pddl(parser.get_applicable_actions(), 'str') + + # Print the applicable ground actions (in PDDL format) + for a in applicable_actions: + print(a) + + # --- Print whether the action given as a parameter is applicable at the init state of the planning task --- + elif execution_mode == 'is_action_applicable': + if domain_path is None: + raise Exception('No domain provided') + if problem_path is None: + raise Exception('No problem provided') + if action is None: + raise Exception('No action provided') + + parser = Parser() + parser.parse_domain(domain_path) + parser.parse_problem(problem_path) + + action = action.strip('(').strip(')').split() + + if len(action) == 0: # Action with no parameters + action_name = action[0] + action_params = tuple() + else: + action_name = action[0] + action_params = action[1:] + + # Encode each object as an index (e.g., loc1 -> 2) + object_names = parser.object_names + action_params = tuple([object_names.index(param) for param in action_params]) + + # Print whether the actions is applicable ('True' or 'False') + print(parser.is_action_applicable(action_name, action_params)) + + # --- execution_mode == get_next_state --- + # --- Print the next state (as a list of atoms) resulting from applying the action given as a parameter to the init state of the planning task --- + else: + if domain_path is None: + raise Exception('No domain provided') + if problem_path is None: + raise Exception('No problem provided') + if action is None: + raise Exception('No action provided') + + parser = Parser() + parser.parse_domain(domain_path) + parser.parse_problem(problem_path) + + action = action.strip('(').strip(')').split() + + if len(action) == 0: # Action with no parameters + raise Exception('Actions with no parameters (variables) are not supported') + + action_name = action[0] + action_params = action[1:] + + # Encode each object as an index (e.g., loc1 -> 2) + object_names = parser.object_names + action_params = tuple([object_names.index(param) for param in action_params]) + + next_state_atoms = parser.encode_atoms_as_pddl(parser.get_next_state(action_name, action_params), 'str') + + # Print the atoms of the next state + for atom in next_state_atoms: + print(atom) + + +if __name__ == '__main__': + main() + diff --git a/src/lifted_pddl/data/blocksworld-domain.pddl b/src/lifted_pddl/data/blocksworld-domain.pddl new file mode 100644 index 0000000..8f93520 --- /dev/null +++ b/src/lifted_pddl/data/blocksworld-domain.pddl @@ -0,0 +1,53 @@ +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;; 4 Op-blocks world +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(define (domain blocksworld) + (:requirements :strips :typing) + (:types block) + (:predicates (on ?x - block ?y - block) + (ontable ?x - block) + (clear ?x - block) + (handempty) + (holding ?x - block) + ) + + (:action pick-up + :parameters (?x - block) + :precondition (and (clear ?x) (ontable ?x) + (handempty)) + :effect + (and (not (ontable ?x)) + (not (clear ?x)) + (not (handempty)) + (holding ?x))) + + (:action put-down + :parameters (?x - block) + :precondition (holding ?x) + :effect + (and (not (holding ?x)) + (clear ?x) + (handempty) + (ontable ?x))) + + (:action stack + :parameters (?x - block ?y - block) + :precondition (and (holding ?x) (clear ?y)) + :effect + (and (not (holding ?x)) + (not (clear ?y)) + (clear ?x) + (handempty) + (on ?x ?y))) + + (:action unstack + :parameters (?x - block ?y - block) + :precondition (and (on ?x ?y) (clear ?x) + (handempty)) + :effect + (and (holding ?x) + (clear ?y) + (not (clear ?x)) + (not (handempty)) + (not (on ?x ?y))))) diff --git a/src/lifted_pddl/data/blocksworld-problem.pddl b/src/lifted_pddl/data/blocksworld-problem.pddl new file mode 100644 index 0000000..9a1bb0c --- /dev/null +++ b/src/lifted_pddl/data/blocksworld-problem.pddl @@ -0,0 +1,43 @@ +(define (problem bw_both_generative_policies_8) + +(:domain blocksworld) + +(:objects + obj0 obj1 obj2 obj3 obj4 obj5 obj6 obj7 obj8 obj9 obj10 - block +) + +(:init + (ontable obj0) + (ontable obj1) + (ontable obj2) + (on obj3 obj1) + (on obj4 obj0) + (on obj5 obj4) + (on obj6 obj2) + (on obj7 obj5) + (on obj8 obj7) + (on obj9 obj3) + (clear obj8) + (clear obj6) + (clear obj9) + (holding obj10) +) + +(:goal (and + (holding obj7) + (ontable obj8) + (ontable obj10) + (on obj9 obj10) + (ontable obj2) + (on obj3 obj2) + (on obj5 obj8) + (clear obj9) + (ontable obj0) + (on obj1 obj4) + (clear obj5) + (clear obj3) + (on obj4 obj0) + (clear obj6) + (on obj6 obj1) +)) +) \ No newline at end of file diff --git a/src/lifted_pddl/data/logistics-domain.pddl b/src/lifted_pddl/data/logistics-domain.pddl new file mode 100644 index 0000000..648aec0 --- /dev/null +++ b/src/lifted_pddl/data/logistics-domain.pddl @@ -0,0 +1,44 @@ +;; Source: https://github.com/josej30/Planner-Sat-Constraints + +(define (domain logistics) + +(:requirements :strips :typing) + +(:types city location thing - object + package vehicle - thing + truck airplane - vehicle + airport - location) + +(:predicates (in-city ?l - location ?c - city) + (at ?obj - thing ?l - location) + (in ?p - package ?veh - vehicle)) + +(:action drive + :parameters (?t - truck ?from ?to - location ?c - city) + :precondition (and (at ?t ?from) + (in-city ?from ?c) + (in-city ?to ?c)) + :effect (and (not (at ?t ?from)) + (at ?t ?to))) + +(:action fly + :parameters (?a - airplane ?from ?to - airport) + :precondition (at ?a ?from) + :effect (and (not (at ?a ?from)) + (at ?a ?to))) + +(:action load + :parameters (?v - vehicle ?p - package ?l - location) + :precondition (and (at ?v ?l) + (at ?p ?l)) + :effect (and (not (at ?p ?l)) + (in ?p ?v))) + +(:action unload + :parameters (?v - vehicle ?p - package ?l - location) + :precondition (and (at ?v ?l) + (in ?p ?v)) + :effect (and (not (in ?p ?v)) + (at ?p ?l))) + +) \ No newline at end of file diff --git a/src/lifted_pddl/data/logistics-problem.pddl b/src/lifted_pddl/data/logistics-problem.pddl new file mode 100644 index 0000000..08c1043 --- /dev/null +++ b/src/lifted_pddl/data/logistics-problem.pddl @@ -0,0 +1,161 @@ +(define (problem bw_both_generative_policies_0) + +(:domain logistics) + +(:objects + c0 c1 c2 c3 c4 c5 - city + l0 l1 l2 l3 l4 l5 l6 l7 l8 l9 l10 l11 l12 l13 l14 l15 l16 l17 l18 l19 l20 l21 l22 l23 l24 - location + l25 l26 l27 l28 l29 - airport + t1 t2 t3 t4 t5 t6 t7 t8 t9 t10 - truck + a1 a2 a3 a4 a5 - airplane + p0 p1 p2 p3 p4 p5 p6 p7 p8 p9 p10 p11 p12 p13 p14 p15 - package +) + +(:init + (in-city l0 c0) + (in-city l1 c0) + (in-city l2 c0) + (in-city l3 c0) + (in-city l4 c0) + + (in-city l5 c1) + (in-city l6 c1) + (in-city l7 c1) + (in-city l8 c1) + (in-city l9 c1) + + (in-city l10 c2) + (in-city l11 c2) + (in-city l12 c2) + (in-city l13 c2) + (in-city l14 c2) + + (in-city l15 c3) + (in-city l16 c3) + (in-city l17 c3) + (in-city l18 c3) + (in-city l19 c3) + + (in-city l20 c4) + (in-city l21 c4) + (in-city l22 c4) + (in-city l23 c4) + (in-city l24 c4) + + (in-city l25 c5) + (in-city l26 c5) + (in-city l27 c5) + (in-city l28 c5) + (in-city l29 c5) + + (at t1 l1) + (at t2 l2) + (at t3 l3) + (at t4 l4) + (at t5 l5) + (at t6 l6) + (at t7 l7) + (at t8 l8) + (at t9 l9) + (at t10 l10) + + (at a1 l25) + (at a2 l26) + (at a3 l27) + (at a4 l28) + (at a5 l29) + + (at p0 l0) + (at p1 l1) + (at p2 l2) + (at p3 l12) + (at p4 l20) + (at p5 l20) + + (at p6 l21) + (at p7 l21) + (at p8 l21) + (at p9 l22) + (at p10 l26) + + (in p11 t1) + (in p12 t1) + (in p13 t2) + (in p14 a1) + (in p15 a2) +) + +(:goal (and + (in-city l0 c0) + (in-city l1 c0) + (in-city l2 c0) + (in-city l3 c0) + (in-city l4 c0) + + (in-city l5 c1) + (in-city l6 c1) + (in-city l7 c1) + (in-city l8 c1) + (in-city l9 c1) + + (in-city l10 c2) + (in-city l11 c2) + (in-city l12 c2) + (in-city l13 c2) + (in-city l14 c2) + + (in-city l15 c3) + (in-city l16 c3) + (in-city l17 c3) + (in-city l18 c3) + (in-city l19 c3) + + (in-city l20 c4) + (in-city l21 c4) + (in-city l22 c4) + (in-city l23 c4) + (in-city l24 c4) + + (in-city l25 c5) + (in-city l26 c5) + (in-city l27 c5) + (in-city l28 c5) + (in-city l29 c5) + + (at t1 l1) + (at t2 l2) + (at t3 l3) + (at t4 l4) + (at t5 l5) + (at t6 l6) + (at t7 l7) + (at t8 l8) + (at t9 l9) + (at t10 l10) + + (at a1 l25) + (at a2 l26) + (at a3 l27) + (at a4 l28) + (at a5 l29) + + (at p0 l0) + (at p1 l1) + (at p2 l2) + (at p3 l12) + (at p4 l20) + (at p5 l20) + + (at p6 l21) + (at p7 l21) + (at p8 l21) + (at p9 l22) + (at p10 l26) + + (in p11 t1) + (in p12 t1) + (in p13 t2) + (in p14 a1) + (in p15 a2) +)) +) \ No newline at end of file diff --git a/src/lifted_pddl/parser.py b/src/lifted_pddl/parser.py new file mode 100644 index 0000000..18ce973 --- /dev/null +++ b/src/lifted_pddl/parser.py @@ -0,0 +1,482 @@ +from operator import itemgetter +from collections import deque +from itertools import product + +from tarski.io import PDDLReader +from tarski.syntax.formulas import CompoundFormula, Atom +from tarski.fstrips.fstrips import AddEffect, DelEffect + +""" +This class implements functionality for: + - Parsing PDDL domains and problems + - Obtaining the actions applicable at the init state of the corresponding problem + - Obtaining the next state resulting from applying an action to the init state (successor function) + +: it supports types (as in typed STRIPS) but not other PDDL extensions, such as + negative preconditions, disjuntions (or), conditional effects (:when), etc. + However, this code should be easy to extend to those situations. +""" +class Parser: + + def __init__(self): + self._reader = PDDLReader(raise_on_error=True) + + # Domain information + self.domain_name = '' + self.types = set() + self.type_hierarchy = dict() + self.predicates = set() + self.constant_names = tuple() + self.constant_types = tuple() + self.actions = set() + + # Problem information + self.object_names = list() + self.object_types = list() + self.atoms = set() + self.goals = set() + + def __str__(self): + output = '' + + # Domain information + output += '--- Domain Information ---' + output += '\n> Domain name: {}'.format(self.domain_name) + output += '\n> Types: {}'.format(self.types) + output += '\n> Type hierarchy: {}'.format(self.type_hierarchy) + output += '\n> Predicates: {}'.format(self.predicates) + output += '\n> Constant names: {}'.format(self.constant_names) + output += '\n> Constant types: {}'.format(self.constant_types) + output += '\n> Actions: {}'.format(self.actions) + + # Problem information + output += '\n\n--- Problem Information ---' + output += '\n> Object names (including constants): {}'.format(self.object_names) + output += '\n> Object types (including constants): {}'.format(self.object_types) + output += '\n> Atoms in initial state: {}'.format(self.atoms) + output += '\n> Goals: {}'.format(self.goals) + + return output + + # We use tarski to parse the PDDL domain + def parse_domain(self, domain_path): + # + self._reader.parse_domain(domain_path) + problem = self._reader.problem + language = problem.language + + # + + # Domain name + self.domain_name = problem.domain_name # Example: 'logistics' + + # Types + sorts = language.sorts + self.types = set([sort.name for sort in sorts]) # Example: {'object', 'city', 'location', 'thing', 'package', 'vehicle', 'truck', 'airplane', 'airport'} + + # Type hierarchy + ancestor_sorts = language.ancestor_sorts + # Convert from a dictionary where keys are children types and values are parent types to a dictionary where the keys and values are reversed + self.type_hierarchy = {parent.name : set([child.name for child in sorts if parent in ancestor_sorts[child]] + [parent.name]) for parent in sorts} + # Example: {'object': {'object', 'city', 'airplane', 'thing', 'airport', 'location', 'vehicle', 'truck', 'package'}, 'city': {'city'}, 'location': {'airport', 'location'}, + # 'thing': {'airplane', 'thing', 'vehicle', 'truck', 'package'}, 'package': {'package'}, 'vehicle': {'vehicle', 'truck', 'airplane'}, 'truck': {'truck'}, 'airplane': {'airplane'}, 'airport': {'airport'}} + + # Predicates + # self.predicates = [pred.name for pred in language.predicates] + self.predicates = set([pred.name for pred in language.predicates if type(pred.name) == str]) # type(pred.name) != str -> the predicate is a built-in (either '=' or '!=') + + # Domain constants + # Store two lists, containing the name and type of each constant + constants = language.constants() + self.constant_names = tuple([const.name for const in constants]) + self.constant_types = tuple([const.sort.name for const in constants]) + + # Actions + self.actions = set() + + for action in problem.actions.items(): + variables = action[1].parameters.vars() + var_names = [var.symbol for var in variables] + + # Action parameters, as a tuple of their types + # Example: ('truck', 'location', 'location', 'city') + params = tuple([var.sort.name for var in variables]) + + # Action preconditions, as a tuple made up of every precondition + # Each precondition is represented by a tuple (predicate_name, vars) + # Variables are substituted by their corresponding parameter index + # Example: ( ('at', (0, 1)), ('in-city', (1, 3)), ('in-city', (2, 3)) ) + preconds = action[1].precondition + preconds = preconds.subformulas if isinstance(preconds, CompoundFormula) else [preconds] + preconds = tuple([(precond.predicate.name, tuple([var_names.index(var.symbol) for var in precond.subterms])) for precond in preconds]) + + # Action effects, as a tuple made up of every effect + # Each effect is represented as a tuple (is_add_effect, predicate_name, vars) + # Variables are substituted by their corresponding parameter index + # Example: ( (False, 'at', (0, 1)), (True, 'at', (0, 2)) )) + effects = action[1].effects + effects = tuple([(isinstance(effect, AddEffect), effect.atom.predicate.name, tuple([var_names.index(var.symbol) for var in effect.atom.subterms])) for effect in effects]) + + self.actions.add( (action[0], params, preconds, effects) ) + + # We use tarski to parse the PDDL problem + # : This method can only be called after parse_domain() + def parse_problem(self, problem_path): + problem = self._reader.parse_instance(problem_path) + language = problem.language + + # Objects + # Store two lists, containing the name and type of each object + # : these list also contain the domain constants. For example, if the domain contains 5 constants and the problem 10 objects, + # they will contain 15 different entries. + objects = language.constants() + self.object_names = [obj.name for obj in objects] + self.object_types = [obj.sort.name for obj in objects] + + # Atoms, as a set containing each atom + # Each atom is represented as a tuple (pred_name, object_indexes), where object_indexes is a tuple containing the index of each object + # the atom is instantiated on + # Example: {('in-city', (31, 5)), ('at', (48, 33)), ('in-city', (34, 5)), ('at', (49, 34)), ('at', (41, 12))} + atoms = problem.init.as_atoms() + self.atoms = set([(atom.predicate.name, tuple([self.object_names.index(obj.name) for obj in atom.subterms])) for atom in atoms]) + + # Goals, as a set containing each goal + # Each goal is represented as a tuple (is_true, pred_name, object_indexes) + # object_indexes is a tuple containing the index of each object the atom of the goal is instantiated on + # is_true equals False if the goal is negative (e.g., (not (at t1 l1)) ) and True otherwise + subformulas = problem.goal.subformulas + self.goals = set([(True, x.predicate.name, tuple(self.object_names.index(obj.name) for obj in x.subterms)) if isinstance(x, Atom) else \ + (False, x.subformulas[0].predicate.name, tuple(self.object_names.index(obj.name) for obj in x.subformulas[0].subterms)) for x in subformulas]) + + + """ + + Returns the ground applicable actions at the current state (given by self.object_names, self.object_types and self.atoms) + Applicable actions are returned as a dictionary where the key is the action name and the value + is a list containing all the variable substitutions (groundings) which make the action applicable. + + + The algorithm used to obtain the applicable actions is heavily inspired by Powerlifted (https://github.com/abcorrea/powerlifted). + More specifically, by the method detailed in 'Lifted Successor Generation using Query Optimization Techniques' by Correa et al. + We implement the 'naive join' algorithm detailed in that work directly in Python, without using any framework for query evaluation. + The algorithm is as follows: + + 1. Iterate over actions. + 2. For each action, iterate over preconditions. + 3. For each precondition: + - Associate each atom's object with the corresponding action parameter (atom obj_ind to parameter_ind, which is the same as the variable_assignment_ind) + - Obtain the indexes of atom's objects corresponding to bound variables in the partial variable assignments + - This is something like "We need to compare first object of atom with third object of variable assignemnt and second object of atom with sixth object of variable assignment, + and they need to be equal" + - Iterate over each atom in the problem. + - For each atom: + - Check if the atom's predicate is the same as the precondition's predicate + - Check if the type of each atom's object inherits from the type of the corresponding action parameter + - If any of these two conditions are false, we skip this atom + - Iterate over each partial variable assignment + - Check if the atom objects corresponding to bound variables match the objects of the partial variable assignment + - If true, add a new variable assignment: + - Copy the old variable assignment + - For each atom's object, change the corresponding object of the variable assignment to it (bind new variables) + 4. Return valid (applicable) variable assignments for the action. + """ + def get_applicable_actions(self): + applicable_actions = dict() + + action_schemas = self.actions + type_hierarchy = self.type_hierarchy + objects = self.object_types + atoms = self.atoms # Note: atoms must be a set of tuples and not a set of lists + + for action in action_schemas: + action_name, action_params, action_preconds, _ = action + + # Process action preconditions corresponding to nullary predicates (those of arity 0) + # Check if each nullary predicate in the preconditions appears in the state atoms. + # If so, we remove the nullary predicates from the action preconditions and check the rest of the preconditions. + # If not, the action is not applicable. + nullary_preconds = [precond for precond in action_preconds if len(precond[1]) == 0] + + nullary_preconds_in_atoms = True + for nullary_precond in nullary_preconds: + if nullary_precond not in atoms: # The nullary precondition does not appear in the atoms + nullary_preconds_in_atoms = False + break + + # The current action is not applicable + if not nullary_preconds_in_atoms: + applicable_actions[action_name] = [] + + else: + # Remove the nullary predicates from the action preconditions + action_preconds = [precond for precond in action_preconds if len(precond[1]) > 0] + + # List of partial variable assignments corresponding to potentially aplicable actions + var_assigns = [[-1]*len(action_params),] # A list with a single element [-1, -1, ...] representing a variable assignment with all free variables + is_var_bound = [False]*len(action_params) # Contains True if the corresponding variable is bound, and False if it's free + + for precond in action_preconds: + precond_pred, precond_vars = precond + new_var_assigns = [] # Will contain the partial variable assignments after we process the current precondition (precond) + + # Obtain mapping from the indexes of the atom's objects to the indexes of the variables in var_assigns + # Then, select the subset of those indexes which we need to check to see if an atom matches a variable assignment in var_assigns + # This subset corresponds to those variables which are already bound and also appear at the precondition (precond_vars) + """ + Examples: + - Precondition: ('in-city', (2, 3)) + - Indexes of atom objects: (0, 1) + - Indexes of variables in var_assigns: (2, 3) -> atom_obj[0] maps to var[2] in var_assigns and atom_obj[1] maps to var[3] + + Let's assume we have var assignments like (6, 8, -1, 5) (vars 0, 1 and 3 are bound, and var 2 is free). + Then, we only need to check that atom_obj[1] matches var[3] in the var_assignments. Therefore: + - Indexes of atom objects to check: (1,) + - Indexes of variables in var_assigns to check: (3,) + """ + inds_to_check = [(ind, var) for ind, var in enumerate(precond_vars) if is_var_bound[var]] + + # If len(inds_to_check) == 0, this means that we don't have to check any variable in the assignments for this precondition + # i.e., as long as an atom is of the correct predicate and its objects of the correct type, all partial variable assignments match the atom + if len(inds_to_check) == 0: + no_vars_to_check = True + atom_obj_inds_to_check, vars_to_check = tuple(), tuple() + else: + no_vars_to_check = False + atom_obj_inds_to_check, vars_to_check = zip(*inds_to_check) + + for atom in atoms: + atom_pred, atom_obj_inds = atom + + # Check if the atom's predicate is the same as that of the precondition + if atom_pred == precond_pred: + # The types of the objects the atom is instantiated on must inherit from the type of the corresponding parameter type + types_correct = True + for atom_obj_ind, precond_var in zip(atom_obj_inds, precond_vars): + types_correct = types_correct and (objects[atom_obj_ind] in type_hierarchy[action_params[precond_var]]) + + if types_correct: + + for var_assign in var_assigns: + # Check if the atom matches the current var assignment (var_assign) + # It is a match if the atom's objects match the objects of the corresponding vars in var_assign for those + # vars which are bound, i.e., atom_obj_inds[atom_obj_inds_to_check] == var_assign[vars_to_check] + if no_vars_to_check or itemgetter(*atom_obj_inds_to_check)(atom_obj_inds) == itemgetter(*vars_to_check)(var_assign): + new_var_assign = var_assign.copy() + deque(map(new_var_assign.__setitem__, precond_vars, atom_obj_inds), maxlen=0) # The deque is simply to evaluate the map, as in python 3 it has lazy evaluation + + new_var_assigns.append(new_var_assign) + + + # Update the partial variable assignments + var_assigns = new_var_assigns + + # If this happens, then the current precondition is not met for any variable substitution, so the action is not applicable! + if len(var_assigns) == 0: + applicable_actions[action_name] = [] # No valid variable substitutions for this action + break # No need to check the remaining preconditions + + # Bind variables which appear in the precondition + for var in precond_vars: + is_var_bound[var] = True + + + # Check if there are still partial variable substitutions in var_assigns + # If there are, the free variables in those variable substitutions can be instantiated + # on any object of the correct type (as they don't appear in any precondition) + partial_var_assigns = [var_assign for var_assign in var_assigns if -1 in var_assign] + full_var_assigns = [var_assign for var_assign in var_assigns if -1 not in var_assign] + + for var_assign in partial_var_assigns: + # For each var in var_assign, we obtain a tuple with the objects which can be instantiated on it: + # If var == -1 -> a list with all the obj_inds of the corresponding action parameter type + # If var != -1 -> simply var + new_objs_var_assign = [[ind_obj for ind_obj, obj_type in enumerate(objects) if obj_type==action_params[ind_var]] \ + if var==-1 else (var,) for ind_var, var in enumerate(var_assign)] + + new_var_assigns = product(*new_objs_var_assign) # Cartesian product + full_var_assigns.extend(new_var_assigns) + + + # Convert from list of lists to list of tuples + full_var_assigns[:] = tuple([tuple(var_assign) for var_assign in full_var_assigns]) + + # Save the variable assignments (groundings) which make the current action applicable + applicable_actions[action_name] = full_var_assigns + + return applicable_actions + + """ + This method receives a ground action and returns if it is applicable at the current state (given by self.object_names, self.object_types and self.atoms). + + @action_name Name of the action (e.g., 'drive') + @var_assign Action parameters instantiations, as a list/tuple of object indexes (e.g., (10, 3) -> first action parameter is instantiated on 10-th object + and the second one on the third object) + """ + def is_action_applicable(self, action_name, var_assign): + action_schemas = self.actions + type_hierarchy = self.type_hierarchy + objects = self.object_types + atoms = self.atoms # Note: atoms must be a set of tuples and not a set of lists + + # Select the action_schema corresponding to @action_name + action = [action_schema for action_schema in action_schemas if action_schema[0]==action_name] + assert len(action)==1, "There are several action schemas with the same name!" + action = action[0] + + _, action_params, action_preconds, action_effects = action + + # Check if the objects the action parameters are instantiated on are of the correct type + for obj_ind, param_type in zip(var_assign, action_params): + if objects[obj_ind] not in type_hierarchy[param_type]: + return False # If a single parameter is of the incorrect type, we know the action is not applicable + + # Check each precondition is met + for precond in action_preconds: + # Assign the variables in the precondition according to @var_assign, i.e., ground the variables + ground_vars = tuple([var_assign[var] for var in precond[1]]) + precond = tuple([precond[0], ground_vars]) + + # Check if the grounded atom corresponding to the precondition is in the state atoms + if precond not in atoms: + return False + + # At this point of the code, we know all the preconditions are met, so the action is applicable + return True + + """ + Successor function. + This method receives a ground action and returns the next state as a result of applying the action to the current state + (given by self.object_names, self.object_types and self.atoms). + The next state is returned as the new set of atoms which are true. + If @check_action_applicability is True, we check if the action is applicable before applying it to the current state. + In case it is not applicable, we assume the next state is equal to the current state (the state does not change). + If @check_action_applicability is False, we assume the action is applicable at the current state. + + @action_name Name of the action (e.g., 'drive') + @var_assign Action parameters instantiations, as a list/tuple of object indexes (e.g., (10, 3) -> first action parameter is instantiated on 10-th object + and the second one on the third object) + """ + def get_next_state(self, action_name, var_assign, check_action_applicability=True): + action_schemas = self.actions + type_hierarchy = self.type_hierarchy + objects = self.object_types + atoms = self.atoms # Note: atoms must be a set of tuples and not a set of lists + + # Select the action_schema corresponding to @action_name + action = [action_schema for action_schema in action_schemas if action_schema[0]==action_name] + assert len(action)==1, "There are several action schemas with the same name!" + action = action[0] + + # Copy the atoms, so that the new_atoms don't share the reference + new_atoms = atoms.copy() + + if check_action_applicability: + is_applicable = self.is_action_applicable(action_name, var_assign) + + # If the action is not applicable, the set of atoms will not change (the state remains the same) + if not is_applicable: + return new_atoms + + # Perform the variable substitutions for the action_effects (i.e., ground the variables) according to @var_assign + _, action_params, action_preconds, action_effects = action + + ground_action_effects = [] + for effect in action_effects: + ground_vars = tuple([var_assign[var] for var in effect[2]]) + ground_effect = tuple([effect[0], effect[1], ground_vars]) + ground_action_effects.append(ground_effect) + + # Apply the ground effects: + # Add effects (effect[0]==True) -> add the corresponding atom + # Del effects (effect[0]==False) -> delete the corresponding atom + # Note: we assume that no effects are in both the add and delete list + for effect in ground_action_effects: + if effect[0]: # Add effect + new_atoms.add( (effect[1], effect[2]) ) + else: # Delete effect + effect_atom = (effect[1], effect[2]) + + if effect_atom in new_atoms: # If the corresponding atom does not exist in the state atoms, we do nothing + new_atoms.remove(effect_atom) + + return new_atoms + + """ + Receives as @ground_actions the set of applicable actions in the dictonary form returned by get_applicable_actions() + and returns the same applicable actions in PDDL format (e.g., (drive t1 l2 l3 c1), (unload a8 p6 l3), ...). + If you want to encode the actions applicable at the current state (given by self), you can simply do + parser.encode_ground_actions_as_pddl(parser.get_applicable_actions(), output_format='str') (assuming parser is an + instance of the Parser class). + + @output_format Either 'str' or 'tuple'. + If 'str', the output is a set of actions, where each action is a string like '(drive t1 l2 l3 c1)'. + If 'list', the output is a set of actions, where each action is a tuple like tuple(('drive', 't1', 'l2', 'l3', 'c1')). + Suggestion: choose @output_format='str' if you want to dump the result to a file or print it to the terminal. + choose @output_format='tuple' if you want to process the result in Python (instead of printing it). + """ + def encode_ground_actions_as_pddl(self, ground_actions, output_format): + assert output_format in ('str', 'tuple'), "@output_format must be either 'str' or 'tuple'" + obj_names = self.object_names + + if output_format == 'tuple': + actions_pddl_format = set([tuple([action_name] + [obj_names[var] for var in var_assign]) for action_name in ground_actions \ + for var_assign in ground_actions[action_name]]) + else: + actions_pddl_format = set(['({} '.format(action_name) + ' '.join(obj_names[var] for var in var_assign) + ')' \ + for action_name in ground_actions for var_assign in ground_actions[action_name]]) + + return actions_pddl_format + + """ + Receives as @atoms the set of atoms in the form returned by get_next_state() and returns the same set of atoms in PDDL + format (e.g., (at t1 l2), (in p2 a8), ...) + If you want to encode the atoms corresponding to the current state (given by self), you can simply do + parser.encode_atoms_as_pddl(parser.atoms, output_format='str') (assuming parser is an + instance of the Parser class). + If you want to encode the atoms corresponding to the next state obtained by executing an action (action_to_apply) at the current state (given by self), + you can simply do parser.encode_atoms_as_pddl(parser.get_next_state(action_to_apply), output_format='str'). + + @output_format Either 'str' or 'tuple'. + If 'str', the output is a set of atoms, where each atom is a string like '(at t1 l2)'. + If 'list', the output is a set of atoms, where each atom is a tuple like tuple(('at', 't1', 'l2')). + Suggestion: choose @output_format='str' if you want to dump the result to a file or print it to the terminal. + choose @output_format='tuple' if you want to process the result in Python (instead of printing it). + """ + def encode_atoms_as_pddl(self, atoms, output_format): + assert output_format in ('str', 'tuple'), "@output_format must be either 'str' or 'tuple'" + obj_names = self.object_names + + if output_format == 'tuple': + atoms_pddl_format = set([tuple([atom[0]] + [obj_names[obj_ind] for obj_ind in atom[1]]) for atom in atoms]) + else: + atoms_pddl_format = set(['({} '.format(atom[0]) + ' '.join(obj_names[obj_ind] for obj_ind in atom[1]) + ')' \ + if len(atom[1]) > 0 else '({})'.format(atom[0]) for atom in atoms]) + # We need the if else condition in the list comprehension because, without it, + # atoms corresponding to nullary predicates are printed with a blank space at the end + # (e.g., (handempty ) <- note the blank space) + + return atoms_pddl_format + + + + + + + + + + + + + + + + + + + + + + + +