Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
meowjesty committed Oct 11, 2024
2 parents 658f4d6 + 6b3b4fe commit 02fe9cf
Show file tree
Hide file tree
Showing 26 changed files with 753 additions and 337 deletions.
310 changes: 160 additions & 150 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ reqwest = { version = "0.12", default-features = false, features = [
"socks",
"http2",
] }
kube = { version = "0.95.0", git = "https://github.com/kube-rs/kube", rev = "3c3939f3988f8b8be7d1dbb9326565f3fd1d31eb", default-features = false, features = [
kube = { version = "0.96.0", default-features = false, features = [
"runtime",
"derive",
"client",
Expand Down
1 change: 1 addition & 0 deletions changelog.d/+update-dependencies.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
update dependencies
1 change: 1 addition & 0 deletions changelog.d/2601.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added Kafka splitting feature.
5 changes: 3 additions & 2 deletions medschool/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ fn dfs_fields<'a, const MAX_RECURSION_LEVEL: usize>(
recursion_level: &mut usize,
) -> Vec<String> {
if *recursion_level >= MAX_RECURSION_LEVEL {
return vec!["Recursion limit reached".to_string()];
panic!("recursion limit {MAX_RECURSION_LEVEL} reached");
}

// increment the recursion level as we're going deeper into the tree
types // get the type of the field from the types set to recurse into it's fields
.get(&field.ty)
Expand Down Expand Up @@ -281,7 +282,7 @@ fn dfs_fields<'a, const MAX_RECURSION_LEVEL: usize>(
#[tracing::instrument(level = "trace", ret)]
pub fn resolve_references(types: HashSet<PartialType>) -> Option<PartialType> {
/// Maximum recursion level for safety.
const MAX_RECURSION_LEVEL: usize = 10;
const MAX_RECURSION_LEVEL: usize = 16;
// Cache to perform memoization between recursive calls so we don't have to resolve the same
// type multiple times. Mapping between `ident` -> `resolved_docs`.
// For example, if we have a types [`A`, `B`, `C`] and A has a field of type `B` and `B` has a
Expand Down
31 changes: 26 additions & 5 deletions mirrord-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,7 @@
],
"properties": {
"message_filter": {
"description": "A filter is a mapping between message attribute names and regexes they should match. The local application will only receive messages that match **all** of the given patterns. This means, only messages that have **all** of the attributes in the filter, with values of those attributes matching the respective patterns.",
"type": "object",
"additionalProperties": {
"type": "string"
Expand All @@ -1546,6 +1547,29 @@
]
}
}
},
{
"description": "Kafka.",
"type": "object",
"required": [
"message_filter",
"queue_type"
],
"properties": {
"message_filter": {
"description": "A filter is a mapping between message header names and regexes they should match. The local application will only receive messages that match **all** of the given patterns. This means, only messages that have **all** of the headers in the filter, with values of those headers matching the respective patterns.",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"queue_type": {
"type": "string",
"enum": [
"Kafka"
]
}
}
}
]
},
Expand All @@ -1570,11 +1594,8 @@
"additionalProperties": false
},
"SplitQueuesConfig": {
"description": "```json { \"feature\": { \"split_queues\": { \"first-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very .*\" } }, \"second-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"who\": \"*you$\" } }, } } } ```",
"type": [
"object",
"null"
],
"description": "```json { \"feature\": { \"split_queues\": { \"first-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very\" } }, \"second-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"who\": \"you$\" } }, \"third-queue\": { \"queue_type\": \"Kafka\", \"message_filter\": { \"who\": \"you$\" } }, \"fourth-queue\": { \"queue_type\": \"Kafka\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very\" } }, } } } ```",
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/QueueFilter"
}
Expand Down
14 changes: 7 additions & 7 deletions mirrord/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ tokio = { workspace = true, features = [
"process",
"signal",
] }
procfs = "0.16.0"
procfs = "0.17.0"
serde.workspace = true
serde_json.workspace = true
pnet = "0.35"
Expand All @@ -43,7 +43,7 @@ tokio-stream.workspace = true
thiserror.workspace = true
hickory-resolver.workspace = true
num-traits.workspace = true
bollard = "0.16"
bollard = "0.17"
tokio-util.workspace = true
rand.workspace = true
streammap-ext.workspace = true
Expand All @@ -58,11 +58,11 @@ hyper = { workspace = true, features = ["full"] }
hyper-util.workspace = true
httparse = "1"
fancy-regex = { workspace = true }
dashmap = { version = "5" }
oci-spec = "0.6.0"
dashmap = { version = "6" }
oci-spec = "0.7.0"
async-trait = "0.1"
tonic = "0.12"
tower = "0.4"
tower = "0.5"
http = "1"
k8s-cri = "0.9"
semver.workspace = true
Expand All @@ -78,7 +78,7 @@ rawsocket = { git = "https://github.com/metalbear-co/rawsocket.git" }


[dev-dependencies]
rstest = "0.21"
mockall = "0.12" # 0.11.3 is broken
rstest = "0.23"
mockall = "0.13"
test_bin = "0.4"
rcgen = "0.13"
2 changes: 1 addition & 1 deletion mirrord/auth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ chrono = "0.4"
whoami = { version = "1", optional = true }
home = { version = "0.5", optional = true }
pem = "3"
fs4 = { version = "0.8", features = ["tokio"], optional = true }
fs4 = { version = "0.10", features = ["tokio"], optional = true, default-features = false}
k8s-openapi = { workspace = true, optional = true }
kube = { workspace = true, optional = true }
serde = { version = "1", features = ["derive"] }
Expand Down
6 changes: 3 additions & 3 deletions mirrord/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mirrord-vpn = { path = "../vpn" }

actix-codec.workspace = true
clap.workspace = true
tun2 = { version = "2", features = ["async"] }
tun2 = { version = "3", features = ["async"] }
tracing.workspace = true
serde_json.workspace = true
serde.workspace = true
Expand Down Expand Up @@ -68,7 +68,7 @@ tokio-rustls = "0.26"
tokio-stream = { workspace = true, features = ["net"] }
tokio-retry = "0.3"
regex.workspace = true
mid = "2.1.0"
mid = "3.0.0"
rand.workspace = true

[target.'cfg(target_os = "macos")'.dependencies]
Expand All @@ -79,4 +79,4 @@ mirrord-sip = { path = "../sip" }
mirrord-layer = { artifact = "cdylib", path = "../layer" }

[dev-dependencies]
rstest = "0.21"
rstest = "0.23"
93 changes: 51 additions & 42 deletions mirrord/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,48 +547,7 @@ pub(super) enum OperatorCommand {
///
/// NOTE: You don't need to install the operator to use open source mirrord features.
#[command(override_usage = "mirrord operator setup [OPTIONS] | kubectl apply -f -")]
Setup {
/// ToS can be read here <https://metalbear.co/legal/terms>
#[arg(long)]
accept_tos: bool,

/// A mirrord for Teams license key (online)
#[arg(long, allow_hyphen_values(true))]
license_key: Option<String>,

/// Path to a file containing a mirrord for Teams license certificate
#[arg(long)]
license_path: Option<PathBuf>,

/// Output Kubernetes specs to file instead of stdout
#[arg(short, long)]
file: Option<PathBuf>,

/// Namespace to create the operator in (this doesn't limit the namespaces the operator
/// will be able to access)
#[arg(short, long, default_value = "mirrord")]
namespace: OperatorNamespace,

/// AWS role ARN for the operator's service account.
/// Necessary for enabling SQS queue splitting.
/// For successfully running an SQS queue splitting operator the given IAM role must be
/// able to create, read from, write to, and delete SQS queues.
/// If the queue messages are encrypted using KMS, the operator also needs the
/// `kms:Encrypt`, `kms:Decrypt` and `kms:GenerateDataKey` permissions.
#[arg(long, visible_alias = "arn")]
aws_role_arn: Option<String>,

/// Enable SQS queue splitting.
/// When set, some extra CRDs will be installed on the cluster, and the operator will run
/// an SQS splitting component.
#[arg(
long,
visible_alias = "sqs",
default_value_t = false,
requires = "aws_role_arn"
)]
sqs_splitting: bool,
},
Setup(#[clap(flatten)] OperatorSetupParams),
/// Print operator status
Status {
/// Specify config file to use
Expand All @@ -602,6 +561,56 @@ pub(super) enum OperatorCommand {
Session(SessionCommand),
}

#[derive(Args, Debug)]
pub(super) struct OperatorSetupParams {
/// ToS can be read here <https://metalbear.co/legal/terms>
#[arg(long)]
pub(super) accept_tos: bool,

/// A mirrord for Teams license key (online)
#[arg(long, allow_hyphen_values(true))]
pub(super) license_key: Option<String>,

/// Path to a file containing a mirrord for Teams license certificate
#[arg(long)]
pub(super) license_path: Option<PathBuf>,

/// Output Kubernetes specs to file instead of stdout
#[arg(short, long)]
pub(super) file: Option<PathBuf>,

/// Namespace to create the operator in (this doesn't limit the namespaces the operator
/// will be able to access)
#[arg(short, long, default_value = "mirrord")]
pub(super) namespace: OperatorNamespace,

/// AWS role ARN for the operator's service account.
/// Necessary for enabling SQS queue splitting.
/// For successfully running an SQS queue splitting operator the given IAM role must be
/// able to create, read from, write to, and delete SQS queues.
/// If the queue messages are encrypted using KMS, the operator also needs the
/// `kms:Encrypt`, `kms:Decrypt` and `kms:GenerateDataKey` permissions.
#[arg(long, visible_alias = "arn")]
pub(super) aws_role_arn: Option<String>,

/// Enable SQS queue splitting.
/// When set, some extra CRDs will be installed on the cluster, and the operator will run
/// an SQS splitting component.
#[arg(
long,
visible_alias = "sqs",
default_value_t = false,
requires = "aws_role_arn"
)]
pub(super) sqs_splitting: bool,

/// Enable Kafka queue splitting.
/// When set, some extra CRDs will be installed on the cluster, and the operator will run
/// a Kafka splitting component.
#[arg(long, visible_alias = "kafka", default_value_t = false)]
pub(super) kafka_splitting: bool,
}

/// `mirrord operator session` family of commands.
///
/// Allows the user to forcefully kill operator sessions, use with care!
Expand Down
48 changes: 15 additions & 33 deletions mirrord/cli/src/operator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::{
fs::File,
path::{Path, PathBuf},
time::Duration,
};
use std::{fs::File, path::Path, time::Duration};

use futures::TryFutureExt;
use kube::{Api, Client};
Expand All @@ -15,7 +11,7 @@ use mirrord_kube::api::kubernetes::create_kube_config;
use mirrord_operator::{
client::OperatorApi,
crd::{MirrordOperatorCrd, MirrordOperatorSpec},
setup::{LicenseType, Operator, OperatorNamespace, OperatorSetup, SetupOptions},
setup::{LicenseType, Operator, OperatorSetup, SetupOptions},
types::LicenseInfoOwned,
};
use mirrord_progress::{Progress, ProgressTracker};
Expand All @@ -29,7 +25,7 @@ use crate::{
config::{OperatorArgs, OperatorCommand},
error::{CliError, OperatorSetupError},
util::remove_proxy_env,
Result,
OperatorSetupParams, Result,
};

mod session;
Expand All @@ -54,13 +50,16 @@ async fn get_last_version() -> Result<String, reqwest::Error> {

/// Setup the operator into a file or to stdout, with explanation.
async fn operator_setup(
accept_tos: bool,
file: Option<PathBuf>,
namespace: OperatorNamespace,
license_key: Option<String>,
license_path: Option<PathBuf>,
aws_role_arn: Option<String>,
sqs_splitting: bool,
OperatorSetupParams {
accept_tos,
license_key,
license_path,
file,
namespace,
aws_role_arn,
sqs_splitting,
kafka_splitting,
}: OperatorSetupParams,
) -> Result<(), OperatorSetupError> {
if !accept_tos {
eprintln!("Please note that mirrord operator installation requires an active subscription for the mirrord Operator provided by MetalBear Tech LTD.\nThe service ToS can be read here - https://metalbear.co/legal/terms\nPass --accept-tos to accept the TOS");
Expand Down Expand Up @@ -105,6 +104,7 @@ async fn operator_setup(
image,
aws_role_arn,
sqs_splitting,
kafka_splitting,
});

match file {
Expand Down Expand Up @@ -297,25 +297,7 @@ Operator License
/// Handle commands related to the operator `mirrord operator ...`
pub(crate) async fn operator_command(args: OperatorArgs) -> Result<()> {
match args.command {
OperatorCommand::Setup {
accept_tos,
file,
namespace,
license_key,
license_path,
aws_role_arn,
sqs_splitting,
} => operator_setup(
accept_tos,
file,
namespace,
license_key,
license_path,
aws_role_arn,
sqs_splitting,
)
.await
.map_err(CliError::from),
OperatorCommand::Setup(params) => operator_setup(params).await.map_err(CliError::from),
OperatorCommand::Status { config_file } => operator_status(config_file.as_deref()).await,
OperatorCommand::Session(session_command) => {
SessionCommandHandler::new(session_command)
Expand Down
3 changes: 2 additions & 1 deletion mirrord/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ipnet = "2.8"
bitflags = "2"
k8s-openapi = { workspace = true, features = ["schemars", "earliest"] }
tera = "1"
fancy-regex.workspace = true

[dev-dependencies]
rstest = "0.21"
rstest = "0.23"
17 changes: 15 additions & 2 deletions mirrord/config/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1236,13 +1236,26 @@ will be used, and your local application will not receive any messages from that
"queue_type": "SQS",
"message_filter": {
"wows": "so wows",
"coolz": "^very .*"
"coolz": "^very"
}
},
"second-queue": {
"queue_type": "SQS",
"message_filter": {
"who": "*you$"
"who": "you$"
}
},
"third-queue": {
"queue_type": "Kafka",
"message_filter": {
"who": "you$"
}
},
"fourth-queue": {
"queue_type": "Kafka",
"message_filter": {
"wows": "so wows",
"coolz": "^very"
}
},
}
Expand Down
Loading

0 comments on commit 02fe9cf

Please sign in to comment.