diff --git a/src/sinks/aws_s3/file_consolidation_processor.rs b/src/sinks/aws_s3/file_consolidation_processor.rs
index 9b998768f9784..f67807807882d 100644
--- a/src/sinks/aws_s3/file_consolidation_processor.rs
+++ b/src/sinks/aws_s3/file_consolidation_processor.rs
@@ -9,7 +9,7 @@ use std::io;
 use std::io::Cursor;
 
 use aws_sdk_s3::{
-    model::{CompletedMultipartUpload, CompletedPart},
+    model::{CompletedMultipartUpload, CompletedPart, RequestPayer},
     types::ByteStream,
     Client as S3Client, Error,
 };
@@ -262,6 +262,7 @@ impl<'a> FileConsolidationProcessor<'a> {
 
                 let source = format!("{}/{}", self.bucket.clone(), file.key.clone());
                 let encoded_source = urlencoding::encode(&source);
+                let range = format!("0-{}", file.size);
 
                 let copy = match self
                     .s3_client
@@ -270,7 +271,9 @@ impl<'a> FileConsolidationProcessor<'a> {
                     .key(new_file_key.clone())
                     .upload_id(upload_id.clone())
                     .copy_source(encoded_source)
+                    .copy_source_range(range)
                     .part_number(part_num)
+                    .request_payer(RequestPayer::Requester)
                     .send()
                     .await
                 {
@@ -453,6 +456,7 @@ impl<'a> FileConsolidationProcessor<'a> {
                 .bucket(self.bucket.clone())
                 .key(new_file_key.clone())
                 .upload_id(upload_id.clone())
+                .request_payer(RequestPayer::Requester)
                 .multipart_upload(
                     CompletedMultipartUpload::builder()
                         .set_parts(Some(completed_parts))
@@ -480,6 +484,7 @@ impl<'a> FileConsolidationProcessor<'a> {
                 .bucket(self.bucket.clone())
                 .key(new_file_key.clone())
                 .upload_id(upload_id.clone())
+                .request_payer(RequestPayer::Requester)
                 .send()
                 .await
             {
@@ -581,83 +586,101 @@ pub async fn get_files_to_consolidate(
     bucket: String,
     key_prefix: String,
 ) -> Result<Vec<ConsolidationFile>, Error> {
-    let list_result = client
-        .list_objects_v2()
-        .bucket(bucket.clone())
-        .prefix(key_prefix.clone())
-        .send()
-        .await?;
-
-    if list_result.contents.is_none() {
-        info!(
-            "bucket={}, prefix={}, No files found",
-            bucket.clone(),
-            key_prefix.clone(),
-        );
-        let v: Vec<ConsolidationFile> = Vec::new();
-        return Ok(v);
-    }
-
-    let mut sorted_objects = list_result.contents().unwrap().to_vec();
-    sorted_objects.sort_by_key(|x| x.last_modified().unwrap().secs());
-
     let mut files_to_consolidate: Vec<ConsolidationFile> = Vec::new();
-    for key_object in sorted_objects {
-        let key = key_object.key().unwrap();
+    let mut continuation_token: Option<String> = None;
 
-        let tag_result = client
-            .get_object_tagging()
+    loop {
+        let list_result = client
+            .list_objects_v2()
             .bucket(bucket.clone())
-            .key(key)
+            .prefix(key_prefix.clone())
+            .set_continuation_token(continuation_token)
             .send()
             .await?;
 
-        // this file is the result of a previous merge
-        let mut mezmo_merged_file = false;
-        // this file wasn't produced by the mezmo s3 process
-        let mut mezmo_produced_file = false;
-        // not breaking down standard json files as we don't want to load download
-        // the whole file into memory. We're trying to straight memory copy here.
-        let mut can_combine = false;
-
-        let tags = tag_result.tag_set().unwrap_or_default();
-        for tag in tags.iter() {
-            match tag.key().unwrap_or_default() {
-                "mezmo_pipeline_merged" => mezmo_merged_file = true,
-                "mezmo_pipeline_s3_sink" => mezmo_produced_file = true,
-                "mezmo_pipeline_s3_type" => match tag.value().unwrap() {
-                    "ndjson" => can_combine = true,
-                    "text" => can_combine = true,
-                    "json" => can_combine = false,
-                    _ => can_combine = false,
-                },
-                _ => info!(message = "unrecognized tag:".to_owned() + tag.key().unwrap()),
-            }
+        if list_result.contents.is_none() {
+            info!(
+                "bucket={}, prefix={}, No files found",
+                bucket.clone(),
+                key_prefix.clone(),
+            );
+            break;
         }
 
-        // scroll through the tags and determine if we can even combine the file
-        if mezmo_merged_file || !mezmo_produced_file || !can_combine {
-            continue;
+        // determine if there are more records to be retrieved in another request
+        // the default is 1000 records, which we'll stick with until we need to do
+        // some tuning
+        if list_result.is_truncated() {
+            continuation_token = Some(list_result.next_continuation_token().unwrap().to_string());
+        } else {
+            continuation_token = None;
         }
 
-        // figure out the object size and keys
-        match client
-            .head_object()
-            .bucket(bucket.clone())
-            .key(key)
-            .send()
-            .await
-        {
-            Ok(head) => {
-                let compressed = head.content_encoding().unwrap_or_default() == "gzip";
-                let size = head.content_length();
-                let key = key.to_string();
+        let mut sorted_objects = list_result.contents().unwrap().to_vec();
+        sorted_objects.sort_by_key(|x| x.last_modified().unwrap().secs());
+
+        for key_object in sorted_objects {
+            let key = key_object.key().unwrap();
+
+            let tag_result = client
+                .get_object_tagging()
+                .bucket(bucket.clone())
+                .key(key)
+                .send()
+                .await?;
+
+            // this file is the result of a previous merge
+            let mut mezmo_merged_file = false;
+            // this file wasn't produced by the mezmo s3 process
+            let mut mezmo_produced_file = false;
+            // not breaking down standard json files as we don't want to download
+            // the whole file into memory. We're trying to do a straight memory copy here.
+            let mut can_combine = false;
+
+            let tags = tag_result.tag_set().unwrap_or_default();
+            for tag in tags.iter() {
+                match tag.key().unwrap_or_default() {
+                    "mezmo_pipeline_merged" => mezmo_merged_file = true,
+                    "mezmo_pipeline_s3_sink" => mezmo_produced_file = true,
+                    "mezmo_pipeline_s3_type" => match tag.value().unwrap() {
+                        "ndjson" => can_combine = true,
+                        "text" => can_combine = true,
+                        "json" => can_combine = false,
+                        _ => can_combine = false,
+                    },
+                    _ => (),
+                }
+            }
 
-                files_to_consolidate.push(ConsolidationFile::new(compressed, size, key));
+            // scroll through the tags and determine if we can even combine the file
+            if mezmo_merged_file || !mezmo_produced_file || !can_combine {
+                continue;
             }
-            Err(e) => error!(?e, "bucket={}, Failed to head file={}", bucket.clone(), key),
-        };
-    } // end retrieving objects and sorting
+
+            // figure out the object size and keys
+            match client
+                .head_object()
+                .bucket(bucket.clone())
+                .key(key)
+                .send()
+                .await
+            {
+                Ok(head) => {
+                    let compressed = head.content_encoding().unwrap_or_default() == "gzip";
+                    let size = head.content_length();
+                    let key = key.to_string();
+
+                    files_to_consolidate.push(ConsolidationFile::new(compressed, size, key));
+                }
+                Err(e) => error!(?e, "bucket={}, Failed to head file={}", bucket.clone(), key),
+            };
+        } // end retrieving objects and sorting
+
+        // complete processing if there is no token to continue with
+        if continuation_token.is_none() {
+            break;
+        }
+    }
 
     Ok(files_to_consolidate)
 }
diff --git a/src/sinks/aws_s3/integration_tests_mezmo.rs b/src/sinks/aws_s3/integration_tests_mezmo.rs
index 2519b9211f2d6..fddfa86a02bd1 100644
--- a/src/sinks/aws_s3/integration_tests_mezmo.rs
+++ b/src/sinks/aws_s3/integration_tests_mezmo.rs
@@ -725,6 +725,47 @@ async fn s3_file_consolidation_lots_of_10mb_files() {
     }
 }
 
+#[tokio::test]
+async fn s3_file_consolidation_large_amount_of_files() {
+    let _cx = SinkContext::new_test();
+
+    let s3_client = client().await;
+    let bucket = uuid::Uuid::new_v4().to_string();
+    let key_prefix = "large-amount-of-files/".to_string();
+    let content_type = "text/x-log".to_string();
+
+    create_bucket(&bucket, false).await;
+
+    // default is 1000 records, so make sure we go over to test the continuation
+    // NOTE: this is an expensive test, takes ~45 seconds locally :/
+    for n in 1..1006 {
+        let mezmo_pipeline_s3_type_ndjson_tags =
+            generate_tags("mezmo_pipeline_s3_type".to_string(), "text".to_string());
+
+        let filename = format!("{}.log", n);
+        let data = Bytes::from(format!("This is the content of {}.log", n));
+
+        put_file(
+            filename,
+            data,
+            key_prefix.clone(),
+            bucket.clone(),
+            content_type.clone(),
+            None,
+            mezmo_pipeline_s3_type_ndjson_tags,
+        )
+        .await;
+    }
+
+    // only s3 created files with ndjson and text will be merged
+    match get_files_to_consolidate(&s3_client, bucket.clone(), key_prefix.clone()).await {
+        Ok(files) => {
+            assert_eq!(files.len(), 1005);
+        }
+        Err(err) => panic!("Retrieving files should not error: {}", err),
+    };
+}
+
 async fn put_file(
     file_name: String,
     content: Bytes,