Skip to content

Commit

Permalink
Refactor frontend to remove return-based response, and retry endpoint…
Browse files Browse the repository at this point in the history
… lookup on conflict (#4205)
  • Loading branch information
eddyashton authored Sep 9, 2022
1 parent 8619352 commit 869da22
Show file tree
Hide file tree
Showing 17 changed files with 489 additions and 436 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

- Application-defined endpoints are now accessible with both `/app` prefix and un-prefixed, e.g. `GET /app/log/private` and `GET /log/private` (#4147).

### Changed

- The method `EndpointRegistry::get_metrics_for_endpoint(const EndpointDefinitionPtr&)` has been replaced with `EndpointRegistry::get_metrics_for_endpoint(const std::string& method, const std::string& verb)`.

## [3.0.0-dev2]

### Dependencies
Expand Down
12 changes: 6 additions & 6 deletions include/ccf/endpoint_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ namespace ccf::endpoints
ccf::pal::Mutex metrics_lock;
std::map<std::string, std::map<std::string, Metrics>> metrics;

EndpointRegistry::Metrics& get_metrics_for_endpoint(
const EndpointDefinitionPtr& e);
EndpointRegistry::Metrics& get_metrics_for_request(
const std::string& method, const std::string& verb);

kv::Consensus* consensus = nullptr;
kv::TxHistory* history = nullptr;
Expand Down Expand Up @@ -228,9 +228,9 @@ namespace ccf::endpoints

void set_history(kv::TxHistory* h);

virtual void increment_metrics_calls(const EndpointDefinitionPtr& e);
virtual void increment_metrics_errors(const EndpointDefinitionPtr& e);
virtual void increment_metrics_failures(const EndpointDefinitionPtr& e);
virtual void increment_metrics_retries(const EndpointDefinitionPtr& e);
virtual void increment_metrics_calls(const ccf::RpcContext& rpc_ctx);
virtual void increment_metrics_errors(const ccf::RpcContext& rpc_ctx);
virtual void increment_metrics_failures(const ccf::RpcContext& rpc_ctx);
virtual void increment_metrics_retries(const ccf::RpcContext& rpc_ctx);
};
}
1 change: 1 addition & 0 deletions include/ccf/rpc_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ namespace ccf
virtual void set_response_body(std::vector<uint8_t>&& body) = 0;
/// Sets the main body or payload of the response.
virtual void set_response_body(std::string&& body) = 0;
virtual const std::vector<uint8_t>& get_response_body() const = 0;

/// Sets initial status code summarising result of RPC.
virtual void set_response_status(int status) = 0;
Expand Down
3 changes: 1 addition & 2 deletions src/enclave/rpc_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ namespace ccf
virtual bool is_open() = 0;

// Used by rpcendpoint to process incoming client RPCs
virtual std::optional<std::vector<uint8_t>> process(
std::shared_ptr<RpcContextImpl> ctx) = 0;
virtual void process(std::shared_ptr<RpcContextImpl> ctx) = 0;
};
}
33 changes: 20 additions & 13 deletions src/endpoints/endpoint_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,11 @@ namespace ccf::endpoints
return spec;
}

EndpointRegistry::Metrics& EndpointRegistry::get_metrics_for_endpoint(
const EndpointDefinitionPtr& e)
EndpointRegistry::Metrics& EndpointRegistry::get_metrics_for_request(
const std::string& method_, const std::string& verb)
{
auto method = e->dispatch.uri_path;
method = method.substr(method.find_first_not_of('/'));
return metrics[method][e->dispatch.verb.c_str()];
auto method = method_.substr(method_.find_first_not_of('/'));
return metrics[method][verb];
}

Endpoint EndpointRegistry::make_endpoint(
Expand Down Expand Up @@ -434,30 +433,38 @@ namespace ccf::endpoints
history = h;
}

void EndpointRegistry::increment_metrics_calls(const EndpointDefinitionPtr& e)
void EndpointRegistry::increment_metrics_calls(const ccf::RpcContext& rpc_ctx)
{
std::lock_guard<ccf::pal::Mutex> guard(metrics_lock);
get_metrics_for_endpoint(e).calls++;
get_metrics_for_request(
rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str())
.calls++;
}

void EndpointRegistry::increment_metrics_errors(
const EndpointDefinitionPtr& e)
const ccf::RpcContext& rpc_ctx)
{
std::lock_guard<ccf::pal::Mutex> guard(metrics_lock);
get_metrics_for_endpoint(e).errors++;
get_metrics_for_request(
rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str())
.errors++;
}

void EndpointRegistry::increment_metrics_failures(
const EndpointDefinitionPtr& e)
const ccf::RpcContext& rpc_ctx)
{
std::lock_guard<ccf::pal::Mutex> guard(metrics_lock);
get_metrics_for_endpoint(e).failures++;
get_metrics_for_request(
rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str())
.failures++;
}

void EndpointRegistry::increment_metrics_retries(
const EndpointDefinitionPtr& e)
const ccf::RpcContext& rpc_ctx)
{
std::lock_guard<ccf::pal::Mutex> guard(metrics_lock);
get_metrics_for_endpoint(e).retries++;
get_metrics_for_request(
rpc_ctx.get_method(), rpc_ctx.get_request_verb().c_str())
.retries++;
}
}
4 changes: 2 additions & 2 deletions src/http/http2_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ namespace http
search = rpc_map->find(ccf::ActorsType::users);
}

auto response = search.value()->process(rpc_ctx);
search.value()->process(rpc_ctx);

if (!response.has_value())
if (rpc_ctx->response_is_pending)
{
// If the RPC is pending, hold the connection.
LOG_TRACE_FMT("Pending");
Expand Down
7 changes: 4 additions & 3 deletions src/http/http_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,17 +227,18 @@ namespace http
search = rpc_map->find(ccf::ActorsType::users);
}

auto response = search.value()->process(rpc_ctx);
search.value()->process(rpc_ctx);

if (!response.has_value())
if (rpc_ctx->response_is_pending)
{
// If the RPC is pending, hold the connection.
LOG_TRACE_FMT("Pending");
return;
}
else
{
send_buffered(response.value());
const auto response = rpc_ctx->serialise_response();
send_buffered(response);
flush();
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/http/http_rpc_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ namespace http
response_body = std::vector<uint8_t>(body.begin(), body.end());
}

virtual const std::vector<uint8_t>& get_response_body() const override
{
return response_body;
}

virtual void set_response_status(int status) override
{
response_status = (http_status)status;
Expand Down
32 changes: 11 additions & 21 deletions src/node/node_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -1923,32 +1923,26 @@ namespace ccf
return request.build_request();
}

bool parse_create_response(const std::vector<uint8_t>& response)
bool extract_create_result(const std::shared_ptr<RpcContext>& ctx)
{
http::SimpleResponseProcessor processor;
http::ResponseParser parser(processor);

parser.execute(response.data(), response.size());

if (processor.received.size() != 1)
if (ctx == nullptr)
{
LOG_FAIL_FMT(
"Expected single message, found {}", processor.received.size());
LOG_FAIL_FMT("Expected non-null context");
return false;
}

const auto& r = processor.received.front();

if (r.status != HTTP_STATUS_OK)
const auto status = ctx->get_response_status();
if (status != HTTP_STATUS_OK)
{
LOG_FAIL_FMT(
"Create response is error: {} {}",
r.status,
http_status_str(r.status));
status,
http_status_str((http_status)status));
return false;
}

const auto body = serdes::unpack(r.body, serdes::Pack::Text);
const auto body =
serdes::unpack(ctx->get_response_body(), serdes::Pack::Text);
if (!body.is_boolean())
{
LOG_FAIL_FMT("Expected boolean body in create response");
Expand Down Expand Up @@ -1983,13 +1977,9 @@ namespace ccf
}
auto frontend = frontend_opt.value();

const auto response = frontend->process(ctx);
if (!response.has_value())
{
return false;
}
frontend->process(ctx);

return parse_create_response(response.value());
return extract_create_result(ctx);
}

void create_and_send_boot_request(
Expand Down
6 changes: 4 additions & 2 deletions src/node/rpc/forwarder.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace ccf
public:
virtual ~ForwardedRpcHandler() {}

virtual std::vector<uint8_t> process_forwarded(
virtual void process_forwarded(
std::shared_ptr<ccf::RpcContextImpl> fwd_ctx) = 0;
};

Expand Down Expand Up @@ -243,11 +243,13 @@ namespace ccf
return;
}

fwd_handler->process_forwarded(ctx);

// Ignore return value - false only means it is pending
send_forwarded_response(
ctx->get_session_context()->client_session_id,
from,
fwd_handler->process_forwarded(ctx));
ctx->serialise_response());
LOG_DEBUG_FMT("Sending forwarded response to {}", from);
}
break;
Expand Down
Loading

0 comments on commit 869da22

Please sign in to comment.