refactor: make REMOVE STAGE use a stream to avoid OOM. (#15378)
* refactor: make REMOVE STAGE use a stream to avoid OOM.

* refactor: change REMOVE_BATCH to 1000.

* ci: fix flaky test.
youngsofun authored Apr 30, 2024
1 parent be942d3 commit aa7960a
Showing 4 changed files with 148 additions and 93 deletions.
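
The theme of the diff below: instead of collecting the whole stage listing into a Vec (which grows with the number of staged files), the listing is consumed as a stream in fixed-size batches, so peak memory is bounded by the batch size. A minimal, self-contained sketch of that pattern — not code from this commit; the fake listing, batch size, and tokio runtime are stand-ins:

    use futures::stream::{self, StreamExt};

    #[tokio::main]
    async fn main() {
        // Stand-in for the stage listing; one item per staged file.
        let listing = stream::iter((0..10_000u32).map(|i| format!("file_{i}.csv")));

        // Pull at most 1000 paths at a time, mirroring REMOVE_BATCH below;
        // nothing beyond the current batch is ever resident in memory.
        let mut chunks = listing.chunks(1000);
        while let Some(batch) = chunks.next().await {
            println!("would remove {} files", batch.len());
        }
    }
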
164 changes: 122 additions & 42 deletions src/common/storage/src/stage.rs
@@ -13,6 +13,8 @@
 // limitations under the License.
 
 use std::path::Path;
+use std::pin::Pin;
+use std::sync::Arc;
 
 use chrono::DateTime;
 use chrono::Utc;
@@ -22,6 +24,10 @@ use databend_common_exception::Result;
 use databend_common_meta_app::principal::StageInfo;
 use databend_common_meta_app::principal::StageType;
 use databend_common_meta_app::principal::UserIdentity;
+use databend_common_meta_app::principal::COPY_MAX_FILES_PER_COMMIT;
+use futures::stream;
+use futures::Stream;
+use futures::StreamExt;
 use futures::TryStreamExt;
 use opendal::EntryMode;
 use opendal::Metadata;
@@ -103,6 +109,34 @@ impl StageFilesInfo {
         }
     }
 
+    #[async_backtrace::framed]
+    async fn list_files(
+        &self,
+        operator: &Operator,
+        thread_num: usize,
+        max_files: Option<usize>,
+        mut files: &[String],
+    ) -> Result<Vec<StageFileInfo>> {
+        if let Some(m) = max_files {
+            files = &files[..m]
+        }
+        let file_infos = self.stat_concurrent(operator, thread_num, files).await?;
+        let mut res = Vec::with_capacity(file_infos.len());
+
+        for file_info in file_infos {
+            match file_info {
+                Ok((path, meta)) if meta.is_dir() => {
+                    return Err(ErrorCode::BadArguments(format!("{path} is not a file")));
+                }
+                Ok((path, meta)) => res.push(StageFileInfo::new(path, &meta)),
+                Err(e) => {
+                    return Err(e);
+                }
+            }
+        }
+        Ok(res)
+    }
+
     #[async_backtrace::framed]
     pub async fn list(
         &self,
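
A note on the new list_files helper above: `&files[..m]` panics if m exceeds files.len(), so the truncation relies on callers keeping max_files within bounds. For illustration only, a hypothetical saturating variant would clamp the cap first:

    fn cap<T>(items: &[T], max: Option<usize>) -> &[T] {
        match max {
            // min() keeps the cap within bounds instead of panicking
            Some(m) => &items[..m.min(items.len())],
            None => items,
        }
    }

    fn main() {
        let v = [1, 2, 3];
        assert_eq!(cap(&v, Some(10)), &v[..]); // cap larger than len: no panic
        assert_eq!(cap(&v, None), &v[..]);
    }
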
@@ -111,31 +145,45 @@ impl StageFilesInfo {
         max_files: Option<usize>,
     ) -> Result<Vec<StageFileInfo>> {
         if self.path == STDIN_FD {
-            return Ok(vec![stdin_stage_info()?]);
+            return Ok(vec![stdin_stage_info()]);
         }
 
-        let max_files = max_files.unwrap_or(usize::MAX);
         if let Some(files) = &self.files {
-            let file_infos = self
-                .stat_concurrent(operator, thread_num, max_files, files)
-                .await?;
-            let mut res = Vec::with_capacity(file_infos.len());
+            self.list_files(operator, thread_num, max_files, files)
+                .await
+        } else {
+            let pattern = self.get_pattern()?;
+            StageFilesInfo::list_files_with_pattern(
+                operator,
+                &self.path,
+                pattern,
+                max_files.unwrap_or(COPY_MAX_FILES_PER_COMMIT),
+            )
+            .await
+        }
+    }
 
-            for file_info in file_infos {
-                match file_info {
-                    Ok((path, meta)) if meta.is_dir() => {
-                        return Err(ErrorCode::BadArguments(format!("{path} is not a file")));
-                    }
-                    Ok((path, meta)) => res.push(StageFileInfo::new(path, &meta)),
-                    Err(e) => {
-                        return Err(e);
-                    }
-                }
-            }
-            Ok(res)
+    #[async_backtrace::framed]
+    pub async fn list_stream(
+        &self,
+        operator: &Operator,
+        thread_num: usize,
+        max_files: Option<usize>,
+    ) -> Result<Pin<Box<dyn Stream<Item = Result<StageFileInfo>> + Send>>> {
+        if self.path == STDIN_FD {
+            return Ok(Box::pin(stream::iter(vec![Ok(stdin_stage_info())])));
+        }
+
+        if let Some(files) = &self.files {
+            let files = self
+                .list_files(operator, thread_num, max_files, files)
+                .await?;
+            let files = files.into_iter().map(Ok);
+            Ok(Box::pin(stream::iter(files)))
         } else {
             let pattern = self.get_pattern()?;
-            StageFilesInfo::list_files_with_pattern(operator, &self.path, pattern, max_files).await
+            StageFilesInfo::list_files_stream_with_pattern(operator, &self.path, pattern, max_files)
+                .await
         }
     }
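
The new list_stream returns Pin<Box<dyn Stream<Item = Result<StageFileInfo>> + Send>>, which lets both branches — an already-materialized Vec of file infos and a lazy lister — unify behind one type that callers poll on demand. A standalone sketch of that shape, with hypothetical names and data:

    use std::pin::Pin;

    use futures::stream::{self, Stream, StreamExt};

    type BoxedStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;

    fn numbers(materialized: bool) -> BoxedStream<u64> {
        if materialized {
            // Branch 1: a ready Vec, wrapped so it looks like any other stream.
            Box::pin(stream::iter(vec![1u64, 2, 3]))
        } else {
            // Branch 2: a lazy generator; items are produced only when polled.
            Box::pin(stream::unfold(1u64, |n| async move { Some((n, n + 1)) }).take(3))
        }
    }

    #[tokio::main]
    async fn main() {
        let mut s = numbers(false);
        while let Some(n) = s.next().await {
            println!("{n}");
        }
    }
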

@@ -195,40 +243,73 @@ impl StageFilesInfo {
         pattern: Option<Regex>,
         max_files: usize,
     ) -> Result<Vec<StageFileInfo>> {
+        Self::list_files_stream_with_pattern(operator, path, pattern, Some(max_files))
+            .await?
+            .try_collect::<Vec<_>>()
+            .await
+    }
+
+    #[async_backtrace::framed]
+    pub async fn list_files_stream_with_pattern(
+        operator: &Operator,
+        path: &str,
+        pattern: Option<Regex>,
+        max_files: Option<usize>,
+    ) -> Result<Pin<Box<dyn Stream<Item = Result<StageFileInfo>> + Send>>> {
         if path == STDIN_FD {
-            return Ok(vec![stdin_stage_info()?]);
+            return Ok(Box::pin(stream::once(async { Ok(stdin_stage_info()) })));
         }
-        let mut files = Vec::new();
         let prefix_len = if path == "/" { 0 } else { path.len() };
         let prefix_meta = operator.stat(path).await;
-        match prefix_meta {
+        let file_exact: Option<Result<StageFileInfo>> = match prefix_meta {
             Ok(meta) if meta.is_file() => {
-                files.push(StageFileInfo::new(path.to_string(), &meta));
+                let f = StageFileInfo::new(path.to_string(), &meta);
+                if max_files == Some(1) {
+                    return Ok(Box::pin(stream::once(async { Ok(f) })));
+                }
+                Some(Ok(f))
             }
             Err(e) if e.kind() != opendal::ErrorKind::NotFound => {
                 return Err(e.into());
             }
-            _ => {}
+            _ => None,
         };
-        let mut lister = operator
+        let file_exact_stream = stream::iter(file_exact.clone().into_iter());
+
+        let lister = operator
             .lister_with(path)
             .recursive(true)
             .metakey(StageFileInfo::meta_query())
             .await?;
 
-        if files.len() == max_files {
-            return Ok(files);
-        }
-        while let Some(obj) = lister.try_next().await? {
-            let meta = obj.metadata();
-            if check_file(&obj.path()[prefix_len..], meta.mode(), &pattern) {
-                files.push(StageFileInfo::new(obj.path().to_string(), meta));
-                if files.len() == max_files {
-                    return Ok(files);
-                }
-            }
-        }
-        Ok(files)
+        let pattern = Arc::new(pattern);
+        let files_with_prefix = lister.filter_map(move |result| {
+            let pattern = pattern.clone();
+            async move {
+                match result {
+                    Ok(entry) => {
+                        let meta = entry.metadata();
+                        if check_file(&entry.path()[prefix_len..], meta.mode(), &pattern) {
+                            Some(Ok(StageFileInfo::new(entry.path().to_string(), meta)))
+                        } else {
+                            None
+                        }
+                    }
+                    Err(e) => Some(Err(ErrorCode::from(e))),
+                }
+            }
+        });
+        if let Some(max_files) = max_files {
+            if file_exact.is_some() {
+                Ok(Box::pin(
+                    file_exact_stream.chain(files_with_prefix.take(max_files - 1)),
+                ))
+            } else {
+                Ok(Box::pin(files_with_prefix.take(max_files)))
+            }
+        } else {
+            Ok(Box::pin(file_exact_stream.chain(files_with_prefix)))
+        }
     }
 
     /// Stat files concurrently.
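
Two details in the hunk above are worth unpacking. The pattern is wrapped in an Arc because filter_map's closure returns an async block that must own what it captures, so each entry gets a cheap Arc clone instead of a recompiled regex; and chain/take combine the exact-path hit with the listing under the max_files cap. A self-contained sketch of the same shape — the names and the `.csv` pattern are invented:

    use std::sync::Arc;

    use futures::stream::{self, StreamExt};
    use regex::Regex;

    #[tokio::main]
    async fn main() {
        let pattern = Arc::new(Regex::new(r"\.csv$").unwrap());

        // Pretend these come from a stat() hit and a lister, respectively.
        let exact = stream::iter(vec!["exact.csv".to_string()]);
        let listed = stream::iter(vec!["a.csv".to_string(), "b.txt".to_string(), "c.csv".to_string()]);

        let matched = listed.filter_map(move |name| {
            let pattern = pattern.clone(); // cheap per-entry Arc clone; the async
            async move {
                // block must own everything it captures
                pattern.is_match(&name).then_some(name)
            }
        });

        // chain + take mirror combining the exact-path hit with the listing
        // under a max_files cap.
        let first_two: Vec<String> = exact.chain(matched).take(2).collect().await;
        println!("{first_two:?}");
    }
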
@@ -237,10 +318,9 @@
         &self,
         operator: &Operator,
         thread_num: usize,
-        max_files: usize,
         files: &[String],
     ) -> Result<Vec<Result<(String, Metadata)>>> {
-        if max_files == 1 {
+        if files.len() == 1 {
             let Some(file) = files.first() else {
                 return Ok(vec![]);
             };
@@ -254,7 +334,7 @@
         }
 
         // This clone is required to make sure we are not referring to `file: &String` in the closure
-        let tasks = files.iter().take(max_files).cloned().map(|file| {
+        let tasks = files.iter().cloned().map(|file| {
             let full_path = Path::new(&self.path)
                 .join(file)
                 .to_string_lossy()
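
With max_files gone from stat_concurrent (list_files now truncates the slice up front), the helper only has to stat every path it is given, bounded by thread_num. As an aside, a bounded-concurrency stat loop can be sketched with buffer_unordered — an illustration under assumed names, not the task helper Databend actually uses:

    use futures::stream::{self, StreamExt};

    // Stand-in for operator.stat(&path).await returning (path, size).
    async fn stat_one(path: String) -> (String, u64) {
        (path, 42)
    }

    #[tokio::main]
    async fn main() {
        let files: Vec<String> = (0..8).map(|i| format!("f{i}.csv")).collect();
        let concurrency = 4; // plays the role of thread_num

        let metas: Vec<(String, u64)> = stream::iter(files)
            .map(stat_one)                 // one future per file
            .buffer_unordered(concurrency) // at most `concurrency` in flight
            .collect()
            .await;
        println!("{} files statted", metas.len());
    }
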
@@ -292,7 +372,7 @@ fn blocking_list_files_with_pattern(
     max_files: usize,
 ) -> Result<Vec<StageFileInfo>> {
     if path == STDIN_FD {
-        return Ok(vec![stdin_stage_info()?]);
+        return Ok(vec![stdin_stage_info()]);
     }
     let operator = operator.blocking();
     let mut files = Vec::new();
@@ -330,14 +410,14 @@ fn blocking_list_files_with_pattern(
 
 pub const STDIN_FD: &str = "/dev/fd/0";
 
-fn stdin_stage_info() -> Result<StageFileInfo> {
-    Ok(StageFileInfo {
+fn stdin_stage_info() -> StageFileInfo {
+    StageFileInfo {
         path: STDIN_FD.to_string(),
         size: u64::MAX,
         md5: None,
         last_modified: Utc::now(),
         etag: None,
         status: StageFileStatus::NeedCopy,
         creator: None,
-    })
+    }
 }
@@ -17,9 +17,11 @@ use std::sync::Arc;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_sql::plans::RemoveStagePlan;
+use databend_common_storage::StageFileInfo;
 use databend_common_storage::StageFilesInfo;
 use databend_common_storages_fuse::io::Files;
 use databend_common_storages_stage::StageTable;
+use futures_util::StreamExt;
 use log::debug;
 use log::error;
@@ -68,19 +70,19 @@ impl Interpreter for RemoveUserStageInterpreter {
             files: None,
             pattern,
         };
-        let files: Vec<String> = files_info
-            .list(&op, thread_num, None)
-            .await?
-            .into_iter()
-            .map(|file_with_meta| file_with_meta.path)
-            .collect::<Vec<_>>();
+        let files = files_info.list_stream(&op, thread_num, None).await?;
 
         let table_ctx: Arc<dyn TableContext> = self.ctx.clone();
         let file_op = Files::create(table_ctx, op);
 
-        const REMOVE_BATCH: usize = 4000;
-        for chunk in files.chunks(REMOVE_BATCH) {
-            if let Err(e) = file_op.remove_file_in_batch(chunk).await {
+        const REMOVE_BATCH: usize = 1000;
+        let mut chunks = files.chunks(REMOVE_BATCH);
+
+        // s3 can remove at most 1k files in one request
+        while let Some(chunk) = chunks.next().await {
+            let chunk: Result<Vec<StageFileInfo>> = chunk.into_iter().collect();
+            let chunk = chunk?.into_iter().map(|x| x.path).collect::<Vec<_>>();
+            if let Err(e) = file_op.remove_file_in_batch(&chunk).await {
                 error!("Failed to delete file: {:?}, error: {}", chunk, e);
             }
 
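In the interpreter hunk above, chunks(REMOVE_BATCH) turns the Stream<Item = Result<StageFileInfo>> into batches of Vec<Result<_>>, and collecting a batch into Result<Vec<_>> lets a single bad entry abort via `?` before the batched delete. A minimal sketch of that collect step, with a placeholder error type:

    use futures::stream::{self, StreamExt};

    #[tokio::main]
    async fn main() -> Result<(), String> {
        // Stand-in for list_stream: each item may individually be an error.
        let files = stream::iter((0..5u32).map(|i| Ok::<_, String>(format!("f{i}.csv"))));

        let mut chunks = files.chunks(2); // S3's DeleteObjects caps one request at 1000 keys
        while let Some(chunk) = chunks.next().await {
            // Vec<Result<T>> -> Result<Vec<T>>: the first Err aborts the batch.
            let paths: Vec<String> = chunk.into_iter().collect::<Result<_, _>>()?;
            println!("deleting a batch of {} paths", paths.len());
        }
        Ok(())
    }
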
51 changes: 12 additions & 39 deletions tests/suites/1_stateful/00_stage/00_0004_copy_with_max_files.result
@@ -1,46 +1,19 @@
 --- force = false, purge = false
-f1.csv 2 0 NULL NULL
-f3.csv 2 0 NULL NULL
-4
-remain 3 files
-f2.csv 2 0 NULL NULL
-6
-remain 3 files
-6
-remain 3 files
+copied 2 files with 4 rows, remain 3 files
+copied 1 files with 6 rows, remain 3 files
+copied 0 files with 6 rows, remain 3 files
 --- force = false, purge = true
-f1.csv 2 0 NULL NULL
-f3.csv 2 0 NULL NULL
-4
-remain 1 files
-f2.csv 2 0 NULL NULL
-6
-remain 0 files
-6
-remain 0 files
+copied 2 files with 4 rows, remain 1 files
+copied 1 files with 6 rows, remain 0 files
+copied 0 files with 6 rows, remain 0 files
 --- force = true, purge = false
-f1.csv 2 0 NULL NULL
-f3.csv 2 0 NULL NULL
-4
-remain 3 files
-f1.csv 2 0 NULL NULL
-f3.csv 2 0 NULL NULL
-8
-remain 3 files
-f1.csv 2 0 NULL NULL
-f3.csv 2 0 NULL NULL
-12
-remain 3 files
+copied 2 files with 4 rows, remain 3 files
+copied 2 files with 8 rows, remain 3 files
+copied 2 files with 12 rows, remain 3 files
 --- force = true, purge = true
-f1.csv 2 0 NULL NULL
-f3.csv 2 0 NULL NULL
-4
-remain 1 files
-f2.csv 2 0 NULL NULL
-6
-remain 0 files
-6
-remain 0 files
+copied 2 files with 4 rows, remain 1 files
+copied 1 files with 6 rows, remain 0 files
+copied 0 files with 6 rows, remain 0 files
 >>>> drop table if exists test_max_files_limit
 >>>> create table test_max_files_limit (a int, b int)
 >>>> copy into test_max_files_limit from 'fs:///tmp/00_0004_2/' FILE_FORMAT = (type = CSV)
@@ -48,10 +48,10 @@ do
   for i in {1..3}
   do
     table="test_max_files_force_${force}_purge_${purge}"
-    echo "copy into ${table} from 'fs:///tmp/00_0004/' FILE_FORMAT = (type = CSV) max_files=2 force=${force} purge=${purge}" | $BENDSQL_CLIENT_CONNECT
-    echo "select count(*) from ${table}" | $BENDSQL_CLIENT_CONNECT
+    copied=$(echo "copy into ${table} from 'fs:///tmp/00_0004/' FILE_FORMAT = (type = CSV) max_files=2 force=${force} purge=${purge}" | $BENDSQL_CLIENT_CONNECT | wc -l | sed 's/ //g')
+    copied_rows=$(echo "select count(*) from ${table}" | $BENDSQL_CLIENT_CONNECT)
     remain=$(ls -1 /tmp/00_0004/ | wc -l | sed 's/ //g')
-    echo "remain ${remain} files"
+    echo "copied ${copied} files with ${copied_rows} rows, remain ${remain} files"
   done
 done
 done
