Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: snapshot timestamp & navigation #5535

Merged
merged 4 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ build_exceptions! {

TableVersionMismatched(2009),
OCCRetryFailure(2011),
TableNotWritable(2012),
TableHistoricalDataNotFound(2013),

// User api error codes.
UnknownUser(2201),
Expand Down
27 changes: 25 additions & 2 deletions query/src/storages/fuse/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ pub struct FuseTable {
pub(crate) meta_location_generator: TableMetaLocationGenerator,

pub(crate) order_keys: Vec<Expression>,
pub(crate) read_only: bool,
}

impl FuseTable {
pub fn try_create(_ctx: StorageContext, table_info: TableInfo) -> Result<Box<dyn Table>> {
let r = Self::do_create(table_info, false)?;
Ok(r)
}

pub fn do_create(table_info: TableInfo, read_only: bool) -> Result<Box<FuseTable>> {
let storage_prefix = Self::parse_storage_prefix(&table_info)?;
let mut order_keys = Vec::new();
if let Some(order) = &table_info.meta.order_keys {
Expand All @@ -67,6 +73,7 @@ impl FuseTable {
table_info,
order_keys,
meta_location_generator: TableMetaLocationGenerator::with_prefix(storage_prefix),
read_only,
}))
}

Expand Down Expand Up @@ -116,7 +123,7 @@ impl FuseTable {

pub fn snapshot_format_version(&self) -> u64 {
match self.snapshot_loc() {
Some(loc) => TableMetaLocationGenerator::snaphost_version(loc.as_str()),
Some(loc) => TableMetaLocationGenerator::snapshot_version(loc.as_str()),
None => {
// No snapshot location here, indicates that there are no data of this table yet
// in this case, we just returns the current snapshot version
Expand All @@ -143,6 +150,17 @@ impl FuseTable {
))
})
}

pub fn check_mutable(&self) -> Result<()> {
if self.read_only {
Err(ErrorCode::TableNotWritable(format!(
"Table {} is in read-only mode",
self.table_info.desc.as_str()
)))
} else {
Ok(())
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -182,7 +200,7 @@ impl Table for FuseTable {
ctx: Arc<QueryContext>,
plan: &ReadDataSourcePlan,
) -> Result<SendableDataBlockStream> {
self.do_read(ctx, &plan.push_downs).await
self.do_read(ctx, &plan.push_downs)
}

#[tracing::instrument(level = "debug", name = "fuse_table_read2", skip(self, ctx, pipeline), fields(ctx.id = ctx.get_id().as_str()))]
Expand All @@ -196,6 +214,7 @@ impl Table for FuseTable {
}

fn append2(&self, ctx: Arc<QueryContext>, pipeline: &mut NewPipeline) -> Result<()> {
self.check_mutable()?;
self.do_append2(ctx, pipeline)
}

Expand All @@ -205,6 +224,7 @@ impl Table for FuseTable {
ctx: Arc<QueryContext>,
stream: SendableDataBlockStream,
) -> Result<SendableDataBlockStream> {
self.check_mutable()?;
let log_entry_stream = self.append_chunks(ctx, stream).await?;
let data_block_stream =
log_entry_stream.map(|append_log_entry_res| match append_log_entry_res {
Expand All @@ -221,6 +241,7 @@ impl Table for FuseTable {
operations: Vec<DataBlock>,
overwrite: bool,
) -> Result<()> {
self.check_mutable()?;
// only append operation supported currently
let append_log_entries = operations
.iter()
Expand All @@ -235,10 +256,12 @@ impl Table for FuseTable {
ctx: Arc<QueryContext>,
truncate_plan: TruncateTablePlan,
) -> Result<()> {
self.check_mutable()?;
self.do_truncate(ctx, truncate_plan).await
}

async fn optimize(&self, ctx: Arc<QueryContext>, keep_last_snapshot: bool) -> Result<()> {
self.check_mutable()?;
self.do_optimize(ctx, keep_last_snapshot).await
}

Expand Down
6 changes: 3 additions & 3 deletions query/src/storages/fuse/io/locations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ impl TableMetaLocationGenerator {
}

pub fn snapshot_location_from_uuid(&self, id: &Uuid, version: u64) -> Result<String> {
let snaphost_version = SnapshotVersion::try_from(version)?;
Ok(snaphost_version.create(id, &self.prefix))
let snapshot_version = SnapshotVersion::try_from(version)?;
Ok(snapshot_version.create(id, &self.prefix))
}

pub fn snaphost_version(location: impl AsRef<str>) -> u64 {
pub fn snapshot_version(location: impl AsRef<str>) -> u64 {
if location.as_ref().ends_with(SNAPHOST_V1.suffix()) {
SNAPHOST_V1.version()
} else {
Expand Down
43 changes: 43 additions & 0 deletions query/src/storages/fuse/io/read/meta_readers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use futures::io::BufReader;
use futures::stream;
use opendal::BytesReader;

use super::cached_reader::CachedReader;
Expand Down Expand Up @@ -98,6 +100,47 @@ impl<'a> TableSnapshotReader<'a> {

Ok(snapshots)
}

pub fn snapshot_history(
&'a self,
location: String,
format_version: u64,
location_gen: TableMetaLocationGenerator,
) -> Pin<Box<dyn futures::stream::Stream<Item = Result<Arc<TableSnapshot>>> + 'a>> {
let stream = stream::try_unfold(
(self, location_gen, Some((location, format_version))),
|(reader, gen, next)| async move {
if let Some((loc, ver)) = next {
let snapshot = match reader.read(loc, None, ver).await {
Ok(s) => Ok(Some(s)),
Err(e) => {
if e.code() == ErrorCode::storage_not_found_code() {
Ok(None)
} else {
Err(e)
}
}
};
match snapshot {
Ok(Some(snapshot)) => {
if let Some((id, v)) = snapshot.prev_snapshot_id {
let new_ver = v;
let new_loc = gen.snapshot_location_from_uuid(&id, v)?;
Ok(Some((snapshot, (reader, gen, Some((new_loc, new_ver))))))
} else {
Ok(Some((snapshot, (reader, gen, None))))
}
}
Ok(None) => Ok(None),
Err(e) => Err(e),
}
} else {
Ok(None)
}
},
);
Box::pin(stream)
}
}

#[async_trait::async_trait]
Expand Down
21 changes: 21 additions & 0 deletions query/src/storages/fuse/meta/v1/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Add;

use chrono::DateTime;
use chrono::Utc;
use common_datavalues::DataSchema;
use serde::Deserialize;
use serde::Serialize;
Expand All @@ -30,6 +34,10 @@ pub struct TableSnapshot {
/// id of snapshot
pub snapshot_id: SnapshotId,

/// previous snapshot
pub timestamp: Option<DateTime<Utc>>,

/// previous snapshot
pub prev_snapshot_id: Option<(SnapshotId, FormatVersion)>,

/// For each snapshot, we keep a schema for it (in case of schema evolution)
Expand All @@ -48,14 +56,26 @@ pub struct TableSnapshot {
impl TableSnapshot {
pub fn new(
snapshot_id: SnapshotId,
prev_timestamp: &Option<DateTime<Utc>>,
prev_snapshot_id: Option<(SnapshotId, FormatVersion)>,
schema: DataSchema,
summary: Statistics,
segments: Vec<Location>,
) -> Self {
// timestamp of the snapshot should always larger than the previous one's
let now = Utc::now();
let mut timestamp = Some(now);
if let Some(prev_instant) = prev_timestamp {
if prev_instant > &now {
// if local time is smaller, use the timestamp of previous snapshot, plus 1 ms
timestamp = Some(prev_instant.add(chrono::Duration::milliseconds(1)))
}
};

Self {
format_version: TableSnapshot::VERSION,
snapshot_id,
timestamp,
prev_snapshot_id,
schema,
summary,
Expand All @@ -75,6 +95,7 @@ impl From<v0::TableSnapshot> for TableSnapshot {
Self {
format_version: TableSnapshot::VERSION,
snapshot_id: s.snapshot_id,
timestamp: None,
prev_snapshot_id: s.prev_snapshot_id.map(|id| (id, 0)),
schema: s.schema,
summary: s.summary,
Expand Down
4 changes: 4 additions & 0 deletions query/src/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl FuseTable {
) -> Result<()> {
let prev = self.read_table_snapshot(ctx).await?;
let prev_version = self.snapshot_format_version();
let prev_timestamp = prev.as_ref().and_then(|v| v.timestamp);
let schema = self.table_info.meta.schema.as_ref().clone();
let (segments, summary) = Self::merge_append_operations(operation_log)?;

Expand All @@ -162,6 +163,7 @@ impl FuseTable {
let new_snapshot = if overwrite {
TableSnapshot::new(
Uuid::new_v4(),
&prev_timestamp,
prev.as_ref().map(|v| (v.snapshot_id, prev_version)),
schema,
summary,
Expand Down Expand Up @@ -228,6 +230,7 @@ impl FuseTable {
statistics
};
let prev_snapshot_id = previous.as_ref().map(|v| (v.snapshot_id, prev_version));
let prev_snapshot_timestamp = previous.as_ref().and_then(|v| v.timestamp);

// 2. merge segment locations with previous snapshot, if any
if let Some(snapshot) = &previous {
Expand All @@ -237,6 +240,7 @@ impl FuseTable {

let new_snapshot = TableSnapshot::new(
Uuid::new_v4(),
&prev_snapshot_timestamp,
prev_snapshot_id,
schema.clone(),
stats,
Expand Down
1 change: 1 addition & 0 deletions query/src/storages/fuse/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod append;
mod commit;
mod fuse_sink;
mod navigate;
mod operation_log;
mod optimize;
mod read;
Expand Down
117 changes: 117 additions & 0 deletions query/src/storages/fuse/operations/navigate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

use std::sync::Arc;

use chrono::DateTime;
use chrono::Utc;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::TableStatistics;
use futures::TryStreamExt;

use crate::sessions::QueryContext;
use crate::sql::OPT_KEY_SNAPSHOT_LOCATION;
use crate::storages::fuse::io::MetaReaders;
use crate::storages::fuse::FuseTable;

impl FuseTable {
pub async fn navigate(
&self,
ctx: &Arc<QueryContext>,
time_point: DateTime<Utc>,
) -> Result<Arc<FuseTable>> {
let snapshot_location = if let Some(loc) = self.snapshot_loc() {
loc
} else {
// not an error?
return Err(ErrorCode::TableHistoricalDataNotFound(
"Empty Table has no historical data",
));
};

let snapshot_version = self.snapshot_format_version();
let reader = MetaReaders::table_snapshot_reader(ctx);

// grab the table history
// snapshots are order by timestamp DESC.
let mut snapshots = reader.snapshot_history(
snapshot_location,
snapshot_version,
self.meta_location_generator().clone(),
);

// Find the instant which matches ths given `time_point`.
let mut instant = None;
while let Some(snapshot) = snapshots.try_next().await? {
if let Some(ts) = snapshot.timestamp {
// break on the first one
if ts <= time_point {
instant = Some(snapshot);
break;
}
}
}

if let Some(snapshot) = instant {
// Load the table instance by the snapshot

// The `seq` of ident that we cloned here is JUST a place holder
// we should NOT use it other than a pure place holder.
// Fortunately, historical table should be read-only.
// - Although, caller of fuse table will not perform mutation on a historical table
// but in case there are careless mistakes, an extra attribute `read_only` is
// added the FuseTable, and during mutation operations, FuseTable will check it.
// - Figuring out better way...
let mut table_info = self.table_info.clone();

// There are more to be kept in snapshot, like engine_options, ordering keys...
// or we could just keep a clone of TableMeta in the snapshot.
//
// currently, here are what we can recovery from the snapshot:

// 1. the table schema
table_info.meta.schema = Arc::new(snapshot.schema.clone());

// 2. the table option `snapshot_location`
let ver = snapshot.format_version();
let loc = self
.meta_location_generator
.snapshot_location_from_uuid(&snapshot.snapshot_id, ver)?;
table_info
.meta
.options
.insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), loc);

// 3. The statistics
let summary = &snapshot.summary;
table_info.meta.statistics = TableStatistics {
number_of_rows: summary.row_count,
data_bytes: summary.uncompressed_byte_size,
compressed_data_bytes: summary.compressed_byte_size,
index_data_bytes: 0, // we do not have it yet
};

// let's instantiate it
let read_only = true;
let fuse_tbl = FuseTable::do_create(table_info, read_only)?;
Ok(fuse_tbl.into())
} else {
Err(ErrorCode::TableHistoricalDataNotFound(
"No historical data found",
))
}
}
}
Loading