Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
feat(circuit_breaker logger)
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Oct 31, 2018
1 parent 8c33481 commit 1b844cb
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 71 deletions.
55 changes: 15 additions & 40 deletions ethcore/light/src/on_demand/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ pub const DEFAULT_REQUEST_BACKOFF_ATTEMPTS: usize = 10;
pub const DEFAULT_MIN_BACKOFF_DURATION: Duration = Duration::from_secs(10);
/// The maximum request interval for OnDemand queries
pub const DEFAULT_MAX_BACKOFF_DURATION: Duration = Duration::from_secs(100);
/// The default success rate of a time window that must be met otherwise it is regarded as a failure
pub const DEFAULT_SUCCESS_RATE: f64 = 0.8;
/// The default window length when a requested is evaluated as successful or as a failure
pub const DEFAULT_WINDOW_DURATION: Duration = Duration::from_secs(10);
/// The default number of maximum backoff iterations
Expand All @@ -81,16 +79,16 @@ pub mod error {
}

errors {
#[doc = "Failure rate of bad responses exceeded"]
#[doc = "Timeout bad response"]
BadResponse(err: super::ResponseGuardError) {
description("Failure rate of bad responses exceeded")
display("Failure rate of bad responses exceeded, determined failure was: {:?}", err)
description("Max response evaluation time exceeded")
display("Bad response timeout, {}", err)
}

#[doc = "Failure rate for OnDemand requests were exceeded"]
RequestLimit {
description("Failure rate for OnDemand requests were exceeded")
display("Failure rate for OnDemand requests were exceeded")
display("Request time out")
}
}
}
Expand Down Expand Up @@ -330,7 +328,6 @@ pub struct OnDemand {
in_transit: RwLock<HashMap<ReqId, Pending>>,
cache: Arc<Mutex<Cache>>,
no_immediate_dispatch: bool,
required_success_rate: f64,
time_window_dur: Duration,
start_backoff_dur: Duration,
max_backoff_dur: Duration,
Expand All @@ -342,24 +339,22 @@ impl OnDemand {
/// Create a new `OnDemand` service with the given cache.
pub fn new(
cache: Arc<Mutex<Cache>>,
required_success_rate: f64,
time_window_dur: Duration,
start_backoff_dur: Duration,
max_backoff_dur: Duration,
max_backoff_rounds: usize,
) -> Self {

// santize input in order to make sure that it doesn't panic
let (s_success_rate, s_window_dur, s_start_backoff_dur, s_max_backoff_dur) =
Self::santize_circuit_breaker_input(required_success_rate, time_window_dur, start_backoff_dur, max_backoff_dur);
let (s_window_dur, s_start_backoff_dur, s_max_backoff_dur) =
Self::santize_circuit_breaker_input(time_window_dur, start_backoff_dur, max_backoff_dur);

OnDemand {
pending: RwLock::new(Vec::new()),
peers: RwLock::new(HashMap::new()),
in_transit: RwLock::new(HashMap::new()),
cache,
no_immediate_dispatch: false,
required_success_rate: s_success_rate,
time_window_dur: s_window_dur,
start_backoff_dur: s_start_backoff_dur,
max_backoff_dur: s_max_backoff_dur,
Expand All @@ -368,19 +363,10 @@ impl OnDemand {
}

fn santize_circuit_breaker_input(
required_success_rate: f64,
time_window_dur: Duration,
start_backoff_dur: Duration,
max_backoff_dur: Duration
) -> (f64, Duration, Duration, Duration) {
let required_success_rate = if required_success_rate > 1.00 || required_success_rate < 0.0 {
let rounded_rate = (0.0 as f64).max((1.0 as f64).min(required_success_rate));
warn!(target: "on_demand", "Success rate is illegal, falling back to success rate {:.2}", rounded_rate);
rounded_rate
} else {
required_success_rate
};

) -> (Duration, Duration, Duration) {
let time_window_dur = if time_window_dur.as_secs() < 1 {
warn!(target: "on_demand",
"Time window is too short must be at least 1 second, configuring it to 1 second");
Expand All @@ -405,21 +391,20 @@ impl OnDemand {
start_backoff_dur
};

(required_success_rate, time_window_dur, start_backoff_dur, max_backoff_dur)
(time_window_dur, start_backoff_dur, max_backoff_dur)
}

// make a test version: this doesn't dispatch pending requests
// until you trigger it manually.
#[cfg(test)]
fn new_test(
cache: Arc<Mutex<Cache>>,
required_success_rate: f64,
time_window_dur: Duration,
start_backoff_dur: Duration,
max_backoff_dur: Duration,
max_backoff_rounds: usize,
) -> Self {
let mut me = OnDemand::new(cache, required_success_rate, time_window_dur, start_backoff_dur, max_backoff_dur, max_backoff_rounds);
let mut me = OnDemand::new(cache, time_window_dur, start_backoff_dur, max_backoff_dur, max_backoff_rounds);
me.no_immediate_dispatch = true;

me
Expand Down Expand Up @@ -474,13 +459,12 @@ impl OnDemand {
responses,
sender,
request_guard: RequestGuard::new(
self.required_success_rate,
self.start_backoff_dur,
self.max_backoff_dur, self.
time_window_dur,
self.max_backoff_rounds
),
response_guard: ResponseGuard::new(self.required_success_rate, self.time_window_dur),
response_guard: ResponseGuard::new(self.time_window_dur),
});

Ok(receiver)
Expand Down Expand Up @@ -549,21 +533,12 @@ impl OnDemand {
}

// Register that the request round failed
match pending.request_guard.register_error() {
// Drop the request
RequestError::ReachedLimit => {
trace!(target: "on_demand", "The RequestGuard dropped the request");
pending.request_limit_reached();
return None
}
RequestError::Rejected => {
trace!(target: "on_demand", "The RequestGuard rejected the request, waiting for backoff");
}
RequestError::LetThrough => {
trace!(target: "on_demand", "The RequestGuard registered a bad response");
}
if let RequestError::ReachedLimit = pending.request_guard.register_error() {
pending.request_limit_reached();
None
} else {
Some(pending)
}
Some(pending)
})
.collect(); // `pending` now contains all requests we couldn't dispatch

Expand Down
22 changes: 14 additions & 8 deletions ethcore/light/src/on_demand/request_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,43 +41,48 @@ pub struct RequestGuard {
impl RequestGuard {
/// Constructor
pub fn new(
required_success_rate: f64,
min_backoff_dur: Duration,
max_backoff_dur: Duration,
window_dur: Duration,
max_backoff_rounds: usize
) -> Self {
let backoff = failsafe::backoff::exponential(min_backoff_dur, max_backoff_dur);
let policy = failsafe::failure_policy::success_rate_over_time_window(required_success_rate, 1, window_dur, backoff);
// success_rate not used because only errors are registered
let policy = failsafe::failure_policy::success_rate_over_time_window(1.00, 1, window_dur, backoff);

Self {
num_failures: 0,
max_failures: max_backoff_rounds,
state: failsafe::StateMachine::new(policy, ())
state: failsafe::StateMachine::new(policy, ()),
}
}

/// Update the state after a `faulty` call
pub fn register_error(&mut self) -> Error {

// Max number of failures received
// Circuit breaker is `closed`, count as a failure
if self.num_failures >= self.max_failures {
Error::ReachedLimit
trace!(target: "circuit_breaker", "RequestGuard: reached_limit, failures: {}/{}, state {:?}",
self.num_failures, self.max_failures, self.state);
Error::ReachedLimit
}
// Circuit breaker is `closed`, count as a failure
else if self.state.is_call_permitted() {
// register as a `failure`
self.state.on_error();
self.num_failures += 1;

if self.num_failures >= self.max_failures {
trace!(target: "circuit_breaker", "RequestGuard: reached_limit, failures: {}/{}, state {:?}",
self.num_failures, self.max_failures, self.state);
Error::ReachedLimit
} else {
trace!(target: "circuit_breaker", "RequestGuard; failures: {}/{}, state {:?}",
self.num_failures, self.max_failures, self.state);
Error::LetThrough
}
}
// Circuit breaker is `open`, don't count as a failure
else {
trace!(target: "circuit_breaker", "RequestGuard; failures: {}/{}, state {:?}",
self.num_failures, self.max_failures, self.state);
Error::Rejected
}
}
Expand All @@ -87,3 +92,4 @@ impl RequestGuard {
self.state.is_call_permitted()
}
}

61 changes: 39 additions & 22 deletions ethcore/light/src/on_demand/response_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use std::time::Duration;
use std::collections::HashMap;
use std::fmt;

use failsafe;
use super::{ResponseError, ValidityError};
Expand All @@ -26,9 +27,23 @@ type ResponsePolicy = failsafe::failure_policy::SuccessRateOverTimeWindow<NoBack
#[derive(Debug, Eq, PartialEq)]
pub enum Error {
/// No majority, the error reason can't be determined
NoMajority,
NoMajority(usize),
/// Majority, with the error reason
Majority(Inner),
Majority(Inner, usize, usize),
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Majority(err, majority, total) => {
write!(f, "error cause was {:?}, (majority count: {} / total: {})",
err, majority, total)
}
Error::NoMajority(total) => {
write!(f, "error cause couldn't be determined, the total number of responses was {}", total)
}
}
}
}


Expand Down Expand Up @@ -74,9 +89,8 @@ pub struct ResponseGuard {

impl ResponseGuard {
/// Constructor
pub fn new(required_success_rate: f64, window_dur: Duration) -> Self {
let policy = failsafe::failure_policy::success_rate_over_time_window(required_success_rate, 1, window_dur, NoBackoff);

pub fn new(window_dur: Duration) -> Self {
let policy = failsafe::failure_policy::success_rate_over_time_window(1.0, 1, window_dur, NoBackoff);
Self {
state: failsafe::StateMachine::new(policy, ()),
responses: HashMap::new(),
Expand Down Expand Up @@ -104,21 +118,24 @@ impl ResponseGuard {

/// Update the state after a `faulty` call
pub fn register_error(&mut self, err: &ResponseError<super::request::Error>) -> Result<(), Error> {
self.state.on_error();
let err = self.into_reason(err);
*self.responses.entry(err).or_insert(0) += 1;
if self.state.is_call_permitted() {
Ok(())
self.state.on_error();
let err = self.into_reason(err);
*self.responses.entry(err).or_insert(0) += 1;
if self.state.is_call_permitted() {
trace!(target: "circuit_breaker", "ResponseGuard: {:?}, responses {:?}", self.state, self.responses);
Ok(())
} else {
trace!(target: "circuit_breaker", "ResponseGuard: {:?}, responses: {:?}", self.state, self.responses);
let (&err, &max_count) = self.responses.iter().max_by_key(|(_k, v)| *v).expect("got at least one element; qed");
let majority = self.responses.values().filter(|v| **v == max_count).count() == 1;
// FIXME: more efficient with a separate counter
let total_responses = self.responses.values().sum();
if majority {
Err(Error::Majority(err, max_count, total_responses))
} else {
let (&err, &max_count) = self.responses.iter().max_by_key(|(_k, v)| *v).expect("got at least one element; qed");
let majority = self.responses.values().filter(|v| **v == max_count).count() == 1;

if majority {
Err(Error::Majority(err))
} else {
Err(Error::NoMajority)
}
Err(Error::NoMajority(total_responses))
}
}
}
}

Expand All @@ -141,27 +158,27 @@ mod tests {

#[test]
fn test_basic_by_majority() {
let mut guard = ResponseGuard::new(0.8, Duration::from_secs(5));
let mut guard = ResponseGuard::new(Duration::from_secs(5));
guard.register_error(&ResponseError::Validity(ValidityError::Empty)).unwrap();
guard.register_error(&ResponseError::Unexpected).unwrap();
guard.register_error(&ResponseError::Unexpected).unwrap();
guard.register_error(&ResponseError::Unexpected).unwrap();
// wait for the current time window to end
thread::sleep(Duration::from_secs(5));

assert_eq!(guard.register_error(&ResponseError::Validity(ValidityError::WrongKind)), Err(Error::Majority(Inner::Unexpected)));
assert_eq!(guard.register_error(&ResponseError::Validity(ValidityError::WrongKind)), Err(Error::Majority(Inner::Unexpected, 3, 5)));
}

#[test]
fn test_no_majority() {
let mut guard = ResponseGuard::new(0.8, Duration::from_secs(5));
let mut guard = ResponseGuard::new(Duration::from_secs(5));
guard.register_error(&ResponseError::Validity(ValidityError::Empty)).unwrap();
guard.register_error(&ResponseError::Validity(ValidityError::Empty)).unwrap();
guard.register_error(&ResponseError::Unexpected).unwrap();
guard.register_error(&ResponseError::Unexpected).unwrap();
// wait for the current time window to end
thread::sleep(Duration::from_secs(5));

assert_eq!(guard.register_error(&ResponseError::Validity(ValidityError::WrongKind)), Err(Error::NoMajority));
assert_eq!(guard.register_error(&ResponseError::Validity(ValidityError::WrongKind)), Err(Error::NoMajority(5)));
}
}
1 change: 0 additions & 1 deletion ethcore/light/src/on_demand/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ impl Harness {
Harness {
service: OnDemand::new_test(
cache,
super::DEFAULT_SUCCESS_RATE,
Duration::from_secs(5),
Duration::from_secs(5),
Duration::from_secs(10),
Expand Down

0 comments on commit 1b844cb

Please sign in to comment.