Skip to content

Commit

Permalink
Tests for async streaming (seems to work!) (#301)
Browse files Browse the repository at this point in the history
- Add basic tests for async streaming of the request body.
- When async streaming, don't let more than 1 part at a time be in the "prepare" stage.
    -  since `aws_async_input_stream` doesn't allow overlapping read() calls.
- Stop wrapping synchronous streams in an async stream.
    - Now that we treat async streaming slightly different than synchronous, it's simpler to just have them be separate things.
  • Loading branch information
graebm authored Jun 2, 2023
1 parent 389d1aa commit f14d92f
Show file tree
Hide file tree
Showing 11 changed files with 297 additions and 194 deletions.
3 changes: 2 additions & 1 deletion include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ struct aws_s3_meta_request {
/* Initial HTTP Message that this meta request is based on. */
struct aws_http_message *initial_request_message;

/* Async stream for meta request's body */
/* Async stream for meta request's body.
* NULL if using initial_request_message's synchronous body stream instead. */
struct aws_async_input_stream *request_body_async_stream;

/* Part size to use for uploads and downloads. Passed down by the creating client. */
Expand Down
15 changes: 7 additions & 8 deletions include/aws/s3/private/s3_request_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_filter_he
size_t excluded_headers_size,
bool exclude_x_amz_meta);

/* Copy message and retain all headers, but replace body with one that reads directly from a filepath. */
AWS_S3_API
struct aws_http_message *aws_s3_message_util_copy_http_message_filepath_body_all_headers(
struct aws_allocator *allocator,
struct aws_http_message *message,
struct aws_byte_cursor filepath);

/* Copy headers from one message to the other and exclude specific headers.
* exclude_x_amz_meta controls whether S3 user metadata headers (prefixed with "x-amz-meta) are excluded.*/
AWS_S3_API
Expand All @@ -55,14 +62,6 @@ struct aws_input_stream *aws_s3_message_util_assign_body(
const struct checksum_config *checksum_config,
struct aws_byte_buf *out_checksum);

/* Given all possible ways to send a request body, always return an async-stream.
* Returns NULL on failure */
struct aws_async_input_stream *aws_s3_message_util_acquire_async_body_stream(
struct aws_allocator *allocator,
struct aws_http_message *message,
struct aws_byte_cursor send_filepath,
struct aws_async_input_stream *send_async_stream);

/* Return true if checksum headers has been set. */
AWS_S3_API
bool aws_s3_message_util_check_checksum_header(struct aws_http_message *message);
Expand Down
18 changes: 12 additions & 6 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,17 @@ static void s_s3_meta_request_auto_ranged_put_destroy(struct aws_s3_meta_request
static bool s_should_skip_scheduling_more_parts_based_on_flags(
const struct aws_s3_auto_ranged_put *auto_ranged_put,
uint32_t flags) {
if ((flags & AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE) != 0) {
uint32_t num_parts_in_flight =
(auto_ranged_put->synced_data.num_parts_sent - auto_ranged_put->synced_data.num_parts_completed);

uint32_t num_parts_in_flight =
(auto_ranged_put->synced_data.num_parts_sent - auto_ranged_put->synced_data.num_parts_completed);

/* If the stream is actually async, only allow 1 part in flight at a time.
* We need to wait for async read() to complete before calling it again */
if (auto_ranged_put->base.request_body_async_stream != NULL) {
return num_parts_in_flight > 0;
}

if ((flags & AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE) != 0) {
/* Because uploads must read from their streams serially, we try to limit the amount of in flight
* requests for a given multipart upload if we can. */
return num_parts_in_flight > 0;
Expand Down Expand Up @@ -530,8 +537,7 @@ static bool s_s3_auto_ranged_put_update(
struct aws_string *etag = NULL;

if (!aws_array_list_get_at(&auto_ranged_put->synced_data.etag_list, &etag, etag_index) && etag) {
/* part already downloaded, skip it here and prepare will take care of adjusting the buffer
*/
/* part already downloaded, skip it here and prepare will take care of adjusting the buffer */
++auto_ranged_put->threaded_update_data.next_part_number;

} else {
Expand Down Expand Up @@ -1668,7 +1674,7 @@ static void s_s3_auto_ranged_put_request_finished(
}
if (error_code == AWS_ERROR_SUCCESS && meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_progress progress = {
.bytes_transferred = meta_request->part_size,
.bytes_transferred = request->request_body.len,
.content_length = auto_ranged_put->content_length,
};
meta_request->progress_callback(meta_request, &progress, meta_request->user_data);
Expand Down
61 changes: 50 additions & 11 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,24 @@ int aws_s3_meta_request_init_base(
meta_request->cached_signing_config = aws_cached_signing_config_new(allocator, options->signing_config);
}

/* Keep a reference to the original message structure passed in. */
meta_request->initial_request_message = aws_http_message_acquire(options->message);

/* There are several ways for the user to pass in the request's body.
* If something besides an aws_async_input_stream was passed in, create an
* async wrapper around it */
meta_request->request_body_async_stream = aws_s3_message_util_acquire_async_body_stream(
allocator, meta_request->initial_request_message, options->send_filepath, options->send_async_stream);
if (meta_request->request_body_async_stream == NULL) {
goto error;
/* Set initial_meta_request, based on how the request's body is being passed in
* (we checked earlier that it's not being passed multiple ways) */
if (options->send_filepath.len > 0) {
/* Create copy of original message, but with body-stream that reads directly from file */
meta_request->initial_request_message = aws_s3_message_util_copy_http_message_filepath_body_all_headers(
allocator, options->message, options->send_filepath);
if (meta_request->initial_request_message == NULL) {
goto error;
}

} else if (options->send_async_stream != NULL) {
/* Read from async body-stream, but keep original message around for headers, method, etc */
meta_request->request_body_async_stream = aws_async_input_stream_acquire(options->send_async_stream);
meta_request->initial_request_message = aws_http_message_acquire(options->message);

} else {
/* Keep original message around, we'll read from its synchronous body-stream */
meta_request->initial_request_message = aws_http_message_acquire(options->message);
}

/* Client is currently optional to allow spinning up a meta_request without a client in a test. */
Expand Down Expand Up @@ -1588,7 +1596,38 @@ struct aws_future_bool *aws_s3_meta_request_read_body(
AWS_PRECONDITION(meta_request);
AWS_PRECONDITION(buffer);

return aws_async_input_stream_read_to_fill(meta_request->request_body_async_stream, buffer);
/* If async-stream, simply call read_to_fill() */
if (meta_request->request_body_async_stream != NULL) {
return aws_async_input_stream_read_to_fill(meta_request->request_body_async_stream, buffer);
}

/* Else synchronous aws_input_stream */
struct aws_input_stream *synchronous_stream =
aws_http_message_get_body_stream(meta_request->initial_request_message);
AWS_FATAL_ASSERT(synchronous_stream);

struct aws_future_bool *synchronous_read_future = aws_future_bool_new(meta_request->allocator);

/* Keep calling read() until we fill the buffer, or hit EOF */
struct aws_stream_status status = {.is_end_of_stream = false, .is_valid = true};
while ((buffer->len < buffer->capacity) && !status.is_end_of_stream) {
/* Read from stream */
if (aws_input_stream_read(synchronous_stream, buffer) != AWS_OP_SUCCESS) {
aws_future_bool_set_error(synchronous_read_future, aws_last_error());
goto synchronous_read_done;
}

/* Check if stream is done */
if (aws_input_stream_get_status(synchronous_stream, &status) != AWS_OP_SUCCESS) {
aws_future_bool_set_error(synchronous_read_future, aws_last_error());
goto synchronous_read_done;
}
}

aws_future_bool_set_result(synchronous_read_future, status.is_end_of_stream);

synchronous_read_done:
return synchronous_read_future;
}

bool aws_s3_meta_request_body_has_no_more_data(const struct aws_s3_meta_request *meta_request) {
Expand Down
86 changes: 43 additions & 43 deletions source/s3_request_messages.c
Original file line number Diff line number Diff line change
Expand Up @@ -838,49 +838,6 @@ struct aws_input_stream *aws_s3_message_util_assign_body(
return NULL;
}

/* Given all possible ways to send a request body, always return an async-stream */
struct aws_async_input_stream *aws_s3_message_util_acquire_async_body_stream(
struct aws_allocator *allocator,
struct aws_http_message *message,
struct aws_byte_cursor send_filepath,
struct aws_async_input_stream *send_async_stream) {

AWS_PRECONDITION(message);

/* If user provides async-stream, use it */
if (send_async_stream != NULL) {
return aws_async_input_stream_acquire(send_async_stream);
}

/* If user provides filepath, create aws_input_stream to read it, and wrap that in an async-stream */
if (send_filepath.len > 0) {
struct aws_string *filepath_str = aws_string_new_from_cursor(allocator, &send_filepath);
struct aws_input_stream *body_stream =
aws_input_stream_new_from_file(allocator, aws_string_c_str(filepath_str));
aws_string_destroy(filepath_str);
if (body_stream == NULL) {
return NULL;
}
send_async_stream = aws_async_input_stream_new_from_synchronous(allocator, body_stream);
aws_input_stream_release(body_stream);
return send_async_stream;
}

/* If user provides HTTP message with aws_input_stream body, wrap that in an async stream */
struct aws_input_stream *request_body = aws_http_message_get_body_stream(message);
if (request_body) {
return aws_async_input_stream_new_from_synchronous(allocator, request_body);
}

/* Otherwise, no body provided, just create empty async-stream */
struct aws_byte_cursor empty_cursor = {0};
struct aws_input_stream *empty_stream = aws_input_stream_new_from_cursor(allocator, &empty_cursor);
AWS_ASSERT(empty_stream);
send_async_stream = aws_async_input_stream_new_from_synchronous(allocator, empty_stream);
aws_input_stream_release(empty_stream);
return send_async_stream;
}

bool aws_s3_message_util_check_checksum_header(struct aws_http_message *message) {
struct aws_http_headers *headers = aws_http_message_get_headers(message);
for (int algorithm = AWS_SCA_INIT; algorithm <= AWS_SCA_END; algorithm++) {
Expand Down Expand Up @@ -987,6 +944,49 @@ struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_filter_he
return NULL;
}

/* Copy message and retain all headers, but replace body with one that reads directly from a filepath. */
struct aws_http_message *aws_s3_message_util_copy_http_message_filepath_body_all_headers(
struct aws_allocator *allocator,
struct aws_http_message *base_message,
struct aws_byte_cursor filepath) {

bool success = false;
struct aws_string *filepath_str = NULL;
struct aws_input_stream *body_stream = NULL;
struct aws_http_message *message = NULL;

/* Copy message and retain all headers */
message = aws_s3_message_util_copy_http_message_no_body_filter_headers(
allocator,
base_message,
NULL /*excluded_header_array*/,
0 /*excluded_header_array_size*/,
false /*exclude_x_amz_meta*/);
if (!message) {
goto clean_up;
}

/* Create body-stream that reads from file */
filepath_str = aws_string_new_from_cursor(allocator, &filepath);
body_stream = aws_input_stream_new_from_file(allocator, aws_string_c_str(filepath_str));
if (!body_stream) {
goto clean_up;
}
aws_http_message_set_body_stream(message, body_stream);

success = true;

clean_up:
aws_string_destroy(filepath_str);
aws_input_stream_release(body_stream);
if (success) {
return message;
} else {
aws_http_message_release(message);
return NULL;
}
}

void aws_s3_message_util_copy_headers(
struct aws_http_message *source_message,
struct aws_http_message *dest_message,
Expand Down
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ add_net_test_case(test_s3_put_object_double_slashes)
add_net_test_case(test_s3_put_object_no_content_length)
add_net_test_case(test_s3_put_object_single_part_no_content_length)
add_net_test_case(test_s3_put_object_zero_size_no_content_length)
add_net_test_case(test_s3_put_object_async)
add_net_test_case(test_s3_put_object_async_no_content_length)
add_net_test_case(test_s3_put_object_async_fail_reading)

if(ENABLE_MRAP_TESTS)
add_net_test_case(test_s3_get_object_less_than_part_size_mrap)
Expand Down
70 changes: 0 additions & 70 deletions tests/s3_bad_input_stream.c

This file was deleted.

Loading

0 comments on commit f14d92f

Please sign in to comment.