From ee79a1749a3c7d092a71ee4c4496045ce2119c66 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 21 Nov 2022 13:18:57 +0100 Subject: [PATCH] perf(rust, python): vectorize integer vec-hash by using very simple, but very fast fxhash --- polars/polars-core/Cargo.toml | 1 - polars/polars-core/src/vector_hasher.rs | 187 +++++++++++------- polars/polars-lazy/polars-pipe/Cargo.toml | 1 - .../src/executors/sinks/groupby/string.rs | 6 +- py-polars/Cargo.lock | 2 - py-polars/polars/internals/dataframe/frame.py | 8 +- py-polars/polars/internals/expr/expr.py | 4 +- py-polars/polars/internals/series/series.py | 6 +- py-polars/tests/unit/test_df.py | 12 +- 9 files changed, 133 insertions(+), 94 deletions(-) diff --git a/polars/polars-core/Cargo.toml b/polars/polars-core/Cargo.toml index 838091c84183..b9aaa5665ecb 100644 --- a/polars/polars-core/Cargo.toml +++ b/polars/polars-core/Cargo.toml @@ -154,7 +154,6 @@ bitflags.workspace = true chrono = { version = "0.4", optional = true } chrono-tz = { version = "0.6", optional = true } comfy-table = { version = "6.1.1", optional = true } -fxhash = "0.2.1" hashbrown.workspace = true hex = { version = "0.4", optional = true } indexmap = { version = "1", features = ["std"] } diff --git a/polars/polars-core/src/vector_hasher.rs b/polars/polars-core/src/vector_hasher.rs index 01b64ff0049e..2cb87c4a8f61 100644 --- a/polars/polars-core/src/vector_hasher.rs +++ b/polars/polars-core/src/vector_hasher.rs @@ -42,93 +42,146 @@ pub(crate) fn get_null_hash_value(random_state: RandomState) -> u64 { hasher.finish() } -impl VecHash for ChunkedArray +macro_rules! fx_hash_8_bit { + ($val: expr, $k: expr ) => {{ + let val = std::mem::transmute::<_, u8>($val); + (val as u64).wrapping_mul($k) + }}; +} +macro_rules! fx_hash_16_bit { + ($val: expr, $k: expr ) => {{ + let val = std::mem::transmute::<_, u16>($val); + (val as u64).wrapping_mul($k) + }}; +} +macro_rules! fx_hash_32_bit { + ($val: expr, $k: expr ) => {{ + let val = std::mem::transmute::<_, u32>($val); + (val as u64).wrapping_mul($k) + }}; +} +macro_rules! fx_hash_64_bit { + ($val: expr, $k: expr ) => {{ + ($val as u64).wrapping_mul($k) + }}; +} + +fn finish_vec_hash(ca: &ChunkedArray, random_state: RandomState, buf: &mut Vec) where T: PolarsIntegerType, T::Native: Hash, { - fn vec_hash(&self, random_state: RandomState, buf: &mut Vec) { - // Note that we don't use the no null branch! This can break in unexpected ways. - // for instance with threading we split an array in n_threads, this may lead to - // splits that have no nulls and splits that have nulls. Then one array is hashed with - // Option and the other array with T. - // Meaning that they cannot be compared. By always hashing on Option the random_state is - // the only deterministic seed. - buf.clear(); - buf.reserve(self.len()); - self.downcast_iter().for_each(|arr| { - buf.extend( - arr.values() - .as_slice() - .iter() - .map(|v| random_state.hash_single(v)), - ); - }); - - let null_h = get_null_hash_value(random_state); - let hashes = buf.as_mut_slice(); + let null_h = get_null_hash_value(random_state); + let hashes = buf.as_mut_slice(); + + let mut offset = 0; + ca.downcast_iter().for_each(|arr| { + if arr.null_count() > 0 { + let validity = arr.validity().unwrap(); + let (slice, byte_offset, _) = validity.as_slice(); + (0..validity.len()) + .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) }) + .zip(&mut hashes[offset..]) + .for_each(|(valid, h)| { + *h = [null_h, *h][valid as usize]; + }) + } + offset += arr.len(); + }); +} - let mut offset = 0; - self.downcast_iter().for_each(|arr| { - if arr.null_count() > 0 { +fn integer_vec_hash_combine(ca: &ChunkedArray, random_state: RandomState, hashes: &mut [u64]) +where + T: PolarsIntegerType, + T::Native: Hash, +{ + let null_h = get_null_hash_value(random_state.clone()); + + let mut offset = 0; + ca.downcast_iter().for_each(|arr| { + match arr.null_count() { + 0 => arr + .values() + .as_slice() + .iter() + .zip(&mut hashes[offset..]) + .for_each(|(v, h)| { + let l = random_state.hash_single(v); + *h = _boost_hash_combine(l, *h) + }), + _ => { let validity = arr.validity().unwrap(); let (slice, byte_offset, _) = validity.as_slice(); (0..validity.len()) .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) }) .zip(&mut hashes[offset..]) - .for_each(|(valid, h)| { - *h = [null_h, *h][valid as usize]; - }) + .zip(arr.values().as_slice()) + .for_each(|((valid, h), l)| { + *h = _boost_hash_combine( + [null_h, random_state.hash_single(l)][valid as usize], + *h, + ) + }); } - offset += arr.len(); - }); - } + } + offset += arr.len(); + }); +} - fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) { - let null_h = get_null_hash_value(random_state.clone()); +macro_rules! vec_hash_int { + ($ca:ident, $fx_hash:ident) => { + impl VecHash for $ca { + fn vec_hash(&self, random_state: RandomState, buf: &mut Vec) { + // Note that we don't use the no null branch! This can break in unexpected ways. + // for instance with threading we split an array in n_threads, this may lead to + // splits that have no nulls and splits that have nulls. Then one array is hashed with + // Option and the other array with T. + // Meaning that they cannot be compared. By always hashing on Option the random_state is + // the only deterministic seed. + buf.clear(); + buf.reserve(self.len()); + + let k: u64 = 0x517cc1b727220a95; + let k = random_state.hash_one(k); + + #[allow(unused_unsafe)] + #[allow(clippy::useless_transmute)] + self.downcast_iter().for_each(|arr| { + buf.extend( + arr.values() + .as_slice() + .iter() + .copied() + .map(|v| unsafe { $fx_hash!(v, k) }), + ); + }); + finish_vec_hash(self, random_state, buf) + } - let mut offset = 0; - self.downcast_iter().for_each(|arr| { - match arr.null_count() { - 0 => arr - .values() - .as_slice() - .iter() - .zip(&mut hashes[offset..]) - .for_each(|(v, h)| { - let l = random_state.hash_single(v); - *h = _boost_hash_combine(l, *h) - }), - _ => { - let validity = arr.validity().unwrap(); - let (slice, byte_offset, _) = validity.as_slice(); - (0..validity.len()) - .map(|i| unsafe { get_bit_unchecked(slice, i + byte_offset) }) - .zip(&mut hashes[offset..]) - .zip(arr.values().as_slice()) - .for_each(|((valid, h), l)| { - *h = _boost_hash_combine( - [null_h, random_state.hash_single(l)][valid as usize], - *h, - ) - }); - } + fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) { + integer_vec_hash_combine(self, random_state, hashes) } - offset += arr.len(); - }); - } + } + }; } +vec_hash_int!(Int64Chunked, fx_hash_64_bit); +vec_hash_int!(Int32Chunked, fx_hash_32_bit); +vec_hash_int!(Int16Chunked, fx_hash_16_bit); +vec_hash_int!(Int8Chunked, fx_hash_8_bit); +vec_hash_int!(UInt64Chunked, fx_hash_64_bit); +vec_hash_int!(UInt32Chunked, fx_hash_32_bit); +vec_hash_int!(UInt16Chunked, fx_hash_16_bit); +vec_hash_int!(UInt8Chunked, fx_hash_8_bit); + impl VecHash for Utf8Chunked { fn vec_hash(&self, random_state: RandomState, buf: &mut Vec) { buf.clear(); buf.reserve(self.len()); let null_h = get_null_hash_value(random_state.clone()); - // for strings we use fxhash - let fxh = fxhash::FxBuildHasher::default(); self.downcast_iter().for_each(|arr| { if arr.null_count() == 0 { - buf.extend(arr.values_iter().map(|v| fxh.hash_single(v))) + buf.extend(arr.values_iter().map(|v| random_state.hash_single(v))) } else { buf.extend(arr.into_iter().map(|opt_v| match opt_v { Some(v) => random_state.hash_single(v), @@ -139,13 +192,11 @@ impl VecHash for Utf8Chunked { } fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) { - let null_h = get_null_hash_value(random_state); - // for strings we use fxhash - let fxh = fxhash::FxBuildHasher::default(); + let null_h = get_null_hash_value(random_state.clone()); self.apply_to_slice( |opt_v, h| { let l = match opt_v { - Some(v) => fxh.hash_single(v), + Some(v) => random_state.hash_single(v), None => null_h, }; _boost_hash_combine(l, *h) diff --git a/polars/polars-lazy/polars-pipe/Cargo.toml b/polars/polars-lazy/polars-pipe/Cargo.toml index 83fbe51c47a3..1a12def55995 100644 --- a/polars/polars-lazy/polars-pipe/Cargo.toml +++ b/polars/polars-lazy/polars-pipe/Cargo.toml @@ -10,7 +10,6 @@ description = "Lazy query engine for the Polars DataFrame library" [dependencies] enum_dispatch = "0.3" -fxhash = "0.2.1" hashbrown.workspace = true num.workspace = true polars-arrow = { version = "0.25.1", path = "../../polars-arrow", default-features = false } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs index 7be78c81b0b7..f318c001de2f 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs @@ -2,6 +2,7 @@ use std::any::Any; use hashbrown::hash_map::RawEntryMut; use num::NumCast; +use polars_core::export::ahash::RandomState; use polars_core::frame::row::AnyValueBuffer; use polars_core::prelude::*; use polars_core::utils::{_set_partition_size, accumulate_dataframes_vertical_unchecked}; @@ -42,8 +43,7 @@ pub struct Utf8GroupbySink { key_column: Arc, // the columns that will be aggregated aggregation_columns: Arc>>, - hb: fxhash::FxBuildHasher, - // hb: RandomState, + hb: RandomState, // Initializing Aggregation functions. If we aggregate by 2 columns // this vec will have two functions. We will use these functions // to populate the buffer where the hashmap points to @@ -63,7 +63,7 @@ impl Utf8GroupbySink { output_schema: SchemaRef, slice: Option<(i64, usize)>, ) -> Self { - let hb = fxhash::FxBuildHasher::default(); + let hb = Default::default(); let partitions = _set_partition_size(); let pre_agg = load_vec(partitions, || PlIdHashMap::with_capacity(HASHMAP_INIT_SIZE)); diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock index 7d482b74dd21..4806e7f3cc15 100644 --- a/py-polars/Cargo.lock +++ b/py-polars/Cargo.lock @@ -1497,7 +1497,6 @@ dependencies = [ "chrono", "chrono-tz", "comfy-table", - "fxhash", "hashbrown 0.13.1", "hex", "indexmap", @@ -1580,7 +1579,6 @@ name = "polars-pipe" version = "0.25.1" dependencies = [ "enum_dispatch", - "fxhash", "hashbrown 0.13.1", "num", "polars-arrow", diff --git a/py-polars/polars/internals/dataframe/frame.py b/py-polars/polars/internals/dataframe/frame.py index 1d22ac9fe8b8..eb548f4556d7 100644 --- a/py-polars/polars/internals/dataframe/frame.py +++ b/py-polars/polars/internals/dataframe/frame.py @@ -6496,10 +6496,10 @@ def hash_rows( shape: (4,) Series: '' [u64] [ - 1160655983065896620 - 8421503603771360652 - 4702262519505526977 - 5983473495725024293 + 12239174968153954787 + 17976148875586754089 + 10047419486152048166 + 13766281409932363907 ] """ diff --git a/py-polars/polars/internals/expr/expr.py b/py-polars/polars/internals/expr/expr.py index 7fcfff75c20f..3a273775385e 100644 --- a/py-polars/polars/internals/expr/expr.py +++ b/py-polars/polars/internals/expr/expr.py @@ -3630,9 +3630,9 @@ def hash( │ --- ┆ --- │ │ u64 ┆ u64 │ ╞══════════════════════╪══════════════════════╡ - │ 4629889412789719550 ┆ 6959506404929392568 │ + │ 9774092659964970114 ┆ 6959506404929392568 │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ - │ 16386608652769605760 ┆ 11638928888656214026 │ + │ 1101441246220388612 ┆ 11638928888656214026 │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ 11638928888656214026 ┆ 11040941213715918520 │ └──────────────────────┴──────────────────────┘ diff --git a/py-polars/polars/internals/series/series.py b/py-polars/polars/internals/series/series.py index 29d3ef16c449..38be2fbccc1c 100644 --- a/py-polars/polars/internals/series/series.py +++ b/py-polars/polars/internals/series/series.py @@ -4143,9 +4143,9 @@ def hash( shape: (3,) Series: 'a' [u64] [ - 2374023516666777365 - 10386026231460783898 - 17796317186427479491 + 10734580197236529959 + 3022416320763508302 + 13756996518000038261 ] """ diff --git a/py-polars/tests/unit/test_df.py b/py-polars/tests/unit/test_df.py index 55cca2c6e93d..00d421ad5ee9 100644 --- a/py-polars/tests/unit/test_df.py +++ b/py-polars/tests/unit/test_df.py @@ -1394,17 +1394,9 @@ def test_reproducible_hash_with_seeds() -> None: # in the meantime, account for arm64 (mac) hash values to reduce noise expected = pl.Series( "s", - [ - 7179856081800753525, - 15496313222292466864, - 4963241831945886452, - ] + [8823051245921001677, 988796329533502010, 7528667241828618484] if platform.mac_ver()[-1] == "arm64" - else [ - 8823051245921001677, - 988796329533502010, - 7528667241828618484, - ], + else [6629530352159708028, 988796329533502010, 6048298245521876612], dtype=pl.UInt64, ) result = df.hash_rows(*seeds)