Skip to content

Commit

Permalink
dekaf: Swap to lz4_flex from lz4
Browse files Browse the repository at this point in the history
While investigating the cause of LZ4 compression issues related to franz-go (see comments here #1651), I found `lz4_flex` which is a pure-Rust lz4 implementation which appears to be safer and faster than `lz4`/`lz4-sys` that `kafka-protocol` is using. Now that tychedelia/kafka-protocol-rs#81 allows us to use our own compression, and `lz4`'s configuration of block checksums is broken (fix here 10XGenomics/lz4-rs#52), I thought it would be a good time to swap to `lz4_flex`.
  • Loading branch information
jshearer committed Sep 23, 2024
1 parent a5148d9 commit 51c253a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 12 deletions.
15 changes: 7 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ flate2 = "1.0"
futures = "0.3"
futures-core = "0.3"
futures-util = "0.3"
fxhash = "0.2" # Used in `json` crate. Replace with xxhash.
fxhash = "0.2" # Used in `json` crate. Replace with xxhash.
hex = "0.4.3"
hexdump = "0.1"
humantime = "2.1"
Expand All @@ -72,14 +72,17 @@ jemalloc-ctl = "0.3"
json-patch = "0.3"
jsonwebtoken = { version = "9", default-features = false }
js-sys = "0.3.60"
kafka-protocol = "0.11.0"
# TODO(jshearer): Swap back after 0.13.0 is released, which includes
# https://github.com/tychedelia/kafka-protocol-rs/pull/81
kafka-protocol = { git = "https://github.com/tychedelia/kafka-protocol-rs.git", rev = "cabe835" }
lazy_static = "1.4"
libc = "0.2"
librocksdb-sys = { version = "0.16.0", default-features = false, features = [
"snappy",
"rtti",
] }
lz4 = "1.24.0"
lz4_flex = "0.11.0"
mime = "0.3"
memchr = "2.5"
metrics = "0.23.0"
Expand Down
1 change: 1 addition & 0 deletions crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ hex = { workspace = true }
hexdump = { workspace = true }
itertools = { workspace = true }
kafka-protocol = { workspace = true }
lz4_flex = { workspace = true }
md5 = { workspace = true }
metrics = { workspace = true }
metrics-prometheus = { workspace = true }
Expand Down
31 changes: 29 additions & 2 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use super::{Collection, Partition};
use anyhow::bail;
use bytes::BufMut;
use bytes::{Buf, BufMut, BytesMut};
use doc::AsNode;
use futures::StreamExt;
use gazette::journal::{ReadJsonLine, ReadJsonLines};
use gazette::{broker, journal, uuid};
use kafka_protocol::records::Compression;
use lz4_flex::frame::BlockMode;
use std::time::Duration;

pub struct Read {
Expand Down Expand Up @@ -255,7 +257,7 @@ impl Read {
compression: Compression::None,
version: 2,
};
RecordBatchEncoder::encode(&mut buf, records.iter(), &opts)
RecordBatchEncoder::encode(&mut buf, records.iter(), &opts, Some(compressor))
.expect("record encoding cannot fail");

tracing::debug!(
Expand All @@ -276,3 +278,28 @@ impl Read {
Ok((self, buf.freeze()))
}
}

fn compressor<Output: BufMut>(
input: &mut BytesMut,
output: &mut Output,
c: Compression,
) -> anyhow::Result<()> {
match c {
Compression::None => output.put(input),
Compression::Lz4 => {
let mut frame_info = lz4_flex::frame::FrameInfo::default();
// This breaks Go lz4 decoding
// frame_info.block_checksums = true;
frame_info.block_mode = BlockMode::Independent;

let mut encoder =
lz4_flex::frame::FrameEncoder::with_frame_info(frame_info, output.writer());

std::io::copy(&mut input.reader(), &mut encoder)?;

encoder.finish()?;
}
unsupported @ _ => bail!("Unsupported compression type {unsupported:?}"),
};
Ok(())
}

0 comments on commit 51c253a

Please sign in to comment.