Skip to content

Commit

Permalink
Merge pull request #5535 from dantengsky/feat-snapshot-timestamp
Browse files Browse the repository at this point in the history
feat: snapshot timestamp & navigation
  • Loading branch information
BohuTANG authored May 24, 2022
2 parents dd478e1 + f36fe6d commit b1e3cf4
Show file tree
Hide file tree
Showing 16 changed files with 510 additions and 6 deletions.
2 changes: 2 additions & 0 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ build_exceptions! {

TableVersionMismatched(2009),
OCCRetryFailure(2011),
TableNotWritable(2012),
TableHistoricalDataNotFound(2013),

// User api error codes.
UnknownUser(2201),
Expand Down
27 changes: 25 additions & 2 deletions query/src/storages/fuse/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ pub struct FuseTable {
pub(crate) meta_location_generator: TableMetaLocationGenerator,

pub(crate) order_keys: Vec<Expression>,
pub(crate) read_only: bool,
}

impl FuseTable {
/// Builds a writable `FuseTable` from `table_info` and returns it as a
/// boxed `Table` trait object.
///
/// The storage context argument is accepted but not used here.
///
/// # Errors
///
/// Propagates any error returned by [`FuseTable::do_create`].
pub fn try_create(_ctx: StorageContext, table_info: TableInfo) -> Result<Box<dyn Table>> {
    // Tables created through this entry point are never read-only.
    let read_only = false;
    let table: Box<dyn Table> = Self::do_create(table_info, read_only)?;
    Ok(table)
}

pub fn do_create(table_info: TableInfo, read_only: bool) -> Result<Box<FuseTable>> {
let storage_prefix = Self::parse_storage_prefix(&table_info)?;
let mut order_keys = Vec::new();
if let Some(order) = &table_info.meta.order_keys {
Expand All @@ -67,6 +73,7 @@ impl FuseTable {
table_info,
order_keys,
meta_location_generator: TableMetaLocationGenerator::with_prefix(storage_prefix),
read_only,
}))
}

Expand Down Expand Up @@ -116,7 +123,7 @@ impl FuseTable {

pub fn snapshot_format_version(&self) -> u64 {
match self.snapshot_loc() {
Some(loc) => TableMetaLocationGenerator::snaphost_version(loc.as_str()),
Some(loc) => TableMetaLocationGenerator::snapshot_version(loc.as_str()),
None => {
// No snapshot location here, indicates that there are no data of this table yet
// in this case, we just returns the current snapshot version
Expand All @@ -143,6 +150,17 @@ impl FuseTable {
))
})
}

/// Verifies that this table instance accepts mutations.
///
/// # Errors
///
/// Returns `ErrorCode::TableNotWritable` when the instance was created
/// with `read_only` set.
pub fn check_mutable(&self) -> Result<()> {
    if !self.read_only {
        return Ok(());
    }

    let message = format!(
        "Table {} is in read-only mode",
        self.table_info.desc.as_str()
    );
    Err(ErrorCode::TableNotWritable(message))
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -182,7 +200,7 @@ impl Table for FuseTable {
ctx: Arc<QueryContext>,
plan: &ReadDataSourcePlan,
) -> Result<SendableDataBlockStream> {
self.do_read(ctx, &plan.push_downs).await
self.do_read(ctx, &plan.push_downs)
}

#[tracing::instrument(level = "debug", name = "fuse_table_read2", skip(self, ctx, pipeline), fields(ctx.id = ctx.get_id().as_str()))]
Expand All @@ -196,6 +214,7 @@ impl Table for FuseTable {
}

/// Appends through the new pipeline, after confirming the table is writable.
///
/// Fails with the error from `check_mutable` without touching `pipeline`
/// when the table is read-only.
fn append2(&self, ctx: Arc<QueryContext>, pipeline: &mut NewPipeline) -> Result<()> {
    self.check_mutable()
        .and_then(|()| self.do_append2(ctx, pipeline))
}

Expand All @@ -205,6 +224,7 @@ impl Table for FuseTable {
ctx: Arc<QueryContext>,
stream: SendableDataBlockStream,
) -> Result<SendableDataBlockStream> {
self.check_mutable()?;
let log_entry_stream = self.append_chunks(ctx, stream).await?;
let data_block_stream =
log_entry_stream.map(|append_log_entry_res| match append_log_entry_res {
Expand All @@ -221,6 +241,7 @@ impl Table for FuseTable {
operations: Vec<DataBlock>,
overwrite: bool,
) -> Result<()> {
self.check_mutable()?;
// only append operation supported currently
let append_log_entries = operations
.iter()
Expand All @@ -235,10 +256,12 @@ impl Table for FuseTable {
ctx: Arc<QueryContext>,
truncate_plan: TruncateTablePlan,
) -> Result<()> {
self.check_mutable()?;
self.do_truncate(ctx, truncate_plan).await
}

/// Runs table optimization, after confirming the table is writable.
///
/// Returns the error from `check_mutable` without starting the optimization
/// when the table is read-only.
async fn optimize(&self, ctx: Arc<QueryContext>, keep_last_snapshot: bool) -> Result<()> {
    if let Err(cause) = self.check_mutable() {
        return Err(cause);
    }
    self.do_optimize(ctx, keep_last_snapshot).await
}

Expand Down
6 changes: 3 additions & 3 deletions query/src/storages/fuse/io/locations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ impl TableMetaLocationGenerator {
}

pub fn snapshot_location_from_uuid(&self, id: &Uuid, version: u64) -> Result<String> {
let snaphost_version = SnapshotVersion::try_from(version)?;
Ok(snaphost_version.create(id, &self.prefix))
let snapshot_version = SnapshotVersion::try_from(version)?;
Ok(snapshot_version.create(id, &self.prefix))
}

pub fn snaphost_version(location: impl AsRef<str>) -> u64 {
pub fn snapshot_version(location: impl AsRef<str>) -> u64 {
if location.as_ref().ends_with(SNAPHOST_V1.suffix()) {
SNAPHOST_V1.version()
} else {
Expand Down
43 changes: 43 additions & 0 deletions query/src/storages/fuse/io/read/meta_readers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use futures::io::BufReader;
use futures::stream;
use opendal::BytesReader;

use super::cached_reader::CachedReader;
Expand Down Expand Up @@ -98,6 +100,47 @@ impl<'a> TableSnapshotReader<'a> {

Ok(snapshots)
}

/// Returns a stream over the chain of snapshots reachable from `location`.
///
/// The first item is the snapshot read from `location` (using
/// `format_version`); each following item is the snapshot referenced by the
/// previous item's `prev_snapshot_id`, with its location derived through
/// `location_gen`.
///
/// The stream ends when a snapshot has no `prev_snapshot_id`, or when a
/// read fails with the storage-not-found error code. Any other read error,
/// or a failure to build the next location, is surfaced as an `Err` item.
pub fn snapshot_history(
    &'a self,
    location: String,
    format_version: u64,
    location_gen: TableMetaLocationGenerator,
) -> Pin<Box<dyn futures::stream::Stream<Item = Result<Arc<TableSnapshot>>> + 'a>> {
    // Unfold state: (reader, location generator, next snapshot to load, if any).
    let seed = (self, location_gen, Some((location, format_version)));
    let history = stream::try_unfold(seed, |(reader, loc_gen, pending)| async move {
        // Nothing left to load: the chain has been fully walked.
        let (loc, ver) = match pending {
            Some(target) => target,
            None => return Ok(None),
        };

        let snapshot = match reader.read(loc, None, ver).await {
            Ok(found) => found,
            // A missing snapshot object terminates the history instead of failing it.
            Err(e) if e.code() == ErrorCode::storage_not_found_code() => return Ok(None),
            Err(e) => return Err(e),
        };

        // Work out where the predecessor lives, if this snapshot has one.
        let pending = match snapshot.prev_snapshot_id {
            Some((id, v)) => Some((loc_gen.snapshot_location_from_uuid(&id, v)?, v)),
            None => None,
        };

        Ok(Some((snapshot, (reader, loc_gen, pending))))
    });
    Box::pin(history)
}
}

#[async_trait::async_trait]
Expand Down
21 changes: 21 additions & 0 deletions query/src/storages/fuse/meta/v1/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Add;

use chrono::DateTime;
use chrono::Utc;
use common_datavalues::DataSchema;
use serde::Deserialize;
use serde::Serialize;
Expand All @@ -30,6 +34,10 @@ pub struct TableSnapshot {
/// id of snapshot
pub snapshot_id: SnapshotId,

/// timestamp of this snapshot
pub timestamp: Option<DateTime<Utc>>,

/// previous snapshot
pub prev_snapshot_id: Option<(SnapshotId, FormatVersion)>,

/// For each snapshot, we keep a schema for it (in case of schema evolution)
Expand All @@ -48,14 +56,26 @@ pub struct TableSnapshot {
impl TableSnapshot {
pub fn new(
snapshot_id: SnapshotId,
prev_timestamp: &Option<DateTime<Utc>>,
prev_snapshot_id: Option<(SnapshotId, FormatVersion)>,
schema: DataSchema,
summary: Statistics,
segments: Vec<Location>,
) -> Self {
// timestamp of the snapshot should always be larger than the previous one's
let now = Utc::now();
let mut timestamp = Some(now);
if let Some(prev_instant) = prev_timestamp {
if prev_instant > &now {
// if local time is smaller, use the timestamp of previous snapshot, plus 1 ms
timestamp = Some(prev_instant.add(chrono::Duration::milliseconds(1)))
}
};

Self {
format_version: TableSnapshot::VERSION,
snapshot_id,
timestamp,
prev_snapshot_id,
schema,
summary,
Expand All @@ -75,6 +95,7 @@ impl From<v0::TableSnapshot> for TableSnapshot {
Self {
format_version: TableSnapshot::VERSION,
snapshot_id: s.snapshot_id,
timestamp: None,
prev_snapshot_id: s.prev_snapshot_id.map(|id| (id, 0)),
schema: s.schema,
summary: s.summary,
Expand Down
4 changes: 4 additions & 0 deletions query/src/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl FuseTable {
) -> Result<()> {
let prev = self.read_table_snapshot(ctx).await?;
let prev_version = self.snapshot_format_version();
let prev_timestamp = prev.as_ref().and_then(|v| v.timestamp);
let schema = self.table_info.meta.schema.as_ref().clone();
let (segments, summary) = Self::merge_append_operations(operation_log)?;

Expand All @@ -162,6 +163,7 @@ impl FuseTable {
let new_snapshot = if overwrite {
TableSnapshot::new(
Uuid::new_v4(),
&prev_timestamp,
prev.as_ref().map(|v| (v.snapshot_id, prev_version)),
schema,
summary,
Expand Down Expand Up @@ -228,6 +230,7 @@ impl FuseTable {
statistics
};
let prev_snapshot_id = previous.as_ref().map(|v| (v.snapshot_id, prev_version));
let prev_snapshot_timestamp = previous.as_ref().and_then(|v| v.timestamp);

// 2. merge segment locations with previous snapshot, if any
if let Some(snapshot) = &previous {
Expand All @@ -237,6 +240,7 @@ impl FuseTable {

let new_snapshot = TableSnapshot::new(
Uuid::new_v4(),
&prev_snapshot_timestamp,
prev_snapshot_id,
schema.clone(),
stats,
Expand Down
1 change: 1 addition & 0 deletions query/src/storages/fuse/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod append;
mod commit;
mod fuse_sink;
mod navigate;
mod operation_log;
mod optimize;
mod read;
Expand Down
117 changes: 117 additions & 0 deletions query/src/storages/fuse/operations/navigate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

use std::sync::Arc;

use chrono::DateTime;
use chrono::Utc;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::TableStatistics;
use futures::TryStreamExt;

use crate::sessions::QueryContext;
use crate::sql::OPT_KEY_SNAPSHOT_LOCATION;
use crate::storages::fuse::io::MetaReaders;
use crate::storages::fuse::FuseTable;

impl FuseTable {
    /// Resolves a read-only instance of this table as of `time_point`.
    ///
    /// Walks the snapshot history starting at the table's current snapshot
    /// and selects the first snapshot encountered whose timestamp is not
    /// later than `time_point`. Snapshots without a timestamp are skipped.
    /// The returned table is built from a clone of this table's info, with
    /// the schema, the `snapshot_location` option and the statistics taken
    /// from the selected snapshot.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::TableHistoricalDataNotFound` when the table has
    /// no snapshot location, or when no snapshot in the history qualifies.
    /// Errors raised while reading the history, building the snapshot
    /// location, or creating the table are propagated.
    pub async fn navigate(
        &self,
        ctx: &Arc<QueryContext>,
        time_point: DateTime<Utc>,
    ) -> Result<Arc<FuseTable>> {
        let snapshot_location = match self.snapshot_loc() {
            Some(loc) => loc,
            None => {
                // not an error?
                return Err(ErrorCode::TableHistoricalDataNotFound(
                    "Empty Table has no historical data",
                ));
            }
        };

        let snapshot_version = self.snapshot_format_version();
        let reader = MetaReaders::table_snapshot_reader(ctx);

        // Grab the table history.
        // Snapshots are ordered by timestamp DESC.
        let mut snapshots = reader.snapshot_history(
            snapshot_location,
            snapshot_version,
            self.meta_location_generator().clone(),
        );

        // Find the snapshot which matches the given `time_point`: stop at the
        // first one that was taken at or before it.
        let snapshot = loop {
            match snapshots.try_next().await? {
                Some(candidate) => {
                    let qualifies =
                        matches!(candidate.timestamp, Some(ts) if ts <= time_point);
                    if qualifies {
                        break candidate;
                    }
                }
                None => {
                    return Err(ErrorCode::TableHistoricalDataNotFound(
                        "No historical data found",
                    ));
                }
            }
        };

        // Load the table instance by the snapshot.

        // The `seq` of ident that we clone here is JUST a placeholder;
        // we should NOT use it other than as a pure placeholder.
        // Fortunately, a historical table should be read-only.
        // - Although callers of fuse table will not perform mutations on a
        //   historical table, in case there are careless mistakes, an extra
        //   attribute `read_only` is added to FuseTable, and during mutation
        //   operations FuseTable will check it.
        // - Figuring out a better way...
        let mut historical_info = self.table_info.clone();

        // There is more to be kept in the snapshot, like engine_options,
        // ordering keys... or we could just keep a clone of TableMeta in the
        // snapshot.
        //
        // Currently, here is what we can recover from the snapshot:

        // 1. the table schema
        historical_info.meta.schema = Arc::new(snapshot.schema.clone());

        // 2. the table option `snapshot_location`
        let ver = snapshot.format_version();
        let location = self
            .meta_location_generator
            .snapshot_location_from_uuid(&snapshot.snapshot_id, ver)?;
        historical_info
            .meta
            .options
            .insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), location);

        // 3. the statistics
        let stats = &snapshot.summary;
        historical_info.meta.statistics = TableStatistics {
            number_of_rows: stats.row_count,
            data_bytes: stats.uncompressed_byte_size,
            compressed_data_bytes: stats.compressed_byte_size,
            index_data_bytes: 0, // we do not have it yet
        };

        // Instantiate the table in read-only mode.
        let read_only = true;
        let fuse_tbl = FuseTable::do_create(historical_info, read_only)?;
        Ok(fuse_tbl.into())
    }
}
Loading

0 comments on commit b1e3cf4

Please sign in to comment.