Skip to content

Commit

Permalink
Merge pull request #8244 from Xuanwo/write-retry
Browse files Browse the repository at this point in the history
feat: Retry fuse table write operations
  • Loading branch information
BohuTANG authored Oct 16, 2022
2 parents 479d538 + 2fa0ab6 commit 053a751
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async-channel = "1.7.1"
async-recursion = "1.0.0"
async-stream = "0.3.3"
async-trait = { version = "0.1.57", package = "async-trait-fn" }
backon = "0.2"
bumpalo = "3.11.0"
byteorder = "1.4.3"
bytes = "1.2.1"
Expand Down
25 changes: 17 additions & 8 deletions src/query/service/src/storages/result/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::ErrorKind;
use std::sync::Arc;

use backon::ExponentialBackoff;
use backon::Retryable;
use common_datablocks::serialize_data_blocks;
use common_datablocks::DataBlock;
use common_exception::Result;
Expand All @@ -23,6 +26,7 @@ use common_legacy_planners::PartInfoPtr;
use common_streams::SendableDataBlockStream;
use futures::StreamExt;
use opendal::Operator;
use tracing::debug;

use crate::sessions::QueryContext;
use crate::sessions::TableContext;
Expand Down Expand Up @@ -101,14 +105,19 @@ impl ResultTableWriter {
let block_statistics = BlockStatistics::from(&block, location.clone(), None)?;
let schema = block.schema().clone();
let (size, meta_data) = serialize_data_blocks(vec![block], &schema, &mut data)?;
self.data_accessor
.object(&location)
.write(data)
.await
.map_err(|e| {
println!("error {}", e);
e
})?;

let object = self.data_accessor.object(&location);
{ || object.write(data.as_slice()) }
.retry(ExponentialBackoff::default())
.when(|err| err.kind() == ErrorKind::Interrupted)
.notify(|err, dur| {
debug!(
"append block write retry after {}s for error {:?}",
dur.as_secs(),
err
)
})
.await?;
self.accumulator
.add_block(size, meta_data, block_statistics, None, 0)?;
Ok(self.get_last_part_info())
Expand Down
16 changes: 15 additions & 1 deletion src/query/service/src/storages/stage/stage_table_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
// limitations under the License.

use std::any::Any;
use std::io::ErrorKind;
use std::str::FromStr;
use std::sync::Arc;

use async_trait::async_trait;
use backon::ExponentialBackoff;
use backon::Retryable;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
Expand All @@ -29,6 +32,7 @@ use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_core::processors::Processor;
use opendal::Operator;
use tracing::debug;

use crate::sessions::TableContext;

Expand Down Expand Up @@ -269,7 +273,17 @@ impl Processor for StageTableSink {
.inc_write_bytes(bytes.len());

let object = self.data_accessor.object(&path);
object.write(bytes.as_slice()).await?;
{ || object.write(bytes.as_slice()) }
.retry(ExponentialBackoff::default())
.when(|err| err.kind() == ErrorKind::Interrupted)
.notify(|err, dur| {
debug!(
"stage table sink write retry after {}s for error {:?}",
dur.as_secs(),
err
)
})
.await?;

match remainng_block {
Some(block) => self.state = State::NeedSerialize(block),
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ common-storages-util = { path = "../util" }

async-trait = { version = "0.1.57", package = "async-trait-fn" }
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
backon = "0.2"
chrono = "0.4.22"
futures = "0.3.24"
futures-util = "0.3.24"
Expand Down
19 changes: 18 additions & 1 deletion src/query/storages/fuse/src/io/write/block_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::ErrorKind;

use backon::ExponentialBackoff;
use backon::Retryable;
use common_arrow::parquet::compression::CompressionOptions;
use common_arrow::parquet::metadata::ThriftFileMetaData;
use common_datablocks::serialize_data_blocks;
Expand All @@ -22,6 +26,7 @@ use common_fuse_meta::meta::BlockMeta;
use common_fuse_meta::meta::ClusterStatistics;
use common_fuse_meta::meta::Location;
use opendal::Operator;
use tracing::debug;
use uuid::Uuid;

use crate::index::BlockFilter;
Expand Down Expand Up @@ -125,7 +130,19 @@ pub async fn write_block(
}

/// Writes `data` to the object at `location` through `data_accessor`.
///
/// The write is issued exactly once per attempt and is wrapped in a retry
/// using the default `ExponentialBackoff`. Only errors whose kind is
/// [`ErrorKind::Interrupted`] are retried; each retry is reported at debug
/// level together with the backoff delay. Any error that is not retried is
/// propagated to the caller via `?`.
pub async fn write_data(data: &[u8], data_accessor: &Operator, location: &str) -> Result<()> {
    let object = data_accessor.object(location);

    // The closure is invoked once per attempt, so it only borrows `data`.
    // There must be no un-retried write ahead of this: it would both bypass
    // the retry policy and upload the payload a second time.
    { || object.write(data) }
        .retry(ExponentialBackoff::default())
        .when(|err| err.kind() == ErrorKind::Interrupted)
        .notify(|err, dur| {
            debug!(
                "fuse table block writer write_data retry after {}s for error {:?}",
                dur.as_secs(),
                err
            )
        })
        .await?;

    Ok(())
}
17 changes: 16 additions & 1 deletion src/query/storages/fuse/src/io/write/meta_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,29 @@
// limitations under the License.

use std::io::Error;
use std::io::ErrorKind;

use backon::ExponentialBackoff;
use backon::Retryable;
use common_exception::Result;
use opendal::Operator;
use serde::Serialize;
use tracing::debug;

/// Serializes `meta` as JSON and writes it to the object at `location`.
///
/// Serialization failures are mapped to `std::io::Error` via `Error::other`
/// and returned before any write is attempted. The write itself is retried
/// using the default `ExponentialBackoff`, but only for errors whose kind is
/// [`ErrorKind::Interrupted`]; each retry is reported at debug level together
/// with the backoff delay. Any error that is not retried is propagated to the
/// caller via `?`.
pub async fn write_meta<T>(data_accessor: &Operator, location: &str, meta: T) -> Result<()>
where T: Serialize {
    let bs = serde_json::to_vec(&meta).map_err(Error::other)?;
    let object = data_accessor.object(location);

    // The closure is invoked once per attempt, so it borrows the serialized
    // bytes instead of consuming them. This is the only write: an un-retried
    // write ahead of it would bypass the retry policy and upload twice.
    { || object.write(bs.as_slice()) }
        .retry(ExponentialBackoff::default())
        .when(|err| err.kind() == ErrorKind::Interrupted)
        .notify(|err, dur| {
            // Name this function in the message so retries are not
            // attributed to the stage table sink.
            debug!(
                "fuse table meta writer write_meta retry after {}s for error {:?}",
                dur.as_secs(),
                err
            )
        })
        .await?;

    Ok(())
}
18 changes: 15 additions & 3 deletions src/query/storages/fuse/src/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::ErrorKind;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use backon::Retryable;
use common_base::base::ProgressValues;
use common_cache::Cache;
use common_catalog::table::Table;
Expand Down Expand Up @@ -79,6 +81,8 @@ impl FuseTable {
// By default, it is 2 minutes
let max_elapsed = OCC_DEFAULT_BACKOFF_MAX_ELAPSED_MS;

// TODO(xuanwo): move to backon instead.
//
// To simplify the settings, using fixed common values for randomization_factor and multiplier
let mut backoff = ExponentialBackoffBuilder::new()
.with_initial_interval(init_delay)
Expand Down Expand Up @@ -373,9 +377,17 @@ impl FuseTable {
format!("{}{}", storage_prefix, last_snapshot_path)
};

operator
.object(&hint_path)
.write(last_snapshot_path)
let object = operator.object(&hint_path);
{ || object.write(last_snapshot_path.as_bytes()) }
.retry(backon::ExponentialBackoff::default())
.when(|err| err.kind() == ErrorKind::Interrupted)
.notify(|err, dur| {
debug!(
"fuse table write_last_snapshot_hint retry after {}s for error {:?}",
dur.as_secs(),
err
)
})
.await
.unwrap_or_else(|e| {
warn!("write last snapshot hint failure. {}", e);
Expand Down

1 comment on commit 053a751

@vercel
Copy link

@vercel vercel bot commented on 053a751 Oct 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-databend.vercel.app
databend-git-main-databend.vercel.app
databend.vercel.app
databend.rs

Please sign in to comment.