From 5d1f1dd02e66545ab0090de3187acf0eb247c88e Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 7 Feb 2022 17:52:58 +0800 Subject: [PATCH 1/3] dal2: Implement DAL Layer support Signed-off-by: Xuanwo --- common/dal2/src/accessor.rs | 21 ++++++++++++ common/dal2/src/layer.rs | 47 ++++++++++++++++++++++++++ common/dal2/src/lib.rs | 3 ++ common/dal2/src/operator.rs | 8 +++++ common/dal2/tests/it/layer.rs | 63 +++++++++++++++++++++++++++++++++++ common/dal2/tests/it/main.rs | 1 + 6 files changed, 143 insertions(+) create mode 100644 common/dal2/src/layer.rs create mode 100644 common/dal2/tests/it/layer.rs diff --git a/common/dal2/src/accessor.rs b/common/dal2/src/accessor.rs index 9c2c9981ee24..3f7a330ab5b3 100644 --- a/common/dal2/src/accessor.rs +++ b/common/dal2/src/accessor.rs @@ -11,6 +11,9 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +use std::sync::Arc; + use async_trait::async_trait; use futures::AsyncRead; @@ -51,3 +54,21 @@ pub trait Accessor: Send + Sync { unimplemented!() } } + +/// All functions in `Accessor` only requires `&self`, so it's safe to implement +/// `Accessor` for `Arc`. +#[async_trait] +impl Accessor for Arc { + async fn read(&self, args: &OpRead) -> Result { + self.as_ref().read(args).await + } + async fn write(&self, r: Reader, args: &OpWrite) -> Result { + self.as_ref().write(r, args).await + } + async fn stat(&self, args: &OpStat) -> Result { + self.as_ref().stat(args).await + } + async fn delete(&self, args: &OpDelete) -> Result<()> { + self.as_ref().delete(args).await + } +} diff --git a/common/dal2/src/layer.rs b/common/dal2/src/layer.rs new file mode 100644 index 000000000000..a32aedfb85cd --- /dev/null +++ b/common/dal2/src/layer.rs @@ -0,0 +1,47 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use crate::Accessor; + +/// Layer is used to intercept the operations on the underlying storage. +/// +/// Struct that implement this trait must accept input `Arc` as inner, +/// and returns a new `Arc` as output. +/// +/// All functions in `Accessor` requires `&self`, so it's implementor's responsibility +/// to maintain the internal mutability. Please also keep in mind that `Accessor` +/// requires `Send` and `Sync`. +/// +/// # Examples +/// +/// ``` +/// use dal2::{Accessor, Layer}; +/// +/// struct Trace { +/// inner: Arc, +/// } +/// +/// impl Accessor for Trace {} +/// +/// impl Layer for Trace { +/// fn layer(&self, inner: Arc) -> Arc { +/// Arc::new(Trace { inner }) +/// } +/// } +/// ``` +pub trait Layer { + fn layer(&self, inner: Arc) -> Arc; +} diff --git a/common/dal2/src/lib.rs b/common/dal2/src/lib.rs index 0e8d0077d59d..764566a825d9 100644 --- a/common/dal2/src/lib.rs +++ b/common/dal2/src/lib.rs @@ -20,6 +20,9 @@ mod accessor; pub use accessor::Accessor; pub use accessor::Reader; +mod layer; +pub use layer::Layer; + mod operator; pub use operator::Operator; diff --git a/common/dal2/src/operator.rs b/common/dal2/src/operator.rs index c03315a1753f..08a6d4f610bc 100644 --- a/common/dal2/src/operator.rs +++ b/common/dal2/src/operator.rs @@ -19,6 +19,7 @@ use crate::ops::OpRead; use crate::ops::OpStat; use crate::ops::OpWrite; use crate::Accessor; +use crate::Layer; #[derive(Clone)] pub struct Operator { @@ -30,6 +31,13 @@ impl Operator { Self { accessor } } + #[must_use] + pub fn layer(self, layer: impl Layer) -> Self { + Operator { + accessor: layer.layer(self.accessor.clone()), + } + } + pub fn inner(&self) -> Arc { self.accessor.clone() } diff --git a/common/dal2/tests/it/layer.rs b/common/dal2/tests/it/layer.rs new file mode 100644 index 000000000000..75a15f7573cb --- /dev/null +++ b/common/dal2/tests/it/layer.rs @@ -0,0 +1,63 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait; +use common_dal2::ops::OpDelete; +use common_dal2::services::fs; +use common_dal2::Accessor; +use common_dal2::Layer; +use common_dal2::Operator; +use futures::lock::Mutex; + +struct Test { + #[allow(dead_code)] + inner: Arc, + deleted: Arc>, +} + +impl Layer for &Test { + fn layer(&self, inner: Arc) -> Arc { + Arc::new(Test { + inner: inner.clone(), + deleted: self.deleted.clone(), + }) + } +} + +#[async_trait::async_trait] +impl Accessor for Test { + async fn delete(&self, _args: &OpDelete) -> common_dal2::error::Result<()> { + let mut x = self.deleted.lock().await; + *x = true; + + // We will not call anything here to test the layer. + Ok(()) + } +} + +#[tokio::test] +async fn test_layer() { + let test = Test { + inner: Arc::new(fs::Backend::build().finish()), + deleted: Arc::new(Mutex::new(false)), + }; + + let op = Operator::new(fs::Backend::build().finish()).layer(&test); + + op.delete("xxxxx").run().await.unwrap(); + + assert_eq!(true, test.deleted.clone().lock().await.clone()); +} diff --git a/common/dal2/tests/it/main.rs b/common/dal2/tests/it/main.rs index 91cba5b23ed6..d7f4b951b445 100644 --- a/common/dal2/tests/it/main.rs +++ b/common/dal2/tests/it/main.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod layer; mod ops; mod services; mod wraps; From af4b28b6602259aa0393bba16d51404885daed60 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 7 Feb 2022 18:06:26 +0800 Subject: [PATCH 2/3] Make clippy happy Signed-off-by: Xuanwo --- common/dal2/tests/it/layer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/common/dal2/tests/it/layer.rs b/common/dal2/tests/it/layer.rs index 75a15f7573cb..47216aa1e398 100644 --- a/common/dal2/tests/it/layer.rs +++ b/common/dal2/tests/it/layer.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use async_trait; use common_dal2::ops::OpDelete; use common_dal2::services::fs; use common_dal2::Accessor; @@ -59,5 +58,5 @@ async fn test_layer() { op.delete("xxxxx").run().await.unwrap(); - assert_eq!(true, test.deleted.clone().lock().await.clone()); + assert!(test.deleted.clone().lock().await.clone()); } From 6d2d2eb787b3753e79d6e9833c58f66340a27196 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 7 Feb 2022 18:25:58 +0800 Subject: [PATCH 3/3] Make clippy happy Signed-off-by: Xuanwo --- common/dal2/tests/it/layer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/dal2/tests/it/layer.rs b/common/dal2/tests/it/layer.rs index 47216aa1e398..41e8534039a7 100644 --- a/common/dal2/tests/it/layer.rs +++ b/common/dal2/tests/it/layer.rs @@ -58,5 +58,5 @@ async fn test_layer() { op.delete("xxxxx").run().await.unwrap(); - assert!(test.deleted.clone().lock().await.clone()); + assert!(*test.deleted.clone().lock().await); }