Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2266 #2274

Merged
merged 4 commits into from
Jan 19, 2025
Merged

#2266 #2274

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 996
__build__ = 998

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
6 changes: 6 additions & 0 deletions opteryx/compiled/functions/murmurhash3_32.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# cython: language_level=3

from libc.stdint cimport uint32_t


# C-level MurmurHash3 (32-bit): hashes `len` bytes at `key` with `seed`.
# Declared `nogil` so callers can hash without holding the GIL.
cdef uint32_t cy_murmurhash3(const void *key, uint32_t len, uint32_t seed) nogil
81 changes: 81 additions & 0 deletions opteryx/compiled/functions/murmurhash3_32.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: nonecheck=False
# cython: overflowcheck=False
# cython: cdivision=True
# distutils: language=c++

from libc.stdint cimport uint32_t
from cpython cimport PyUnicode_AsUTF8String

# MurmurHash3 implementation
cdef inline uint32_t cy_murmurhash3(const void *key, uint32_t len, uint32_t seed) nogil:
cdef uint32_t c1 = 0xcc9e2d51
cdef uint32_t c2 = 0x1b873593
cdef uint32_t r1 = 15
cdef uint32_t r2 = 13
cdef uint32_t m = 5
cdef uint32_t n = 0xe6546b64

cdef const unsigned char *data = <const unsigned char *>key
cdef uint32_t nblocks = len >> 2
cdef uint32_t h1 = seed
cdef uint32_t k1 = 0

# body
cdef const uint32_t *blocks = <const uint32_t *>(data)
for i in range(nblocks):
k1 = blocks[i]

k1 *= c1
k1 = (k1 << r1) | (k1 >> (32 - r1))
k1 *= c2

h1 ^= k1
h1 = (h1 << r2) | (h1 >> (32 - r2))
h1 = h1 * m + n

# tail
cdef const unsigned char *tail = data + (nblocks << 2)
k1 = 0

if len & 3 == 3:
k1 ^= tail[2] << 16
if len & 3 >= 2:
k1 ^= tail[1] << 8
if len & 3 >= 1:
k1 ^= tail[0]
k1 *= c1
k1 = (k1 << r1) | (k1 >> (32 - r1))
k1 *= c2
h1 ^= k1

# finalization
h1 ^= len
h1 ^= (h1 >> 16)
h1 *= <uint32_t>0x85ebca6b
h1 ^= (h1 >> 13)
h1 *= <uint32_t>0xc2b2ae35
h1 ^= (h1 >> 16)
return h1


# Python wrapper
cpdef uint32_t murmurhash3(str key, uint32_t seed):
"""
Hashes a string using MurmurHash3 32-bit.

Parameters:
key: str
The input string to hash.
seed: uint32_t
The seed value for the hash function.
Default 703115 is the duration of Apollo 11 in seconds.

Returns:
uint32_t: The resulting hash value.
"""
cdef bytes key_bytes = PyUnicode_AsUTF8String(key)
cdef const char *key_ptr = key_bytes
return cy_murmurhash3(<const void *>key_ptr, len(key_bytes), seed)
25 changes: 25 additions & 0 deletions opteryx/compiled/structures/bloom_filter.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# cython: language_level=3
# cython: nonecheck=False
# cython: cdivision=True
# cython: initializedcheck=False
# cython: infer_types=True
# cython: wraparound=False
# cython: boundscheck=False

from libc.stdint cimport uint32_t
cimport numpy as cnp

# Declaration of the BloomFilter class
# Declaration of the BloomFilter class
cdef class BloomFilter:
    # malloc'd backing store for the filter bits; freed in __dealloc__
    cdef unsigned char* bit_array
    # number of addressable bits (a power of two, so `& (size - 1)` replaces modulo)
    cdef uint32_t bit_array_size
    # allocated length of bit_array in bytes (bit_array_size >> 3)
    cdef uint32_t byte_array_size

    cpdef void add(self, bytes member)
    cpdef bint possibly_contains(self, bytes member)
    cdef inline bint _possibly_contains(self, bytes member)
    cpdef cnp.ndarray[cnp.npy_bool, ndim=1] possibly_contains_many(self, cnp.ndarray keys)
    cpdef memoryview serialize(self)

# Module-level helpers: rebuild a filter from serialized bytes, and build one
# from a collection of keys.
cpdef BloomFilter deserialize(const unsigned char* data)
cpdef BloomFilter create_bloom_filter(keys)
126 changes: 126 additions & 0 deletions opteryx/compiled/structures/bloom_filter.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# cython: language_level=3
# cython: nonecheck=False
# cython: cdivision=True
# cython: initializedcheck=False
# cython: infer_types=True
# cython: wraparound=False
# cython: boundscheck=False

"""
This is not a general purpose Bloom Filter; if used outside of Draken, it may not
perform entirely as expected as it is optimized for a specific configuration.

We have two size options, both using 2 hashes:
- A 512k slot bit array for up to 50k items (about 3% FPR)
- an 8M slot bit array for up to 1M items (about 2% FPR)

We perform one hash and then use a calculation based on the golden ratio to
determine the second position.
"""

from libc.stdlib cimport malloc, free
from libc.string cimport memset, memcpy
from libc.stdint cimport uint64_t

from opteryx.compiled.functions.murmurhash3_32 cimport cy_murmurhash3

import numpy
cimport numpy as cnp
import pyarrow

# Define sizes for the two Bloom filters
cdef uint32_t BYTE_ARRAY_SIZE_SMALL = 64 * 1024 # 64 KB for ~50K records
cdef uint32_t BYTE_ARRAY_SIZE_LARGE = 1024 * 1024 # 1 MB for ~1M records

cdef uint32_t BIT_ARRAY_SIZE_SMALL = BYTE_ARRAY_SIZE_SMALL << 3 # 512 Kbits
cdef uint32_t BIT_ARRAY_SIZE_LARGE = BYTE_ARRAY_SIZE_LARGE << 3 # 8 Mbits


cdef inline void set_bit(unsigned char* bit_array, uint32_t bit):
    """Set bit number `bit` (zero-based) in `bit_array`."""
    # bit >> 3 selects the byte, bit & 7 selects the bit within that byte.
    bit_array[bit >> 3] |= 1 << (bit & 7)

cdef class BloomFilter:
    """
    Two-hash Bloom filter over byte strings.

    One MurmurHash3 call produces the first position; the second position is
    derived from the same hash via a multiply by 2654435769 (the 32-bit
    golden-ratio constant), avoiding a second hash pass. Both positions are
    masked into the bit array, which works because both array sizes are
    powers of two.
    """
    # cdef unsigned char* bit_array  # defined in the .pxd file only
    # cdef uint32_t bit_array_size
    # cdef uint32_t byte_array_size

    def __cinit__(self, uint32_t expected_records=50000):
        """Initialize Bloom Filter based on expected number of records."""
        # Pick one of the two fixed configurations; anything beyond 1M
        # records is rejected rather than silently under-sized.
        if expected_records <= 50000:
            self.byte_array_size = BYTE_ARRAY_SIZE_SMALL
            self.bit_array_size = BIT_ARRAY_SIZE_SMALL
        elif expected_records <= 1000000:
            self.byte_array_size = BYTE_ARRAY_SIZE_LARGE
            self.bit_array_size = BIT_ARRAY_SIZE_LARGE
        else:
            raise ValueError("Too many records for this Bloom filter implementation")

        # Allocate memory
        self.bit_array = <unsigned char*>malloc(self.byte_array_size)
        if not self.bit_array:
            raise MemoryError("Failed to allocate memory for the Bloom filter.")
        memset(self.bit_array, 0, self.byte_array_size)

    def __dealloc__(self):
        # Guarded free: bit_array is NULL if __cinit__ raised before malloc.
        if self.bit_array:
            free(self.bit_array)

    cpdef void add(self, bytes member):
        """Add `member` to the set by setting its two derived bits."""
        cdef uint32_t item, h1, h2

        item = cy_murmurhash3(<char*>member, len(member), 0)
        h1 = item & (self.bit_array_size - 1)
        # Apply the golden ratio to the item and use modulo to wrap within the size of the bit array
        h2 = (item * 2654435769U) & (self.bit_array_size - 1)
        # Set bits
        set_bit(self.bit_array, h1)
        set_bit(self.bit_array, h2)

    cdef inline bint _possibly_contains(self, bytes member):
        """Check if the item might be in the set"""
        # False means definitely absent; True may be a false positive.
        # Must mirror `add` exactly (same hash, same two positions).
        cdef uint32_t item, h1, h2

        item = cy_murmurhash3(<char*>member, len(member), 0)
        h1 = item & (self.bit_array_size - 1)
        # Apply the golden ratio to the item and mask within the size of the bit array
        h2 = (item * 2654435769U) & (self.bit_array_size - 1)
        # Check bits using bitwise AND
        return ((self.bit_array[h1 >> 3] & (1 << (h1 & 7))) != 0) and \
               ((self.bit_array[h2 >> 3] & (1 << (h2 & 7))) != 0)

    cpdef bint possibly_contains(self, bytes member):
        # Python-visible wrapper around the inlined C-level check.
        return self._possibly_contains(member)

    cpdef cnp.ndarray[cnp.npy_bool, ndim=1] possibly_contains_many(self, cnp.ndarray keys):
        """
        Vectorized membership test: returns a boolean array the same length
        as `keys`. None entries are reported as not present (False).
        """
        cdef Py_ssize_t i
        cdef Py_ssize_t n = len(keys)
        cdef cnp.ndarray[cnp.npy_bool, ndim=1] result = numpy.zeros(n, dtype=bool)

        for i in range(n):
            key = keys[i]
            if key is not None and self._possibly_contains(key):
                result[i] = 1
        return result

    cpdef memoryview serialize(self):
        """Serialize the Bloom filter to a memory view"""
        # Slicing the C pointer materializes a Python bytes copy, so the
        # returned view does not alias the live filter's memory.
        return memoryview(self.bit_array[:self.byte_array_size])

cpdef BloomFilter deserialize(const unsigned char* data, uint32_t expected_records=50000):
    """
    Deserialize a memory view to a Bloom filter.

    Parameters:
        data: serialized bit array produced by `BloomFilter.serialize`;
            must hold at least `byte_array_size` bytes for the chosen size.
        expected_records: sizing hint — must match the value the serialized
            filter was built with. Defaults to the small (<=50K records)
            configuration, preserving the previous behavior.

    Returns:
        BloomFilter: a filter whose bit array is a copy of `data`.
    """
    # FIX: the previous version always reconstructed the small (64 KB)
    # filter, silently truncating any serialized large (1 MB) filter. The
    # caller can now pass the original `expected_records` to round-trip
    # large filters correctly; single-argument calls behave as before.
    cdef BloomFilter bf = BloomFilter(expected_records)
    memcpy(bf.bit_array, data, bf.byte_array_size)
    return bf


cpdef BloomFilter create_bloom_filter(keys):
    """
    Build a BloomFilter from a pyarrow column of keys.

    The filter is sized from the raw key count (nulls included, which only
    over-sizes it); nulls are then dropped and every remaining key is cast
    to binary before insertion.
    """
    cdef BloomFilter bloom = BloomFilter(len(keys))

    binary_keys = keys.drop_null().cast(pyarrow.binary()).to_numpy(False)
    for value in binary_keys:
        bloom.add(value)

    return bloom
9 changes: 9 additions & 0 deletions opteryx/functions/string_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,16 @@ def match_against(arr, val):


def regex_replace(array, _pattern, _replacement):
    """
    Replace regex matches in every element of a string array.

    Uses Python's `re` module rather than pyarrow's RE2-backed
    `replace_substring_regex`, so patterns needing features RE2 lacks
    (e.g. backreferences) are supported.

    Parameters:
        array: iterable of str
            The input strings to transform.
        _pattern: sequence
            Single-element sequence holding the regex pattern.
        _replacement: sequence
            Single-element sequence holding the replacement string.

    Returns:
        pyarrow.Array: a string array with the replacements applied.
    """
    # FIX: removed a leftover early `return compute.replace_substring_regex(...)`
    # that made the re-based implementation below unreachable.
    import re

    import pyarrow

    pattern = _pattern[0]
    replacement = _replacement[0]
    # Compile once; the vectorized callable reuses it for every element.
    compiled_pattern = re.compile(pattern)

    # Apply the regex replacement to each element in the array
    vectorized_replace = numpy.vectorize(lambda x: compiled_pattern.sub(replacement, x))
    return pyarrow.array(vectorized_replace(array), type=pyarrow.string())
39 changes: 39 additions & 0 deletions opteryx/operators/inner_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
from threading import Lock

import pyarrow
from orso.types import OrsoTypes
from pyarrow import Table

from opteryx import EOS
from opteryx.compiled.joins.inner_join import abs_hash_join_map
from opteryx.compiled.structures import hash_join_map
from opteryx.compiled.structures.bloom_filter import create_bloom_filter
from opteryx.compiled.structures.buffers import IntBuffer
from opteryx.models import QueryProperties
from opteryx.utils.arrow import align_tables
Expand Down Expand Up @@ -85,6 +87,7 @@ def __init__(self, properties: QueryProperties, **parameters):
self.left_buffer = []
self.right_buffer = []
self.left_hash = None
self.left_filter = None

self.lock = Lock()

Expand All @@ -109,6 +112,21 @@ def execute(self, morsel: Table, join_leg: str) -> Table:
start = time.monotonic_ns()
self.left_hash = abs_hash_join_map(self.left_relation, self.left_columns)
self.statistics.time_build_hash_map += time.monotonic_ns() - start

left_join_column = [c for c in self.columns if c.schema_column.identity in self.left_columns][0]

if (
self.left_relation.num_rows < 1e6
and len(self.left_columns) == 1
and left_join_column.schema_column.type
in (OrsoTypes.BLOB, OrsoTypes.VARCHAR)
):
start = time.monotonic_ns()
self.left_filter = create_bloom_filter(
self.left_relation.column(self.left_columns[0])
)
self.statistics.time_build_bloom_filter += time.monotonic_ns() - start
self.statistics.feature_bloom_filter += 1
else:
self.left_buffer.append(morsel)
yield None
Expand All @@ -119,6 +137,27 @@ def execute(self, morsel: Table, join_leg: str) -> Table:
yield EOS
return

if self.left_filter is not None:
# Filter the morsel using the bloom filter, it's a quick way to
# reduce the number of rows that need to be joined.
start = time.monotonic_ns()

maybe_in_left = self.left_filter.possibly_contains_many(
morsel.column(self.right_columns[0]).cast(pyarrow.binary()).to_numpy(False)
)

self.statistics.time_bloom_filtering += time.monotonic_ns() - start
morsel = morsel.filter(maybe_in_left)

# If the bloom filter is not effective, disable it.
# In basic benchmarks, the bloom filter is ~15x the speed of the join.
# so the break-even point is about 7% of the rows being eliminated.
eliminated_rows = len(maybe_in_left) - morsel.num_rows
if eliminated_rows < 0.1 * morsel.num_rows:
self.left_filter = None
self.statistics.feature_dynamically_disabled_bloom_filter += 1

self.statistics.rows_eliminated_by_bloom_filter += eliminated_rows
# do the join
new_morsel = inner_join_with_preprocessed_left_side(
left_relation=self.left_relation,
Expand Down
11 changes: 11 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ def rust_build(setup_kwargs: Dict[str, Any]) -> None:
include_dirs=include_dirs,
extra_compile_args=COMPILE_FLAGS,
),
Extension(
name="opteryx.compiled.functions.murmurhash3_32",
sources=["opteryx/compiled/functions/murmurhash3_32.pyx"],
extra_compile_args=COMPILE_FLAGS,
),
Extension(
name="opteryx.compiled.functions.levenstein",
sources=["opteryx/compiled/functions/levenshtein.pyx"],
Expand Down Expand Up @@ -151,6 +156,12 @@ def rust_build(setup_kwargs: Dict[str, Any]) -> None:
language="c++",
extra_compile_args=COMPILE_FLAGS + ["-std=c++17"],
),
Extension(
name="opteryx.compiled.structures.bloom_filter",
sources=["opteryx/compiled/structures/bloom_filter.pyx"],
include_dirs=include_dirs,
extra_compile_args=COMPILE_FLAGS,
),
Extension(
name="opteryx.compiled.structures.buffers",
sources=["opteryx/compiled/structures/buffers.pyx"],
Expand Down
Loading