Skip to content

Commit

Permalink
Merge pull request #174 from nbuffon/rust_tls
Browse files Browse the repository at this point in the history
rust: telemetry basic auth + MQTT trust store TLS

Signed-off by: Nicolas Buffon <nicolas.buffon@orange.com>
  • Loading branch information
nbuffon authored Oct 21, 2024
2 parents d8a7e1e + ba8d611 commit 8a860ec
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 50 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ crate-type = ["lib"]
[features]
mobility = []
geo_routing = ["mobility"]
telemetry = []
telemetry = ["dep:base64"]

[[example]]
name = "copycat"
Expand All @@ -56,6 +56,10 @@ serde_repr = "0.1"
thiserror = "1.0"
threadpool = "1.8"

[dependencies.base64]
version = "0.22"
optional = true

[dependencies.rumqttc]
version = "0.24"
features = ["websocket"]
Expand Down
10 changes: 6 additions & 4 deletions rust/examples/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ type="mec_application"

[mqtt]
host="test.mosquitto.org"
port=1884
port=8886
client_id="com_orange_its-client"
username="rw"
password="readwrite"
use_tls=false
use_tls=true
use_websocket=false

[node]
Expand All @@ -22,6 +20,10 @@ thread_count=4
;path=custom/v1/traces
; Optional, defaults to 2048
;max_batch_size=10
; Optional, for basic auth
; username=admin
; Optional, for basic auth
; password=admin

[log]
level="debug"
Expand Down
13 changes: 2 additions & 11 deletions rust/src/client/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::client::configuration::configuration_error::ConfigurationError::{
FieldNotFound, MissingMandatoryField, MissingMandatorySection, NoCustomSettings, NoPassword,
TypeError,
};
use crate::transport::mqtt::{configure_tls, configure_transport};
use crate::transport::mqtt::configure_transport;

#[cfg(feature = "telemetry")]
use crate::client::configuration::telemetry_configuration::{
Expand Down Expand Up @@ -142,16 +142,7 @@ impl TryFrom<&Properties> for MqttOptionWrapper {
.unwrap_or_default()
.unwrap_or_default();

// FIXME manage ALPN, and authentication
let tls_configuration = if use_tls {
let ca_path = get_mandatory_field::<String>("tls_certificate", ("mqtt", properties))
.expect("TLS enabled but no certificate path provided");
Some(configure_tls(&ca_path, None, None))
} else {
None
};

configure_transport(tls_configuration, use_websocket, &mut mqtt_options);
configure_transport(use_tls, use_websocket, &mut mqtt_options);

Ok(MqttOptionWrapper(mqtt_options))
}
Expand Down
26 changes: 26 additions & 0 deletions rust/src/client/configuration/telemetry_configuration.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use base64::Engine;
use ini::Properties;
use log::warn;
use std::string::ToString;
Expand Down Expand Up @@ -28,6 +29,20 @@ pub struct TelemetryConfiguration {
pub port: u16,
pub path: String,
pub batch_size: usize,
username: Option<String>,
password: Option<String>,
}

impl TelemetryConfiguration {
pub(crate) fn basic_auth_header(&self) -> Option<String> {
if let (Some(username), Some(password)) = (&self.username, &self.password) {
let raw = format!("{}:{}", username, password);
let as_b64 = base64::prelude::BASE64_STANDARD.encode(raw.as_bytes());
Some(format!("Basic {}", as_b64))
} else {
None
}
}
}

impl TryFrom<&Properties> for TelemetryConfiguration {
Expand Down Expand Up @@ -58,11 +73,22 @@ impl TryFrom<&Properties> for TelemetryConfiguration {
}
};

let (username, password) =
match get_optional_from_section::<String>("username", properties)? {
Some(username) => {
let password = get_mandatory_field::<String>("password", section)?;
(Some(username), Some(password))
}
None => (None, None),
};

let s = TelemetryConfiguration {
host: get_mandatory_field::<String>("host", section)?,
port: get_mandatory_field::<u16>("port", section)?,
path,
batch_size,
username,
password,
};

Ok(s)
Expand Down
41 changes: 8 additions & 33 deletions rust/src/transport/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,48 +20,23 @@ pub mod topic;
pub mod geo_topic;

pub(crate) fn configure_transport(
tls_configuration: Option<TlsConfiguration>,
use_tls: bool,
use_websocket: bool,
mqtt_options: &mut MqttOptions,
) {
match (tls_configuration, use_websocket) {
(Some(tls), true) => {
match (use_tls, use_websocket) {
(true, true) => {
println!("Transport: MQTT over WebSocket; TLS enabled");
mqtt_options.set_transport(Transport::Wss(tls));
mqtt_options.set_transport(Transport::Wss(TlsConfiguration::default()));
}
(Some(tls), false) => {
(true, false) => {
println!("Transport: standard MQTT; TLS enabled");
mqtt_options.set_transport(Transport::Tls(tls));
mqtt_options.set_transport(Transport::Tls(TlsConfiguration::default()));
}
(None, true) => {
(false, true) => {
println!("Transport: MQTT over WebSocket; TLS disabled");
mqtt_options.set_transport(Transport::Ws);
}
(None, false) => println!("Transport: standard MQTT; TLS disabled"),
}
}

pub(crate) fn configure_tls(
ca_path: &str,
alpn: Option<Vec<Vec<u8>>>,
client_auth: Option<(Vec<u8>, Vec<u8>)>,
) -> TlsConfiguration {
let ca: Vec<u8> = std::fs::read(ca_path).expect("Failed to read TLS certificate");

TlsConfiguration::Simple {
ca,
alpn,
client_auth,
}
}

#[cfg(test)]
mod tests {
use crate::transport::mqtt::configure_tls;

#[test]
#[should_panic]
fn configure_tls_with_invalid_path_should_return_error() {
let _ = configure_tls("unextisting/path", None, None);
(false, false) => println!("Transport: standard MQTT; TLS disabled"),
}
}
18 changes: 17 additions & 1 deletion rust/src/transport/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use opentelemetry_sdk::trace::{
BatchConfigBuilder, BatchSpanProcessor, RandomIdGenerator, Sampler, TracerProvider,
};
use opentelemetry_sdk::Resource;
use reqwest::header;

use crate::client::configuration::telemetry_configuration::TelemetryConfiguration;

Expand All @@ -42,9 +43,24 @@ pub fn init_tracer(
configuration.host, configuration.port, path
);

let http_client = match configuration.basic_auth_header() {
Some(header) => {
let mut headers = header::HeaderMap::new();
let mut auth_value =
header::HeaderValue::try_from(header).expect("Failed to create header value");
auth_value.set_sensitive(true);
headers.insert(header::AUTHORIZATION, auth_value);
reqwest::ClientBuilder::new()
.default_headers(headers)
.build()
.expect("Failed to create telemetry HTTP client")
}
None => reqwest::Client::new(),
};

let http_exporter = opentelemetry_otlp::new_exporter()
.http()
.with_http_client(reqwest::Client::new())
.with_http_client(http_client)
.with_endpoint(endpoint)
.with_timeout(Duration::from_secs(3))
.build_span_exporter()?;
Expand Down

0 comments on commit 8a860ec

Please sign in to comment.