Skip to content

Commit

Permalink
Merge pull request #5655 from Xuanwo/decompress
Browse files Browse the repository at this point in the history
feat: Add decompress support for COPY INTO and streaming loading
  • Loading branch information
BohuTANG authored May 30, 2022
2 parents c40c0c6 + 3b141c9 commit b90b74e
Show file tree
Hide file tree
Showing 28 changed files with 383 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .github/actions/check/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ runs:

- name: Clippy
shell: bash
run: cargo clippy --all -- -D warnings
run: cargo clippy --workspace --all-targets -- -D warnings

- name: Audit dependencies
shell: bash
Expand Down
88 changes: 79 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/contexts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ test = false
common-base = { path = "../base" }

async-trait = "0.1.53"
opendal = { version = "0.6.3", features = ["retry"] }
opendal = { version = "0.7.1", features = ["retry"] }
time = "0.3.9"
3 changes: 3 additions & 0 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ build_exceptions! {
NetworkRequestError(1073),

UnknownFormat(1074),
UnknownCompressionType(1075),
InvalidCompressionData(1076),

// Tenant error codes.
TenantIsEmpty(1101),
Expand All @@ -150,6 +152,7 @@ build_exceptions! {
LayoutError(1103),

PanicError(1104),

}

// Metasvr errors [2001, 3000].
Expand Down
1 change: 1 addition & 0 deletions common/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dyn-clone = "1.0.5"
hex = "0.4.3"
itertools = "0.10.3"
md5 = "0.7.0"
memchr = "2.5.0"
naive-cityhash = "0.2.0"
num = "0.4.0"
num-format = "0.4.0"
Expand Down
2 changes: 1 addition & 1 deletion common/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ chrono = "0.4.19"
chrono-tz = "0.6.1"
futures = "0.3.21"
lexical-core = "0.8.2"
opendal = { version = "0.6.3", features = ["retry"] }
opendal = { version = "0.7.1", features = ["retry"] }
serde = { version = "1.0.136", features = ["derive"] }
time = "0.3.9"

Expand Down
9 changes: 4 additions & 5 deletions common/io/src/format_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Default for FormatSettings {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)]
pub enum Compression {
None,
Auto,
Expand All @@ -67,7 +67,7 @@ impl FromStr for Compression {
type Err = ErrorCode;

fn from_str(s: &str) -> Result<Self> {
match s.to_ascii_lowercase().as_str() {
match s.to_lowercase().as_str() {
"auto" => Ok(Compression::Auto),
"gzip" => Ok(Compression::Gzip),
"bz2" => Ok(Compression::Bz2),
Expand All @@ -78,9 +78,8 @@ impl FromStr for Compression {
"lzo" => Ok(Compression::Lzo),
"snappy" => Ok(Compression::Snappy),
"none" => Ok(Compression::None),
_ => Err(ErrorCode::IllegalUserSettingFormat(format!(
"Unknown compression: {}",
s
_ => Err(ErrorCode::UnknownCompressionType(format!(
"Unknown compression: {s}"
))),
}
}
Expand Down
20 changes: 20 additions & 0 deletions common/meta/types/src/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,26 @@ impl Default for StageFileCompression {
}
}

impl FromStr for StageFileCompression {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, String> {
match s.to_lowercase().as_str() {
"auto" => Ok(StageFileCompression::Auto),
"gzip" => Ok(StageFileCompression::Gzip),
"bz2" => Ok(StageFileCompression::Bz2),
"brotli" => Ok(StageFileCompression::Brotli),
"zstd" => Ok(StageFileCompression::Zstd),
"deflate" => Ok(StageFileCompression::Deflate),
"rawdeflate" => Ok(StageFileCompression::RawDeflate),
"lzo" => Ok(StageFileCompression::Lzo),
"snappy" => Ok(StageFileCompression::Snappy),
"none" => Ok(StageFileCompression::None),
_ => Err("Unknown file compression type, must one of { auto | gzip | bz2 | brotli | zstd | deflate | rawdeflate | lzo | snappy | none }"
.to_string()),
}
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub enum StageFileFormatType {
Csv,
Expand Down
4 changes: 1 addition & 3 deletions common/streams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ async-trait = "0.1.53"
chrono-tz = "0.6.1"
csv-async = "1.2.4"
futures = "0.3.21"
opendal = { version = "0.7.1", features = ["retry", "compress"] }
pin-project-lite = "0.2.8"
serde_json = { version = "1.0.79", default-features = false, features = ["preserve_order"] }
tempfile = "3.3.0"

[dev-dependencies]
opendal = { version = "0.6.3", features = ["retry"] }
2 changes: 1 addition & 1 deletion query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ num = "0.4.0"
num_cpus = "1.13.1"
octocrab = "0.15.4"
once_cell = "1.10.0"
opendal = { version = "0.6.3", features = ["retry"] }
opendal = { version = "0.7.1", features = ["retry", "compress"] }
openssl = { version = "0.10", features = ["vendored"] }
paste = "1.0.7"
petgraph = "0.6.0"
Expand Down
Loading

0 comments on commit b90b74e

Please sign in to comment.