Skip to content

Commit

Permalink
Merge branch 'main' into add-request-type
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored May 23, 2023
2 parents 58fd4db + 4e9c731 commit a2a0159
Show file tree
Hide file tree
Showing 26 changed files with 583 additions and 215 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/stale_issue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
# Issue timing
days-before-stale: 2
days-before-close: 5
days-before-ancient: 365
days-before-ancient: 36500

# If you don't want to mark an issue as being ancient based on a
# threshold of "upvotes", you can set this here. An "upvote" is
Expand Down
32 changes: 23 additions & 9 deletions include/aws/s3/private/s3_auto_ranged_put.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct aws_s3_auto_ranged_put {
struct aws_s3_meta_request_resume_token *resume_token;

uint64_t content_length;
bool has_content_length;

/* Only meant for use in the update function, which is never called concurrently. */
struct {
Expand All @@ -53,30 +54,39 @@ struct aws_s3_auto_ranged_put {
uint32_t num_parts_read_from_stream;
} prepare_data;

/*
* Very similar to the etag_list used in complete_multipart_upload to create the XML payload. Each part will set the
* corresponding index to it's checksum result, so while the list is shared across threads each index will only be
* accessed once to initialize by the corresponding part number, and then again during the complete multipart upload
* request which will only be invoked after all other parts/threads have completed.
*/
struct aws_byte_buf *encoded_checksum_list;

/* Members to only be used when the mutex in the base type is locked. */
struct {
/* Array list of `struct aws_string *`. */
struct aws_array_list etag_list;

/* Very similar to the etag_list used in complete_multipart_upload to create the XML payload. Each part will set
* the corresponding index to its checksum result. */
struct aws_array_list encoded_checksum_list;

struct aws_s3_paginated_operation *list_parts_operation;
struct aws_string *list_parts_continuation_token;

/* Note: total num parts is known only if content-length is known,
otherwise it is running total of number of parts read from stream. */
uint32_t total_num_parts;
/* Number of parts we've started work on */
uint32_t num_parts_sent;
/* Number of "sent" parts we've finished reading the body for
* (does not include skipped parts in the case of pause/resume) */
uint32_t num_parts_read;
uint32_t num_parts_completed;
uint32_t num_parts_successful;
uint32_t num_parts_failed;
/* When content length is not known, requests are optimistically
* scheduled; this field counts how many requests were scheduled and had no
* work to do. */
uint32_t num_parts_noop;

struct aws_http_headers *needed_response_headers;

/* Whether body stream is exhausted. */
bool is_body_stream_at_end;

int list_parts_error_code;
int create_multipart_upload_error_code;
int complete_multipart_upload_error_code;
Expand All @@ -102,12 +112,16 @@ struct aws_s3_auto_ranged_put {

AWS_EXTERN_C_BEGIN

/* Creates a new auto-ranged put meta request. This will do a multipart upload in parallel when appropriate. */
/* Creates a new auto-ranged put meta request.
* This will do a multipart upload in parallel when appropriate.
* Note: if has_content_length is false, content_length and num_parts are ignored.
*/

AWS_S3_API struct aws_s3_meta_request *aws_s3_meta_request_auto_ranged_put_new(
struct aws_allocator *allocator,
struct aws_s3_client *client,
size_t part_size,
bool has_content_length,
uint64_t content_length,
uint32_t num_parts,
const struct aws_s3_meta_request_options *options);
Expand Down
2 changes: 1 addition & 1 deletion include/aws/s3/private/s3_copy_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ enum aws_s3_copy_object_request_tag {
struct aws_s3_copy_object {
struct aws_s3_meta_request base;

/* Useable after the Create Multipart Upload request succeeds. */
/* Usable after the Create Multipart Upload request succeeds. */
struct aws_string *upload_id;

/* Only meant for use in the update function, which is never called concurrently. */
Expand Down
13 changes: 7 additions & 6 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ struct aws_s3_client;
struct aws_s3_connection;
struct aws_s3_meta_request;
struct aws_s3_request;
struct aws_s3_request_options;
struct aws_http_headers;
struct aws_http_make_request_options;
struct aws_retry_strategy;
Expand Down Expand Up @@ -121,7 +120,7 @@ struct aws_s3_meta_request {

struct aws_s3_endpoint *endpoint;

/* Event loop to schedule IO work related on, ie, reading from streams, streaming parts back to the caller, etc..
/* Event loop to schedule IO work related on, ie, reading from streams, streaming parts back to the caller, etc...
* After the meta request is finished, this will be reset along with the client reference.*/
struct aws_event_loop *io_event_loop;

Expand Down Expand Up @@ -157,8 +156,8 @@ struct aws_s3_meta_request {
/* The sum of initial_read_window, plus all window_increment() calls. This number never goes down. */
uint64_t read_window_running_total;

/* The next expected streaming part number needed to continue streaming part bodies. (For example, this will
* initially be 1 for part 1, and after that part is received, it will be 2, then 3, etc.. */
/* The next expected streaming part number needed to continue streaming part bodies. (For example, this will
* initially be 1 for part 1, and after that part is received, it will be 2, then 3, etc.. )*/
uint32_t next_streaming_part;

/* Number of parts scheduled for delivery. */
Expand Down Expand Up @@ -202,7 +201,7 @@ struct aws_s3_meta_request {
/* checksum found in either a default get request, or in the initial head request of a multipart get */
struct aws_byte_buf meta_request_level_response_header_checksum;

/* running checksum of all of the parts of a default get, or ranged get meta request*/
/* running checksum of all the parts of a default get, or ranged get meta request*/
struct aws_s3_checksum *meta_request_level_running_response_sum;
};

Expand Down Expand Up @@ -292,7 +291,7 @@ void aws_s3_meta_request_finished_request(

/* Called to place the request in the meta request's priority queue for streaming back to the caller. Once all requests
* with a part number less than the given request have been received, the given request and the previous requests will
* scheduled for streaming. */
* be scheduled for streaming. */
AWS_S3_API
void aws_s3_meta_request_stream_response_body_synced(
struct aws_s3_meta_request *meta_request,
Expand All @@ -303,6 +302,8 @@ void aws_s3_meta_request_stream_response_body_synced(
AWS_S3_API
int aws_s3_meta_request_read_body(struct aws_s3_meta_request *meta_request, struct aws_byte_buf *buffer);

bool aws_s3_meta_request_body_has_no_more_data(const struct aws_s3_meta_request *meta_request);

/* Set the meta request finish result as failed. This is meant to be called sometime before aws_s3_meta_request_finish.
* Subsequent calls to this function or to aws_s3_meta_request_set_success_synced will not overwrite the end result of
* the meta request. */
Expand Down
12 changes: 11 additions & 1 deletion include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ struct aws_s3_request {
bool checksum_match;

/* Tag that defines what the built request will actually consist of. This is meant to be space for an enum defined
* by the derived type. Request tags do not necessarily map 1:1 with actual S3 API requests. For example, they can
* by the derived type. Request tags do not necessarily map 1:1 with actual S3 API requests. (For example, they can
* be more contextual, like "first part" instead of just "part".) */

/* TODO: this should be a union type to make it clear that this could be one of two enums for puts, and gets. */
Expand Down Expand Up @@ -192,6 +192,16 @@ struct aws_s3_request {
/* When true, this request is intended to find out the object size. This is currently only used by auto_range_get.
*/
uint32_t discovers_object_size : 1;

/* When true, this request does not represent a useful HTTP request and
* must not be sent; however, the client must still call the corresponding
* finished callback for the request. Such requests can occur when a request
* is optimistically created during update but cannot be prepared. E.g., when
* a put has no content length, requests are scheduled as usual to ensure
* fair distribution against other requests, which can result in requests
* for uploading data after the end of the stream (those requests use the
* flag below to indicate that they should not be sent). */
uint32_t is_noop : 1;
};

AWS_EXTERN_C_BEGIN
Expand Down
2 changes: 1 addition & 1 deletion include/aws/s3/private/s3_request_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ struct aws_http_message *aws_s3_complete_multipart_message_new(
struct aws_byte_buf *body_buffer,
const struct aws_string *upload_id,
const struct aws_array_list *etags,
struct aws_byte_buf *checksums,
const struct aws_array_list *checksums,
enum aws_s3_checksum_algorithm algorithm);

AWS_S3_API
Expand Down
1 change: 0 additions & 1 deletion include/aws/s3/private/s3_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ struct aws_allocator;
struct aws_http_stream;
struct aws_http_headers;
struct aws_http_message;
struct aws_event_loop;

enum aws_s3_response_status {
AWS_S3_RESPONSE_STATUS_SUCCESS = 200,
Expand Down
15 changes: 11 additions & 4 deletions include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ enum aws_s3_meta_request_type {
/**
* The PutObject request will be split into MultiPart uploads that are executed in parallel
* to improve throughput, when possible.
* Note: put object supports both known and unknown body length. The client
* relies on the Content-Length header to determine the length of the body.
* Requests with unknown content length are always sent using multipart
* upload regardless of the final number of parts and have the following limitations:
* - the multipart threshold is ignored and all requests are made through MPU,
* even if they only need one part
* - pause/resume is not supported
* - the meta request will fail with an error if a checksum header is provided (due to
* the general limitation of a checksum not being usable if the meta request is
* getting split)
*/
AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,

Expand Down Expand Up @@ -441,9 +451,6 @@ struct aws_s3_meta_request_options {

/**
* Invoked to report progress of the meta request execution.
* Currently, the progress callback is invoked only for the CopyObject meta request type.
* TODO: support this callback for all the types of meta requests
* See `aws_s3_meta_request_progress_fn`
*/
aws_s3_meta_request_progress_fn *progress_callback;

Expand Down Expand Up @@ -511,7 +518,7 @@ struct aws_s3_meta_request_result {
* uploaded as a multipart object.
*
* If the object to get is multipart object, the part checksum MAY be validated if the part size to get matches the
* part size uploaded. In that case, if any part mismatch the checksum received, the meta request will failed with
* part size uploaded. In that case, if any part mismatch the checksum received, the meta request will fail with
* checksum mismatch. However, even if the parts checksum were validated, this will NOT be set to true, as the
* checksum for the whole meta request was NOT validated.
**/
Expand Down
2 changes: 1 addition & 1 deletion source/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ static struct aws_error_info s_errors[] = {
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_RESUME_FAILED, "Resuming request failed"),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_OBJECT_MODIFIED, "The object modifed during download."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_NON_RECOVERABLE_ASYNC_ERROR, "Async error received from S3 and not recoverable from retry."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE, "The metric data is not available, the requests ends before the metric happens."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE, "The metric data is not available, the requests ends before the metric happens.")
};
/* clang-format on */

Expand Down
11 changes: 3 additions & 8 deletions source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@
#include <aws/common/string.h>
#include <inttypes.h>

#ifdef _MSC_VER
/* sscanf warning (not currently scanning for strings) */
# pragma warning(disable : 4996)
#endif

const uint32_t s_conservative_max_requests_in_flight = 8;
const struct aws_byte_cursor g_application_xml_value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("application/xml");
const struct aws_byte_cursor g_object_size_value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("ActualObjectSize");
Expand Down Expand Up @@ -157,7 +152,7 @@ static bool s_s3_auto_ranged_get_update(
/* auto-ranged-gets make use of body streaming, which will hold onto response bodies if parts earlier in
* the file haven't arrived yet. This can potentially create a lot of backed up requests, causing us to
* hit our global request limit. To help mitigate this, when the "conservative" flag is passed in, we
* only allow the total amount of requests being sent/streamed to be inside of a set limit. */
* only allow the total amount of requests being sent/streamed to be inside a set limit. */
if (num_requests_in_flight > s_conservative_max_requests_in_flight) {
goto has_work_remaining;
}
Expand Down Expand Up @@ -330,7 +325,7 @@ static bool s_s3_auto_ranged_get_update(
aws_s3_meta_request_set_success_synced(meta_request, s_s3_auto_ranged_get_success_status(meta_request));
if (auto_ranged_get->synced_data.num_parts_checksum_validated ==
auto_ranged_get->synced_data.num_parts_requested) {
/* If we have validated the checksum for every parts, we set the meta request level checksum validation
/* If we have validated the checksum for every part, we set the meta request level checksum validation
* result.*/
meta_request->synced_data.finish_result.did_validate = true;
meta_request->synced_data.finish_result.validation_algorithm = auto_ranged_get->validation_algorithm;
Expand Down Expand Up @@ -491,7 +486,7 @@ static int s_discover_object_range_and_content_length(
}

/* if the initial message had a ranged header, there should also be a Content-Range header that specifies the
* object range and total object size. Otherwise the size and range should be equal to the
* object range and total object size. Otherwise, the size and range should be equal to the
* total_content_length. */
if (!auto_ranged_get->initial_message_has_range_header) {
object_range_end = total_content_length - 1;
Expand Down
Loading

0 comments on commit a2a0159

Please sign in to comment.