Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add decompress support for COPY INTO and streaming loading #5655

Merged
merged 16 commits into from
May 30, 2022
Merged
2 changes: 1 addition & 1 deletion .github/actions/check/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ runs:

- name: Clippy
shell: bash
run: cargo clippy --all -- -D warnings
run: cargo clippy --workspace --all-targets -- -D warnings

- name: Audit dependencies
shell: bash
Expand Down
88 changes: 79 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/contexts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ test = false
common-base = { path = "../base" }

async-trait = "0.1.53"
opendal = { version = "0.6.3", features = ["retry"] }
opendal = { version = "0.7.1", features = ["retry"] }
time = "0.3.9"
1 change: 1 addition & 0 deletions common/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dyn-clone = "1.0.5"
hex = "0.4.3"
itertools = "0.10.3"
md5 = "0.7.0"
memchr = "2.5.0"
naive-cityhash = "0.2.0"
num = "0.4.0"
num-format = "0.4.0"
Expand Down
2 changes: 1 addition & 1 deletion common/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ chrono = "0.4.19"
chrono-tz = "0.6.1"
futures = "0.3.21"
lexical-core = "0.8.2"
opendal = { version = "0.6.3", features = ["retry"] }
opendal = { version = "0.7.1", features = ["retry"] }
serde = { version = "1.0.136", features = ["derive"] }
time = "0.3.9"

Expand Down
4 changes: 2 additions & 2 deletions common/io/src/format_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Default for FormatSettings {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)]
pub enum Compression {
None,
Auto,
Expand All @@ -67,7 +67,7 @@ impl FromStr for Compression {
type Err = ErrorCode;

fn from_str(s: &str) -> Result<Self> {
match s.to_ascii_lowercase().as_str() {
match s.to_lowercase().as_str() {
"auto" => Ok(Compression::Auto),
"gzip" => Ok(Compression::Gzip),
"bz2" => Ok(Compression::Bz2),
Expand Down
20 changes: 20 additions & 0 deletions common/meta/types/src/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,26 @@ impl Default for StageFileCompression {
}
}

impl FromStr for StageFileCompression {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, String> {
match s.to_lowercase().as_str() {
"auto" => Ok(StageFileCompression::Auto),
"gzip" => Ok(StageFileCompression::Gzip),
"bz2" => Ok(StageFileCompression::Bz2),
"brotli" => Ok(StageFileCompression::Brotli),
"zstd" => Ok(StageFileCompression::Zstd),
"deflate" => Ok(StageFileCompression::Deflate),
"rawdeflate" => Ok(StageFileCompression::RawDeflate),
"lzo" => Ok(StageFileCompression::Lzo),
"snappy" => Ok(StageFileCompression::Snappy),
"none" => Ok(StageFileCompression::None),
_ => Err("Unknown file compression type, must one of { auto | gzip | bz2 | brotli | zstd | deflate | rawdeflate | lzo | snappy | none }"
.to_string()),
}
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub enum StageFileFormatType {
Csv,
Expand Down
4 changes: 1 addition & 3 deletions common/streams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,4 @@ futures = "0.3.21"
pin-project-lite = "0.2.8"
serde_json = { version = "1.0.79", default-features = false, features = ["preserve_order"] }
tempfile = "3.3.0"

[dev-dependencies]
opendal = { version = "0.6.3", features = ["retry"] }
opendal = { version = "0.7.1", features = ["retry", "compress"] }
2 changes: 1 addition & 1 deletion query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ num = "0.4.0"
num_cpus = "1.13.1"
octocrab = "0.15.4"
once_cell = "1.10.0"
opendal = { version = "0.6.3", features = ["retry"] }
opendal = { version = "0.7.1", features = ["retry", "compress"] }
openssl = { version = "0.10", features = ["vendored"] }
paste = "1.0.7"
petgraph = "0.6.0"
Expand Down
64 changes: 64 additions & 0 deletions query/src/servers/http/v1/multipart_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::Compression;
use common_io::prelude::FormatSettings;
use opendal::io_util::CompressAlgorithm;
use opendal::io_util::DecompressDecoder;
use opendal::io_util::DecompressState;
use poem::web::Multipart;

use crate::formats::FormatFactory;
Expand Down Expand Up @@ -143,6 +147,20 @@ impl MultipartFormat {
settings: FormatSettings,
ports: Vec<Arc<OutputPort>>,
) -> Result<(MultipartWorker, Vec<ProcessorPtr>)> {
let compress_algo = match settings.compression {
Compression::None => None,
Compression::Auto => todo!("we will support auto in the future"),
Compression::Gzip => Some(CompressAlgorithm::Gzip),
Compression::Bz2 => Some(CompressAlgorithm::Bz2),
Compression::Brotli => Some(CompressAlgorithm::Brotli),
Compression::Zstd => Some(CompressAlgorithm::Zstd),
Compression::Deflate => Some(CompressAlgorithm::Deflate),
Compression::RawDeflate => todo!("we will support raw deflate in the future"),
Compression::Lzo => todo!("we will support lzo in the future"),
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
Compression::Snappy => todo!("we will support snappy in the future"),
};
let input_decompress = compress_algo.map(DecompressDecoder::new);

let input_format = FormatFactory::instance().get_input(name, schema, settings)?;

if ports.len() != 1 || input_format.support_parallel() {
Expand All @@ -161,6 +179,7 @@ impl MultipartFormat {
vec![SequentialInputFormatSource::create(
ports[0].clone(),
input_format,
input_decompress,
rx,
ctx.get_scan_progress(),
)?],
Expand All @@ -183,22 +202,26 @@ pub struct SequentialInputFormatSource {
scan_progress: Arc<Progress>,
input_state: Box<dyn InputState>,
input_format: Box<dyn InputFormat>,
input_decompress: Option<DecompressDecoder>,
data_receiver: Receiver<Result<Vec<u8>>>,
}

impl SequentialInputFormatSource {
pub fn create(
output: Arc<OutputPort>,
input_format: Box<dyn InputFormat>,
input_decompress: Option<DecompressDecoder>,
data_receiver: Receiver<Result<Vec<u8>>>,
scan_progress: Arc<Progress>,
) -> Result<ProcessorPtr> {
let input_state = input_format.create_state();

Ok(ProcessorPtr::create(Box::new(
SequentialInputFormatSource {
output,
input_state,
input_format,
input_decompress,
data_receiver,
scan_progress,
finished: false,
Expand Down Expand Up @@ -246,6 +269,47 @@ impl Processor for SequentialInputFormatSource {
let mut progress_values = ProgressValues::default();
match replace(&mut self.state, State::NeedReceiveData) {
State::ReceivedData(data) => {
let data = match &mut self.input_decompress {
zhang2014 marked this conversation as resolved.
Show resolved Hide resolved
None => data,
Some(decompress) => {
let mut output = Vec::new();
let mut amt = 0;

loop {
zhang2014 marked this conversation as resolved.
Show resolved Hide resolved
match decompress.state() {
DecompressState::Reading => {
// If all data has been consumed, we should break with existing data directly.
if amt == data.len() {
break output;
}

let read = decompress.fill(&data[amt..]);
amt += read;
}
DecompressState::Decoding => {
let mut buf = vec![0; 4 * 1024 * 1024];
let written = decompress.decode(&mut buf).map_err(|e| {
ErrorCode::InvalidSourceFormat(format!(
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
"decompress source: {e}"
))
})?;
output.extend_from_slice(&buf[..written])
}
DecompressState::Flushing => {
let mut buf = vec![0; 4 * 1024 * 1024];
let written = decompress.finish(&mut buf).map_err(|e| {
ErrorCode::InvalidSourceFormat(format!(
"decompress source: {e}"
))
})?;
output.extend_from_slice(&buf[..written])
}
DecompressState::Done => break output,
}
}
}
};

let mut data_slice: &[u8] = &data;
progress_values.bytes += data.len();

Expand Down
Loading