Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Jan 3, 2025
1 parent a7bbd3c commit c9a8041
Show file tree
Hide file tree
Showing 28 changed files with 107 additions and 23 deletions.
2 changes: 2 additions & 0 deletions opteryx/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from opteryx.connectors.disk_connector import DiskConnector
from opteryx.connectors.gcp_cloudstorage_connector import GcpCloudStorageConnector
from opteryx.connectors.gcp_firestore_connector import GcpFireStoreConnector
from opteryx.connectors.iceberg_connector import IcebergConnector
from opteryx.connectors.mongodb_connector import MongoDbConnector
from opteryx.connectors.sql_connector import SqlConnector
from opteryx.shared import MaterializedDatasets
Expand All @@ -35,6 +36,7 @@
"DiskConnector",
"GcpCloudStorageConnector",
"GcpFireStoreConnector",
"IcebergConnector",
"MongoDbConnector",
"SqlConnector",
)
Expand Down
4 changes: 4 additions & 0 deletions opteryx/connectors/disk_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@


class DiskConnector(BaseConnector, Cacheable, Partitionable, PredicatePushable, Asynchronous):
"""
Connector for reading datasets from files on local storage.
"""

__mode__ = "Blob"
__type__ = "LOCAL"

Expand Down
4 changes: 4 additions & 0 deletions opteryx/connectors/file_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@


class FileConnector(BaseConnector, PredicatePushable):
"""
Connector for reading datasets from a file.
"""

__mode__ = "Blob"
__type__ = "FILE"
_byte_array: Optional[bytes] = None # Instance attribute to store file bytes
Expand Down
52 changes: 52 additions & 0 deletions opteryx/connectors/iceberg_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# See the License at http://www.apache.org/licenses/LICENSE-2.0
# Distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND.

"""
Arrow Reader
Used to read datasets registered using the register_arrow or register_df functions.
"""

import pyarrow
from orso.schema import FlatColumn
from orso.schema import RelationSchema

from opteryx.connectors.base.base_connector import DEFAULT_MORSEL_SIZE
from opteryx.connectors.base.base_connector import BaseConnector
from opteryx.shared import MaterializedDatasets
from opteryx.utils import arrow


class IcebergConnector(BaseConnector):
    """
    Connector registered for Iceberg datasets.

    NOTE(review): the implementation currently reads from MaterializedDatasets
    (the register_arrow / register_df registry), not from an Iceberg catalog —
    presumably a placeholder implementation. TODO confirm intent.
    """

    __mode__ = "Internal"
    __type__ = "ARROW"

    def __init__(self, *args, **kwargs):
        BaseConnector.__init__(self, **kwargs)

        # dataset names are treated case-insensitively internally
        self.dataset = self.dataset.lower()
        self._datasets = MaterializedDatasets()

    def get_dataset_schema(self) -> RelationSchema:
        """
        Build (and cache on self.schema) the relation schema derived from the
        dataset's Arrow schema.
        """
        dataset = self._datasets[self.dataset]
        arrow_schema = dataset.schema

        self.schema = RelationSchema(
            name=self.dataset,
            columns=[FlatColumn.from_arrow(field) for field in arrow_schema],
        )

        return self.schema

    def read_dataset(self, columns: list = None, **kwargs) -> pyarrow.Table:
        """
        Yield the dataset as a sequence of pyarrow Tables (morsels).

        Parameters:
            columns: optional column list; when given, each morsel is
                projected to these columns after it is read.

        Yields:
            pyarrow.Table morsels of roughly DEFAULT_MORSEL_SIZE bytes each.
        """
        dataset = self._datasets[self.dataset]

        # Target morsels of about DEFAULT_MORSEL_SIZE bytes. Guard against an
        # empty dataset (the original `nbytes / num_rows` raised
        # ZeroDivisionError) and ensure `max_chunksize` is an int of at least
        # one row (the original passed a float, and could round to zero for
        # very wide rows).
        if dataset.num_rows == 0 or dataset.nbytes == 0:
            batch_size = 1
        else:
            bytes_per_row = dataset.nbytes / dataset.num_rows
            batch_size = max(1, int(DEFAULT_MORSEL_SIZE / bytes_per_row))

        for batch in dataset.to_batches(max_chunksize=batch_size):
            morsel = pyarrow.Table.from_batches([batch], schema=dataset.schema)
            if columns:
                morsel = arrow.post_read_projector(morsel, columns)
            yield morsel
4 changes: 2 additions & 2 deletions opteryx/planner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ def query_planner(
from opteryx.models import QueryProperties
from opteryx.planner.ast_rewriter import do_ast_rewriter
from opteryx.planner.binder import do_bind_phase
from opteryx.planner.cost_based_optimizer import do_cost_based_optimizer
from opteryx.planner.logical_planner import apply_visibility_filters
from opteryx.planner.logical_planner import do_logical_planning_phase
from opteryx.planner.optimizer import do_optimizer
from opteryx.planner.physical_planner import create_physical_plan
from opteryx.planner.sql_rewriter import do_sql_rewrite
from opteryx.third_party import sqloxide
Expand Down Expand Up @@ -182,7 +182,7 @@ def query_planner(
statistics.time_planning_binder += time.monotonic_ns() - start

start = time.monotonic_ns()
optimized_plan = do_cost_based_optimizer(bound_plan, statistics)
optimized_plan = do_optimizer(bound_plan, statistics)
statistics.time_planning_optimizer += time.monotonic_ns() - start

# before we write the new optimizer and execution engine, convert to a V1 plan
Expand Down
4 changes: 2 additions & 2 deletions opteryx/planner/executor/v2_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ def query_planner(
from opteryx.models import QueryProperties
from opteryx.planner.ast_rewriter import do_ast_rewriter
from opteryx.planner.binder import do_bind_phase
from opteryx.planner.cost_based_optimizer import do_cost_based_optimizer
from opteryx.planner.logical_planner import LogicalPlan
from opteryx.planner.logical_planner import do_logical_planning_phase
from opteryx.planner.optimizer import do_optimizer
from opteryx.planner.sql_rewriter import do_sql_rewrite
from opteryx.planner.temporary_physical_planner import create_physical_plan
from opteryx.third_party import sqloxide
Expand Down Expand Up @@ -130,7 +130,7 @@ def query_planner(
# common_table_expressions=ctes,
)

optimized_plan = do_cost_based_optimizer(bound_plan, statistics)
optimized_plan = do_optimizer(bound_plan, statistics)

# before we write the new optimizer and execution engine, convert to a V1 plan
query_properties = QueryProperties(qid=qid, variables=conn.context.variables)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,22 @@
- Context: Maintains the state during optimization, including the pre-optimized and optimized plans.
The `CostBasedOptimizerVisitor` class orchestrates the optimization process by applying each strategy
in sequence. The `do_cost_based_optimizer` function serves as the entry point for optimizing a logical plan.
in sequence. The `do_optimizer` function serves as the entry point for optimizing a logical plan.
Example Usage:
optimized_plan = do_cost_based_optimizer(logical_plan)
optimized_plan = do_optimizer(logical_plan)
This module aims to enhance query performance through systematic and incremental optimization steps.
"""

from opteryx.config import DISABLE_OPTIMIZER
from opteryx.models import QueryStatistics
from opteryx.planner.cost_based_optimizer.strategies import *
from opteryx.planner.logical_planner import LogicalPlan
from opteryx.planner.optimizer.strategies import *

from .strategies.optimization_strategy import OptimizerContext

__all__ = "do_cost_based_optimizer"
__all__ = "do_optimizer"


class CostBasedOptimizerVisitor:
Expand Down Expand Up @@ -134,9 +134,9 @@ def optimize(self, plan: LogicalPlan) -> LogicalPlan:
return current_plan


def do_cost_based_optimizer(plan: LogicalPlan, statistics: QueryStatistics) -> LogicalPlan:
def do_optimizer(plan: LogicalPlan, statistics: QueryStatistics) -> LogicalPlan:
"""
Perform cost-based optimization on the given logical plan.
Perform optimization on the given logical plan.
Parameters:
plan (LogicalPlan): The logical plan to optimize.
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ select = ["SIM"]
ignore = []

[tool.ruff.lint.per-file-ignores]
"**/cost_based_optimizer/**" = ["SIM102"]
"**/optimizer/**" = ["SIM102"]
"opteryx/managers/expression/ops.py" = ["SIM118"]
46 changes: 34 additions & 12 deletions tests/catalog/test_iceberg.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@

import os
import sys

sys.path.insert(1, os.path.join(sys.path[0], "../.."))

from tests.tools import is_arm, is_mac, is_windows, skip_if
import opteryx
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.catalog import load_catalog
from opteryx.connectors import DiskConnector
from opteryx.connectors import IcebergConnector

BASE_PATH: str = "tmp/iceberg"

@skip_if(is_arm() or is_windows() or is_mac())
def set_up_iceberg():
"""
Set up a local Iceberg catalog for testing with NVD data.
Expand All @@ -19,6 +25,9 @@ def set_up_iceberg():
Returns:
str: Path to the created Iceberg table.
"""

from pyiceberg.catalog.sql import SqlCatalog

# Clean up previous test runs if they exist
if os.path.exists(BASE_PATH):
import shutil
Expand Down Expand Up @@ -47,15 +56,28 @@ def set_up_iceberg():
print(f"Iceberg table set up at {BASE_PATH}")
return BASE_PATH

set_up_iceberg()

catalog = load_catalog(
"default",
**{
"uri": f"sqlite:///{BASE_PATH}/pyiceberg_catalog.db",
"warehouse": f"file://{BASE_PATH}",
},
)
def test_iceberg_basic():
    """Smoke test: build a local Iceberg catalog, then load and scan a table."""
    from pyiceberg.catalog import load_catalog

    # create (or recreate) the local warehouse used by the test
    set_up_iceberg()

    catalog_config = {
        "uri": f"sqlite:///{BASE_PATH}/pyiceberg_catalog.db",
        "warehouse": f"file://{BASE_PATH}",
    }
    iceberg_catalog = load_catalog("default", **catalog_config)

    opteryx.register_store("iceberg", IcebergConnector, io=DiskConnector)

    tweets_table = iceberg_catalog.load_table("iceberg.tweets")
    print(tweets_table.scan().to_arrow())


if __name__ == "__main__": # pragma: no cover
from tests.tools import run_tests

table = catalog.load_table("iceberg.tweets")
print(table.scan().to_arrow())
run_tests()

0 comments on commit c9a8041

Please sign in to comment.