Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor frontend to remove return-based response, and retry endpoint lookup on conflict #4205

Merged
merged 11 commits into from
Sep 9, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

- Application-defined endpoints are now accessible with both `/app` prefix and un-prefixed, e.g. `GET /app/log/private` and `GET /log/private` (#4147).

### Changed

- The method `EndpointRegistry::get_metrics_for_endpoint(const EndpointDefinitionPtr&)` has been replaced with `EndpointRegistry::get_metrics_for_endpoint(const std::string& method, const std::string& verb)`.

## [3.0.0-dev2]

### Dependencies
Expand Down
12 changes: 6 additions & 6 deletions include/ccf/endpoint_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ namespace ccf::endpoints
ccf::pal::Mutex metrics_lock;
std::map<std::string, std::map<std::string, Metrics>> metrics;

EndpointRegistry::Metrics& get_metrics_for_endpoint(
const EndpointDefinitionPtr& e);
EndpointRegistry::Metrics& get_metrics_for_request(
eddyashton marked this conversation as resolved.
Show resolved Hide resolved
const std::string& method, const std::string& verb);

kv::Consensus* consensus = nullptr;
kv::TxHistory* history = nullptr;
Expand Down Expand Up @@ -228,9 +228,9 @@ namespace ccf::endpoints

void set_history(kv::TxHistory* h);

virtual void increment_metrics_calls(const EndpointDefinitionPtr& e);
virtual void increment_metrics_errors(const EndpointDefinitionPtr& e);
virtual void increment_metrics_failures(const EndpointDefinitionPtr& e);
virtual void increment_metrics_retries(const EndpointDefinitionPtr& e);
virtual void increment_metrics_calls(const ccf::RpcContext& rpc_ctx);
virtual void increment_metrics_errors(const ccf::RpcContext& rpc_ctx);
virtual void increment_metrics_failures(const ccf::RpcContext& rpc_ctx);
virtual void increment_metrics_retries(const ccf::RpcContext& rpc_ctx);
};
}
1 change: 1 addition & 0 deletions include/ccf/rpc_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ namespace ccf
virtual void set_response_body(std::vector<uint8_t>&& body) = 0;
/// Sets the main body or payload of the response.
virtual void set_response_body(std::string&& body) = 0;
virtual const std::vector<uint8_t>& get_response_body() const = 0;

/// Sets initial status code summarising result of RPC.
virtual void set_response_status(int status) = 0;
Expand Down
3 changes: 1 addition & 2 deletions src/enclave/rpc_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ namespace ccf
virtual bool is_open() = 0;

// Used by rpcendpoint to process incoming client RPCs
virtual std::optional<std::vector<uint8_t>> process(
std::shared_ptr<RpcContextImpl> ctx) = 0;
virtual void process(std::shared_ptr<RpcContextImpl> ctx) = 0;
};
}
33 changes: 20 additions & 13 deletions src/endpoints/endpoint_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,11 @@ namespace ccf::endpoints
return spec;
}

EndpointRegistry::Metrics& EndpointRegistry::get_metrics_for_endpoint(
const EndpointDefinitionPtr& e)
EndpointRegistry::Metrics& EndpointRegistry::get_metrics_for_request(
const std::string& method_, const std::string& verb)
{
auto method = e->dispatch.uri_path;
method = method.substr(method.find_first_not_of('/'));
return metrics[method][e->dispatch.verb.c_str()];
auto method = method_.substr(method_.find_first_not_of('/'));
return metrics[method][verb];
}

Endpoint EndpointRegistry::make_endpoint(
Expand Down Expand Up @@ -434,30 +433,38 @@ namespace ccf::endpoints
history = h;
}

void EndpointRegistry::increment_metrics_calls(const EndpointDefinitionPtr& e)
void EndpointRegistry::increment_metrics_calls(const ccf::RpcContext& rpc_ctx)
{
std::lock_guard<ccf::pal::Mutex> guard(metrics_lock);
get_metrics_for_endpoint(e).calls++;
get_metrics_for_request(
rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str())
.calls++;
}

void EndpointRegistry::increment_metrics_errors(
const EndpointDefinitionPtr& e)
const ccf::RpcContext& rpc_ctx)
{
std::lock_guard<ccf::pal::Mutex> guard(metrics_lock);
get_metrics_for_endpoint(e).errors++;
get_metrics_for_request(
rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str())
.errors++;
}

void EndpointRegistry::increment_metrics_failures(
const EndpointDefinitionPtr& e)
const ccf::RpcContext& rpc_ctx)
{
std::lock_guard<ccf::pal::Mutex> guard(metrics_lock);
get_metrics_for_endpoint(e).failures++;
get_metrics_for_request(
rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str())
.failures++;
}

void EndpointRegistry::increment_metrics_retries(
const EndpointDefinitionPtr& e)
const ccf::RpcContext& rpc_ctx)
{
std::lock_guard<ccf::pal::Mutex> guard(metrics_lock);
get_metrics_for_endpoint(e).retries++;
get_metrics_for_request(
rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str())
.retries++;
}
}
4 changes: 2 additions & 2 deletions src/http/http2_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ namespace http
search = rpc_map->find(ccf::ActorsType::users);
}

auto response = search.value()->process(rpc_ctx);
search.value()->process(rpc_ctx);

if (!response.has_value())
if (rpc_ctx->response_is_pending)
{
// If the RPC is pending, hold the connection.
LOG_TRACE_FMT("Pending");
Expand Down
7 changes: 4 additions & 3 deletions src/http/http_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,17 +227,18 @@ namespace http
search = rpc_map->find(ccf::ActorsType::users);
}

auto response = search.value()->process(rpc_ctx);
search.value()->process(rpc_ctx);

if (!response.has_value())
if (rpc_ctx->response_is_pending)
{
// If the RPC is pending, hold the connection.
LOG_TRACE_FMT("Pending");
return;
}
else
{
send_buffered(response.value());
const auto response = rpc_ctx->serialise_response();
send_buffered(response);
flush();
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/http/http_rpc_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ namespace http
response_body = std::vector<uint8_t>(body.begin(), body.end());
}

virtual const std::vector<uint8_t>& get_response_body() const override
{
return response_body;
}

virtual void set_response_status(int status) override
{
response_status = (http_status)status;
Expand Down
32 changes: 11 additions & 21 deletions src/node/node_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -1923,32 +1923,26 @@ namespace ccf
return request.build_request();
}

bool parse_create_response(const std::vector<uint8_t>& response)
bool extract_create_result(const std::shared_ptr<RpcContext>& ctx)
{
http::SimpleResponseProcessor processor;
http::ResponseParser parser(processor);

parser.execute(response.data(), response.size());

if (processor.received.size() != 1)
if (ctx == nullptr)
{
LOG_FAIL_FMT(
"Expected single message, found {}", processor.received.size());
LOG_FAIL_FMT("Expected non-null context");
return false;
}

const auto& r = processor.received.front();

if (r.status != HTTP_STATUS_OK)
const auto status = ctx->get_response_status();
if (status != HTTP_STATUS_OK)
{
LOG_FAIL_FMT(
"Create response is error: {} {}",
r.status,
http_status_str(r.status));
status,
http_status_str((http_status)status));
return false;
}

const auto body = serdes::unpack(r.body, serdes::Pack::Text);
const auto body =
serdes::unpack(ctx->get_response_body(), serdes::Pack::Text);
if (!body.is_boolean())
{
LOG_FAIL_FMT("Expected boolean body in create response");
Expand Down Expand Up @@ -1983,13 +1977,9 @@ namespace ccf
}
auto frontend = frontend_opt.value();

const auto response = frontend->process(ctx);
if (!response.has_value())
{
return false;
}
frontend->process(ctx);

return parse_create_response(response.value());
return extract_create_result(ctx);
}

void create_and_send_boot_request(
Expand Down
6 changes: 4 additions & 2 deletions src/node/rpc/forwarder.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace ccf
public:
virtual ~ForwardedRpcHandler() {}

virtual std::vector<uint8_t> process_forwarded(
virtual void process_forwarded(
std::shared_ptr<ccf::RpcContextImpl> fwd_ctx) = 0;
};

Expand Down Expand Up @@ -243,11 +243,13 @@ namespace ccf
return;
}

fwd_handler->process_forwarded(ctx);

// Ignore return value - false only means it is pending
send_forwarded_response(
ctx->get_session_context()->client_session_id,
from,
fwd_handler->process_forwarded(ctx));
ctx->serialise_response());
LOG_DEBUG_FMT("Sending forwarded response to {}", from);
}
break;
Expand Down
Loading