diff --git a/CHANGELOG.md b/CHANGELOG.md index 03bd4a1088c4..a806de34ef56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Application-defined endpoints are now accessible with both `/app` prefix and un-prefixed, e.g. `GET /app/log/private` and `GET /log/private` (#4147). +### Changed + +- The method `EndpointRegistry::get_metrics_for_endpoint(const EndpointDefinitionPtr&)` has been replaced with `EndpointRegistry::get_metrics_for_endpoint(const std::string& method, const std::string& verb)`. + ## [3.0.0-dev2] ### Dependencies diff --git a/include/ccf/endpoint_registry.h b/include/ccf/endpoint_registry.h index be27b853dd16..792d12e0e261 100644 --- a/include/ccf/endpoint_registry.h +++ b/include/ccf/endpoint_registry.h @@ -130,8 +130,8 @@ namespace ccf::endpoints ccf::pal::Mutex metrics_lock; std::map> metrics; - EndpointRegistry::Metrics& get_metrics_for_endpoint( - const EndpointDefinitionPtr& e); + EndpointRegistry::Metrics& get_metrics_for_request( + const std::string& method, const std::string& verb); kv::Consensus* consensus = nullptr; kv::TxHistory* history = nullptr; @@ -228,9 +228,9 @@ namespace ccf::endpoints void set_history(kv::TxHistory* h); - virtual void increment_metrics_calls(const EndpointDefinitionPtr& e); - virtual void increment_metrics_errors(const EndpointDefinitionPtr& e); - virtual void increment_metrics_failures(const EndpointDefinitionPtr& e); - virtual void increment_metrics_retries(const EndpointDefinitionPtr& e); + virtual void increment_metrics_calls(const ccf::RpcContext& rpc_ctx); + virtual void increment_metrics_errors(const ccf::RpcContext& rpc_ctx); + virtual void increment_metrics_failures(const ccf::RpcContext& rpc_ctx); + virtual void increment_metrics_retries(const ccf::RpcContext& rpc_ctx); }; } \ No newline at end of file diff --git a/include/ccf/rpc_context.h b/include/ccf/rpc_context.h index f9425797ad32..b93725a56afb 100644 --- a/include/ccf/rpc_context.h +++ b/include/ccf/rpc_context.h @@ -106,6 +106,7 @@ namespace ccf virtual void set_response_body(std::vector&& body) = 0; /// Sets the main body or payload of the response. virtual void set_response_body(std::string&& body) = 0; + virtual const std::vector& get_response_body() const = 0; /// Sets initial status code summarising result of RPC. virtual void set_response_status(int status) = 0; diff --git a/src/enclave/rpc_handler.h b/src/enclave/rpc_handler.h index 2cade0a7b5b5..6c339a0b14d3 100644 --- a/src/enclave/rpc_handler.h +++ b/src/enclave/rpc_handler.h @@ -36,7 +36,6 @@ namespace ccf virtual bool is_open() = 0; // Used by rpcendpoint to process incoming client RPCs - virtual std::optional> process( - std::shared_ptr ctx) = 0; + virtual void process(std::shared_ptr ctx) = 0; }; } diff --git a/src/endpoints/endpoint_registry.cpp b/src/endpoints/endpoint_registry.cpp index 70df6c823025..93dccd912b74 100644 --- a/src/endpoints/endpoint_registry.cpp +++ b/src/endpoints/endpoint_registry.cpp @@ -117,12 +117,11 @@ namespace ccf::endpoints return spec; } - EndpointRegistry::Metrics& EndpointRegistry::get_metrics_for_endpoint( - const EndpointDefinitionPtr& e) + EndpointRegistry::Metrics& EndpointRegistry::get_metrics_for_request( + const std::string& method_, const std::string& verb) { - auto method = e->dispatch.uri_path; - method = method.substr(method.find_first_not_of('/')); - return metrics[method][e->dispatch.verb.c_str()]; + auto method = method_.substr(method_.find_first_not_of('/')); + return metrics[method][verb]; } Endpoint EndpointRegistry::make_endpoint( @@ -434,30 +433,38 @@ namespace ccf::endpoints history = h; } - void EndpointRegistry::increment_metrics_calls(const EndpointDefinitionPtr& e) + void EndpointRegistry::increment_metrics_calls(const ccf::RpcContext& rpc_ctx) { std::lock_guard guard(metrics_lock); - get_metrics_for_endpoint(e).calls++; + get_metrics_for_request( + rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str()) + .calls++; } void EndpointRegistry::increment_metrics_errors( - const EndpointDefinitionPtr& e) + const ccf::RpcContext& rpc_ctx) { std::lock_guard guard(metrics_lock); - get_metrics_for_endpoint(e).errors++; + get_metrics_for_request( + rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str()) + .errors++; } void EndpointRegistry::increment_metrics_failures( - const EndpointDefinitionPtr& e) + const ccf::RpcContext& rpc_ctx) { std::lock_guard guard(metrics_lock); - get_metrics_for_endpoint(e).failures++; + get_metrics_for_request( + rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str()) + .failures++; } void EndpointRegistry::increment_metrics_retries( - const EndpointDefinitionPtr& e) + const ccf::RpcContext& rpc_ctx) { std::lock_guard guard(metrics_lock); - get_metrics_for_endpoint(e).retries++; + get_metrics_for_request( + rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str()) + .retries++; } } \ No newline at end of file diff --git a/src/http/http2_endpoint.h b/src/http/http2_endpoint.h index 6dc1d0cf418e..14c1ef884b9d 100644 --- a/src/http/http2_endpoint.h +++ b/src/http/http2_endpoint.h @@ -177,9 +177,9 @@ namespace http search = rpc_map->find(ccf::ActorsType::users); } - auto response = search.value()->process(rpc_ctx); + search.value()->process(rpc_ctx); - if (!response.has_value()) + if (rpc_ctx->response_is_pending) { // If the RPC is pending, hold the connection. LOG_TRACE_FMT("Pending"); diff --git a/src/http/http_endpoint.h b/src/http/http_endpoint.h index be101e3bc576..e9f8aab031bd 100644 --- a/src/http/http_endpoint.h +++ b/src/http/http_endpoint.h @@ -227,9 +227,9 @@ namespace http search = rpc_map->find(ccf::ActorsType::users); } - auto response = search.value()->process(rpc_ctx); + search.value()->process(rpc_ctx); - if (!response.has_value()) + if (rpc_ctx->response_is_pending) { // If the RPC is pending, hold the connection. LOG_TRACE_FMT("Pending"); @@ -237,7 +237,8 @@ namespace http } else { - send_buffered(response.value()); + const auto response = rpc_ctx->serialise_response(); + send_buffered(response); flush(); } } diff --git a/src/http/http_rpc_context.h b/src/http/http_rpc_context.h index 4a12f4b85651..eed86e9da990 100644 --- a/src/http/http_rpc_context.h +++ b/src/http/http_rpc_context.h @@ -218,6 +218,11 @@ namespace http response_body = std::vector(body.begin(), body.end()); } + virtual const std::vector& get_response_body() const override + { + return response_body; + } + virtual void set_response_status(int status) override { response_status = (http_status)status; diff --git a/src/node/node_state.h b/src/node/node_state.h index 36509b06bc82..d0bfd69e8478 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -1923,32 +1923,26 @@ namespace ccf return request.build_request(); } - bool parse_create_response(const std::vector& response) + bool extract_create_result(const std::shared_ptr& ctx) { - http::SimpleResponseProcessor processor; - http::ResponseParser parser(processor); - - parser.execute(response.data(), response.size()); - - if (processor.received.size() != 1) + if (ctx == nullptr) { - LOG_FAIL_FMT( - "Expected single message, found {}", processor.received.size()); + LOG_FAIL_FMT("Expected non-null context"); return false; } - const auto& r = processor.received.front(); - - if (r.status != HTTP_STATUS_OK) + const auto status = ctx->get_response_status(); + if (status != HTTP_STATUS_OK) { LOG_FAIL_FMT( "Create response is error: {} {}", - r.status, - http_status_str(r.status)); + status, + http_status_str((http_status)status)); return false; } - const auto body = serdes::unpack(r.body, serdes::Pack::Text); + const auto body = + serdes::unpack(ctx->get_response_body(), serdes::Pack::Text); if (!body.is_boolean()) { LOG_FAIL_FMT("Expected boolean body in create response"); @@ -1983,13 +1977,9 @@ namespace ccf } auto frontend = frontend_opt.value(); - const auto response = frontend->process(ctx); - if (!response.has_value()) - { - return false; - } + frontend->process(ctx); - return parse_create_response(response.value()); + return extract_create_result(ctx); } void create_and_send_boot_request( diff --git a/src/node/rpc/forwarder.h b/src/node/rpc/forwarder.h index 7f71ba95fcb7..d997867c134b 100644 --- a/src/node/rpc/forwarder.h +++ b/src/node/rpc/forwarder.h @@ -18,7 +18,7 @@ namespace ccf public: virtual ~ForwardedRpcHandler() {} - virtual std::vector process_forwarded( + virtual void process_forwarded( std::shared_ptr fwd_ctx) = 0; }; @@ -243,11 +243,13 @@ namespace ccf return; } + fwd_handler->process_forwarded(ctx); + // Ignore return value - false only means it is pending send_forwarded_response( ctx->get_session_context()->client_session_id, from, - fwd_handler->process_forwarded(ctx)); + ctx->serialise_response()); LOG_DEBUG_FMT("Sending forwarded response to {}", from); } break; diff --git a/src/node/rpc/frontend.h b/src/node/rpc/frontend.h index 18260072996f..d286b5f3e8e3 100644 --- a/src/node/rpc/frontend.h +++ b/src/node/rpc/frontend.h @@ -52,9 +52,6 @@ namespace ccf std::shared_ptr node_configuration_subsystem = nullptr; - using PreExec = - std::function; - void update_consensus() { auto c = tables.get_consensus().get(); @@ -72,83 +69,27 @@ namespace ccf endpoints.set_history(history); } - void update_metrics( - const std::shared_ptr& ctx, - const endpoints::EndpointDefinitionPtr& endpoint) + void update_metrics(const std::shared_ptr& ctx) { int cat = ctx->get_response_status() / 100; switch (cat) { case 4: - endpoints.increment_metrics_errors(endpoint); + endpoints.increment_metrics_errors(*ctx); return; case 5: - endpoints.increment_metrics_failures(endpoint); + endpoints.increment_metrics_failures(*ctx); return; } } - std::optional> forward( - std::shared_ptr ctx, - kv::ReadOnlyTx& tx, - const endpoints::EndpointDefinitionPtr& endpoint) - { - if (!cmd_forwarder || !consensus) - { - ctx->set_error( - HTTP_STATUS_INTERNAL_SERVER_ERROR, - ccf::errors::InternalError, - "No consensus or forwarder to forward request."); - update_metrics(ctx, endpoint); - return ctx->serialise_response(); - } - - if (ctx->get_session_context()->is_forwarded) - { - // If the request was already forwarded, return an error to prevent - // daisy chains. - ctx->set_error( - HTTP_STATUS_SERVICE_UNAVAILABLE, - ccf::errors::RequestAlreadyForwarded, - "RPC was already forwarded."); - update_metrics(ctx, endpoint); - return ctx->serialise_response(); - } - - auto primary_id = consensus->primary(); - if (!primary_id.has_value()) - { - ctx->set_error( - HTTP_STATUS_SERVICE_UNAVAILABLE, - ccf::errors::InternalError, - "RPC could not be forwarded to unknown primary."); - update_metrics(ctx, endpoint); - return ctx->serialise_response(); - } - - // Ignore return value - false only means it is pending - cmd_forwarder->forward_command( - ctx, primary_id.value(), ctx->get_session_context()->caller_cert); - - LOG_TRACE_FMT("RPC forwarded to primary {}", primary_id.value()); - - // Indicate that the RPC has been forwarded to primary - return std::nullopt; - } - - std::optional> process_command( - std::shared_ptr ctx, - kv::CommittableTx& tx, - const PreExec& pre_exec = {}, - kv::Version prescribed_commit_version = kv::NoVersion, - ccf::View replicated_view = ccf::VIEW_UNKNOWN) + endpoints::EndpointDefinitionPtr find_endpoint( + std::shared_ptr ctx, kv::CommittableTx& tx) { - auto sctx = ctx->get_session_context(); - auto interface_id = sctx->interface_id; - const auto endpoint = endpoints.find_endpoint(tx, *ctx); if (endpoint == nullptr) { + // Every path from here should populate an appropriate response error const auto allowed_verbs = endpoints.get_allowed_verbs(tx, *ctx); if (allowed_verbs.empty()) { @@ -156,7 +97,6 @@ namespace ccf HTTP_STATUS_NOT_FOUND, ccf::errors::ResourceNotFound, fmt::format("Unknown path: {}.", ctx->get_method())); - return ctx->serialise_response(); } else { @@ -187,10 +127,17 @@ namespace ccf ctx->get_method(), allow_header_value)); } - return ctx->serialise_response(); } } + return endpoint; + } + + bool check_uri_allowed( + std::shared_ptr ctx, + const endpoints::EndpointDefinitionPtr& endpoint) + { + auto interface_id = ctx->get_session_context()->interface_id; if (consensus && interface_id) { if (!node_configuration_subsystem) @@ -200,7 +147,7 @@ namespace ccf if (!node_configuration_subsystem) { ctx->set_response_status(HTTP_STATUS_INTERNAL_SERVER_ERROR); - return ctx->serialise_response(); + return false; } } @@ -222,7 +169,7 @@ namespace ccf if (!ok) { ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE); - return ctx->serialise_response(); + return false; } } else @@ -236,7 +183,7 @@ namespace ccf "no accepted_endpoints have been configured.", endpoint->full_uri_path); ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE); - return ctx->serialise_response(); + return false; } } } @@ -246,258 +193,328 @@ namespace ccf // forwarder (forward() happens further down). } - // Note: calls that could not be dispatched (cases handled above) - // are not counted against any particular endpoint. - endpoints.increment_metrics_calls(endpoint); + return true; + } + + std::unique_ptr get_authenticated_identity( + std::shared_ptr ctx, + kv::CommittableTx& tx, + const endpoints::EndpointDefinitionPtr& endpoint) + { + std::unique_ptr identity = nullptr; - try + std::string auth_error_reason; + std::vector error_details; + for (const auto& policy : endpoint->authn_policies) { - update_history(); + identity = policy->authenticate(tx, ctx, auth_error_reason); + if (identity != nullptr) + { + break; + } + else + { + // Collate error details + error_details.push_back( + {policy->get_security_scheme_name(), + ccf::errors::InvalidAuthenticationInfo, + auth_error_reason}); + } + } - const bool is_primary = (consensus == nullptr) || - consensus->can_replicate() || ctx->is_create_request; - const bool forwardable = (consensus != nullptr) && - (consensus->type() == ConsensusType::CFT || - (consensus->type() != ConsensusType::CFT && !ctx->execute_on_node)); + if (identity == nullptr) + { + // If none were accepted, let the last set the response header + endpoint->authn_policies.back()->set_unauthenticated_error( + ctx, std::move(auth_error_reason)); + // Return collated error details for the auth policies + // declared in the request + ctx->set_error( + HTTP_STATUS_UNAUTHORIZED, + ccf::errors::InvalidAuthenticationInfo, + "Invalid info", + error_details); + update_metrics(ctx); + } - if (!is_primary && forwardable) - { - switch (endpoint->properties.forwarding_required) - { - case endpoints::ForwardingRequired::Never: - { - break; - } + return identity; + } - case endpoints::ForwardingRequired::Sometimes: - { - if ( - (ctx->get_session_context()->is_forwarding && - consensus->type() == ConsensusType::CFT) || - (consensus->type() != ConsensusType::CFT && - !ctx->execute_on_node)) - { - ctx->get_session_context()->is_forwarding = true; - return forward(ctx, tx, endpoint); - } - break; - } + void forward( + std::shared_ptr ctx, + kv::ReadOnlyTx& tx, + const endpoints::EndpointDefinitionPtr& endpoint) + { + if (!cmd_forwarder || !consensus) + { + ctx->set_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::InternalError, + "No consensus or forwarder to forward request."); + update_metrics(ctx); + return; + } - case endpoints::ForwardingRequired::Always: - { - ctx->get_session_context()->is_forwarding = true; - return forward(ctx, tx, endpoint); - } - } - } + if (ctx->get_session_context()->is_forwarded) + { + // If the request was already forwarded, return an error to prevent + // daisy chains. + ctx->set_error( + HTTP_STATUS_SERVICE_UNAVAILABLE, + ccf::errors::RequestAlreadyForwarded, + "RPC was already forwarded."); + update_metrics(ctx); + return; + } - auto args = endpoints::EndpointContext(ctx, tx); + auto primary_id = consensus->primary(); + if (!primary_id.has_value()) + { + ctx->set_error( + HTTP_STATUS_SERVICE_UNAVAILABLE, + ccf::errors::InternalError, + "RPC could not be forwarded to unknown primary."); + update_metrics(ctx); + return; + } - size_t attempts = 0; - constexpr auto max_attempts = 30; + // Ignore return value - false only means it is pending + cmd_forwarder->forward_command( + ctx, primary_id.value(), ctx->get_session_context()->caller_cert); - while (attempts < max_attempts) - { - if (attempts > 0) - { - // If the endpoint has already been executed, the effects of its - // execution should be dropped - tx = tables.create_tx(); - ctx->reset_response(); - set_root_on_proposals(*ctx, tx); - endpoints.increment_metrics_retries(endpoint); - } + LOG_TRACE_FMT("RPC forwarded to primary {}", primary_id.value()); - ++attempts; + // Indicate that the RPC has been forwarded to primary + ctx->response_is_pending = true; - try - { - if (pre_exec) - { - pre_exec(tx, *ctx.get()); - } + // Ensure future requests on this session are forwarded for session + // consistency + ctx->get_session_context()->is_forwarding = true; - // Check auth policies - { - std::unique_ptr identity = nullptr; + return; + } - // If any auth policy was required, check that at least one is - // accepted - if (!endpoint->authn_policies.empty()) - { - std::string auth_error_reason; - std::vector error_details; - for (const auto& policy : endpoint->authn_policies) - { - identity = policy->authenticate(tx, ctx, auth_error_reason); - if (identity != nullptr) - { - break; - } - else - { - // Collate error details - error_details.push_back( - {policy->get_security_scheme_name(), - ccf::errors::InvalidAuthenticationInfo, - auth_error_reason}); - } - } + void process_command( + std::shared_ptr ctx, + kv::CommittableTx& tx, + kv::Version prescribed_commit_version = kv::NoVersion, + ccf::View replicated_view = ccf::VIEW_UNKNOWN) + { + size_t attempts = 0; + constexpr auto max_attempts = 30; - if (identity == nullptr) - { - // If none were accepted, let the last set the response header - endpoint->authn_policies.back()->set_unauthenticated_error( - ctx, std::move(auth_error_reason)); - // Return collated error details for the auth policies - // declared in the request - ctx->set_error( - HTTP_STATUS_UNAUTHORIZED, - ccf::errors::InvalidAuthenticationInfo, - "Invalid info", - error_details); - update_metrics(ctx, endpoint); - return ctx->serialise_response(); - } - } + endpoints.increment_metrics_calls(*ctx); - args.caller = std::move(identity); - } + while (attempts < max_attempts) + { + if (attempts > 0) + { + // If the endpoint has already been executed, the effects of its + // execution should be dropped + tx = tables.create_tx(); + ctx->reset_response(); + set_root_on_proposals(*ctx, tx); + endpoints.increment_metrics_retries(*ctx); + } - endpoints.execute_endpoint(endpoint, args); + ++attempts; + update_history(); - if (!ctx->should_apply_writes()) - { - update_metrics(ctx, endpoint); - return ctx->serialise_response(); - } + const auto endpoint = find_endpoint(ctx, tx); + if (endpoint == nullptr) + { + return; + } - kv::CommitResult result; - bool track_read_versions = - (consensus != nullptr && consensus->type() == ConsensusType::BFT); - if (prescribed_commit_version != kv::NoVersion) - { - CCF_ASSERT( - consensus->type() == ConsensusType::BFT, - "Wrong consensus type"); - auto version_resolver = [&](bool) { - tables.next_version(); - return std::make_tuple( - prescribed_commit_version, kv::NoVersion); - }; - tx.set_view(replicated_view); - result = - tx.commit(ctx->claims, track_read_versions, version_resolver); - } - else - { - result = tx.commit(ctx->claims, track_read_versions); - } + try + { + if (!check_uri_allowed(ctx, endpoint)) + { + return; + } + + const bool is_primary = (consensus == nullptr) || + consensus->can_replicate() || ctx->is_create_request; + const bool forwardable = (consensus != nullptr) && + (consensus->type() == ConsensusType::CFT || + (consensus->type() != ConsensusType::CFT && + !ctx->execute_on_node)); - switch (result) + if (!is_primary && forwardable) + { + switch (endpoint->properties.forwarding_required) { - case kv::CommitResult::SUCCESS: + case endpoints::ForwardingRequired::Never: { - auto tx_id = tx.get_txid(); - if (tx_id.has_value() && consensus != nullptr) - { - // Only transactions that acquired one or more map handles - // have a TxID, while others (e.g. unauthenticated commands) - // don't. Also, only report a TxID if the consensus is set, as - // the consensus is required to verify that a TxID is valid. - ctx->set_tx_id(tx_id.value()); - } + break; + } + case endpoints::ForwardingRequired::Sometimes: + { if ( - consensus != nullptr && consensus->can_replicate() && - history != nullptr) + (ctx->get_session_context()->is_forwarding && + consensus->type() == ConsensusType::CFT) || + (consensus->type() != ConsensusType::CFT && + !ctx->execute_on_node)) { - history->try_emit_signature(); + forward(ctx, tx, endpoint); + return; } - - update_metrics(ctx, endpoint); - return ctx->serialise_response(); - } - - case kv::CommitResult::FAIL_CONFLICT: - { break; } - case kv::CommitResult::FAIL_NO_REPLICATE: + case endpoints::ForwardingRequired::Always: { - ctx->set_error( - HTTP_STATUS_SERVICE_UNAVAILABLE, - ccf::errors::TransactionReplicationFailed, - "Transaction failed to replicate."); - update_metrics(ctx, endpoint); - return ctx->serialise_response(); + forward(ctx, tx, endpoint); + return; } } } - catch (const kv::CompactedVersionConflict& e) + + auto args = endpoints::EndpointContext(ctx, tx); + + // If any auth policy was required, check that at least one is + // accepted + if (!endpoint->authn_policies.empty()) { - // The executing transaction failed because of a conflicting - // compaction. Reset and retry - LOG_DEBUG_FMT( - "Transaction execution conflicted with compaction: {}", e.what()); - continue; + auto identity = get_authenticated_identity(ctx, tx, endpoint); + if (identity == nullptr) + { + return; + } + args.caller = std::move(identity); } - catch (RpcException& e) + + endpoints.execute_endpoint(endpoint, args); + + if (!ctx->should_apply_writes()) { - ctx->set_error(std::move(e.error)); - update_metrics(ctx, endpoint); - return ctx->serialise_response(); + update_metrics(ctx); + return; } - catch (const JsonParseError& e) + + kv::CommitResult result; + bool track_read_versions = + (consensus != nullptr && consensus->type() == ConsensusType::BFT); + if (prescribed_commit_version != kv::NoVersion) { - ctx->set_error( - HTTP_STATUS_BAD_REQUEST, - ccf::errors::InvalidInput, - fmt::format("At {}: {}", e.pointer(), e.what())); - update_metrics(ctx, endpoint); - return ctx->serialise_response(); + CCF_ASSERT( + consensus->type() == ConsensusType::BFT, "Wrong consensus type"); + auto version_resolver = [&](bool) { + tables.next_version(); + return std::make_tuple(prescribed_commit_version, kv::NoVersion); + }; + tx.set_view(replicated_view); + result = + tx.commit(ctx->claims, track_read_versions, version_resolver); } - catch (const nlohmann::json::exception& e) + else { - ctx->set_error( - HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.what()); - update_metrics(ctx, endpoint); - return ctx->serialise_response(); + result = tx.commit(ctx->claims, track_read_versions); } - catch (const kv::KvSerialiserException& e) + + switch (result) { - // If serialising the committed transaction fails, there is no way - // to recover safely (https://github.com/microsoft/CCF/issues/338). - // Better to abort. - LOG_DEBUG_FMT("Failed to serialise: {}", e.what()); - LOG_FATAL_FMT("Failed to serialise"); - abort(); - } + case kv::CommitResult::SUCCESS: + { + auto tx_id = tx.get_txid(); + if (tx_id.has_value() && consensus != nullptr) + { + // Only transactions that acquired one or more map handles + // have a TxID, while others (e.g. unauthenticated commands) + // don't. Also, only report a TxID if the consensus is set, as + // the consensus is required to verify that a TxID is valid. + ctx->set_tx_id(tx_id.value()); + } + + if ( + consensus != nullptr && consensus->can_replicate() && + history != nullptr) + { + history->try_emit_signature(); + } + + update_metrics(ctx); + return; + } + + case kv::CommitResult::FAIL_CONFLICT: + { + break; + } + case kv::CommitResult::FAIL_NO_REPLICATE: + { + ctx->set_error( + HTTP_STATUS_SERVICE_UNAVAILABLE, + ccf::errors::TransactionReplicationFailed, + "Transaction failed to replicate."); + update_metrics(ctx); + return; + } + } + } + catch (const kv::CompactedVersionConflict& e) + { + // The executing transaction failed because of a conflicting + // compaction. Reset and retry + LOG_DEBUG_FMT( + "Transaction execution conflicted with compaction: {}", e.what()); + continue; + } + catch (RpcException& e) + { + ctx->set_error(std::move(e.error)); + update_metrics(ctx); + return; + } + catch (const JsonParseError& e) + { ctx->set_error( - HTTP_STATUS_SERVICE_UNAVAILABLE, - ccf::errors::TransactionCommitAttemptsExceedLimit, - fmt::format( - "Transaction continued to conflict after {} attempts. Retry " - "later.", - max_attempts)); - static constexpr size_t retry_after_seconds = 3; - ctx->set_response_header( - http::headers::RETRY_AFTER, retry_after_seconds); + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidInput, + fmt::format("At {}: {}", e.pointer(), e.what())); + update_metrics(ctx); + return; } - } - catch (const std::exception& e) - { - ctx->set_error( - HTTP_STATUS_INTERNAL_SERVER_ERROR, - ccf::errors::InternalError, - e.what()); - update_metrics(ctx, endpoint); - return ctx->serialise_response(); - } - - return ctx->serialise_response(); + catch (const nlohmann::json::exception& e) + { + ctx->set_error( + HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.what()); + update_metrics(ctx); + return; + } + catch (const kv::KvSerialiserException& e) + { + // If serialising the committed transaction fails, there is no way + // to recover safely (https://github.com/microsoft/CCF/issues/338). + // Better to abort. + LOG_DEBUG_FMT("Failed to serialise: {}", e.what()); + LOG_FATAL_FMT("Failed to serialise"); + abort(); + } + catch (const std::exception& e) + { + ctx->set_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::InternalError, + e.what()); + update_metrics(ctx); + return; + } + } // end of while loop + + ctx->set_error( + HTTP_STATUS_SERVICE_UNAVAILABLE, + ccf::errors::TransactionCommitAttemptsExceedLimit, + fmt::format( + "Transaction continued to conflict after {} attempts. Retry " + "later.", + max_attempts)); + static constexpr size_t retry_after_seconds = 3; + ctx->set_response_header(http::headers::RETRY_AFTER, retry_after_seconds); + + return; } public: @@ -600,12 +617,10 @@ namespace ccf * If an RPC that requires writing to the kv store is processed on a * backup, the serialised RPC is forwarded to the current network primary. * - * @param ctx Context for this RPC - * @returns nullopt if the result is pending (to be forwarded, or still - * to-be-executed by consensus), else the response (may contain error) + * @param ctx Context for this RPC. Will be populated with response details + * before this call returns, or else response_is_pending will be set to true */ - std::optional> process( - std::shared_ptr ctx) override + void process(std::shared_ptr ctx) override { update_consensus(); @@ -618,22 +633,21 @@ namespace ccf HTTP_STATUS_NOT_FOUND, ccf::errors::FrontendNotOpen, "Frontend is not open."); - return ctx->serialise_response(); + return; + } + else + { + // NB: If we want to re-execute on backups, the original command could + // be propagated from here + process_command(ctx, tx); } - - // NB: If we want to re-execute on backups, the original command could be - // propagated from here - return process_command(ctx, tx); } /** Process a serialised input forwarded from another node * * @param ctx Context for this forwarded RPC - * - * @return Serialised reply to send back to forwarder node */ - std::vector process_forwarded( - std::shared_ptr ctx) override + void process_forwarded(std::shared_ptr ctx) override { if (!ctx->get_session_context()->is_forwarded) { @@ -645,23 +659,22 @@ namespace ccf auto tx = tables.create_tx(); set_root_on_proposals(*ctx, tx); - const auto endpoint = endpoints.find_endpoint(tx, *ctx); if (consensus->type() == ConsensusType::CFT) { - auto rep = process_command(ctx, tx); - if (!rep.has_value()) + process_command(ctx, tx); + if (ctx->response_is_pending) { // This should never be called when process_command is called with a // forwarded RPC context throw std::logic_error("Forwarded RPC cannot be forwarded"); } - return rep.value(); + return; } else { LOG_FAIL_FMT("Unsupported consensus type"); - return {}; + return; } } diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index f8e38e13aa89..1e2996dd0579 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -1385,15 +1385,15 @@ namespace ccf .set_auto_schema() .install(); - auto jwt_metrics = [this](auto&, nlohmann::json&&) { + auto jwt_metrics = [this](auto& ctx, nlohmann::json&&) { JWTMetrics m; // Attempts are recorded by the key refresh code itself, registering // before each call to each issuer's keys m.attempts = node_operation.get_jwt_attempts(); // Success is marked by the fact that the key succeeded and called // our internal "jwt_keys/refresh" endpoint. - auto e = fully_qualified_endpoints["/jwt_keys/refresh"][HTTP_POST]; - auto metric = get_metrics_for_endpoint(e); + auto metric = get_metrics_for_request( + "/jwt_keys/refresh", llhttp_method_name(HTTP_POST)); m.successes = metric.calls - (metric.failures + metric.errors); return m; }; diff --git a/src/node/rpc/rpc_context_impl.h b/src/node/rpc/rpc_context_impl.h index e29ad761789b..d6e6def0101a 100644 --- a/src/node/rpc/rpc_context_impl.h +++ b/src/node/rpc/rpc_context_impl.h @@ -44,6 +44,7 @@ namespace ccf bool is_create_request = false; bool execute_on_node = false; + bool response_is_pending = false; virtual void set_tx_id(const ccf::TxID& tx_id) = 0; virtual bool should_apply_writes() const = 0; diff --git a/src/node/rpc/test/frontend_test.cpp b/src/node/rpc/test/frontend_test.cpp index f357c4449581..fc40465535ee 100644 --- a/src/node/rpc/test/frontend_test.cpp +++ b/src/node/rpc/test/frontend_test.cpp @@ -550,7 +550,8 @@ TEST_CASE("process with signatures") const auto serialized_call = invalid_call.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - const auto serialized_response = frontend.process(rpc_ctx).value(); + frontend.process(rpc_ctx); + const auto serialized_response = rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); REQUIRE(response.status == HTTP_STATUS_NOT_FOUND); } @@ -569,14 +570,16 @@ TEST_CASE("process with signatures") INFO("Unsigned RPC"); { - const auto serialized_response = frontend.process(simple_rpc_ctx).value(); + frontend.process(simple_rpc_ctx); + const auto serialized_response = simple_rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); REQUIRE(response.status == HTTP_STATUS_OK); } INFO("Signed RPC"); { - const auto serialized_response = frontend.process(signed_rpc_ctx).value(); + frontend.process(signed_rpc_ctx); + const auto serialized_response = signed_rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); REQUIRE(response.status == HTTP_STATUS_OK); } @@ -596,7 +599,8 @@ TEST_CASE("process with signatures") INFO("Unsigned RPC"); { - const auto serialized_response = frontend.process(simple_rpc_ctx).value(); + frontend.process(simple_rpc_ctx); + const auto serialized_response = simple_rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); CHECK(response.status == HTTP_STATUS_UNAUTHORIZED); @@ -606,7 +610,8 @@ TEST_CASE("process with signatures") INFO("Signed RPC"); { - const auto serialized_response = frontend.process(signed_rpc_ctx).value(); + frontend.process(signed_rpc_ctx); + const auto serialized_response = signed_rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); REQUIRE(response.status == HTTP_STATUS_OK); } @@ -632,8 +637,9 @@ TEST_CASE("process with caller") INFO("Valid authentication"); { + frontend.process(authenticated_rpc_ctx); const auto serialized_response = - frontend.process(authenticated_rpc_ctx).value(); + authenticated_rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); // Even though the RPC does not require authenticated caller, an @@ -643,16 +649,16 @@ TEST_CASE("process with caller") INFO("Invalid authentication"); { - const auto serialized_response = - frontend.process(invalid_rpc_ctx).value(); + frontend.process(invalid_rpc_ctx); + const auto serialized_response = invalid_rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); REQUIRE(response.status == HTTP_STATUS_OK); } INFO("Anonymous caller"); { - const auto serialized_response = - frontend.process(anonymous_rpc_ctx).value(); + frontend.process(anonymous_rpc_ctx); + const auto serialized_response = anonymous_rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); REQUIRE(response.status == HTTP_STATUS_OK); } @@ -671,16 +677,17 @@ TEST_CASE("process with caller") INFO("Valid authentication"); { + frontend.process(authenticated_rpc_ctx); const auto serialized_response = - frontend.process(authenticated_rpc_ctx).value(); + authenticated_rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); REQUIRE(response.status == HTTP_STATUS_OK); } INFO("Invalid authentication"); { - const auto serialized_response = - frontend.process(invalid_rpc_ctx).value(); + frontend.process(invalid_rpc_ctx); + const auto serialized_response = invalid_rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); REQUIRE(response.status == HTTP_STATUS_UNAUTHORIZED); const std::string error_msg(response.body.begin(), response.body.end()); @@ -691,8 +698,8 @@ TEST_CASE("process with caller") INFO("Anonymous caller"); { - const auto serialized_response = - frontend.process(anonymous_rpc_ctx).value(); + frontend.process(anonymous_rpc_ctx); + const auto serialized_response = anonymous_rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); REQUIRE(response.status == HTTP_STATUS_UNAUTHORIZED); const std::string error_msg(response.body.begin(), response.body.end()); @@ -712,8 +719,8 @@ TEST_CASE("No certs table") INFO("Authenticated caller"); { auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - std::vector serialized_response = - frontend.process(rpc_ctx).value(); + frontend.process(rpc_ctx); + const auto serialized_response = rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); CHECK(response.status == HTTP_STATUS_OK); } @@ -721,8 +728,8 @@ TEST_CASE("No certs table") INFO("Anonymous caller"); { auto rpc_ctx = ccf::make_rpc_context(anonymous_session, serialized_call); - std::vector serialized_response = - frontend.process(rpc_ctx).value(); + frontend.process(rpc_ctx); + const auto serialized_response = rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); CHECK(response.status == HTTP_STATUS_OK); } @@ -744,8 +751,8 @@ TEST_CASE("Member caller") { auto member_rpc_ctx = ccf::make_rpc_context(member_session, serialized_call); - std::vector serialized_response = - frontend.process(member_rpc_ctx).value(); + frontend.process(member_rpc_ctx); + const auto serialized_response = member_rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); CHECK(response.status == HTTP_STATUS_OK); } @@ -753,8 +760,8 @@ TEST_CASE("Member caller") SUBCASE("invalid caller") { auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - std::vector serialized_response = - frontend.process(rpc_ctx).value(); + frontend.process(rpc_ctx); + const auto serialized_response = rpc_ctx->serialise_response(); auto response = parse_response(serialized_response); CHECK(response.status == HTTP_STATUS_UNAUTHORIZED); } @@ -777,7 +784,8 @@ TEST_CASE("JsonWrappedEndpointFunction") const auto serialized_call = echo_call.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); const auto response_body = parse_response_body(response.body, pack_type); @@ -800,7 +808,8 @@ TEST_CASE("JsonWrappedEndpointFunction") const auto serialized_call = echo_call.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); const auto response_body = parse_response_body(response.body, pack_type); @@ -814,7 +823,8 @@ TEST_CASE("JsonWrappedEndpointFunction") const auto serialized_call = get_caller.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); const auto response_body = parse_response_body(response.body, pack_type); @@ -828,7 +838,8 @@ TEST_CASE("JsonWrappedEndpointFunction") const auto serialized_call = dont_fail.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); } @@ -849,7 +860,8 @@ TEST_CASE("JsonWrappedEndpointFunction") const auto serialized_call = fail.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == err); CHECK( response.headers[http::headers::CONTENT_TYPE] == @@ -880,7 +892,8 @@ TEST_CASE("Restricted verbs") http::Request get("get_only", verb); const auto serialized_get = get.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_get); - const auto serialized_response = frontend.process(rpc_ctx).value(); + frontend.process(rpc_ctx); + const auto serialized_response = rpc_ctx->serialise_response(); const auto response = parse_response(serialized_response); if (verb == HTTP_GET) { @@ -900,7 +913,8 @@ TEST_CASE("Restricted verbs") http::Request post("post_only", verb); const auto serialized_post = post.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_post); - const auto serialized_response = frontend.process(rpc_ctx).value(); + frontend.process(rpc_ctx); + const auto serialized_response = rpc_ctx->serialise_response(); const auto response = parse_response(serialized_response); if (verb == HTTP_POST) { @@ -921,7 +935,8 @@ TEST_CASE("Restricted verbs") const auto serialized_put_or_delete = put_or_delete.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_put_or_delete); - const auto serialized_response = frontend.process(rpc_ctx).value(); + frontend.process(rpc_ctx); + const auto serialized_response = rpc_ctx->serialise_response(); const auto response = parse_response(serialized_response); if (verb == HTTP_PUT || verb == HTTP_DELETE) { @@ -987,7 +1002,8 @@ TEST_CASE("Explicit commitability") const auto serialized_request = request.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_request); - const auto serialized_response = frontend.process(rpc_ctx).value(); + frontend.process(rpc_ctx); + const auto serialized_response = rpc_ctx->serialise_response(); const auto response = parse_response(serialized_response); CHECK(response.status == status); @@ -1022,7 +1038,8 @@ TEST_CASE("Explicit commitability") const auto serialized_request = request.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_request); - const auto serialized_response = frontend.process(rpc_ctx).value(); + frontend.process(rpc_ctx); + const auto serialized_response = rpc_ctx->serialise_response(); const auto response = parse_response(serialized_response); CHECK(response.status == status); @@ -1055,7 +1072,8 @@ TEST_CASE("Alternative endpoints") const auto serialized_command = command.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_command); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); } @@ -1065,7 +1083,8 @@ TEST_CASE("Alternative endpoints") const auto serialized_read_only = read_only.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_read_only); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); } } @@ -1081,7 +1100,8 @@ TEST_CASE("Templated paths") const auto serialized_request = request.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_request); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); std::map expected_mapping; @@ -1100,7 +1120,8 @@ TEST_CASE("Templated paths") const auto serialized_request = request.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_request); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); std::map expected_mapping; @@ -1126,7 +1147,8 @@ TEST_CASE("Decoded Templated paths") const auto serialized_request = request.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_request); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); std::map expected_mapping; @@ -1153,7 +1175,8 @@ TEST_CASE("Signed read requests can be executed on backup") auto signed_call = create_signed_request(user_caller); auto serialized_signed_call = signed_call.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_signed_call); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); } @@ -1188,11 +1211,11 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) INFO("Backup frontend without forwarder does not forward"); REQUIRE(channel_stub->is_empty()); - const auto r = user_frontend_backup.process(backup_ctx); - REQUIRE(r.has_value()); + user_frontend_backup.process(backup_ctx); + REQUIRE(!backup_ctx->response_is_pending); REQUIRE(channel_stub->is_empty()); - const auto response = parse_response(r.value()); + const auto response = parse_response(backup_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_INTERNAL_SERVER_ERROR); } @@ -1204,11 +1227,11 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) TestUserFrontend user_frontend_backup_read(*network_backup.tables); REQUIRE(channel_stub->is_empty()); - const auto r = user_frontend_backup_read.process(backup_ctx); - REQUIRE(r.has_value()); + user_frontend_backup_read.process(backup_ctx); + REQUIRE(!backup_ctx->response_is_pending); REQUIRE(channel_stub->is_empty()); - const auto response = parse_response(r.value()); + const auto response = parse_response(backup_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); } @@ -1216,8 +1239,8 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) INFO("Write command on backup is forwarded to primary"); REQUIRE(channel_stub->is_empty()); - const auto r = user_frontend_backup.process(backup_ctx); - REQUIRE(!r.has_value()); + user_frontend_backup.process(backup_ctx); + REQUIRE(backup_ctx->response_is_pending); REQUIRE(channel_stub->size() == 1); auto forwarded_msg = channel_stub->get_pop_back(); @@ -1226,8 +1249,8 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) { INFO("Invalid caller"); - auto response = - parse_response(user_frontend_primary.process_forwarded(fwd_ctx)); + user_frontend_primary.process_forwarded(fwd_ctx); + auto response = parse_response(fwd_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_UNAUTHORIZED); }; @@ -1235,8 +1258,8 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) { INFO("Valid caller"); - auto response = - parse_response(user_frontend_primary.process_forwarded(fwd_ctx)); + user_frontend_primary.process_forwarded(fwd_ctx); + auto response = parse_response(fwd_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); } } @@ -1245,8 +1268,8 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) INFO("Forwarding write command to a backup returns error"); REQUIRE(channel_stub->is_empty()); - const auto r = user_frontend_backup.process(backup_ctx); - REQUIRE(!r.has_value()); + user_frontend_backup.process(backup_ctx); + REQUIRE(backup_ctx->response_is_pending); REQUIRE(channel_stub->size() == 1); auto forwarded_msg = channel_stub->get_pop_back(); @@ -1255,8 +1278,8 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) // Processing forwarded response by a backup frontend (here, the same // frontend that the command was originally issued to) - auto response = - parse_response(user_frontend_backup.process_forwarded(fwd_ctx)); + user_frontend_backup.process_forwarded(fwd_ctx); + auto response = parse_response(fwd_ctx->serialise_response()); // Command was already forwarded CHECK(response.status == HTTP_STATUS_SERVICE_UNAVAILABLE); @@ -1271,8 +1294,8 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) user_frontend_backup_read.set_cmd_forwarder(backup_forwarder); REQUIRE(channel_stub->is_empty()); - const auto r = user_frontend_backup_read.process(backup_ctx); - REQUIRE(!r.has_value()); + user_frontend_backup_read.process(backup_ctx); + REQUIRE(backup_ctx->response_is_pending); REQUIRE(channel_stub->size() == 1); channel_stub->clear(); @@ -1286,16 +1309,16 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) auto serialized_signed_call = signed_call.build_request(); auto signed_ctx = ccf::make_rpc_context(user_session, serialized_signed_call); - const auto r = user_frontend_backup.process(signed_ctx); - REQUIRE(!r.has_value()); + user_frontend_backup.process(signed_ctx); + REQUIRE(signed_ctx->response_is_pending); REQUIRE(channel_stub->size() == 1); auto forwarded_msg = channel_stub->get_pop_back(); auto fwd_ctx = backup_forwarder->recv_forwarded_command( kv::test::FirstBackupNodeId, forwarded_msg.data(), forwarded_msg.size()); - auto response = - parse_response(user_frontend_primary.process_forwarded(fwd_ctx)); + user_frontend_primary.process_forwarded(fwd_ctx); + auto response = parse_response(fwd_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); } @@ -1306,9 +1329,9 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) INFO("Write command primary on a forwarded session succeeds"); REQUIRE(channel_stub->is_empty()); - const auto r = user_frontend_primary.process(ctx); - CHECK(r.has_value()); - auto response = parse_response(r.value()); + user_frontend_primary.process(ctx); + CHECK(!ctx->response_is_pending); + auto response = parse_response(ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); } } @@ -1346,16 +1369,16 @@ TEST_CASE("Nodefrontend forwarding" * doctest::test_suite("forwarding")) auto node_session = std::make_shared( ccf::InvalidSessionId, node_caller.raw()); auto ctx = ccf::make_rpc_context(node_session, serialized_call); - const auto r = node_frontend_backup.process(ctx); - REQUIRE(!r.has_value()); + node_frontend_backup.process(ctx); + REQUIRE(ctx->response_is_pending); REQUIRE(channel_stub->size() == 1); auto forwarded_msg = channel_stub->get_pop_back(); auto fwd_ctx = backup_forwarder->recv_forwarded_command( kv::test::FirstBackupNodeId, forwarded_msg.data(), forwarded_msg.size()); - auto response = - parse_response(node_frontend_primary.process_forwarded(fwd_ctx)); + node_frontend_primary.process_forwarded(fwd_ctx); + auto response = parse_response(fwd_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); CHECK(node_frontend_primary.last_caller_cert == node_caller); @@ -1390,16 +1413,16 @@ TEST_CASE("Userfrontend forwarding" * doctest::test_suite("forwarding")) auto serialized_call = write_req.build_request(); auto ctx = ccf::make_rpc_context(user_session, serialized_call); - const auto r = user_frontend_backup.process(ctx); - REQUIRE(!r.has_value()); + user_frontend_backup.process(ctx); + REQUIRE(ctx->response_is_pending); REQUIRE(channel_stub->size() == 1); auto forwarded_msg = channel_stub->get_pop_back(); auto fwd_ctx = backup_forwarder->recv_forwarded_command( kv::test::FirstBackupNodeId, forwarded_msg.data(), forwarded_msg.size()); - auto response = - parse_response(user_frontend_primary.process_forwarded(fwd_ctx)); + user_frontend_primary.process_forwarded(fwd_ctx); + auto response = parse_response(fwd_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); CHECK(user_frontend_primary.last_caller_cert == user_caller); @@ -1438,16 +1461,16 @@ TEST_CASE("Memberfrontend forwarding" * doctest::test_suite("forwarding")) auto serialized_call = write_req.build_request(); auto ctx = ccf::make_rpc_context(member_session, serialized_call); - const auto r = member_frontend_backup.process(ctx); - REQUIRE(!r.has_value()); + member_frontend_backup.process(ctx); + REQUIRE(ctx->response_is_pending); REQUIRE(channel_stub->size() == 1); auto forwarded_msg = channel_stub->get_pop_back(); auto fwd_ctx = backup_forwarder->recv_forwarded_command( kv::test::FirstBackupNodeId, forwarded_msg.data(), forwarded_msg.size()); - auto response = - parse_response(member_frontend_primary.process_forwarded(fwd_ctx)); + member_frontend_primary.process_forwarded(fwd_ctx); + auto response = parse_response(fwd_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); CHECK(member_frontend_primary.last_caller_cert == member_cert); @@ -1512,14 +1535,15 @@ TEST_CASE("Retry on conflict") constexpr size_t ccf_max_attempts = 30; // Defined by CCF (frontend.h) - INFO("Does not each execution limit"); + INFO("Does not reach execution limit"); { size_t retry_count = ccf_max_attempts - 1; req.set_header("test-retry-count", fmt::format("{}", retry_count)); auto serialized_call = req.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); // Response headers are cleared once conflict is resolved @@ -1535,7 +1559,8 @@ TEST_CASE("Retry on conflict") auto serialized_call = req.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_SERVICE_UNAVAILABLE); CHECK(response.headers["test-has-conflicted"] == "true"); @@ -1618,7 +1643,8 @@ TEST_CASE("Manual conflicts") auto req = create_simple_request("/pausable"); auto serialized_call = req.build_request(); auto rpc_ctx = ccf::make_rpc_context(session, serialized_call); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == expected_status); }; @@ -1627,7 +1653,8 @@ TEST_CASE("Manual conflicts") req.set_method(HTTP_GET); auto serialized_call = req.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - auto response = parse_response(frontend.process(rpc_ctx).value()); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); CHECK(response.status == HTTP_STATUS_OK); auto body = nlohmann::json::parse(response.body); auto& element = body["metrics"]; diff --git a/src/node/rpc/test/frontend_test_infra.h b/src/node/rpc/test/frontend_test_infra.h index 1f6f361101d0..981ad650de64 100644 --- a/src/node/rpc/test/frontend_test_infra.h +++ b/src/node/rpc/test/frontend_test_infra.h @@ -120,14 +120,15 @@ auto frontend_process( ccf::InvalidSessionId, crypto::make_verifier(caller)->cert_der()); auto rpc_ctx = ccf::make_rpc_context(session, serialized_request); http::extract_actor(*rpc_ctx); - auto serialized_response = frontend.process(rpc_ctx); + frontend.process(rpc_ctx); + DOCTEST_CHECK(!rpc_ctx->response_is_pending); - DOCTEST_CHECK(serialized_response.has_value()); + auto serialized_response = rpc_ctx->serialise_response(); http::SimpleResponseProcessor processor; http::ResponseParser parser(processor); - parser.execute(serialized_response->data(), serialized_response->size()); + parser.execute(serialized_response.data(), serialized_response.size()); DOCTEST_REQUIRE(processor.received.size() == 1); return processor.received.front(); diff --git a/src/node/rpc/test/node_frontend_test.cpp b/src/node/rpc/test/node_frontend_test.cpp index 79c970a6c72d..6df3eecee5cd 100644 --- a/src/node/rpc/test/node_frontend_test.cpp +++ b/src/node/rpc/test/node_frontend_test.cpp @@ -36,14 +36,15 @@ TResponse frontend_process( auto session = std::make_shared(ccf::InvalidSessionId, caller.raw()); auto rpc_ctx = ccf::make_rpc_context(session, serialise_request); - auto serialised_response = frontend.process(rpc_ctx); + frontend.process(rpc_ctx); - CHECK(serialised_response.has_value()); + CHECK(!rpc_ctx->response_is_pending); + const auto serialised_response = rpc_ctx->serialise_response(); http::SimpleResponseProcessor processor; http::ResponseParser parser(processor); - parser.execute(serialised_response->data(), serialised_response->size()); + parser.execute(serialised_response.data(), serialised_response.size()); REQUIRE(processor.received.size() == 1); return processor.received.front(); diff --git a/tests/e2e_logging.py b/tests/e2e_logging.py index 19ca6773abd9..711e52fb31c2 100644 --- a/tests/e2e_logging.py +++ b/tests/e2e_logging.py @@ -701,6 +701,7 @@ def get_metrics(r, path, method, default=None): ) except StopIteration: if default is None: + LOG.error(f"Found no metrics for {method} {path}") raise else: return default