Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): Fix compressed buf not consumed correctly #5727

Merged
merged 13 commits into from
Jun 4, 2022
73 changes: 43 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/contexts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ test = false
common-base = { path = "../base" }

async-trait = "0.1.53"
opendal = { version = "0.7.1", features = ["retry"] }
opendal = { version = "0.7.3", features = ["retry"] }
time = "0.3.9"
2 changes: 1 addition & 1 deletion common/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ chrono = "0.4.19"
chrono-tz = "0.6.1"
futures = "0.3.21"
lexical-core = "0.8.2"
opendal = { version = "0.7.1", features = ["retry"] }
opendal = { version = "0.7.3", features = ["retry"] }
serde = { version = "1.0.136", features = ["derive"] }
time = "0.3.9"

Expand Down
28 changes: 19 additions & 9 deletions common/io/src/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;

use common_exception::ErrorCode;
use common_exception::Result;
use futures::StreamExt;
use opendal::ObjectMode;
use opendal::Operator;

/// Get the files in the path, if the path is not exist, return an empty list.
/// TODO(xuanwo): it's so general that better implement in opendal instead: https://github.com/datafuselabs/opendal/issues/268
pub async fn operator_list_files(op: &Operator, path: &str) -> Result<Vec<String>> {
let mut list: Vec<String> = vec![];
let mode = op.object(path).metadata().await?.mode();
match mode {

let o = op.object(path);

// return an empty list if not exist
let meta = match o.metadata().await {
Ok(meta) => meta,
Err(e) => {
return match e.kind() {
io::ErrorKind::NotFound => Ok(Vec::new()),
_ => Err(e.into()),
}
}
};
match meta.mode() {
ObjectMode::FILE => {
list.push(path.to_string());
list.push(o.path());
}
ObjectMode::DIR => {
let mut objects = op.object(path).list().await?;
while let Some(object) = objects.next().await {
let mut object = object?;
let meta = object.metadata_cached().await?;
if meta.mode() == ObjectMode::FILE {
list.push(meta.path().to_string());
}
let name = object?.name();
list.push(name);
}
}
other => {
Expand Down
25 changes: 0 additions & 25 deletions common/io/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::cmp;
use std::path::PathBuf;

use bincode::Options;
use bytes::BufMut;
Expand Down Expand Up @@ -90,30 +89,6 @@ pub fn deserialize_from_slice<T: serde::de::DeserializeOwned>(slice: &mut &[u8])
Ok(value)
}

#[inline]
pub fn get_abs_path(root: &str, path: &str) -> String {
// Joining an absolute path replaces the existing path, we need to
// normalize it before.
let path = path
.split('/')
.filter(|v| !v.is_empty())
.collect::<Vec<&str>>()
.join("/");

PathBuf::from(root).join(path).to_string_lossy().to_string()
}

// todo(xuanwo): opendal support meta name (https://github.com/datafuselabs/opendal/issues/150)
#[inline]
pub fn get_file_name(path: &str) -> String {
let path = path
.split('/')
.filter(|v| !v.is_empty())
.collect::<Vec<&str>>();

path[path.len() - 1].to_string()
}

pub fn is_control_ascii(c: u8) -> bool {
c <= 31
}
Expand Down
7 changes: 0 additions & 7 deletions common/io/tests/it/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,6 @@ fn convert_test() {
assert_eq!(convert_number_size(10222_f64), "10.22 thousand");
}

#[test]
fn path_test() {
assert_eq!(get_abs_path("ab/c", "d"), "ab/c/d".to_string());
assert_eq!(get_abs_path("/ab/c", "d"), "/ab/c/d".to_string());
assert_eq!(get_abs_path("/ab/c", "/d/e"), "/ab/c/d/e".to_string());
}

#[test]
fn parse_escape() {
let cases = vec![
Expand Down
2 changes: 1 addition & 1 deletion common/streams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async-trait = "0.1.53"
chrono-tz = "0.6.1"
csv-async = "1.2.4"
futures = "0.3.21"
opendal = { version = "0.7.1", features = ["retry", "compress"] }
opendal = { version = "0.7.3", features = ["retry", "compress"] }
pin-project-lite = "0.2.8"
serde_json = { version = "1.0.79", default-features = false, features = ["preserve_order"] }
tempfile = "3.3.0"
2 changes: 1 addition & 1 deletion query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ num = "0.4.0"
num_cpus = "1.13.1"
octocrab = "0.15.4"
once_cell = "1.10.0"
opendal = { version = "0.7.1", features = ["retry", "compress"] }
opendal = { version = "0.7.3", features = ["retry", "compress"] }
openssl = { version = "0.10", features = ["vendored"] }
paste = "1.0.7"
petgraph = "0.6.0"
Expand Down
4 changes: 0 additions & 4 deletions query/src/interpreters/interpreter_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use common_datavalues::Series;
use common_datavalues::SeriesFrom;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::get_file_name;
use common_io::prelude::operator_list_files;
use common_planners::ListPlan;
use common_streams::DataBlockStream;
Expand Down Expand Up @@ -82,9 +81,6 @@ impl Interpreter for ListInterpreter {
let files = self.list_files().await?;
tracing::info!("list file list:{:?}, pattern:{}", &files, self.plan.pattern);

// file path to filename
let files: Vec<String> = files.iter().map(|file| get_file_name(file)).collect();

let block = DataBlock::create(self.plan.schema(), vec![Series::from_data(files)]);
Ok(Box::pin(DataBlockStream::create(
self.plan.schema(),
Expand Down
6 changes: 3 additions & 3 deletions query/src/servers/http/v1/sequential_format_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ impl Processor for SequentialInputFormatSource {
let data = match &mut self.input_decompress {
None => data,
Some(decompress) => {
let mut output = Vec::new();
// Alloc with 10 times of input data at once to avoid too many alloc.
let mut output = Vec::with_capacity(10 * data.len());
let mut buf = vec![0; 1024 * 1024];
let mut amt = 0;

loop {
Expand All @@ -240,7 +242,6 @@ impl Processor for SequentialInputFormatSource {
amt += read;
}
DecompressState::Decoding => {
let mut buf = vec![0; 4 * 1024 * 1024];
let written = decompress.decode(&mut buf).map_err(|e| {
ErrorCode::InvalidCompressionData(format!(
"compression data invalid: {e}"
Expand All @@ -249,7 +250,6 @@ impl Processor for SequentialInputFormatSource {
output.extend_from_slice(&buf[..written])
}
DecompressState::Flushing => {
let mut buf = vec![0; 4 * 1024 * 1024];
let written = decompress.finish(&mut buf).map_err(|e| {
ErrorCode::InvalidCompressionData(format!(
"compression data invalid: {e}"
Expand Down
Loading