diff --git a/Cargo.lock b/Cargo.lock index a41b5e1d8bc..ba51684820c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4128,6 +4128,7 @@ version = "3.119.1" dependencies = [ "bimap", "bitflags 2.6.0", + "fancy-regex", "ipnet", "k8s-openapi", "mirrord-analytics", diff --git a/changelog.d/2601.added.md b/changelog.d/2601.added.md new file mode 100644 index 00000000000..57c69e07942 --- /dev/null +++ b/changelog.d/2601.added.md @@ -0,0 +1 @@ +Added Kafka splitting feature. diff --git a/medschool/src/parse.rs b/medschool/src/parse.rs index 76f6206e679..551fb881200 100644 --- a/medschool/src/parse.rs +++ b/medschool/src/parse.rs @@ -207,8 +207,9 @@ fn dfs_fields<'a, const MAX_RECURSION_LEVEL: usize>( recursion_level: &mut usize, ) -> Vec { if *recursion_level >= MAX_RECURSION_LEVEL { - return vec!["Recursion limit reached".to_string()]; + panic!("recursion limit {MAX_RECURSION_LEVEL} reached"); } + // increment the recursion level as we're going deeper into the tree types // get the type of the field from the types set to recurse into it's fields .get(&field.ty) @@ -281,7 +282,7 @@ fn dfs_fields<'a, const MAX_RECURSION_LEVEL: usize>( #[tracing::instrument(level = "trace", ret)] pub fn resolve_references(types: HashSet) -> Option { /// Maximum recursion level for safety. - const MAX_RECURSION_LEVEL: usize = 10; + const MAX_RECURSION_LEVEL: usize = 16; // Cache to perform memoization between recursive calls so we don't have to resolve the same // type multiple times. Mapping between `ident` -> `resolved_docs`. // For example, if we have a types [`A`, `B`, `C`] and A has a field of type `B` and `B` has a diff --git a/mirrord-schema.json b/mirrord-schema.json index 8641a972f23..fd5c40edf35 100644 --- a/mirrord-schema.json +++ b/mirrord-schema.json @@ -1534,6 +1534,7 @@ ], "properties": { "message_filter": { + "description": "A filter is a mapping between message attribute names and regexes they should match. The local application will only receive messages that match **all** of the given patterns. This means, only messages that have **all** of the attributes in the filter, with values of those attributes matching the respective patterns.", "type": "object", "additionalProperties": { "type": "string" @@ -1546,6 +1547,29 @@ ] } } + }, + { + "description": "Kafka.", + "type": "object", + "required": [ + "message_filter", + "queue_type" + ], + "properties": { + "message_filter": { + "description": "A filter is a mapping between message header names and regexes they should match. The local application will only receive messages that match **all** of the given patterns. This means, only messages that have **all** of the headers in the filter, with values of those headers matching the respective patterns.", + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "queue_type": { + "type": "string", + "enum": [ + "Kafka" + ] + } + } } ] }, @@ -1570,11 +1594,8 @@ "additionalProperties": false }, "SplitQueuesConfig": { - "description": "```json { \"feature\": { \"split_queues\": { \"first-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very .*\" } }, \"second-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"who\": \"*you$\" } }, } } } ```", - "type": [ - "object", - "null" - ], + "description": "```json { \"feature\": { \"split_queues\": { \"first-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very\" } }, \"second-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"who\": \"you$\" } }, \"third-queue\": { \"queue_type\": \"Kafka\", \"message_filter\": { \"who\": \"you$\" } }, \"fourth-queue\": { \"queue_type\": \"Kafka\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very\" } }, } } } ```", + "type": "object", "additionalProperties": { "$ref": "#/definitions/QueueFilter" } diff --git a/mirrord/cli/src/config.rs b/mirrord/cli/src/config.rs index 04cf5150930..44821f408dc 100644 --- a/mirrord/cli/src/config.rs +++ b/mirrord/cli/src/config.rs @@ -547,48 +547,7 @@ pub(super) enum OperatorCommand { /// /// NOTE: You don't need to install the operator to use open source mirrord features. #[command(override_usage = "mirrord operator setup [OPTIONS] | kubectl apply -f -")] - Setup { - /// ToS can be read here - #[arg(long)] - accept_tos: bool, - - /// A mirrord for Teams license key (online) - #[arg(long, allow_hyphen_values(true))] - license_key: Option, - - /// Path to a file containing a mirrord for Teams license certificate - #[arg(long)] - license_path: Option, - - /// Output Kubernetes specs to file instead of stdout - #[arg(short, long)] - file: Option, - - /// Namespace to create the operator in (this doesn't limit the namespaces the operator - /// will be able to access) - #[arg(short, long, default_value = "mirrord")] - namespace: OperatorNamespace, - - /// AWS role ARN for the operator's service account. - /// Necessary for enabling SQS queue splitting. - /// For successfully running an SQS queue splitting operator the given IAM role must be - /// able to create, read from, write to, and delete SQS queues. - /// If the queue messages are encrypted using KMS, the operator also needs the - /// `kms:Encrypt`, `kms:Decrypt` and `kms:GenerateDataKey` permissions. - #[arg(long, visible_alias = "arn")] - aws_role_arn: Option, - - /// Enable SQS queue splitting. - /// When set, some extra CRDs will be installed on the cluster, and the operator will run - /// an SQS splitting component. - #[arg( - long, - visible_alias = "sqs", - default_value_t = false, - requires = "aws_role_arn" - )] - sqs_splitting: bool, - }, + Setup(#[clap(flatten)] OperatorSetupParams), /// Print operator status Status { /// Specify config file to use @@ -602,6 +561,56 @@ pub(super) enum OperatorCommand { Session(SessionCommand), } +#[derive(Args, Debug)] +pub(super) struct OperatorSetupParams { + /// ToS can be read here + #[arg(long)] + pub(super) accept_tos: bool, + + /// A mirrord for Teams license key (online) + #[arg(long, allow_hyphen_values(true))] + pub(super) license_key: Option, + + /// Path to a file containing a mirrord for Teams license certificate + #[arg(long)] + pub(super) license_path: Option, + + /// Output Kubernetes specs to file instead of stdout + #[arg(short, long)] + pub(super) file: Option, + + /// Namespace to create the operator in (this doesn't limit the namespaces the operator + /// will be able to access) + #[arg(short, long, default_value = "mirrord")] + pub(super) namespace: OperatorNamespace, + + /// AWS role ARN for the operator's service account. + /// Necessary for enabling SQS queue splitting. + /// For successfully running an SQS queue splitting operator the given IAM role must be + /// able to create, read from, write to, and delete SQS queues. + /// If the queue messages are encrypted using KMS, the operator also needs the + /// `kms:Encrypt`, `kms:Decrypt` and `kms:GenerateDataKey` permissions. + #[arg(long, visible_alias = "arn")] + pub(super) aws_role_arn: Option, + + /// Enable SQS queue splitting. + /// When set, some extra CRDs will be installed on the cluster, and the operator will run + /// an SQS splitting component. + #[arg( + long, + visible_alias = "sqs", + default_value_t = false, + requires = "aws_role_arn" + )] + pub(super) sqs_splitting: bool, + + /// Enable Kafka queue splitting. + /// When set, some extra CRDs will be installed on the cluster, and the operator will run + /// a Kafka splitting component. + #[arg(long, visible_alias = "kafka", default_value_t = false)] + pub(super) kafka_splitting: bool, +} + /// `mirrord operator session` family of commands. /// /// Allows the user to forcefully kill operator sessions, use with care! diff --git a/mirrord/cli/src/operator.rs b/mirrord/cli/src/operator.rs index 8654752e1db..bdc1d8e3ff3 100644 --- a/mirrord/cli/src/operator.rs +++ b/mirrord/cli/src/operator.rs @@ -1,8 +1,4 @@ -use std::{ - fs::File, - path::{Path, PathBuf}, - time::Duration, -}; +use std::{fs::File, path::Path, time::Duration}; use futures::TryFutureExt; use kube::{Api, Client}; @@ -15,7 +11,7 @@ use mirrord_kube::api::kubernetes::create_kube_config; use mirrord_operator::{ client::OperatorApi, crd::{MirrordOperatorCrd, MirrordOperatorSpec}, - setup::{LicenseType, Operator, OperatorNamespace, OperatorSetup, SetupOptions}, + setup::{LicenseType, Operator, OperatorSetup, SetupOptions}, types::LicenseInfoOwned, }; use mirrord_progress::{Progress, ProgressTracker}; @@ -29,7 +25,7 @@ use crate::{ config::{OperatorArgs, OperatorCommand}, error::{CliError, OperatorSetupError}, util::remove_proxy_env, - Result, + OperatorSetupParams, Result, }; mod session; @@ -54,13 +50,16 @@ async fn get_last_version() -> Result { /// Setup the operator into a file or to stdout, with explanation. async fn operator_setup( - accept_tos: bool, - file: Option, - namespace: OperatorNamespace, - license_key: Option, - license_path: Option, - aws_role_arn: Option, - sqs_splitting: bool, + OperatorSetupParams { + accept_tos, + license_key, + license_path, + file, + namespace, + aws_role_arn, + sqs_splitting, + kafka_splitting, + }: OperatorSetupParams, ) -> Result<(), OperatorSetupError> { if !accept_tos { eprintln!("Please note that mirrord operator installation requires an active subscription for the mirrord Operator provided by MetalBear Tech LTD.\nThe service ToS can be read here - https://metalbear.co/legal/terms\nPass --accept-tos to accept the TOS"); @@ -105,6 +104,7 @@ async fn operator_setup( image, aws_role_arn, sqs_splitting, + kafka_splitting, }); match file { @@ -297,25 +297,7 @@ Operator License /// Handle commands related to the operator `mirrord operator ...` pub(crate) async fn operator_command(args: OperatorArgs) -> Result<()> { match args.command { - OperatorCommand::Setup { - accept_tos, - file, - namespace, - license_key, - license_path, - aws_role_arn, - sqs_splitting, - } => operator_setup( - accept_tos, - file, - namespace, - license_key, - license_path, - aws_role_arn, - sqs_splitting, - ) - .await - .map_err(CliError::from), + OperatorCommand::Setup(params) => operator_setup(params).await.map_err(CliError::from), OperatorCommand::Status { config_file } => operator_status(config_file.as_deref()).await, OperatorCommand::Session(session_command) => { SessionCommandHandler::new(session_command) diff --git a/mirrord/config/Cargo.toml b/mirrord/config/Cargo.toml index b95d78cb56d..f5c421daaad 100644 --- a/mirrord/config/Cargo.toml +++ b/mirrord/config/Cargo.toml @@ -33,6 +33,7 @@ ipnet = "2.8" bitflags = "2" k8s-openapi = { workspace = true, features = ["schemars", "earliest"] } tera = "1" +fancy-regex.workspace = true [dev-dependencies] -rstest = "0.23" \ No newline at end of file +rstest = "0.23" diff --git a/mirrord/config/configuration.md b/mirrord/config/configuration.md index 5464d945d1a..60cff444f9b 100644 --- a/mirrord/config/configuration.md +++ b/mirrord/config/configuration.md @@ -1236,13 +1236,26 @@ will be used, and your local application will not receive any messages from that "queue_type": "SQS", "message_filter": { "wows": "so wows", - "coolz": "^very .*" + "coolz": "^very" } }, "second-queue": { "queue_type": "SQS", "message_filter": { - "who": "*you$" + "who": "you$" + } + }, + "third-queue": { + "queue_type": "Kafka", + "message_filter": { + "who": "you$" + } + }, + "fourth-queue": { + "queue_type": "Kafka", + "message_filter": { + "wows": "so wows", + "coolz": "^very" } }, } diff --git a/mirrord/config/src/config.rs b/mirrord/config/src/config.rs index 95a451fb5b3..a36fe07c573 100644 --- a/mirrord/config/src/config.rs +++ b/mirrord/config/src/config.rs @@ -7,6 +7,8 @@ use std::error::Error; use thiserror::Error; +use crate::feature::split_queues::QueueSplittingVerificationError; + /// /// Error that would be returned from [MirrordConfig::generate_config] #[derive(Error, Debug)] @@ -71,6 +73,9 @@ pub enum ConfigError { #[error("Target type requires the mirrord-operator, but operator usage was explicitly disabled. Consider enabling mirrord-operator in your mirrord config.")] TargetRequiresOperator, + + #[error("Queue splitting config is invalid: {0}")] + QueueSplittingVerificationError(#[from] QueueSplittingVerificationError), } impl From for ConfigError { diff --git a/mirrord/config/src/feature.rs b/mirrord/config/src/feature.rs index 22b3e8fbcce..317d590e0c8 100644 --- a/mirrord/config/src/feature.rs +++ b/mirrord/config/src/feature.rs @@ -105,7 +105,7 @@ pub struct FeatureConfig { /// If you don't specify any filter for a queue that is however declared in the /// `MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter /// will be used, and your local application will not receive any messages from that queue. - #[config(nested, unstable)] + #[config(nested, default, unstable)] pub split_queues: SplitQueuesConfig, } diff --git a/mirrord/config/src/feature/split_queues.rs b/mirrord/config/src/feature/split_queues.rs index 1b25c0cf199..c13edd67513 100644 --- a/mirrord/config/src/feature/split_queues.rs +++ b/mirrord/config/src/feature/split_queues.rs @@ -1,11 +1,10 @@ -use std::{ - collections::{BTreeMap, HashMap}, - ops::Not, -}; +use std::collections::BTreeMap; +use fancy_regex::Regex; use mirrord_analytics::{Analytics, CollectAnalytics}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use thiserror::Error; use crate::config::{ConfigContext, FromMirrordConfig, MirrordConfig}; @@ -19,13 +18,26 @@ pub type QueueId = String; /// "queue_type": "SQS", /// "message_filter": { /// "wows": "so wows", -/// "coolz": "^very .*" +/// "coolz": "^very" /// } /// }, /// "second-queue": { /// "queue_type": "SQS", /// "message_filter": { -/// "who": "*you$" +/// "who": "you$" +/// } +/// }, +/// "third-queue": { +/// "queue_type": "Kafka", +/// "message_filter": { +/// "who": "you$" +/// } +/// }, +/// "fourth-queue": { +/// "queue_type": "Kafka", +/// "message_filter": { +/// "wows": "so wows", +/// "coolz": "^very" /// } /// }, /// } @@ -33,32 +45,58 @@ pub type QueueId = String; /// } /// ``` #[derive(Clone, Debug, Eq, PartialEq, JsonSchema, Serialize, Deserialize, Default)] -pub struct SplitQueuesConfig(pub Option>); +pub struct SplitQueuesConfig(BTreeMap); impl SplitQueuesConfig { + /// Returns whether this configuration contains any queue at all. pub fn is_set(&self) -> bool { - self.0.is_some() + !self.0.is_empty() } /// Out of the whole queue splitting config, get only the sqs queues. - pub fn get_sqs_filter(&self) -> Option> { - self.0 - .as_ref() - .map(BTreeMap::iter) - .map(|filters| { - filters - // When there are more variants of QueueFilter, change this to a `filter_map`. - .filter_map(|(queue_id, queue_filter)| match queue_filter { - QueueFilter::Sqs(filter_mapping) => { - Some((queue_id.clone(), filter_mapping.clone())) - } - _ => None, - }) - .collect() - }) - .and_then(|filters_map: HashMap| { - filters_map.is_empty().not().then_some(filters_map) - }) + pub fn sqs(&self) -> impl '_ + Iterator { + self.0.iter().filter_map(|(name, filter)| match filter { + QueueFilter::Sqs { message_filter } => Some((name.as_str(), message_filter)), + _ => None, + }) + } + + /// Out of the whole queue splitting config, get only the kafka topics. + pub fn kafka(&self) -> impl '_ + Iterator { + self.0.iter().filter_map(|(name, filter)| match filter { + QueueFilter::Kafka { message_filter } => Some((name.as_str(), message_filter)), + _ => None, + }) + } + + pub fn verify( + &self, + _context: &mut ConfigContext, + ) -> Result<(), QueueSplittingVerificationError> { + for (queue_name, filter) in &self.0 { + let filter = match filter { + QueueFilter::Sqs { message_filter } | QueueFilter::Kafka { message_filter } => { + message_filter + } + QueueFilter::Unknown => { + return Err(QueueSplittingVerificationError::UnknownQueueType( + queue_name.clone(), + )); + } + }; + + for (name, pattern) in filter { + Regex::new(pattern).map_err(|error| { + QueueSplittingVerificationError::InvalidRegex( + queue_name.clone(), + name.clone(), + error.into(), + ) + })?; + } + } + + Ok(()) } } @@ -77,22 +115,32 @@ impl FromMirrordConfig for SplitQueuesConfig { type Generator = Self; } -pub type MessageAttributeName = String; -pub type AttributeValuePattern = String; - -/// A filter is a mapping between message attribute names and regexes they should match. -/// The local application will only receive messages that match **all** of the given patterns. -/// This means, only messages that have **all** the `MessageAttributeName`s in the filter, -/// with values of those attributes matching the respective `AttributeValuePattern`. -pub type SqsMessageFilter = BTreeMap; +pub type QueueMessageFilter = BTreeMap; /// More queue types might be added in the future. #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, JsonSchema)] -#[serde(tag = "queue_type", content = "message_filter")] +#[serde(tag = "queue_type")] pub enum QueueFilter { /// Amazon Simple Queue Service. #[serde(rename = "SQS")] - Sqs(SqsMessageFilter), + Sqs { + /// A filter is a mapping between message attribute names and regexes they should match. + /// The local application will only receive messages that match **all** of the given + /// patterns. This means, only messages that have **all** of the attributes in the + /// filter, with values of those attributes matching the respective patterns. + message_filter: QueueMessageFilter, + }, + + /// Kafka. + #[serde(rename = "Kafka")] + Kafka { + /// A filter is a mapping between message header names and regexes they should match. + /// The local application will only receive messages that match **all** of the given + /// patterns. This means, only messages that have **all** of the headers in the + /// filter, with values of those headers matching the respective patterns. + message_filter: QueueMessageFilter, + }, + /// When a newer client sends a new filter kind to an older operator, that does not yet know /// about that filter type, this is what that filter will be deserialized to. #[schemars(skip)] @@ -102,12 +150,71 @@ pub enum QueueFilter { impl CollectAnalytics for &SplitQueuesConfig { fn collect_analytics(&self, analytics: &mut Analytics) { - analytics.add( - "queue_count", - self.0 - .as_ref() - .map(|mapping| mapping.len()) - .unwrap_or_default(), - ) + analytics.add("sqs_queue_count", self.sqs().count()); + analytics.add("kafka_queue_count", self.kafka().count()); + } +} + +#[derive(Error, Debug)] +pub enum QueueSplittingVerificationError { + #[error("{0}: unknown queue type")] + UnknownQueueType(String), + #[error("{0}.message_filter.{1}: failed to parse regular expression ({2})")] + InvalidRegex( + String, + String, + // without `Box`, clippy complains when `ConfigError` is used in `Err` + Box, + ), +} + +#[cfg(test)] +mod test { + use super::QueueFilter; + + #[test] + fn deserialize_known_queue_types() { + let value = serde_json::json!({ + "queue_type": "Kafka", + "message_filter": { + "key": "value", + }, + }); + + let filter = serde_json::from_value::(value).unwrap(); + assert_eq!( + filter, + QueueFilter::Kafka { + message_filter: [("key".to_string(), "value".to_string())].into() + } + ); + + let value = serde_json::json!({ + "queue_type": "SQS", + "message_filter": { + "key": "value", + }, + }); + + let filter = serde_json::from_value::(value).unwrap(); + assert_eq!( + filter, + QueueFilter::Sqs { + message_filter: [("key".to_string(), "value".to_string())].into() + } + ); + } + + #[test] + fn deserialize_unknown_queue_type() { + let value = serde_json::json!({ + "queue_type": "unknown", + "message_filter": { + "key": "value", + } + }); + + let filter = serde_json::from_value::(value).unwrap(); + assert_eq!(filter, QueueFilter::Unknown); } } diff --git a/mirrord/config/src/lib.rs b/mirrord/config/src/lib.rs index d5ebc9d6135..d2384e7f3f0 100644 --- a/mirrord/config/src/lib.rs +++ b/mirrord/config/src/lib.rs @@ -523,6 +523,7 @@ impl LayerConfig { self.feature.network.dns.verify(context)?; self.feature.network.outgoing.verify(context)?; + self.feature.split_queues.verify(context)?; if self.experimental.readlink { context.add_warning( diff --git a/mirrord/operator/src/client.rs b/mirrord/operator/src/client.rs index 98f4860715d..ba627b41a79 100644 --- a/mirrord/operator/src/client.rs +++ b/mirrord/operator/src/client.rs @@ -449,12 +449,19 @@ where .spec .require_feature(NewOperatorFeature::CopyTarget)? } - if config.feature.split_queues.is_set() { + + if config.feature.split_queues.sqs().next().is_some() { self.operator .spec .require_feature(NewOperatorFeature::SqsQueueSplitting)?; } + if config.feature.split_queues.kafka().next().is_some() { + self.operator + .spec + .require_feature(NewOperatorFeature::KafkaQueueSplitting)?; + } + Ok(()) } diff --git a/mirrord/operator/src/crd.rs b/mirrord/operator/src/crd.rs index 58e22f1c927..cfa6320a9a4 100644 --- a/mirrord/operator/src/crd.rs +++ b/mirrord/operator/src/crd.rs @@ -7,7 +7,7 @@ use kube::{CustomResource, Resource}; use kube_target::{KubeTarget, UnknownTargetType}; pub use mirrord_config::feature::split_queues::QueueId; use mirrord_config::{ - feature::split_queues::{SplitQueuesConfig, SqsMessageFilter}, + feature::split_queues::{QueueMessageFilter, SplitQueuesConfig}, target::{Target, TargetConfig}, }; use schemars::JsonSchema; @@ -19,6 +19,7 @@ use self::label_selector::LabelSelector; use crate::client::error::OperatorApiError; use crate::types::LicenseInfoOwned; +pub mod kafka; pub mod kube_target; pub mod label_selector; @@ -261,8 +262,9 @@ pub enum OperatorFeatures { pub enum NewOperatorFeature { ProxyApi, CopyTarget, - SqsQueueSplitting, SessionManagement, + SqsQueueSplitting, + KafkaQueueSplitting, /// This variant is what a client sees when the operator includes a feature the client is not /// yet aware of, because it was introduced in a version newer than the client's. #[schemars(skip)] @@ -275,9 +277,10 @@ impl Display for NewOperatorFeature { let name = match self { NewOperatorFeature::ProxyApi => "proxy API", NewOperatorFeature::CopyTarget => "copy target", + NewOperatorFeature::SessionManagement => "session management", NewOperatorFeature::SqsQueueSplitting => "SQS queue splitting", + NewOperatorFeature::KafkaQueueSplitting => "Kafka queue splitting", NewOperatorFeature::Unknown => "unknown feature", - NewOperatorFeature::SessionManagement => "session management", }; f.write_str(name) } @@ -642,7 +645,7 @@ pub struct MirrordSqsSessionSpec { /// For each queue_id, a mapping from attribute name, to attribute value regex. /// The queue_id for a queue is determined at the queue registry. It is not (necessarily) /// The name of the queue on AWS. - pub queue_filters: HashMap, + pub queue_filters: HashMap, /// The target of this session. pub queue_consumer: QueueConsumer, diff --git a/mirrord/operator/src/crd/kafka.rs b/mirrord/operator/src/crd/kafka.rs new file mode 100644 index 00000000000..9f29aaac3a2 --- /dev/null +++ b/mirrord/operator/src/crd/kafka.rs @@ -0,0 +1,182 @@ +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Configuration to use when creating operator's Kafka client. +/// Resources of this kind should live in the operator's namespace. +#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[kube( + group = "queues.mirrord.metalbear.co", + version = "v1alpha", + kind = "MirrordKafkaClientConfig", + namespaced, + printcolumn = r#"{"name":"PARENT", "type":"string", "description":"Name of parent configuration.", "jsonPath":".spec.parent"}"# +)] +#[serde(rename_all = "camelCase")] +pub struct MirrordKafkaClientConfigSpec { + /// Name of parent resource to use as base when resolving final configuration. + pub parent: Option, + + /// Properties to set. + /// + /// When performing Kafka splitting, the operator will override `group.id` property. + /// + /// The list of all available properties can be found [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). + pub properties: Vec, +} + +/// Property to use when creating operator's Kafka client. +#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema, Eq, PartialEq, Hash)] +#[serde(rename_all = "camelCase")] +pub struct MirrordKafkaClientProperty { + /// Name of the property, e.g `bootstrap.servers`. + pub name: String, + + /// Value for the property, e.g `kafka.default.svc.cluster.local:9092`. + /// `null` clears the property from parent resource when resolving the final configuration. + pub value: Option, +} + +/// Defines splittable Kafka topics consumed by some workload living in the same namespace. +/// +/// # Concurrent splitting +/// +/// Concurrent Kafka splitting sessions are allowed, as long as they use the same topic id or their +/// topics' `nameSources` do not overlap. +/// +/// # Example +/// +/// ```yaml +/// apiVersion: queues.mirrord.metalbear.co/v1alpha +/// kind: MirrordKafkaTopicsConsumer +/// metadata: +/// name: example +/// namespace: default +/// spec: +/// consumerName: example-deployment +/// consumerApiVersion: apps/v1 +/// consumerKind: Deployment +/// topics: +/// - id: example-topic +/// nameSources: +/// - directEnvVar: +/// container: example-container +/// name: KAFKA_TOPIC_NAME +/// groupIdSources: +/// - directEnvVar: +/// container: example-container +/// name: KAFKA_GROUP_ID +/// clientConfig: example-config +/// ``` +/// +/// 1. Creating the resource below will enable Kafka splitting on a deployment `example-deployment` +/// living in namespace `default`. Id `example-topic` can be then used in the mirrord config to +/// split the topic for the duration of the mirrord session. +/// +/// 2. Topic name will be resolved based on `example-deployment`'s pod template by extracting value +/// of variable `KAFKA_TOPIC_NAME` defined directly in `example-container`. +/// +/// 3. Consumer group id used by the mirrord operator will be resolved based on +/// `example-deployment`'s pod template by extracting value of variable `KAFKA_GROUP_ID` defined +/// directly in `example-container`. +/// +/// 4. For the duration of the session, `example-deployment` will be patched - the mirrord operator +/// will substitute topic name in `KAFKA_TOPIC_NAME` variable with a name of an ephemeral Kafka +/// topic. +/// +/// 5. Local application will see a different value of the `KAFKA_TOPIC_NAME` - it will be a name of +/// another ephemeral Kafka topic. +/// +/// 6. `MirrordKafkaClientConfig` named `example-config` living in mirrord operator's namespace will +/// be used to manage ephemeral Kafka topics and consume/produce messages. +#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[kube( + group = "queues.mirrord.metalbear.co", + version = "v1alpha", + kind = "MirrordKafkaTopicsConsumer", + namespaced, + printcolumn = r#"{"name":"CONSUMER-NAME", "type":"string", "description":"Name of the topic consumer workload.", "jsonPath":".spec.consumerName"}"#, + printcolumn = r#"{"name":"CONSUMER-KIND", "type":"string", "description":"Kind of the topic consumer workload.", "jsonPath":".spec.consumerKind"}"#, + printcolumn = r#"{"name":"CONSUMER-API-VERSION", "type":"string", "description":"Api version of the topic consumer workload.", "jsonPath":".spec.consumerApiVersion"}"#, + printcolumn = r#"{"name":"CONSUMER-RESTART-TIMEOUT", "type":"string", "description":"Timeout for consumer workload restart.", "jsonPath":".spec.consumerRestartTimeout"}"# +)] +#[serde(rename_all = "camelCase")] +pub struct MirrordKafkaTopicsConsumerSpec { + /// Workload name, for example `my-deployment`. + pub consumer_name: String, + + /// Workload kind, for example `Deployment`. + pub consumer_kind: String, + + /// Workload api version, for example `apps/v1`. + pub consumer_api_version: String, + + /// Timeout for waiting until workload patch takes effect, that is at least one pod reads from + /// the ephemeral topic. + /// + /// Specified in seconds. Defaults to 60s. + #[serde(skip_serializing_if = "Option::is_none")] + pub consumer_restart_timeout: Option, + + /// List of consumed splittable topics. + pub topics: Vec, +} + +/// Splittable Kafka topic consumed by some remote target. +#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct KafkaTopicDetails { + /// Id of this topic. Can be used in mirrord config to identify this topic. + pub id: String, + + /// All occurrences of this topic's name in the workload's pod template. + pub name_sources: Vec, + + /// All occurrences of this topic's group id in the workload's pod template. + pub group_id_sources: Vec, + + /// Links to [`MirrordKafkaClientConfig`] in the operator's namespace. + /// This config will be used to manage ephemeral Kafka topics and consume/produce messages. + pub client_config: String, +} + +/// Source of some topic property required for Kafka splitting. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema, Hash)] +#[serde(rename_all = "camelCase")] +pub enum TopicPropertySource { + /// Environment variable with value defined directly in the pod template. + DirectEnvVar(EnvVarLocation), +} + +/// Location of an environment variable defined in the pod template. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema, Hash)] +#[serde(rename_all = "camelCase")] +pub struct EnvVarLocation { + /// Name of the container. + pub container: String, + + /// Name of the variable. + pub variable: String, +} + +/// Ephemeral topic created in your Kafka cluster for the purpose of running a Kafka splitting +/// session. +/// +/// Resources of this kind should live in the operator's namespace. They will be used to clean up +/// topics that are no longer used. +#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema, Eq, PartialEq, Hash)] +#[kube( + group = "queues.mirrord.metalbear.co", + version = "v1alpha", + kind = "MirrordKafkaEphemeralTopic", + namespaced, + printcolumn = r#"{"name":"NAME", "type":"string", "description":"Name of the topic.", "jsonPath":".spec.name"}"#, + printcolumn = r#"{"name":"CLIENT-CONFIG", "type":"string", "description":"Name of MirrordKafkaClientProperties to use when creating Kafka client.", "jsonPath":".spec.clientConfig"}"# +)] +#[serde(rename_all = "camelCase")] +pub struct MirrordKafkaEphemeralTopicSpec { + /// Name of the topic. + pub name: String, + /// Links to [`MirrordKafkaClientConfigSpec`] resource living in the same namespace. + pub client_config: String, +} diff --git a/mirrord/operator/src/setup.rs b/mirrord/operator/src/setup.rs index 0ffe4fa5430..f26865a8828 100644 --- a/mirrord/operator/src/setup.rs +++ b/mirrord/operator/src/setup.rs @@ -26,7 +26,10 @@ use k8s_openapi::{ use kube::{CustomResourceExt, Resource}; use thiserror::Error; -use crate::crd::{MirrordPolicy, MirrordSqsSession, MirrordWorkloadQueueRegistry, TargetCrd}; +use crate::crd::{ + kafka::{MirrordKafkaClientConfig, MirrordKafkaEphemeralTopic, MirrordKafkaTopicsConsumer}, + MirrordPolicy, MirrordSqsSession, MirrordWorkloadQueueRegistry, TargetCrd, +}; pub static OPERATOR_NAME: &str = "mirrord-operator"; /// 443 is standard port for APIService, do not change this value @@ -90,6 +93,7 @@ pub struct SetupOptions { pub image: String, pub aws_role_arn: Option, pub sqs_splitting: bool, + pub kafka_splitting: bool, } #[derive(Debug)] @@ -106,6 +110,7 @@ pub struct Operator { client_ca_role: OperatorClientCaRole, client_ca_role_binding: OperatorClientCaRoleBinding, sqs_splitting: bool, + kafka_splitting: bool, } impl Operator { @@ -116,6 +121,7 @@ impl Operator { image, aws_role_arn, sqs_splitting, + kafka_splitting, } = options; let (license_secret, license_key) = match license { @@ -127,7 +133,7 @@ impl Operator { let service_account = OperatorServiceAccount::new(&namespace, aws_role_arn); - let role = OperatorRole::new(sqs_splitting); + let role = OperatorRole::new(sqs_splitting, kafka_splitting); let role_binding = OperatorRoleBinding::new(&role, &service_account); let user_cluster_role = OperatorClusterUserRole::new(); @@ -142,6 +148,7 @@ impl Operator { license_key, image, sqs_splitting, + kafka_splitting, ); let service = OperatorService::new(&namespace); @@ -161,6 +168,7 @@ impl Operator { client_ca_role, client_ca_role_binding, sqs_splitting, + kafka_splitting, } } } @@ -212,6 +220,17 @@ impl OperatorSetup for Operator { MirrordSqsSession::crd().to_writer(&mut writer)?; } + if self.kafka_splitting { + writer.write_all(b"---\n")?; + MirrordKafkaClientConfig::crd().to_writer(&mut writer)?; + + writer.write_all(b"---\n")?; + MirrordKafkaEphemeralTopic::crd().to_writer(&mut writer)?; + + writer.write_all(b"---\n")?; + MirrordKafkaTopicsConsumer::crd().to_writer(&mut writer)?; + } + Ok(()) } } @@ -252,6 +271,7 @@ impl OperatorDeployment { license_key: Option, image: String, sqs_splitting: bool, + kafka_splitting: bool, ) -> Self { let mut envs = vec![ EnvVar { @@ -319,6 +339,14 @@ impl OperatorDeployment { }); } + if kafka_splitting { + envs.push(EnvVar { + name: "OPERATOR_KAFKA_SPLITTING".into(), + value: Some("true".into()), + value_from: None, + }); + } + let health_probe = Probe { http_get: Some(HTTPGetAction { path: Some("/health".to_owned()), @@ -437,7 +465,7 @@ impl OperatorServiceAccount { pub struct OperatorRole(ClusterRole); impl OperatorRole { - pub fn new(sqs_splitting: bool) -> Self { + pub fn new(sqs_splitting: bool, kafka_splitting: bool) -> Self { let mut rules = vec![ PolicyRule { api_groups: Some(vec![ @@ -463,20 +491,6 @@ impl OperatorRole { verbs: vec!["get".to_owned(), "list".to_owned(), "watch".to_owned()], ..Default::default() }, - // For SQS controller to temporarily change deployments to use changed queues. - PolicyRule { - api_groups: Some(vec!["apps".to_owned()]), - resources: Some(vec!["deployments".to_owned()]), - verbs: vec!["patch".to_owned()], - ..Default::default() - }, - // For SQS controller to temporarily change Argo Rollouts to use changed queues. - PolicyRule { - api_groups: Some(vec!["argoproj.io".to_owned()]), - resources: Some(vec!["rollouts".to_owned()]), - verbs: vec!["patch".to_owned()], - ..Default::default() - }, PolicyRule { api_groups: Some(vec!["apps".to_owned(), "argoproj.io".to_owned()]), resources: Some(vec![ @@ -507,35 +521,92 @@ impl OperatorRole { }, // Allow the operator to list+get mirrord policies. PolicyRule { - api_groups: Some(vec!["policies.mirrord.metalbear.co".to_owned()]), - resources: Some(vec![MirrordPolicy::plural(&()).to_string()]), + api_groups: Some(vec![MirrordPolicy::group(&()).into_owned()]), + resources: Some(vec![MirrordPolicy::plural(&()).into_owned()]), verbs: vec!["list".to_owned(), "get".to_owned()], ..Default::default() }, ]; - if sqs_splitting { + + if sqs_splitting || kafka_splitting { rules.extend([ - // Allow the operator to list mirrord queue registries. + // For SQS/Kafka controller to temporarily change deployments to use changed + // queues. PolicyRule { - api_groups: Some(vec!["queues.mirrord.metalbear.co".to_owned()]), - resources: Some(vec![MirrordWorkloadQueueRegistry::plural(&()).to_string()]), - verbs: vec!["list".to_owned()], + api_groups: Some(vec!["apps".to_owned()]), + resources: Some(vec!["deployments".to_owned()]), + verbs: vec!["patch".to_owned()], ..Default::default() }, + // For SQS/Kafka controller to temporarily change Argo Rollouts to use changed + // queues. + PolicyRule { + api_groups: Some(vec!["argoproj.io".to_owned()]), + resources: Some(vec!["rollouts".to_owned()]), + verbs: vec!["patch".to_owned()], + ..Default::default() + }, + ]); + } + + if kafka_splitting { + rules.extend([ + PolicyRule { + api_groups: Some(vec![MirrordKafkaEphemeralTopic::group(&()).into_owned()]), + resources: Some(vec![MirrordKafkaEphemeralTopic::plural(&()).into_owned()]), + verbs: ["get", "list", "watch", "create", "delete"] + .into_iter() + .map(String::from) + .collect(), + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec![MirrordKafkaClientConfig::group(&()).into_owned()]), + resources: Some(vec![MirrordKafkaClientConfig::plural(&()).into_owned()]), + verbs: ["get", "list", "watch"] + .into_iter() + .map(String::from) + .collect(), + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec![MirrordKafkaTopicsConsumer::group(&()).into_owned()]), + resources: Some(vec![MirrordKafkaTopicsConsumer::plural(&()).into_owned()]), + verbs: ["get", "list", "watch"] + .into_iter() + .map(String::from) + .collect(), + ..Default::default() + }, + ]); + } + + if sqs_splitting { + rules.extend([ // Allow the SQS controller to update queue registry status. PolicyRule { - api_groups: Some(vec!["queues.mirrord.metalbear.co".to_owned()]), - resources: Some(vec!["mirrordworkloadqueueregistries/status".to_string()]), + api_groups: Some(vec![MirrordWorkloadQueueRegistry::group(&()).into_owned()]), + resources: Some(vec![format!( + "{}/status", + MirrordWorkloadQueueRegistry::plural(&()) + )]), verbs: vec![ // For setting the status in the SQS controller. "update".to_owned(), ], ..Default::default() }, + // Allow the operator to list mirrord queue registries. + PolicyRule { + api_groups: Some(vec![MirrordWorkloadQueueRegistry::group(&()).into_owned()]), + resources: Some(vec![MirrordWorkloadQueueRegistry::plural(&()).into_owned()]), + verbs: vec!["get".to_owned(), "list".to_owned(), "watch".to_owned()], + ..Default::default() + }, // Allow the operator to control mirrord SQS session objects. PolicyRule { - api_groups: Some(vec!["queues.mirrord.metalbear.co".to_owned()]), - resources: Some(vec![MirrordSqsSession::plural(&()).to_string()]), + api_groups: Some(vec![MirrordSqsSession::group(&()).into_owned()]), + resources: Some(vec![MirrordSqsSession::plural(&()).into_owned()]), verbs: vec![ "create".to_owned(), "watch".to_owned(), @@ -547,10 +618,10 @@ impl OperatorRole { ], ..Default::default() }, - // Allow the SQS controller to update queue registry status. + // Allow the SQS controller to update SQS session status. PolicyRule { - api_groups: Some(vec!["queues.mirrord.metalbear.co".to_owned()]), - resources: Some(vec!["mirrordsqssessions/status".to_string()]), + api_groups: Some(vec![MirrordSqsSession::group(&()).into_owned()]), + resources: Some(vec![format!("{}/status", MirrordSqsSession::plural(&()))]), verbs: vec![ // For setting the status in the SQS controller. "update".to_owned(), @@ -559,6 +630,7 @@ impl OperatorRole { }, ]); } + let role = ClusterRole { metadata: ObjectMeta { name: Some(OPERATOR_ROLE_NAME.to_owned()), @@ -582,7 +654,7 @@ impl OperatorRole { impl Default for OperatorRole { fn default() -> Self { - Self::new(false) + Self::new(false, false) } }