Merge pull request vectordotdev#389 from answerbook/dominic/LOG-18535
fix(s3 consolidation): large file copy source
dominic-mcallister-logdna authored Jan 9, 2024
2 parents 5beee81 + 8df07c0 commit dec3be9
Showing 2 changed files with 130 additions and 66 deletions.
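In short, the commit does two things: the multipart `upload_part_copy` calls used to stitch source files into the consolidated object now pass an explicit `copy_source_range` (and set `RequestPayer::Requester`), and `get_files_to_consolidate` pages through `list_objects_v2` responses with a continuation token instead of stopping after the first page. Below is a minimal sketch of the changed copy call, not the full consolidation processor; the bucket, key, and upload-id arguments are illustrative, and the range string mirrors the `0-{size}` format used in the diff:

```rust
// Minimal sketch: copy one source object into a part of an in-progress
// multipart upload. Names and values here are illustrative.
use aws_sdk_s3::{model::RequestPayer, Client as S3Client, Error};

async fn copy_part(
    client: &S3Client,
    bucket: &str,
    source_key: &str,
    source_size: i64,
    dest_key: &str,
    upload_id: &str,
    part_number: i32,
) -> Result<Option<String>, Error> {
    // The copy source is "bucket/key", URL-encoded as in the diff above.
    let source = format!("{}/{}", bucket, source_key);
    let encoded_source = urlencoding::encode(&source);

    let resp = client
        .upload_part_copy()
        .bucket(bucket)
        .key(dest_key)
        .upload_id(upload_id)
        .copy_source(encoded_source.to_string())
        // The fix: constrain the copy to the source object's byte range
        // instead of relying on an implicit whole-object copy.
        .copy_source_range(format!("0-{}", source_size))
        .part_number(part_number)
        .request_payer(RequestPayer::Requester)
        .send()
        .await?;

    // ETag of the copied part, needed later for CompleteMultipartUpload.
    Ok(resp
        .copy_part_result()
        .and_then(|r| r.e_tag().map(str::to_string)))
}
```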
155 changes: 89 additions & 66 deletions src/sinks/aws_s3/file_consolidation_processor.rs
@@ -9,7 +9,7 @@ use std::io;
use std::io::Cursor;

use aws_sdk_s3::{
model::{CompletedMultipartUpload, CompletedPart},
model::{CompletedMultipartUpload, CompletedPart, RequestPayer},
types::ByteStream,
Client as S3Client, Error,
};
@@ -262,6 +262,7 @@ impl<'a> FileConsolidationProcessor<'a> {

let source = format!("{}/{}", self.bucket.clone(), file.key.clone());
let encoded_source = urlencoding::encode(&source);
let range = format!("0-{}", file.size);

let copy = match self
.s3_client
@@ -270,7 +271,9 @@ impl<'a> FileConsolidationProcessor<'a> {
.key(new_file_key.clone())
.upload_id(upload_id.clone())
.copy_source(encoded_source)
.copy_source_range(range)
.part_number(part_num)
.request_payer(RequestPayer::Requester)
.send()
.await
{
@@ -453,6 +456,7 @@ impl<'a> FileConsolidationProcessor<'a> {
.bucket(self.bucket.clone())
.key(new_file_key.clone())
.upload_id(upload_id.clone())
.request_payer(RequestPayer::Requester)
.multipart_upload(
CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
@@ -480,6 +484,7 @@ impl<'a> FileConsolidationProcessor<'a> {
.bucket(self.bucket.clone())
.key(new_file_key.clone())
.upload_id(upload_id.clone())
.request_payer(RequestPayer::Requester)
.send()
.await
{
@@ -581,83 +586,101 @@ pub async fn get_files_to_consolidate(
bucket: String,
key_prefix: String,
) -> Result<Vec<ConsolidationFile>, Error> {
let list_result = client
.list_objects_v2()
.bucket(bucket.clone())
.prefix(key_prefix.clone())
.send()
.await?;

if list_result.contents.is_none() {
info!(
"bucket={}, prefix={}, No files found",
bucket.clone(),
key_prefix.clone(),
);
let v: Vec<ConsolidationFile> = Vec::new();
return Ok(v);
}

let mut sorted_objects = list_result.contents().unwrap().to_vec();
sorted_objects.sort_by_key(|x| x.last_modified().unwrap().secs());

let mut files_to_consolidate: Vec<ConsolidationFile> = Vec::new();
for key_object in sorted_objects {
let key = key_object.key().unwrap();
let mut continuation_token: Option<String> = None;

let tag_result = client
.get_object_tagging()
loop {
let list_result = client
.list_objects_v2()
.bucket(bucket.clone())
.key(key)
.prefix(key_prefix.clone())
.set_continuation_token(continuation_token)
.send()
.await?;

// this file is the result of a previous merge
let mut mezmo_merged_file = false;
// this file wasn't produced by the mezmo s3 process
let mut mezmo_produced_file = false;
// not breaking down standard json files as we don't want to download
// the whole file into memory. We're aiming for a straight memory copy here.
let mut can_combine = false;

let tags = tag_result.tag_set().unwrap_or_default();
for tag in tags.iter() {
match tag.key().unwrap_or_default() {
"mezmo_pipeline_merged" => mezmo_merged_file = true,
"mezmo_pipeline_s3_sink" => mezmo_produced_file = true,
"mezmo_pipeline_s3_type" => match tag.value().unwrap() {
"ndjson" => can_combine = true,
"text" => can_combine = true,
"json" => can_combine = false,
_ => can_combine = false,
},
_ => info!(message = "unrecognized tag:".to_owned() + tag.key().unwrap()),
}
if list_result.contents.is_none() {
info!(
"bucket={}, prefix={}, No files found",
bucket.clone(),
key_prefix.clone(),
);
break;
}

// scroll through the tags and determine if we can even combine the file
if mezmo_merged_file || !mezmo_produced_file || !can_combine {
continue;
// determine if there are more records to be retrieved in another request;
// the default is 1000 records, which we'll stick with until we need to do
// some tuning
if list_result.is_truncated() {
continuation_token = Some(list_result.next_continuation_token().unwrap().to_string());
} else {
continuation_token = None;
}

// figure out the object size and keys
match client
.head_object()
.bucket(bucket.clone())
.key(key)
.send()
.await
{
Ok(head) => {
let compressed = head.content_encoding().unwrap_or_default() == "gzip";
let size = head.content_length();
let key = key.to_string();
let mut sorted_objects = list_result.contents().unwrap().to_vec();
sorted_objects.sort_by_key(|x| x.last_modified().unwrap().secs());

for key_object in sorted_objects {
let key = key_object.key().unwrap();

let tag_result = client
.get_object_tagging()
.bucket(bucket.clone())
.key(key)
.send()
.await?;

// this file is the result of a previous merge
let mut mezmo_merged_file = false;
// this file wasn't produced by the mezmo s3 process
let mut mezmo_produced_file = false;
// not breaking down standard json files as we don't want to download
// the whole file into memory. We're aiming for a straight memory copy here.
let mut can_combine = false;

let tags = tag_result.tag_set().unwrap_or_default();
for tag in tags.iter() {
match tag.key().unwrap_or_default() {
"mezmo_pipeline_merged" => mezmo_merged_file = true,
"mezmo_pipeline_s3_sink" => mezmo_produced_file = true,
"mezmo_pipeline_s3_type" => match tag.value().unwrap() {
"ndjson" => can_combine = true,
"text" => can_combine = true,
"json" => can_combine = false,
_ => can_combine = false,
},
_ => (),
}
}

files_to_consolidate.push(ConsolidationFile::new(compressed, size, key));
// scroll through the tags and determine if we can even combine the file
if mezmo_merged_file || !mezmo_produced_file || !can_combine {
continue;
}
Err(e) => error!(?e, "bucket={}, Failed to head file={}", bucket.clone(), key),
};
} // end retrieving objects and sorting

// figure out the object size and keys
match client
.head_object()
.bucket(bucket.clone())
.key(key)
.send()
.await
{
Ok(head) => {
let compressed = head.content_encoding().unwrap_or_default() == "gzip";
let size = head.content_length();
let key = key.to_string();

files_to_consolidate.push(ConsolidationFile::new(compressed, size, key));
}
Err(e) => error!(?e, "bucket={}, Failed to head file={}", bucket.clone(), key),
};
} // end retrieving objects and sorting

// complete processing if there is no token to continue with
if continuation_token.is_none() {
break;
}
}

Ok(files_to_consolidate)
}
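The reworked `get_files_to_consolidate` wraps the listing in a loop keyed on the continuation token, so prefixes with more than one page of objects are fully scanned. Below is a stripped-down sketch of just that pagination shape; the `list_all_keys` helper name is made up for illustration, and the per-object tag filtering and `head_object` checks from the diff are omitted:

```rust
// Pagination-only sketch of the listing loop added above; tag checks and
// head_object calls are elided.
use aws_sdk_s3::{Client as S3Client, Error};

async fn list_all_keys(
    client: &S3Client,
    bucket: String,
    key_prefix: String,
) -> Result<Vec<String>, Error> {
    let mut keys = Vec::new();
    let mut continuation_token: Option<String> = None;

    loop {
        let list_result = client
            .list_objects_v2()
            .bucket(bucket.clone())
            .prefix(key_prefix.clone())
            .set_continuation_token(continuation_token)
            .send()
            .await?;

        // ListObjectsV2 returns at most 1000 objects per page; remember the
        // token when the response is truncated so the next iteration picks
        // up where this one stopped.
        continuation_token = if list_result.is_truncated() {
            list_result.next_continuation_token().map(str::to_string)
        } else {
            None
        };

        for object in list_result.contents().unwrap_or_default() {
            if let Some(key) = object.key() {
                keys.push(key.to_string());
            }
        }

        // Done once there is no token to continue with.
        if continuation_token.is_none() {
            break;
        }
    }

    Ok(keys)
}
```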
41 changes: 41 additions & 0 deletions src/sinks/aws_s3/integration_tests_mezmo.rs
@@ -725,6 +725,47 @@ async fn s3_file_consolidation_lots_of_10mb_files() {
}
}

#[tokio::test]
async fn s3_file_consolidation_large_amount_of_files() {
let _cx = SinkContext::new_test();

let s3_client = client().await;
let bucket = uuid::Uuid::new_v4().to_string();
let key_prefix = "large-amount-of-files/".to_string();
let content_type = "text/x-log".to_string();

create_bucket(&bucket, false).await;

// default is 1000 records, so make sure we go over to test the continuation
// NOTE: this is an expensive test, takes ~45 seconds locally :/
for n in 1..1006 {
let mezmo_pipeline_s3_type_ndjson_tags =
generate_tags("mezmo_pipeline_s3_type".to_string(), "text".to_string());

let filename = format!("{}.log", n);
let data = Bytes::from(format!("This is the content of {}.log", n));

put_file(
filename,
data,
key_prefix.clone(),
bucket.clone(),
content_type.clone(),
None,
mezmo_pipeline_s3_type_ndjson_tags,
)
.await;
}

// only s3 created files with ndjson and text will be merged
match get_files_to_consolidate(&s3_client, bucket.clone(), key_prefix.clone()).await {
Ok(files) => {
assert_eq!(files.len(), 1005);
}
Err(err) => panic!("Retrieving files should not error: {}", err),
};
}

async fn put_file(
file_name: String,
content: Bytes,
