Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka splitting #2740

Merged
merged 38 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
bce90ce
CRDs
Razz4780 Sep 17, 2024
b8bb09b
Improved external changes crd
Razz4780 Sep 18, 2024
169a13e
Improve external changes crd
Razz4780 Sep 18, 2024
ebbb309
Whatever ;_;
Razz4780 Sep 18, 2024
afe6769
One properties for all clients
Razz4780 Sep 18, 2024
3658bed
External change CRD
Razz4780 Sep 19, 2024
c9169c6
saving client properties for created kafka topics
Razz4780 Sep 19, 2024
cd70c2c
CRD only for tmp topics
Razz4780 Sep 20, 2024
b0656c1
Fixed fields
Razz4780 Sep 20, 2024
b22bc66
target patch crd
Razz4780 Sep 23, 2024
ca83de6
Add owner process to change
Razz4780 Sep 24, 2024
25eb9b2
Fix type
Razz4780 Sep 24, 2024
30e280e
namespaced
Razz4780 Sep 24, 2024
c4a697d
Improve topic details
Razz4780 Sep 24, 2024
4693aa6
...
Razz4780 Sep 29, 2024
5b50d1a
......
Razz4780 Sep 29, 2024
4d30e87
camelCase
Razz4780 Sep 29, 2024
0195c9e
setup fix
Razz4780 Sep 30, 2024
904f161
Command flag doc
Razz4780 Oct 2, 2024
4bed94c
Config fixes
Razz4780 Oct 2, 2024
4612c3c
Config again
Razz4780 Oct 3, 2024
72476e7
type name fix
Razz4780 Oct 3, 2024
b2c56f3
Fixed unknown queue type variant
Razz4780 Oct 3, 2024
c3c1cc4
test cfg
Razz4780 Oct 3, 2024
fabde35
Removed todo
Razz4780 Oct 3, 2024
95f56f3
test sqs config deserialization
Razz4780 Oct 4, 2024
630f2cc
crd update
Razz4780 Oct 4, 2024
cf976f8
CRD docs
Razz4780 Oct 4, 2024
9196bd6
Fixes
Razz4780 Oct 4, 2024
0dbcd88
Hash + Eq for some structs in crd
Razz4780 Oct 4, 2024
574043c
Printcols
Razz4780 Oct 7, 2024
5e5910f
Schema
Razz4780 Oct 7, 2024
881d1c1
Fix medschool and update configuration.md
Razz4780 Oct 7, 2024
9a49df8
Fix medschool even better
Razz4780 Oct 7, 2024
66a34e6
Merge branch 'main' into kafka-splitting
Razz4780 Oct 11, 2024
f878d84
Fixed config doc
Razz4780 Oct 11, 2024
8b5831c
Removed redundant analytics field
Razz4780 Oct 11, 2024
36847ce
Merge branch 'main' into kafka-splitting
Razz4780 Oct 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/2601.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added Kafka splitting feature.
5 changes: 3 additions & 2 deletions medschool/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ fn dfs_fields<'a, const MAX_RECURSION_LEVEL: usize>(
recursion_level: &mut usize,
) -> Vec<String> {
if *recursion_level >= MAX_RECURSION_LEVEL {
return vec!["Recursion limit reached".to_string()];
panic!("recursion limit {MAX_RECURSION_LEVEL} reached");
}

// increment the recursion level as we're going deeper into the tree
types // get the type of the field from the types set to recurse into it's fields
.get(&field.ty)
Expand Down Expand Up @@ -281,7 +282,7 @@ fn dfs_fields<'a, const MAX_RECURSION_LEVEL: usize>(
#[tracing::instrument(level = "trace", ret)]
pub fn resolve_references(types: HashSet<PartialType>) -> Option<PartialType> {
/// Maximum recursion level for safety.
const MAX_RECURSION_LEVEL: usize = 10;
const MAX_RECURSION_LEVEL: usize = 16;
// Cache to perform memoization between recursive calls so we don't have to resolve the same
// type multiple times. Mapping between `ident` -> `resolved_docs`.
// For example, if we have a types [`A`, `B`, `C`] and A has a field of type `B` and `B` has a
Expand Down
31 changes: 26 additions & 5 deletions mirrord-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,7 @@
],
"properties": {
"message_filter": {
"description": "A filter is a mapping between message attribute names and regexes they should match. The local application will only receive messages that match **all** of the given patterns. This means, only messages that have **all** of the attributes in the filter, with values of those attributes matching the respective patterns.",
"type": "object",
"additionalProperties": {
"type": "string"
Expand All @@ -1539,6 +1540,29 @@
]
}
}
},
{
"description": "Kafka.",
"type": "object",
"required": [
"message_filter",
"queue_type"
],
"properties": {
"message_filter": {
"description": "A filter is a mapping between message header names and regexes they should match. The local application will only receive messages that match **all** of the given patterns. This means, only messages that have **all** of the headers in the filter, with values of those headers matching the respective patterns.",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"queue_type": {
"type": "string",
"enum": [
"Kafka"
]
}
}
}
]
},
Expand All @@ -1563,11 +1587,8 @@
"additionalProperties": false
},
"SplitQueuesConfig": {
"description": "```json { \"feature\": { \"split_queues\": { \"first-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very .*\" } }, \"second-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"who\": \"*you$\" } }, } } } ```",
"type": [
"object",
"null"
],
"description": "```json { \"feature\": { \"split_queues\": { \"first-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very .*\" } }, \"second-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"who\": \"*you$\" } }, \"third-queue\": { \"queue_type\": \"Kafka\", \"message_filter\": { \"who\": \"*you$\" } }, \"fourth-queue\": { \"queue_type\": \"Kafka\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very .*\" } }, } } } ```",
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/QueueFilter"
}
Expand Down
93 changes: 51 additions & 42 deletions mirrord/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,48 +487,7 @@ pub(super) enum OperatorCommand {
///
/// NOTE: You don't need to install the operator to use open source mirrord features.
#[command(override_usage = "mirrord operator setup [OPTIONS] | kubectl apply -f -")]
Setup {
/// ToS can be read here <https://metalbear.co/legal/terms>
#[arg(long)]
accept_tos: bool,

/// A mirrord for Teams license key (online)
#[arg(long, allow_hyphen_values(true))]
license_key: Option<String>,

/// Path to a file containing a mirrord for Teams license certificate
#[arg(long)]
license_path: Option<PathBuf>,

/// Output Kubernetes specs to file instead of stdout
#[arg(short, long)]
file: Option<PathBuf>,

/// Namespace to create the operator in (this doesn't limit the namespaces the operator
/// will be able to access)
#[arg(short, long, default_value = "mirrord")]
namespace: OperatorNamespace,

/// AWS role ARN for the operator's service account.
/// Necessary for enabling SQS queue splitting.
/// For successfully running an SQS queue splitting operator the given IAM role must be
/// able to create, read from, write to, and delete SQS queues.
/// If the queue messages are encrypted using KMS, the operator also needs the
/// `kms:Encrypt`, `kms:Decrypt` and `kms:GenerateDataKey` permissions.
#[arg(long, visible_alias = "arn")]
aws_role_arn: Option<String>,

/// Enable SQS queue splitting.
/// When set, some extra CRDs will be installed on the cluster, and the operator will run
/// an SQS splitting component.
#[arg(
long,
visible_alias = "sqs",
default_value_t = false,
requires = "aws_role_arn"
)]
sqs_splitting: bool,
},
Setup(#[clap(flatten)] OperatorSetupParams),
/// Print operator status
Status {
/// Specify config file to use
Expand All @@ -542,6 +501,56 @@ pub(super) enum OperatorCommand {
Session(SessionCommand),
}

#[derive(Args, Debug)]
pub(super) struct OperatorSetupParams {
/// ToS can be read here <https://metalbear.co/legal/terms>
#[arg(long)]
pub(super) accept_tos: bool,

/// A mirrord for Teams license key (online)
#[arg(long, allow_hyphen_values(true))]
pub(super) license_key: Option<String>,

/// Path to a file containing a mirrord for Teams license certificate
#[arg(long)]
pub(super) license_path: Option<PathBuf>,

/// Output Kubernetes specs to file instead of stdout
#[arg(short, long)]
pub(super) file: Option<PathBuf>,

/// Namespace to create the operator in (this doesn't limit the namespaces the operator
/// will be able to access)
#[arg(short, long, default_value = "mirrord")]
pub(super) namespace: OperatorNamespace,

/// AWS role ARN for the operator's service account.
/// Necessary for enabling SQS queue splitting.
/// For successfully running an SQS queue splitting operator the given IAM role must be
/// able to create, read from, write to, and delete SQS queues.
/// If the queue messages are encrypted using KMS, the operator also needs the
/// `kms:Encrypt`, `kms:Decrypt` and `kms:GenerateDataKey` permissions.
#[arg(long, visible_alias = "arn")]
pub(super) aws_role_arn: Option<String>,

/// Enable SQS queue splitting.
/// When set, some extra CRDs will be installed on the cluster, and the operator will run
/// an SQS splitting component.
#[arg(
long,
visible_alias = "sqs",
default_value_t = false,
requires = "aws_role_arn"
)]
pub(super) sqs_splitting: bool,

/// Enable Kafka queue splitting.
/// When set, some extra CRDs will be installed on the cluster, and the operator will run
/// a Kafka splitting component.
#[arg(long, visible_alias = "kafka", default_value_t = false)]
pub(super) kafka_splitting: bool,
}

/// `mirrord operator session` family of commands.
///
/// Allows the user to forcefully kill operator sessions, use with care!
Expand Down
48 changes: 15 additions & 33 deletions mirrord/cli/src/operator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::{
fs::File,
path::{Path, PathBuf},
time::Duration,
};
use std::{fs::File, path::Path, time::Duration};

use futures::TryFutureExt;
use kube::{Api, Client};
Expand All @@ -15,7 +11,7 @@ use mirrord_kube::api::kubernetes::create_kube_config;
use mirrord_operator::{
client::OperatorApi,
crd::{MirrordOperatorCrd, MirrordOperatorSpec},
setup::{LicenseType, Operator, OperatorNamespace, OperatorSetup, SetupOptions},
setup::{LicenseType, Operator, OperatorSetup, SetupOptions},
types::LicenseInfoOwned,
};
use mirrord_progress::{Progress, ProgressTracker};
Expand All @@ -29,7 +25,7 @@ use crate::{
config::{OperatorArgs, OperatorCommand},
error::{CliError, OperatorSetupError},
util::remove_proxy_env,
Result,
OperatorSetupParams, Result,
};

mod session;
Expand All @@ -54,13 +50,16 @@ async fn get_last_version() -> Result<String, reqwest::Error> {

/// Setup the operator into a file or to stdout, with explanation.
async fn operator_setup(
accept_tos: bool,
file: Option<PathBuf>,
namespace: OperatorNamespace,
license_key: Option<String>,
license_path: Option<PathBuf>,
aws_role_arn: Option<String>,
sqs_splitting: bool,
OperatorSetupParams {
accept_tos,
license_key,
license_path,
file,
namespace,
aws_role_arn,
sqs_splitting,
kafka_splitting,
}: OperatorSetupParams,
) -> Result<(), OperatorSetupError> {
if !accept_tos {
eprintln!("Please note that mirrord operator installation requires an active subscription for the mirrord Operator provided by MetalBear Tech LTD.\nThe service ToS can be read here - https://metalbear.co/legal/terms\nPass --accept-tos to accept the TOS");
Expand Down Expand Up @@ -105,6 +104,7 @@ async fn operator_setup(
image,
aws_role_arn,
sqs_splitting,
kafka_splitting,
});

match file {
Expand Down Expand Up @@ -297,25 +297,7 @@ Operator License
/// Handle commands related to the operator `mirrord operator ...`
pub(crate) async fn operator_command(args: OperatorArgs) -> Result<()> {
match args.command {
OperatorCommand::Setup {
accept_tos,
file,
namespace,
license_key,
license_path,
aws_role_arn,
sqs_splitting,
} => operator_setup(
accept_tos,
file,
namespace,
license_key,
license_path,
aws_role_arn,
sqs_splitting,
)
.await
.map_err(CliError::from),
OperatorCommand::Setup(params) => operator_setup(params).await.map_err(CliError::from),
OperatorCommand::Status { config_file } => operator_status(config_file.as_deref()).await,
OperatorCommand::Session(session_command) => {
SessionCommandHandler::new(session_command)
Expand Down
3 changes: 2 additions & 1 deletion mirrord/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ipnet = "2.8"
bitflags = "2"
k8s-openapi = { workspace = true, features = ["schemars", "earliest"] }
tera = "1"
fancy-regex.workspace = true

[dev-dependencies]
rstest = "0.21"
rstest = "0.21"
13 changes: 13 additions & 0 deletions mirrord/config/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,19 @@ will be used, and your local application will not receive any messages from that
"who": "*you$"
}
},
"third-queue": {
"queue_type": "Kafka",
"message_filter": {
"who": "*you$"
}
},
"fourth-queue": {
"queue_type": "Kafka",
"message_filter": {
"wows": "so wows",
"coolz": "^very .*"
}
},
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions mirrord/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::error::Error;

use thiserror::Error;

use crate::feature::split_queues::QueueSplittingVerificationError;

/// <!--${internal}-->
/// Error that would be returned from [MirrordConfig::generate_config]
#[derive(Error, Debug)]
Expand Down Expand Up @@ -71,6 +73,9 @@ pub enum ConfigError {

#[error("Target type requires the mirrord-operator, but operator usage was explicitly disabled. Consider enabling mirrord-operator in your mirrord config.")]
TargetRequiresOperator,

#[error("Queue splitting config is invalid: {0}")]
QueueSplittingVerificationError(#[from] QueueSplittingVerificationError),
}

impl From<tera::Error> for ConfigError {
Expand Down
2 changes: 1 addition & 1 deletion mirrord/config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub struct FeatureConfig {
/// If you don't specify any filter for a queue that is however declared in the
/// `MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter
/// will be used, and your local application will not receive any messages from that queue.
#[config(nested, unstable)]
#[config(nested, default, unstable)]
pub split_queues: SplitQueuesConfig,
}

Expand Down
Loading
Loading