Add flag to support gzip compressed messages (#179)
* Support gzip decompression

---------

Co-authored-by: George Sachpatzidis <sper13@gmail.com>
geosach and gsach authored Jul 7, 2024
1 parent a1990e6 commit 0585509
Showing 6 changed files with 63 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -5,6 +5,7 @@ authors = ["R. Tyler Croy <rtyler@brokenco.de>", "Christian Williams <christianw
edition = "2018"

[dependencies]
flate2 = "1.0"
anyhow = "1"
async-trait = "0.1"
apache-avro = "^0.14"
6 changes: 6 additions & 0 deletions README.adoc
@@ -210,6 +210,12 @@ Make sure to provide the additional option:

when invoking the cli command as well.

== Gzip Compressed Messages

kafka-delta-ingest supports ingestion of gzip-compressed message payloads. This is particularly useful when a topic carries large volumes of highly compressible data, such as verbose JSON.

To enable gzip decompression, use the `--decompress_gzip` flag when starting the ingestion process.
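
For reference, the ingest side only assumes that each payload is a standard gzip stream. Below is a minimal Rust sketch of producing such a payload with the `flate2` crate; the example message and the surrounding producer wiring are illustrative assumptions, not part of this project.

use flate2::{write::GzEncoder, Compression};
use std::io::Write;

fn main() -> std::io::Result<()> {
    // The JSON document that would normally be written to the Kafka topic.
    let message = br#"{"id": 1, "value": "some payload"}"#;

    // Gzip-compress it; with --decompress_gzip enabled, kafka-delta-ingest
    // gunzips this payload before deserializing the JSON.
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(message)?;
    let compressed = encoder.finish()?;

    // `compressed` is what the producer would send to the topic.
    println!("{} bytes -> {} bytes", message.len(), compressed.len());
    Ok(())
}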

== Writing to S3

When writing to S3, you may experience an error like `source: StorageError { source: S3Generic("dynamodb locking is not enabled") }`.
13 changes: 9 additions & 4 deletions src/lib.rs
@@ -294,6 +294,8 @@ pub struct IngestOptions {
pub input_format: MessageFormat,
/// Terminates when initial offsets are reached
pub end_at_last_offsets: bool,
/// Assume that message payloads are gzip compressed and decompress them before processing.
pub decompress_gzip: bool,
}

impl Default for IngestOptions {
@@ -315,6 +317,7 @@ impl Default for IngestOptions {
statsd_endpoint: "localhost:8125".to_string(),
input_format: MessageFormat::DefaultJson,
end_at_last_offsets: false,
decompress_gzip: false,
}
}
}
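
For library consumers of the crate, enabling the new behaviour is a single field. A minimal sketch, assuming the crate is consumed as a library named `kafka_delta_ingest` and that the defaults shown above are acceptable for the remaining options:

use kafka_delta_ingest::IngestOptions;

fn main() {
    // Turn on gzip decompression; everything else keeps the defaults shown above.
    let opts = IngestOptions {
        decompress_gzip: true,
        ..Default::default()
    };
    assert!(opts.decompress_gzip);
}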
@@ -757,10 +760,12 @@ impl IngestProcessor {
let table = delta_helpers::load_table(table_uri, HashMap::new()).await?;
let coercion_tree = coercions::create_coercion_tree(table.schema().unwrap());
let delta_writer = DataWriter::for_table(&table, HashMap::new())?;
let deserializer = match MessageDeserializerFactory::try_build(&opts.input_format) {
Ok(deserializer) => deserializer,
Err(e) => return Err(IngestError::UnableToCreateDeserializer { source: e }),
};
let deserializer =
match MessageDeserializerFactory::try_build(&opts.input_format, opts.decompress_gzip) {
Ok(deserializer) => deserializer,
Err(e) => return Err(IngestError::UnableToCreateDeserializer { source: e }),
};

Ok(IngestProcessor {
topic,
consumer,
8 changes: 8 additions & 0 deletions src/main.rs
@@ -141,6 +141,8 @@ async fn main() -> anyhow::Result<()> {

let end_at_last_offsets = ingest_matches.get_flag("end");

let decompress_gzip = ingest_matches.get_flag("decompress_gzip");

// ingest_matches.get_flag("end")
let format = convert_matches_to_message_format(ingest_matches).unwrap();

@@ -161,6 +163,7 @@
statsd_endpoint,
input_format: format,
end_at_last_offsets,
decompress_gzip,
};

tokio::spawn(async move {
@@ -452,6 +455,11 @@ This can be used to provide TLS configuration as in:
.num_args(0)
.action(ArgAction::SetTrue)
.help(""))
.arg(Arg::new("decompress_gzip")
.long("decompress_gzip")
.env("DECOMPRESS_GZIP")
.help("Enable gzip decompression for incoming messages")
.action(ArgAction::SetTrue))
)
.arg_required_else_help(true)
}
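
A self-contained sketch of how the new flag behaves under clap's `ArgAction::SetTrue`; this assumes clap 4 with the `env` feature enabled, which the `.env(...)` call above implies:

use clap::{Arg, ArgAction, Command};

fn main() {
    // Minimal stand-in for the `ingest` subcommand, showing only the new flag.
    let cmd = Command::new("ingest-sketch").arg(
        Arg::new("decompress_gzip")
            .long("decompress_gzip")
            .env("DECOMPRESS_GZIP")
            .help("Enable gzip decompression for incoming messages")
            .action(ArgAction::SetTrue),
    );

    // Passing --decompress_gzip (or setting DECOMPRESS_GZIP) flips the flag to
    // true; omitting both leaves it false.
    let matches = cmd.get_matches_from(["ingest-sketch", "--decompress_gzip"]);
    assert!(matches.get_flag("decompress_gzip"));
}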
50 changes: 38 additions & 12 deletions src/serialization.rs
@@ -1,12 +1,11 @@
use std::{borrow::BorrowMut, convert::TryFrom, io::Cursor, path::PathBuf};

use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};
use async_trait::async_trait;
use flate2::read::GzDecoder;
use schema_registry_converter::async_impl::{
easy_avro::EasyAvroDecoder, easy_json::EasyJsonDecoder, schema_registry::SrSettings,
};
use serde_json::Value;

use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};
use std::{borrow::BorrowMut, convert::TryFrom, io::Cursor, io::Read, path::PathBuf};

#[async_trait]
pub(crate) trait MessageDeserializer {
@@ -17,20 +16,22 @@ pub(crate) trait MessageDeserializer {
}

pub(crate) struct MessageDeserializerFactory {}

impl MessageDeserializerFactory {
pub fn try_build(
input_format: &MessageFormat,
decompress_gzip: bool, // When true, gunzip each payload before deserializing it
) -> Result<Box<dyn MessageDeserializer + Send>, anyhow::Error> {
match input_format {
MessageFormat::Json(data) => match data {
crate::SchemaSource::None => Ok(Self::json_default()),
crate::SchemaSource::None => Ok(Self::json_default(decompress_gzip)),
crate::SchemaSource::SchemaRegistry(sr) => {
match Self::build_sr_settings(sr).map(JsonDeserializer::from_schema_registry) {
Ok(s) => Ok(Box::new(s)),
Err(e) => Err(e),
}
}
crate::SchemaSource::File(_) => Ok(Self::json_default()),
crate::SchemaSource::File(_) => Ok(Self::json_default(decompress_gzip)),
},
MessageFormat::Avro(data) => match data {
crate::SchemaSource::None => Ok(Box::<AvroSchemaDeserializer>::default()),
@@ -47,12 +48,12 @@ impl MessageDeserializerFactory {
}
}
},
_ => Ok(Box::new(DefaultDeserializer {})),
_ => Ok(Box::new(DefaultDeserializer::new(decompress_gzip))),
}
}

fn json_default() -> Box<dyn MessageDeserializer + Send> {
Box::new(DefaultDeserializer {})
fn json_default(decompress_gzip: bool) -> Box<dyn MessageDeserializer + Send> {
Box::new(DefaultDeserializer::new(decompress_gzip))
}

fn build_sr_settings(registry_url: &url::Url) -> Result<SrSettings, anyhow::Error> {
@@ -80,16 +81,41 @@ impl MessageDeserializerFactory {
}
}
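
A sketch of using the factory with the new parameter; it assumes the caller sits inside this crate, since both the factory and the `MessageDeserializer` trait are `pub(crate)`:

fn build_gzip_json_deserializer() -> Result<Box<dyn MessageDeserializer + Send>, anyhow::Error> {
    // Plain JSON payloads, gunzipped before parsing.
    MessageDeserializerFactory::try_build(&MessageFormat::DefaultJson, true)
}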

struct DefaultDeserializer {}
struct DefaultDeserializer {
decompress_gzip: bool,
}

impl DefaultDeserializer {
pub fn new(decompress_gzip: bool) -> Self {
DefaultDeserializer { decompress_gzip }
}

fn decompress(bytes: &[u8]) -> std::io::Result<Vec<u8>> {
let mut decoder = GzDecoder::new(bytes);
let mut decompressed_data = Vec::new();
decoder.read_to_end(&mut decompressed_data)?;
Ok(decompressed_data)
}
}
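
One property worth calling out: when `decompress_gzip` is on but a payload is not actually gzip, `decompress` returns an `Err`, which `deserialize` below converts into a dead letter rather than failing the stream. A sketch of an in-module test for that behaviour (the test module itself is an assumption, not part of the commit):

#[cfg(test)]
mod decompress_sketch {
    #[test]
    fn non_gzip_input_is_an_error() {
        // GzDecoder rejects the missing gzip header, so the helper surfaces an Err.
        let result = super::DefaultDeserializer::decompress(br#"{"plain": "json"}"#);
        assert!(result.is_err());
    }
}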

#[async_trait]
impl MessageDeserializer for DefaultDeserializer {
async fn deserialize(&mut self, payload: &[u8]) -> Result<Value, MessageDeserializationError> {
let value: Value = match serde_json::from_slice(payload) {
let payload = if self.decompress_gzip {
Self::decompress(payload).map_err(|e| {
MessageDeserializationError::JsonDeserialization {
dead_letter: DeadLetter::from_failed_deserialization(payload, e.to_string()),
}
})?
} else {
payload.to_vec()
};

let value: Value = match serde_json::from_slice(&payload) {
Ok(v) => v,
Err(e) => {
return Err(MessageDeserializationError::JsonDeserialization {
dead_letter: DeadLetter::from_failed_deserialization(payload, e.to_string()),
dead_letter: DeadLetter::from_failed_deserialization(&payload, e.to_string()),
});
}
};
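To close the loop, a sketch of an in-crate test that gzips a JSON document and runs it through `DefaultDeserializer` with decompression enabled; the test module, the example message, and the use of tokio's test macro are assumptions:

#[cfg(test)]
mod gzip_deserialize_sketch {
    use super::*;
    use flate2::{write::GzEncoder, Compression};
    use std::io::Write;

    #[tokio::test]
    async fn deserializes_gzipped_json() {
        // Compress a small JSON document the way a producer might.
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(br#"{"id": 1, "color": "green"}"#).unwrap();
        let payload = encoder.finish().unwrap();

        // With decompress_gzip = true, the payload is gunzipped, then parsed as JSON.
        let mut deserializer = DefaultDeserializer::new(true);
        let value = deserializer.deserialize(&payload).await.unwrap();
        assert_eq!(value["color"], "green");
    }
}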
