Merge pull request paritytech#1 from availproject/marko/add-custom-telemetry

Marko/add custom telemetry
markopoloparadox authored Mar 7, 2024
2 parents b66ca4d + 049bc10 commit 421b75a
Showing 14 changed files with 396 additions and 7 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

8 changes: 7 additions & 1 deletion substrate/bin/node-template/node/src/service.rs
@@ -6,7 +6,7 @@ use sc_client_api::{Backend, BlockBackend};
use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
use sc_consensus_grandpa::SharedVoterState;
use sc_service::{error::Error as ServiceError, Configuration, TaskManager, WarpSyncParams};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sc_telemetry::{custom_telemetry::CustomTelemetryWorker, Telemetry, TelemetryWorker};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
use std::{sync::Arc, time::Duration};
@@ -62,6 +62,12 @@ pub fn new_partial(config: &Configuration) -> Result<Service, ServiceError> {
telemetry
});

let telemetry_handle = telemetry.as_ref().map(|t| t.handle());
let custom_telemetry_worker = CustomTelemetryWorker { handle: telemetry_handle };
task_manager
.spawn_handle()
.spawn("custom_telemetry", None, custom_telemetry_worker.run());

let select_chain = sc_consensus::LongestChain::new(backend.clone());

let transaction_pool = sc_transaction_pool::BasicPool::new_full(
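For orientation, here is a minimal standalone sketch of a worker with the shape used above. It assumes only what this hunk shows (a public `handle` field and an async `run()` entry point spawned as a background task); the real `CustomTelemetryWorker` lives in sc-telemetry's `custom_telemetry` module, which is not part of this excerpt, so the loop body and the tokio dependency are illustrative assumptions.

    // Sketch only: assumes tokio = { version = "1", features = ["full"] }.
    use std::time::Duration;

    struct CustomTelemetryWorker {
        // Stand-in for the `Option<TelemetryHandle>` field shown in the diff.
        handle: Option<String>,
    }

    impl CustomTelemetryWorker {
        async fn run(self) {
            loop {
                // Hypothetical body: periodically drain collected block metrics
                // and forward them through `self.handle`.
                tokio::time::sleep(Duration::from_secs(10)).await;
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let worker = CustomTelemetryWorker { handle: None };
        // Spawned as a background task, mirroring `spawn_handle().spawn(...)` above.
        tokio::spawn(worker.run());
    }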
2 changes: 1 addition & 1 deletion substrate/bin/node-template/rust-toolchain.toml
@@ -1,5 +1,5 @@
[toolchain]
channel = "nightly"
channel = "stable"
components = [
"cargo",
"clippy",
1 change: 1 addition & 0 deletions substrate/client/consensus/common/Cargo.toml
@@ -34,6 +34,7 @@ sp-consensus = { path = "../../../primitives/consensus/common" }
sp-core = { path = "../../../primitives/core" }
sp-runtime = { path = "../../../primitives/runtime" }
sp-state-machine = { path = "../../../primitives/state-machine" }
sc-telemetry = { path = "../../../client/telemetry" }

[dev-dependencies]
sp-test-primitives = { path = "../../../primitives/test-primitives" }
42 changes: 41 additions & 1 deletion substrate/client/consensus/common/src/import_queue.rs
@@ -29,6 +29,7 @@
use log::{debug, trace};

use sc_telemetry::custom_telemetry::*;
use sp_consensus::{error::Error as ConsensusError, BlockOrigin};
use sp_runtime::{
traits::{Block as BlockT, Header as _, NumberFor},
@@ -223,9 +224,48 @@ pub async fn import_single_block<B: BlockT, V: Verifier<B>>(
block: IncomingBlock<B>,
verifier: &mut V,
) -> BlockImportResult<B> {
import_single_block_metered(import_handle, block_origin, block, verifier, None).await
import_single_block_metered_v2(import_handle, block_origin, block, verifier, None).await
}

/// Custom wrapper around `import_single_block_metered()`. Use this function instead of
/// the original one.
pub(crate) async fn import_single_block_metered_v2<B: BlockT, V: Verifier<B>>(
import_handle: &mut impl BlockImport<B, Error = ConsensusError>,
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: &mut V,
metrics: Option<Metrics>,
) -> BlockImportResult<B> {
let block_hash = std::format!("{}", block.hash);

// This is ugly because the type of `h.number()` is not a concrete primitive but
// a generic bounded by traits. That's why we are forced to use try_into().
let block_number: u64 = block
.header
.as_ref()
.map(|h| (*h.number()).try_into().unwrap_or_default())
.unwrap_or_default();

let start_timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default();
let res =
import_single_block_metered(import_handle, block_origin, block, verifier, metrics).await;
let end_timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default();

let interval = IntervalWithBlockInformation {
kind: IntervalKind::Import,
block_number,
block_hash,
start_timestamp,
end_timestamp,
};
BlockMetrics::observe_interval(interval);

res
}

/// This is the original function from Substrate. We created our own `import_single_block_metered_v2`
/// that wraps this one so that we can more easily track how long it took to import the block.
///
/// Single block import function with metering.
pub(crate) async fn import_single_block_metered<B: BlockT, V: Verifier<B>>(
import_handle: &mut impl BlockImport<B, Error = ConsensusError>,
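The wrapper above follows a simple pattern: take one timestamp before and one after an awaited call, then report the pair as an interval. A self-contained sketch of that pattern is below; the `IntervalKind`, `IntervalWithBlockInformation`, and timestamp-helper shapes are approximations of the sc-telemetry types referenced in the diff, not their actual definitions.

    use std::time::{SystemTime, UNIX_EPOCH};

    #[derive(Clone, Debug)]
    enum IntervalKind { Proposal, Import, Sync }

    #[derive(Clone, Debug)]
    struct IntervalWithBlockInformation {
        kind: IntervalKind,
        block_number: u64,
        block_hash: String,
        start_timestamp: u64,
        end_timestamp: u64,
    }

    // Rough analogue of `BlockMetrics::get_current_timestamp_in_ms_or_default()`.
    fn now_ms() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or_default()
    }

    // Time an arbitrary async operation and emit one Import interval for it.
    async fn timed_import<F, T>(block_number: u64, block_hash: String, fut: F) -> T
    where
        F: std::future::Future<Output = T>,
    {
        let start_timestamp = now_ms();
        let res = fut.await;
        let end_timestamp = now_ms();
        let interval = IntervalWithBlockInformation {
            kind: IntervalKind::Import,
            block_number,
            block_hash,
            start_timestamp,
            end_timestamp,
        };
        // In the real code this would be `BlockMetrics::observe_interval(interval)`.
        println!("{interval:?}");
        res
    }

    #[tokio::main]
    async fn main() {
        let value = timed_import(1, "0xabc".into(), async { 42u32 }).await;
        assert_eq!(value, 42);
    }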
@@ -33,7 +33,7 @@ use std::{pin::Pin, time::Duration};
use crate::{
import_queue::{
buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender},
import_single_block_metered, BlockImportError, BlockImportStatus, BoxBlockImport,
import_single_block_metered_v2, BlockImportError, BlockImportStatus, BoxBlockImport,
BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, Link,
RuntimeOrigin, Verifier, LOG_TARGET,
},
@@ -432,7 +432,7 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
Err(BlockImportError::Cancelled)
} else {
// The actual import.
import_single_block_metered(
import_single_block_metered_v2(
import_handle,
blocks_origin,
block,
27 changes: 26 additions & 1 deletion substrate/client/consensus/slots/src/lib.rs
@@ -36,7 +36,10 @@ use futures::{future::Either, Future, TryFutureExt};
use futures_timer::Delay;
use log::{debug, info, warn};
use sc_consensus::{BlockImport, JustificationSyncLink};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO, CONSENSUS_WARN};
use sc_telemetry::{
custom_telemetry::{BlockMetrics, IntervalKind, IntervalWithBlockInformation},
telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO, CONSENSUS_WARN,
};
use sp_arithmetic::traits::BaseArithmetic;
use sp_consensus::{Proposal, Proposer, SelectChain, SyncOracle};
use sp_consensus_slots::{Slot, SlotDuration};
@@ -358,6 +361,7 @@ pub trait SimpleSlotWorker<B: BlockT> {

telemetry!(telemetry; CONSENSUS_DEBUG; "slots.starting_authorship"; "slot_num" => slot);

let start_proposal_timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default();
let proposer = match self.proposer(&slot_info.chain_head).await {
Ok(p) => p,
Err(err) => {
@@ -418,7 +422,10 @@ pub trait SimpleSlotWorker<B: BlockT> {
"hash_now" => ?block_import_params.post_hash(),
"hash_previously" => ?header_hash,
);
let end_proposal_timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default();

let post_header_hash = block_import_params.post_hash();
let start_import_timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default();
let header = block_import_params.post_header();
match self.block_import().import_block(block_import_params).await {
Ok(res) => {
@@ -443,6 +450,24 @@
);
},
}
let end_import_timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default();

// Metrics
let proposal_interval = IntervalWithBlockInformation {
kind: IntervalKind::Proposal,
block_number: header_num.try_into().unwrap_or_default(),
block_hash: std::format!("{}", post_header_hash),
start_timestamp: start_proposal_timestamp,
end_timestamp: end_proposal_timestamp,
};
let import_interval = IntervalWithBlockInformation {
start_timestamp: start_import_timestamp,
end_timestamp: end_import_timestamp,
kind: IntervalKind::Import,
..proposal_interval.clone()
};
BlockMetrics::observe_interval(proposal_interval);
BlockMetrics::observe_interval(import_interval);

Some(SlotResult { block: B::new(header, body), storage_proof })
}
1 change: 1 addition & 0 deletions substrate/client/network/sync/Cargo.toml
@@ -47,6 +47,7 @@ sp-consensus = { path = "../../../primitives/consensus/common" }
sp-core = { path = "../../../primitives/core" }
sp-consensus-grandpa = { path = "../../../primitives/consensus/grandpa" }
sp-runtime = { path = "../../../primitives/runtime" }
sc-telemetry = { path = "../../../client/telemetry" }

[dev-dependencies]
mockall = "0.11.3"
20 changes: 20 additions & 0 deletions substrate/client/network/sync/src/block_request_handler.rs
@@ -41,6 +41,7 @@ use sc_network::{
types::ProtocolName,
};
use sc_network_common::sync::message::{BlockAttributes, BlockData, BlockRequest, FromBlock};
use sc_telemetry::custom_telemetry::*;
use schnellru::{ByLength, LruMap};
use sp_blockchain::HeaderBackend;
use sp_runtime::{
@@ -189,6 +190,11 @@

/// Run [`BlockRequestHandler`].
async fn process_requests(&mut self) {
const DATA_COLLECTING_MAX_DURATION: u64 = 20_000; // in ms

let mut timer = std::time::SystemTime::now();
let mut requests_handled = 0u32;

while let Some(request) = self.request_receiver.next().await {
let IncomingRequest { peer, payload, pending_response } = request;

@@ -199,6 +205,20 @@
"Failed to handle block request from {}: {}", peer, e,
),
}

requests_handled += 1;
let elapsed_time = timer.elapsed().map_or(0u64, |d| d.as_millis() as u64);
if elapsed_time > DATA_COLLECTING_MAX_DURATION {
let detail = BlockRequestsDetail {
current_queue_size: self.request_receiver.len() as u32,
requests_handled,
time_frame: elapsed_time,
};
BlockMetrics::observe_block_request(detail);

timer = std::time::SystemTime::now();
requests_handled = 0;
}
}
}

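The request loop above aggregates over a rolling window: count handled requests and, once more than `DATA_COLLECTING_MAX_DURATION` milliseconds have elapsed, emit one summary record and reset the window. Below is a standalone sketch of that windowed aggregation; the `BlockRequestsDetail` definition and the queue-size value are assumptions, since only the call site is shown in the diff.

    use std::time::SystemTime;

    const DATA_COLLECTING_MAX_DURATION: u64 = 20_000; // in ms

    #[derive(Debug)]
    struct BlockRequestsDetail {
        current_queue_size: u32,
        requests_handled: u32,
        time_frame: u64,
    }

    fn main() {
        let mut timer = SystemTime::now();
        let mut requests_handled = 0u32;

        // In the real handler this runs inside the long-lived request loop;
        // a short loop here only illustrates the bookkeeping.
        for _incoming_request in 0..5 {
            // ... handle one block request here ...
            requests_handled += 1;

            let elapsed_time = timer.elapsed().map_or(0u64, |d| d.as_millis() as u64);
            if elapsed_time > DATA_COLLECTING_MAX_DURATION {
                let detail = BlockRequestsDetail {
                    current_queue_size: 0, // would come from the request receiver's queue length
                    requests_handled,
                    time_frame: elapsed_time,
                };
                // In the real code: BlockMetrics::observe_block_request(detail);
                println!("{detail:?}");

                timer = SystemTime::now();
                requests_handled = 0;
            }
        }
    }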
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/state_request_handler.rs
@@ -159,7 +159,7 @@
let IncomingRequest { peer, payload, pending_response } = request;

match self.handle_request(payload, pending_response, &peer) {
Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
Ok(()) => debug!(target: LOG_TARGET, "Handled state request from {}.", peer),
Err(e) => debug!(
target: LOG_TARGET,
"Failed to handle state request from {}: {}", peer, e,
23 changes: 23 additions & 0 deletions substrate/client/network/sync/src/strategy/chain_sync.rs
@@ -49,6 +49,7 @@ use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
use sc_network_common::sync::message::{
BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
};
use sc_telemetry::custom_telemetry::{BlockMetrics, IntervalKind};
use sp_arithmetic::traits::Saturating;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
use sp_consensus::{BlockOrigin, BlockStatus};
@@ -698,8 +699,21 @@
if let Some(start_block) =
validate_blocks::<B>(&blocks, peer_id, Some(request))?
{
let timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default();
for block in &blocks {
if let Some(header) = &block.header {
BlockMetrics::observe_interval_partial(
IntervalKind::Sync,
header.number().clone().try_into().unwrap_or_default(),
std::format!("{}", header.hash()),
timestamp,
false,
);
}
}
self.blocks.insert(start_block, blocks, *peer_id);
}

self.ready_blocks()
},
PeerSyncState::DownloadingGap(_) => {
@@ -1125,6 +1139,15 @@
})
.peers
.insert(peer_id);

let summary = announce.summary();
BlockMetrics::observe_interval_partial(
IntervalKind::Sync,
summary.number.try_into().unwrap_or_default(),
std::format!("{}", summary.block_hash),
BlockMetrics::get_current_timestamp_in_ms_or_default(),
true,
);
}

peer_info
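The two call sites above appear to record the halves of a sync interval: one observation when a block announcement arrives (flag `true`) and one when the block data is received (flag `false`). The sketch below shows one way such partial observations could be joined by block number and hash; the flag semantics and the sink are assumptions, since `observe_interval_partial` itself is not shown in this diff.

    use std::collections::HashMap;

    #[derive(Default)]
    struct PartialIntervals {
        // (block_number, block_hash) -> start timestamp in ms
        started: HashMap<(u64, String), u64>,
    }

    impl PartialIntervals {
        fn observe(&mut self, block_number: u64, block_hash: String, timestamp: u64, is_start: bool) {
            let key = (block_number, block_hash);
            if is_start {
                // First half: keep the earliest start timestamp for this block.
                self.started.entry(key).or_insert(timestamp);
            } else if let Some(start) = self.started.remove(&key) {
                // Second half: a complete sync interval from announcement to received body.
                println!("sync interval for block {}: {} ms", key.0, timestamp.saturating_sub(start));
            }
        }
    }

    fn main() {
        let mut sink = PartialIntervals::default();
        sink.observe(7, "0xabc".into(), 1_000, true);
        sink.observe(7, "0xabc".into(), 1_250, false);
    }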
1 change: 1 addition & 0 deletions substrate/client/telemetry/Cargo.toml
@@ -29,3 +29,4 @@ serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
thiserror = "1.0.48"
wasm-timer = "0.2.5"
tokio = "1.22.0"