diff --git a/include/aws/s3/private/s3_auto_ranged_put.h b/include/aws/s3/private/s3_auto_ranged_put.h
index c47772884..cb2a7adb5 100644
--- a/include/aws/s3/private/s3_auto_ranged_put.h
+++ b/include/aws/s3/private/s3_auto_ranged_put.h
@@ -25,6 +25,9 @@ struct aws_s3_auto_ranged_put {
     /* Initialized either during creation in resume flow or as result of create multipart upload during normal flow. */
     struct aws_string *upload_id;

+    /* Resume token used to resume the operation */
+    struct aws_s3_meta_request_resume_token *resume_token;
+
     uint64_t content_length;

     /* Only meant for use in the update function, which is never called concurrently. */
@@ -32,7 +35,7 @@ struct aws_s3_auto_ranged_put {
         /*
          * Next part number to send.
          * Note: this follows s3 part number convention and counting starts with 1.
-         * Throughout codebase 0 based part numbers are usually reffered to as part index.
+         * Throughout codebase 0 based part numbers are usually referred to as part index.
          */
         uint32_t next_part_number;
     } threaded_update_data;
@@ -42,7 +45,7 @@ struct aws_s3_auto_ranged_put {
      */
     struct {
         /* How many parts have been read from input steam.
-         * Since reads are always sequential, this is esentially the number of how many parts were read from start of
+         * Since reads are always sequential, this is essentially the number of how many parts were read from start of
          * stream. */
         uint32_t num_parts_read_from_stream;
diff --git a/include/aws/s3/private/s3_checksums.h b/include/aws/s3/private/s3_checksums.h
index f4bb8d08d..62155614e 100644
--- a/include/aws/s3/private/s3_checksums.h
+++ b/include/aws/s3/private/s3_checksums.h
@@ -113,7 +113,7 @@ struct aws_s3_checksum *aws_checksum_new(struct aws_allocator *allocator, enum a

 /**
  * Compute an aws_checksum corresponding to the provided enum, passing a function pointer around instead of using a
- * conditional would be faster, but would be a negligble improvement compared to the cost of processing data twice
+ * conditional would be faster, but would be a negligible improvement compared to the cost of processing data twice
  * which would be the only time this function would be used, and would be harder to follow.
  */
 AWS_S3_API
diff --git a/include/aws/s3/private/s3_client_impl.h b/include/aws/s3/private/s3_client_impl.h
index b01e6802d..716023009 100644
--- a/include/aws/s3/private/s3_client_impl.h
+++ b/include/aws/s3/private/s3_client_impl.h
@@ -342,6 +342,27 @@ struct aws_s3_client {
     } threaded_data;
 };

+struct aws_s3_meta_request_resume_token {
+    struct aws_allocator *allocator;
+    struct aws_ref_count ref_count;
+
+    enum aws_s3_meta_request_type type;
+
+    /* Note: since pause currently only supports upload, this structure only has
+       upload specific fields. Extending it to support other types is left as
+       an exercise for the future. 
*/
+    struct aws_string *multipart_upload_id;
+    size_t part_size;
+    size_t total_num_parts;
+
+    /* Note: this field is used only when S3 tells us that the upload id no
+       longer exists; if it indicates that all parts have already been uploaded,
+       the request is completed instead of failed. */
+    size_t num_parts_completed;
+};
+
+struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_new(struct aws_allocator *allocator);
+
 void aws_s3_client_notify_connection_finished(
     struct aws_s3_client *client,
     struct aws_s3_connection *connection,
diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h
index 16f7d10f4..1f6164192 100644
--- a/include/aws/s3/private/s3_meta_request_impl.h
+++ b/include/aws/s3/private/s3_meta_request_impl.h
@@ -88,7 +88,7 @@ struct aws_s3_meta_request_vtable {
     void (*destroy)(struct aws_s3_meta_request *);

     /* Pause the given request */
-    int (*pause)(struct aws_s3_meta_request *meta_request, struct aws_string **resume_token);
+    int (*pause)(struct aws_s3_meta_request *meta_request, struct aws_s3_meta_request_resume_token **resume_token);
 };

 /**
@@ -194,7 +194,7 @@ struct aws_s3_meta_request {
     /* deep copy of the checksum config. */
     struct checksum_config checksum_config;

-    /* checksum found in either a default get request, or in the initial head request of a mutlipart get */
+    /* checksum found in either a default get request, or in the initial head request of a multipart get */
     struct aws_byte_buf meta_request_level_response_header_checksum;

     /* running checksum of all of the parts of a default get, or ranged get meta request*/
@@ -323,7 +323,7 @@ bool aws_s3_meta_request_has_finish_result_synced(struct aws_s3_meta_request *me
 AWS_S3_API
 void aws_s3_meta_request_finish(struct aws_s3_meta_request *meta_request);

-/* Default implementation of the meta request finish functino. */
+/* Default implementation of the meta request finish function. */
 AWS_S3_API
 void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request);

diff --git a/include/aws/s3/private/s3_request.h b/include/aws/s3/private/s3_request.h
index 178e8c0b8..aed5b1b39 100644
--- a/include/aws/s3/private/s3_request.h
+++ b/include/aws/s3/private/s3_request.h
@@ -63,7 +63,7 @@ struct aws_s3_request {

     /* checksum found in the header of an individual get part http request */
     struct aws_byte_buf request_level_response_header_checksum;
-    /* running checksum of the respone to an individual get part http request */
+    /* running checksum of the response to an individual get part http request */
     struct aws_s3_checksum *request_level_running_response_sum;
     /* The algorithm used to validate the checksum */
     enum aws_s3_checksum_algorithm validation_algorithm;
diff --git a/include/aws/s3/private/s3_request_messages.h b/include/aws/s3/private/s3_request_messages.h
index 114dc9d1f..fff8e363e 100644
--- a/include/aws/s3/private/s3_request_messages.h
+++ b/include/aws/s3/private/s3_request_messages.h
@@ -27,7 +27,7 @@ struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_all_heade
     struct aws_allocator *allocator,
     struct aws_http_message *message);

-/* Copy mesage (but not the body) and exclude specific headers.
+/* Copy message (but not the body) and exclude specific headers. 
 * exclude_x_amz_meta controls whether S3 user metadata headers (prefixed with "x-amz-meta) are excluded.*/
 AWS_S3_API
 struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_filter_headers(
diff --git a/include/aws/s3/private/s3_util.h b/include/aws/s3/private/s3_util.h
index 9804830e8..5fe22ff74 100644
--- a/include/aws/s3/private/s3_util.h
+++ b/include/aws/s3/private/s3_util.h
@@ -221,7 +221,7 @@ int aws_s3_parse_content_range_response_header(
     uint64_t *out_range_end,
     uint64_t *out_object_size);

-/* Given response headers, parses the content-length from a content-length respone header.*/
+/* Given response headers, parses the content-length from a content-length response header.*/
 AWS_S3_API
 int aws_s3_parse_content_length_response_header(
     struct aws_allocator *allocator,
diff --git a/include/aws/s3/s3_client.h b/include/aws/s3/s3_client.h
index f061f17da..b17e2a4da 100644
--- a/include/aws/s3/s3_client.h
+++ b/include/aws/s3/s3_client.h
@@ -22,6 +22,7 @@ struct aws_s3_client;
 struct aws_s3_request;
 struct aws_s3_meta_request;
 struct aws_s3_meta_request_result;
+struct aws_s3_meta_request_resume_token;
 struct aws_uri;
 struct aws_string;

@@ -361,6 +362,9 @@ struct aws_s3_meta_request_options {
     /**
      * Optional.
      * Invoked to provide response headers received during execution of the meta request.
+     * Note: this callback will not be fired when resuming an operation that had
+     * already completed (e.g. pausing a PutObject after it uploaded all data and
+     * then resuming it).
      * See `aws_s3_meta_request_headers_callback_fn`.
      */
     aws_s3_meta_request_headers_callback_fn *headers_callback;
@@ -398,12 +402,12 @@ struct aws_s3_meta_request_options {

     /**
      * Optional.
-     * For meta requests that support pause/resume (e.g. PutObject), the resume token returned by
+     * For meta requests that support pause/resume (e.g. PutObject), a resume token returned by
      * aws_s3_meta_request_pause() can be provided here.
      * Note: If PutObject request specifies a checksum algorithm, client will calculate checksums while skipping parts
      * from the buffer and compare them them to previously uploaded part checksums.
      */
-    const struct aws_byte_cursor *resume_token;
+    struct aws_s3_meta_request_resume_token *resume_token;
 };

 /* Result details of a meta request.
@@ -503,19 +507,112 @@ AWS_S3_API
 void aws_s3_meta_request_cancel(struct aws_s3_meta_request *meta_request);

 /**
- * In order to pause an ongoing upload, call aws_s3_meta_request_pause(). It will return a resume token that can be
- * persisted and used to resume the upload. To resume an upload that was paused, supply the resume token in the meta
- * request options structure member aws_s3_meta_request_options.persistable_state.
+ * Note: pause is currently only supported on upload requests.
+ * In order to pause an ongoing upload, call aws_s3_meta_request_pause(), which
+ * returns a resume token. The token can be used to query the state of the
+ * operation at the time it was paused.
+ * To resume an upload that was paused, supply the resume token in the meta
+ * request options structure member aws_s3_meta_request_options.resume_token.
  * The upload can be resumed either from the same client or a different one.
- * Resume token is opaque with format varying based on operation.
- * Clients should not parse the token. For format details refer to pause method comments for a given operation.
- * Resume token will be set to null in case of failures. 
+ * Corner cases for resume upload are as follows:
+ * - upload is not MPU - fail with AWS_ERROR_UNSUPPORTED_OPERATION
+ * - pausing before MPU is created - NULL resume token returned. NULL resume
+ *   token is equivalent to restarting the upload
+ * - pausing in the middle of part transfer - return resume token. Scheduling of
+ *   new part uploads stops.
+ * - pausing after complete MPU started - return resume token. If s3 cannot find
+ *   the associated MPU id when resuming with that token and the number of parts
+ *   uploaded equals the total number of parts, then the operation is a no-op.
+ *   Otherwise the operation fails.
+ * Note: in the no-op case the call will succeed and the finish/shutdown request
+ * callbacks will fire, but the headers callback will not fire.
+ * Note: similar to cancel, pause does not cancel requests already in flight,
+ * and parts might complete after pause is requested.
  * @param meta_request pointer to the aws_s3_meta_request of the upload to be paused
- * @param resume_token outputs the json string with the state that can be used to resume the operation.
- * @return
+ * @param out_resume_token output parameter for the resume token (may be NULL if pause happened before the MPU was created)
+ * @return either AWS_OP_ERR or AWS_OP_SUCCESS
  */
 AWS_S3_API
-int aws_s3_meta_request_pause(struct aws_s3_meta_request *meta_request, struct aws_string **out_resume_token);
+int aws_s3_meta_request_pause(
+    struct aws_s3_meta_request *meta_request,
+    struct aws_s3_meta_request_resume_token **out_resume_token);
+
+/*
+ * Options to construct upload resume token.
+ * Note: fields correspond to getters on the token below and it is up to the
+ * caller to persist them in whichever way they choose.
+ */
+struct aws_s3_upload_resume_token_options {
+    struct aws_byte_cursor upload_id;
+    size_t part_size;
+    size_t total_num_parts;
+
+    /*
+     * Note: during resume, num_parts_completed is used for sanity checking
+     * against uploads on the s3 side.
+     * In cases where the upload id no longer exists (already resumed using this
+     * token, pause called after upload completes, etc...) and num_parts_completed
+     * equals total_num_parts, resume becomes a no-op.
+     */
+    size_t num_parts_completed;
+};
+
+/**
+ * Create upload resume token from persisted data.
+ * Note: Data required for resume token varies per operation.
+ */
+AWS_S3_API
+struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_new_upload(
+    struct aws_allocator *allocator,
+    const struct aws_s3_upload_resume_token_options *options);
+
+/*
+ * Increment resume token ref count.
+ */
+AWS_S3_API
+struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_acquire(
+    struct aws_s3_meta_request_resume_token *resume_token);
+
+/*
+ * Decrement resume token ref count.
+ */
+AWS_S3_API
+struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_release(
+    struct aws_s3_meta_request_resume_token *resume_token);
+
+/*
+ * Type of resume token.
+ */
+AWS_S3_API
+enum aws_s3_meta_request_type aws_s3_meta_request_resume_token_type(
+    struct aws_s3_meta_request_resume_token *resume_token);
+
+/*
+ * Part size associated with operation.
+ */
+AWS_S3_API
+size_t aws_s3_meta_request_resume_token_part_size(struct aws_s3_meta_request_resume_token *resume_token);
+
+/*
+ * Total num parts associated with operation.
+ */
+AWS_S3_API
+size_t aws_s3_meta_request_resume_token_total_num_parts(struct aws_s3_meta_request_resume_token *resume_token);
+
+/*
+ * Num parts completed. 
+ */
+AWS_S3_API
+size_t aws_s3_meta_request_resume_token_num_parts_completed(struct aws_s3_meta_request_resume_token *resume_token);
+
+/*
+ * Upload id associated with operation.
+ * Only valid for tokens returned from an upload operation. For all other
+ * operations this returns an empty cursor.
+ */
+AWS_S3_API
+struct aws_byte_cursor aws_s3_meta_request_resume_token_upload_id(
+    struct aws_s3_meta_request_resume_token *resume_token);

 /**
  * Add a reference, keeping this object alive.
diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c
index 1d3e39a21..d22884454 100644
--- a/source/s3_auto_ranged_put.c
+++ b/source/s3_auto_ranged_put.c
@@ -8,7 +8,6 @@
 #include "aws/s3/private/s3_list_parts.h"
 #include "aws/s3/private/s3_request_messages.h"
 #include "aws/s3/private/s3_util.h"
-#include
 #include
 #include
@@ -43,7 +42,9 @@ static void s_s3_auto_ranged_put_send_request_finish(
     struct aws_http_stream *stream,
     int error_code);

-static int s_s3_auto_ranged_put_pause(struct aws_s3_meta_request *meta_request, struct aws_string **resume_token);
+static int s_s3_auto_ranged_put_pause(
+    struct aws_s3_meta_request *meta_request,
+    struct aws_s3_meta_request_resume_token **resume_token);

 static bool s_process_part_info(const struct aws_s3_part_info *info, void *user_data) {
     struct aws_s3_auto_ranged_put *auto_ranged_put = user_data;
@@ -81,107 +82,87 @@ static bool s_process_part_info(const struct aws_s3_part_info *info, void *user_
     return true;
 }

-static int s_parse_resume_info_from_token(
-    struct aws_allocator *allocator,
-    const struct aws_byte_cursor *resume_token,
-    struct aws_string **upload_id_out,
-    size_t *part_size_out,
-    uint32_t *total_num_parts_out) {
+/*
+ * Validates token and updates part variables. No-op if token is null.
+ */
+static int s_try_update_part_info_from_resume_token(
+    uint64_t content_length,
+    const struct aws_s3_meta_request_resume_token *resume_token,
+    size_t *out_part_size,
+    uint32_t *out_total_num_parts) {

     if (!resume_token) {
         return AWS_OP_SUCCESS;
     }

-    struct aws_json_value *root = aws_json_value_new_from_string(allocator, *resume_token);
-
-    struct aws_json_value *type_node = aws_json_value_get_from_object(root, aws_byte_cursor_from_c_str("type"));
-
-    struct aws_json_value *multipart_upload_id_node =
-        aws_json_value_get_from_object(root, aws_byte_cursor_from_c_str("multipart_upload_id"));
-
-    struct aws_json_value *partition_size_node =
-        aws_json_value_get_from_object(root, aws_byte_cursor_from_c_str("partition_size"));
-
-    struct aws_json_value *total_num_parts_node =
-        aws_json_value_get_from_object(root, aws_byte_cursor_from_c_str("total_num_parts"));
-
-    double partition_size_value = 0.0;
-    double total_num_parts_value = 0.0;
-    struct aws_byte_cursor multipart_upload_id_value;
-    struct aws_byte_cursor type_value;
-
-    if (!multipart_upload_id_node || !partition_size_node || !total_num_parts_node || !type_node ||
-        aws_json_value_get_number(partition_size_node, &partition_size_value) ||
-        aws_json_value_get_number(total_num_parts_node, &total_num_parts_value) ||
-        aws_json_value_get_string(multipart_upload_id_node, &multipart_upload_id_value) ||
-        aws_json_value_get_string(type_node, &type_value)) {
-        AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Could not load persisted state. Invalid token.");
+    if (resume_token->type != AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
+        AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Could not load persisted state. 
Invalid token type."); goto invalid_argument_cleanup; } - if (!aws_byte_cursor_eq_c_str(&type_value, "AWS_S3_META_REQUEST_TYPE_PUT_OBJECT")) { - AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Could not load persisted state. Invalid token type."); + if (resume_token->multipart_upload_id == NULL) { + AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Could not load persisted state. Multipart upload id missing."); goto invalid_argument_cleanup; } - if ((size_t)partition_size_value < g_s3_min_upload_part_size) { + if (resume_token->part_size < g_s3_min_upload_part_size) { AWS_LOGF_ERROR( AWS_LS_S3_META_REQUEST, "Could not create resume auto-ranged-put meta request; part size of %" PRIu64 " specified in the token is below minimum threshold for multi-part.", - (uint64_t)partition_size_value); + (uint64_t)resume_token->part_size); goto invalid_argument_cleanup; } - if ((uint32_t)total_num_parts_value > g_s3_max_num_upload_parts) { + if ((uint32_t)resume_token->total_num_parts > g_s3_max_num_upload_parts) { AWS_LOGF_ERROR( AWS_LS_S3_META_REQUEST, "Could not create resume auto-ranged-put meta request; total number of parts %" PRIu32 " specified in the token is too large for platform.", - (uint32_t)total_num_parts_value); + (uint32_t)resume_token->total_num_parts); goto invalid_argument_cleanup; } - *upload_id_out = aws_string_new_from_cursor(allocator, &multipart_upload_id_value); - *part_size_out = (size_t)partition_size_value; - *total_num_parts_out = (uint32_t)total_num_parts_value; + uint32_t num_parts = (uint32_t)(content_length / resume_token->part_size); + + if ((content_length % resume_token->part_size) > 0) { + ++num_parts; + } + + if (resume_token->total_num_parts != num_parts) { + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "Could not create auto-ranged-put meta request; persisted number of parts %zu" + " does not match expected number of parts based on length of the body.", + resume_token->total_num_parts); + + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } - aws_json_value_destroy(root); + *out_part_size = resume_token->part_size; + *out_total_num_parts = (uint32_t)resume_token->total_num_parts; return AWS_OP_SUCCESS; invalid_argument_cleanup: - aws_json_value_destroy(root); return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); } /** - * Loads the persistable state used to resume an upload that was previously paused. + * Initializes state necessary to resume upload. Noop if token is null. 
*/ -static int s_load_persistable_state( +static int s_try_init_resume_state_from_persisted_data( struct aws_allocator *allocator, struct aws_s3_auto_ranged_put *auto_ranged_put, - uint64_t content_length, - struct aws_string *upload_id, - size_t part_size, - uint32_t total_num_parts) { - - uint32_t num_parts = (uint32_t)(content_length / part_size); - - if ((content_length % part_size) > 0) { - ++num_parts; - } - - if (total_num_parts != num_parts) { - AWS_LOGF_ERROR( - AWS_LS_S3_META_REQUEST, - "Could not create auto-ranged-put meta request; persisted number of parts %" PRIu32 - " does not match expected number of parts based on length of the body.", - total_num_parts); + const struct aws_s3_meta_request_resume_token *resume_token) { - return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + if (resume_token == NULL) { + auto_ranged_put->synced_data.list_parts_operation = NULL; + auto_ranged_put->synced_data.list_parts_sent = true; + auto_ranged_put->synced_data.list_parts_completed = true; + return AWS_OP_SUCCESS; } struct aws_byte_cursor request_path; @@ -194,7 +175,7 @@ static int s_load_persistable_state( auto_ranged_put->synced_data.num_parts_completed = 0; auto_ranged_put->synced_data.create_multipart_upload_sent = true; auto_ranged_put->synced_data.create_multipart_upload_completed = true; - auto_ranged_put->upload_id = upload_id; + auto_ranged_put->upload_id = aws_string_clone_or_reuse(allocator, resume_token->multipart_upload_id); struct aws_s3_list_parts_params list_parts_params = { .key = request_path, @@ -258,9 +239,7 @@ struct aws_s3_meta_request *aws_s3_meta_request_auto_ranged_put_new( struct aws_s3_auto_ranged_put *auto_ranged_put = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_auto_ranged_put)); - struct aws_string *multipart_upload_id = NULL; - if (s_parse_resume_info_from_token( - allocator, options->resume_token, &multipart_upload_id, &part_size, &num_parts)) { + if (s_try_update_part_info_from_resume_token(content_length, options->resume_token, &part_size, &num_parts)) { goto error_clean_up; } @@ -280,19 +259,15 @@ struct aws_s3_meta_request *aws_s3_meta_request_auto_ranged_put_new( auto_ranged_put->content_length = content_length; auto_ranged_put->synced_data.total_num_parts = num_parts; auto_ranged_put->upload_id = NULL; + auto_ranged_put->resume_token = options->resume_token; + + aws_s3_meta_request_resume_token_acquire(auto_ranged_put->resume_token); auto_ranged_put->threaded_update_data.next_part_number = 1; auto_ranged_put->prepare_data.num_parts_read_from_stream = 0; - if (options->resume_token) { - if (s_load_persistable_state( - allocator, auto_ranged_put, content_length, multipart_upload_id, part_size, num_parts)) { - goto error_clean_up; - } - } else { - auto_ranged_put->synced_data.list_parts_operation = NULL; - auto_ranged_put->synced_data.list_parts_sent = true; - auto_ranged_put->synced_data.list_parts_completed = true; + if (s_try_init_resume_state_from_persisted_data(allocator, auto_ranged_put, options->resume_token)) { + goto error_clean_up; } struct aws_string **etag_c_array = aws_mem_calloc(allocator, sizeof(struct aws_string *), num_parts); @@ -306,7 +281,6 @@ struct aws_s3_meta_request *aws_s3_meta_request_auto_ranged_put_new( return &auto_ranged_put->base; error_clean_up: - aws_string_destroy(multipart_upload_id); aws_mem_release(allocator, auto_ranged_put); return NULL; @@ -322,6 +296,8 @@ static void s_s3_meta_request_auto_ranged_put_destroy(struct aws_s3_meta_request aws_string_destroy(auto_ranged_put->upload_id); auto_ranged_put->upload_id 
= NULL;

+    auto_ranged_put->resume_token = aws_s3_meta_request_resume_token_release(auto_ranged_put->resume_token);
+
     aws_s3_paginated_operation_release(auto_ranged_put->synced_data.list_parts_operation);

     for (size_t etag_index = 0; etag_index < aws_array_list_length(&auto_ranged_put->synced_data.etag_list);
@@ -783,7 +759,7 @@ static int s_s3_auto_ranged_put_prepare_request(

     if (request->num_times_prepared == 0) {

-        /* Corner case of last part being previosly uploaded during resume.
+        /* Corner case of last part being previously uploaded during resume.
          * Read it from input stream and potentially verify checksum */
         if (s_skip_parts_from_stream(
                 meta_request,
@@ -954,10 +930,22 @@ static void s_s3_auto_ranged_put_request_finished(
             }

             auto_ranged_put->synced_data.list_parts_completed = !has_more_results;
-            auto_ranged_put->synced_data.create_multipart_upload_error_code = error_code;
+            auto_ranged_put->synced_data.list_parts_error_code = error_code;

             if (error_code != AWS_ERROR_SUCCESS) {
-                aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
+                if (request->send_data.response_status == AWS_HTTP_STATUS_CODE_404_NOT_FOUND &&
+                    auto_ranged_put->resume_token->num_parts_completed ==
+                        auto_ranged_put->resume_token->total_num_parts) {
+                    AWS_LOGF_DEBUG(
+                        AWS_LS_S3_META_REQUEST,
+                        "id=%p: Resuming PutObject ended early, since there is nothing to resume "
+                        "(request finished prior to being paused?)",
+                        (void *)meta_request);
+
+                    aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS);
+                } else {
+                    aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
+                }
             }

             aws_s3_meta_request_unlock_synced_data(meta_request);
@@ -1183,62 +1171,41 @@ static void s_s3_auto_ranged_put_request_finished(
     }
 }

-static int s_s3_auto_ranged_put_pause(struct aws_s3_meta_request *meta_request, struct aws_string **out_resume_token) {
+static int s_s3_auto_ranged_put_pause(
+    struct aws_s3_meta_request *meta_request,
+    struct aws_s3_meta_request_resume_token **out_resume_token) {
+
+    *out_resume_token = NULL;

     /* lock */
     aws_s3_meta_request_lock_synced_data(meta_request);

     struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;

-    /* only generate token if multipart upload was completed, since there is nothing to resume otherwise*/
-    int token_generated_error = AWS_OP_SUCCESS;
-    *out_resume_token = NULL;
+    AWS_LOGF_DEBUG(
+        AWS_LS_S3_META_REQUEST,
+        "id=%p: Pausing request with %u out of %u parts completed.",
+        (void *)meta_request,
+        auto_ranged_put->synced_data.num_parts_completed,
+        auto_ranged_put->synced_data.total_num_parts);
+
+    /* upload can be in one of several states:
+     * - not started, i.e. we didn't even call create MPU yet - return success,
+     *   token is NULL and cancel the upload
+     * - in the middle of upload - return success, create token and cancel
+     *   upload
+     * - complete MPU started - return success, generate token and try to cancel
+     *   complete MPU
+     */
     if (auto_ranged_put->synced_data.create_multipart_upload_completed) {
-        struct aws_json_value *root = aws_json_value_new_object(meta_request->allocator);
-
-        /**
-         * generate pause token for put. 
- * current format is json with the following structure - * { - * type: String - * multipart_upload_id: String - * partition_size: number - * total_num_parts: number - * } - */ - - aws_json_value_add_to_object( - root, - aws_byte_cursor_from_c_str("type"), - aws_json_value_new_string( - meta_request->allocator, aws_byte_cursor_from_c_str("AWS_S3_META_REQUEST_TYPE_PUT_OBJECT"))); - - aws_json_value_add_to_object( - root, - aws_byte_cursor_from_c_str("multipart_upload_id"), - aws_json_value_new_string( - meta_request->allocator, aws_byte_cursor_from_string(auto_ranged_put->upload_id))); - - aws_json_value_add_to_object( - root, - aws_byte_cursor_from_c_str("partition_size"), - aws_json_value_new_number(meta_request->allocator, (double)meta_request->part_size)); - - aws_json_value_add_to_object( - root, - aws_byte_cursor_from_c_str("total_num_parts"), - aws_json_value_new_number(meta_request->allocator, (double)auto_ranged_put->synced_data.total_num_parts)); - - struct aws_byte_buf result_string_buf; - aws_byte_buf_init(&result_string_buf, meta_request->allocator, 0); - - token_generated_error = aws_byte_buf_append_json_string(root, &result_string_buf); - - if (!token_generated_error) { - *out_resume_token = aws_string_new_from_buf(meta_request->allocator, &result_string_buf); - } - aws_byte_buf_clean_up(&result_string_buf); - aws_json_value_destroy(root); + *out_resume_token = aws_s3_meta_request_resume_token_new(meta_request->allocator); + + (*out_resume_token)->type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT; + (*out_resume_token)->multipart_upload_id = + aws_string_clone_or_reuse(meta_request->allocator, auto_ranged_put->upload_id); + (*out_resume_token)->part_size = meta_request->part_size; + (*out_resume_token)->total_num_parts = auto_ranged_put->synced_data.total_num_parts; + (*out_resume_token)->num_parts_completed = auto_ranged_put->synced_data.num_parts_completed; } /** @@ -1250,5 +1217,5 @@ static int s_s3_auto_ranged_put_pause(struct aws_s3_meta_request *meta_request, /* unlock */ aws_s3_meta_request_unlock_synced_data(meta_request); - return token_generated_error; + return AWS_OP_SUCCESS; } diff --git a/source/s3_client.c b/source/s3_client.c index 43ad6f974..bd25addc7 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -506,7 +507,7 @@ static void s_s3_client_start_destroy(void *user_data) { client->synced_data.active = false; - /* Prevent the client from cleaning up inbetween the mutex unlock/re-lock below.*/ + /* Prevent the client from cleaning up in between the mutex unlock/re-lock below.*/ client->synced_data.start_destroy_executing = true; aws_s3_client_unlock_synced_data(client); @@ -1873,3 +1874,83 @@ static void s_s3_client_prepare_request_callback_retry_request( aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED); } } + +static void s_resume_token_ref_count_zero_callback(void *arg) { + struct aws_s3_meta_request_resume_token *token = arg; + + aws_string_destroy(token->multipart_upload_id); + + aws_mem_release(token->allocator, token); +} + +struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_new(struct aws_allocator *allocator) { + struct aws_s3_meta_request_resume_token *token = + aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_meta_request_resume_token)); + + token->allocator = allocator; + aws_ref_count_init(&token->ref_count, token, s_resume_token_ref_count_zero_callback); + + return token; +} + 
+struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_new_upload(
+    struct aws_allocator *allocator,
+    const struct aws_s3_upload_resume_token_options *options) {
+    AWS_PRECONDITION(allocator);
+    AWS_PRECONDITION(options);
+
+    struct aws_s3_meta_request_resume_token *token = aws_s3_meta_request_resume_token_new(allocator);
+    token->multipart_upload_id = aws_string_new_from_cursor(allocator, &options->upload_id);
+    token->part_size = options->part_size;
+    token->total_num_parts = options->total_num_parts;
+    token->num_parts_completed = options->num_parts_completed;
+    token->type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT;
+    return token;
+}
+
+struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_acquire(
+    struct aws_s3_meta_request_resume_token *resume_token) {
+    if (resume_token) {
+        aws_ref_count_acquire(&resume_token->ref_count);
+    }
+    return resume_token;
+}
+
+struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_release(
+    struct aws_s3_meta_request_resume_token *resume_token) {
+    if (resume_token) {
+        aws_ref_count_release(&resume_token->ref_count);
+    }
+    return NULL;
+}
+
+enum aws_s3_meta_request_type aws_s3_meta_request_resume_token_type(
+    struct aws_s3_meta_request_resume_token *resume_token) {
+    AWS_FATAL_PRECONDITION(resume_token);
+    return resume_token->type;
+}
+
+size_t aws_s3_meta_request_resume_token_part_size(struct aws_s3_meta_request_resume_token *resume_token) {
+    AWS_FATAL_PRECONDITION(resume_token);
+    return resume_token->part_size;
+}
+
+size_t aws_s3_meta_request_resume_token_total_num_parts(struct aws_s3_meta_request_resume_token *resume_token) {
+    AWS_FATAL_PRECONDITION(resume_token);
+    return resume_token->total_num_parts;
+}
+
+size_t aws_s3_meta_request_resume_token_num_parts_completed(struct aws_s3_meta_request_resume_token *resume_token) {
+    AWS_FATAL_PRECONDITION(resume_token);
+    return resume_token->num_parts_completed;
+}
+
+struct aws_byte_cursor aws_s3_meta_request_resume_token_upload_id(
+    struct aws_s3_meta_request_resume_token *resume_token) {
+    AWS_FATAL_PRECONDITION(resume_token);
+    if (resume_token->type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT && resume_token->multipart_upload_id != NULL) {
+        return aws_byte_cursor_from_string(resume_token->multipart_upload_id);
+    }
+
+    return aws_byte_cursor_from_c_str("");
+}
diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c
index 42c1f9111..b5e41b2fa 100644
--- a/source/s3_meta_request.c
+++ b/source/s3_meta_request.c
@@ -291,7 +291,9 @@ void aws_s3_meta_request_cancel(struct aws_s3_meta_request *meta_request) {
     /* END CRITICAL SECTION */
 }

-int aws_s3_meta_request_pause(struct aws_s3_meta_request *meta_request, struct aws_string **out_resume_token) {
+int aws_s3_meta_request_pause(
+    struct aws_s3_meta_request *meta_request,
+    struct aws_s3_meta_request_resume_token **out_resume_token) {
     AWS_PRECONDITION(meta_request);
     AWS_PRECONDITION(meta_request->vtable);

diff --git a/source/s3_paginator.c b/source/s3_paginator.c
index 2c8506f07..18a0078b0 100644
--- a/source/s3_paginator.c
+++ b/source/s3_paginator.c
@@ -409,8 +409,9 @@ int aws_s3_paginator_continue(struct aws_s3_paginator *paginator, const struct a
     struct aws_string *continuation_string = s_paginator_get_continuation_token(paginator);
+    struct aws_byte_cursor continuation_cursor;
     struct aws_byte_cursor *continuation = NULL;
     if (continuation_string) {
-        struct aws_byte_cursor continuation_val = aws_byte_cursor_from_string(continuation_string);
-        continuation = &continuation_val;
+        continuation_cursor = aws_byte_cursor_from_string(continuation_string);
+        continuation = &continuation_cursor;
     }
     paginator->operation->next_http_message(continuation, paginator->operation->user_data, &paginated_request_message);
     aws_string_destroy(continuation_string);
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index a0f8fb966..bfbdae84c 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -3,12 +3,12 @@ enable_testing()

 option(BYO_CRYPTO "Don't build a tls implementation or link against a crypto interface. This feature is only for unix builds currently." OFF)

-if (BYO_CRYPTO)
+if(BYO_CRYPTO)
     set(ENABLE_NET_TESTS OFF)

     add_test_case(test_s3_client_byo_crypto_no_options)
     add_test_case(test_s3_client_byo_crypto_with_options)
-endif ()
+endif()

 file(GLOB TEST_SRC "*.c")
 file(GLOB TEST_HDRS "*.h")
diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c
index 82ec8485b..adabeb5d2 100644
--- a/tests/s3_data_plane_tests.c
+++ b/tests/s3_data_plane_tests.c
@@ -86,8 +86,10 @@ static int s_test_s3_client_proxy_ev_settings_override(struct aws_allocator *all
     struct aws_tls_connection_options tls_conn_options;
     AWS_ZERO_STRUCT(tls_conn_options);

-    struct proxy_env_var_settings proxy_ev_settings = {.env_var_type = AWS_HPEV_ENABLE,
-                                                       .tls_options = &tls_conn_options};
+    struct proxy_env_var_settings proxy_ev_settings = {
+        .env_var_type = AWS_HPEV_ENABLE,
+        .tls_options = &tls_conn_options,
+    };

     struct aws_s3_client_config client_config = {.proxy_ev_settings = &proxy_ev_settings};

@@ -4881,10 +4883,10 @@ static void s_pause_meta_request_progress(
             return;
         }

-        struct aws_string *persistable_state = NULL;
-        int pause_result = aws_s3_meta_request_pause(meta_request, &persistable_state);
+        struct aws_s3_meta_request_resume_token *resume_token = NULL;
+        int pause_result = aws_s3_meta_request_pause(meta_request, &resume_token);
         aws_atomic_store_int(&test_data->pause_result, pause_result);
-        aws_atomic_store_ptr(&test_data->persistable_state_ptr, persistable_state);
+        aws_atomic_store_ptr(&test_data->persistable_state_ptr, resume_token);
     }
 }

@@ -4933,7 +4935,7 @@ static int s_test_s3_put_pause_resume_helper(
     struct put_object_pause_resume_test_data *test_data,
     struct aws_byte_cursor destination_key,
     struct aws_input_stream *upload_body_stream,
-    struct aws_string *resume_state,
+    struct aws_s3_meta_request_resume_token *resume_state,
     enum aws_s3_checksum_algorithm checksum_algorithm,
     int expected_error_code,
     int expected_response_status) {
@@ -4986,10 +4988,8 @@ static int s_test_s3_put_pause_resume_helper(
         .checksum_config = &checksum_config,
     };

-    struct aws_byte_cursor resume_state_cur;
     if (resume_state) {
-        resume_state_cur = aws_byte_cursor_from_string(resume_state);
-        meta_request_options.resume_token = &resume_state_cur;
+        meta_request_options.resume_token = resume_state;
     }

     struct aws_s3_meta_request *meta_request = aws_s3_client_make_meta_request(client, &meta_request_options);
@@ -5082,7 +5082,7 @@ static int s_test_s3_put_pause_resume(struct aws_allocator *allocator, void *ctx
     /* new stream used to resume upload. 
it begins at the offset specified in the persistable state */ struct aws_input_stream *resume_upload_stream = aws_s3_test_input_stream_new(allocator, s_pause_resume_object_length_128MB); - struct aws_string *persistable_state = aws_atomic_load_ptr(&test_data.persistable_state_ptr); + struct aws_s3_meta_request_resume_token *persistable_state = aws_atomic_load_ptr(&test_data.persistable_state_ptr); size_t bytes_uploaded = aws_atomic_load_int(&test_data.total_bytes_uploaded); @@ -5108,7 +5108,7 @@ static int s_test_s3_put_pause_resume(struct aws_allocator *allocator, void *ctx /* bytes uploaded is smaller since we are skipping uploaded parts */ ASSERT_TRUE(bytes_uploaded < s_pause_resume_object_length_128MB); - aws_string_destroy(persistable_state); + aws_s3_meta_request_resume_token_release(persistable_state); aws_input_stream_destroy(resume_upload_stream); aws_s3_tester_clean_up(&tester); @@ -5163,7 +5163,7 @@ static int s_test_s3_put_pause_resume_all_parts_done(struct aws_allocator *alloc /* new stream used to resume upload. it begins at the offset specified in the persistable state */ struct aws_input_stream *resume_upload_stream = aws_s3_test_input_stream_new(allocator, s_pause_resume_object_length_128MB); - struct aws_string *persistable_state = aws_atomic_load_ptr(&test_data.persistable_state_ptr); + struct aws_s3_meta_request_resume_token *persistable_state = aws_atomic_load_ptr(&test_data.persistable_state_ptr); AWS_LOGF_INFO(AWS_LS_S3_GENERAL, "Persistable state %p", persistable_state); @@ -5191,7 +5191,7 @@ static int s_test_s3_put_pause_resume_all_parts_done(struct aws_allocator *alloc /* bytes uploaded is smaller since we are skipping uploaded parts */ ASSERT_INT_EQUALS(bytes_uploaded, 0); - aws_string_destroy(persistable_state); + aws_s3_meta_request_resume_token_release(persistable_state); aws_input_stream_destroy(resume_upload_stream); aws_s3_tester_clean_up(&tester); @@ -5247,7 +5247,7 @@ static int s_test_s3_put_pause_resume_invalid_checksum(struct aws_allocator *all struct aws_input_stream *resume_upload_stream = aws_s3_test_input_stream_new_with_value_type( allocator, s_pause_resume_object_length_128MB, TEST_STREAM_VALUE_2); - struct aws_string *persistable_state = aws_atomic_load_ptr(&test_data.persistable_state_ptr); + struct aws_s3_meta_request_resume_token *persistable_state = aws_atomic_load_ptr(&test_data.persistable_state_ptr); size_t bytes_uploaded = aws_atomic_load_int(&test_data.total_bytes_uploaded); @@ -5273,7 +5273,7 @@ static int s_test_s3_put_pause_resume_invalid_checksum(struct aws_allocator *all /* bytes uploaded is smaller since we are skipping uploaded parts */ ASSERT_TRUE(bytes_uploaded < s_pause_resume_object_length_128MB); - aws_string_destroy(persistable_state); + aws_s3_meta_request_resume_token_release(persistable_state); aws_input_stream_destroy(resume_upload_stream); aws_s3_tester_clean_up(&tester);
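
Editor's note (not part of the patch): a minimal usage sketch of the new pause/resume API declared in s3_client.h above. The `meta_request`, `client`, and `options` variables are hypothetical placeholders, and error handling is elided.

/* Pause an in-flight PutObject meta request, then resume it by passing the
 * token through aws_s3_meta_request_options.resume_token. */
struct aws_s3_meta_request_resume_token *token = NULL;
if (aws_s3_meta_request_pause(meta_request, &token) == AWS_OP_SUCCESS) {
    /* token may be NULL if pause happened before CreateMultipartUpload
     * completed; a NULL token simply means "restart the upload". */
    if (token != NULL) {
        options.resume_token = token;
    }
    struct aws_s3_meta_request *resumed = aws_s3_client_make_meta_request(client, &options);
    /* The resumed meta request takes its own reference on the token
     * (see aws_s3_meta_request_auto_ranged_put_new above), so release ours. */
    aws_s3_meta_request_resume_token_release(token);
    (void)resumed;
}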
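Editor's note (not part of the patch): because the token is now an opaque ref-counted struct rather than a JSON string, persisting it across processes means reading its fields through the getters and rebuilding it with aws_s3_meta_request_resume_token_new_upload(). A sketch, with the storage format and save/load steps left to the caller:

/* Extract the fields a caller would persist. */
struct aws_byte_cursor upload_id = aws_s3_meta_request_resume_token_upload_id(token);
size_t part_size = aws_s3_meta_request_resume_token_part_size(token);
size_t total_num_parts = aws_s3_meta_request_resume_token_total_num_parts(token);
size_t num_parts_completed = aws_s3_meta_request_resume_token_num_parts_completed(token);

/* ... persist, restart process, load ... */

/* Rebuild an equivalent token in the new process. */
struct aws_s3_upload_resume_token_options token_options = {
    .upload_id = upload_id,
    .part_size = part_size,
    .total_num_parts = total_num_parts,
    .num_parts_completed = num_parts_completed,
};
struct aws_s3_meta_request_resume_token *restored =
    aws_s3_meta_request_resume_token_new_upload(allocator, &token_options);

num_parts_completed matters on resume: if S3 reports the upload id gone (404 from ListParts) and num_parts_completed equals total_num_parts, the resumed request completes as a no-op instead of failing (see s_s3_auto_ranged_put_request_finished above).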