diff --git a/common/dal2/src/accessor.rs b/common/dal2/src/accessor.rs index 9c2c9981ee24..3f7a330ab5b3 100644 --- a/common/dal2/src/accessor.rs +++ b/common/dal2/src/accessor.rs @@ -11,6 +11,9 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +use std::sync::Arc; + use async_trait::async_trait; use futures::AsyncRead; @@ -51,3 +54,21 @@ pub trait Accessor: Send + Sync { unimplemented!() } } + +/// All functions in `Accessor` only requires `&self`, so it's safe to implement +/// `Accessor` for `Arc`. +#[async_trait] +impl Accessor for Arc { + async fn read(&self, args: &OpRead) -> Result { + self.as_ref().read(args).await + } + async fn write(&self, r: Reader, args: &OpWrite) -> Result { + self.as_ref().write(r, args).await + } + async fn stat(&self, args: &OpStat) -> Result { + self.as_ref().stat(args).await + } + async fn delete(&self, args: &OpDelete) -> Result<()> { + self.as_ref().delete(args).await + } +} diff --git a/common/dal2/src/layer.rs b/common/dal2/src/layer.rs new file mode 100644 index 000000000000..a32aedfb85cd --- /dev/null +++ b/common/dal2/src/layer.rs @@ -0,0 +1,47 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use crate::Accessor; + +/// Layer is used to intercept the operations on the underlying storage. +/// +/// Struct that implement this trait must accept input `Arc` as inner, +/// and returns a new `Arc` as output. +/// +/// All functions in `Accessor` requires `&self`, so it's implementor's responsibility +/// to maintain the internal mutability. Please also keep in mind that `Accessor` +/// requires `Send` and `Sync`. +/// +/// # Examples +/// +/// ``` +/// use dal2::{Accessor, Layer}; +/// +/// struct Trace { +/// inner: Arc, +/// } +/// +/// impl Accessor for Trace {} +/// +/// impl Layer for Trace { +/// fn layer(&self, inner: Arc) -> Arc { +/// Arc::new(Trace { inner }) +/// } +/// } +/// ``` +pub trait Layer { + fn layer(&self, inner: Arc) -> Arc; +} diff --git a/common/dal2/src/lib.rs b/common/dal2/src/lib.rs index 0e8d0077d59d..764566a825d9 100644 --- a/common/dal2/src/lib.rs +++ b/common/dal2/src/lib.rs @@ -20,6 +20,9 @@ mod accessor; pub use accessor::Accessor; pub use accessor::Reader; +mod layer; +pub use layer::Layer; + mod operator; pub use operator::Operator; diff --git a/common/dal2/src/operator.rs b/common/dal2/src/operator.rs index c03315a1753f..08a6d4f610bc 100644 --- a/common/dal2/src/operator.rs +++ b/common/dal2/src/operator.rs @@ -19,6 +19,7 @@ use crate::ops::OpRead; use crate::ops::OpStat; use crate::ops::OpWrite; use crate::Accessor; +use crate::Layer; #[derive(Clone)] pub struct Operator { @@ -30,6 +31,13 @@ impl Operator { Self { accessor } } + #[must_use] + pub fn layer(self, layer: impl Layer) -> Self { + Operator { + accessor: layer.layer(self.accessor.clone()), + } + } + pub fn inner(&self) -> Arc { self.accessor.clone() } diff --git a/common/dal2/tests/it/layer.rs b/common/dal2/tests/it/layer.rs new file mode 100644 index 000000000000..41e8534039a7 --- /dev/null +++ b/common/dal2/tests/it/layer.rs @@ -0,0 +1,62 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_dal2::ops::OpDelete; +use common_dal2::services::fs; +use common_dal2::Accessor; +use common_dal2::Layer; +use common_dal2::Operator; +use futures::lock::Mutex; + +struct Test { + #[allow(dead_code)] + inner: Arc, + deleted: Arc>, +} + +impl Layer for &Test { + fn layer(&self, inner: Arc) -> Arc { + Arc::new(Test { + inner: inner.clone(), + deleted: self.deleted.clone(), + }) + } +} + +#[async_trait::async_trait] +impl Accessor for Test { + async fn delete(&self, _args: &OpDelete) -> common_dal2::error::Result<()> { + let mut x = self.deleted.lock().await; + *x = true; + + // We will not call anything here to test the layer. + Ok(()) + } +} + +#[tokio::test] +async fn test_layer() { + let test = Test { + inner: Arc::new(fs::Backend::build().finish()), + deleted: Arc::new(Mutex::new(false)), + }; + + let op = Operator::new(fs::Backend::build().finish()).layer(&test); + + op.delete("xxxxx").run().await.unwrap(); + + assert!(*test.deleted.clone().lock().await); +} diff --git a/common/dal2/tests/it/main.rs b/common/dal2/tests/it/main.rs index 91cba5b23ed6..d7f4b951b445 100644 --- a/common/dal2/tests/it/main.rs +++ b/common/dal2/tests/it/main.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod layer; mod ops; mod services; mod wraps;