Skip to content

Commit

Permalink
Add physical and logical plan conversion to and from protobuf (#892)
Browse files Browse the repository at this point in the history
* Add physical and logical plan conversion to and from protobuf

* Add wrappers for LogicalPlan and ExecutionPlan

* Add unit tests for to_proto and from_proto for logical and physical plans
  • Loading branch information
timsaucer authored Oct 4, 2024
1 parent 976b700 commit d181a30
Show file tree
Hide file tree
Showing 11 changed files with 398 additions and 158 deletions.
251 changes: 117 additions & 134 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"]
arrow = { version = "53", features = ["pyarrow"] }
datafusion = { version = "42.0.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-substrait = { version = "42.0.0", optional = true }
datafusion-proto = { version = "42.0.0" }
prost = "0.13" # keep in line with `datafusion-substrait`
prost-types = "0.13" # keep in line with `datafusion-substrait`
uuid = { version = "1.9", features = ["v4"] }
Expand Down
4 changes: 3 additions & 1 deletion python/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from .catalog import Catalog, Database, Table

# The following imports are okay to remain as opaque to the user.
from ._internal import Config, LogicalPlan, ExecutionPlan, runtime
from ._internal import Config, runtime

from .record_batch import RecordBatchStream, RecordBatch

Expand All @@ -53,6 +53,8 @@
WindowFrame,
)

from .plan import LogicalPlan, ExecutionPlan

from . import functions, object_store, substrait

__version__ = importlib_metadata.version(__name__)
Expand Down
12 changes: 7 additions & 5 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from ._internal import RuntimeConfig as RuntimeConfigInternal
from ._internal import SQLOptions as SQLOptionsInternal
from ._internal import SessionContext as SessionContextInternal
from ._internal import LogicalPlan, ExecutionPlan

from datafusion.catalog import Catalog, Table
from datafusion.dataframe import DataFrame
Expand All @@ -39,6 +38,7 @@
import pandas
import polars
import pathlib
from datafusion.plan import LogicalPlan, ExecutionPlan


class SessionConfig:
Expand Down Expand Up @@ -268,8 +268,10 @@ def with_disk_manager_specified(self, *paths: str | pathlib.Path) -> RuntimeConf
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
"""
paths = [str(p) for p in paths]
self.config_internal = self.config_internal.with_disk_manager_specified(paths)
paths_list = [str(p) for p in paths]
self.config_internal = self.config_internal.with_disk_manager_specified(
paths_list
)
return self

def with_unbounded_memory_pool(self) -> RuntimeConfig:
Expand Down Expand Up @@ -558,7 +560,7 @@ def create_dataframe_from_logical_plan(self, plan: LogicalPlan) -> DataFrame:
Returns:
DataFrame representation of the logical plan.
"""
return DataFrame(self.ctx.create_dataframe_from_logical_plan(plan))
return DataFrame(self.ctx.create_dataframe_from_logical_plan(plan._raw_plan))

def from_pylist(
self, data: list[dict[str, Any]], name: str | None = None
Expand Down Expand Up @@ -1034,4 +1036,4 @@ def read_table(self, table: Table) -> DataFrame:

def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
"""Execute the ``plan`` and return the results."""
return RecordBatchStream(self.ctx.execute(plan, partitions))
return RecordBatchStream(self.ctx.execute(plan._raw_plan, partitions))
11 changes: 4 additions & 7 deletions python/datafusion/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from typing import Any, List, TYPE_CHECKING
from datafusion.record_batch import RecordBatchStream
from typing_extensions import deprecated
from datafusion.plan import LogicalPlan, ExecutionPlan

if TYPE_CHECKING:
import pyarrow as pa
Expand All @@ -34,10 +35,6 @@

from datafusion._internal import DataFrame as DataFrameInternal
from datafusion.expr import Expr, SortExpr, sort_or_default
from datafusion._internal import (
LogicalPlan,
ExecutionPlan,
)


class DataFrame:
Expand Down Expand Up @@ -316,23 +313,23 @@ def logical_plan(self) -> LogicalPlan:
Returns:
Unoptimized logical plan.
"""
return self.df.logical_plan()
return LogicalPlan(self.df.logical_plan())

def optimized_logical_plan(self) -> LogicalPlan:
"""Return the optimized ``LogicalPlan``.
Returns:
Optimized logical plan.
"""
return self.df.optimized_logical_plan()
return LogicalPlan(self.df.optimized_logical_plan())

def execution_plan(self) -> ExecutionPlan:
"""Return the execution/physical plan.
Returns:
Execution plan.
"""
return self.df.execution_plan()
return ExecutionPlan(self.df.execution_plan())

def repartition(self, num: int) -> DataFrame:
"""Repartition a DataFrame into ``num`` partitions.
Expand Down
8 changes: 5 additions & 3 deletions python/datafusion/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@

from __future__ import annotations

from typing import Any, Optional, Type
from typing import Any, Optional, Type, TYPE_CHECKING

import pyarrow as pa
from datafusion.common import DataTypeMap, NullTreatment, RexType
from typing_extensions import deprecated

from ._internal import LogicalPlan
from ._internal import expr as expr_internal
from ._internal import functions as functions_internal

if TYPE_CHECKING:
from datafusion.plan import LogicalPlan

# The following are imported from the internal representation. We may choose to
# give these all proper wrappers, or to simply leave as is. These were added
# in order to support passing the `test_imports` unit test.
Expand Down Expand Up @@ -485,7 +487,7 @@ def rex_call_operator(self) -> str:

def column_name(self, plan: LogicalPlan) -> str:
"""Compute the output column name based on the provided logical plan."""
return self.expr.column_name(plan)
return self.expr.column_name(plan._raw_plan)

def order_by(self, *exprs: Expr | SortExpr) -> ExprFuncBuilder:
"""Set the ordering for a window or aggregate function.
Expand Down
147 changes: 147 additions & 0 deletions python/datafusion/plan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""This module supports physical and logical plans in DataFusion."""

from __future__ import annotations

import datafusion._internal as df_internal

from typing import List, Any, TYPE_CHECKING

if TYPE_CHECKING:
from datafusion.context import SessionContext

__all__ = [
"LogicalPlan",
"ExecutionPlan",
]


class LogicalPlan:
"""Logical Plan.
A `LogicalPlan` is a node in a tree of relational operators (such as
Projection or Filter).
Represents transforming an input relation (table) to an output relation
(table) with a potentially different schema. Plans form a dataflow tree
where data flows from leaves up to the root to produce the query result.
`LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
or programmatically (for example custom query languages).
"""

def __init__(self, plan: df_internal.LogicalPlan) -> None:
"""This constructor should not be called by the end user."""
self._raw_plan = plan

def to_variant(self) -> Any:
"""Convert the logical plan into its specific variant."""
return self._raw_plan.to_variant()

def inputs(self) -> List[LogicalPlan]:
"""Returns the list of inputs to the logical plan."""
return [LogicalPlan(p) for p in self._raw_plan.inputs()]

def __repr__(self) -> str:
"""Generate a printable representation of the plan."""
return self._raw_plan.__repr__()

def display(self) -> str:
"""Print the logical plan."""
return self._raw_plan.display()

def display_indent(self) -> str:
"""Print an indented form of the logical plan."""
return self._raw_plan.display_indent()

def display_indent_schema(self) -> str:
"""Print an indented form of the schema for the logical plan."""
return self._raw_plan.display_indent_schema()

def display_graphviz(self) -> str:
"""Print the graph visualization of the logical plan.
Returns a `format`able structure that produces lines meant for graphical display
using the `DOT` language. This format can be visualized using software from
[`graphviz`](https://graphviz.org/)
"""
return self._raw_plan.display_graphviz()

@staticmethod
def from_proto(ctx: SessionContext, data: bytes) -> LogicalPlan:
"""Create a LogicalPlan from protobuf bytes.
Tables created in memory from record batches are currently not supported.
"""
return LogicalPlan(df_internal.LogicalPlan.from_proto(ctx.ctx, data))

def to_proto(self) -> bytes:
"""Convert a LogicalPlan to protobuf bytes.
Tables created in memory from record batches are currently not supported.
"""
return self._raw_plan.to_proto()


class ExecutionPlan:
"""Represent nodes in the DataFusion Physical Plan."""

def __init__(self, plan: df_internal.ExecutionPlan) -> None:
"""This constructor should not be called by the end user."""
self._raw_plan = plan

def children(self) -> List[ExecutionPlan]:
"""Get a list of children `ExecutionPlan`s that act as inputs to this plan.
The returned list will be empty for leaf nodes such as scans, will contain a
single value for unary nodes, or two values for binary nodes (such as joins).
"""
return [ExecutionPlan(e) for e in self._raw_plan.children()]

def display(self) -> str:
"""Print the physical plan."""
return self._raw_plan.display()

def display_indent(self) -> str:
"""Print an indented form of the physical plan."""
return self._raw_plan.display_indent()

def __repr__(self) -> str:
"""Print a string representation of the physical plan."""
return self._raw_plan.__repr__()

@property
def partition_count(self) -> int:
"""Returns the number of partitions in the physical plan."""
return self._raw_plan.partition_count

@staticmethod
def from_proto(ctx: SessionContext, data: bytes) -> ExecutionPlan:
"""Create an ExecutionPlan from protobuf bytes.
Tables created in memory from record batches are currently not supported.
"""
return ExecutionPlan(df_internal.ExecutionPlan.from_proto(ctx.ctx, data))

def to_proto(self) -> bytes:
"""Convert an ExecutionPlan into protobuf bytes.
Tables created in memory from record batches are currently not supported.
"""
return self._raw_plan.to_proto()
10 changes: 6 additions & 4 deletions python/datafusion/substrait.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
from typing import TYPE_CHECKING
from typing_extensions import deprecated
import pathlib
from datafusion.plan import LogicalPlan

if TYPE_CHECKING:
from datafusion.context import SessionContext
from datafusion._internal import LogicalPlan

__all__ = [
"Plan",
Expand Down Expand Up @@ -156,7 +156,9 @@ def to_substrait_plan(logical_plan: LogicalPlan, ctx: SessionContext) -> Plan:
Substrait plan.
"""
return Plan(
substrait_internal.Producer.to_substrait_plan(logical_plan, ctx.ctx)
substrait_internal.Producer.to_substrait_plan(
logical_plan._raw_plan, ctx.ctx
)
)


Expand All @@ -181,8 +183,8 @@ def from_substrait_plan(ctx: SessionContext, plan: Plan) -> LogicalPlan:
Returns:
LogicalPlan.
"""
return substrait_internal.Consumer.from_substrait_plan(
ctx.ctx, plan.plan_internal
return LogicalPlan(
substrait_internal.Consumer.from_substrait_plan(ctx.ctx, plan.plan_internal)
)


Expand Down
42 changes: 42 additions & 0 deletions python/datafusion/tests/test_plans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datafusion import SessionContext, LogicalPlan, ExecutionPlan
import pytest


# Note: We must use CSV because memory tables are currently not supported for
# conversion to/from protobuf.
@pytest.fixture
def df():
ctx = SessionContext()
return ctx.read_csv(path="testing/data/csv/aggregate_test_100.csv").select("c1")


def test_logical_plan_to_proto(ctx, df) -> None:
logical_plan_bytes = df.logical_plan().to_proto()
logical_plan = LogicalPlan.from_proto(ctx, logical_plan_bytes)

df_round_trip = ctx.create_dataframe_from_logical_plan(logical_plan)

assert df.collect() == df_round_trip.collect()

original_execution_plan = df.execution_plan()
execution_plan_bytes = original_execution_plan.to_proto()
execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)

assert str(original_execution_plan) == str(execution_plan)
Loading

0 comments on commit d181a30

Please sign in to comment.