From 2b70be353c762d19243de5bde87099cc2839c8ef Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Fri, 25 Oct 2024 11:40:00 +1100 Subject: [PATCH] extra cleanup --- shotover/benches/benches/codec/kafka.rs | 7 ++++--- shotover/src/codec/kafka.rs | 21 ++++++++++++++++----- shotover/src/codec/mod.rs | 16 +--------------- shotover/src/frame/kafka.rs | 4 ++-- shotover/src/frame/mod.rs | 6 +++--- 5 files changed, 26 insertions(+), 28 deletions(-) diff --git a/shotover/benches/benches/codec/kafka.rs b/shotover/benches/benches/codec/kafka.rs index 323577786..2c34b1637 100644 --- a/shotover/benches/benches/codec/kafka.rs +++ b/shotover/benches/benches/codec/kafka.rs @@ -1,7 +1,8 @@ use bytes::{Bytes, BytesMut}; use criterion::{criterion_group, BatchSize, Criterion}; use shotover::codec::kafka::KafkaCodecBuilder; -use shotover::codec::{CodecBuilder, CodecState, Direction, KafkaCodecState}; +use shotover::codec::kafka::KafkaCodecState; +use shotover::codec::{CodecBuilder, CodecState, Direction}; use shotover::message::Message; use tokio_util::codec::{Decoder, Encoder}; @@ -79,7 +80,7 @@ fn criterion_benchmark(c: &mut Criterion) { Bytes::from(message.to_vec()), CodecState::Kafka(KafkaCodecState { request_header: None, - raw_sasl: None, + raw_sasl: false, }), ); // force the message to be parsed and clear raw message @@ -116,7 +117,7 @@ fn criterion_benchmark(c: &mut Criterion) { Bytes::from(message.to_vec()), CodecState::Kafka(KafkaCodecState { request_header: None, - raw_sasl: None, + raw_sasl: false, }), ); // force the message to be parsed and clear raw message diff --git a/shotover/src/codec/kafka.rs b/shotover/src/codec/kafka.rs index 037b217b4..11a87f394 100644 --- a/shotover/src/codec/kafka.rs +++ b/shotover/src/codec/kafka.rs @@ -1,5 +1,5 @@ use super::{message_latency, CodecWriteError, Direction}; -use crate::codec::{CodecBuilder, CodecReadError, CodecState, KafkaCodecState}; +use crate::codec::{CodecBuilder, CodecReadError, CodecState}; use crate::frame::kafka::KafkaFrame; use crate::frame::{Frame, MessageType}; use crate::message::{Encodable, Message, MessageId, Messages}; @@ -58,7 +58,6 @@ impl CodecBuilder for KafkaCodecBuilder { MessageType::Kafka } } - #[derive(Debug)] pub struct RequestInfo { header: RequestHeader, @@ -179,7 +178,7 @@ impl Decoder for KafkaDecoder { bytes.freeze(), CodecState::Kafka(KafkaCodecState { request_header: Some(meta.request_header), - raw_sasl: self.expect_raw_sasl, + raw_sasl: self.expect_raw_sasl.is_some(), }), Some(received_at), ); @@ -190,7 +189,7 @@ impl Decoder for KafkaDecoder { bytes.freeze(), CodecState::Kafka(KafkaCodecState { request_header: None, - raw_sasl: self.expect_raw_sasl, + raw_sasl: self.expect_raw_sasl.is_some(), }), Some(received_at), ) @@ -273,7 +272,7 @@ impl Encoder for KafkaEncoder { let id = m.id(); let received_at = m.received_from_source_or_sink_at; let message_contains_raw_sasl = if let CodecState::Kafka(codec_state) = m.codec_state { - codec_state.raw_sasl.is_some() + codec_state.raw_sasl } else { false }; @@ -365,3 +364,15 @@ impl Encoder for KafkaEncoder { }) } } + +#[cfg(feature = "kafka")] +#[derive(Debug, Clone, PartialEq, Copy)] +pub struct KafkaCodecState { + /// When the message is: + /// a request - this value is None + /// a response - this value is Some and contains the header values of the corresponding request. + pub request_header: Option, + /// When `true` this message is not a valid kafka protocol message and is instead a raw SASL message. + /// KafkaFrame will parse this as a SaslHandshake to hide the legacy raw SASL message from transform implementations. + pub raw_sasl: bool, +} diff --git a/shotover/src/codec/mod.rs b/shotover/src/codec/mod.rs index f6756ed80..3d5036b79 100644 --- a/shotover/src/codec/mod.rs +++ b/shotover/src/codec/mod.rs @@ -5,9 +5,7 @@ use crate::{frame::MessageType, message::Messages}; use cassandra_protocol::compression::Compression; use core::fmt; #[cfg(feature = "kafka")] -use kafka::RequestHeader; -#[cfg(feature = "kafka")] -use kafka::SaslMessageState; +use kafka::KafkaCodecState; use metrics::{histogram, Histogram}; use tokio_util::codec::{Decoder, Encoder}; @@ -91,18 +89,6 @@ impl CodecState { } } -#[cfg(feature = "kafka")] -#[derive(Debug, Clone, PartialEq, Copy)] -pub struct KafkaCodecState { - /// When the message is: - /// a request - this value is None - /// a response - this value is Some and contains the header values of the corresponding request. - pub request_header: Option, - /// When `Some` this message is not a valid kafka protocol message and is instead a raw SASL message. - /// KafkaFrame will parse this as a SaslHandshake to hide the legacy raw SASL message from transform implementations. - pub raw_sasl: Option, -} - #[derive(Debug)] pub enum CodecReadError { /// The codec failed to parse a received message diff --git a/shotover/src/frame/kafka.rs b/shotover/src/frame/kafka.rs index 8b55ce77a..c8eb20ea9 100644 --- a/shotover/src/frame/kafka.rs +++ b/shotover/src/frame/kafka.rs @@ -1,5 +1,5 @@ +use crate::codec::kafka::KafkaCodecState; use crate::codec::kafka::RequestHeader as CodecRequestHeader; -use crate::codec::KafkaCodecState; use anyhow::{anyhow, Context, Result}; use bytes::{BufMut, Bytes, BytesMut}; use kafka_protocol::messages::{ @@ -72,7 +72,7 @@ impl Display for KafkaFrame { impl KafkaFrame { pub fn from_bytes(mut bytes: Bytes, codec_state: KafkaCodecState) -> Result { - if codec_state.raw_sasl.is_some() { + if codec_state.raw_sasl { match &codec_state.request_header { Some(_) => Ok(KafkaFrame::Response { version: 0, diff --git a/shotover/src/frame/mod.rs b/shotover/src/frame/mod.rs index 51964faea..aa9c61745 100644 --- a/shotover/src/frame/mod.rs +++ b/shotover/src/frame/mod.rs @@ -1,8 +1,8 @@ //! parsed AST-like representations of messages -use crate::codec::CodecState; #[cfg(feature = "kafka")] -use crate::codec::KafkaCodecState; +use crate::codec::kafka::KafkaCodecState; +use crate::codec::CodecState; use anyhow::{anyhow, Result}; use bytes::Bytes; #[cfg(feature = "cassandra")] @@ -98,7 +98,7 @@ impl Frame { #[cfg(feature = "kafka")] Frame::Kafka(_) => CodecState::Kafka(KafkaCodecState { request_header: None, - raw_sasl: None, + raw_sasl: false, }), Frame::Dummy => CodecState::Dummy, #[cfg(feature = "opensearch")]