Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pause resume update #235

Merged
merged 17 commits into from
Dec 2, 2022
Merged
7 changes: 5 additions & 2 deletions include/aws/s3/private/s3_auto_ranged_put.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ struct aws_s3_auto_ranged_put {
/* Initialized either during creation in resume flow or as result of create multipart upload during normal flow. */
struct aws_string *upload_id;

/* Resume token used to resume the operation */
struct aws_s3_meta_request_resume_token *resume_token;
graebm marked this conversation as resolved.
Show resolved Hide resolved

uint64_t content_length;

/* Only meant for use in the update function, which is never called concurrently. */
struct {
/*
* Next part number to send.
* Note: this follows s3 part number convention and counting starts with 1.
* Throughout codebase 0 based part numbers are usually reffered to as part index.
* Throughout codebase 0 based part numbers are usually referred to as part index.
*/
uint32_t next_part_number;
} threaded_update_data;
Expand All @@ -42,7 +45,7 @@ struct aws_s3_auto_ranged_put {
*/
struct {
/* How many parts have been read from input stream.
* Since reads are always sequential, this is esentially the number of how many parts were read from start of
* Since reads are always sequential, this is essentially the number of how many parts were read from start of
* stream.
*/
uint32_t num_parts_read_from_stream;
Expand Down
2 changes: 1 addition & 1 deletion include/aws/s3/private/s3_checksums.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ struct aws_s3_checksum *aws_checksum_new(struct aws_allocator *allocator, enum a

/**
* Compute an aws_checksum corresponding to the provided enum, passing a function pointer around instead of using a
* conditional would be faster, but would be a negligble improvement compared to the cost of processing data twice
* conditional would be faster, but would be a negligible improvement compared to the cost of processing data twice
* which would be the only time this function would be used, and would be harder to follow.
*/
AWS_S3_API
Expand Down
17 changes: 17 additions & 0 deletions include/aws/s3/private/s3_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,23 @@ struct aws_s3_client {
} threaded_data;
};

struct aws_s3_meta_request_resume_token {
struct aws_allocator *allocator;
struct aws_ref_count ref_count;

enum aws_s3_meta_request_type type;

/* Note: since pause currently only supports upload, this structure only has
upload specific fields. Extending it to support other types is left as
exercise for future. */
struct aws_string *multipart_upload_id;
size_t part_size;
size_t total_num_parts;
size_t num_parts_completed;
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved
};

struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_new(struct aws_allocator *allocator);

void aws_s3_client_notify_connection_finished(
struct aws_s3_client *client,
struct aws_s3_connection *connection,
Expand Down
6 changes: 3 additions & 3 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ struct aws_s3_meta_request_vtable {
void (*destroy)(struct aws_s3_meta_request *);

/* Pause the given request */
int (*pause)(struct aws_s3_meta_request *meta_request, struct aws_string **resume_token);
int (*pause)(struct aws_s3_meta_request *meta_request, struct aws_s3_meta_request_resume_token **resume_token);
};

/**
Expand Down Expand Up @@ -194,7 +194,7 @@ struct aws_s3_meta_request {
/* deep copy of the checksum config. */
struct checksum_config checksum_config;

/* checksum found in either a default get request, or in the initial head request of a mutlipart get */
/* checksum found in either a default get request, or in the initial head request of a multipart get */
struct aws_byte_buf meta_request_level_response_header_checksum;

/* running checksum of all of the parts of a default get, or ranged get meta request*/
Expand Down Expand Up @@ -323,7 +323,7 @@ bool aws_s3_meta_request_has_finish_result_synced(struct aws_s3_meta_request *me
AWS_S3_API
void aws_s3_meta_request_finish(struct aws_s3_meta_request *meta_request);

/* Default implementation of the meta request finish functino. */
/* Default implementation of the meta request finish function. */
AWS_S3_API
void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request);

Expand Down
2 changes: 1 addition & 1 deletion include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ struct aws_s3_request {
/* checksum found in the header of an individual get part http request */
struct aws_byte_buf request_level_response_header_checksum;

/* running checksum of the respone to an individual get part http request */
/* running checksum of the response to an individual get part http request */
struct aws_s3_checksum *request_level_running_response_sum;
/* The algorithm used to validate the checksum */
enum aws_s3_checksum_algorithm validation_algorithm;
Expand Down
2 changes: 1 addition & 1 deletion include/aws/s3/private/s3_request_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_all_heade
struct aws_allocator *allocator,
struct aws_http_message *message);

/* Copy mesage (but not the body) and exclude specific headers.
/* Copy message (but not the body) and exclude specific headers.
* exclude_x_amz_meta controls whether S3 user metadata headers (prefixed with "x-amz-meta) are excluded.*/
AWS_S3_API
struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_filter_headers(
Expand Down
2 changes: 1 addition & 1 deletion include/aws/s3/private/s3_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ int aws_s3_parse_content_range_response_header(
uint64_t *out_range_end,
uint64_t *out_object_size);

/* Given response headers, parses the content-length from a content-length respone header.*/
/* Given response headers, parses the content-length from a content-length response header.*/
AWS_S3_API
int aws_s3_parse_content_length_response_header(
struct aws_allocator *allocator,
Expand Down
1 change: 1 addition & 0 deletions include/aws/s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ enum aws_s3_errors {
AWS_ERROR_S3_RESUME_FAILED,
AWS_ERROR_S3_OBJECT_MODIFIED,
AWS_ERROR_S3_NON_RECOVERABLE_ASYNC_ERROR,
AWS_ERROR_S3_PAUSE_FAILED_REQUEST_COMPLETED,
graebm marked this conversation as resolved.
Show resolved Hide resolved
AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID)
};

Expand Down
119 changes: 108 additions & 11 deletions include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ struct aws_s3_client;
struct aws_s3_request;
struct aws_s3_meta_request;
struct aws_s3_meta_request_result;
struct aws_s3_meta_request_resume_token;
struct aws_uri;
struct aws_string;

Expand Down Expand Up @@ -361,6 +362,9 @@ struct aws_s3_meta_request_options {
/**
* Optional.
* Invoked to provide response headers received during execution of the meta request.
* Note: this callback will not be fired for cases when resuming an
* operation that was already completed (ex. pausing put object after it
* uploaded all data and then resuming it)
* See `aws_s3_meta_request_headers_callback_fn`.
*/
aws_s3_meta_request_headers_callback_fn *headers_callback;
Expand Down Expand Up @@ -398,12 +402,12 @@ struct aws_s3_meta_request_options {

/**
* Optional.
* For meta requests that support pause/resume (e.g. PutObject), the resume token returned by
* For meta requests that support pause/resume (e.g. PutObject), serialized resume token returned by
* aws_s3_meta_request_pause() can be provided here.
* Note: If PutObject request specifies a checksum algorithm, client will calculate checksums while skipping parts
* from the buffer and compare them to previously uploaded part checksums.
*/
const struct aws_byte_cursor *resume_token;
struct aws_s3_meta_request_resume_token *resume_token;
graebm marked this conversation as resolved.
Show resolved Hide resolved
};

/* Result details of a meta request.
Expand Down Expand Up @@ -491,19 +495,112 @@ AWS_S3_API
void aws_s3_meta_request_cancel(struct aws_s3_meta_request *meta_request);

/**
* In order to pause an ongoing upload, call aws_s3_meta_request_pause(). It will return a resume token that can be
* persisted and used to resume the upload. To resume an upload that was paused, supply the resume token in the meta
* request options structure member aws_s3_meta_request_options.persistable_state.
* Note: pause is currently only supported on upload requests.
* In order to pause an ongoing upload, call aws_s3_meta_request_pause() that
* will return resume token. Token can be used to query the state of operation
* at the pausing time.
* To resume an upload that was paused, supply resume token in the meta
* request options structure member aws_s3_meta_request_options.resume_token.
* The upload can be resumed either from the same client or a different one.
* Resume token is opaque with format varying based on operation.
* Clients should not parse the token. For format details refer to pause method comments for a given operation.
* Resume token will be set to null in case of failures.
* Corner cases for resume upload are as follows:
* - upload is not MPU - fail with pause not supported error
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved
* - pausing before MPU is created - NULL resume token returned. NULL resume
* token is equivalent to restarting upload
* - pausing in the middle of part transfer - return resume token. scheduling of
* new part uploads stops.
graebm marked this conversation as resolved.
Show resolved Hide resolved
* - pausing after completeMPU started - return resume token. if s3 cannot
* find associated MPU id when resuming with that token and num of parts
* uploaded equals to total num parts, then operation is a no op. Otherwise
* operation fails.
* Note: for no op case the call will succeed and finish/shutdown request callbacks will
* fire, but the headers callback will not fire.
* Note: similar to cancel, pause does not cancel requests already in flight,
* and parts might complete after pause is requested.
* @param meta_request pointer to the aws_s3_meta_request of the upload to be paused
* @param resume_token outputs the json string with the state that can be used to resume the operation.
* @return
* @param resume_token resume token
* @return error code.
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note somewhere, either in the @return or in the "Corner cases" that it will raise the AWS_ERROR_S3_PAUSE_FAILED_REQUEST_COMPLETED error if the operation has already completed

*/
AWS_S3_API
int aws_s3_meta_request_pause(struct aws_s3_meta_request *meta_request, struct aws_string **out_resume_token);
int aws_s3_meta_request_pause(
graebm marked this conversation as resolved.
Show resolved Hide resolved
struct aws_s3_meta_request *meta_request,
struct aws_s3_meta_request_resume_token **out_resume_token);

/*
* Options to construct upload resume token.
* Note: fields correspond to getters on the token below and it is up to the caller
* to persist those in whichever way they choose.
*/
struct aws_s3_upload_resume_token_options {
/* Upload id associated with the operation. */
struct aws_byte_cursor upload_id;
/* Part size associated with the operation. */
size_t part_size;
/* Total number of parts associated with the operation. */
size_t total_num_parts;

/*
 * Number of parts completed.
 * Note: during resume, num_parts_completed is used for sanity checking against
 * uploads on the s3 side.
 * In cases where the upload id does not exist (already resumed using this token,
 * or pause called after upload completes, etc...) and num_parts_completed
 * equals total_num_parts, resume will become a noop.
 */
size_t num_parts_completed;
};

/**
* Create upload resume token from persisted data.
* Note: Data required for resume token varies per operation.
*/
AWS_S3_API
struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_new_upload(
struct aws_allocator *allocator,
const struct aws_s3_upload_resume_token_options *options);
graebm marked this conversation as resolved.
Show resolved Hide resolved

/*
* Increment resume token ref count.
*/
AWS_S3_API
struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_acquire(
struct aws_s3_meta_request_resume_token *resume_token);

/*
* Decrement resume token ref count.
*/
AWS_S3_API
struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_release(
struct aws_s3_meta_request_resume_token *resume_token);

/*
* Type of resume token.
*/
AWS_S3_API
enum aws_s3_meta_request_type aws_s3_meta_request_resume_token_type(
struct aws_s3_meta_request_resume_token *resume_token);

/*
* Part size associated with operation.
*/
AWS_S3_API
size_t aws_s3_meta_request_resume_token_part_size(struct aws_s3_meta_request_resume_token *resume_token);

/*
* Total num parts associated with operation.
*/
AWS_S3_API
size_t aws_s3_meta_request_resume_token_total_num_parts(struct aws_s3_meta_request_resume_token *resume_token);

/*
* Num parts completed.
*/
AWS_S3_API
size_t aws_s3_meta_request_resume_token_num_parts_completed(struct aws_s3_meta_request_resume_token *resume_token);

/*
* Upload id associated with operation.
* Only valid for tokens returned from upload operation. For all other operations
* this will return empty.
*/
AWS_S3_API
struct aws_byte_cursor aws_s3_meta_request_resume_token_upload_id(
struct aws_s3_meta_request_resume_token *resume_token);

AWS_S3_API
void aws_s3_meta_request_acquire(struct aws_s3_meta_request *meta_request);
Expand Down
3 changes: 2 additions & 1 deletion source/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ static struct aws_error_info s_errors[] = {
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_RESUMED_PART_CHECKSUM_MISMATCH, "Checksum does not match previously uploaded part"),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_RESUME_FAILED, "Resuming request failed"),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_OBJECT_MODIFIED, "The object modifed during download."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_NON_RECOVERABLE_ASYNC_ERROR, "Async error received from S3 and not recoverable from retry.")
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_NON_RECOVERABLE_ASYNC_ERROR, "Async error received from S3 and not recoverable from retry."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_PAUSE_FAILED_REQUEST_COMPLETED, "Request cannot be paused since its already completed.")
};
/* clang-format on */

Expand Down
Loading