Skip to content

Commit

Permalink
chore: add logs and metric to recovery (#1007)
Browse files Browse the repository at this point in the history
## Rationale
Part of #799 

## Detailed Changes
see title.

## Test Plan
None.
  • Loading branch information
Rachelint authored Jun 20, 2023
1 parent e3b4009 commit 9a9c0f7
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
26 changes: 26 additions & 0 deletions analytic_engine/src/instance/wal_replayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use std::{
use async_trait::async_trait;
use common_types::{schema::IndexInWriterSchema, table::ShardId};
use common_util::error::BoxError;
use lazy_static::lazy_static;
use log::{debug, error, info, trace};
use prometheus::{exponential_buckets, register_histogram, Histogram};
use snafu::ResultExt;
use table_engine::table::TableId;
use tokio::sync::MutexGuard;
Expand All @@ -34,6 +36,22 @@ use crate::{
table::data::TableDataRef,
};

// Metrics of wal replayer
lazy_static! {
static ref PULL_LOGS_DURATION_HISTOGRAM: Histogram = register_histogram!(
"wal_replay_pull_logs_duration",
"Histogram for pull logs duration in wal replay in seconds",
exponential_buckets(0.01, 2.0, 13).unwrap()
)
.unwrap();
static ref APPLY_LOGS_DURATION_HISTOGRAM: Histogram = register_histogram!(
"wal_replay_apply_logs_duration",
"Histogram for apply logs duration in wal replay in seconds",
exponential_buckets(0.01, 2.0, 13).unwrap()
)
.unwrap();
}

/// Wal replayer supporting both table based and region based
// TODO: limit the memory usage in `RegionBased` mode.
pub struct WalReplayer<'a> {
Expand Down Expand Up @@ -186,18 +204,21 @@ impl TableBasedReplay {
let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size);
loop {
// fetch entries to log_entry_buf
let timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
let decoder = WalDecoder::default();
log_entry_buf = log_iter
.next_log_entries(decoder, log_entry_buf)
.await
.box_err()
.context(ReplayWalWithCause { msg: None })?;
drop(timer);

if log_entry_buf.is_empty() {
break;
}

// Replay all log entries of current table
let timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
replay_table_log_entries(
&context.flusher,
context.max_retry_flush_limit,
Expand All @@ -206,6 +227,7 @@ impl TableBasedReplay {
log_entry_buf.iter(),
)
.await?;
drop(timer);
}

Ok(())
Expand Down Expand Up @@ -276,19 +298,23 @@ impl RegionBasedReplay {

// Split and replay logs.
loop {
let timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
let decoder = WalDecoder::default();
log_entry_buf = log_iter
.next_log_entries(decoder, log_entry_buf)
.await
.box_err()
.context(ReplayWalWithCause { msg: None })?;
drop(timer);

if log_entry_buf.is_empty() {
break;
}

let timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds)
.await?;
drop(timer);
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion wal/src/message_queue_impl/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ impl<M: MessageQueue> Region<M> {
let (snapshot, synchronizer) = {
let inner = self.inner.write().await;

debug!(
info!(
"Mark deleted entries to sequence num:{}, region id:{}, table id:{}",
sequence_num,
inner.region_context.region_id(),
Expand Down

0 comments on commit 9a9c0f7

Please sign in to comment.