diff --git a/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs b/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs index 425376c35e9..13facfdacf1 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/integration_test.rs @@ -114,6 +114,7 @@ mod test { KafkaPortalListener::create( context, + true, inlet_controller, secure_channel_controller, listener_address, diff --git a/implementations/rust/ockam/ockam_api/src/kafka/portal_listener.rs b/implementations/rust/ockam/ockam_api/src/kafka/portal_listener.rs index 53252a3d3af..04da868a6ad 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/portal_listener.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/portal_listener.rs @@ -18,6 +18,7 @@ pub(crate) struct KafkaPortalListener { uuid_to_name: TopicUuidMap, request_outgoing_access_control: Arc, response_incoming_access_control: Arc, + encrypt_content: bool, } #[ockam::worker] @@ -49,6 +50,7 @@ impl Worker for KafkaPortalListener { let worker_address = KafkaPortalWorker::create_inlet_side_kafka_portal( context, + self.encrypt_content, self.secure_channel_controller.clone(), self.uuid_to_name.clone(), self.inlet_controller.clone(), @@ -78,6 +80,7 @@ impl Worker for KafkaPortalListener { impl KafkaPortalListener { pub(crate) async fn create( context: &Context, + encrypt_content: bool, inlet_controller: KafkaInletController, secure_channel_controller: KafkaSecureChannelControllerImpl, listener_address: Address, @@ -90,6 +93,7 @@ impl KafkaPortalListener { uuid_to_name: Default::default(), request_outgoing_access_control: outgoing_access_control, response_incoming_access_control: incoming_access_control, + encrypt_content, }; context.start_worker(listener_address, s).await diff --git a/implementations/rust/ockam/ockam_api/src/kafka/portal_worker.rs b/implementations/rust/ockam/ockam_api/src/kafka/portal_worker.rs index 6172c6e25c8..049dbe02316 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/portal_worker.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/portal_worker.rs @@ -388,6 +388,7 @@ impl KafkaPortalWorker { #[allow(clippy::too_many_arguments)] pub(crate) async fn create_inlet_side_kafka_portal( context: &mut Context, + encrypt_content: bool, secure_channel_controller: KafkaSecureChannelControllerImpl, uuid_to_name: TopicUuidMap, inlet_map: KafkaInletController, @@ -401,6 +402,7 @@ impl KafkaPortalWorker { secure_channel_controller, uuid_to_name, inlet_map, + encrypt_content, )); let requests_worker_address = Address::random_tagged("KafkaPortalWorker.requests"); @@ -810,6 +812,7 @@ mod test { KafkaPortalWorker::create_inlet_side_kafka_portal( context, + true, secure_channel_controller, Default::default(), inlet_map, @@ -902,6 +905,7 @@ mod test { ); let portal_inlet_address = KafkaPortalWorker::create_inlet_side_kafka_portal( context, + true, secure_channel_controller, Default::default(), inlet_map.clone(), diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs index 39c40b6c595..7dd3784335e 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs @@ -53,6 +53,7 @@ pub(crate) struct InletInterceptorImpl { uuid_to_name: TopicUuidMap, secure_channel_controller: KafkaSecureChannelControllerImpl, inlet_map: KafkaInletController, + encrypt_content: bool, } #[async_trait] @@ -88,12 +89,14 @@ impl InletInterceptorImpl { secure_channel_controller: KafkaSecureChannelControllerImpl, uuid_to_name: TopicUuidMap, inlet_map: KafkaInletController, + encrypt_content: bool, ) -> InletInterceptorImpl { Self { request_map: Arc::new(Mutex::new(Default::default())), uuid_to_name, secure_channel_controller, inlet_map, + encrypt_content, } } } diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/request.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/request.rs index 5c3d8cf74ac..ee994f1b374 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/request.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/request.rs @@ -75,9 +75,11 @@ impl InletInterceptorImpl { } ApiKey::ProduceKey => { - return self - .handle_produce_request(context, &mut buffer, &header) - .await; + if self.encrypt_content { + return self + .handle_produce_request(context, &mut buffer, &header) + .await; + } } ApiKey::FetchKey => { self.handle_fetch_request(context, &mut buffer, &header) diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/response.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/response.rs index 185065dbcb9..1fa7a62aa98 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/response.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/response.rs @@ -76,9 +76,11 @@ impl InletInterceptorImpl { } ApiKey::FetchKey => { - return self - .handle_fetch_response(context, &mut buffer, &request_info, &header) - .await; + if self.encrypt_content { + return self + .handle_fetch_response(context, &mut buffer, &request_info, &header) + .await; + } } ApiKey::FindCoordinatorKey => { diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs index 87d08845a2b..f9799553518 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs @@ -61,8 +61,12 @@ mod test { producer_policy_access_control, ); - let interceptor = - InletInterceptorImpl::new(secure_channel_controller, Default::default(), inlet_map); + let interceptor = InletInterceptorImpl::new( + secure_channel_controller, + Default::default(), + inlet_map, + true, + ); let mut correlation_id = 0; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/services.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/services.rs index 35317aa28d1..5da3fc63637 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/services.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/services.rs @@ -95,11 +95,12 @@ pub struct StartKafkaInletRequest { #[n(1)] bind_address: SocketAddr, #[n(2)] brokers_port_range: (u16, u16), #[n(3)] kafka_outlet_route: MultiAddr, - #[n(4)] consumer_resolution: ConsumerResolution, - #[n(5)] consumer_publishing: ConsumerPublishing, - #[n(6)] inlet_policy_expression: Option, - #[n(7)] consumer_policy_expression: Option, - #[n(8)] producer_policy_expression: Option, + #[n(4)] encrypt_content: bool, + #[n(5)] consumer_resolution: ConsumerResolution, + #[n(6)] consumer_publishing: ConsumerPublishing, + #[n(7)] inlet_policy_expression: Option, + #[n(8)] consumer_policy_expression: Option, + #[n(9)] producer_policy_expression: Option, } impl StartKafkaInletRequest { @@ -108,6 +109,7 @@ impl StartKafkaInletRequest { bind_address: SocketAddr, brokers_port_range: impl Into<(u16, u16)>, kafka_outlet_route: MultiAddr, + encrypt_content: bool, consumer_resolution: ConsumerResolution, consumer_publishing: ConsumerPublishing, inlet_policy_expression: Option, @@ -118,6 +120,7 @@ impl StartKafkaInletRequest { bind_address, brokers_port_range: brokers_port_range.into(), kafka_outlet_route, + encrypt_content, consumer_resolution, consumer_publishing, inlet_policy_expression, @@ -136,6 +139,10 @@ impl StartKafkaInletRequest { self.kafka_outlet_route.clone() } + pub fn encrypt_content(&self) -> bool { + self.encrypt_content + } + pub fn consumer_resolution(&self) -> ConsumerResolution { self.consumer_resolution.clone() } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs index 9e2021ee873..d59b91f2088 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs @@ -43,6 +43,7 @@ impl NodeManagerWorker { request.bind_address(), request.brokers_port_range(), request.project_route(), + request.encrypt_content(), request.consumer_resolution(), request.consumer_publishing(), request.inlet_policy_expression(), @@ -114,6 +115,7 @@ impl InMemoryNode { bind_address: SocketAddr, brokers_port_range: (u16, u16), outlet_node_multiaddr: MultiAddr, + encrypt_content: bool, consumer_resolution: ConsumerResolution, consumer_publishing: ConsumerPublishing, inlet_policy_expression: Option, @@ -224,6 +226,7 @@ impl InMemoryNode { KafkaPortalListener::create( context, + encrypt_content, inlet_controller, secure_channel_controller, local_interceptor_address.clone(), diff --git a/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs index 4020c7748ef..a07b05b2be5 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs @@ -49,6 +49,7 @@ impl CreateCommand { consumer_relay: None, publishing_relay: None, avoid_publishing: false, + disable_content_encryption: false, inlet_policy_expression: None, consumer_policy_expression: None, producer_policy_expression: None, diff --git a/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs index e3e9472d285..e1fb4e12144 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs @@ -71,6 +71,16 @@ pub struct CreateCommand { /// referenced by the producer. #[arg(long, name = "avoid-publishing", conflicts_with = "publishing-relay")] pub avoid_publishing: bool, + /// Disable end-to-end kafka messages encryption between producer and consumer. + /// Use it when you want a plain kafka portal, the communication itself will still be + /// encrypted. + #[arg( + long, + name = "disable-content-encryption", + value_name = "BOOL", + default_value_t = true + )] + pub disable_content_encryption: bool, /// Policy expression that will be used for access control to the Kafka Inlet. /// If you don't provide it, the policy set for the "tcp-inlet" resource type will be used. /// @@ -150,6 +160,7 @@ impl Command for CreateCommand { self.from, brokers_port_range, to, + !self.disable_content_encryption, consumer_resolution, consumer_publishing, self.inlet_policy_expression, diff --git a/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs index 88c72357e01..5a8237d4f42 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs @@ -47,6 +47,7 @@ impl CreateCommand { consumer_relay: None, publishing_relay: None, avoid_publishing: false, + disable_content_encryption: false, inlet_policy_expression: None, consumer_policy_expression: None, producer_policy_expression: None,