Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Implement list with deleted and versions for gcs #5548

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 38 additions & 11 deletions core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use bytes::Buf;
use http::Response;
use http::StatusCode;
use log::debug;
use oio::PageLister;
use reqsign::GoogleCredentialLoader;
use reqsign::GoogleSigner;
use reqsign::GoogleTokenLoad;
Expand All @@ -34,7 +35,7 @@ use serde_json;
use super::core::*;
use super::delete::GcsDeleter;
use super::error::parse_error;
use super::lister::GcsLister;
use super::lister::{GcsLister, GcsListers, GcsObjectVersionsLister};
use super::writer::GcsWriter;
use super::writer::GcsWriters;
use crate::raw::oio::BatchDeleter;
Expand Down Expand Up @@ -192,6 +193,13 @@ impl GcsBuilder {
self
}

/// Declare whether object versioning is enabled on the target bucket.
///
/// The flag is stored on the builder's config and copied onto the core when
/// the backend is built. The backend then uses it to decide which
/// version-aware capabilities (stat/read/delete with version, list with
/// versions or deleted objects) it advertises.
pub fn enable_versioning(mut self, enabled: bool) -> Self {
    self.config.enable_versioning = enabled;
    self
}

/// Set the predefined acl for GCS.
///
/// Available values are:
Expand Down Expand Up @@ -326,6 +334,7 @@ impl Builder for GcsBuilder {
predefined_acl: self.config.predefined_acl.clone(),
default_storage_class: self.config.default_storage_class.clone(),
allow_anonymous: self.config.allow_anonymous,
enable_versioning: self.config.enable_versioning,
}),
};

Expand All @@ -342,7 +351,7 @@ pub struct GcsBackend {
impl Access for GcsBackend {
type Reader = HttpBody;
type Writer = GcsWriters;
type Lister = oio::PageLister<GcsLister>;
type Lister = GcsListers;
type Deleter = oio::BatchDeleter<GcsDeleter>;
type BlockingReader = ();
type BlockingWriter = ();
Expand All @@ -362,20 +371,22 @@ impl Access for GcsBackend {
stat_has_content_md5: true,
stat_has_content_length: true,
stat_has_content_type: true,
stat_with_version: self.core.enable_versioning,
stat_has_last_modified: true,
stat_has_user_metadata: true,

read: true,

read_with_if_match: true,
read_with_if_none_match: true,
read_with_version: self.core.enable_versioning,

write: true,
write_can_empty: true,
write_can_multi: true,
write_with_content_type: true,
write_with_user_metadata: true,
write_with_if_not_exists: true,
write_with_if_not_exists: !self.core.enable_versioning,

// The min multipart size of Gcs is 5 MiB.
//
Expand All @@ -392,6 +403,7 @@ impl Access for GcsBackend {

delete: true,
delete_max_size: Some(100),
delete_with_version: self.core.enable_versioning,
copy: true,

list: true,
Expand All @@ -403,6 +415,8 @@ impl Access for GcsBackend {
list_has_content_length: true,
list_has_content_type: true,
list_has_last_modified: true,
list_with_versions: self.core.enable_versioning,
list_with_deleted: self.core.enable_versioning,

presign: true,
presign_stat: true,
Expand Down Expand Up @@ -432,6 +446,7 @@ impl Access for GcsBackend {

m.set_etag(&meta.etag);
m.set_content_md5(&meta.md5_hash);
m.set_version(&meta.generation);

let size = meta
.size
Expand Down Expand Up @@ -485,15 +500,23 @@ impl Access for GcsBackend {
}

/// List entries under `path`.
///
/// When the caller asks for object versions or deleted objects, the listing
/// is served by `GcsObjectVersionsLister`, which receives the whole `OpList`.
/// Every other request goes through `GcsLister` with the recursive, limit and
/// start-after options. Each lister is wrapped in exactly one `PageLister`
/// and returned through the matching `TwoWays` variant.
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
    let l = if args.versions() || args.deleted() {
        TwoWays::Two(PageLister::new(GcsObjectVersionsLister::new(
            self.core.clone(),
            path,
            args,
        )))
    } else {
        TwoWays::One(PageLister::new(GcsLister::new(
            self.core.clone(),
            path,
            args.recursive(),
            args.limit(),
            args.start_after(),
        )))
    };

    // `l` is already a page lister; it must not be wrapped a second time.
    Ok((RpList::default(), l))
}

async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
Expand Down Expand Up @@ -554,6 +577,10 @@ struct GetObjectJsonResponse {
///
/// For example: `"contentType": "image/png",`
content_type: String,
/// Generation of this object.
///
/// For example: `"generation": "1660563214863653"`
generation: String,
/// Custom metadata of this object.
///
/// For example: `"metadata" : { "my-key": "my-value" }`
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/gcs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct GcsConfig {
pub disable_vm_metadata: bool,
/// Disable loading configuration from the environment.
pub disable_config_load: bool,
/// Enable versioning for the bucket.
pub enable_versioning: bool,
/// A Google Cloud OAuth2 token.
///
/// Takes precedence over `credential` and `credential_path`.
Expand Down
83 changes: 73 additions & 10 deletions core/src/services/gcs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub mod constants {
pub const X_GOOG_ACL: &str = "x-goog-acl";
pub const X_GOOG_STORAGE_CLASS: &str = "x-goog-storage-class";
pub const X_GOOG_META_PREFIX: &str = "x-goog-meta-";
pub const X_GOOG_IF_GENERATION_MATCH: &str = "x-goog-if-generation-match";
pub const GENERATION: &str = "generation";
}

pub struct GcsCore {
Expand All @@ -69,6 +71,7 @@ pub struct GcsCore {
pub default_storage_class: Option<String>,

pub allow_anonymous: bool,
pub enable_versioning: bool,
}

impl Debug for GcsCore {
Expand Down Expand Up @@ -184,13 +187,25 @@ impl GcsCore {
) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!(
let mut url = format!(
"{}/storage/v1/b/{}/o/{}?alt=media",
self.endpoint,
self.bucket,
percent_encode_path(&p)
);

let mut query_args = Vec::new();
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::GENERATION,
percent_decode_path(version)
))
}
if !query_args.is_empty() {
url.push_str(&format!("&{}", query_args.join("&")));
}

let mut req = Request::get(&url);

if let Some(if_match) = args.if_match() {
Expand All @@ -216,6 +231,10 @@ impl GcsCore {

let mut req = Request::get(&url);

if let Some(version) = args.version() {
req = req.header(constants::X_GOOG_IF_GENERATION_MATCH, version);
}

if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}
Expand Down Expand Up @@ -363,13 +382,25 @@ impl GcsCore {
pub fn gcs_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!(
let mut url = format!(
"{}/storage/v1/b/{}/o/{}",
self.endpoint,
self.bucket,
percent_encode_path(&p)
);

let mut query_args = Vec::new();
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::GENERATION,
percent_decode_path(version)
))
}
if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

let mut req = Request::get(&url);

if let Some(if_none_match) = args.if_none_match() {
Expand All @@ -393,7 +424,19 @@ impl GcsCore {
) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!("{}/{}/{}", self.endpoint, self.bucket, p);
let mut url = format!("{}/{}/{}", self.endpoint, self.bucket, p);

let mut query_args = Vec::new();
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::GENERATION,
percent_decode_path(version)
))
}
if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

let mut req = Request::head(&url);

Expand Down Expand Up @@ -422,35 +465,50 @@ impl GcsCore {
self.send(req).await
}

pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
let mut req = self.gcs_delete_object_request(path)?;
pub async fn gcs_delete_object(&self, path: &str, args: OpDelete) -> Result<Response<Buffer>> {
let mut req = self.gcs_delete_object_request(path, args)?;

self.sign(&mut req).await?;
self.send(req).await
}

pub fn gcs_delete_object_request(&self, path: &str) -> Result<Request<Buffer>> {
pub fn gcs_delete_object_request(&self, path: &str, args: OpDelete) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!(
let mut url = format!(
"{}/storage/v1/b/{}/o/{}",
self.endpoint,
self.bucket,
percent_encode_path(&p)
);

let mut query_args = Vec::new();
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::GENERATION,
percent_decode_path(version)
))
}
if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

Request::delete(&url)
.body(Buffer::new())
.map_err(new_request_build_error)
}

pub async fn gcs_delete_objects(&self, paths: Vec<String>) -> Result<Response<Buffer>> {
pub async fn gcs_delete_objects(
&self,
batch: Vec<(String, OpDelete)>,
) -> Result<Response<Buffer>> {
let uri = format!("{}/batch/storage/v1", self.endpoint);

let mut multipart = Multipart::new();

for (idx, path) in paths.iter().enumerate() {
let req = self.gcs_delete_object_request(path)?;
for (idx, (path, args)) in batch.iter().enumerate() {
let req = self.gcs_delete_object_request(path, args.clone())?;

multipart = multipart.part(
MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()),
Expand Down Expand Up @@ -493,6 +551,7 @@ impl GcsCore {
delimiter: &str,
limit: Option<usize>,
start_after: Option<String>,
versions: bool,
) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);

Expand All @@ -502,6 +561,9 @@ impl GcsCore {
self.bucket,
percent_encode_path(&p)
);
if versions {
write!(url, "&versions=true").expect("write into string must succeed");
}
if !delimiter.is_empty() {
write!(url, "&delimiter={delimiter}").expect("write into string must succeed");
}
Expand Down Expand Up @@ -681,6 +743,7 @@ pub struct ListResponseItem {
pub md5_hash: String,
pub updated: String,
pub content_type: String,
pub time_deleted: Option<String>,
}

/// Result of CreateMultipartUpload
Expand Down
8 changes: 4 additions & 4 deletions core/src/services/gcs/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl GcsDeleter {
}

impl oio::BatchDelete for GcsDeleter {
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
let resp = self.core.gcs_delete_object(&path).await?;
async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
let resp = self.core.gcs_delete_object(&path, args).await?;

// deleting not existing objects is ok
if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
Expand All @@ -46,8 +46,8 @@ impl oio::BatchDelete for GcsDeleter {
}

async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
let paths: Vec<String> = batch.into_iter().map(|(p, _)| p).collect();
let resp = self.core.gcs_delete_objects(paths.clone()).await?;
let paths: Vec<String> = batch.clone().into_iter().map(|(p, _)| p).collect();
let resp = self.core.gcs_delete_objects(batch).await?;

let status = resp.status();

Expand Down
Loading
Loading