diff --git a/python/ray/data/_internal/logical/optimizers.py b/python/ray/data/_internal/logical/optimizers.py index d3d28e218a89..4bb57bdaaad9 100644 --- a/python/ray/data/_internal/logical/optimizers.py +++ b/python/ray/data/_internal/logical/optimizers.py @@ -12,16 +12,24 @@ ) from ray.data._internal.planner.planner import Planner +# TODO(scottjlee): add back LimitPushdownRule once we +# enforce number of input/output rows remains the same +# for Map/MapBatches ops. +LOGICAL_OPTIMIZER_RULES = [ + ReorderRandomizeBlocksRule, +] + +PHYSICAL_OPTIMIZER_RULES = [ + OperatorFusionRule, +] + class LogicalOptimizer(Optimizer): """The optimizer for logical operators.""" @property def rules(self) -> List[Rule]: - # TODO(scottjlee): add back LimitPushdownRule once we - # enforce number of input/output rows remains the same - # for Map/MapBatches ops. - return [ReorderRandomizeBlocksRule()] + return [rule_cls() for rule_cls in LOGICAL_OPTIMIZER_RULES] class PhysicalOptimizer(Optimizer): @@ -29,7 +37,7 @@ class PhysicalOptimizer(Optimizer): @property def rules(self) -> List["Rule"]: - return [OperatorFusionRule()] + return [rule_cls() for rule_cls in PHYSICAL_OPTIMIZER_RULES] def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan: