Skip to content

Commit

Permalink
Add a timeout option for watch.
Browse files Browse the repository at this point in the history
Closes #9.
  • Loading branch information
jimmycuadra committed Jul 16, 2017
1 parent 3d0fa37 commit e008cdb
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ serde = "1.0.9"
serde_derive = "1.0.9"
serde_json = "1.0.2"
tokio-core = "0.1.8"
tokio-timer = "0.1.2"
url = "1.5.1"

[dependencies.hyper-tls]
Expand Down
81 changes: 56 additions & 25 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,38 @@ use hyper::error::UriError;
#[cfg(feature = "tls")]
use native_tls::Error as TlsError;
use serde_json::Error as SerializationError;
use tokio_timer::TimeoutError as TokioTimeoutError;
use url::ParseError as UrlError;

/// An error returned by an etcd API endpoint.
///
/// This is a logical error, as opposed to other types of errors that may occur when using this
/// crate, such as network or serialization errors. See `Error` for the other types of errors.
#[derive(Clone, Debug, Deserialize)]
pub struct ApiError {
/// The key that was being operated upon or reason for the failure.
pub cause: Option<String>,
/// The etcd error code.
#[serde(rename = "errorCode")]
pub error_code: u64,
/// The etcd index.
pub index: u64,
/// A human-friendly description of the error.
pub message: String,
}

impl Display for ApiError {
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
write!(f, "{}", self.message)
}
}

impl StdError for ApiError {
fn description(&self) -> &str {
&self.message
}
}

/// An error returned when an operation fails for some reaosn.
#[derive(Debug)]
pub enum Error {
Expand All @@ -34,23 +64,6 @@ pub enum Error {
Tls(TlsError),
}

/// An error returned by an etcd API endpoint.
///
/// This is a logical error, as opposed to other types of errors that may occur when using this
/// crate, such as network or serialization errors. See `Error` for the other types of errors.
#[derive(Clone, Debug, Deserialize)]
pub struct ApiError {
/// The key that was being operated upon or reason for the failure.
pub cause: Option<String>,
/// The etcd error code.
#[serde(rename = "errorCode")]
pub error_code: u64,
/// The etcd index.
pub index: u64,
/// A human-friendly description of the error.
pub message: String,
}

impl Display for Error {
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
match *self {
Expand All @@ -59,12 +72,12 @@ impl Display for Error {
Error::InvalidConditions(reason) => write!(f, "{}", reason),
Error::InvalidUri(ref error) => write!(f, "{}", error),
Error::InvalidUrl(ref error) => write!(f, "{}", error),
#[cfg(feature = "tls")]
Error::Tls(ref error) => write!(f, "{}", error),
Error::Serialization(ref error) => write!(f, "{}", error),
Error::NoEndpoints => {
f.write_str("At least one endpoint is required to create a Client")
}
#[cfg(feature = "tls")]
Error::Tls(ref error) => write!(f, "{}", error),
Error::Serialization(ref error) => write!(f, "{}", error),
}
}
}
Expand All @@ -77,10 +90,10 @@ impl StdError for Error {
Error::InvalidConditions(conditions) => conditions,
Error::InvalidUri(_) => "a supplied endpoint could not be parsed as a URI",
Error::InvalidUrl(_) => "a URL for the request could not be generated",
Error::NoEndpoints => "at least one endpoint is required to create a Client",
#[cfg(feature = "tls")]
Error::Tls(_) => "an error occurred configuring TLS",
Error::Serialization(_) => "an error occurred deserializing JSON",
Error::NoEndpoints => "at least one endpoint is required to create a Client",
}
}
}
Expand Down Expand Up @@ -116,14 +129,32 @@ impl From<UriError> for Error {
}
}

impl Display for ApiError {
/// An error returned by `kv::watch`.
#[derive(Debug)]
pub enum WatchError {
/// An error for each failed request to an etcd member.
Other(Vec<Error>),
/// The supplied timeout was reached before any request successfully completed.
Timeout,
}

impl<T> From<TokioTimeoutError<T>> for WatchError {
fn from(_: TokioTimeoutError<T>) -> Self {
WatchError::Timeout
}
}

impl Display for WatchError {
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
write!(f, "{}", self.message)
match *self {
WatchError::Timeout => write!(f, "{}", self.description()),
ref other => other.fmt(f),
}
}
}

impl StdError for ApiError {
impl StdError for WatchError {
fn description(&self) -> &str {
&self.message
"operation timed out"
}
}
22 changes: 17 additions & 5 deletions src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;

use futures::future::{Future, IntoFuture, err, ok};
use futures::stream::Stream;
use hyper::{StatusCode, Uri};
use hyper::client::Connect;
use serde_json;
use tokio_timer::Timer;
use url::Url;

pub use error::WatchError;

use async::first_ok;
use client::Client;
use error::{ApiError, Error};
use member::Member;
use options::{ComparisonConditions, DeleteOptions, GetOptions, SetOptions};
use url::form_urlencoded::Serializer;

/// The future returned by all key space API calls.
/// The future returned by most key space API calls.
///
/// On success, information about the result of the operation. On failure, an error for each cluster
/// member that failed.
Expand Down Expand Up @@ -342,11 +346,12 @@ pub fn watch<C>(
key: &str,
index: Option<u64>,
recursive: bool,
) -> FutureKeySpaceInfo
timeout: Option<Duration>,
) -> Box<Future<Item = KeySpaceInfo, Error = WatchError>>
where
C: Clone + Connect,
{
raw_get(
let work = raw_get(
client,
key,
GetOptions {
Expand All @@ -355,9 +360,16 @@ where
wait: true,
..Default::default()
},
)
}
).map_err(|errors| WatchError::Other(errors));

if let Some(duration) = timeout {
let timer = Timer::default();

Box::new(timer.timeout(work, duration))
} else {
Box::new(work)
}
}

/// Constructs the full URL for an API call.
fn build_url(member: &Member, path: &str) -> String {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ extern crate serde;
extern crate serde_derive;
extern crate serde_json;
extern crate tokio_core;
extern crate tokio_timer;
extern crate url;

pub use client::{BasicAuth, Client};
Expand Down
112 changes: 79 additions & 33 deletions tests/client_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ extern crate hyper;
extern crate hyper_tls;
extern crate native_tls;
extern crate tokio_core;
extern crate tokio_timer;

use std::fs::File;
use std::io::Read;
use std::ops::Deref;
use std::thread::spawn;
use std::time::Duration;

use futures::future::{Future, join_all};
use futures::sync::oneshot::channel;
Expand All @@ -18,7 +20,7 @@ use native_tls::{Certificate, Pkcs12, TlsConnector};
use tokio_core::reactor::Core;

use etcd::{Client, Error};
use etcd::kv::{self, FutureKeySpaceInfo, KeySpaceInfo};
use etcd::kv::{self, FutureKeySpaceInfo, KeySpaceInfo, WatchError};

/// Wrapper around Client that automatically cleans up etcd after each test.
struct TestClient<C> where C: Clone + Connect {
Expand Down Expand Up @@ -134,7 +136,7 @@ fn create_does_not_replace_existing_key() {
let mut client = TestClient::new(core);
let inner_client = client.clone();

let work = kv::create(&inner_client, "/test/foo", "bar", Some(60)).and_then(move |_| {
let work = kv::create(&client, "/test/foo", "bar", Some(60)).and_then(move |_| {
kv::create(&inner_client, "/test/foo", "bar", Some(60)).then(|result| {
match result {
Ok(_) => panic!("expected EtcdError due to pre-existing key"),
Expand Down Expand Up @@ -610,61 +612,105 @@ fn watch() {

let work = rx.then(|_| kv::set(&inner_client, "/test/foo", "baz", None));

client.run(work).unwrap();
assert!(client.run(work).is_ok());
});

let core = Core::new().unwrap();
let mut client = TestClient::new(core);
let inner_client = client.clone();

let work = kv::create(&inner_client, "/test/foo", "bar", None).and_then(move |_| {
tx.send(()).unwrap();
let work = kv::create(&client, "/test/foo", "bar", None)
.map_err(|errors| WatchError::Other(errors))
.and_then(move |_| {
tx.send(()).unwrap();

kv::watch(&inner_client, "/test/foo", None, false).and_then(|ksi| {
assert_eq!(ksi.node.unwrap().value.unwrap(), "baz");
kv::watch(&inner_client, "/test/foo", None, false, None).and_then(|ksi| {
assert_eq!(ksi.node.unwrap().value.unwrap(), "baz");

Ok(())
})
});
Ok(())
})
});

assert!(client.run(work).is_ok());

child.join().ok().unwrap();
}

// // #[test]
// // fn watch_index() {
// // let client = TestClient::new();
#[test]
fn watch_cancel() {
let core = Core::new().unwrap();
let mut client = TestClient::new(core);
let inner_client = client.clone();

let work = kv::create(&client, "/test/foo", "bar", None)
.map_err(|errors| WatchError::Other(errors))
.and_then(move |_| {
kv::watch(&inner_client, "/test/foo", None, false, Some(Duration::from_millis(1)))
});

match client.run(work) {
Ok(_) => panic!("expected WatchError::Timeout"),
Err(WatchError::Timeout) => {}
Err(_) => panic!("expected WatchError::Timeout"),
}
}

#[test]
fn watch_index() {
let core = Core::new().unwrap();
let mut client = TestClient::new(core);
let inner_client = client.clone();

let work = kv::set(&client, "/test/foo", "bar", None)
.map_err(|errors| WatchError::Other(errors))
.and_then(move |ksi| {
let index = ksi.node.unwrap().modified_index.unwrap();

kv::watch(&inner_client, "/test/foo", Some(index), false, None).and_then(move |ksi| {
let node = ksi.node.unwrap();

assert_eq!(node.modified_index.unwrap(), index);
assert_eq!(node.value.unwrap(), "bar");

Ok(())
})
});

assert!(client.run(work).is_ok());
}

#[test]
fn watch_recursive() {
let (tx, rx) = channel();

// // let index = client.set("/test/foo", "bar", None).ok().unwrap().node.unwrap().modified_index.unwrap();
let child = spawn(move || {
let core = Core::new().unwrap();
let mut client = TestClient::no_destructor(core);
let inner_client = client.clone();

// // let response = client.watch("/test/foo", Some(index), false).ok().unwrap();
// // let node = response.node.unwrap();
let work = rx.then(|_| kv::set(&inner_client, "/test/foo/bar", "baz", None));

// // assert_eq!(node.modified_index.unwrap(), index);
// // assert_eq!(node.value.unwrap(), "bar");
// // }
assert!(client.run(work).is_ok());
});

// // #[test]
// // fn watch_recursive() {
// // let child = spawn(|| {
// // let client = Client::new(&["http://etcd:2379"]).unwrap();
let core = Core::new().unwrap();
let mut client = TestClient::new(core);

// // sleep(Duration::from_millis(50));
tx.send(()).unwrap();

// // client.set("/test/foo/bar", "baz", None).ok().unwrap();
// // });
let work = kv::watch(&client, "/test", None, true, None).and_then(|ksi| {
let node = ksi.node.unwrap();

// // let client = TestClient::new();
assert_eq!(node.key.unwrap(), "/test/foo/bar");
assert_eq!(node.value.unwrap(), "baz");

// // let response = client.watch("/test", None, true).ok().unwrap();
// // let node = response.node.unwrap();
Ok(())
});

// // assert_eq!(node.key.unwrap(), "/test/foo/bar");
// // assert_eq!(node.value.unwrap(), "baz");
assert!(client.run(work).is_ok());

// // child.join().ok().unwrap();
// // }
child.join().ok().unwrap();
}

// #[test]
// fn versions() {
Expand Down

0 comments on commit e008cdb

Please sign in to comment.