Skip to content

Commit

Permalink
feat: implement compression encoding for columnar bytes values (#1165)
Browse files Browse the repository at this point in the history
## Rationale
The current encoding of columnar bytes is naive and gives no
consideration to compression.

## Detailed Changes
Introduce a compression encoding for columnar bytes: the data block is
compressed once its size exceeds a configured threshold.

## Test Plan
New unit tests.
  • Loading branch information
ShiKaiWi authored Aug 23, 2023
1 parent 3172bb4 commit 0f938df
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 98 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ futures = "0.3"
generic_error = { path = "components/generic_error" }
hash_ext = { path = "components/hash_ext" }
hex = "0.4.3"
lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
lazy_static = "1.4.0"
log = "0.4"
logger = { path = "components/logger" }
Expand Down
20 changes: 20 additions & 0 deletions components/bytes_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,26 @@ where
}
}

/// Adapter that exposes a mutable [`BufMut`] reference through the
/// [`std::io::Write`] interface.
pub struct WriterOnBufMut<'a, B>
where
    B: BufMut,
{
    /// The destination buffer that receives every written byte.
    pub buf: &'a mut B,
}

impl<'a, B: BufMut> std::io::Write for WriterOnBufMut<'a, B> {
    /// Copies all of `data` into the wrapped buffer and reports the whole
    /// slice as written.
    ///
    /// NOTE(review): presumably `put_slice` panics rather than returning an
    /// error when the buffer has no room left — confirm against the `BufMut`
    /// documentation.
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
        let written = data.len();
        self.buf.put_slice(data);

        Ok(written)
    }

    /// The wrapper holds no intermediate state, so flushing is a no-op.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion components/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ workspace = true
# In alphabetical order
bytes_ext = { workspace = true }
common_types = { workspace = true, features = ["test"] }
lz4_flex = { workspace = true }
macros = { workspace = true }
snafu = { workspace = true }

232 changes: 213 additions & 19 deletions components/codec/src/columnar/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,59 +12,253 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes_ext::{Buf, BufMut, Bytes};
use snafu::ResultExt;
use std::io::{Read, Write};

use bytes_ext::{Buf, BufMut, Bytes, WriterOnBufMut};
use lz4_flex::frame::{FrameDecoder as Lz4Decoder, FrameEncoder as Lz4Encoder};
use snafu::{ensure, ResultExt};

use crate::{
columnar::{
Compress, DecodeContext, Decompress, InvalidCompression, InvalidVersion, NotEnoughBytes,
Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl, Varint,
},
varint,
};

impl<'a> ValuesEncoder<&'a [u8]> for ValuesEncoderImpl {
fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
/// The layout for the encoded string/bytes values:
/// ```plaintext
/// +-------------+--------------+------------+-----------------------+-----------------+
/// | version(u8) | length_block | data_block | length_block_len(u32) | compression(u8) |
/// +-------------+--------------+------------+-----------------------+-----------------+
/// ```
///
/// - `version`: the layout version; only `0` is defined so far.
/// - `length_block`: the length of every value, each encoded as a varint.
/// - `data_block`: the concatenated bytes of all values, stored raw or
///   compressed depending on `compression`.
/// - `length_block_len`: the size in bytes of `length_block`.
/// - `compression`: how `data_block` is stored:
///   - 0: no compression over the data block
///   - 1: the data block is LZ4-compressed (the encoder picks this when the
///     data block is longer than the configured threshold)
///
/// `length_block_len` and `compression` are placed in the footer because both
/// are only known after the values have been walked; writing them up front
/// would cost the encoder one more loop.
struct Encoding;

impl Encoding {
/// Size in bytes of the trailing `compression` flag.
const COMPRESSION_SIZE: usize = 1;
/// Size in bytes of the `length_block_len` footer field (a `u32`).
const LENGTH_BLOCK_LEN_SIZE: usize = 4;
/// The layout version written by the encoder and required by the decoder.
const VERSION: u8 = 0;
/// Size in bytes of the leading `version` field.
const VERSION_SIZE: usize = 1;

/// Chooses how a data block of `data_block_len` bytes is stored: LZ4 only
/// when the block is strictly longer than `threshold`, raw otherwise.
fn decide_compression(data_block_len: usize, threshold: usize) -> Compression {
    if data_block_len <= threshold {
        return Compression::NoCompression;
    }

    Compression::Lz4
}

/// Maps the stored compression flag `v` back to its [`Compression`] variant.
///
/// Any flag other than `0` or `1` fails with `InvalidCompression`.
fn decode_compression(&self, v: u8) -> Result<Compression> {
    match v {
        0 => Ok(Compression::NoCompression),
        1 => Ok(Compression::Lz4),
        flag => InvalidCompression { flag }.fail(),
    }
}

/// Encode `values` into `buf` using the layout described on [`Encoding`].
///
/// `values` is walked twice — once to write the varint lengths and once to
/// write the payload — which is why the iterator must be `Clone`. The data
/// block is LZ4-compressed only when its total size exceeds
/// `data_block_compress_threshold`.
///
/// Panics if the encoded length block is not smaller than `u32::MAX` bytes.
///
/// NOTE(review): this span appears to come from a rendered diff, so a few
/// statements of the previous implementation are interleaved with the new
/// ones.
fn encode<'a, B, I>(
&self,
buf: &mut B,
values: I,
data_block_compress_threshold: usize,
) -> Result<()>
where
B: BufMut,
I: Iterator<Item = &'a [u8]>,
I: Iterator<Item = &'a [u8]> + Clone,
{
for v in values {
debug_assert!(v.len() < u32::MAX as usize);
// Encode the `version`.
buf.put_u8(Self::VERSION);

varint::encode_uvarint(buf, v.len() as u64).context(Varint)?;
buf.put_slice(v);
// Encode the `length_block`, summing up the size of both blocks on the way.
let mut data_block_len = 0;
let mut length_block_len = 0;
// Iterate over a clone so that `values` can be consumed again for the data block.
for v in values.clone() {
data_block_len += v.len();
let sz = varint::encode_uvarint(buf, v.len() as u64).context(Varint)?;
length_block_len += sz;
}
// The footer stores this length as a `u32`.
assert!(length_block_len < u32::MAX as usize);

// Encode the `data_block`, compressed or raw depending on its total size.
let compression = Self::decide_compression(data_block_len, data_block_compress_threshold);
match compression {
Compression::NoCompression => {
for v in values {
buf.put_slice(v);
}
}
Compression::Lz4 => self
.encode_with_compression(buf, values)
.context(Compress)?,
}

// Encode the footer: `length_block_len` followed by the `compression` flag.
buf.put_u32(length_block_len as u32);
buf.put_u8(compression as u8);

Ok(())
}

/// Estimate the number of bytes needed to encode `values`.
///
/// The estimate is the fixed version and footer sizes plus, for every value,
/// 5 bytes for its varint length and the value's own length. Compression is
/// not taken into account.
///
/// NOTE(review): this span appears to come from a rendered diff; the first
/// signature line and the `= 0` initialiser look like the previous version.
fn estimated_encoded_size<I>(&self, values: I) -> usize
fn estimated_encoded_size<'a, I>(&self, values: I) -> usize
where
I: Iterator<Item = &'a [u8]>,
{
let mut total_bytes = 0;
let mut total_bytes =
Self::VERSION_SIZE + Self::LENGTH_BLOCK_LEN_SIZE + Self::COMPRESSION_SIZE;

for v in values {
// The length of `v` is expected to be smaller than `u32::MAX`, and the
// varint encoding of a `u32` takes at most 5 bytes.
total_bytes += 5 + v.len();
}
total_bytes
}

impl ValuesDecoder<Bytes> for ValuesDecoderImpl {
fn decode<B, F>(&self, buf: &mut B, mut f: F) -> Result<()>
/// Decode every value stored in `buf` and hand each one to `f`.
///
/// The layout can be referred to the docs of [`Encoding`].
///
/// Fails with `NotEnoughBytes` when the payload is shorter than the version
/// plus footer, or when the recorded `length_block_len` does not fit between
/// them; with `InvalidVersion` / `InvalidCompression` on an unknown version or
/// compression flag; and with whatever error `f` or the block decoding
/// returns.
///
/// NOTE(review): only `buf.chunk()` is inspected and `buf` is never advanced —
/// presumably callers pass a contiguous buffer holding exactly one encoded
/// payload; confirm against the callers.
fn decode<B, F>(&self, ctx: DecodeContext<'_>, buf: &mut B, f: F) -> Result<()>
where
    B: Buf,
    F: FnMut(Bytes) -> Result<()>,
{
    let chunk = buf.chunk();
    let footer_len = Self::LENGTH_BLOCK_LEN_SIZE + Self::COMPRESSION_SIZE;
    let min_len = footer_len + Self::VERSION_SIZE;
    // An encoding of zero values consists of just the version and the footer,
    // so a payload of exactly `min_len` bytes is valid.
    ensure!(chunk.len() >= min_len, NotEnoughBytes { len: min_len });

    // Read and check the version.
    let version = chunk[0];
    ensure!(version == Self::VERSION, InvalidVersion { version });

    // Read and decode the compression flag.
    let compression_offset = chunk.len() - Self::COMPRESSION_SIZE;
    let compression = self.decode_compression(chunk[compression_offset])?;

    // Read the size of the `length_block` from the footer.
    let length_block_len_offset = chunk.len() - footer_len;
    let length_block_len = {
        let mut len_buf = &chunk[length_block_len_offset..compression_offset];
        len_buf.get_u32() as usize
    };
    // The size comes from the payload itself: reject it when it claims more
    // bytes than exist between the version and the footer, rather than
    // panicking on the slicing below.
    ensure!(
        length_block_len <= length_block_len_offset - Self::VERSION_SIZE,
        NotEnoughBytes {
            len: min_len.saturating_add(length_block_len)
        }
    );

    // Extract the `length_block` and `data_block` for decoding.
    let length_block_end = length_block_len + Self::VERSION_SIZE;
    let mut length_block = &chunk[Self::VERSION_SIZE..length_block_end];
    let data_block = &chunk[length_block_end..length_block_len_offset];

    match compression {
        Compression::NoCompression => {
            self.decode_without_compression(&mut length_block, data_block, f)
        }
        Compression::Lz4 => self.decode_with_compression(length_block, data_block, ctx.buf, f),
    }
}

/// Write the values into `buf` as a single LZ4-compressed stream.
///
/// Every value is fed, in order, to an LZ4 frame encoder whose output goes
/// straight into `buf`.
fn encode_with_compression<'a, B, I>(&self, buf: &mut B, values: I) -> std::io::Result<()>
where
    B: BufMut,
    I: Iterator<Item = &'a [u8]>,
{
    let mut encoder = Lz4Encoder::new(WriterOnBufMut { buf });
    let mut values = values;
    values.try_for_each(|bytes| encoder.write_all(bytes))?;
    // The writer handed back by `finish` is not needed any more.
    encoder.finish()?;

    Ok(())
}

/// Decode the uncompressed data block.
///
/// Reads varint lengths from `length_block_buf` until it is exhausted; each
/// length selects the next slice of `data_block_buf`, which is copied into a
/// `Bytes` and passed to `f`. Stops at the first error returned by the varint
/// decoding or by `f`.
///
/// The slicing of `data_block_buf` panics if the decoded lengths add up to
/// more bytes than the data block holds.
///
/// NOTE(review): this span appears to come from a rendered diff; the first
/// `while buf.remaining() > 0` loop header and the four statements after it
/// look like the previous implementation.
fn decode_without_compression<B, F>(
&self,
length_block_buf: &mut B,
data_block_buf: &[u8],
mut f: F,
) -> Result<()>
where
B: Buf,
F: FnMut(Bytes) -> Result<()>,
{
while buf.remaining() > 0 {
let str_len = varint::decode_uvarint(buf).context(Varint)? as usize;
let v = &buf.chunk()[..str_len];
f(Bytes::copy_from_slice(v))?;
buf.advance(str_len);
// Start of the next value inside `data_block_buf`.
let mut offset = 0;
while length_block_buf.remaining() > 0 {
let length = varint::decode_uvarint(length_block_buf).context(Varint)? as usize;
let b = Bytes::copy_from_slice(&data_block_buf[offset..offset + length]);
f(b)?;
offset += length;
}

Ok(())
}

/// Decode the compressed data block.
///
/// The LZ4 stream in `compressed_data_block_buf` is decompressed into
/// `reused_buf`, and the values are then decoded from it using the lengths in
/// `length_block_buf`. Any previous content of `reused_buf` is discarded; its
/// allocation is kept so it can be reused across calls.
///
/// Fails with `Decompress` when the stream cannot be decompressed, or with
/// whatever error decoding the values returns.
fn decode_with_compression<F>(
    &self,
    mut length_block_buf: &[u8],
    compressed_data_block_buf: &[u8],
    reused_buf: &mut Vec<u8>,
    f: F,
) -> Result<()>
where
    F: FnMut(Bytes) -> Result<()>,
{
    // `read_to_end` appends to its target, and the values are sliced out of
    // the buffer starting at offset 0, so leftover bytes from an earlier use
    // would be decoded as if they were part of this data block.
    reused_buf.clear();

    let mut decoder = Lz4Decoder::new(compressed_data_block_buf);
    decoder.read_to_end(reused_buf).context(Decompress)?;
    self.decode_without_compression(&mut length_block_buf, &reused_buf[..], f)
}
}

/// The compression for [`Encoding`].
///
/// The discriminants are persisted as the one-byte `compression` flag, so the
/// existing variants are not allowed to be modified; it is only allowed to
/// append new variants. `repr(u8)` pins the representation to the width that
/// is actually written.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
enum Compression {
    /// The data block is stored as-is.
    #[default]
    NoCompression = 0,
    /// The data block is stored LZ4-compressed.
    Lz4 = 1,
}

impl<'a> ValuesEncoder<&'a [u8]> for ValuesEncoderImpl {
    /// Encode `values` into `buf`, using `self.bytes_compress_threshold` as
    /// the data block compression threshold.
    ///
    /// The layout can be referred to the docs of [`Encoding`].
    fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
    where
        B: BufMut,
        I: Iterator<Item = &'a [u8]> + Clone,
    {
        Encoding.encode(buf, values, self.bytes_compress_threshold)
    }

    /// Estimate the encoded size of `values` by delegating to [`Encoding`].
    fn estimated_encoded_size<I>(&self, values: I) -> usize
    where
        I: Iterator<Item = &'a [u8]>,
    {
        Encoding.estimated_encoded_size(values)
    }
}

impl ValuesDecoder<Bytes> for ValuesDecoderImpl {
    /// Decode the values in `buf` by delegating to [`Encoding`], passing each
    /// decoded value to `f`.
    ///
    /// The layout can be referred to the docs of [`Encoding`].
    fn decode<B, F>(&self, ctx: DecodeContext<'_>, buf: &mut B, f: F) -> Result<()>
    where
        B: Buf,
        F: FnMut(Bytes) -> Result<()>,
    {
        Encoding.decode(ctx, buf, f)
    }
}
6 changes: 4 additions & 2 deletions components/codec/src/columnar/float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use bytes_ext::{Buf, BufMut};

use crate::columnar::{Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl};
use crate::columnar::{
DecodeContext, Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl,
};

impl ValuesEncoder<f64> for ValuesEncoderImpl {
fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
Expand All @@ -31,7 +33,7 @@ impl ValuesEncoder<f64> for ValuesEncoderImpl {
}

impl ValuesDecoder<f64> for ValuesDecoderImpl {
fn decode<B, F>(&self, buf: &mut B, mut f: F) -> Result<()>
fn decode<B, F>(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut(f64) -> Result<()>,
Expand Down
9 changes: 5 additions & 4 deletions components/codec/src/columnar/int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use snafu::ResultExt;

use crate::{
columnar::{
Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl, Varint,
DecodeContext, Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl,
Varint,
},
varint,
};
Expand All @@ -40,7 +41,7 @@ impl ValuesEncoder<i32> for ValuesEncoderImpl {
}

impl ValuesDecoder<i32> for ValuesDecoderImpl {
fn decode<B, F>(&self, buf: &mut B, mut f: F) -> Result<()>
fn decode<B, F>(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut(i32) -> Result<()>,
Expand Down Expand Up @@ -78,7 +79,7 @@ impl ValuesEncoder<i64> for ValuesEncoderImpl {
}

impl ValuesDecoder<i64> for ValuesDecoderImpl {
fn decode<B, F>(&self, buf: &mut B, mut f: F) -> Result<()>
fn decode<B, F>(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut(i64) -> Result<()>,
Expand Down Expand Up @@ -116,7 +117,7 @@ impl ValuesEncoder<u64> for ValuesEncoderImpl {
}

impl ValuesDecoder<u64> for ValuesDecoderImpl {
fn decode<B, F>(&self, buf: &mut B, mut f: F) -> Result<()>
fn decode<B, F>(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut(u64) -> Result<()>,
Expand Down
Loading

0 comments on commit 0f938df

Please sign in to comment.