Skip to content

Commit

Permalink
Merge pull request #5694 from b41sh/variant-group-by
Browse files Browse the repository at this point in the history
feat(function): Support variant group by
  • Loading branch information
BohuTANG authored May 31, 2022
2 parents 351f1cd + 4925276 commit c09228d
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 3 deletions.
77 changes: 74 additions & 3 deletions common/datavalues/src/columns/group_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,77 @@ impl GroupHash for StringColumn {
}
}

// TODO(b41sh): implement GroupHash for VariantColumn
impl GroupHash for VariantColumn {}
impl GroupHash for ArrayColumn {}
impl GroupHash for VariantColumn {
fn serialize(&self, vec: &mut Vec<SmallVu8>, nulls: Option<Bitmap>) -> Result<()> {
assert_eq!(vec.len(), self.len());

match nulls {
Some(bitmap) => {
for ((value, valid), vec) in self.iter().zip(bitmap.iter()).zip(vec) {
BinaryWrite::write_scalar(vec, &valid)?;
if valid {
BinaryWrite::write_binary(vec, value.to_string().as_bytes())?;
}
}
}
None => {
for (value, vec) in self.iter().zip(vec) {
BinaryWrite::write_binary(vec, value.to_string().as_bytes())?;
}
}
}

Ok(())
}
}

impl GroupHash for ArrayColumn {
fn serialize(&self, vec: &mut Vec<SmallVu8>, nulls: Option<Bitmap>) -> Result<()> {
assert_eq!(vec.len(), self.len());

let offsets = self.offsets();
if offsets.len() <= 1 {
return Ok(());
}
let inner_column = self.values();
let inner_length = *offsets.last().unwrap() as usize;
let mut inner_keys = Vec::with_capacity(inner_length);
for _i in 0..inner_length {
inner_keys.push(SmallVu8::new());
}
Series::serialize(inner_column, &mut inner_keys, None)?;

match nulls {
Some(bitmap) => {
let mut offset = 0;
for i in 0..self.len() {
let valid = bitmap.get(i).unwrap();
let v = vec.get_mut(i).unwrap();
BinaryWrite::write_scalar(v, &valid)?;
if valid {
let length = self.size_at_index(i);
BinaryWrite::write_uvarint(v, length as u64)?;
for j in offset..offset + length {
BinaryWrite::write_raw(v, inner_keys.get(j).unwrap())?;
}
offset += length;
}
}
}
None => {
let mut offset = 0;
for i in 0..self.len() {
let v = vec.get_mut(i).unwrap();
let length = self.size_at_index(i);
BinaryWrite::write_uvarint(v, length as u64)?;
for j in offset..offset + length {
BinaryWrite::write_raw(v, inner_keys.get(j).unwrap())?;
}
offset += length;
}
}
}

Ok(())
}
}
7 changes: 7 additions & 0 deletions common/io/src/binary_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub trait BinaryWrite {
fn write_string(&mut self, text: impl AsRef<str>) -> Result<()>;
fn write_uvarint(&mut self, v: u64) -> Result<()>;
fn write_binary(&mut self, text: impl AsRef<[u8]>) -> Result<()>;
fn write_raw(&mut self, text: impl AsRef<[u8]>) -> Result<()>;

fn write_opt_scalar<V>(&mut self, v: &Option<V>) -> Result<()>
where V: Marshal + StatBuffer {
Expand Down Expand Up @@ -71,6 +72,12 @@ where T: std::io::Write
self.write_all(bytes)?;
Ok(())
}

fn write_raw(&mut self, text: impl AsRef<[u8]>) -> Result<()> {
let bytes = text.as_ref();
self.write_all(bytes)?;
Ok(())
}
}

// Another trait like BinaryWrite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,12 @@ NULL 2 3
2 1
3 1
4 1
==GROUP BY Variant==
6 5 12
4 3 "abcd"
2 1 {"k":"v"}
8 7 [1,2,3]
==GROUP BY Array(Int32)==
2 1 []
4 3 [1, 2, 3]
6 5 [4, 5, 6]
29 changes: 29 additions & 0 deletions tests/suites/0_stateless/03_dml/03_0003_select_group_by.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,32 @@ SELECT number, count(*) FROM numbers_mt(1000) group by number order by number li
set group_by_two_level_threshold=1000000000;
SELECT number, count(*) FROM numbers_mt(1000) group by number order by number limit 5;

SELECT '==GROUP BY Variant==';
CREATE TABLE IF NOT EXISTS t_variant(id Int null, var Variant null) Engine = Fuse;

INSERT INTO t_variant VALUES(1, parse_json('{"k":"v"}')),
(2, parse_json('{"k":"v"}')),
(3, parse_json('"abcd"')),
(4, parse_json('"abcd"')),
(5, parse_json('12')),
(6, parse_json('12')),
(7, parse_json('[1,2,3]')),
(8, parse_json('[1,2,3]'));

SELECT max(id), min(id), var FROM t_variant GROUP BY var ORDER BY var ASC;

DROP TABLE t_variant;

SELECT '==GROUP BY Array(Int32)==';

CREATE TABLE IF NOT EXISTS t_array(id Int null, arr Array(Int32) null) Engine = Fuse;
INSERT INTO t_array VALUES(1, []),
(2, []),
(3, [1,2,3]),
(4, [1,2,3]),
(5, [4,5,6]),
(6, [4,5,6]);

SELECT max(id), min(id), arr FROM t_array GROUP BY arr ORDER BY arr ASC;

DROP TABLE t_array;

0 comments on commit c09228d

Please sign in to comment.