Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(rust, python): vectorize integer vec-hash by using very simple, … #5572

Merged
merged 1 commit into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ bitflags.workspace = true
chrono = { version = "0.4", optional = true }
chrono-tz = { version = "0.6", optional = true }
comfy-table = { version = "6.1.1", optional = true }
fxhash = "0.2.1"
hashbrown.workspace = true
hex = { version = "0.4", optional = true }
indexmap = { version = "1", features = ["std"] }
Expand Down
187 changes: 119 additions & 68 deletions polars/polars-core/src/vector_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,93 +42,146 @@ pub(crate) fn get_null_hash_value(random_state: RandomState) -> u64 {
hasher.finish()
}

impl<T> VecHash for ChunkedArray<T>
/// FxHash-style multiplicative hash for an 8-bit integer.
///
/// Bit-casts the value to `u8`, widens to `u64`, and multiplies by the
/// (randomized) key `$k` with wrapping semantics.
///
/// NOTE: the previous version used `std::mem::transmute::<_, u8>`; for the
/// 8-bit integer types this macro is invoked with (`i8`/`u8`) a plain `as`
/// cast is the identical bit-for-bit reinterpretation, so no `unsafe` is
/// required. Call sites that wrap invocations in `unsafe` remain compatible
/// (they already carry `#[allow(unused_unsafe)]`).
macro_rules! fx_hash_8_bit {
    ($val: expr, $k: expr ) => {{
        // i8 -> u8 `as` cast keeps the raw bits (same as transmute).
        let val = $val as u8;
        (val as u64).wrapping_mul($k)
    }};
}
/// FxHash-style multiplicative hash for a 16-bit integer.
///
/// Bit-casts the value to `u16`, widens to `u64`, and multiplies by the
/// (randomized) key `$k` with wrapping semantics.
///
/// NOTE: the previous version used `std::mem::transmute::<_, u16>`; for the
/// 16-bit integer types this macro is invoked with (`i16`/`u16`) a plain
/// `as` cast is the identical bit-for-bit reinterpretation, so no `unsafe`
/// is required. Call sites that wrap invocations in `unsafe` remain
/// compatible (they already carry `#[allow(unused_unsafe)]`).
macro_rules! fx_hash_16_bit {
    ($val: expr, $k: expr ) => {{
        // i16 -> u16 `as` cast keeps the raw bits (same as transmute).
        let val = $val as u16;
        (val as u64).wrapping_mul($k)
    }};
}
/// FxHash-style multiplicative hash for a 32-bit integer.
///
/// Bit-casts the value to `u32`, widens to `u64`, and multiplies by the
/// (randomized) key `$k` with wrapping semantics.
///
/// NOTE: the previous version used `std::mem::transmute::<_, u32>`; for the
/// 32-bit integer types this macro is invoked with (`i32`/`u32`) a plain
/// `as` cast is the identical bit-for-bit reinterpretation, so no `unsafe`
/// is required. Call sites that wrap invocations in `unsafe` remain
/// compatible (they already carry `#[allow(unused_unsafe)]`).
macro_rules! fx_hash_32_bit {
    ($val: expr, $k: expr ) => {{
        // i32 -> u32 `as` cast keeps the raw bits (same as transmute).
        let val = $val as u32;
        (val as u64).wrapping_mul($k)
    }};
}
/// FxHash-style multiplicative hash for a 64-bit integer.
///
/// Reinterprets the value as `u64` (an `as` cast between same-width
/// integers is a bit-for-bit reinterpretation) and multiplies by the
/// (randomized) key `$k`; `wrapping_mul` makes overflow well-defined.
macro_rules! fx_hash_64_bit {
    ($val: expr, $k: expr ) => {{
        // wrapping_mul is commutative, so key-first is equivalent.
        $k.wrapping_mul($val as u64)
    }};
}

fn finish_vec_hash<T>(ca: &ChunkedArray<T>, random_state: RandomState, buf: &mut Vec<u64>)
where
T: PolarsIntegerType,
T::Native: Hash,
{
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
// Note that we don't use the no null branch! This can break in unexpected ways.
// for instance with threading we split an array in n_threads, this may lead to
// splits that have no nulls and splits that have nulls. Then one array is hashed with
// Option<T> and the other array with T.
// Meaning that they cannot be compared. By always hashing on Option<T> the random_state is
// the only deterministic seed.
buf.clear();
buf.reserve(self.len());
self.downcast_iter().for_each(|arr| {
buf.extend(
arr.values()
.as_slice()
.iter()
.map(|v| random_state.hash_single(v)),
);
});

let null_h = get_null_hash_value(random_state);
let hashes = buf.as_mut_slice();
let null_h = get_null_hash_value(random_state);
let hashes = buf.as_mut_slice();

let mut offset = 0;
ca.downcast_iter().for_each(|arr| {
if arr.null_count() > 0 {
let validity = arr.validity().unwrap();
let (slice, byte_offset, _) = validity.as_slice();
(0..validity.len())
.map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
.zip(&mut hashes[offset..])
.for_each(|(valid, h)| {
*h = [null_h, *h][valid as usize];
})
}
offset += arr.len();
});
}

let mut offset = 0;
self.downcast_iter().for_each(|arr| {
if arr.null_count() > 0 {
fn integer_vec_hash_combine<T>(ca: &ChunkedArray<T>, random_state: RandomState, hashes: &mut [u64])
where
T: PolarsIntegerType,
T::Native: Hash,
{
let null_h = get_null_hash_value(random_state.clone());

let mut offset = 0;
ca.downcast_iter().for_each(|arr| {
match arr.null_count() {
0 => arr
.values()
.as_slice()
.iter()
.zip(&mut hashes[offset..])
.for_each(|(v, h)| {
let l = random_state.hash_single(v);
*h = _boost_hash_combine(l, *h)
}),
_ => {
let validity = arr.validity().unwrap();
let (slice, byte_offset, _) = validity.as_slice();
(0..validity.len())
.map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
.zip(&mut hashes[offset..])
.for_each(|(valid, h)| {
*h = [null_h, *h][valid as usize];
})
.zip(arr.values().as_slice())
.for_each(|((valid, h), l)| {
*h = _boost_hash_combine(
[null_h, random_state.hash_single(l)][valid as usize],
*h,
)
});
}
offset += arr.len();
});
}
}
offset += arr.len();
});
}

fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
let null_h = get_null_hash_value(random_state.clone());
macro_rules! vec_hash_int {
($ca:ident, $fx_hash:ident) => {
impl VecHash for $ca {
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
// Note that we don't use the no null branch! This can break in unexpected ways.
// for instance with threading we split an array in n_threads, this may lead to
// splits that have no nulls and splits that have nulls. Then one array is hashed with
// Option<T> and the other array with T.
// Meaning that they cannot be compared. By always hashing on Option<T> the random_state is
// the only deterministic seed.
buf.clear();
buf.reserve(self.len());

let k: u64 = 0x517cc1b727220a95;
let k = random_state.hash_one(k);

#[allow(unused_unsafe)]
#[allow(clippy::useless_transmute)]
self.downcast_iter().for_each(|arr| {
buf.extend(
arr.values()
.as_slice()
.iter()
.copied()
.map(|v| unsafe { $fx_hash!(v, k) }),
);
});
finish_vec_hash(self, random_state, buf)
}

let mut offset = 0;
self.downcast_iter().for_each(|arr| {
match arr.null_count() {
0 => arr
.values()
.as_slice()
.iter()
.zip(&mut hashes[offset..])
.for_each(|(v, h)| {
let l = random_state.hash_single(v);
*h = _boost_hash_combine(l, *h)
}),
_ => {
let validity = arr.validity().unwrap();
let (slice, byte_offset, _) = validity.as_slice();
(0..validity.len())
.map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) })
.zip(&mut hashes[offset..])
.zip(arr.values().as_slice())
.for_each(|((valid, h), l)| {
*h = _boost_hash_combine(
[null_h, random_state.hash_single(l)][valid as usize],
*h,
)
});
}
fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
integer_vec_hash_combine(self, random_state, hashes)
}
offset += arr.len();
});
}
}
};
}

vec_hash_int!(Int64Chunked, fx_hash_64_bit);
vec_hash_int!(Int32Chunked, fx_hash_32_bit);
vec_hash_int!(Int16Chunked, fx_hash_16_bit);
vec_hash_int!(Int8Chunked, fx_hash_8_bit);
vec_hash_int!(UInt64Chunked, fx_hash_64_bit);
vec_hash_int!(UInt32Chunked, fx_hash_32_bit);
vec_hash_int!(UInt16Chunked, fx_hash_16_bit);
vec_hash_int!(UInt8Chunked, fx_hash_8_bit);

impl VecHash for Utf8Chunked {
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
buf.clear();
buf.reserve(self.len());
let null_h = get_null_hash_value(random_state.clone());
// for strings we use fxhash
let fxh = fxhash::FxBuildHasher::default();
self.downcast_iter().for_each(|arr| {
if arr.null_count() == 0 {
buf.extend(arr.values_iter().map(|v| fxh.hash_single(v)))
buf.extend(arr.values_iter().map(|v| random_state.hash_single(v)))
} else {
buf.extend(arr.into_iter().map(|opt_v| match opt_v {
Some(v) => random_state.hash_single(v),
Expand All @@ -139,13 +192,11 @@ impl VecHash for Utf8Chunked {
}

fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
let null_h = get_null_hash_value(random_state);
// for strings we use fxhash
let fxh = fxhash::FxBuildHasher::default();
let null_h = get_null_hash_value(random_state.clone());
self.apply_to_slice(
|opt_v, h| {
let l = match opt_v {
Some(v) => fxh.hash_single(v),
Some(v) => random_state.hash_single(v),
None => null_h,
};
_boost_hash_combine(l, *h)
Expand Down
1 change: 0 additions & 1 deletion polars/polars-lazy/polars-pipe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ description = "Lazy query engine for the Polars DataFrame library"

[dependencies]
enum_dispatch = "0.3"
fxhash = "0.2.1"
hashbrown.workspace = true
num.workspace = true
polars-arrow = { version = "0.25.1", path = "../../polars-arrow", default-features = false }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::any::Any;

use hashbrown::hash_map::RawEntryMut;
use num::NumCast;
use polars_core::export::ahash::RandomState;
use polars_core::frame::row::AnyValueBuffer;
use polars_core::prelude::*;
use polars_core::utils::{_set_partition_size, accumulate_dataframes_vertical_unchecked};
Expand Down Expand Up @@ -42,8 +43,7 @@ pub struct Utf8GroupbySink {
key_column: Arc<dyn PhysicalPipedExpr>,
// the columns that will be aggregated
aggregation_columns: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
hb: fxhash::FxBuildHasher,
// hb: RandomState,
hb: RandomState,
// Initializing Aggregation functions. If we aggregate by 2 columns
// this vec will have two functions. We will use these functions
// to populate the buffer where the hashmap points to
Expand All @@ -63,7 +63,7 @@ impl Utf8GroupbySink {
output_schema: SchemaRef,
slice: Option<(i64, usize)>,
) -> Self {
let hb = fxhash::FxBuildHasher::default();
let hb = Default::default();
let partitions = _set_partition_size();

let pre_agg = load_vec(partitions, || PlIdHashMap::with_capacity(HASHMAP_INIT_SIZE));
Expand Down
2 changes: 0 additions & 2 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions py-polars/polars/internals/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -6496,10 +6496,10 @@ def hash_rows(
shape: (4,)
Series: '' [u64]
[
1160655983065896620
8421503603771360652
4702262519505526977
5983473495725024293
12239174968153954787
17976148875586754089
10047419486152048166
13766281409932363907
]

"""
Expand Down
4 changes: 2 additions & 2 deletions py-polars/polars/internals/expr/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3630,9 +3630,9 @@ def hash(
│ --- ┆ --- │
│ u64 ┆ u64 │
╞══════════════════════╪══════════════════════╡
4629889412789719550 ┆ 6959506404929392568 │
9774092659964970114 ┆ 6959506404929392568 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
16386608652769605760 ┆ 11638928888656214026 │
1101441246220388612 ┆ 11638928888656214026 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 11638928888656214026 ┆ 11040941213715918520 │
└──────────────────────┴──────────────────────┘
Expand Down
6 changes: 3 additions & 3 deletions py-polars/polars/internals/series/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -4143,9 +4143,9 @@ def hash(
shape: (3,)
Series: 'a' [u64]
[
2374023516666777365
10386026231460783898
17796317186427479491
10734580197236529959
3022416320763508302
13756996518000038261
]

"""
Expand Down
12 changes: 2 additions & 10 deletions py-polars/tests/unit/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -1394,17 +1394,9 @@ def test_reproducible_hash_with_seeds() -> None:
# in the meantime, account for arm64 (mac) hash values to reduce noise
expected = pl.Series(
"s",
[
7179856081800753525,
15496313222292466864,
4963241831945886452,
]
[8823051245921001677, 988796329533502010, 7528667241828618484]
if platform.mac_ver()[-1] == "arm64"
else [
8823051245921001677,
988796329533502010,
7528667241828618484,
],
else [6629530352159708028, 988796329533502010, 6048298245521876612],
dtype=pl.UInt64,
)
result = df.hash_rows(*seeds)
Expand Down