feat: use manifest updates stats in TableData to trigger snapshot (#1076)

## Rationale
Currently, the number of manifest updates across all tables is tracked globally in `Manifest` and used to trigger snapshots.
However, when a snapshot is triggered, only the table that stored the latest update is snapshotted.
This is clearly unreasonable...

Worse, this mechanism leads to bug #1075 ...

So in this PR, the manifest update counter is moved into `TableData`, and
each table can only trigger a snapshot of itself rather than of other tables.

## Detailed Changes
+ Move the manifest update counter into `TableData`.
+ Use it to trigger manifest snapshots (see the sketch below).
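
For illustration, here is a minimal, self-contained sketch of the per-table trigger flow introduced by this PR. The simplified `TableData` struct and the driver loop in `main` are stand-ins for demonstration only; the real counter fields and methods (`increase_manifest_updates`, `should_do_manifest_snapshot`, `reset_manifest_updates`) and the call site in `ManifestImpl::apply_edit` are in the diff that follows.

```rust
use std::{
    num::NonZeroUsize,
    sync::atomic::{AtomicUsize, Ordering},
};

/// Simplified stand-in for the per-table state added in this PR.
struct TableData {
    /// Manifest updates applied since the last snapshot.
    manifest_updates: AtomicUsize,
    /// A snapshot is triggered every n manifest updates.
    manifest_snapshot_every_n_updates: NonZeroUsize,
}

impl TableData {
    fn increase_manifest_updates(&self, updates_num: usize) {
        self.manifest_updates
            .fetch_add(updates_num, Ordering::Relaxed);
    }

    fn should_do_manifest_snapshot(&self) -> bool {
        self.manifest_updates.load(Ordering::Relaxed)
            >= self.manifest_snapshot_every_n_updates.get()
    }

    fn reset_manifest_updates(&self) {
        self.manifest_updates.store(0, Ordering::Relaxed);
    }
}

fn main() {
    let table_data = TableData {
        manifest_updates: AtomicUsize::new(0),
        manifest_snapshot_every_n_updates: NonZeroUsize::new(5).unwrap(),
    };

    // Each applied manifest edit bumps the per-table counter; once the
    // threshold is reached, a snapshot of this table (and only this table)
    // would be taken, and the counter is reset.
    for i in 1..=12 {
        table_data.increase_manifest_updates(1);
        if table_data.should_do_manifest_snapshot() {
            println!("update {i}: snapshot this table");
            table_data.reset_manifest_updates();
        }
    }
}
```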

## Test Plan
Tested by a new unit test.
Rachelint authored Jul 17, 2023
1 parent 27f2169 commit 838a6fd
Showing 4 changed files with 185 additions and 96 deletions.
1 change: 1 addition & 0 deletions analytic_engine/src/instance/open.rs
@@ -64,6 +64,7 @@ impl Instance {
spaces: spaces.clone(),
file_purger: file_purger.clone(),
preflush_write_buffer_size_ratio: ctx.config.preflush_write_buffer_size_ratio,
manifest_snapshot_every_n_updates: ctx.config.manifest.snapshot_every_n_updates,
});
let manifest = ManifestImpl::open(
ctx.config.manifest.clone(),
92 changes: 51 additions & 41 deletions analytic_engine/src/manifest/details.rs
@@ -5,6 +5,7 @@
use std::{
collections::VecDeque,
fmt, mem,
num::NonZeroUsize,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
@@ -44,7 +45,7 @@ use crate::{
LoadRequest, Manifest, SnapshotRequest,
},
space::SpaceId,
table::data::TableShardInfo,
table::data::{TableDataRef, TableShardInfo},
};

#[derive(Debug, Snafu)]
@@ -183,13 +184,15 @@ impl MetaUpdateLogEntryIterator for MetaUpdateReaderImpl {
///
/// Get snapshot of or modify table's metadata through it.
pub(crate) trait TableMetaSet: fmt::Debug + Send + Sync {
// Get snapshot of `TableData`.
fn get_table_snapshot(
&self,
space_id: SpaceId,
table_id: TableId,
) -> Result<Option<MetaSnapshot>>;

fn apply_edit_to_table(&self, update: MetaEditRequest) -> Result<()>;
// Apply update to `TableData` and return it.
fn apply_edit_to_table(&self, update: MetaEditRequest) -> Result<TableDataRef>;
}

/// Snapshot recoverer
@@ -324,14 +327,17 @@ where

/// Options for manifest
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct Options {
/// Steps to do snapshot
pub snapshot_every_n_updates: usize,
// TODO: move this field to suitable place.
pub snapshot_every_n_updates: NonZeroUsize,

/// Timeout to read manifest entries
pub scan_timeout: ReadableDuration,

/// Batch size to read manifest entries
// TODO: use NonZeroUsize
pub scan_batch_size: usize,

/// Timeout to store manifest entries
@@ -341,7 +347,7 @@ pub struct Options {
impl Default for Options {
fn default() -> Self {
Self {
snapshot_every_n_updates: 10_000,
snapshot_every_n_updates: NonZeroUsize::new(100).unwrap(),
scan_timeout: ReadableDuration::secs(5),
scan_batch_size: 100,
store_timeout: ReadableDuration::secs(5),
@@ -407,20 +413,12 @@ impl ManifestImpl {
/// Do snapshot if no other snapshot is triggered.
///
/// Returns the latest snapshot if snapshot is done.
async fn maybe_do_snapshot(
async fn do_snapshot_internal(
&self,
space_id: SpaceId,
table_id: TableId,
location: WalLocation,
force: bool,
) -> Result<Option<Snapshot>> {
if !force {
let num_updates = self.num_updates_since_snapshot.load(Ordering::Relaxed);
if num_updates < self.opts.snapshot_every_n_updates {
return Ok(None);
}
}

if let Ok(_guard) = self.snapshot_write_guard.try_lock() {
let log_store = WalBasedLogStore {
opts: self.opts.clone(),
@@ -439,28 +437,13 @@
table_id,
};

let snapshot = snapshotter.snapshot().await?.map(|v| {
self.decrease_num_updates();
v
});
let snapshot = snapshotter.snapshot().await?;
Ok(snapshot)
} else {
debug!("Avoid concurrent snapshot");
Ok(None)
}
}

// with snapshot guard held
fn decrease_num_updates(&self) {
if self.opts.snapshot_every_n_updates
> self.num_updates_since_snapshot.load(Ordering::Relaxed)
{
self.num_updates_since_snapshot.store(0, Ordering::Relaxed);
} else {
self.num_updates_since_snapshot
.fetch_sub(self.opts.snapshot_every_n_updates, Ordering::Relaxed);
}
}
}

#[async_trait]
@@ -480,13 +463,21 @@ impl Manifest for ManifestImpl {
let location = WalLocation::new(shard_id as u64, table_id.as_u64());
let space_id = meta_update.space_id();

self.maybe_do_snapshot(space_id, table_id, location, false)
.await?;

self.store_update_to_wal(meta_update, location).await?;

// Update memory.
self.table_meta_set.apply_edit_to_table(request).box_err()
let table_data = self.table_meta_set.apply_edit_to_table(request).box_err()?;

// Update manifest updates count.
table_data.increase_manifest_updates(1);
// Judge if snapshot is needed.
if table_data.should_do_manifest_snapshot() {
self.do_snapshot_internal(space_id, table_id, location)
.await?;
table_data.reset_manifest_updates();
}

Ok(())
}

async fn recover(&self, load_req: &LoadRequest) -> GenericResult<()> {
@@ -536,7 +527,7 @@ impl Manifest for ManifestImpl {
let space_id = request.space_id;
let table_id = request.table_id;

self.maybe_do_snapshot(space_id, table_id, location, true)
self.do_snapshot_internal(space_id, table_id, location)
.await
.box_err()?;

@@ -707,8 +698,9 @@ impl MetaUpdateLogStore for WalBasedLogStore {

#[cfg(test)]
mod tests {
use std::{path::PathBuf, sync::Arc, vec};
use std::{num::NonZeroUsize, path::PathBuf, sync::Arc, vec};

use arena::NoopCollector;
use common_types::{
column_schema, datum::DatumKind, schema, schema::Schema, table::DEFAULT_SHARD_ID,
};
@@ -728,7 +720,8 @@ mod tests {
},
LoadRequest, Manifest,
},
table::data::TableShardInfo,
sst::file::tests::FilePurgerMocker,
table::data::{tests::default_schema, TableData, TableShardInfo},
TableOptions,
};

@@ -780,7 +773,7 @@ mod tests {
Ok(builder.clone().build())
}

fn apply_edit_to_table(&self, request: MetaEditRequest) -> Result<()> {
fn apply_edit_to_table(&self, request: MetaEditRequest) -> Result<TableDataRef> {
let mut builder = self.builder.lock().unwrap();
let MetaEditRequest {
shard_info: _,
@@ -800,7 +793,24 @@
}
}

Ok(())
let table_opts = TableOptions::default();
let purger = FilePurgerMocker::mock();
let collector = Arc::new(NoopCollector);
let test_data = TableData::new(
0,
TableId::new(0),
"test_table".to_string(),
default_schema(),
0,
table_opts,
&purger,
0.75,
collector,
NonZeroUsize::new(usize::MAX).unwrap(),
)
.unwrap();

Ok(Arc::new(test_data))
}
}

@@ -820,7 +830,7 @@ mod tests {
let runtime = build_runtime(2);

let options = Options {
snapshot_every_n_updates: 100,
snapshot_every_n_updates: NonZeroUsize::new(100).unwrap(),
..Default::default()
};
Self {
@@ -1183,7 +1193,7 @@ mod tests {
.await;

manifest
.maybe_do_snapshot(ctx.schema_id.as_u32(), table_id, location, true)
.do_snapshot_internal(ctx.schema_id.as_u32(), table_id, location)
.await
.unwrap();

@@ -1242,7 +1252,7 @@ mod tests {

let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id.as_u64());
manifest
.maybe_do_snapshot(ctx.schema_id.as_u32(), table_id, location, true)
.do_snapshot_internal(ctx.schema_id.as_u32(), table_id, location)
.await
.unwrap();
for i in 500..550 {
68 changes: 66 additions & 2 deletions analytic_engine/src/table/data.rs
@@ -7,8 +7,9 @@ use std::{
convert::TryInto,
fmt,
fmt::Formatter,
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex,
},
time::Duration,
@@ -153,6 +154,12 @@ pub struct TableData {
/// No write/alter is allowed if the table is dropped.
dropped: AtomicBool,

/// Manifest updates after last snapshot
manifest_updates: AtomicUsize,

/// Every n manifest updates to trigger a snapshot
manifest_snapshot_every_n_updates: NonZeroUsize,

/// Metrics of this table
pub metrics: Metrics,

@@ -213,6 +220,7 @@ impl TableData {
purger: &FilePurger,
preflush_write_buffer_size_ratio: f32,
mem_usage_collector: CollectorRef,
manifest_snapshot_every_n_updates: NonZeroUsize,
) -> Result<Self> {
// FIXME(yingwen): Validate TableOptions, such as bucket_duration >=
// segment_duration and bucket_duration is aligned to segment_duration
@@ -245,6 +253,8 @@ impl TableData {
metrics,
shard_info: TableShardInfo::new(shard_id),
serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(table_id)),
manifest_updates: AtomicUsize::new(0),
manifest_snapshot_every_n_updates,
})
}

@@ -258,6 +268,7 @@
preflush_write_buffer_size_ratio: f32,
mem_usage_collector: CollectorRef,
allocator: IdAllocator,
manifest_snapshot_every_n_updates: NonZeroUsize,
) -> Result<Self> {
let memtable_factory = Arc::new(SkiplistMemTableFactory);
let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id);
@@ -287,6 +298,8 @@ impl TableData {
metrics,
shard_info: TableShardInfo::new(shard_id),
serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(add_meta.table_id)),
manifest_updates: AtomicUsize::new(0),
manifest_snapshot_every_n_updates,
})
}

@@ -559,6 +572,20 @@ impl TableData {
shard_info: self.shard_info,
}
}

pub fn increase_manifest_updates(&self, updates_num: usize) {
self.manifest_updates
.fetch_add(updates_num, Ordering::Relaxed);
}

pub fn should_do_manifest_snapshot(&self) -> bool {
let updates = self.manifest_updates.load(Ordering::Relaxed);
updates >= self.manifest_snapshot_every_n_updates.get()
}

pub fn reset_manifest_updates(&self) {
self.manifest_updates.store(0, Ordering::Relaxed);
}
}

#[derive(Debug, Clone, Copy)]
@@ -666,7 +693,7 @@ pub mod tests {

const DEFAULT_SPACE_ID: SpaceId = 1;

fn default_schema() -> Schema {
pub fn default_schema() -> Schema {
table::create_schema_builder(
&[("key", DatumKind::Timestamp)],
&[("value", DatumKind::Double)],
@@ -697,6 +724,7 @@ pub mod tests {
table_id: TableId,
table_name: String,
shard_id: ShardId,
manifest_snapshot_every_n_updates: NonZeroUsize,
}

impl TableDataMocker {
@@ -715,6 +743,14 @@
self
}

pub fn manifest_snapshot_every_n_updates(
mut self,
manifest_snapshot_every_n_updates: NonZeroUsize,
) -> Self {
self.manifest_snapshot_every_n_updates = manifest_snapshot_every_n_updates;
self
}

pub fn build(self) -> TableData {
let space_id = DEFAULT_SPACE_ID;
let table_schema = default_schema();
@@ -746,6 +782,7 @@ impl TableDataMocker {
&purger,
0.75,
collector,
self.manifest_snapshot_every_n_updates,
)
.unwrap()
}
@@ -757,6 +794,7 @@
table_id: table::new_table_id(2, 1),
table_name: "mocked_table".to_string(),
shard_id: DEFAULT_SHARD_ID,
manifest_snapshot_every_n_updates: NonZeroUsize::new(usize::MAX).unwrap(),
}
}
}
@@ -849,4 +887,30 @@ pub mod tests {
compute_mutable_limit(80, 1.1);
compute_mutable_limit(80, -0.1);
}

#[test]
fn test_manifest_snapshot_trigger() {
// When snapshot_every_n_updates is not zero.
let table_data = TableDataMocker::default()
.manifest_snapshot_every_n_updates(NonZeroUsize::new(5).unwrap())
.build();

check_manifest_snapshot_trigger(&table_data);
// Reset and check again.
table_data.reset_manifest_updates();
check_manifest_snapshot_trigger(&table_data);
}

fn check_manifest_snapshot_trigger(table_data: &TableData) {
// When no updates yet, result should be false.
assert!(!table_data.should_do_manifest_snapshot());

// Eq case.
table_data.increase_manifest_updates(5);
assert!(table_data.should_do_manifest_snapshot());

// Greater case.
table_data.increase_manifest_updates(5);
assert!(table_data.should_do_manifest_snapshot());
}
}