Merge pull request #2242 from mabel-dev/#2215
joocer authored Jan 14, 2025
2 parents 726408b + bf2e3a8 commit 638a125
Showing 33 changed files with 683 additions and 15 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -181,3 +181,4 @@ space_missions.parquet
 planets.parquet
 tmp/iceberg/**
 hits_split/*.parquet
+clickbench/**
6 changes: 3 additions & 3 deletions opteryx/__version__.py
@@ -1,4 +1,4 @@
-__build__ = 979
+__build__ = 982

 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -21,8 +21,8 @@ class VersionStatus(Enum):

 _major = 0
 _minor = 19
-_revision = 1
-_status = VersionStatus.RELEASE
+_revision = 2
+_status = VersionStatus.BETA

 __author__ = "@joocer"
 __version__ = f"{_major}.{_minor}.{_revision}" + (
10 changes: 9 additions & 1 deletion opteryx/connectors/capabilities/__init__.py
@@ -8,5 +8,13 @@
 from opteryx.connectors.capabilities.limit_pushable import LimitPushable
 from opteryx.connectors.capabilities.partitionable import Partitionable
 from opteryx.connectors.capabilities.predicate_pushable import PredicatePushable
+from opteryx.connectors.capabilities.statistics import Statistics

-__all__ = ("Asynchronous", "Cacheable", "LimitPushable", "Partitionable", "PredicatePushable")
+__all__ = (
+    "Asynchronous",
+    "Cacheable",
+    "LimitPushable",
+    "Partitionable",
+    "PredicatePushable",
+    "Statistics",
+)
31 changes: 31 additions & 0 deletions opteryx/connectors/capabilities/statistics.py
@@ -0,0 +1,31 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# See the License at http://www.apache.org/licenses/LICENSE-2.0
+# Distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND.
+
+
+from typing import Optional
+
+from orso.schema import RelationSchema
+
+from opteryx.models import RelationStatistics
+
+
+class Statistics:
+    def __init__(self, statistics: dict, **kwargs):
+        self.relation_statistics = RelationStatistics()
+
+    def map_statistics(
+        self, statistics: Optional[RelationStatistics], schema: RelationSchema
+    ) -> RelationSchema:
+        if statistics is None:
+            return schema
+
+        schema.record_count_metric = statistics.record_count
+
+        for column in schema.columns:
+            column.highest_value = statistics.upper_bounds.get(column.name, None)
+            column.lowest_value = statistics.lower_bounds.get(column.name, None)
+            column.null_count = statistics.null_count.get(column.name, None)
+
+        return schema
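The new capability gives a connector a `relation_statistics` slot and a `map_statistics` helper that copies record counts and per-column bounds onto a schema. A minimal usage sketch follows; the connector class and its schema-reading helper are hypothetical, and only `Statistics`, `map_statistics`, and `relation_statistics` come from this diff.

```python
from orso.schema import RelationSchema

from opteryx.connectors.base.base_connector import BaseConnector
from opteryx.connectors.capabilities import Statistics


class SketchConnector(BaseConnector, Statistics):  # hypothetical connector
    def __init__(self, *args, **kwargs):
        BaseConnector.__init__(self, **kwargs)
        Statistics.__init__(self, **kwargs)

    def get_dataset_schema(self) -> RelationSchema:
        schema = self._read_schema_from_source()  # hypothetical helper
        # copy the record count and per-column bounds onto the schema
        return self.map_statistics(self.relation_statistics, schema)
```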
10 changes: 7 additions & 3 deletions opteryx/connectors/disk_connector.py
@@ -190,19 +190,23 @@ def read_dataset(
                 blob_name=blob_name,
                 statistics=self.statistics,
                 decoder=decoder,
-                just_schema=just_schema,
+                just_schema=False,
                 projection=columns,
                 selection=predicates,
             )
             self.statistics.rows_seen += num_rows
             yield decoded
         else:
-            yield read_blob(
+            schema = read_blob(
                 blob_name=blob_name,
                 statistics=self.statistics,
                 decoder=decoder,
-                just_schema=just_schema,
+                just_schema=True,
             )
+            if schema.row_count_metric:
+                schema.row_count_metric *= len(blob_names)
+                self.statistics.estimated_row_count += schema.row_count_metric
+            yield schema

     except UnsupportedFileTypeError:
         pass  # Skip unsupported file types
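The schema branch now reads only the first readable blob, then scales that blob's row count by the total number of blobs to seed the planner's row estimate. With illustrative numbers:

```python
# Illustrative values only, showing the estimate the code above produces.
rows_in_first_blob = 50_000  # schema.row_count_metric from the sampled blob
number_of_blobs = 12         # len(blob_names)
estimated_row_count = rows_in_first_blob * number_of_blobs  # 600,000
```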
5 changes: 4 additions & 1 deletion opteryx/connectors/file_connector.py
@@ -18,12 +18,13 @@

 from opteryx.connectors.base.base_connector import BaseConnector
 from opteryx.connectors.capabilities import PredicatePushable
+from opteryx.connectors.capabilities import Statistics
 from opteryx.connectors.disk_connector import read_blob
 from opteryx.exceptions import DatasetNotFoundError
 from opteryx.utils.file_decoders import get_decoder


-class FileConnector(BaseConnector, PredicatePushable):
+class FileConnector(BaseConnector, PredicatePushable, Statistics):
     """
     Connector for reading datasets from a file.
     """
@@ -58,6 +59,7 @@ def interal_only(self):
     def __init__(self, *args, **kwargs):
         BaseConnector.__init__(self, **kwargs)
         PredicatePushable.__init__(self, **kwargs)
+        Statistics.__init__(self, **kwargs)
         if ".." in self.dataset or self.dataset[0] in ("\\", "/", "~"):
             # Don't find any datasets which look like path traversal
             raise DatasetNotFoundError(dataset=self.dataset)
@@ -90,4 +92,5 @@ def get_dataset_schema(self) -> RelationSchema:
         size = os.path.getsize(self.dataset)
         _map = mmap.mmap(file_descriptor, size, access=mmap.ACCESS_READ)
         self.schema = self.decoder(_map, just_schema=True)
+        self.relation_statistics = self.decoder(_map, just_statistics=True)
         return self.schema
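`get_dataset_schema` now runs the decoder twice over the same memory map: once with `just_schema=True` and once with `just_statistics=True`. A sketch of the flag convention this implies, using placeholder return values rather than the real opteryx decoder internals:

```python
def sketch_decoder(buffer, just_schema: bool = False, just_statistics: bool = False):
    """Placeholder decoder illustrating the flag convention; not the real code."""
    if just_schema:
        return "schema"      # stands in for an orso RelationSchema
    if just_statistics:
        return "statistics"  # stands in for a RelationStatistics
    return "table"           # stands in for the decoded table
```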
12 changes: 11 additions & 1 deletion opteryx/connectors/gcp_cloudstorage_connector.py
@@ -18,6 +18,7 @@
 from opteryx.connectors.capabilities import Cacheable
 from opteryx.connectors.capabilities import Partitionable
 from opteryx.connectors.capabilities import PredicatePushable
+from opteryx.connectors.capabilities import Statistics
 from opteryx.exceptions import DatasetNotFoundError
 from opteryx.exceptions import DatasetReadError
 from opteryx.exceptions import MissingDependencyError
@@ -51,7 +52,7 @@ def get_storage_credentials():


 class GcpCloudStorageConnector(
-    BaseConnector, Cacheable, Partitionable, PredicatePushable, Asynchronous
+    BaseConnector, Cacheable, Partitionable, PredicatePushable, Asynchronous, Statistics
 ):
     __mode__ = "Blob"
     __type__ = "GCS"
@@ -87,6 +88,7 @@ def __init__(self, credentials=None, **kwargs):
         Cacheable.__init__(self, **kwargs)
         PredicatePushable.__init__(self, **kwargs)
         Asynchronous.__init__(self, **kwargs)
+        Statistics.__init__(self, **kwargs)

         self.dataset = self.dataset.replace(".", OS_SEP)
         self.credentials = credentials
@@ -253,6 +255,8 @@ def read_dataset(
                     selection=predicates,
                     just_schema=just_schema,
                 )
+                if len(blob_names) == 1:
+                    self.relation_statistics = decoder(blob_bytes, just_statistics=True)
             except Exception as err:
                 raise DatasetReadError(f"Unable to read file {blob_name} ({err})") from err

@@ -268,10 +272,16 @@ def get_dataset_schema(self) -> RelationSchema:
         if self.schema:
             return self.schema

+        number_of_blobs = sum(len(b) for b in self.blob_list.values())
+
         # Read first blob for schema inference and cache it
         self.schema = next(self.read_dataset(just_schema=True), None)

         if self.schema is None:
             raise DatasetNotFoundError(dataset=self.dataset)

+        if self.schema.row_count_metric:
+            self.schema.row_count_metric *= number_of_blobs
+            self.statistics.estimated_row_count += self.schema.row_count_metric
+
         return self.schema
5 changes: 4 additions & 1 deletion opteryx/connectors/virtual_data.py
@@ -21,6 +21,7 @@
 from opteryx.connectors.base.base_connector import BaseConnector
 from opteryx.connectors.base.base_connector import DatasetReader
 from opteryx.connectors.capabilities import Partitionable
+from opteryx.connectors.capabilities import Statistics
 from opteryx.exceptions import DatasetNotFoundError
 from opteryx.utils import arrow

@@ -52,13 +53,14 @@ def suggest(dataset):
     )


-class SampleDataConnector(BaseConnector, Partitionable):
+class SampleDataConnector(BaseConnector, Partitionable, Statistics):
     __mode__ = "Internal"
     __type__ = "SAMPLE"

     def __init__(self, *args, **kwargs):
         BaseConnector.__init__(self, **kwargs)
         Partitionable.__init__(self, **kwargs)
+        Statistics.__init__(self, **kwargs)
         self.dataset = self.dataset.lower()
         self.variables = None

@@ -80,6 +82,7 @@ def get_dataset_schema(self) -> RelationSchema:
             suggestion = suggest(self.dataset)
             raise DatasetNotFoundError(suggestion=suggestion, dataset=self.dataset)
         data_provider, _ = WELL_KNOWN_DATASETS.get(self.dataset)
+        self.relation_statistics = data_provider.statistics()
         return data_provider.schema()

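The call sites above imply that each well-known dataset provider now exposes a `statistics()` method alongside `schema()`. A sketch of that contract; the class is illustrative, and the actual sample-data providers are not part of this diff:

```python
from orso.schema import RelationSchema

from opteryx.models import RelationStatistics


class SketchProvider:  # illustrative provider shape, not the real module
    def schema(self) -> RelationSchema: ...

    def statistics(self) -> RelationStatistics: ...
```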
6 changes: 6 additions & 0 deletions opteryx/managers/execution/__init__.py
@@ -1,5 +1,6 @@
 from opteryx.exceptions import InvalidInternalStateError

+from .parallel_engine import execute as parallel_execute
 from .serial_engine import execute as serial_execute


@@ -11,4 +12,9 @@ def execute(plan, statistics):
     # Label the join legs to ensure left/right ordering
     plan.label_join_legs()

+    """
+    If we have 1 CPU, or less than 1GB per CPU, we use the serial engine.
+    """
+
+    # yield from parallel_execute(plan, statistics=statistics)
     yield from serial_execute(plan, statistics=statistics)
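The docstring above records the intended dispatch rule, but this commit still routes every query through the serial engine. A hedged sketch of that rule as code; the `psutil` dependency and the exact threshold are assumptions, not part of this PR:

```python
import multiprocessing

import psutil  # assumed helper for the memory check; not added by this PR


def should_use_serial_engine(min_bytes_per_cpu: int = 1 * 1024**3) -> bool:
    """Serial when there is a single CPU or under 1GB of RAM per CPU."""
    cpus = multiprocessing.cpu_count()
    if cpus <= 1:
        return True
    return psutil.virtual_memory().total / cpus < min_bytes_per_cpu
```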
124 changes: 124 additions & 0 deletions opteryx/managers/execution/parallel_engine.py
@@ -0,0 +1,124 @@
+import multiprocessing as mp
+from queue import Empty
+from typing import Any
+from typing import Generator
+from typing import Tuple
+
+import pyarrow
+
+from opteryx import EOS
+from opteryx.constants import ResultType
+from opteryx.exceptions import InvalidInternalStateError
+from opteryx.models import PhysicalPlan
+from opteryx.models import QueryStatistics
+
+WORKERS = 4
+kill = object()
+
+
+def execute(
+    plan: PhysicalPlan, statistics: QueryStatistics = None, num_workers: int = WORKERS
+) -> Tuple[Generator[pyarrow.Table, Any, Any], ResultType]:
+    """
+    Execute the physical plan with morsel-level parallelism.
+
+    Parameters:
+        plan: PhysicalPlan
+            The physical plan to execute.
+        statistics: QueryStatistics, optional
+            Object to collect query statistics, defaults to None.
+        num_workers: int, optional
+            Number of parallel workers for processing morsels, defaults to 4.
+
+    Returns:
+        Tuple[Generator[pyarrow.Table, Any, Any], ResultType]
+            A generator producing pyarrow tables and the result type.
+    """
+    try:
+        mp.set_start_method("fork", force=True)
+
+        # Ensure there's a single head node
+        head_nodes = list(set(plan.get_exit_points()))
+        if len(head_nodes) != 1:
+            raise InvalidInternalStateError(
+                f"Query plan has {len(head_nodes)} heads, expected exactly 1."
+            )
+
+        head_node = plan[head_nodes[0]]
+
+        # Queue for incoming morsels and a queue for results
+        work_queue = mp.Queue()
+        result_queue = mp.Queue()
+
+        # Create a worker pool for processing morsels
+        pool = mp.Pool(num_workers, _worker_init, (plan, work_queue, result_queue))
+
+        def inner_execute(plan: PhysicalPlan) -> Generator:
+            # Get the pump nodes from the plan and execute them in order
+            pump_nodes = [
+                (nid, node) for nid, node in plan.depth_first_search_flat() if node.is_scan
+            ]
+            for pump_nid, pump_instance in pump_nodes:
+                work_queue.put((pump_nid, None, None))
+                work_queue.put((pump_nid, EOS, None))
+            while True:
+                try:
+                    result = result_queue.get(timeout=0.1)
+                    print("got final result", type(result))
+                    if result == EOS:
+                        continue
+                    return result
+                except Empty:
+                    pass
+
+        result_generator = inner_execute(plan)
+
+        print("I'm done here")
+
+        # pool.close()
+        # pool.join()
+
+        return result_generator, ResultType.TABULAR
+
+    finally:
+        # Close and join the pool after execution
+        pass
+
+
+def _worker_init(plan: PhysicalPlan, work_queue: mp.Queue, completion_queue: mp.Queue):
+    """
+    Initialize the worker process for morsel-level parallelism.
+
+    Parameters:
+        plan: PhysicalPlan
+            The overall physical plan.
+        morsel_queue: mp.Queue
+            Queue from which morsels are fetched.
+        result_queue: mp.Queue
+            Queue to which processed morsels are pushed.
+    """
+    while True:
+        try:
+            work = work_queue.get(timeout=0.1)
+        except Empty:
+            continue
+
+        nid, morsel, join_leg = work
+
+        operator = plan[nid]
+
+        results = operator(morsel, join_leg)
+
+        if results is None:
+            continue
+
+        print("Worker got work for", operator.name, type(morsel), "results")
+
+        for result in (result for result in results if result is not None):
+            children = plan.outgoing_edges(nid)
+            print("results", type(result), children)
+            if len(children) == 0:
+                print("done")
+                completion_queue.put(result)
+            for _, child, leg in children:
+                work_queue.put((child, result, leg))
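The engine's worker protocol is easiest to see in isolation: workers pull `(node_id, payload, join_leg)` tuples from a work queue, and a result with no downstream edge lands on the completion queue. A self-contained toy version of that loop (the graph here is a single stand-in scan node, not Opteryx's `PhysicalPlan`):

```python
import multiprocessing as mp
from queue import Empty


def toy_worker(work_queue: mp.Queue, completion_queue: mp.Queue) -> None:
    while True:
        try:
            nid, payload, _leg = work_queue.get(timeout=0.1)
        except Empty:
            continue
        # stand-in "scan" operator: a pump message (payload None) emits data
        result = f"morsel from {nid}" if payload is None else payload
        completion_queue.put(result)  # no downstream edges in this toy graph


if __name__ == "__main__":
    work, done = mp.Queue(), mp.Queue()
    worker = mp.Process(target=toy_worker, args=(work, done), daemon=True)
    worker.start()
    work.put(("scan-0", None, None))
    print(done.get(timeout=5))  # -> "morsel from scan-0"
```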
2 changes: 2 additions & 0 deletions opteryx/models/__init__.py
@@ -10,6 +10,7 @@
 from opteryx.models.physical_plan import PhysicalPlan
 from opteryx.models.query_properties import QueryProperties
 from opteryx.models.query_statistics import QueryStatistics
+from opteryx.models.relation_statistics import RelationStatistics

 __all__ = (
     "ConnectionContext",
@@ -19,4 +20,5 @@
     "PhysicalPlan",
     "QueryProperties",
     "QueryStatistics",
+    "RelationStatistics",
 )
1 change: 1 addition & 0 deletions opteryx/models/logical_column.py
@@ -69,6 +69,7 @@ def copy(self):
         return LogicalColumn(
             node_type=self.node_type,
             source_column=self.source_column,
+            source_connector=self.source_connector,
             source=self.source,
             alias=self.alias,
             schema_column=None
(Diff truncated: the remaining changed files are not shown.)
