Skip to content

Commit

Permalink
Merge pull request #5992 from Xuanwo/dal-list-behavior
Browse files Browse the repository at this point in the history
refactor: List stage files recursively
  • Loading branch information
BohuTANG authored Jun 15, 2022
2 parents d7fc0df + 0416b12 commit 2b67b08
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 37 deletions.
5 changes: 5 additions & 0 deletions common/io/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,10 @@ pub async fn init_s3_operator(cfg: &StorageS3Config) -> Result<Operator> {
builder.disable_credential_loader();
}

// Enable virtual host style
if cfg.enable_virtual_host_style {
builder.enable_virtual_host_style();
}

Ok(Operator::new(builder.finish().await?))
}
70 changes: 37 additions & 33 deletions query/src/interpreters/interpreter_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use common_meta_types::GrantObject;
use common_meta_types::StageFile;
use common_meta_types::StageType;
use common_meta_types::UserStageInfo;
use common_tracing::tracing::warn;
use futures::TryStreamExt;
use opendal::ObjectMode;
use regex::Regex;

use crate::sessions::QueryContext;
Expand Down Expand Up @@ -79,47 +79,51 @@ pub async fn list_files(
}
}

/// List files from DAL in recursive way.
///
/// - If input path is a dir, we will list it recursively.
/// - Otherwise, we will append the file itself, and try to list `path/`.
/// - If the path does not exist, we will try to list `path/` too.
pub async fn list_files_from_dal(
ctx: &Arc<QueryContext>,
stage: &UserStageInfo,
path: &str,
pattern: &str,
) -> Result<Vec<StageFile>> {
let op = StageSource::get_op(ctx, stage).await?;
let files = if path.ends_with('/') {
let mut list = vec![];
let mut dirs = vec![path.to_string()];
while let Some(dir) = dirs.pop() {
let mut objects = op.object(&dir).list().await?;
while let Some(de) = objects.try_next().await? {
let meta = de.metadata().await?;
let path = de.path().to_string();
match de.mode() {
ObjectMode::FILE => {
list.push((path, meta));
}
ObjectMode::DIR => {
dirs.push(path);
}
ObjectMode::Unknown => continue,
}
}
let mut files = Vec::new();

// - If the path itself is a dir, return directly.
// - Otherwise, return the path suffixed with `/`.
// - If other errors happen, we will ignore them by returning None.
let dir_path = match op.object(path).metadata().await {
Ok(meta) if meta.mode().is_dir() => Some(path.to_string()),
Ok(meta) if !meta.mode().is_dir() => {
files.push((path.to_string(), meta));

Some(format!("{path}/"))
}
list
} else {
let o = op.object(path);
match o.metadata().await {
Ok(meta) => {
if meta.mode().is_dir() {
vec![]
} else {
vec![(o.path().to_string(), meta)]
Err(e) if e.kind() == io::ErrorKind::NotFound => Some(format!("{path}/")),
Err(e) => return Err(e.into()),
_ => None,
};

// Check whether this dir is valid and, if so, list it recursively.
if let Some(dir) = dir_path {
match op.object(&dir).metadata().await {
Ok(_) => {
let mut ds = op.batch().walk_top_down(&dir)?;
while let Some(de) = ds.try_next().await? {
if de.mode().is_file() {
let path = de.path().to_string();
let meta = de.metadata().await?;
files.push((path, meta));
}
}
}
Err(e) if e.kind() == io::ErrorKind::NotFound => vec![],
Err(e) => return Err(e.into()),
}
};
Err(e) => warn!("ignore listing {path}/, because: {:?}", e),
};
}

let regex = if !pattern.is_empty() {
Some(Regex::new(pattern).map_err(|e| {
Expand Down Expand Up @@ -201,7 +205,7 @@ pub async fn list_files_from_meta_api(
if path.ends_with('/') {
name.starts_with(path)
} else {
name == path
name.starts_with(&format!("{path}/")) || name == path
}
})
.filter(|file| {
Expand Down
9 changes: 5 additions & 4 deletions query/tests/it/interpreters/interpreter_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ async fn test_list_stage_interpreter() -> Result<()> {
let stream = executor.execute(None).await?;
let result = stream.try_collect::<Vec<_>>().await?;
let expected = vec![
"+------+------+-----+---------------+---------+",
"| name | size | md5 | last_modified | creator |",
"+------+------+-----+---------------+---------+",
"+------+------+-----+---------------+---------+",
"+----------------+------+------+-------------------------------+-----------------+",
"| name | size | md5 | last_modified | creator |",
"+----------------+------+------+-------------------------------+-----------------+",
"| test/books.csv | 100 | NULL | 1970-01-01 00:00:00.000 +0000 | 'test_user'@'%' |",
"+----------------+------+------+-------------------------------+-----------------+",
];
common_datablocks::assert_blocks_sorted_eq(expected, result.as_slice());
}
Expand Down

0 comments on commit 2b67b08

Please sign in to comment.