Skip to content

Commit

Permalink
chore(deps): Drop use of infer crate (#21623)
Browse files Browse the repository at this point in the history
* chore(deps): Drop use of `infer` crate

* Add reference

* Fix spelling
  • Loading branch information
bruceg authored Oct 28, 2024
1 parent 42e2bb6 commit 29d58c9
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 23 deletions.
9 changes: 1 addition & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@ hyper = { version = "0.14.28", default-features = false, features = ["client", "
hyper-openssl = { version = "0.9.2", default-features = false }
hyper-proxy = { version = "0.9.1", default-features = false, features = ["openssl-tls"] }
indexmap.workspace = true
infer = { version = "0.16.0", default-features = false, optional = true }
indoc = { version = "2.0.5", default-features = false }
inventory = { version = "0.3.15", default-features = false }
ipnet = { version = "2", default-features = false, optional = true, features = ["serde", "std"] }
Expand Down Expand Up @@ -568,7 +567,7 @@ sources-metrics = [
sources-amqp = ["lapin"]
sources-apache_metrics = ["sources-utils-http-client"]
sources-aws_ecs_metrics = ["sources-utils-http-client"]
sources-aws_kinesis_firehose = ["dep:base64", "dep:infer"]
sources-aws_kinesis_firehose = ["dep:base64"]
sources-aws_s3 = ["aws-core", "dep:aws-sdk-sqs", "dep:aws-sdk-s3", "dep:semver", "dep:async-compression", "sources-aws_sqs", "tokio-util/io"]
sources-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
sources-datadog_agent = ["sources-utils-http-error", "protobuf-build", "dep:prost"]
Expand Down
53 changes: 40 additions & 13 deletions src/sources/aws_kinesis_firehose/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,24 +219,32 @@ fn decode_record(
compression: compression.to_owned(),
}),
Compression::Auto => {
match infer::get(&buf) {
Some(filetype) => match filetype.mime_type() {
"application/gzip" => decode_gzip(&buf[..]).or_else(|error| {
emit!(AwsKinesisFirehoseAutomaticRecordDecodeError {
compression: Compression::Gzip,
error
});
Ok(Bytes::from(buf))
}),
// only support gzip for now
_ => Ok(Bytes::from(buf)),
},
None => Ok(Bytes::from(buf)),
if is_gzip(&buf) {
decode_gzip(&buf[..]).or_else(|error| {
emit!(AwsKinesisFirehoseAutomaticRecordDecodeError {
compression: Compression::Gzip,
error
});
Ok(Bytes::from(buf))
})
} else {
// only support gzip for now
Ok(Bytes::from(buf))
}
}
}
}

fn is_gzip(data: &[u8]) -> bool {
// The header length of a GZIP file is 10 bytes. The first two bytes of the constant comes from
// the GZIP file format specification, which is the fixed member header identification bytes.
// The third byte is the compression method, of which only one is defined which is 8 for the
// deflate algorithm.
//
// Reference: https://datatracker.ietf.org/doc/html/rfc1952 Section 2.3
data.len() >= 10 && &data[..3] == b"\x1f\x8b\x08"
}

fn decode_gzip(data: &[u8]) -> std::io::Result<Bytes> {
let mut decoded = Vec::new();

Expand All @@ -245,3 +253,22 @@ fn decode_gzip(data: &[u8]) -> std::io::Result<Bytes> {

Ok(Bytes::from(decoded))
}

#[cfg(test)]
mod tests {
use flate2::{write::GzEncoder, Compression};
use std::io::Write as _;

use super::*;

const CONTENT: &[u8] = b"Example";

#[test]
fn correctly_detects_gzipped_content() {
assert!(!is_gzip(CONTENT));
let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
encoder.write_all(CONTENT).unwrap();
let compressed = encoder.finish().unwrap();
assert!(is_gzip(&compressed));
}
}

0 comments on commit 29d58c9

Please sign in to comment.