From a803efc4f046f4bacc1a51c9f3baf5e46bbee68c Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Fri, 10 Mar 2023 14:55:00 +0800 Subject: [PATCH] refactor: remove custom oss impl (#720) * refactor: remove custom oss impl * fix endpoint * add region issue --- Cargo.lock | 21 +- analytic_engine/src/setup.rs | 21 +- components/object_store/Cargo.toml | 2 +- components/object_store/src/aliyun.rs | 300 ++++++-------------------- 4 files changed, 90 insertions(+), 254 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 832f528f8d..add93025d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1757,7 +1757,7 @@ dependencies = [ "lazy_static", "log", "num_cpus", - "object_store 0.5.3", + "object_store 0.5.5", "parking_lot 0.12.1", "parquet", "paste 1.0.8", @@ -1784,7 +1784,7 @@ dependencies = [ "arrow 32.0.0", "chrono", "num_cpus", - "object_store 0.5.3", + "object_store 0.5.5", "parquet", "sqlparser", ] @@ -1861,7 +1861,7 @@ dependencies = [ "datafusion", "datafusion-common", "datafusion-expr", - "object_store 0.5.3", + "object_store 0.5.5", "parking_lot 0.12.1", "pbjson-build", "prost", @@ -3737,17 +3737,24 @@ dependencies = [ [[package]] name = "object_store" -version = "0.5.3" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4201837dc4c27a8670f0363b1255cd3845a4f0c521211cced1ed14c1d0cc6d2" +checksum = "e1ea8f683b4f89a64181393742c041520a1a87e9775e6b4c0dd5a3281af05fc6" dependencies = [ "async-trait", + "base64 0.21.0", "bytes 1.2.1", "chrono", "futures 0.3.25", "itertools", "parking_lot 0.12.1", "percent-encoding", + "quick-xml 0.27.1", + "rand 0.8.5", + "reqwest", + "ring", + "serde", + "serde_json", "snafu 0.7.1", "tokio", "tracing", @@ -3770,7 +3777,7 @@ dependencies = [ "lazy_static", "log", "lru", - "object_store 0.5.3", + "object_store 0.5.5", "oss-rust-sdk", "prometheus 0.12.0", "prometheus-static-metric", @@ -4565,6 +4572,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffc053f057dd768a56f62cd7e434c42c831d296968997e9ac1f76ea7c2d14c41" dependencies = [ "memchr", + "serde", ] [[package]] @@ -4931,6 +4939,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-rustls", + "tokio-util", "tower-service", "url", "wasm-bindgen", diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs index 3d8636a39e..f232e27b4d 100644 --- a/analytic_engine/src/setup.rs +++ b/analytic_engine/src/setup.rs @@ -9,7 +9,7 @@ use common_util::define_result; use futures::Future; use message_queue::kafka::kafka_impl::KafkaImpl; use object_store::{ - aliyun::AliyunOSS, + aliyun, disk_cache::DiskCacheStore, mem_cache::{MemCache, MemCacheStore}, metrics::StoreWithMetrics, @@ -443,14 +443,17 @@ fn open_storage( Arc::new(store) as _ } ObjectStoreOptions::Aliyun(aliyun_opts) => { - let oss = Arc::new(AliyunOSS::new( - aliyun_opts.key_id, - aliyun_opts.key_secret, - aliyun_opts.endpoint, - aliyun_opts.bucket, - aliyun_opts.pool_max_idle_per_host, - aliyun_opts.timeout, - )); + let oss: ObjectStoreRef = Arc::new( + aliyun::try_new( + aliyun_opts.key_id, + aliyun_opts.key_secret, + aliyun_opts.endpoint, + aliyun_opts.bucket, + aliyun_opts.pool_max_idle_per_host, + aliyun_opts.timeout.0, + ) + .context(OpenObjectStore)?, + ); let oss_with_metrics = Arc::new(StoreWithMetrics::new(oss)); Arc::new( StoreWithPrefix::new(aliyun_opts.prefix, oss_with_metrics) diff --git a/components/object_store/Cargo.toml b/components/object_store/Cargo.toml index da19c2d9d0..0309bc9181 100644 --- a/components/object_store/Cargo.toml +++ b/components/object_store/Cargo.toml @@ -30,7 +30,7 @@ serde = { workspace = true } serde_json = { workspace = true } snafu = { workspace = true } tokio = { workspace = true } -upstream = { package = "object_store", version = "0.5.3" } +upstream = { package = "object_store", version = "0.5.5", features = [ "aws" ] } [dev-dependencies] tempfile = { workspace = true } diff --git a/components/object_store/src/aliyun.rs b/components/object_store/src/aliyun.rs index 49c6ca3421..ae3dbcc86b 100644 --- a/components/object_store/src/aliyun.rs +++ b/components/object_store/src/aliyun.rs @@ -1,221 +1,58 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::{collections::HashMap, fmt::Display, ops::Range, time::Duration}; +use std::time::Duration; -use async_trait::async_trait; -use bytes::Bytes; -use futures::{ - stream::{self, BoxStream}, - StreamExt, -}; -use oss_rust_sdk::{ - async_object::AsyncObjectAPI, errors::Error as AliyunError, oss::Options, prelude::OSS, -}; -use snafu::{ResultExt, Snafu}; -use tokio::io::AsyncWrite; use upstream::{ - path::Path, Error as OssError, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, - Result, + aws::{AmazonS3, AmazonS3Builder}, + ClientOptions, RetryConfig, }; -#[derive(Debug, Snafu)] -enum Error { - #[snafu(display("Failed to put object at path:{}, err:{}", path, source))] - PutObject { path: String, source: AliyunError }, - - #[snafu(display("Failed to get object at path:{}, err:{}", path, source))] - GetObject { path: String, source: AliyunError }, - - #[snafu(display("Failed to get range of object at path:{}, err:{}", path, source))] - GetRangeObject { - path: String, - range: Range, - source: AliyunError, - }, - - #[snafu(display("Failed to head object at path:{}, err:{}", path, source))] - HeadObject { path: String, source: AliyunError }, - - #[snafu(display("Failed to delete object at path:{}, err:{}", path, source))] - DeleteObject { path: String, source: AliyunError }, - - #[snafu(display("Operation {} is not implemented", op))] - Unimplemented { op: String }, -} - -impl From for OssError { - fn from(source: Error) -> Self { - Self::Generic { - store: "Aliyun", - source: Box::new(source), - } - } -} - -#[derive(Debug)] -pub struct AliyunOSS { - oss: OSS<'static>, -} - -impl AliyunOSS { - const RANGE_KEY: &str = "Range"; - - pub fn new( - key_id: impl Into, - key_secret: impl Into, - endpoint: impl Into, - bucket: impl Into, - pool_max_idle_per_host: impl Into, - timeout: impl Into, - ) -> Self { - let oss = OSS::new_with_opts( - key_id.into(), - key_secret.into(), - endpoint.into(), - bucket.into(), - Options { - pool_max_idle_per_host: Some(pool_max_idle_per_host.into()), - timeout: Some(timeout.into()), - }, - ); - Self { oss } - } - - fn make_range_header(range: &Range, headers: &mut HashMap) { - assert!(!range.is_empty()); - let range_value = format!("bytes={}-{}", range.start, range.end - 1); - - headers.insert(Self::RANGE_KEY.to_string(), range_value); - } -} - -#[async_trait] -impl ObjectStore for AliyunOSS { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - self.oss - .put_object( - &bytes, - &location.to_string(), - None::>, - None, - ) - .await - .with_context(|| PutObject { - path: location.to_string(), - })?; - - Ok(()) - } - - async fn put_multipart( - &self, - _location: &Path, - ) -> Result<(MultipartId, Box)> { - Err(Error::Unimplemented { - op: "put_multipart".to_string(), - } - .into()) - } - - async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> { - Err(Error::Unimplemented { - op: "abort_multipart".to_string(), - } - .into()) - } - - async fn get(&self, location: &Path) -> Result { - let bytes = self - .oss - .get_object(&location.to_string(), None::>, None) - .await - .with_context(|| GetObject { - path: location.to_string(), - })?; - - Ok(GetResult::Stream(stream::once(async { Ok(bytes) }).boxed())) - } - - async fn get_range(&self, location: &Path, range: Range) -> Result { - if range.is_empty() { - return Ok(Bytes::new()); - } - - let mut headers = HashMap::new(); - Self::make_range_header(&range, &mut headers); - - let bytes = self - .oss - .get_object(&location.to_string(), Some(headers), None) - .await - .with_context(|| GetRangeObject { - path: location.to_string(), - range, - })?; - - Ok(bytes) - } - - async fn head(&self, location: &Path) -> Result { - let head = self - .oss - .head_object(&location.to_string()) - .await - .with_context(|| HeadObject { - path: location.to_string(), - })?; - - Ok(ObjectMeta { - last_modified: head.last_modified.into(), - size: head.size, - location: location.clone(), - }) - } - - async fn delete(&self, location: &Path) -> Result<()> { - self.oss - .delete_object(&location.to_string()) - .await - .with_context(|| DeleteObject { - path: location.to_string(), - })?; - - Ok(()) - } - - async fn list(&self, _prefix: Option<&Path>) -> Result>> { - Err(Error::Unimplemented { - op: "list".to_string(), - } - .into()) - } - - async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result { - Err(Error::Unimplemented { - op: "list_with_delimiter".to_string(), - } - .into()) - } - - async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> { - Err(Error::Unimplemented { - op: "copy".to_string(), - } - .into()) - } - - async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> { - Err(Error::Unimplemented { - op: "copy_if_not_exists".to_string(), - } - .into()) +fn normalize_endpoint(endpoint: &str, bucket: &str) -> String { + if endpoint.starts_with("https") { + format!( + "https://{}.{}", + bucket, + endpoint.replacen("https://", "", 1) + ) + } else { + format!("http://{}.{}", bucket, endpoint.replacen("http://", "", 1)) } } -impl Display for AliyunOSS { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "AliyunOSS({})", self.oss.bucket()) - } +pub fn try_new( + key_id: impl Into, + key_secret: impl Into, + endpoint: impl Into, + bucket: impl Into, + pool_max_idle_per_host: impl Into, + timeout: Duration, +) -> upstream::Result { + let cli_opt = ClientOptions::new() + .with_allow_http(true) + .with_pool_max_idle_per_host(pool_max_idle_per_host.into()) + .with_timeout(timeout); + let retry_config = RetryConfig { + // TODO: add to config + max_retries: 3, + ..Default::default() + }; + + let endpoint = endpoint.into(); + let bucket = bucket.into(); + let endpoint = normalize_endpoint(&endpoint, &bucket); + AmazonS3Builder::new() + .with_virtual_hosted_style_request(true) + // region is not used when virtual_hosted_style is true, + // but is required, so dummy is used here + // https://github.com/apache/arrow-rs/issues/3827 + .with_region("dummy") + .with_access_key_id(key_id) + .with_secret_access_key(key_secret) + .with_endpoint(endpoint) + .with_bucket_name(bucket) + .with_client_options(cli_opt) + .with_retry(retry_config) + .build() } #[cfg(test)] @@ -223,37 +60,24 @@ mod tests { use super::*; #[test] - fn test_range_header() { - let testcases = vec![ - ((0..500), "bytes=0-499"), - ((1000..1001), "bytes=1000-1000"), - ((200..1000), "bytes=200-999"), + fn test_normalize_endpoint() { + let testcase = [ + ( + "https://oss.aliyun.com", + "test", + "https://test.oss.aliyun.com", + ), + ( + "http://oss.aliyun.com", + "test", + "http://test.oss.aliyun.com", + ), + ("no-scheme.com", "test", "http://test.no-scheme.com"), ]; - for (input_range, expect_value) in testcases { - let mut headers = HashMap::new(); - AliyunOSS::make_range_header(&input_range, &mut headers); - - assert_eq!(headers.len(), 1); - let range_value = headers - .get(AliyunOSS::RANGE_KEY) - .expect("should have range key"); - assert_eq!(range_value, expect_value); + for (endpoint, bucket, expected) in testcase { + let actual = normalize_endpoint(endpoint, bucket); + assert_eq!(expected, actual); } } - - #[test] - #[should_panic] - fn test_panic_invalid_range_header() { - let mut headers = HashMap::new(); - #[allow(clippy::reversed_empty_ranges)] - AliyunOSS::make_range_header(&(500..499), &mut headers); - } - - #[test] - #[should_panic] - fn test_panic_empty_range_header() { - let mut headers = HashMap::new(); - AliyunOSS::make_range_header(&(500..500), &mut headers); - } }