Skip to content

Commit

Permalink
feat(core): add version(bool) for List operation to include version d… (
Browse files Browse the repository at this point in the history
#5106)

* feat(core): add version(bool) for List operation to include version during list or not

* fix docs

* fix cargo clippy

* rename bucket_versioning_enabled to enable_versioning && support start_after

* enable bucket versioning in ceph_rados_s3 action

* add versioning test action for minio and ceph_radios

* disable ceph radios with versioning action

* handle markers internally in S3
  • Loading branch information
meteorgan authored Sep 19, 2024
1 parent da9a685 commit 9313561
Show file tree
Hide file tree
Showing 13 changed files with 616 additions and 29 deletions.
4 changes: 3 additions & 1 deletion .github/services/s3/0_minio_s3/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ runs:
AWS_ACCESS_KEY_ID: "minioadmin"
AWS_SECRET_ACCESS_KEY: "minioadmin"
AWS_EC2_METADATA_DISABLED: "true"
run: aws --endpoint-url http://127.0.0.1:9000/ s3 mb s3://test
run: |
aws --endpoint-url http://127.0.0.1:9000/ s3 mb s3://test
- name: Setup
shell: bash
run: |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: ceph_rados_s3_with_versioning
description: 'Behavior test for CEPH OBJECT GATEWAY S3 with bucket versioning enabled'

runs:
using: "composite"
steps:
- name: Setup Ceph Rados Server
shell: bash
working-directory: fixtures/s3
run: docker compose -f docker-compose-ceph-rados.yml up -d --wait

# ceph/demo has support for CEPH_DEMO_BUCKET, but it doesn't work as expected.
- name: Create bucket
shell: bash
working-directory: fixtures/s3
run: |
docker exec ceph-demo s3cmd mb s3://demo
docker exec ceph-demo s3cmd setversioning s3://demo enable
- name: Setup
shell: bash
run: |
cat << EOF >> $GITHUB_ENV
OPENDAL_S3_BUCKET=demo
OPENDAL_S3_ENDPOINT=http://127.0.0.1:8080
OPENDAL_S3_ACCESS_KEY_ID=demo
OPENDAL_S3_SECRET_ACCESS_KEY=demo
OPENDAL_S3_REGION=us-east-1
OPENDAL_S3_ENABLE_VERSIONING=true
EOF
48 changes: 48 additions & 0 deletions .github/services/s3/minio_s3_with_versioning/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: minio_s3_with_versioning
description: 'Behavior test for Minio S3 with bucket versioning enabled'

runs:
using: "composite"
steps:
- name: Setup MinIO Server
shell: bash
working-directory: fixtures/s3
run: docker compose -f docker-compose-minio.yml up -d --wait
- name: Setup test bucket
shell: bash
env:
AWS_ACCESS_KEY_ID: "minioadmin"
AWS_SECRET_ACCESS_KEY: "minioadmin"
AWS_EC2_METADATA_DISABLED: "true"
run: |
aws --endpoint-url http://127.0.0.1:9000/ s3 mb s3://test
aws --endpoint-url http://127.0.0.1:9000/ s3api put-bucket-versioning --bucket test --versioning-configuration Status=Enabled
- name: Setup
shell: bash
run: |
cat << EOF >> $GITHUB_ENV
OPENDAL_S3_BUCKET=test
OPENDAL_S3_ENDPOINT=http://127.0.0.1:9000
OPENDAL_S3_ACCESS_KEY_ID=minioadmin
OPENDAL_S3_SECRET_ACCESS_KEY=minioadmin
OPENDAL_S3_REGION=us-east-1
OPENDAL_S3_ENABLE_VERSIONING=true
EOF
9 changes: 9 additions & 0 deletions core/src/raw/enum_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWays<ONE, TWO> {
}
}

impl<ONE: oio::List, TWO: oio::List> oio::List for TwoWays<ONE, TWO> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
match self {
Self::One(v) => v.next().await,
Self::Two(v) => v.next().await,
}
}
}

/// ThreeWays is used to implement traits that based on three ways.
///
/// Users can wrap three different trait types together.
Expand Down
5 changes: 2 additions & 3 deletions core/src/raw/oio/list/page_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::VecDeque;
use std::future::Future;

use crate::raw::*;
use crate::*;
use std::collections::VecDeque;
use std::future::Future;

/// PageList is used to implement [`oio::List`] based on API supporting pagination. By implementing
/// PageList, services don't need to care about the details of page list.
Expand Down
39 changes: 29 additions & 10 deletions core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
use std::collections::HashMap;
use std::time::Duration;

use flagset::FlagSet;

use crate::raw::*;
use crate::*;
use flagset::FlagSet;

/// Args for `create` operation.
///
Expand Down Expand Up @@ -100,6 +99,14 @@ pub struct OpList {
/// - If this is set to > 1, the list operation will be concurrent,
/// and the maximum number of concurrent operations will be determined by this value.
concurrent: usize,
/// The version is used to control whether the object versions should be returned.
///
/// - If `false`, list operation will not return with object versions
/// - If `true`, list operation will return with object versions if object versioning is supported
/// by the underlying service
///
/// Default to `false`
version: bool,
}

impl Default for OpList {
Expand All @@ -111,6 +118,7 @@ impl Default for OpList {
// By default, we want to know what's the mode of this entry.
metakey: Metakey::Mode.into(),
concurrent: 1,
version: false,
}
}
}
Expand Down Expand Up @@ -184,6 +192,17 @@ impl OpList {
pub fn concurrent(&self) -> usize {
self.concurrent
}

/// Change the version of this list operation
pub fn with_version(mut self, version: bool) -> Self {
self.version = version;
self
}

/// Get the version of this list operation
pub fn version(&self) -> bool {
self.version
}
}

/// Args for `presign` operation.
Expand Down Expand Up @@ -333,36 +352,36 @@ impl OpRead {
&mut self.range
}

/// Sets the content-disposition header that should be send back by the remote read operation.
/// Sets the content-disposition header that should be sent back by the remote read operation.
pub fn with_override_content_disposition(mut self, content_disposition: &str) -> Self {
self.override_content_disposition = Some(content_disposition.into());
self
}

/// Returns the content-disposition header that should be send back by the remote read
/// Returns the content-disposition header that should be sent back by the remote read
/// operation.
pub fn override_content_disposition(&self) -> Option<&str> {
self.override_content_disposition.as_deref()
}

/// Sets the cache-control header that should be send back by the remote read operation.
/// Sets the cache-control header that should be sent back by the remote read operation.
pub fn with_override_cache_control(mut self, cache_control: &str) -> Self {
self.override_cache_control = Some(cache_control.into());
self
}

/// Returns the cache-control header that should be send back by the remote read operation.
/// Returns the cache-control header that should be sent back by the remote read operation.
pub fn override_cache_control(&self) -> Option<&str> {
self.override_cache_control.as_deref()
}

/// Sets the content-type header that should be send back by the remote read operation.
/// Sets the content-type header that should be sent back by the remote read operation.
pub fn with_override_content_type(mut self, content_type: &str) -> Self {
self.override_content_type = Some(content_type.into());
self
}

/// Returns the content-type header that should be send back by the remote read operation.
/// Returns the content-type header that should be sent back by the remote read operation.
pub fn override_content_type(&self) -> Option<&str> {
self.override_content_type.as_deref()
}
Expand Down Expand Up @@ -526,13 +545,13 @@ impl OpStat {
self.if_none_match.as_deref()
}

/// Sets the content-disposition header that should be send back by the remote read operation.
/// Sets the content-disposition header that should be sent back by the remote read operation.
pub fn with_override_content_disposition(mut self, content_disposition: &str) -> Self {
self.override_content_disposition = Some(content_disposition.into());
self
}

/// Returns the content-disposition header that should be send back by the remote read
/// Returns the content-disposition header that should be sent back by the remote read
/// operation.
pub fn override_content_disposition(&self) -> Option<&str> {
self.override_content_disposition.as_deref()
Expand Down
48 changes: 38 additions & 10 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ use reqwest::Url;
use super::core::*;
use super::error::parse_error;
use super::error::parse_s3_error_code;
use super::lister::S3Lister;
use super::lister::{S3Lister, S3Listers, S3ObjectVersionsLister};
use super::writer::S3Writer;
use super::writer::S3Writers;
use crate::raw::oio::PageLister;
use crate::raw::*;
use crate::services::S3Config;
use crate::*;
Expand Down Expand Up @@ -457,6 +458,13 @@ impl S3Builder {
self
}

/// Set bucket versioning status for this backend
pub fn enable_versioning(mut self, enabled: bool) -> Self {
self.config.enable_versioning = enabled;

self
}

/// Check if `bucket` is valid
/// `bucket` must be not empty and if `enable_virtual_host_style` is true
/// it couldn't contain dot(.) character
Expand Down Expand Up @@ -856,6 +864,7 @@ impl Builder for S3Builder {
default_storage_class,
allow_anonymous: self.config.allow_anonymous,
disable_stat_with_override: self.config.disable_stat_with_override,
enable_versioning: self.config.enable_versioning,
signer,
loader,
credential_loaded: AtomicBool::new(false),
Expand All @@ -876,7 +885,7 @@ pub struct S3Backend {
impl Access for S3Backend {
type Reader = HttpBody;
type Writer = S3Writers;
type Lister = oio::PageLister<S3Lister>;
type Lister = S3Listers;
type BlockingReader = ();
type BlockingWriter = ();
type BlockingLister = ();
Expand Down Expand Up @@ -929,6 +938,7 @@ impl Access for S3Backend {
list_with_limit: true,
list_with_start_after: true,
list_with_recursive: true,
list_with_version: self.core.enable_versioning,

presign: true,
presign_stat: true,
Expand Down Expand Up @@ -1027,14 +1037,32 @@ impl Access for S3Backend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = S3Lister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
);
Ok((RpList::default(), oio::PageLister::new(l)))
if args.version() && !self.core.enable_versioning {
return Err(Error::new(
ErrorKind::Unsupported,
"the bucket doesn't enable versioning",
));
}

let l = if args.version() {
TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
)))
} else {
TwoWays::One(PageLister::new(S3Lister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
)))
};

Ok((RpList::default(), l))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct S3Config {
///
/// required.
pub bucket: String,
/// is bucket versioning enabled for this bucket
pub enable_versioning: bool,
/// endpoint of this backend.
///
/// Endpoint must be full uri, e.g.
Expand Down
Loading

0 comments on commit 9313561

Please sign in to comment.