From f712cf0910bf2ccedd2a18c1a33938e0ec35ed32 Mon Sep 17 00:00:00 2001
From: hoslo
Date: Wed, 8 Jan 2025 21:28:12 +0800
Subject: [PATCH] feat(core): Implement list with deleted and versions for gcs

---
 core/src/services/gcs/backend.rs |  49 ++++++++++---
 core/src/services/gcs/config.rs  |   2 +
 core/src/services/gcs/core.rs    |  83 ++++++++++++++++++---
 core/src/services/gcs/delete.rs  |   8 +-
 core/src/services/gcs/lister.rs  | 121 +++++++++++++++++++++++++++++++
 5 files changed, 238 insertions(+), 25 deletions(-)

diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index c61ff75ca290..d57425e5fc76 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -24,6 +24,7 @@
 use bytes::Buf;
 use http::Response;
 use http::StatusCode;
 use log::debug;
+use crate::raw::oio::PageLister;
 use reqsign::GoogleCredentialLoader;
 use reqsign::GoogleSigner;
 use reqsign::GoogleTokenLoad;
@@ -34,7 +35,7 @@ use serde_json;
 use super::core::*;
 use super::delete::GcsDeleter;
 use super::error::parse_error;
-use super::lister::GcsLister;
+use super::lister::{GcsLister, GcsListers, GcsObjectVersionsLister};
 use super::writer::GcsWriter;
 use super::writer::GcsWriters;
 use crate::raw::oio::BatchDeleter;
@@ -192,6 +193,13 @@ impl GcsBuilder {
         self
     }
 
+    /// Set bucket versioning status for this backend
+    pub fn enable_versioning(mut self, enabled: bool) -> Self {
+        self.config.enable_versioning = enabled;
+
+        self
+    }
+
     /// Set the predefined acl for GCS.
     ///
     /// Available values are:
@@ -326,6 +334,7 @@ impl Builder for GcsBuilder {
                 predefined_acl: self.config.predefined_acl.clone(),
                 default_storage_class: self.config.default_storage_class.clone(),
                 allow_anonymous: self.config.allow_anonymous,
+                enable_versioning: self.config.enable_versioning,
             }),
         };
 
@@ -342,7 +351,7 @@ pub struct GcsBackend {
 impl Access for GcsBackend {
     type Reader = HttpBody;
     type Writer = GcsWriters;
-    type Lister = oio::PageLister<GcsLister>;
+    type Lister = GcsListers;
     type Deleter = oio::BatchDeleter<GcsDeleter>;
     type BlockingReader = ();
     type BlockingWriter = ();
@@ -362,6 +371,7 @@ impl Access for GcsBackend {
                 stat_has_content_md5: true,
                 stat_has_content_length: true,
                 stat_has_content_type: true,
+                stat_with_version: self.core.enable_versioning,
                 stat_has_last_modified: true,
                 stat_has_user_metadata: true,
 
@@ -369,13 +379,14 @@ impl Access for GcsBackend {
                 read_with_if_match: true,
                 read_with_if_none_match: true,
+                read_with_version: self.core.enable_versioning,
 
                 write: true,
                 write_can_empty: true,
                 write_can_multi: true,
                 write_with_content_type: true,
                 write_with_user_metadata: true,
-                write_with_if_not_exists: true,
+                write_with_if_not_exists: !self.core.enable_versioning,
 
                 // The min multipart size of Gcs is 5 MiB.
                 //
@@ -392,6 +403,7 @@ impl Access for GcsBackend {
                 delete: true,
                 delete_max_size: Some(100),
+                delete_with_version: self.core.enable_versioning,
 
                 copy: true,
 
                 list: true,
@@ -403,6 +415,8 @@ impl Access for GcsBackend {
                 list_has_content_length: true,
                 list_has_content_type: true,
                 list_has_last_modified: true,
+                list_with_versions: self.core.enable_versioning,
+                list_with_deleted: self.core.enable_versioning,
 
                 presign: true,
                 presign_stat: true,
@@ -432,6 +446,7 @@ impl Access for GcsBackend {
 
         m.set_etag(&meta.etag);
         m.set_content_md5(&meta.md5_hash);
+        m.set_version(&meta.generation);
 
         let size = meta
             .size
@@ -485,15 +500,23 @@ impl Access for GcsBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
-        let l = GcsLister::new(
-            self.core.clone(),
-            path,
-            args.recursive(),
-            args.limit(),
-            args.start_after(),
-        );
+        let l = if args.versions() || args.deleted() {
+            TwoWays::Two(PageLister::new(GcsObjectVersionsLister::new(
+                self.core.clone(),
+                path,
+                args,
+            )))
+        } else {
+            TwoWays::One(PageLister::new(GcsLister::new(
+                self.core.clone(),
+                path,
+                args.recursive(),
+                args.limit(),
+                args.start_after(),
+            )))
+        };
 
-        Ok((RpList::default(), oio::PageLister::new(l)))
+        Ok((RpList::default(), l))
     }
 
     async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
@@ -554,6 +577,10 @@ struct GetObjectJsonResponse {
     ///
     /// For example: `"contentType": "image/png",`
     content_type: String,
+    /// Generation of this object.
+    ///
+    /// For example: `"generation": "1660563214863653"`
+    generation: String,
     /// Custom metadata of this object.
     ///
     /// For example: `"metadata" : { "my-key": "my-value" }`
diff --git a/core/src/services/gcs/config.rs b/core/src/services/gcs/config.rs
index 43ff15175942..fb12871dc0ee 100644
--- a/core/src/services/gcs/config.rs
+++ b/core/src/services/gcs/config.rs
@@ -53,6 +53,8 @@ pub struct GcsConfig {
     pub disable_vm_metadata: bool,
     /// Disable loading configuration from the environment.
     pub disable_config_load: bool,
+    /// Enable versioning for the bucket.
+    pub enable_versioning: bool,
    /// A Google Cloud OAuth2 token.
     ///
     /// Takes precedence over `credential` and `credential_path`.
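Not part of the patch: a minimal usage sketch of the new `enable_versioning` switch. The bucket name and root are placeholders, `bucket`/`root` are existing `GcsBuilder` options, and only `enable_versioning` comes from this change, so treat it as an illustration under those assumptions rather than code from the repository.

```rust
use opendal::services::Gcs;
use opendal::Operator;
use opendal::Result;

/// Build an operator that advertises the version-aware capabilities wired up
/// in this patch. Versioning must also be enabled on the GCS bucket itself;
/// this flag only controls which capabilities OpenDAL exposes.
fn build_versioned_operator() -> Result<Operator> {
    let builder = Gcs::default()
        .bucket("example-bucket") // placeholder bucket name
        .root("/data") // placeholder root
        .enable_versioning(true); // option added by this patch

    Ok(Operator::new(builder)?.finish())
}
```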
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 3e06bf03b575..bc1b7b010673 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -51,6 +51,8 @@ pub mod constants {
     pub const X_GOOG_ACL: &str = "x-goog-acl";
     pub const X_GOOG_STORAGE_CLASS: &str = "x-goog-storage-class";
     pub const X_GOOG_META_PREFIX: &str = "x-goog-meta-";
+    pub const X_GOOG_IF_GENERATION_MATCH: &str = "x-goog-if-generation-match";
+    pub const GENERATION: &str = "generation";
 }
 
 pub struct GcsCore {
@@ -69,6 +71,7 @@ pub struct GcsCore {
     pub default_storage_class: Option<String>,
 
     pub allow_anonymous: bool,
+    pub enable_versioning: bool,
 }
 
 impl Debug for GcsCore {
@@ -184,13 +187,25 @@ impl GcsCore {
     ) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
-        let url = format!(
+        let mut url = format!(
             "{}/storage/v1/b/{}/o/{}?alt=media",
             self.endpoint,
             self.bucket,
             percent_encode_path(&p)
         );
 
+        let mut query_args = Vec::new();
+        if let Some(version) = args.version() {
+            query_args.push(format!(
+                "{}={}",
+                constants::GENERATION,
+                percent_decode_path(version)
+            ))
+        }
+        if !query_args.is_empty() {
+            url.push_str(&format!("&{}", query_args.join("&")));
+        }
+
         let mut req = Request::get(&url);
 
         if let Some(if_match) = args.if_match() {
@@ -216,6 +231,10 @@ impl GcsCore {
 
         let mut req = Request::get(&url);
 
+        if let Some(version) = args.version() {
+            req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version);
+        }
+
         if let Some(if_match) = args.if_match() {
             req = req.header(IF_MATCH, if_match);
         }
@@ -363,13 +382,25 @@ impl GcsCore {
     pub fn gcs_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
-        let url = format!(
+        let mut url = format!(
             "{}/storage/v1/b/{}/o/{}",
             self.endpoint,
             self.bucket,
             percent_encode_path(&p)
         );
 
+        let mut query_args = Vec::new();
+        if let Some(version) = args.version() {
+            query_args.push(format!(
+                "{}={}",
+                constants::GENERATION,
+                percent_decode_path(version)
+            ))
+        }
+        if !query_args.is_empty() {
+            url.push_str(&format!("?{}", query_args.join("&")));
+        }
+
         let mut req = Request::get(&url);
 
         if let Some(if_none_match) = args.if_none_match() {
@@ -393,7 +424,19 @@ impl GcsCore {
     ) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
-        let url = format!("{}/{}/{}", self.endpoint, self.bucket, p);
+        let mut url = format!("{}/{}/{}", self.endpoint, self.bucket, p);
+
+        let mut query_args = Vec::new();
+        if let Some(version) = args.version() {
+            query_args.push(format!(
+                "{}={}",
+                constants::GENERATION,
+                percent_decode_path(version)
+            ))
+        }
+        if !query_args.is_empty() {
+            url.push_str(&format!("?{}", query_args.join("&")));
+        }
 
         let mut req = Request::head(&url);
 
@@ -422,35 +465,50 @@ impl GcsCore {
         self.send(req).await
     }
 
-    pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
-        let mut req = self.gcs_delete_object_request(path)?;
+    pub async fn gcs_delete_object(&self, path: &str, args: OpDelete) -> Result<Response<Buffer>> {
+        let mut req = self.gcs_delete_object_request(path, args)?;
 
         self.sign(&mut req).await?;
         self.send(req).await
     }
 
-    pub fn gcs_delete_object_request(&self, path: &str) -> Result<Request<Buffer>> {
+    pub fn gcs_delete_object_request(&self, path: &str, args: OpDelete) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
-        let url = format!(
+        let mut url = format!(
             "{}/storage/v1/b/{}/o/{}",
             self.endpoint,
             self.bucket,
             percent_encode_path(&p)
         );
 
+        let mut query_args = Vec::new();
+        if let Some(version) = args.version() {
+            query_args.push(format!(
+                "{}={}",
+                constants::GENERATION,
+                percent_decode_path(version)
+            ))
+        }
+        if !query_args.is_empty() {
+            url.push_str(&format!("?{}", query_args.join("&")));
+        }
+
         Request::delete(&url)
             .body(Buffer::new())
             .map_err(new_request_build_error)
     }
 
-    pub async fn gcs_delete_objects(&self, paths: Vec<String>) -> Result<Response<Buffer>> {
+    pub async fn gcs_delete_objects(
+        &self,
+        batch: Vec<(String, OpDelete)>,
+    ) -> Result<Response<Buffer>> {
         let uri = format!("{}/batch/storage/v1", self.endpoint);
 
         let mut multipart = Multipart::new();
 
-        for (idx, path) in paths.iter().enumerate() {
-            let req = self.gcs_delete_object_request(path)?;
+        for (idx, (path, args)) in batch.iter().enumerate() {
+            let req = self.gcs_delete_object_request(path, args.clone())?;
 
             multipart = multipart.part(
                 MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()),
@@ -493,6 +551,7 @@ impl GcsCore {
         delimiter: &str,
         limit: Option<usize>,
         start_after: Option<String>,
+        versions: bool,
     ) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
@@ -502,6 +561,9 @@ impl GcsCore {
             self.bucket,
             percent_encode_path(&p)
         );
+        if versions {
+            write!(url, "&versions=true").expect("write into string must succeed");
+        }
         if !delimiter.is_empty() {
             write!(url, "&delimiter={delimiter}").expect("write into string must succeed");
         }
@@ -681,6 +743,7 @@ pub struct ListResponseItem {
     pub md5_hash: String,
     pub updated: String,
     pub content_type: String,
+    pub time_deleted: Option<String>,
 }
 
 /// Result of CreateMultipartUpload
diff --git a/core/src/services/gcs/delete.rs b/core/src/services/gcs/delete.rs
index 241b6152edc3..03968fd36228 100644
--- a/core/src/services/gcs/delete.rs
+++ b/core/src/services/gcs/delete.rs
@@ -34,8 +34,8 @@ impl GcsDeleter {
 }
 
 impl oio::BatchDelete for GcsDeleter {
-    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
-        let resp = self.core.gcs_delete_object(&path).await?;
+    async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
+        let resp = self.core.gcs_delete_object(&path, args).await?;
 
         // deleting not existing objects is ok
         if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
@@ -46,8 +46,8 @@ impl oio::BatchDelete for GcsDeleter {
     }
 
     async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
-        let paths: Vec<String> = batch.into_iter().map(|(p, _)| p).collect();
-        let resp = self.core.gcs_delete_objects(paths.clone()).await?;
+        let paths: Vec<String> = batch.clone().into_iter().map(|(p, _)| p).collect();
+        let resp = self.core.gcs_delete_objects(batch).await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/gcs/lister.rs b/core/src/services/gcs/lister.rs
index cd66e964f77b..3abc969919ee 100644
--- a/core/src/services/gcs/lister.rs
+++ b/core/src/services/gcs/lister.rs
@@ -25,6 +25,8 @@ use super::error::parse_error;
 use crate::raw::*;
 use crate::*;
 
+pub type GcsListers = TwoWays<oio::PageLister<GcsLister>, oio::PageLister<GcsObjectVersionsLister>>;
+
 /// GcsLister takes over task of listing objects and
 /// helps walking directory
 pub struct GcsLister {
@@ -74,6 +76,7 @@ impl oio::PageList for GcsLister {
                 } else {
                     None
                 },
+                false,
             )
             .await?;
 
@@ -134,3 +137,121 @@ impl oio::PageList for GcsLister {
         Ok(())
     }
 }
+
+pub struct GcsObjectVersionsLister {
+    core: Arc<GcsCore>,
+
+    path: String,
+    deleted: bool,
+    delimiter: &'static str,
+    limit: Option<usize>,
+
+    /// Filter results to objects whose names are lexicographically
+    /// **equal to or after** startOffset
+    start_after: Option<String>,
+}
+
+impl GcsObjectVersionsLister {
+    /// Generate a new directory walker
+    pub fn new(core: Arc<GcsCore>, path: &str, args: OpList) -> Self {
+        let delimiter = if args.recursive() { "" } else { "/" };
+        Self {
+            core,
+
+            path: path.to_string(),
+            deleted: args.deleted(),
+            delimiter,
+            limit: args.limit(),
+            start_after: args.start_after().map(String::from),
+        }
+    }
+}
+
+impl oio::PageList for GcsObjectVersionsLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let resp = self
+            .core
+            .gcs_list_objects(
+                &self.path,
+                &ctx.token,
+                self.delimiter,
+                self.limit,
+                if ctx.token.is_empty() {
+                    self.start_after.clone()
+                } else {
+                    None
+                },
+                true,
+            )
+            .await?;
+
+        if !resp.status().is_success() {
+            return Err(parse_error(resp));
+        }
+        let bytes = resp.into_body();
+
+        let output: ListResponse =
+            serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
+
+        if let Some(token) = &output.next_page_token {
+            ctx.token.clone_from(token);
+        } else {
+            ctx.done = true;
+        }
+
+        for prefix in output.prefixes {
+            let de = oio::Entry::new(
+                &build_rel_path(&self.core.root, &prefix),
+                Metadata::new(EntryMode::DIR),
+            );
+
+            ctx.entries.push_back(de);
+        }
+
+        let mut item_map = std::collections::HashMap::new();
+        for object in output.items {
+            // exclude the inclusive start_after itself
+            let mut path = build_rel_path(&self.core.root, &object.name);
+            if path.is_empty() {
+                path = "/".to_string();
+            }
+            if self.start_after.as_ref() == Some(&path) {
+                continue;
+            }
+
+            let mut meta = Metadata::new(EntryMode::from_path(&path));
+
+            // set metadata fields
+            meta.set_content_md5(object.md5_hash.as_str());
+            meta.set_etag(object.etag.as_str());
+
+            let size = object.size.parse().map_err(|e| {
+                Error::new(ErrorKind::Unexpected, "parse u64 from list response").set_source(e)
+            })?;
+            meta.set_content_length(size);
+            if !object.content_type.is_empty() {
+                meta.set_content_type(&object.content_type);
+            }
+
+            meta.set_last_modified(parse_datetime_from_rfc3339(object.updated.as_str())?);
+            if object.time_deleted.is_some() {
+                meta.set_is_deleted(true);
+            } else {
+                meta.set_is_current(true);
+            }
+
+            item_map.insert(path, meta);
+        }
+
+        for (path, meta) in item_map {
+            // `list` must be additive, so we need to include the latest version object
+            //
+            // If `deleted` is true, we include all deleted objects.
+            if (self.deleted && meta.is_deleted()) || meta.is_current() == Some(true) {
+                let de = oio::Entry::with(path, meta);
+                ctx.entries.push_back(de);
+            }
+        }
+
+        Ok(())
+    }
+}
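Not part of the patch: a rough end-to-end sketch of what the new list capabilities enable once `enable_versioning` is set. The `versions`/`deleted` options on `list_with` and the `version`/`is_current`/`is_deleted` metadata accessors are my reading of the public OpenDAL API these capability flags feed into; treat the exact method names as assumptions rather than guarantees.

```rust
use opendal::{EntryMode, Operator, Result};

/// List entries under `dir`, including deleted/noncurrent generations, and
/// print the version information surfaced by the GCS lister. Assumes `op`
/// was built with `enable_versioning(true)` as in the earlier sketch.
async fn print_generations(op: &Operator, dir: &str) -> Result<()> {
    // NOTE: `versions()`/`deleted()` assume the corresponding OpList options
    // exist in your OpenDAL version; adjust the method names if needed.
    let entries = op.list_with(dir).versions(true).deleted(true).await?;
    for entry in entries {
        let meta = entry.metadata();
        if meta.mode() == EntryMode::DIR {
            // directories carry no version metadata
            continue;
        }
        println!(
            "{} version={:?} current={:?} deleted={}",
            entry.path(),
            meta.version(),
            meta.is_current(),
            meta.is_deleted(),
        );
    }
    Ok(())
}
```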