diff --git a/.github/services/oss/oss_with_versioning/action.yml b/.github/services/oss/oss_with_versioning/action.yml new file mode 100644 index 000000000000..d92e980a39d9 --- /dev/null +++ b/.github/services/oss/oss_with_versioning/action.yml @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: oss_with_versioning +description: "Behavior test for OSS with bucket versioning. This service is sponsored by @datafuse_labs." + +runs: + using: "composite" + steps: + - name: Setup + uses: 1password/load-secrets-action@v1 + with: + export-env: true + env: + OPENDAL_OSS_BUCKET: op://services/oss/versioning_bucket + OPENDAL_OSS_ENDPOINT: op://services/oss/endpoint + OPENDAL_OSS_ACCESS_KEY_ID: op://services/oss/access_key_id + OPENDAL_OSS_ACCESS_KEY_SECRET: op://services/oss/access_key_secret + + - name: Add extra settings + shell: bash + run: | + echo "OPENDAL_OSS_ENABLE_VERSIONING=true" >> $GITHUB_ENV diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index eb7616d148e9..727e62d72d2a 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -30,7 +30,7 @@ use reqsign::AliyunOssSigner; use super::core::*; use super::delete::OssDeleter; use super::error::parse_error; -use super::lister::OssLister; +use super::lister::{OssLister, OssListers, OssObjectVersionsLister}; use super::writer::OssWriter; use super::writer::OssWriters; use crate::raw::*; @@ -97,6 +97,13 @@ impl OssBuilder { self } + /// Set bucket versioning status for this backend + pub fn enable_versioning(mut self, enabled: bool) -> Self { + self.config.enable_versioning = enabled; + + self + } + /// Set an endpoint for generating presigned urls. /// /// You can offer a public endpoint like to return a presinged url for @@ -408,6 +415,7 @@ impl Builder for OssBuilder { host, presign_endpoint, allow_anonymous: self.config.allow_anonymous, + enable_versioning: self.config.enable_versioning, signer, loader, client, @@ -428,7 +436,7 @@ pub struct OssBackend { impl Access for OssBackend { type Reader = HttpBody; type Writer = OssWriters; - type Lister = oio::PageLister; + type Lister = OssListers; type Deleter = oio::BatchDeleter; type BlockingReader = (); type BlockingWriter = (); @@ -449,16 +457,19 @@ impl Access for OssBackend { stat_has_content_type: true, stat_has_content_encoding: true, stat_has_content_range: true, + stat_with_version: self.core.enable_versioning, stat_has_etag: true, stat_has_content_md5: true, stat_has_last_modified: true, stat_has_content_disposition: true, stat_has_user_metadata: true, + stat_has_version: true, read: true, read_with_if_match: true, read_with_if_none_match: true, + read_with_version: self.core.enable_versioning, read_with_if_modified_since: true, read_with_if_unmodified_since: true, @@ -470,7 +481,7 @@ impl Access for OssBackend { write_with_content_type: true, write_with_content_disposition: true, // TODO: set this to false while version has been enabled. - write_with_if_not_exists: true, + write_with_if_not_exists: !self.core.enable_versioning, // The min multipart size of OSS is 100 KiB. // @@ -487,6 +498,7 @@ impl Access for OssBackend { write_with_user_metadata: true, delete: true, + delete_with_version: self.core.enable_versioning, delete_max_size: Some(self.core.delete_max_size), copy: true, @@ -497,6 +509,8 @@ impl Access for OssBackend { list_with_recursive: true, list_has_etag: true, list_has_content_md5: true, + list_with_versions: self.core.enable_versioning, + list_with_deleted: self.core.enable_versioning, list_has_content_length: true, list_has_last_modified: true, @@ -574,14 +588,23 @@ impl Access for OssBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let l = OssLister::new( - self.core.clone(), - path, - args.recursive(), - args.limit(), - args.start_after(), - ); - Ok((RpList::default(), oio::PageLister::new(l))) + let l = if args.versions() || args.deleted() { + TwoWays::Two(oio::PageLister::new(OssObjectVersionsLister::new( + self.core.clone(), + path, + args, + ))) + } else { + TwoWays::One(oio::PageLister::new(OssLister::new( + self.core.clone(), + path, + args.recursive(), + args.limit(), + args.start_after(), + ))) + }; + + Ok((RpList::default(), l)) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { diff --git a/core/src/services/oss/config.rs b/core/src/services/oss/config.rs index 73259f3e3105..1e44888d2693 100644 --- a/core/src/services/oss/config.rs +++ b/core/src/services/oss/config.rs @@ -36,6 +36,9 @@ pub struct OssConfig { /// Bucket for oss. pub bucket: String, + /// is bucket versioning enabled for this bucket + pub enable_versioning: bool, + // OSS features /// Server side encryption for oss. pub server_side_encryption: Option, diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index 2467159d4c6d..7c2ecca0018b 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -70,6 +70,7 @@ pub struct OssCore { pub endpoint: String, pub presign_endpoint: String, pub allow_anonymous: bool, + pub enable_versioning: bool, pub server_side_encryption: Option, pub server_side_encryption_key_id: Option, @@ -520,6 +521,50 @@ impl OssCore { self.send(req).await } + pub async fn oss_list_object_versions( + &self, + prefix: &str, + delimiter: &str, + limit: Option, + key_marker: &str, + version_id_marker: &str, + ) -> Result> { + let p = build_abs_path(&self.root, prefix); + + let mut url = format!("{}?versions", self.endpoint); + if !p.is_empty() { + write!(url, "&prefix={}", percent_encode_path(p.as_str())) + .expect("write into string must succeed"); + } + if !delimiter.is_empty() { + write!(url, "&delimiter={}", delimiter).expect("write into string must succeed"); + } + + if let Some(limit) = limit { + write!(url, "&max-keys={}", limit).expect("write into string must succeed"); + } + if !key_marker.is_empty() { + write!(url, "&key-marker={}", percent_encode_path(key_marker)) + .expect("write into string must succeed"); + } + if !version_id_marker.is_empty() { + write!( + url, + "&version-id-marker={}", + percent_encode_path(version_id_marker) + ) + .expect("write into string must succeed"); + } + + let mut req = Request::get(&url) + .body(Buffer::new()) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + pub async fn oss_delete_object(&self, path: &str, args: &OpDelete) -> Result> { let mut req = self.oss_delete_object_request(path, args)?; self.sign(&mut req).await?; @@ -768,6 +813,45 @@ pub struct CommonPrefix { pub prefix: String, } +#[derive(Default, Debug, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct OutputCommonPrefix { + pub prefix: String, +} + +/// Output of ListObjectVersions +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +pub struct ListObjectVersionsOutput { + pub is_truncated: Option, + pub next_key_marker: Option, + pub next_version_id_marker: Option, + pub common_prefixes: Vec, + pub version: Vec, + pub delete_marker: Vec, +} + +#[derive(Default, Debug, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ListObjectVersionsOutputVersion { + pub key: String, + pub version_id: String, + pub is_latest: bool, + pub size: u64, + pub last_modified: String, + #[serde(rename = "ETag")] + pub etag: Option, +} + +#[derive(Default, Debug, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ListObjectVersionsOutputDeleteMarker { + pub key: String, + pub version_id: String, + pub is_latest: bool, + pub last_modified: String, +} + #[cfg(test)] mod tests { use bytes::Buf; diff --git a/core/src/services/oss/lister.rs b/core/src/services/oss/lister.rs index e69096e50db2..286147a00e10 100644 --- a/core/src/services/oss/lister.rs +++ b/core/src/services/oss/lister.rs @@ -22,9 +22,12 @@ use quick_xml::de; use super::core::*; use super::error::parse_error; +use crate::raw::oio::PageContext; use crate::raw::*; use crate::*; +pub type OssListers = TwoWays, oio::PageLister>; + pub struct OssLister { core: Arc, @@ -115,3 +118,139 @@ impl oio::PageList for OssLister { Ok(()) } } + +/// refer: https://help.aliyun.com/zh/oss/developer-reference/listobjectversions?spm=a2c4g.11186623.help-menu-31815.d_3_1_1_5_5_2.53f67237GJlMPw&scm=20140722.H_112467._.OR_help-T_cn~zh-V_1 +pub struct OssObjectVersionsLister { + core: Arc, + + prefix: String, + args: OpList, + + delimiter: &'static str, + abs_start_after: Option, +} + +impl OssObjectVersionsLister { + pub fn new(core: Arc, path: &str, args: OpList) -> Self { + let delimiter = if args.recursive() { "" } else { "/" }; + let abs_start_after = args + .start_after() + .map(|start_after| build_abs_path(&core.root, start_after)); + + Self { + core, + prefix: path.to_string(), + args, + delimiter, + abs_start_after, + } + } +} + +impl oio::PageList for OssObjectVersionsLister { + async fn next_page(&self, ctx: &mut PageContext) -> Result<()> { + let markers = ctx.token.rsplit_once(" "); + let (key_marker, version_id_marker) = if let Some(data) = markers { + data + } else if let Some(start_after) = &self.abs_start_after { + (start_after.as_str(), "") + } else { + ("", "") + }; + + let resp = self + .core + .oss_list_object_versions( + &self.prefix, + self.delimiter, + self.args.limit(), + key_marker, + version_id_marker, + ) + .await?; + if resp.status() != http::StatusCode::OK { + return Err(parse_error(resp)); + } + + let body = resp.into_body(); + let output: ListObjectVersionsOutput = de::from_reader(body.reader()) + .map_err(new_xml_deserialize_error) + // Allow Cos list to retry on XML deserialization errors. + // + // This is because the Cos list API may return incomplete XML data under high load. + // We are confident that our XML decoding logic is correct. When this error occurs, + // we allow retries to obtain the correct data. + .map_err(Error::set_temporary)?; + + ctx.done = if let Some(is_truncated) = output.is_truncated { + !is_truncated + } else { + false + }; + ctx.token = format!( + "{} {}", + output.next_key_marker.unwrap_or_default(), + output.next_version_id_marker.unwrap_or_default() + ); + + for prefix in output.common_prefixes { + let de = oio::Entry::new( + &build_rel_path(&self.core.root, &prefix.prefix), + Metadata::new(EntryMode::DIR), + ); + ctx.entries.push_back(de); + } + + for version_object in output.version { + // `list` must be additive, so we need to include the latest version object + // even if `versions` is not enabled. + // + // Here we skip all non-latest version objects if `versions` is not enabled. + if !(self.args.versions() || version_object.is_latest) { + continue; + } + + let mut path = build_rel_path(&self.core.root, &version_object.key); + if path.is_empty() { + path = "/".to_owned(); + } + + let mut meta = Metadata::new(EntryMode::from_path(&path)); + meta.set_version(&version_object.version_id); + meta.set_is_current(version_object.is_latest); + meta.set_content_length(version_object.size); + meta.set_last_modified(parse_datetime_from_rfc3339( + version_object.last_modified.as_str(), + )?); + if let Some(etag) = version_object.etag { + meta.set_etag(&etag); + meta.set_content_md5(etag.trim_matches('"')); + } + + let entry = oio::Entry::new(&path, meta); + ctx.entries.push_back(entry); + } + + if self.args.deleted() { + for delete_marker in output.delete_marker { + let mut path = build_rel_path(&self.core.root, &delete_marker.key); + if path.is_empty() { + path = "/".to_owned(); + } + + let mut meta = Metadata::new(EntryMode::FILE); + meta.set_version(&delete_marker.version_id); + meta.set_is_deleted(true); + meta.set_is_current(delete_marker.is_latest); + meta.set_last_modified(parse_datetime_from_rfc3339( + delete_marker.last_modified.as_str(), + )?); + + let entry = oio::Entry::new(&path, meta); + ctx.entries.push_back(entry); + } + } + + Ok(()) + } +}