Skip to content

Commit

Permalink
extra cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 25, 2024
1 parent fae217e commit 2b70be3
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 28 deletions.
7 changes: 4 additions & 3 deletions shotover/benches/benches/codec/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use bytes::{Bytes, BytesMut};
use criterion::{criterion_group, BatchSize, Criterion};
use shotover::codec::kafka::KafkaCodecBuilder;
use shotover::codec::{CodecBuilder, CodecState, Direction, KafkaCodecState};
use shotover::codec::kafka::KafkaCodecState;
use shotover::codec::{CodecBuilder, CodecState, Direction};
use shotover::message::Message;
use tokio_util::codec::{Decoder, Encoder};

Expand Down Expand Up @@ -79,7 +80,7 @@ fn criterion_benchmark(c: &mut Criterion) {
Bytes::from(message.to_vec()),
CodecState::Kafka(KafkaCodecState {
request_header: None,
raw_sasl: None,
raw_sasl: false,
}),
);
// force the message to be parsed and clear raw message
Expand Down Expand Up @@ -116,7 +117,7 @@ fn criterion_benchmark(c: &mut Criterion) {
Bytes::from(message.to_vec()),
CodecState::Kafka(KafkaCodecState {
request_header: None,
raw_sasl: None,
raw_sasl: false,
}),
);
// force the message to be parsed and clear raw message
Expand Down
21 changes: 16 additions & 5 deletions shotover/src/codec/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{message_latency, CodecWriteError, Direction};
use crate::codec::{CodecBuilder, CodecReadError, CodecState, KafkaCodecState};
use crate::codec::{CodecBuilder, CodecReadError, CodecState};
use crate::frame::kafka::KafkaFrame;
use crate::frame::{Frame, MessageType};
use crate::message::{Encodable, Message, MessageId, Messages};
Expand Down Expand Up @@ -58,7 +58,6 @@ impl CodecBuilder for KafkaCodecBuilder {
MessageType::Kafka
}
}

#[derive(Debug)]
pub struct RequestInfo {
header: RequestHeader,
Expand Down Expand Up @@ -179,7 +178,7 @@ impl Decoder for KafkaDecoder {
bytes.freeze(),
CodecState::Kafka(KafkaCodecState {
request_header: Some(meta.request_header),
raw_sasl: self.expect_raw_sasl,
raw_sasl: self.expect_raw_sasl.is_some(),
}),
Some(received_at),
);
Expand All @@ -190,7 +189,7 @@ impl Decoder for KafkaDecoder {
bytes.freeze(),
CodecState::Kafka(KafkaCodecState {
request_header: None,
raw_sasl: self.expect_raw_sasl,
raw_sasl: self.expect_raw_sasl.is_some(),
}),
Some(received_at),
)
Expand Down Expand Up @@ -273,7 +272,7 @@ impl Encoder<Messages> for KafkaEncoder {
let id = m.id();
let received_at = m.received_from_source_or_sink_at;
let message_contains_raw_sasl = if let CodecState::Kafka(codec_state) = m.codec_state {
codec_state.raw_sasl.is_some()
codec_state.raw_sasl
} else {
false
};
Expand Down Expand Up @@ -365,3 +364,15 @@ impl Encoder<Messages> for KafkaEncoder {
})
}
}

#[cfg(feature = "kafka")]
#[derive(Debug, Clone, PartialEq, Copy)]
pub struct KafkaCodecState {
/// When the message is:
/// a request - this value is None
/// a response - this value is Some and contains the header values of the corresponding request.
pub request_header: Option<RequestHeader>,
/// When `true` this message is not a valid kafka protocol message and is instead a raw SASL message.
/// KafkaFrame will parse this as a SaslHandshake to hide the legacy raw SASL message from transform implementations.
pub raw_sasl: bool,
}
16 changes: 1 addition & 15 deletions shotover/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::{frame::MessageType, message::Messages};
use cassandra_protocol::compression::Compression;
use core::fmt;
#[cfg(feature = "kafka")]
use kafka::RequestHeader;
#[cfg(feature = "kafka")]
use kafka::SaslMessageState;
use kafka::KafkaCodecState;
use metrics::{histogram, Histogram};
use tokio_util::codec::{Decoder, Encoder};

Expand Down Expand Up @@ -91,18 +89,6 @@ impl CodecState {
}
}

#[cfg(feature = "kafka")]
#[derive(Debug, Clone, PartialEq, Copy)]
pub struct KafkaCodecState {
/// When the message is:
/// a request - this value is None
/// a response - this value is Some and contains the header values of the corresponding request.
pub request_header: Option<RequestHeader>,
/// When `Some` this message is not a valid kafka protocol message and is instead a raw SASL message.
/// KafkaFrame will parse this as a SaslHandshake to hide the legacy raw SASL message from transform implementations.
pub raw_sasl: Option<SaslMessageState>,
}

#[derive(Debug)]
pub enum CodecReadError {
/// The codec failed to parse a received message
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/frame/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::codec::kafka::KafkaCodecState;
use crate::codec::kafka::RequestHeader as CodecRequestHeader;
use crate::codec::KafkaCodecState;
use anyhow::{anyhow, Context, Result};
use bytes::{BufMut, Bytes, BytesMut};
use kafka_protocol::messages::{
Expand Down Expand Up @@ -72,7 +72,7 @@ impl Display for KafkaFrame {

impl KafkaFrame {
pub fn from_bytes(mut bytes: Bytes, codec_state: KafkaCodecState) -> Result<Self> {
if codec_state.raw_sasl.is_some() {
if codec_state.raw_sasl {
match &codec_state.request_header {
Some(_) => Ok(KafkaFrame::Response {
version: 0,
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/frame/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! parsed AST-like representations of messages
use crate::codec::CodecState;
#[cfg(feature = "kafka")]
use crate::codec::KafkaCodecState;
use crate::codec::kafka::KafkaCodecState;
use crate::codec::CodecState;
use anyhow::{anyhow, Result};
use bytes::Bytes;
#[cfg(feature = "cassandra")]
Expand Down Expand Up @@ -98,7 +98,7 @@ impl Frame {
#[cfg(feature = "kafka")]
Frame::Kafka(_) => CodecState::Kafka(KafkaCodecState {
request_header: None,
raw_sasl: None,
raw_sasl: false,
}),
Frame::Dummy => CodecState::Dummy,
#[cfg(feature = "opensearch")]
Expand Down

0 comments on commit 2b70be3

Please sign in to comment.