Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use manifest updates stats in TableData to trigger snapshot #1076

Merged
merged 10 commits into from
Jul 17, 2023
1 change: 1 addition & 0 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl Instance {
spaces: spaces.clone(),
file_purger: file_purger.clone(),
preflush_write_buffer_size_ratio: ctx.config.preflush_write_buffer_size_ratio,
manifest_snapshot_every_n_updates: ctx.config.manifest.snapshot_every_n_updates,
});
let manifest = ManifestImpl::open(
ctx.config.manifest.clone(),
Expand Down
92 changes: 51 additions & 41 deletions analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::{
collections::VecDeque,
fmt, mem,
num::NonZeroUsize,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Expand Down Expand Up @@ -44,7 +45,7 @@ use crate::{
LoadRequest, Manifest, SnapshotRequest,
},
space::SpaceId,
table::data::TableShardInfo,
table::data::{TableDataRef, TableShardInfo},
};

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -183,13 +184,15 @@ impl MetaUpdateLogEntryIterator for MetaUpdateReaderImpl {
///
/// Get snapshot of or modify table's metadata through it.
/// Accessor for table metadata kept in memory.
///
/// Implementations read a table's current metadata snapshot and apply
/// manifest edits to the in-memory `TableData`.
pub(crate) trait TableMetaSet: fmt::Debug + Send + Sync {
/// Get snapshot of `TableData` identified by `space_id` + `table_id`;
/// `Ok(None)` when no snapshot is available.
fn get_table_snapshot(
&self,
space_id: SpaceId,
table_id: TableId,
) -> Result<Option<MetaSnapshot>>;

// NOTE(review): the next line is the pre-change signature preserved by the
// diff view; only the `TableDataRef`-returning variant below is current.
fn apply_edit_to_table(&self, update: MetaEditRequest) -> Result<()>;
/// Apply `update` to the in-memory `TableData` and return the updated table.
fn apply_edit_to_table(&self, update: MetaEditRequest) -> Result<TableDataRef>;
}

/// Snapshot recoverer
Expand Down Expand Up @@ -324,14 +327,17 @@ where

/// Options for manifest
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct Options {
/// Steps to do snapshot
pub snapshot_every_n_updates: usize,
// TODO: move this field to suitable place.
pub snapshot_every_n_updates: NonZeroUsize,

/// Timeout to read manifest entries
pub scan_timeout: ReadableDuration,

/// Batch size to read manifest entries
// TODO: use NonZeroUsize
pub scan_batch_size: usize,

/// Timeout to store manifest entries
Expand All @@ -341,7 +347,7 @@ pub struct Options {
impl Default for Options {
fn default() -> Self {
Self {
snapshot_every_n_updates: 10_000,
snapshot_every_n_updates: NonZeroUsize::new(100).unwrap(),
scan_timeout: ReadableDuration::secs(5),
scan_batch_size: 100,
store_timeout: ReadableDuration::secs(5),
Expand Down Expand Up @@ -407,20 +413,12 @@ impl ManifestImpl {
/// Do snapshot if no other snapshot is triggered.
///
/// Returns the latest snapshot if snapshot is done.
async fn maybe_do_snapshot(
async fn do_snapshot_internal(
&self,
space_id: SpaceId,
table_id: TableId,
location: WalLocation,
force: bool,
) -> Result<Option<Snapshot>> {
if !force {
let num_updates = self.num_updates_since_snapshot.load(Ordering::Relaxed);
if num_updates < self.opts.snapshot_every_n_updates {
return Ok(None);
}
}

if let Ok(_guard) = self.snapshot_write_guard.try_lock() {
let log_store = WalBasedLogStore {
opts: self.opts.clone(),
Expand All @@ -439,28 +437,13 @@ impl ManifestImpl {
table_id,
};

let snapshot = snapshotter.snapshot().await?.map(|v| {
self.decrease_num_updates();
v
});
let snapshot = snapshotter.snapshot().await?;
Ok(snapshot)
} else {
debug!("Avoid concurrent snapshot");
Ok(None)
}
}

// with snapshot guard held
fn decrease_num_updates(&self) {
if self.opts.snapshot_every_n_updates
> self.num_updates_since_snapshot.load(Ordering::Relaxed)
{
self.num_updates_since_snapshot.store(0, Ordering::Relaxed);
} else {
self.num_updates_since_snapshot
.fetch_sub(self.opts.snapshot_every_n_updates, Ordering::Relaxed);
}
}
}

#[async_trait]
Expand All @@ -480,13 +463,21 @@ impl Manifest for ManifestImpl {
let location = WalLocation::new(shard_id as u64, table_id.as_u64());
let space_id = meta_update.space_id();

self.maybe_do_snapshot(space_id, table_id, location, false)
.await?;

self.store_update_to_wal(meta_update, location).await?;

// Update memory.
self.table_meta_set.apply_edit_to_table(request).box_err()
let table_data = self.table_meta_set.apply_edit_to_table(request).box_err()?;

// Update manifest updates count.
table_data.increase_manifest_updates(1);
// Judge if snapshot is needed.
if table_data.should_do_manifest_snapshot() {
self.do_snapshot_internal(space_id, table_id, location)
.await?;
table_data.reset_manifest_updates();
}

Ok(())
}

async fn recover(&self, load_req: &LoadRequest) -> GenericResult<()> {
Expand Down Expand Up @@ -536,7 +527,7 @@ impl Manifest for ManifestImpl {
let space_id = request.space_id;
let table_id = request.table_id;

self.maybe_do_snapshot(space_id, table_id, location, true)
self.do_snapshot_internal(space_id, table_id, location)
.await
.box_err()?;

Expand Down Expand Up @@ -707,8 +698,9 @@ impl MetaUpdateLogStore for WalBasedLogStore {

#[cfg(test)]
mod tests {
use std::{path::PathBuf, sync::Arc, vec};
use std::{num::NonZeroUsize, path::PathBuf, sync::Arc, vec};

use arena::NoopCollector;
use common_types::{
column_schema, datum::DatumKind, schema, schema::Schema, table::DEFAULT_SHARD_ID,
};
Expand All @@ -728,7 +720,8 @@ mod tests {
},
LoadRequest, Manifest,
},
table::data::TableShardInfo,
sst::file::tests::FilePurgerMocker,
table::data::{tests::default_schema, TableData, TableShardInfo},
TableOptions,
};

Expand Down Expand Up @@ -780,7 +773,7 @@ mod tests {
Ok(builder.clone().build())
}

fn apply_edit_to_table(&self, request: MetaEditRequest) -> Result<()> {
fn apply_edit_to_table(&self, request: MetaEditRequest) -> Result<TableDataRef> {
let mut builder = self.builder.lock().unwrap();
let MetaEditRequest {
shard_info: _,
Expand All @@ -800,7 +793,24 @@ mod tests {
}
}

Ok(())
let table_opts = TableOptions::default();
let purger = FilePurgerMocker::mock();
let collector = Arc::new(NoopCollector);
let test_data = TableData::new(
0,
TableId::new(0),
"test_table".to_string(),
default_schema(),
0,
table_opts,
&purger,
0.75,
collector,
NonZeroUsize::new(usize::MAX).unwrap(),
)
.unwrap();

Ok(Arc::new(test_data))
}
}

Expand All @@ -820,7 +830,7 @@ mod tests {
let runtime = build_runtime(2);

let options = Options {
snapshot_every_n_updates: 100,
snapshot_every_n_updates: NonZeroUsize::new(100).unwrap(),
..Default::default()
};
Self {
Expand Down Expand Up @@ -1183,7 +1193,7 @@ mod tests {
.await;

manifest
.maybe_do_snapshot(ctx.schema_id.as_u32(), table_id, location, true)
.do_snapshot_internal(ctx.schema_id.as_u32(), table_id, location)
.await
.unwrap();

Expand Down Expand Up @@ -1242,7 +1252,7 @@ mod tests {

let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id.as_u64());
manifest
.maybe_do_snapshot(ctx.schema_id.as_u32(), table_id, location, true)
.do_snapshot_internal(ctx.schema_id.as_u32(), table_id, location)
.await
.unwrap();
for i in 500..550 {
Expand Down
68 changes: 66 additions & 2 deletions analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use std::{
convert::TryInto,
fmt,
fmt::Formatter,
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex,
},
time::Duration,
Expand Down Expand Up @@ -153,6 +154,12 @@ pub struct TableData {
/// No write/alter is allowed if the table is dropped.
dropped: AtomicBool,

/// Manifest updates after last snapshot
manifest_updates: AtomicUsize,

/// Every n manifest updates to trigger a snapshot
manifest_snapshot_every_n_updates: NonZeroUsize,

/// Metrics of this table
pub metrics: Metrics,

Expand Down Expand Up @@ -213,6 +220,7 @@ impl TableData {
purger: &FilePurger,
preflush_write_buffer_size_ratio: f32,
mem_usage_collector: CollectorRef,
manifest_snapshot_every_n_updates: NonZeroUsize,
) -> Result<Self> {
// FIXME(yingwen): Validate TableOptions, such as bucket_duration >=
// segment_duration and bucket_duration is aligned to segment_duration
Expand Down Expand Up @@ -245,6 +253,8 @@ impl TableData {
metrics,
shard_info: TableShardInfo::new(shard_id),
serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(table_id)),
manifest_updates: AtomicUsize::new(0),
manifest_snapshot_every_n_updates,
})
}

Expand All @@ -258,6 +268,7 @@ impl TableData {
preflush_write_buffer_size_ratio: f32,
mem_usage_collector: CollectorRef,
allocator: IdAllocator,
manifest_snapshot_every_n_updates: NonZeroUsize,
) -> Result<Self> {
let memtable_factory = Arc::new(SkiplistMemTableFactory);
let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id);
Expand Down Expand Up @@ -287,6 +298,8 @@ impl TableData {
metrics,
shard_info: TableShardInfo::new(shard_id),
serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(add_meta.table_id)),
manifest_updates: AtomicUsize::new(0),
manifest_snapshot_every_n_updates,
})
}

Expand Down Expand Up @@ -559,6 +572,20 @@ impl TableData {
shard_info: self.shard_info,
}
}

/// Bumps the count of manifest updates applied since the last snapshot.
///
/// Relaxed ordering suffices: the counter is only a heuristic snapshot
/// trigger, not a synchronization point.
pub fn increase_manifest_updates(&self, updates_num: usize) {
self.manifest_updates
.fetch_add(updates_num, Ordering::Relaxed);
}

/// Returns `true` once the number of manifest updates accumulated since
/// the last snapshot has reached the configured threshold.
pub fn should_do_manifest_snapshot(&self) -> bool {
    let threshold = self.manifest_snapshot_every_n_updates.get();
    self.manifest_updates.load(Ordering::Relaxed) >= threshold
}

/// Clears the update counter, typically right after a snapshot succeeds.
///
/// NOTE(review): the unconditional `store(0)` drops any updates recorded
/// between the snapshot and this reset — acceptable for a heuristic
/// counter, but worth confirming with the caller in `apply_edit`.
pub fn reset_manifest_updates(&self) {
self.manifest_updates.store(0, Ordering::Relaxed);
}
}

#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -666,7 +693,7 @@ pub mod tests {

const DEFAULT_SPACE_ID: SpaceId = 1;

fn default_schema() -> Schema {
pub fn default_schema() -> Schema {
table::create_schema_builder(
&[("key", DatumKind::Timestamp)],
&[("value", DatumKind::Double)],
Expand Down Expand Up @@ -697,6 +724,7 @@ pub mod tests {
table_id: TableId,
table_name: String,
shard_id: ShardId,
manifest_snapshot_every_n_updates: NonZeroUsize,
}

impl TableDataMocker {
Expand All @@ -715,6 +743,14 @@ pub mod tests {
self
}

pub fn manifest_snapshot_every_n_updates(
mut self,
manifest_snapshot_every_n_updates: NonZeroUsize,
) -> Self {
self.manifest_snapshot_every_n_updates = manifest_snapshot_every_n_updates;
self
}

pub fn build(self) -> TableData {
let space_id = DEFAULT_SPACE_ID;
let table_schema = default_schema();
Expand Down Expand Up @@ -746,6 +782,7 @@ pub mod tests {
&purger,
0.75,
collector,
self.manifest_snapshot_every_n_updates,
)
.unwrap()
}
Expand All @@ -757,6 +794,7 @@ pub mod tests {
table_id: table::new_table_id(2, 1),
table_name: "mocked_table".to_string(),
shard_id: DEFAULT_SHARD_ID,
manifest_snapshot_every_n_updates: NonZeroUsize::new(usize::MAX).unwrap(),
}
}
}
Expand Down Expand Up @@ -849,4 +887,30 @@ pub mod tests {
compute_mutable_limit(80, 1.1);
compute_mutable_limit(80, -0.1);
}

#[test]
fn test_manifest_snapshot_trigger() {
    // Build a table whose snapshot threshold is 5 manifest updates.
    let table_data = TableDataMocker::default()
        .manifest_snapshot_every_n_updates(NonZeroUsize::new(5).unwrap())
        .build();

    // The trigger must behave identically on a fresh counter and after
    // an explicit reset.
    check_manifest_snapshot_trigger(&table_data);
    table_data.reset_manifest_updates();
    check_manifest_snapshot_trigger(&table_data);
}

/// Asserts the snapshot-trigger contract for a table whose threshold is 5
/// (see `test_manifest_snapshot_trigger`): no trigger at zero updates,
/// trigger at exactly the threshold, and trigger beyond it.
fn check_manifest_snapshot_trigger(table_data: &TableData) {
// When no updates yet, result should be false.
assert!(!table_data.should_do_manifest_snapshot());

// Eq case: exactly at the threshold (5) must trigger.
table_data.increase_manifest_updates(5);
assert!(table_data.should_do_manifest_snapshot());

// Greater case: beyond the threshold keeps triggering.
table_data.increase_manifest_updates(5);
assert!(table_data.should_do_manifest_snapshot());
}
}
Loading