Skip to content

Commit

Permalink
Added logic to disable/enable telemetry service and server (#868)
Browse files Browse the repository at this point in the history
  • Loading branch information
andynog committed May 23, 2021
1 parent 5893724 commit 9d7985e
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 33 deletions.
2 changes: 1 addition & 1 deletion config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[global]
strategy = 'naive'
log_level = 'debug'
log_level = 'info'
telemetry_enabled = true
telemetry_port = 3001

Expand Down
8 changes: 6 additions & 2 deletions relayer-cli/src/commands/start_multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ pub struct StartMultiCmd {}
impl Runnable for StartMultiCmd {
fn run(&self) {
let config = app_config();
let telemetry = telemetry::spawn(config.global.telemetry_port);
let supervisor = Supervisor::spawn(config.clone(), telemetry).expect("failed to spawn supervisor");
let telemetry = telemetry::spawn(
config.global.telemetry_port,
config.global.telemetry_enabled,
);
let supervisor =
Supervisor::spawn(config.clone(), telemetry).expect("failed to spawn supervisor");
match supervisor.run() {
Ok(()) => Output::success_msg("done").exit(),
Err(e) => Output::error(e).exit(),
Expand Down
10 changes: 5 additions & 5 deletions relayer/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use crate::{
};

mod error;
pub use error::Error;
use crate::telemetry::service::MetricUpdate;
pub use error::Error;

/// The supervisor listens for events on multiple pairs of chains,
/// and dispatches the events it receives to the appropriate
Expand All @@ -41,7 +41,7 @@ pub struct Supervisor {
registry: Registry,
workers: WorkerMap,
worker_msg_rx: Receiver<WorkerMsg>,
telemetry: Sender<MetricUpdate>
telemetry: Sender<MetricUpdate>,
}

impl Supervisor {
Expand All @@ -55,7 +55,7 @@ impl Supervisor {
registry,
workers: WorkerMap::new(worker_msg_tx),
worker_msg_rx,
telemetry
telemetry,
})
}

Expand Down Expand Up @@ -144,7 +144,7 @@ impl Supervisor {
Ok(chain_handle) => {
let _ = self.telemetry.send(MetricUpdate::RelayChainsNumber(1));
chain_handle
},
}
Err(e) => {
error!("skipping workers for chain id {}. reason: failed to spawn chain runtime with error: {}", chain_id, e);
continue;
Expand All @@ -155,7 +155,7 @@ impl Supervisor {
Ok(channels) => {
let _ = self.telemetry.send(MetricUpdate::RelayChannelsNumber(1));
channels
},
}
Err(e) => {
error!("failed to query channels from {}: {}", chain_id, e);
continue;
Expand Down
28 changes: 16 additions & 12 deletions relayer/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
use crate::telemetry::state::TelemetryState;
use crate::telemetry::service::TelemetryService;
use crate::telemetry::server::TelemetryServer;
use crossbeam_channel::Sender;
use crate::telemetry::service::MetricUpdate;
use crate::telemetry::service::TelemetryService;
use crate::telemetry::state::TelemetryState;
use crossbeam_channel::Sender;

pub mod service;
pub mod server;
pub mod service;
pub mod state;

pub fn spawn(port: u16) -> Sender<MetricUpdate> {
pub fn spawn(port: u16, enabled: bool) -> Sender<MetricUpdate> {
let (tx, rx) = crossbeam_channel::unbounded();
let telemetry_state = TelemetryState::new();
let service = TelemetryService::new(telemetry_state.clone(), rx);
let server = TelemetryServer::new(telemetry_state.clone());

// Start the telemetry service and server
std::thread::spawn(move || server.run( telemetry_state.clone(),port));
std::thread::spawn(move || service.run());
// Only start the telemetry service and server if it is enabled in the configuration
if enabled {
let telemetry_state = TelemetryState::new();
let service = TelemetryService::new(telemetry_state.clone(), rx);
let server = TelemetryServer::new(telemetry_state.clone());

// Start the telemetry service and server
std::thread::spawn(move || server.run(telemetry_state.clone(), port));
std::thread::spawn(move || service.run());
}

tx
}
}
4 changes: 2 additions & 2 deletions relayer/src/telemetry/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use prometheus::{Encoder, TextEncoder};
use crate::telemetry::state::TelemetryState;
use prometheus::{Encoder, TextEncoder};

pub struct TelemetryServer {
pub state: TelemetryState,
Expand Down Expand Up @@ -30,4 +30,4 @@ impl TelemetryServer {
)
});
}
}
}
11 changes: 4 additions & 7 deletions relayer/src/telemetry/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crossbeam_channel::Receiver;
use crate::telemetry::state::TelemetryState;
use crossbeam_channel::Receiver;

pub enum MetricUpdate {
RelayChainsNumber(u64),
Expand All @@ -10,15 +10,12 @@ pub enum MetricUpdate {

pub struct TelemetryService {
pub state: TelemetryState,
pub rx: Receiver<MetricUpdate>
pub rx: Receiver<MetricUpdate>,
}

impl TelemetryService {
pub(crate) fn new(state: TelemetryState, rx: Receiver<MetricUpdate>) -> Self {
Self {
state,
rx,
}
Self { state, rx }
}

pub(crate) fn run(self) {
Expand All @@ -29,7 +26,7 @@ impl TelemetryService {

fn apply_update(&self, update: MetricUpdate) {
match update {
MetricUpdate::RelayChainsNumber(n) => self.state.relay_chains_num.add(n ),
MetricUpdate::RelayChainsNumber(n) => self.state.relay_chains_num.add(n),
MetricUpdate::RelayChannelsNumber(n) => self.state.relay_channels_num.add(n),
MetricUpdate::AcknowledgePacket(n) => self.state.tx_msg_ibc_acknowledge_packet.add(n),
MetricUpdate::TxCount(n) => self.state.tx_count.add(n),
Expand Down
7 changes: 3 additions & 4 deletions relayer/src/telemetry/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use opentelemetry::metrics::BoundCounter;
use opentelemetry_prometheus::PrometheusExporter;
use opentelemetry::global;
use opentelemetry::metrics::BoundCounter;
use opentelemetry::KeyValue;
use opentelemetry_prometheus::PrometheusExporter;

lazy_static! {
static ref HANDLER_ALL: [KeyValue; 1] = [KeyValue::new("hermes", "all")];
Expand All @@ -22,7 +22,6 @@ pub struct TelemetryState {

// Total number of txs processed via Relay tx
pub tx_count: BoundCounter<'static, u64>,

}

impl TelemetryState {
Expand Down Expand Up @@ -54,4 +53,4 @@ impl TelemetryState {
};
telemetry_state
}
}
}

0 comments on commit 9d7985e

Please sign in to comment.