Skip to content

Commit

Permalink
Support retrying failed web requests for eventhub_client sends (#1654)
Browse files Browse the repository at this point in the history
* WIP

* Add unit tests, fix out of order requests, extend test http server

* Fix incorrect naming convention of variable

* Fix merge issue
  • Loading branch information
jackgerrits authored and rajan-chari committed Oct 31, 2018
1 parent f11d323 commit 6e6f2f6
Show file tree
Hide file tree
Showing 8 changed files with 377 additions and 123 deletions.
2 changes: 2 additions & 0 deletions reinforcement_learning/include/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ namespace reinforcement_learning { namespace name {
const char *const INTERACTION_EH_KEY_NAME = "interaction.eventhub.keyname";
const char *const INTERACTION_EH_KEY = "interaction.eventhub.key";
const char *const INTERACTION_EH_TASKS_LIMIT = "interaction.eventhub.tasks_limit";
const char *const INTERACTION_EH_MAX_HTTP_RETRIES = "interaction.eventhub.max_http_retries";
const char *const OBSERVATION_EH_HOST = "observation.eventhub.host";
const char *const OBSERVATION_EH_NAME = "observation.eventhub.name";
const char *const OBSERVATION_EH_KEY_NAME = "observation.eventhub.keyname";
const char *const OBSERVATION_EH_KEY = "observation.eventhub.key";
const char *const OBSERVATION_EH_TASKS_LIMIT = "observation.eventhub.tasks_limit";
const char *const OBSERVATION_EH_MAX_HTTP_RETRIES = "observation.eventhub.max_http_retries";
const char *const INTERACTION_SEND_HIGH_WATER_MARK = "interaction.send.highwatermark";
const char *const INTERACTION_SEND_QUEUE_MAXSIZE = "interaction.send.queue.maxsize";
const char *const INTERACTION_SEND_BATCH_INTERVAL_MS = "interaction.send.batchintervalms";
Expand Down
10 changes: 6 additions & 4 deletions reinforcement_learning/rlclientlib/azure_factories.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,30 @@ namespace reinforcement_learning {
return error_code::success;
}

int observation_sender_create(i_sender** retval, const u::configuration& cfg, error_callback_fn* _error_cb, i_trace* trace_logger, api_status* status) {
int observation_sender_create(i_sender** retval, const u::configuration& cfg, error_callback_fn* error_cb, i_trace* trace_logger, api_status* status) {
*retval = new eventhub_client(
cfg.get(name::OBSERVATION_EH_HOST, "localhost:8080"),
cfg.get(name::OBSERVATION_EH_KEY_NAME, ""),
cfg.get(name::OBSERVATION_EH_KEY, ""),
cfg.get(name::OBSERVATION_EH_NAME, "observation"),
cfg.get_int(name::OBSERVATION_EH_TASKS_LIMIT, 16),
cfg.get_int(name::OBSERVATION_EH_MAX_HTTP_RETRIES, 4),
trace_logger,
_error_cb,
error_cb,
cfg.get_bool(name::EH_TEST, false));
return error_code::success;
}

int interaction_sender_create(i_sender** retval, const u::configuration& cfg, error_callback_fn* _error_cb, i_trace* trace_logger, api_status* status) {
int interaction_sender_create(i_sender** retval, const u::configuration& cfg, error_callback_fn* error_cb, i_trace* trace_logger, api_status* status) {
*retval = new eventhub_client(
cfg.get(name::INTERACTION_EH_HOST, "localhost:8080"),
cfg.get(name::INTERACTION_EH_KEY_NAME, ""),
cfg.get(name::INTERACTION_EH_KEY, ""),
cfg.get(name::INTERACTION_EH_NAME, "interaction"),
cfg.get_int(name::INTERACTION_EH_TASKS_LIMIT, 16),
cfg.get_int(name::INTERACTION_EH_MAX_HTTP_RETRIES, 4),
trace_logger,
_error_cb,
error_cb,
cfg.get_bool(name::EH_TEST, false));
return error_code::success;
}
Expand Down
10 changes: 6 additions & 4 deletions reinforcement_learning/rlclientlib/factory_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,28 +89,30 @@ namespace reinforcement_learning {
return error_code::success;
}

int observation_sender_create(i_sender** retval, const u::configuration& cfg, error_callback_fn* _error_cb, i_trace* trace_logger, api_status* status) {
int observation_sender_create(i_sender** retval, const u::configuration& cfg, error_callback_fn* error_cb, i_trace* trace_logger, api_status* status) {
*retval = new eventhub_client(
cfg.get(name::OBSERVATION_EH_HOST, "localhost:8080"),
cfg.get(name::OBSERVATION_EH_KEY_NAME, ""),
cfg.get(name::OBSERVATION_EH_KEY, ""),
cfg.get(name::OBSERVATION_EH_NAME, "observation"),
cfg.get_int(name::OBSERVATION_EH_TASKS_LIMIT, 16),
cfg.get_int(name::OBSERVATION_EH_MAX_HTTP_RETRIES, 4),
trace_logger,
_error_cb,
error_cb,
cfg.get_bool(name::EH_TEST, false));
return error_code::success;
}

int interaction_sender_create(i_sender** retval, const u::configuration& cfg, error_callback_fn* _error_cb, i_trace* trace_logger, api_status* status) {
int interaction_sender_create(i_sender** retval, const u::configuration& cfg, error_callback_fn* error_cb, i_trace* trace_logger, api_status* status) {
*retval = new eventhub_client(
cfg.get(name::INTERACTION_EH_HOST, "localhost:8080"),
cfg.get(name::INTERACTION_EH_KEY_NAME, ""),
cfg.get(name::INTERACTION_EH_KEY, ""),
cfg.get(name::INTERACTION_EH_NAME, "interaction"),
cfg.get_int(name::INTERACTION_EH_TASKS_LIMIT, 16),
cfg.get_int(name::INTERACTION_EH_MAX_HTTP_RETRIES, 4),
trace_logger,
_error_cb,
error_cb,
cfg.get_bool(name::EH_TEST, false));
return error_code::success;
}
Expand Down
Loading

0 comments on commit 6e6f2f6

Please sign in to comment.