Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dal2: Implement DAL Layer support #4067

Merged
merged 4 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions common/dal2/src/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use futures::AsyncRead;

Expand Down Expand Up @@ -51,3 +54,21 @@ pub trait Accessor: Send + Sync {
unimplemented!()
}
}

/// All functions in `Accessor` only requires `&self`, so it's safe to implement
/// `Accessor` for `Arc<dyn Accessor>`.
#[async_trait]
impl Accessor for Arc<dyn Accessor> {
async fn read(&self, args: &OpRead) -> Result<Reader> {
self.as_ref().read(args).await
}
async fn write(&self, r: Reader, args: &OpWrite) -> Result<usize> {
self.as_ref().write(r, args).await
}
async fn stat(&self, args: &OpStat) -> Result<Object> {
self.as_ref().stat(args).await
}
async fn delete(&self, args: &OpDelete) -> Result<()> {
self.as_ref().delete(args).await
}
}
47 changes: 47 additions & 0 deletions common/dal2/src/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use crate::Accessor;

/// Layer is used to intercept the operations on the underlying storage.
///
/// Struct that implement this trait must accept input `Arc<dyn Accessor>` as inner,
/// and returns a new `Arc<dyn Accessor>` as output.
///
/// All functions in `Accessor` requires `&self`, so it's implementor's responsibility
/// to maintain the internal mutability. Please also keep in mind that `Accessor`
/// requires `Send` and `Sync`.
///
/// # Examples
///
/// ```
/// use dal2::{Accessor, Layer};
///
/// struct Trace {
/// inner: Arc<dyn Accessor>,
/// }
///
/// impl Accessor for Trace {}
///
/// impl Layer for Trace {
/// fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor> {
/// Arc::new(Trace { inner })
/// }
/// }
/// ```
pub trait Layer {
fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor>;
}
3 changes: 3 additions & 0 deletions common/dal2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ mod accessor;
pub use accessor::Accessor;
pub use accessor::Reader;

mod layer;
pub use layer::Layer;

mod operator;
pub use operator::Operator;

Expand Down
8 changes: 8 additions & 0 deletions common/dal2/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::ops::OpWrite;
use crate::Accessor;
use crate::Layer;

#[derive(Clone)]
pub struct Operator {
Expand All @@ -30,6 +31,13 @@ impl Operator {
Self { accessor }
}

#[must_use]
pub fn layer(self, layer: impl Layer) -> Self {
Operator {
accessor: layer.layer(self.accessor.clone()),
}
}

pub fn inner(&self) -> Arc<dyn Accessor> {
self.accessor.clone()
}
Expand Down
62 changes: 62 additions & 0 deletions common/dal2/tests/it/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_dal2::ops::OpDelete;
use common_dal2::services::fs;
use common_dal2::Accessor;
use common_dal2::Layer;
use common_dal2::Operator;
use futures::lock::Mutex;

struct Test {
#[allow(dead_code)]
inner: Arc<dyn Accessor>,
deleted: Arc<Mutex<bool>>,
}

impl Layer for &Test {
fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor> {
Arc::new(Test {
inner: inner.clone(),
deleted: self.deleted.clone(),
})
}
}

#[async_trait::async_trait]
impl Accessor for Test {
async fn delete(&self, _args: &OpDelete) -> common_dal2::error::Result<()> {
let mut x = self.deleted.lock().await;
*x = true;

// We will not call anything here to test the layer.
Ok(())
}
}

#[tokio::test]
async fn test_layer() {
let test = Test {
inner: Arc::new(fs::Backend::build().finish()),
deleted: Arc::new(Mutex::new(false)),
};

let op = Operator::new(fs::Backend::build().finish()).layer(&test);

op.delete("xxxxx").run().await.unwrap();

assert!(*test.deleted.clone().lock().await);
}
1 change: 1 addition & 0 deletions common/dal2/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod layer;
mod ops;
mod services;
mod wraps;