Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add decompress support for COPY INTO and streaming loading #5655

Merged
merged 16 commits into from
May 30, 2022
Merged
2 changes: 1 addition & 1 deletion .github/actions/check/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ runs:

- name: Clippy
shell: bash
run: cargo clippy --all -- -D warnings
run: cargo clippy --workspace --all-targets -- -D warnings

- name: Audit dependencies
shell: bash
Expand Down
88 changes: 79 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/contexts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ test = false
common-base = { path = "../base" }

async-trait = "0.1.53"
opendal = { version = "0.6.3", features = ["retry"] }
opendal = { version = "0.7.1", features = ["retry"] }
time = "0.3.9"
3 changes: 3 additions & 0 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ build_exceptions! {
NetworkRequestError(1073),

UnknownFormat(1074),
UnknownCompressionType(1075),
InvalidCompressionData(1076),

// Tenant error codes.
TenantIsEmpty(1101),
Expand All @@ -150,6 +152,7 @@ build_exceptions! {
LayoutError(1103),

PanicError(1104),

}

// Metasvr errors [2001, 3000].
Expand Down
1 change: 1 addition & 0 deletions common/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dyn-clone = "1.0.5"
hex = "0.4.3"
itertools = "0.10.3"
md5 = "0.7.0"
memchr = "2.5.0"
naive-cityhash = "0.2.0"
num = "0.4.0"
num-format = "0.4.0"
Expand Down
2 changes: 1 addition & 1 deletion common/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ chrono = "0.4.19"
chrono-tz = "0.6.1"
futures = "0.3.21"
lexical-core = "0.8.2"
opendal = { version = "0.6.3", features = ["retry"] }
opendal = { version = "0.7.1", features = ["retry"] }
serde = { version = "1.0.136", features = ["derive"] }
time = "0.3.9"

Expand Down
9 changes: 4 additions & 5 deletions common/io/src/format_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Default for FormatSettings {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)]
pub enum Compression {
None,
Auto,
Expand All @@ -67,7 +67,7 @@ impl FromStr for Compression {
type Err = ErrorCode;

fn from_str(s: &str) -> Result<Self> {
match s.to_ascii_lowercase().as_str() {
match s.to_lowercase().as_str() {
"auto" => Ok(Compression::Auto),
"gzip" => Ok(Compression::Gzip),
"bz2" => Ok(Compression::Bz2),
Expand All @@ -78,9 +78,8 @@ impl FromStr for Compression {
"lzo" => Ok(Compression::Lzo),
"snappy" => Ok(Compression::Snappy),
"none" => Ok(Compression::None),
_ => Err(ErrorCode::IllegalUserSettingFormat(format!(
"Unknown compression: {}",
s
_ => Err(ErrorCode::UnknownCompressionType(format!(
"Unknown compression: {s}"
))),
}
}
Expand Down
20 changes: 20 additions & 0 deletions common/meta/types/src/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,26 @@ impl Default for StageFileCompression {
}
}

impl FromStr for StageFileCompression {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, String> {
match s.to_lowercase().as_str() {
"auto" => Ok(StageFileCompression::Auto),
"gzip" => Ok(StageFileCompression::Gzip),
"bz2" => Ok(StageFileCompression::Bz2),
"brotli" => Ok(StageFileCompression::Brotli),
"zstd" => Ok(StageFileCompression::Zstd),
"deflate" => Ok(StageFileCompression::Deflate),
"rawdeflate" => Ok(StageFileCompression::RawDeflate),
"lzo" => Ok(StageFileCompression::Lzo),
"snappy" => Ok(StageFileCompression::Snappy),
"none" => Ok(StageFileCompression::None),
_ => Err("Unknown file compression type, must one of { auto | gzip | bz2 | brotli | zstd | deflate | rawdeflate | lzo | snappy | none }"
.to_string()),
}
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub enum StageFileFormatType {
Csv,
Expand Down
4 changes: 1 addition & 3 deletions common/streams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ async-trait = "0.1.53"
chrono-tz = "0.6.1"
csv-async = "1.2.4"
futures = "0.3.21"
opendal = { version = "0.7.1", features = ["retry", "compress"] }
pin-project-lite = "0.2.8"
serde_json = { version = "1.0.79", default-features = false, features = ["preserve_order"] }
tempfile = "3.3.0"

[dev-dependencies]
opendal = { version = "0.6.3", features = ["retry"] }
2 changes: 1 addition & 1 deletion query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ num = "0.4.0"
num_cpus = "1.13.1"
octocrab = "0.15.4"
once_cell = "1.10.0"
opendal = { version = "0.6.3", features = ["retry"] }
opendal = { version = "0.7.1", features = ["retry", "compress"] }
openssl = { version = "0.10", features = ["vendored"] }
paste = "1.0.7"
petgraph = "0.6.0"
Expand Down
Loading