Skip to content

Commit

Permalink
fix: add options for setting NATS leaf and client ports separately
Browse files Browse the repository at this point in the history
PR #35 introduced a small regression which only impacts clients who are
trying to use nonstandard ports. This fixes the problem by allowing the
NATS leaf node and client ports to be configured separately in the CRD.
  • Loading branch information
protochron committed May 28, 2024
1 parent 45ada5a commit ec1d795
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wasmcloud-operator-types"
version = "0.1.3"
version = "0.1.4"
edition = "2021"

[dependencies]
Expand Down
18 changes: 16 additions & 2 deletions crates/types/src/v1alpha1/wasmcloud_host_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use k8s_openapi::api::core::v1::{PodSpec, ResourceRequirements};
use kube::CustomResource;
use schemars::{gen::SchemaGenerator, schema::Schema, JsonSchema};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeMap, BTreeSet};

#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[cfg_attr(test, derive(Default))]
Expand All @@ -26,7 +26,7 @@ pub struct WasmCloudHostConfigSpec {
/// The lattice to use for these hosts.
pub lattice: String,
/// An optional set of labels to apply to these hosts.
pub host_labels: Option<HashMap<String, String>>,
pub host_labels: Option<BTreeMap<String, String>>,
/// The version of the wasmCloud host to deploy.
pub version: String,
/// The image to use for the wasmCloud host.
Expand All @@ -51,6 +51,12 @@ pub struct WasmCloudHostConfigSpec {
/// The address of the NATS server to connect to. Defaults to "nats://nats.default.svc.cluster.local".
#[serde(default = "default_nats_address")]
pub nats_address: String,
/// The port of the NATS server to connect to. Defaults to 4222.
#[serde(default = "default_nats_port")]
pub nats_client_port: u16,
/// The port of the NATS server to connect to for leaf node connections. Defaults to 7422.
#[serde(default = "default_nats_leafnode_port")]
pub nats_leafnode_port: u16,
/// The Jetstream domain to use for the NATS sidecar. Defaults to "default".
#[serde(default = "default_jetstream_domain")]
pub jetstream_domain: String,
Expand Down Expand Up @@ -128,6 +134,14 @@ fn default_log_level() -> String {
"INFO".to_string()
}

fn default_nats_port() -> u16 {
4222
}

fn default_nats_leafnode_port() -> u16 {
7422
}

#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
pub struct WasmCloudHostConfigResources {
pub nats: Option<ResourceRequirements>,
Expand Down
3 changes: 2 additions & 1 deletion sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ spec:
# Additional labels to apply to the host other than the defaults set in the controller
hostLabels:
test: value
cluster: kind
# Which wasmCloud version to use
version: "1.0.2"
# The name of a secret in the same namespace that provides the required secrets.
secretName: cluster-secrets
logLevel: INFO
natsAddress: nats://nats-cluster.default.svc.cluster.local:7422
natsAddress: nats://nats-cluster.default.svc.cluster.local
################################################
# Additional options that can be set for hosts:
################################################
Expand Down
6 changes: 4 additions & 2 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Resul
let nc = s.nats_creds.map(SecretString::new);
let apps = crate::resources::application::list_apps(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
nc.as_ref(),
cfg.spec.lattice.clone(),
)
Expand Down Expand Up @@ -202,6 +203,7 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Resul
// Start the watcher so that services are automatically created in the cluster.
let nats_client = get_client(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
ctx.nats_creds.clone(),
NameNamespace::new(name.clone(), ns.clone()),
)
Expand Down Expand Up @@ -835,7 +837,7 @@ jetstream {
leafnodes {
remotes: [
{
url: "{{cluster_url}}"
url: "{{cluster_url}}:{{leafnode_port}}"
{{#if use_credentials}}
credentials: "/nats/nats.creds"
{{/if}}
Expand All @@ -844,7 +846,7 @@ leafnodes {
}
"#;
let tpl = Handlebars::new();
let rendered = tpl.render_template(template, &json!({"jetstream_domain": config.spec.leaf_node_domain, "cluster_url": config.spec.nats_address, "use_credentials": use_nats_creds}))?;
let rendered = tpl.render_template(template, &json!({"jetstream_domain": config.spec.leaf_node_domain, "cluster_url": config.spec.nats_address, "leafnode_port": config.spec.nats_leafnode_port,"use_credentials": use_nats_creds}))?;
let mut contents = BTreeMap::new();
contents.insert("nats.conf".to_string(), rendered);
let cm = ConfigMap {
Expand Down
37 changes: 28 additions & 9 deletions src/resources/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,14 @@ pub async fn list_all_applications(
let secret = map.get(&nst);
// Prevent listing applications within a given lattice more than once
if !lattices.contains(&lattice_id) {
let result = match list_apps(&cfg.spec.nats_address, secret, lattice_id.clone()).await {
let result = match list_apps(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
secret,
lattice_id.clone(),
)
.await
{
Ok(apps) => apps,
Err(e) => return internal_error(anyhow!("unable to list applications: {}", e)),
};
Expand Down Expand Up @@ -440,7 +447,14 @@ pub async fn list_applications(
let secret = map.get(&nst);
// This is to check that we don't list a lattice more than once
if !lattices.contains(&lattice_id) {
let result = match list_apps(&cfg.spec.nats_address, secret, lattice_id.clone()).await {
let result = match list_apps(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
secret,
lattice_id.clone(),
)
.await
{
Ok(apps) => apps,
Err(e) => return internal_error(anyhow!("unable to list applications: {}", e)),
};
Expand All @@ -466,16 +480,18 @@ pub async fn list_applications(

pub async fn list_apps(
cluster_url: &str,
port: &u16,
creds: Option<&SecretString>,
lattice_id: String,
) -> Result<Vec<ModelSummary>, Error> {
let addr = format!("{}:{}", cluster_url, port);
let client = match creds {
Some(creds) => {
ConnectOptions::with_credentials(creds.expose_secret())?
.connect(cluster_url)
.connect(addr)
.await?
}
None => ConnectOptions::new().connect(cluster_url).await?,
None => ConnectOptions::new().connect(addr).await?,
};
let models = wash_lib::app::get_models(&client, Some(lattice_id)).await?;

Expand All @@ -484,19 +500,21 @@ pub async fn list_apps(

pub async fn get_client(
cluster_url: &str,
port: &u16,
nats_creds: Arc<RwLock<HashMap<NameNamespace, SecretString>>>,
namespace: NameNamespace,
) -> Result<async_nats::Client, async_nats::ConnectError> {
let addr = format!("{}:{}", cluster_url, port);
let creds = nats_creds.read().await;
match creds.get(&namespace) {
Some(creds) => {
let creds = creds.expose_secret();
ConnectOptions::with_credentials(creds)
.expect("unable to create nats client")
.connect(cluster_url)
.connect(addr)
.await
}
None => ConnectOptions::new().connect(cluster_url).await,
None => ConnectOptions::new().connect(addr).await,
}
}

Expand Down Expand Up @@ -809,11 +827,12 @@ async fn get_lattice_connection(
let lattice_id = cfg.spec.lattice;
let lattice_name = cfg.metadata.name?;
let nst: NameNamespace = NameNamespace::new(lattice_name, namespace);
Some((cluster_url, nst, lattice_id))
let port = cfg.spec.nats_client_port;
Some((cluster_url, nst, lattice_id, port))
});

for (cluster_url, ns, lattice_id) in connection_data {
match get_client(&cluster_url, state.nats_creds.clone(), ns).await {
for (cluster_url, ns, lattice_id, port) in connection_data {
match get_client(&cluster_url, &port, state.nats_creds.clone(), ns).await {
Ok(c) => return Ok((c, lattice_id)),
Err(e) => {
error!(err = %e, %lattice_id, "error connecting to nats");
Expand Down

0 comments on commit ec1d795

Please sign in to comment.