diff --git a/Cargo.lock b/Cargo.lock index db579c5f71..3235f2a230 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1546,6 +1546,7 @@ version = "1.2.6-alpha" dependencies = [ "bytes_ext", "common_types", + "lz4_flex", "macros", "snafu 0.6.10", ] @@ -3726,6 +3727,15 @@ dependencies = [ "libc", ] +[[package]] +name = "lz4_flex" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8" +dependencies = [ + "twox-hash", +] + [[package]] name = "lzma-sys" version = "0.1.20" diff --git a/Cargo.toml b/Cargo.toml index 2e66a3e2bf..048bdcc72b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,6 +117,7 @@ futures = "0.3" generic_error = { path = "components/generic_error" } hash_ext = { path = "components/hash_ext" } hex = "0.4.3" +lz4_flex = { version = "0.11", default-features = false, features = ["frame"] } lazy_static = "1.4.0" log = "0.4" logger = { path = "components/logger" } diff --git a/components/bytes_ext/src/lib.rs b/components/bytes_ext/src/lib.rs index ae820dd4fc..4ca0a4537f 100644 --- a/components/bytes_ext/src/lib.rs +++ b/components/bytes_ext/src/lib.rs @@ -182,6 +182,26 @@ where } } +/// The wrapper on the [`BufMut`] for implementing [`std::io::Write`]. 
+pub struct WriterOnBufMut<'a, B: BufMut> { + pub buf: &'a mut B, +} + +impl<'a, B> std::io::Write for WriterOnBufMut<'a, B> +where + B: BufMut, +{ + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.buf.put_slice(buf); + + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/components/codec/Cargo.toml b/components/codec/Cargo.toml index df54aadbcc..91a57bf56d 100644 --- a/components/codec/Cargo.toml +++ b/components/codec/Cargo.toml @@ -28,6 +28,6 @@ workspace = true # In alphabetical order bytes_ext = { workspace = true } common_types = { workspace = true, features = ["test"] } +lz4_flex = { workspace = true } macros = { workspace = true } snafu = { workspace = true } - diff --git a/components/codec/src/columnar/bytes.rs b/components/codec/src/columnar/bytes.rs index 88e7b1ec2c..126c38e64b 100644 --- a/components/codec/src/columnar/bytes.rs +++ b/components/codec/src/columnar/bytes.rs @@ -12,37 +12,110 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use bytes_ext::{Buf, BufMut, Bytes}; -use snafu::ResultExt; +use std::io::{Read, Write}; + +use bytes_ext::{Buf, BufMut, Bytes, WriterOnBufMut}; +use lz4_flex::frame::{FrameDecoder as Lz4Decoder, FrameEncoder as Lz4Encoder}; +use snafu::{ensure, ResultExt}; use crate::{ columnar::{ + Compress, DecodeContext, Decompress, InvalidCompression, InvalidVersion, NotEnoughBytes, Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl, Varint, }, varint, }; -impl<'a> ValuesEncoder<&'a [u8]> for ValuesEncoderImpl { - fn encode(&self, buf: &mut B, values: I) -> Result<()> +/// The layout for the string/bytes: +/// ```plaintext +/// +-------------+--------------+------------+-----------------------+-----------------+ +/// | version(u8) | length_block | data_block | length_block_len(u32) | compression(u8) | +/// +-------------+--------------+------------+-----------------------+-----------------+ +/// ``` +/// +/// Currently, the `compression` flag has two possible values: +/// - 0: The data block is stored without compression +/// - 1: The data block is compressed with LZ4 (chosen when it exceeds the threshold) +/// +/// The lengths in the `length_block` are encoded as varints. +/// `length_block_len` and `compression` are placed in the footer in order to +/// avoid an extra pass over the values when encoding.
+struct Encoding; + +impl Encoding { + const COMPRESSION_SIZE: usize = 1; + const LENGTH_BLOCK_LEN_SIZE: usize = 4; + const VERSION: u8 = 0; + const VERSION_SIZE: usize = 1; + + fn decide_compression(data_block_len: usize, threshold: usize) -> Compression { + if data_block_len > threshold { + Compression::Lz4 + } else { + Compression::NoCompression + } + } + + fn decode_compression(&self, v: u8) -> Result { + let version = match v { + 0 => Compression::NoCompression, + 1 => Compression::Lz4, + _ => InvalidCompression { flag: v }.fail()?, + }; + + Ok(version) + } + + fn encode<'a, B, I>( + &self, + buf: &mut B, + values: I, + data_block_compress_threshold: usize, + ) -> Result<()> where B: BufMut, - I: Iterator, + I: Iterator + Clone, { - for v in values { - debug_assert!(v.len() < u32::MAX as usize); + // Encode the `version`. + buf.put_u8(Self::VERSION); - varint::encode_uvarint(buf, v.len() as u64).context(Varint)?; - buf.put_slice(v); + // Encode the `length_block`. + let mut data_block_len = 0; + let mut length_block_len = 0; + for v in values.clone() { + data_block_len += v.len(); + let sz = varint::encode_uvarint(buf, v.len() as u64).context(Varint)?; + length_block_len += sz; } + assert!(length_block_len < u32::MAX as usize); + + // Encode the `data_block`. + let compression = Self::decide_compression(data_block_len, data_block_compress_threshold); + match compression { + Compression::NoCompression => { + for v in values { + buf.put_slice(v); + } + } + Compression::Lz4 => self + .encode_with_compression(buf, values) + .context(Compress)?, + } + + // Encode the footer: `length_block_len` (which locates the `data_block`) and `compression`.
+ buf.put_u32(length_block_len as u32); + buf.put_u8(compression as u8); Ok(()) } - fn estimated_encoded_size(&self, values: I) -> usize + fn estimated_encoded_size<'a, I>(&self, values: I) -> usize where I: Iterator, { - let mut total_bytes = 0; + let mut total_bytes = + Self::VERSION_SIZE + Self::LENGTH_BLOCK_LEN_SIZE + Self::COMPRESSION_SIZE; + for v in values { // The length of `v` should be ensured to be smaller than [u32::MAX], that is to // say, at most 5 bytes will be used when do varint encoding over a u32 number. @@ -50,21 +123,142 @@ impl<'a> ValuesEncoder<&'a [u8]> for ValuesEncoderImpl { } total_bytes } -} -impl ValuesDecoder for ValuesDecoderImpl { - fn decode(&self, buf: &mut B, mut f: F) -> Result<()> + /// The layout can be referred to the docs of [`Encoding`]. + fn decode(&self, ctx: DecodeContext<'_>, buf: &mut B, f: F) -> Result<()> + where + B: Buf, + F: FnMut(Bytes) -> Result<()>, + { + let chunk = buf.chunk(); + let footer_len = Self::LENGTH_BLOCK_LEN_SIZE + Self::COMPRESSION_SIZE; + ensure!( + chunk.len() > footer_len + Self::VERSION_SIZE, + NotEnoughBytes { + len: footer_len + Self::VERSION_SIZE + } + ); + + // Read and check the version. + let version = chunk[0]; + ensure!(version == Self::VERSION, InvalidVersion { version }); + + // Read and decode the compression flag. + let compression_offset = chunk.len() - Self::COMPRESSION_SIZE; + let compression = self.decode_compression(chunk[compression_offset])?; + + // Extract the `length_block` and `data_block` for decoding. 
+ let length_block_len_offset = chunk.len() - footer_len; + let length_block_end = { + let mut len_buf = &chunk[length_block_len_offset..compression_offset]; + len_buf.get_u32() as usize + Self::VERSION_SIZE + }; + let mut length_block = &chunk[Self::VERSION_SIZE..length_block_end]; + let data_block = &chunk[length_block_end..length_block_len_offset]; + + match compression { + Compression::NoCompression => { + self.decode_without_compression(&mut length_block, data_block, f) + } + Compression::Lz4 => self.decode_with_compression(length_block, data_block, ctx.buf, f), + } + } + + /// Compress the values with the LZ4 frame encoder and write the output into `buf`. + fn encode_with_compression<'a, B, I>(&self, buf: &mut B, values: I) -> std::io::Result<()> + where + B: BufMut, + I: Iterator, + { + let writer = WriterOnBufMut { buf }; + let mut enc = Lz4Encoder::new(writer); + for v in values { + enc.write_all(v)?; + } + enc.finish()?; + + Ok(()) + } + + /// Decode the uncompressed data block. + fn decode_without_compression( + &self, + length_block_buf: &mut B, + data_block_buf: &[u8], + mut f: F, + ) -> Result<()> where B: Buf, F: FnMut(Bytes) -> Result<()>, { - while buf.remaining() > 0 { - let str_len = varint::decode_uvarint(buf).context(Varint)? as usize; - let v = &buf.chunk()[..str_len]; - f(Bytes::copy_from_slice(v))?; - buf.advance(str_len); + let mut offset = 0; + while length_block_buf.remaining() > 0 { + let length = varint::decode_uvarint(length_block_buf).context(Varint)? as usize; + let b = Bytes::copy_from_slice(&data_block_buf[offset..offset + length]); + f(b)?; + offset += length; } Ok(()) } + + /// Decode the compressed data block. NOTE(review): the decompressed bytes are appended to `reused_buf`, which is not cleared here; confirm callers pass it empty.
+ fn decode_with_compression( + &self, + mut length_block_buf: &[u8], + compressed_data_block_buf: &[u8], + reused_buf: &mut Vec, + f: F, + ) -> Result<()> + where + F: FnMut(Bytes) -> Result<()>, + { + let mut decoder = Lz4Decoder::new(compressed_data_block_buf); + decoder.read_to_end(reused_buf).context(Decompress)?; + self.decode_without_compression(&mut length_block_buf, &reused_buf[..], f) + } +} + +/// The compression for [`Encoding`]. +/// +/// It is not allowed to be modified and only allowed to be appended with a new +/// variant. +#[derive(Clone, Copy, Default)] +#[repr(C)] +enum Compression { + #[default] + NoCompression = 0, + Lz4 = 1, +} + +impl<'a> ValuesEncoder<&'a [u8]> for ValuesEncoderImpl { + /// The layout can be referred to the docs of [`Encoding`]. + fn encode(&self, buf: &mut B, values: I) -> Result<()> + where + B: BufMut, + I: Iterator + Clone, + { + let encoding = Encoding; + encoding.encode(buf, values, self.bytes_compress_threshold) + } + + fn estimated_encoded_size(&self, values: I) -> usize + where + I: Iterator, + { + let encoding = Encoding; + encoding.estimated_encoded_size(values) + } +} + +impl ValuesDecoder for ValuesDecoderImpl { + /// The layout can be referred to the docs of [`Encoding`]. 
+ fn decode(&self, ctx: DecodeContext<'_>, buf: &mut B, f: F) -> Result<()> + where + B: Buf, + F: FnMut(Bytes) -> Result<()>, + { + let encoding = Encoding; + encoding.decode(ctx, buf, f) + } } diff --git a/components/codec/src/columnar/float.rs b/components/codec/src/columnar/float.rs index 5878bc8a36..ceff48adfc 100644 --- a/components/codec/src/columnar/float.rs +++ b/components/codec/src/columnar/float.rs @@ -14,7 +14,9 @@ use bytes_ext::{Buf, BufMut}; -use crate::columnar::{Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl}; +use crate::columnar::{ + DecodeContext, Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl, +}; impl ValuesEncoder for ValuesEncoderImpl { fn encode(&self, buf: &mut B, values: I) -> Result<()> @@ -31,7 +33,7 @@ impl ValuesEncoder for ValuesEncoderImpl { } impl ValuesDecoder for ValuesDecoderImpl { - fn decode(&self, buf: &mut B, mut f: F) -> Result<()> + fn decode(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()> where B: Buf, F: FnMut(f64) -> Result<()>, diff --git a/components/codec/src/columnar/int.rs b/components/codec/src/columnar/int.rs index 047750378b..790e0bcc66 100644 --- a/components/codec/src/columnar/int.rs +++ b/components/codec/src/columnar/int.rs @@ -17,7 +17,8 @@ use snafu::ResultExt; use crate::{ columnar::{ - Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl, Varint, + DecodeContext, Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl, + Varint, }, varint, }; @@ -40,7 +41,7 @@ impl ValuesEncoder for ValuesEncoderImpl { } impl ValuesDecoder for ValuesDecoderImpl { - fn decode(&self, buf: &mut B, mut f: F) -> Result<()> + fn decode(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()> where B: Buf, F: FnMut(i32) -> Result<()>, @@ -78,7 +79,7 @@ impl ValuesEncoder for ValuesEncoderImpl { } impl ValuesDecoder for ValuesDecoderImpl { - fn decode(&self, buf: &mut B, mut f: F) -> Result<()> + fn 
decode(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()> where B: Buf, F: FnMut(i64) -> Result<()>, @@ -116,7 +117,7 @@ impl ValuesEncoder for ValuesEncoderImpl { } impl ValuesDecoder for ValuesDecoderImpl { - fn decode(&self, buf: &mut B, mut f: F) -> Result<()> + fn decode(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()> where B: Buf, F: FnMut(u64) -> Result<()>, diff --git a/components/codec/src/columnar/mod.rs b/components/codec/src/columnar/mod.rs index 96dd71460d..7b994f14a3 100644 --- a/components/codec/src/columnar/mod.rs +++ b/components/codec/src/columnar/mod.rs @@ -28,7 +28,7 @@ use common_types::{ use macros::define_result; use snafu::{self, ensure, Backtrace, OptionExt, ResultExt, Snafu}; -use crate::{varint, Decoder}; +use crate::varint; mod bytes; mod float; @@ -41,6 +41,9 @@ pub enum Error { #[snafu(display("Invalid version:{version}.\nBacktrace:\n{backtrace}"))] InvalidVersion { version: u8, backtrace: Backtrace }, + #[snafu(display("Invalid compression flag:{flag}.\nBacktrace:\n{backtrace}"))] + InvalidCompression { flag: u8, backtrace: Backtrace }, + #[snafu(display("Invalid datum kind, err:{source}"))] InvalidDatumKind { source: common_types::datum::Error }, @@ -59,8 +62,29 @@ pub enum Error { #[snafu(display("Failed to varint, err:{source}"))] Varint { source: varint::Error }, + #[snafu(display("Failed to do compression, err:{source}.\nBacktrace:\n{backtrace}"))] + Compress { + source: std::io::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to decompress, err:{source}.\nBacktrace:\n{backtrace}"))] + Decompress { + source: std::io::Error, + backtrace: Backtrace, + }, + #[snafu(display("Failed to do compact encoding, err:{source}"))] CompactEncode { source: crate::compact::Error }, + + #[snafu(display("Too long bytes, length:{num_bytes}.\nBacktrace:\n{backtrace}"))] + TooLongBytes { + num_bytes: usize, + backtrace: Backtrace, + }, + + #[snafu(display("Bytes is not enough, 
length:{len}.\nBacktrace:\n{backtrace}"))] + NotEnoughBytes { len: usize, backtrace: Backtrace }, } define_result!(Error); @@ -74,7 +98,7 @@ trait ValuesEncoder { fn encode(&self, buf: &mut B, values: I) -> Result<()> where B: BufMut, - I: Iterator; + I: Iterator + Clone; /// The estimated size for memory pre-allocated. fn estimated_encoded_size(&self, values: I) -> usize @@ -87,16 +111,25 @@ trait ValuesEncoder { } } +/// The decode context for decoding column. +pub struct DecodeContext<'a> { + /// Buffer for reuse during decoding. + buf: &'a mut Vec, +} + /// The trait bound on the decoders for different types. trait ValuesDecoder { - fn decode(&self, buf: &mut B, f: F) -> Result<()> + fn decode(&self, ctx: DecodeContext<'_>, buf: &mut B, f: F) -> Result<()> where B: Buf, F: FnMut(T) -> Result<()>; } +#[derive(Debug, Default)] /// The implementation for [`ValuesEncoder`]. -struct ValuesEncoderImpl; +struct ValuesEncoderImpl { + bytes_compress_threshold: usize, +} /// The implementation for [`ValuesDecoder`]. struct ValuesDecoderImpl; @@ -104,6 +137,7 @@ struct ValuesDecoderImpl; #[derive(Clone, Debug)] pub struct ColumnarEncoder { column_id: ColumnId, + bytes_compress_threshold: usize, } /// A hint helps column encoding. 
@@ -144,8 +178,11 @@ impl EncodeHint { impl ColumnarEncoder { const VERSION: u8 = 0; - pub fn new(column_id: ColumnId) -> Self { - Self { column_id } + pub fn new(column_id: ColumnId, bytes_compress_threshold: usize) -> Self { + Self { + column_id, + bytes_compress_threshold, + } } /// The header includes `version`, `datum_kind`, `column_id`, `num_datums` @@ -201,7 +238,7 @@ impl ColumnarEncoder { buf.put_slice(bit_set.as_bytes()); } - Self::encode_datums(buf, datums, hint.datum_kind) + self.encode_datums(buf, datums, hint.datum_kind) } pub fn estimated_encoded_size<'a, I>(&self, datums: I, hint: &mut EncodeHint) -> usize @@ -215,69 +252,75 @@ impl ColumnarEncoder { BitSet::num_bytes(num_datums) }; - let data_size = - match hint.datum_kind { - DatumKind::Null => 0, - DatumKind::Timestamp => ValuesEncoderImpl.estimated_encoded_size( - datums - .clone() - .filter_map(|v| v.as_timestamp().map(|v| v.as_i64())), - ), - DatumKind::Double => ValuesEncoderImpl - .estimated_encoded_size(datums.clone().filter_map(|v| v.as_f64())), - DatumKind::Float => todo!(), - DatumKind::Varbinary => ValuesEncoderImpl - .estimated_encoded_size(datums.clone().filter_map(|v| v.into_bytes())), - DatumKind::String => ValuesEncoderImpl.estimated_encoded_size( - datums - .clone() - .filter_map(|v| v.into_str().map(|v| v.as_bytes())), - ), - DatumKind::UInt64 => ValuesEncoderImpl - .estimated_encoded_size(datums.clone().filter_map(|v| v.as_u64())), - DatumKind::UInt32 => todo!(), - DatumKind::UInt16 => todo!(), - DatumKind::UInt8 => todo!(), - DatumKind::Int64 => ValuesEncoderImpl - .estimated_encoded_size(datums.clone().filter_map(|v| v.as_i64())), - DatumKind::Int32 => ValuesEncoderImpl - .estimated_encoded_size(datums.clone().filter_map(|v| v.as_i32())), - DatumKind::Int16 => todo!(), - DatumKind::Int8 => todo!(), - DatumKind::Boolean => todo!(), - DatumKind::Date => todo!(), - DatumKind::Time => todo!(), - }; + let enc = ValuesEncoderImpl::default(); + let data_size = match hint.datum_kind { 
+ DatumKind::Null => 0, + DatumKind::Timestamp => enc.estimated_encoded_size( + datums + .clone() + .filter_map(|v| v.as_timestamp().map(|v| v.as_i64())), + ), + DatumKind::Double => { + enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_f64())) + } + DatumKind::Float => todo!(), + DatumKind::Varbinary => { + enc.estimated_encoded_size(datums.clone().filter_map(|v| v.into_bytes())) + } + DatumKind::String => enc.estimated_encoded_size( + datums + .clone() + .filter_map(|v| v.into_str().map(|v| v.as_bytes())), + ), + DatumKind::UInt64 => { + enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_u64())) + } + DatumKind::UInt32 => todo!(), + DatumKind::UInt16 => todo!(), + DatumKind::UInt8 => todo!(), + DatumKind::Int64 => { + enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_i64())) + } + DatumKind::Int32 => { + enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_i32())) + } + DatumKind::Int16 => todo!(), + DatumKind::Int8 => todo!(), + DatumKind::Boolean => todo!(), + DatumKind::Date => todo!(), + DatumKind::Time => todo!(), + }; Self::header_size() + bit_set_size + data_size } - fn encode_datums<'a, I, B>(buf: &mut B, datums: I, datum_kind: DatumKind) -> Result<()> + fn encode_datums<'a, I, B>(&self, buf: &mut B, datums: I, datum_kind: DatumKind) -> Result<()> where - I: Iterator>, + I: Iterator> + Clone, B: BufMut, { + let enc = ValuesEncoderImpl { + bytes_compress_threshold: self.bytes_compress_threshold, + }; match datum_kind { DatumKind::Null => Ok(()), - DatumKind::Timestamp => ValuesEncoderImpl.encode( + DatumKind::Timestamp => enc.encode( buf, datums.filter_map(|v| v.as_timestamp().map(|v| v.as_i64())), ), - DatumKind::Double => ValuesEncoderImpl.encode(buf, datums.filter_map(|v| v.as_f64())), + DatumKind::Double => enc.encode(buf, datums.filter_map(|v| v.as_f64())), DatumKind::Float => todo!(), - DatumKind::Varbinary => { - ValuesEncoderImpl.encode(buf, datums.filter_map(|v| v.into_bytes())) - } - DatumKind::String 
=> ValuesEncoderImpl.encode( + DatumKind::Varbinary => enc.encode(buf, datums.filter_map(|v| v.into_bytes())), + DatumKind::String => enc.encode( buf, datums.filter_map(|v| v.into_str().map(|v| v.as_bytes())), ), - DatumKind::UInt64 => ValuesEncoderImpl.encode(buf, datums.filter_map(|v| v.as_u64())), + DatumKind::UInt64 => enc.encode(buf, datums.filter_map(|v| v.as_u64())), DatumKind::UInt32 => todo!(), DatumKind::UInt16 => todo!(), DatumKind::UInt8 => todo!(), - DatumKind::Int64 => ValuesEncoderImpl.encode(buf, datums.filter_map(|v| v.as_i64())), - DatumKind::Int32 => ValuesEncoderImpl.encode(buf, datums.filter_map(|v| v.as_i32())), + DatumKind::Int64 => enc.encode(buf, datums.filter_map(|v| v.as_i64())), + DatumKind::Int32 => enc.encode(buf, datums.filter_map(|v| v.as_i32())), DatumKind::Int16 => todo!(), DatumKind::Int8 => todo!(), DatumKind::Boolean => todo!(), @@ -297,10 +340,8 @@ pub struct DecodeResult { pub datums: Vec, } -impl Decoder for ColumnarDecoder { - type Error = Error; - - fn decode(&self, buf: &mut B) -> Result { +impl ColumnarDecoder { + pub fn decode(&self, ctx: DecodeContext<'_>, buf: &mut B) -> Result { let version = buf.get_u8(); ensure!( version == ColumnarEncoder::VERSION, @@ -322,9 +363,9 @@ impl Decoder for ColumnarDecoder { let datums = if num_nulls == num_datums { vec![Datum::Null; num_datums] } else if num_nulls > 0 { - Self::decode_with_nulls(buf, num_datums, datum_kind)? + Self::decode_with_nulls(ctx, buf, num_datums, datum_kind)? } else { - Self::decode_without_nulls(buf, num_datums, datum_kind)? + Self::decode_without_nulls(ctx, buf, num_datums, datum_kind)? 
}; Ok(DecodeResult { column_id, datums }) @@ -333,6 +374,7 @@ impl Decoder for ColumnarDecoder { impl ColumnarDecoder { fn decode_with_nulls( + ctx: DecodeContext<'_>, buf: &mut B, num_datums: usize, datum_kind: DatumKind, @@ -353,12 +395,13 @@ impl ColumnarDecoder { }; let mut data_block = &chunk[BitSet::num_bytes(num_datums)..]; - Self::decode_datums(&mut data_block, datum_kind, with_datum)?; + Self::decode_datums(ctx, &mut data_block, datum_kind, with_datum)?; Ok(datums) } fn decode_without_nulls( + ctx: DecodeContext<'_>, buf: &mut B, num_datums: usize, datum_kind: DatumKind, @@ -368,11 +411,16 @@ impl ColumnarDecoder { datums.push(datum); Ok(()) }; - Self::decode_datums(buf, datum_kind, with_datum)?; + Self::decode_datums(ctx, buf, datum_kind, with_datum)?; Ok(datums) } - fn decode_datums(buf: &mut B, datum_kind: DatumKind, mut f: F) -> Result<()> + fn decode_datums( + ctx: DecodeContext<'_>, + buf: &mut B, + datum_kind: DatumKind, + mut f: F, + ) -> Result<()> where B: Buf, F: FnMut(Datum) -> Result<()>, @@ -381,30 +429,30 @@ impl ColumnarDecoder { DatumKind::Null => Ok(()), DatumKind::Timestamp => { let with_i64 = |v| f(Datum::from(Timestamp::new(v))); - ValuesDecoderImpl.decode(buf, with_i64) + ValuesDecoderImpl.decode(ctx, buf, with_i64) } DatumKind::Double => { let with_float = |v: f64| f(Datum::from(v)); - ValuesDecoderImpl.decode(buf, with_float) + ValuesDecoderImpl.decode(ctx, buf, with_float) } DatumKind::Float => todo!(), DatumKind::Varbinary => { let with_bytes = |v: Bytes| f(Datum::from(v)); - ValuesDecoderImpl.decode(buf, with_bytes) + ValuesDecoderImpl.decode(ctx, buf, with_bytes) } DatumKind::String => { let with_str = |value| { let datum = unsafe { Datum::from(StringBytes::from_bytes_unchecked(value)) }; f(datum) }; - ValuesDecoderImpl.decode(buf, with_str) + ValuesDecoderImpl.decode(ctx, buf, with_str) } DatumKind::UInt64 => { let with_u64 = |value: u64| { let datum = Datum::from(value); f(datum) }; - ValuesDecoderImpl.decode(buf, with_u64) + 
ValuesDecoderImpl.decode(ctx, buf, with_u64) } DatumKind::UInt32 => todo!(), DatumKind::UInt16 => todo!(), @@ -414,11 +462,11 @@ impl ColumnarDecoder { let datum = Datum::from(value); f(datum) }; - ValuesDecoderImpl.decode(buf, with_i64) + ValuesDecoderImpl.decode(ctx, buf, with_i64) } DatumKind::Int32 => { let with_i32 = |v: i32| f(Datum::from(v)); - ValuesDecoderImpl.decode(buf, with_i32) + ValuesDecoderImpl.decode(ctx, buf, with_i32) } DatumKind::Int16 => todo!(), DatumKind::Int8 => todo!(), @@ -433,7 +481,7 @@ mod tests { use super::*; fn check_encode_end_decode(column_id: ColumnId, datums: Vec, datum_kind: DatumKind) { - let encoder = ColumnarEncoder::new(column_id); + let encoder = ColumnarEncoder::new(column_id, 256); let views = datums.iter().map(|v| v.as_view()); let mut hint = EncodeHint { num_nulls: None, @@ -448,11 +496,15 @@ mod tests { // Ensure no growth over the capacity. assert!(buf.capacity() <= buf_len); + let mut reused_buf = Vec::new(); + let ctx = DecodeContext { + buf: &mut reused_buf, + }; let decoder = ColumnarDecoder; let DecodeResult { column_id: decoded_column_id, datums: decoded_datums, - } = decoder.decode(&mut buf.as_slice()).unwrap(); + } = decoder.decode(ctx, &mut buf.as_slice()).unwrap(); assert_eq!(column_id, decoded_column_id); assert_eq!(datums, decoded_datums); } @@ -548,4 +600,37 @@ mod tests { check_encode_end_decode(10, datums, DatumKind::String); } + + #[test] + fn test_massive_string() { + let sample_datums = vec![ + Datum::from("vvvv"), + Datum::from("xxxx"), + Datum::from("8"), + Datum::from("9999"), + ]; + let mut datums = Vec::with_capacity(sample_datums.len() * 100); + for _ in 0..100 { + datums.append(&mut sample_datums.clone()); + } + + check_encode_end_decode(10, datums, DatumKind::String); + } + + #[test] + fn test_large_string() { + let large_string_bytes = vec![ + vec![b'a'; 500], + vec![b'x'; 5000], + vec![b'x'; 5], + vec![], + vec![b' '; 15000], + ]; + let datums = large_string_bytes + .iter() + .map(|v| 
Datum::from(&*String::from_utf8_lossy(&v[..]))) + .collect(); + + check_encode_end_decode(10, datums, DatumKind::String); + } } diff --git a/components/codec/src/varint.rs b/components/codec/src/varint.rs index 0900190b1e..cf8f3792b2 100644 --- a/components/codec/src/varint.rs +++ b/components/codec/src/varint.rs @@ -48,7 +48,7 @@ define_result!(Error); // return PutUvarint(buf, ux) // } // ``` -pub fn encode_varint(buf: &mut B, value: i64) -> Result<()> { +pub fn encode_varint(buf: &mut B, value: i64) -> Result { let mut x = (value as u64) << 1; if value < 0 { x = !x; @@ -71,13 +71,15 @@ pub fn encode_varint(buf: &mut B, value: i64) -> Result<()> { // return i + 1 // } // ``` -pub fn encode_uvarint(buf: &mut B, mut x: u64) -> Result<()> { +pub fn encode_uvarint(buf: &mut B, mut x: u64) -> Result { + let mut num_bytes = 0; while x >= 0x80 { buf.try_put_u8(x as u8 | 0x80).context(EncodeVarint)?; x >>= 7; + num_bytes += 1; } buf.try_put_u8(x as u8).context(EncodeVarint)?; - Ok(()) + Ok(num_bytes + 1) } // from https://golang.org/src/encoding/binary/varint.go?s=2955:2991#L84