Skip to content

Commit

Permalink
Merge pull request #4067 from Xuanwo/dal2
Browse files Browse the repository at this point in the history
dal2: Implement DAL Layer support
  • Loading branch information
databend-bot authored Feb 7, 2022
2 parents 8727416 + 6ea64e5 commit 72cde46
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 0 deletions.
21 changes: 21 additions & 0 deletions common/dal2/src/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use futures::AsyncRead;

Expand Down Expand Up @@ -51,3 +54,21 @@ pub trait Accessor: Send + Sync {
unimplemented!()
}
}

/// All functions in `Accessor` only requires `&self`, so it's safe to implement
/// `Accessor` for `Arc<dyn Accessor>`.
#[async_trait]
impl Accessor for Arc<dyn Accessor> {
async fn read(&self, args: &OpRead) -> Result<Reader> {
self.as_ref().read(args).await
}
async fn write(&self, r: Reader, args: &OpWrite) -> Result<usize> {
self.as_ref().write(r, args).await
}
async fn stat(&self, args: &OpStat) -> Result<Object> {
self.as_ref().stat(args).await
}
async fn delete(&self, args: &OpDelete) -> Result<()> {
self.as_ref().delete(args).await
}
}
47 changes: 47 additions & 0 deletions common/dal2/src/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use crate::Accessor;

/// Layer is used to intercept the operations on the underlying storage.
///
/// Struct that implement this trait must accept input `Arc<dyn Accessor>` as inner,
/// and returns a new `Arc<dyn Accessor>` as output.
///
/// All functions in `Accessor` requires `&self`, so it's implementor's responsibility
/// to maintain the internal mutability. Please also keep in mind that `Accessor`
/// requires `Send` and `Sync`.
///
/// # Examples
///
/// ```
/// use dal2::{Accessor, Layer};
///
/// struct Trace {
/// inner: Arc<dyn Accessor>,
/// }
///
/// impl Accessor for Trace {}
///
/// impl Layer for Trace {
/// fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor> {
/// Arc::new(Trace { inner })
/// }
/// }
/// ```
pub trait Layer {
fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor>;
}
3 changes: 3 additions & 0 deletions common/dal2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ mod accessor;
pub use accessor::Accessor;
pub use accessor::Reader;

mod layer;
pub use layer::Layer;

mod operator;
pub use operator::Operator;

Expand Down
8 changes: 8 additions & 0 deletions common/dal2/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::ops::OpWrite;
use crate::Accessor;
use crate::Layer;

#[derive(Clone)]
pub struct Operator {
Expand All @@ -30,6 +31,13 @@ impl Operator {
Self { accessor }
}

#[must_use]
pub fn layer(self, layer: impl Layer) -> Self {
Operator {
accessor: layer.layer(self.accessor.clone()),
}
}

pub fn inner(&self) -> Arc<dyn Accessor> {
self.accessor.clone()
}
Expand Down
62 changes: 62 additions & 0 deletions common/dal2/tests/it/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_dal2::ops::OpDelete;
use common_dal2::services::fs;
use common_dal2::Accessor;
use common_dal2::Layer;
use common_dal2::Operator;
use futures::lock::Mutex;

struct Test {
#[allow(dead_code)]
inner: Arc<dyn Accessor>,
deleted: Arc<Mutex<bool>>,
}

impl Layer for &Test {
fn layer(&self, inner: Arc<dyn Accessor>) -> Arc<dyn Accessor> {
Arc::new(Test {
inner: inner.clone(),
deleted: self.deleted.clone(),
})
}
}

#[async_trait::async_trait]
impl Accessor for Test {
async fn delete(&self, _args: &OpDelete) -> common_dal2::error::Result<()> {
let mut x = self.deleted.lock().await;
*x = true;

// We will not call anything here to test the layer.
Ok(())
}
}

#[tokio::test]
async fn test_layer() {
let test = Test {
inner: Arc::new(fs::Backend::build().finish()),
deleted: Arc::new(Mutex::new(false)),
};

let op = Operator::new(fs::Backend::build().finish()).layer(&test);

op.delete("xxxxx").run().await.unwrap();

assert!(*test.deleted.clone().lock().await);
}
1 change: 1 addition & 0 deletions common/dal2/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod layer;
mod ops;
mod services;
mod wraps;

0 comments on commit 72cde46

Please sign in to comment.