From ead3dd05fd6be3a17d6470d20547c1fd4f571409 Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Wed, 13 Sep 2017 15:34:13 -0700 Subject: [PATCH] feat: update cargo deps, and rustfmt all the rust Closes #1006 --- autopush_rs/Cargo.lock | 80 ++++++------ autopush_rs/src/call.rs | 179 +++++++++++++------------- autopush_rs/src/errors.rs | 15 ++- autopush_rs/src/http.rs | 27 ++-- autopush_rs/src/protocol.rs | 6 +- autopush_rs/src/queue.rs | 12 +- autopush_rs/src/rt.rs | 56 ++++----- autopush_rs/src/server/dispatch.rs | 6 +- autopush_rs/src/server/mod.rs | 181 ++++++++++++++------------- autopush_rs/src/server/webpush_io.rs | 5 +- autopush_rs/src/util/mod.rs | 34 ++--- autopush_rs/src/util/rc.rs | 4 +- autopush_rs/src/util/send_all.rs | 40 +++--- 13 files changed, 334 insertions(+), 311 deletions(-) diff --git a/autopush_rs/Cargo.lock b/autopush_rs/Cargo.lock index 852fc3b1..4ba3ebec 100644 --- a/autopush_rs/Cargo.lock +++ b/autopush_rs/Cargo.lock @@ -8,11 +8,11 @@ dependencies = [ "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -24,14 +24,14 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "backtrace-sys 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "dbghelp-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-demangle 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -41,8 +41,8 @@ name = "backtrace-sys" version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "gcc 0.3.53 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "gcc 0.3.54 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -102,7 +102,7 @@ dependencies = [ [[package]] name = "dtoa" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -118,7 +118,7 @@ name = "error-chain" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "backtrace 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "backtrace 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -137,7 +137,7 @@ dependencies = [ [[package]] name = "gcc" -version = "0.3.53" +version = "0.3.54" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -182,13 +182,13 @@ name = "iovec" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "itoa" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -212,7 +212,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "libc" -version = "0.2.29" +version = "0.2.30" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -258,7 +258,7 @@ dependencies = [ "iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "magenta 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "magenta-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -286,7 +286,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -301,7 +301,7 @@ name = "num_cpus" version = "1.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -319,13 +319,13 @@ name = "rand" version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "magenta 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "redox_syscall" -version = "0.1.30" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -358,22 +358,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "serde" -version = "1.0.11" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "serde_derive" -version = "1.0.11" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive_internals 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive_internals 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "serde_derive_internals" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -382,13 +382,13 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", - "itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "itoa 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -435,8 +435,8 @@ version = "0.1.38" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", - "redox_syscall 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.31 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -567,7 +567,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "rand 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -590,7 +590,7 @@ dependencies = [ ] [metadata] -"checksum backtrace 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "72f9b4182546f4b04ebc4ab7f84948953a118bd6021a1b6a6c909e3e94f6be76" +"checksum backtrace 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "99f2ce94e22b8e664d95c57fff45b98a966c2252b60691d0b7aeeccd88d70983" "checksum backtrace-sys 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "afccc5772ba333abccdf60d55200fa3406f8c59dcf54d5f7998c9107d3799c7c" "checksum base64 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "96434f987501f0ed4eb336a411e0631ecd1afa11574fe148587adc4ff96143c9" "checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d" @@ -600,21 +600,21 @@ dependencies = [ "checksum conv 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "78ff10625fd0ac447827aa30ea8b861fead473bb60aeb73af6c1c58caf0d1299" "checksum custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "ef8ae57c4978a2acd8b869ce6b9ca1dfe817bff704c220209fdef2c0b75a01b9" "checksum dbghelp-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97590ba53bcb8ac28279161ca943a924d1fd4a8fb3fa63302591647c4fc5b850" -"checksum dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "80c8b71fd71146990a9742fc06dcbbde19161a267e0ad4e572c35162f4578c90" +"checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab" "checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b" "checksum error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" "checksum futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)" = "a82bdc62350ca9d7974c760e9665102fc9d740992a528c2254aa930e53b783c4" "checksum futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a283c84501e92cade5ea673a2a7ca44f71f209ccdd302a3e0896f50083d2c5ff" -"checksum gcc 0.3.53 (registry+https://github.com/rust-lang/crates.io-index)" = "e8310f7e9c890398b0e80e301c4f474e9918d2b27fca8f48486ca775fa9ffc5a" +"checksum gcc 0.3.54 (registry+https://github.com/rust-lang/crates.io-index)" = "5e33ec290da0d127825013597dbdfc28bee4964690c7ce1166cbc2a7bd08b1bb" "checksum httparse 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "af2f2dd97457e8fb1ae7c5a420db346af389926e36f43768b96f101546b04a07" "checksum hyper 0.11.2 (registry+https://github.com/rust-lang/crates.io-index)" = "641abc3e3fcf0de41165595f801376e01106bca1fd876dda937730e477ca004c" "checksum idna 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "014b298351066f1512874135335d62a789ffe78a9974f94b43ed5621951eaf7d" "checksum iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "29d062ee61fccdf25be172e70f34c9f6efc597e1fb8f6526e8437b2046ab26be" -"checksum itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2f404fbc66fd9aac13e998248505e7ecb2ad8e44ab6388684c5fb11c6c251c" +"checksum itoa 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ac17257442c2ed77dbc9fd555cf83c58b0c7f7d0e8f2ae08c0ac05c72842e1f6" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" "checksum lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3b585b7a6811fb03aa10e74b278a0f00f8dd9b45dc681f148bb29fa5cb61859b" -"checksum libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)" = "8a014d9226c2cc402676fbe9ea2e15dd5222cd1dd57f576b5b283178c944a264" +"checksum libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)" = "2370ca07ec338939e356443dac2296f581453c35fe1e3a3ed06023c49435f915" "checksum log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "880f77541efa6e5cc74e76910c9884d9859683118839d6a1dc3b11e63512565b" "checksum magenta 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4bf0336886480e671965f794bc9b6fce88503563013d1bfb7a502c81fe3ac527" "checksum magenta-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "40d014c7011ac470ae28e2f76a02bfea4a8480f73e701353b49ad7a8d75f4699" @@ -628,16 +628,16 @@ dependencies = [ "checksum percent-encoding 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de154f638187706bde41d9b4738748933d64e6b37bdbffc0b47a97d16a6ae356" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" "checksum rand 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)" = "eb250fd207a4729c976794d03db689c9be1d634ab5a1c9da9492a13d8fecbcdf" -"checksum redox_syscall 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)" = "8312fba776a49cf390b7b62f3135f9b294d8617f7a7592cfd0ac2492b658cd7b" +"checksum redox_syscall 0.1.31 (registry+https://github.com/rust-lang/crates.io-index)" = "8dde11f18c108289bef24469638a04dce49da56084f2d50618b226e47eb04509" "checksum rustc-demangle 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "aee45432acc62f7b9a108cc054142dac51f979e69e71ddce7d6fc7adf29e817e" "checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084" "checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f" "checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d" "checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac" -"checksum serde 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)" = "f7726f29ddf9731b17ff113c461e362c381d9d69433f79de4f3dd572488823e9" -"checksum serde_derive 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)" = "cf823e706be268e73e7747b147aa31c8f633ab4ba31f115efb57e5047c3a76dd" -"checksum serde_derive_internals 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)" = "37aee4e0da52d801acfbc0cc219eb1eda7142112339726e427926a6f6ee65d3a" -"checksum serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "48b04779552e92037212c3615370f6bd57a40ebba7f20e554ff9f55e41a69a7b" +"checksum serde 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)" = "bcb6a7637a47663ee073391a139ed07851f27ed2532c2abc88c6bf27a16cdf34" +"checksum serde_derive 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)" = "812ff66056fd9a9a5b7c119714243b0862cf98340e7d4b5ee05a932c40d5ea6c" +"checksum serde_derive_internals 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bd381f6d01a6616cdba8530492d453b7761b456ba974e98768a18cad2cd76f58" +"checksum serde_json 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d243424e06f9f9c39e3cd36147470fd340db785825e367625f79298a6ac6b7ac" "checksum sha1 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cc30b1e1e8c40c121ca33b86c23308a090d19974ef001b4bf6e61fd1a0fb095c" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" diff --git a/autopush_rs/src/call.rs b/autopush_rs/src/call.rs index aa4e299e..4d69ad3d 100644 --- a/autopush_rs/src/call.rs +++ b/autopush_rs/src/call.rs @@ -45,35 +45,27 @@ pub struct PythonCall { } #[no_mangle] -pub extern "C" fn autopush_python_call_input_ptr(call: *mut AutopushPythonCall, - err: &mut AutopushError) - -> *const u8 -{ - unsafe { - (*call).inner.catch(err, |call| { - call.input.as_ptr() - }) - } +pub extern "C" fn autopush_python_call_input_ptr( + call: *mut AutopushPythonCall, + err: &mut AutopushError, +) -> *const u8 { + unsafe { (*call).inner.catch(err, |call| call.input.as_ptr()) } } #[no_mangle] -pub extern "C" fn autopush_python_call_input_len(call: *mut AutopushPythonCall, - err: &mut AutopushError) - -> usize -{ - unsafe { - (*call).inner.catch(err, |call| { - call.input.len() - }) - } +pub extern "C" fn autopush_python_call_input_len( + call: *mut AutopushPythonCall, + err: &mut AutopushError, +) -> usize { + unsafe { (*call).inner.catch(err, |call| call.input.len()) } } #[no_mangle] -pub extern "C" fn autopush_python_call_complete(call: *mut AutopushPythonCall, - input: *const c_char, - err: &mut AutopushError) - -> i32 -{ +pub extern "C" fn autopush_python_call_complete( + call: *mut AutopushPythonCall, + input: *const c_char, + err: &mut AutopushError, +) -> i32 { unsafe { (*call).inner.catch(err, |call| { let input = CStr::from_ptr(input).to_str().unwrap(); @@ -100,7 +92,8 @@ impl AutopushPythonCall { } fn _new(input: String, f: F) -> AutopushPythonCall - where F: FnOnce(&str) + Send + 'static, + where + F: FnOnce(&str) + Send + 'static, { AutopushPythonCall { inner: UnwindGuard::new(Inner { @@ -162,19 +155,14 @@ enum Call { timestamp: i64, }, - DropUser { - uaid: String, - }, + DropUser { uaid: String }, - MigrateUser { - uaid: String, - message_month: String, - }, + MigrateUser { uaid: String, message_month: String }, StoreMessages { message_month: String, messages: Vec, - } + }, } #[derive(Deserialize)] @@ -195,29 +183,25 @@ pub struct HelloResponse { #[derive(Deserialize)] #[serde(untagged)] pub enum RegisterResponse { - Success { - endpoint: String, - }, + Success { endpoint: String }, Error { error_msg: String, error: bool, status: u32, - } + }, } #[derive(Deserialize)] #[serde(untagged)] pub enum UnRegisterResponse { - Success { - success: bool, - }, + Success { success: bool }, Error { error_msg: String, error: bool, status: u32, - } + }, } #[derive(Deserialize)] @@ -254,21 +238,27 @@ pub struct StoreMessagesResponse { impl Server { - pub fn hello(&self, connected_at: &u64, uaid: Option<&Uuid>) - -> MyFuture - { + pub fn hello(&self, connected_at: &u64, uaid: Option<&Uuid>) -> MyFuture { let ms = *connected_at as i64; let (call, fut) = PythonCall::new(&Call::Hello { connected_at: ms, - uaid: if let Some(uuid) = uaid { Some(uuid.simple().to_string()) } else { None }, + uaid: if let Some(uuid) = uaid { + Some(uuid.simple().to_string()) + } else { + None + }, }); self.send_to_python(call); - return fut + return fut; } - pub fn register(&self, uaid: String, message_month: String, channel_id: String, key: Option) - -> MyFuture - { + pub fn register( + &self, + uaid: String, + message_month: String, + channel_id: String, + key: Option, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::Register { uaid: uaid, message_month: message_month, @@ -276,12 +266,16 @@ impl Server { key: key, }); self.send_to_python(call); - return fut + return fut; } - pub fn unregister(&self, uaid: String, message_month: String, channel_id: String, code: i32) - -> MyFuture - { + pub fn unregister( + &self, + uaid: String, + message_month: String, + channel_id: String, + code: i32, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::Unregister { uaid: uaid, message_month: message_month, @@ -289,12 +283,16 @@ impl Server { code: code, }); self.send_to_python(call); - return fut + return fut; } - pub fn check_storage(&self, uaid: String, message_month: String, include_topic: bool, timestamp: Option) - -> MyFuture - { + pub fn check_storage( + &self, + uaid: String, + message_month: String, + include_topic: bool, + timestamp: Option, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::CheckStorage { uaid: uaid, message_month: message_month, @@ -302,52 +300,62 @@ impl Server { timestamp: timestamp, }); self.send_to_python(call); - return fut + return fut; } - pub fn increment_storage(&self, uaid: String, message_month: String, timestamp: i64) - -> MyFuture - { + pub fn increment_storage( + &self, + uaid: String, + message_month: String, + timestamp: i64, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::IncStoragePosition { uaid: uaid, message_month: message_month, timestamp: timestamp, }); self.send_to_python(call); - return fut + return fut; } - pub fn delete_message(&self, message_month: String, notif: protocol::Notification) - -> MyFuture - { + pub fn delete_message( + &self, + message_month: String, + notif: protocol::Notification, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::DeleteMessage { message: notif, message_month: message_month, }); self.send_to_python(call); - return fut + return fut; } pub fn drop_user(&self, uaid: String) -> MyFuture { - let (call, fut) = PythonCall::new(&Call::DropUser { - uaid, - }); + let (call, fut) = PythonCall::new(&Call::DropUser { uaid }); self.send_to_python(call); - return fut + return fut; } - pub fn migrate_user(&self, uaid: String, message_month: String) -> MyFuture { + pub fn migrate_user( + &self, + uaid: String, + message_month: String, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::MigrateUser { uaid, message_month, }); self.send_to_python(call); - return fut + return fut; } - pub fn store_messages(&self, uaid: String, message_month: String, mut messages: Vec) - -> MyFuture - { + pub fn store_messages( + &self, + uaid: String, + message_month: String, + mut messages: Vec, + ) -> MyFuture { for message in messages.iter_mut() { message.uaid = Some(uaid.clone()); } @@ -356,7 +364,7 @@ impl Server { messages, }); self.send_to_python(call); - return fut + return fut; } fn send_to_python(&self, call: PythonCall) { @@ -366,22 +374,19 @@ impl Server { impl PythonCall { fn new(input: &T) -> (PythonCall, MyFuture) - where T: ser::Serialize, - U: for<'de> de::Deserialize<'de> + 'static, + where + T: ser::Serialize, + U: for<'de> de::Deserialize<'de> + 'static, { let (tx, rx) = oneshot::channel(); let call = PythonCall { input: serde_json::to_string(input).unwrap(), - output: Box::new(|json: &str| { - drop(tx.send(json_or_error(json))); - }), + output: Box::new(|json: &str| { drop(tx.send(json_or_error(json))); }), }; - let rx = Box::new(rx.then(|res| { - match res { - Ok(Ok(s)) => Ok(serde_json::from_str(&s)?), - Ok(Err(e)) => Err(e), - Err(_) => Err("call canceled from python".into()), - } + let rx = Box::new(rx.then(|res| match res { + Ok(Ok(s)) => Ok(serde_json::from_str(&s)?), + Ok(Err(e)) => Err(e), + Err(_) => Err("call canceled from python".into()), })); (call, rx) } @@ -390,7 +395,7 @@ impl PythonCall { fn json_or_error(json: &str) -> Result { if let Ok(err) = serde_json::from_str::(json) { if err.error { - return Err(format!("python exception: {}", err.error_msg).into()) + return Err(format!("python exception: {}", err.error_msg).into()); } } Ok(json.to_string()) diff --git a/autopush_rs/src/errors.rs b/autopush_rs/src/errors.rs index 4d71e37e..5781e14f 100644 --- a/autopush_rs/src/errors.rs +++ b/autopush_rs/src/errors.rs @@ -51,17 +51,20 @@ pub type MyFuture = Box>; pub trait FutureChainErr { fn chain_err(self, callback: F) -> MyFuture - where F: FnOnce() -> E + 'static, - E: Into; + where + F: FnOnce() -> E + 'static, + E: Into; } impl FutureChainErr for F - where F: Future + 'static, - F::Error: error::Error + Send + 'static, +where + F: Future + 'static, + F::Error: error::Error + Send + 'static, { fn chain_err(self, callback: C) -> MyFuture - where C: FnOnce() -> E + 'static, - E: Into, + where + C: FnOnce() -> E + 'static, + E: Into, { Box::new(self.then(|r| r.chain_err(callback))) } diff --git a/autopush_rs/src/http.rs b/autopush_rs/src/http.rs index 40877a60..e4ce6c7b 100644 --- a/autopush_rs/src/http.rs +++ b/autopush_rs/src/http.rs @@ -27,18 +27,18 @@ impl Service for Push { fn call(&self, req: hyper::Request) -> Self::Future { if *req.method() != Method::Put && *req.method() != Method::Post { println!("not a PUT: {}", req.method()); - return Box::new(err(hyper::Error::Method)) + return Box::new(err(hyper::Error::Method)); } if req.uri().path().len() == 0 { println!("empty uri path"); - return Box::new(err(hyper::Error::Incomplete)) + return Box::new(err(hyper::Error::Incomplete)); } let req_uaid = req.uri().path()[6..].to_string(); let uaid = match Uuid::parse_str(&req_uaid) { Ok(id) => id, Err(_) => { println!("uri not uuid: {}", req_uaid); - return Box::new(err(hyper::Error::Status)) + return Box::new(err(hyper::Error::Status)); } }; @@ -50,18 +50,19 @@ impl Service for Push { let s = String::from_utf8(body.to_vec()).unwrap(); if let Ok(msg) = serde_json::from_str(&s) { match srv.notify_client(uaid, msg) { - Ok(_) => return Ok(hyper::Response::new() - .with_status(hyper::StatusCode::Ok) - ), - _ => return Ok(hyper::Response::new() - .with_status(hyper::StatusCode::BadRequest) - .with_body("Unable to decode body payload") - ) + Ok(_) => return Ok(hyper::Response::new().with_status(hyper::StatusCode::Ok)), + _ => { + return Ok( + hyper::Response::new() + .with_status(hyper::StatusCode::BadRequest) + .with_body("Unable to decode body payload"), + ) + } } } - Ok(hyper::Response::new() - .with_status(hyper::StatusCode::NotFound) - ) + Ok(hyper::Response::new().with_status( + hyper::StatusCode::NotFound, + )) })) } } diff --git a/autopush_rs/src/protocol.rs b/autopush_rs/src/protocol.rs index c705658e..153c3be6 100644 --- a/autopush_rs/src/protocol.rs +++ b/autopush_rs/src/protocol.rs @@ -39,9 +39,7 @@ pub enum ClientMessage { code: Option, }, - Ack { - updates: Vec, - }, + Ack { updates: Vec }, } #[derive(Deserialize)] @@ -91,5 +89,5 @@ pub struct Notification { #[serde(skip_serializing_if = "Option::is_none")] data: Option, #[serde(skip_serializing_if = "Option::is_none")] - headers: Option> + headers: Option>, } diff --git a/autopush_rs/src/queue.rs b/autopush_rs/src/queue.rs index 28acc48a..453c4378 100644 --- a/autopush_rs/src/queue.rs +++ b/autopush_rs/src/queue.rs @@ -25,9 +25,7 @@ fn _assert_kinds() { } #[no_mangle] -pub extern "C" fn autopush_queue_new(err: &mut AutopushError) - -> *mut AutopushQueue -{ +pub extern "C" fn autopush_queue_new(err: &mut AutopushError) -> *mut AutopushQueue { rt::catch(err, || { let (tx, rx) = mpsc::channel(); @@ -39,10 +37,10 @@ pub extern "C" fn autopush_queue_new(err: &mut AutopushError) } #[no_mangle] -pub extern "C" fn autopush_queue_recv(queue: *mut AutopushQueue, - err: &mut AutopushError) - -> *mut AutopushPythonCall -{ +pub extern "C" fn autopush_queue_recv( + queue: *mut AutopushQueue, + err: &mut AutopushError, +) -> *mut AutopushPythonCall { rt::catch(err, || unsafe { let mut rx = (*queue).rx.lock().unwrap(); let msg = match *rx { diff --git a/autopush_rs/src/rt.rs b/autopush_rs/src/rt.rs index f982df27..d194b299 100644 --- a/autopush_rs/src/rt.rs +++ b/autopush_rs/src/rt.rs @@ -55,16 +55,14 @@ impl AutopushError { fn string(&self) -> Option<&str> { assert!(self.p1 != 0); assert!(self.p2 != 0); - let any: &Any = unsafe { - mem::transmute((self.p1, self.p2)) - }; + let any: &Any = unsafe { mem::transmute((self.p1, self.p2)) }; // Similar to what libstd does, only check for `&'static str` and // `String`. - any.downcast_ref::<&'static str>() - .map(|s| &s[..]) - .or_else(|| { + any.downcast_ref::<&'static str>().map(|s| &s[..]).or_else( + || { any.downcast_ref::().map(|s| &s[..]) - }) + }, + ) } fn assert_empty(&self) { @@ -83,7 +81,7 @@ impl AutopushError { /// Deallocates the internal `Box`, freeing the resources behind it. unsafe fn cleanup(&mut self) { - mem::transmute::<_, Box>((self.p1, self.p2)); + mem::transmute::<_, Box>((self.p1, self.p2)); self.p1 = 0; self.p2 = 0; } @@ -93,9 +91,7 @@ impl AutopushError { /// there is no error message. #[no_mangle] pub extern "C" fn autopush_error_msg_len(err: *const AutopushError) -> usize { - abort_on_panic(|| unsafe { - (*err).string().map(|s| s.len()).unwrap_or(0) - }) + abort_on_panic(|| unsafe { (*err).string().map(|s| s.len()).unwrap_or(0) }) } /// Returns the data pointer of the error message, if any. If not present @@ -112,9 +108,7 @@ pub extern "C" fn autopush_error_msg_ptr(err: *const AutopushError) -> *const u8 /// The error itself can continue to be reused for future function calls. #[no_mangle] pub unsafe extern "C" fn autopush_error_cleanup(err: *mut AutopushError) { - abort_on_panic(|| { - (&mut *err).cleanup(); - }); + abort_on_panic(|| { (&mut *err).cleanup(); }); } /// Helper structure to provide "unwind safety" to ensure we don't reuse values @@ -147,28 +141,32 @@ impl UnwindGuard { /// closure `f` will not be executed. This function will immediately return /// with the "null" return value to propagate the panic again. pub fn catch(&self, err: &mut AutopushError, f: F) -> R::AbiRet - where F: FnOnce(&T) -> R, - R: AbiInto, + where + F: FnOnce(&T) -> R, + R: AbiInto, { err.assert_empty(); if self.poisoned.get() { err.fill(Box::new(String::from("accessing poisoned object"))); - return R::null() + return R::null(); } // The usage of `AssertUnwindSafe` should be ok here because as // soon as we see this closure panic we'll disallow all further // access to the internals of `self`. let mut panicked = true; - let ret = catch(err, panic::AssertUnwindSafe(|| { - let ret = f(&self.inner); - panicked = false; - return ret - })); + let ret = catch( + err, + panic::AssertUnwindSafe(|| { + let ret = f(&self.inner); + panicked = false; + return ret; + }), + ); if panicked { self.poisoned.set(true); } - return ret + return ret; } } @@ -177,8 +175,9 @@ impl UnwindGuard { /// This is typically only used for constructors where there's no state /// persisted across calls. pub fn catch(err: &mut AutopushError, f: F) -> T::AbiRet - where F: panic::UnwindSafe + FnOnce() -> T, - T: AbiInto, +where + F: panic::UnwindSafe + FnOnce() -> T, + T: AbiInto, { err.assert_empty(); @@ -189,7 +188,7 @@ pub fn catch(err: &mut AutopushError, f: F) -> T::AbiRet err.p1 = ptrs.0; err.p2 = ptrs.1; T::null() - } + }, } } @@ -199,7 +198,8 @@ pub fn catch(err: &mut AutopushError, f: F) -> T::AbiRet /// This should be rarely used but is used when executing destructors in Rust, /// which should be infallible (and this is just a double-check that they are). pub fn abort_on_panic(f: F) -> R - where F: FnOnce() -> R, +where + F: FnOnce() -> R, { struct Bomb { active: bool, @@ -216,7 +216,7 @@ pub fn abort_on_panic(f: F) -> R let mut bomb = Bomb { active: true }; let r = f(); bomb.active = false; - return r + return r; } pub trait AbiInto { diff --git a/autopush_rs/src/server/dispatch.rs b/autopush_rs/src/server/dispatch.rs index 0dc79f7e..7bf6bc88 100644 --- a/autopush_rs/src/server/dispatch.rs +++ b/autopush_rs/src/server/dispatch.rs @@ -59,7 +59,7 @@ impl Future for Dispatch { self.data.reserve(16); // get some extra space } if try_ready!(self.socket.as_mut().unwrap().read_buf(&mut self.data)) == 0 { - return Err("early eof".into()) + return Err("early eof".into()); } let ty = { let mut headers = [httparse::EMPTY_HEADER; 16]; @@ -76,14 +76,14 @@ impl Future for Dispatch { Some(ref path) if path.starts_with("/status") => RequestType::Status, _ => { debug!("unknown http request {:?}", req); - return Err("unknown http request".into()) + return Err("unknown http request".into()); } } } }; let tcp = self.socket.take().unwrap(); - return Ok((WebpushIo::new(tcp, self.data.take()), ty).into()) + return Ok((WebpushIo::new(tcp, self.data.take()), ty).into()); } } } diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index 878f3141..f3d06142 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -96,20 +96,16 @@ fn resolve(host: &str) -> IpAddr { } #[no_mangle] -pub extern "C" fn autopush_server_new(opts: *const AutopushServerOptions, - err: &mut AutopushError) - -> *mut AutopushServer -{ +pub extern "C" fn autopush_server_new( + opts: *const AutopushServerOptions, + err: &mut AutopushError, +) -> *mut AutopushServer { unsafe fn to_s<'a>(ptr: *const c_char) -> Option<&'a str> { if ptr.is_null() { - return None + return None; } let s = CStr::from_ptr(ptr).to_str().expect("invalid utf-8"); - if s.is_empty() { - None - } else { - Some(s) - } + if s.is_empty() { None } else { Some(s) } } unsafe fn ito_dur(seconds: u32) -> Option { @@ -124,8 +120,10 @@ pub extern "C" fn autopush_server_new(opts: *const AutopushServerOptions, if seconds == 0.0 { None } else { - Some(Duration::new(seconds as u64, - (seconds.fract() * 1_000_000_000.0) as u32)) + Some(Duration::new( + seconds as u64, + (seconds.fract() * 1_000_000_000.0) as u32, + )) } } @@ -136,18 +134,22 @@ pub extern "C" fn autopush_server_new(opts: *const AutopushServerOptions, let opts = ServerOptions { debug: opts.debug != 0, - host_ip: to_s(opts.host_ip).expect("hostname must be specified").to_string(), - router_ip: to_s(opts.router_ip).expect("router hostname must be specified").to_string(), + host_ip: to_s(opts.host_ip) + .expect("hostname must be specified") + .to_string(), + router_ip: to_s(opts.router_ip) + .expect("router hostname must be specified") + .to_string(), port: opts.port, router_port: opts.router_port, url: to_s(opts.url).expect("url must be specified").to_string(), ssl_key: to_s(opts.ssl_key).map(PathBuf::from), ssl_cert: to_s(opts.ssl_cert).map(PathBuf::from), ssl_dh_param: to_s(opts.ssl_dh_param).map(PathBuf::from), - auto_ping_interval: fto_dur(opts.auto_ping_interval) - .expect("ping interval cannot be 0"), - auto_ping_timeout: fto_dur(opts.auto_ping_timeout) - .expect("ping timeout cannot be 0"), + auto_ping_interval: fto_dur(opts.auto_ping_interval).expect( + "ping interval cannot be 0", + ), + auto_ping_timeout: fto_dur(opts.auto_ping_timeout).expect("ping timeout cannot be 0"), close_handshake_timeout: ito_dur(opts.close_handshake_timeout), max_connections: if opts.max_connections == 0 { None @@ -168,14 +170,15 @@ pub extern "C" fn autopush_server_new(opts: *const AutopushServerOptions, } #[no_mangle] -pub extern "C" fn autopush_server_start(srv: *mut AutopushServer, - queue: *mut AutopushQueue, - err: &mut AutopushError) -> i32 { +pub extern "C" fn autopush_server_start( + srv: *mut AutopushServer, + queue: *mut AutopushQueue, + err: &mut AutopushError, +) -> i32 { unsafe { (*srv).inner.catch(err, |srv| { let tx = (*queue).tx(); - let (tx, thread) = Server::start(&srv.opts, tx) - .expect("failed to start server"); + let (tx, thread) = Server::start(&srv.opts, tx).expect("failed to start server"); srv.tx.set(Some(tx)); srv.thread.set(Some(thread)); }) @@ -183,8 +186,7 @@ pub extern "C" fn autopush_server_start(srv: *mut AutopushServer, } #[no_mangle] -pub extern "C" fn autopush_server_stop(srv: *mut AutopushServer, - err: &mut AutopushError) -> i32 { +pub extern "C" fn autopush_server_stop(srv: *mut AutopushServer, err: &mut AutopushError) -> i32 { unsafe { (*srv).inner.catch(err, |srv| { srv.stop().expect("tokio thread panicked"); @@ -224,9 +226,10 @@ impl Server { /// separate thread for the tokio reactor. The returned /// `AutopushServerInner` is a handle to the spawned thread and can be used /// to interact with it (e.g. shut it down). - fn start(opts: &Arc, tx: queue::Sender) - -> io::Result<(oneshot::Sender<()>, thread::JoinHandle<()>)> - { + fn start( + opts: &Arc, + tx: queue::Sender, + ) -> io::Result<(oneshot::Sender<()>, thread::JoinHandle<()>)> { let (donetx, donerx) = oneshot::channel(); let (inittx, initrx) = oneshot::channel(); @@ -251,12 +254,13 @@ impl Server { let handle = core.handle(); let router_ip = resolve(&srv.opts.router_ip); - let addr = format!("{}:{}", router_ip, srv.opts.router_port).parse().unwrap(); + let addr = format!("{}:{}", router_ip, srv.opts.router_port) + .parse() + .unwrap(); let push_listener = TcpListener::bind(&addr, &handle).unwrap(); let proto = Http::new(); let push_srv = push_listener.incoming().for_each(move |(socket, addr)| { - proto.bind_connection(&handle, socket, addr, - ::http::Push(srv.clone())); + proto.bind_connection(&handle, socket, addr, ::http::Push(srv.clone())); Ok(()) }); core.handle().spawn(push_srv.then(|res| { @@ -275,9 +279,7 @@ impl Server { } } - fn new(opts: &Arc, tx: queue::Sender) - -> io::Result<(Rc, Core)> - { + fn new(opts: &Arc, tx: queue::Sender) -> io::Result<(Rc, Core)> { let core = Core::new()?; let srv = Rc::new(Server { opts: opts.clone(), @@ -296,17 +298,20 @@ impl Server { let handle = core.handle(); let srv2 = srv.clone(); - let ws_srv = ws_listener.incoming() + let ws_srv = ws_listener + .incoming() .map_err(|e| Error::from(e)) - .for_each(move |(socket, addr)| { // Make sure we're not handling too many clients before we start the // websocket handshake. let max = srv.opts.max_connections.unwrap_or(u32::max_value()); if srv.open_connections.get() >= max { - info!("dropping {} as we already have too many open \ - connections", addr); - return Ok(()) + info!( + "dropping {} as we already have too many open \ + connections", + addr + ); + return Ok(()); } srv.open_connections.set(srv.open_connections.get() + 1); @@ -315,9 +320,7 @@ impl Server { // Figure out if this is a websocket or a `/status` request, // without letting it take too long. let request = Dispatch::new(socket); - let request = timeout(request, - srv.opts.open_handshake_timeout, - &handle); + let request = timeout(request, srv.opts.open_handshake_timeout, &handle); let srv2 = srv.clone(); let handle2 = handle.clone(); let client = request.and_then(move |(socket, request)| -> MyFuture<_> { @@ -326,21 +329,21 @@ impl Server { RequestType::Websocket => { // Perform the websocket handshake on each // connection, but don't let it take too long. - let ws = accept_async(socket, None).chain_err(|| { - "failed to accept client" - }); - let ws = timeout(ws, - srv2.opts.open_handshake_timeout, - &handle2); + let ws = + accept_async(socket, None).chain_err(|| "failed to accept client"); + let ws = timeout(ws, srv2.opts.open_handshake_timeout, &handle2); // Once the handshake is done we'll start the main // communication with the client, managing pings // here and deferring to `Client` to start driving // the internal state machine. - Box::new(ws.and_then(move |ws| { - PingManager::new(&srv2, ws) - .chain_err(|| "failed to make ping handler") - }).flatten()) + Box::new( + ws.and_then(move |ws| { + PingManager::new(&srv2, ws).chain_err( + || "failed to make ping handler", + ) + }).flatten(), + ) } } }); @@ -376,7 +379,12 @@ impl Server { /// namely its channel to send notifications back. pub fn connect_client(&self, client: RegisteredClient) { debug!("Connecting a client!"); - assert!(self.uaids.borrow_mut().insert(client.uaid, client).is_none()); + assert!( + self.uaids + .borrow_mut() + .insert(client.uaid, client) + .is_none() + ); } /// A notification has come for the uaid @@ -385,7 +393,10 @@ impl Server { if let Some(client) = uaids.get_mut(&uaid) { debug!("Found a client to deliver a notification to"); // TODO: Don't unwrap, handle error properly - client.tx.unbounded_send(ServerNotification::Notification(notif)).unwrap(); + client + .tx + .unbounded_send(ServerNotification::Notification(notif)) + .unwrap(); info!("Dropped notification in queue"); return Ok(()); } @@ -427,9 +438,7 @@ enum CloseState { } impl PingManager { - fn new(srv: &Rc, socket: WebSocketStream) - -> io::Result - { + fn new(srv: &Rc, socket: WebSocketStream) -> io::Result { // The `socket` is itself a sink and a stream, and we've also got a sink // (`tx`) and a stream (`rx`) to send messages. Half of our job will be // doing all this proxying: reading messages from `socket` and sending @@ -468,7 +477,7 @@ impl Future for PingManager { self.timeout.reset(at); self.waiting = WaitingFor::Pong; } else { - break + break; } } assert!(!socket.ping); @@ -499,7 +508,7 @@ impl Future for PingManager { // then no need to keep checking the timer and we can // keep going debug!("waiting for socket to see pong timed out"); - break + break; } else if self.timeout.poll()?.is_ready() { // We may not actually be reading messages from the // websocket right now, could have been waiting on @@ -510,7 +519,7 @@ impl Future for PingManager { debug!("waited too long for a pong"); socket.pong_timeout = true; } else { - break + break; } } WaitingFor::Close => { @@ -519,16 +528,16 @@ impl Future for PingManager { if let CloseState::Exchange(ref mut client) = self.client { client.shutdown(); } - return Err("close handshake took too long".into()) + return Err("close handshake took too long".into()); } } } } // Be sure to always flush out any buffered messages/pings - socket.poll_complete().chain_err(|| { - "failed routine `poll_complete` call" - })?; + socket.poll_complete().chain_err( + || "failed routine `poll_complete` call", + )?; drop(socket); // At this point looks our state of ping management A-OK, so try to @@ -570,7 +579,9 @@ impl WebpushSocket { } fn send_ping(&mut self) -> Poll<(), Error> - where T: Sink, Error: From + where + T: Sink, + Error: From, { if self.ping { debug!("sending a ping"); @@ -581,7 +592,7 @@ impl WebpushSocket { } AsyncSink::NotReady(_) => { debug!("ping not ready to be sent"); - return Ok(Async::NotReady) + return Ok(Async::NotReady); } } } @@ -590,8 +601,9 @@ impl WebpushSocket { } impl Stream for WebpushSocket - where T: Stream, - Error: From, +where + T: Stream, + Error: From, { type Item = ClientMessage; type Error = Error; @@ -606,21 +618,19 @@ impl Stream for WebpushSocket // elapsed (set above) then this is where we start // triggering errors. if self.pong_timeout { - return Err("failed to receive a pong in time".into()) + return Err("failed to receive a pong in time".into()); } - return Ok(Async::NotReady) + return Ok(Async::NotReady); } }; match msg { Message::Text(ref s) => { trace!("text message {}", s); let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?; - return Ok(Some(msg).into()) + return Ok(Some(msg).into()); } - Message::Binary(_) => { - return Err("binary messages not accepted".into()) - } + Message::Binary(_) => return Err("binary messages not accepted".into()), // sending a pong is already managed by lower layers, just go to // the next message @@ -639,19 +649,20 @@ impl Stream for WebpushSocket } impl Sink for WebpushSocket - where T: Sink, - Error: From, +where + T: Sink, + Error: From, { type SinkItem = ServerMessage; type SinkError = Error; - fn start_send(&mut self, msg: ServerMessage) - -> StartSend - { + fn start_send(&mut self, msg: ServerMessage) -> StartSend { if self.send_ping()?.is_not_ready() { - return Ok(AsyncSink::NotReady(msg)) + return Ok(AsyncSink::NotReady(msg)); } - let s = serde_json::to_string(&msg).chain_err(|| "failed to serialize")?; + let s = serde_json::to_string(&msg).chain_err( + || "failed to serialize", + )?; match self.inner.start_send(Message::Text(s))? { AsyncSink::Ready => Ok(AsyncSink::Ready), AsyncSink::NotReady(_) => Ok(AsyncSink::NotReady(msg)), @@ -686,7 +697,9 @@ fn write_status(socket: WebpushIo) -> MyFuture<()> { len = data.len(), data = data, ); - Box::new(tokio_io::io::write_all(socket, data.into_bytes()) - .map(|_| ()) - .chain_err(|| "failed to write status response")) + Box::new( + tokio_io::io::write_all(socket, data.into_bytes()) + .map(|_| ()) + .chain_err(|| "failed to write status response"), + ) } diff --git a/autopush_rs/src/server/webpush_io.rs b/autopush_rs/src/server/webpush_io.rs index 362acb88..fadc0d86 100644 --- a/autopush_rs/src/server/webpush_io.rs +++ b/autopush_rs/src/server/webpush_io.rs @@ -35,7 +35,7 @@ impl Read for WebpushIo { let n = (&header[..]).read(buf)?; header.split_to(n); if buf.len() == 0 || n > 0 { - return Ok(n) + return Ok(n); } } self.header_to_read = None; @@ -55,8 +55,7 @@ impl Write for WebpushIo { } } -impl AsyncRead for WebpushIo { -} +impl AsyncRead for WebpushIo {} impl AsyncWrite for WebpushIo { fn shutdown(&mut self) -> Poll<(), io::Error> { diff --git a/autopush_rs/src/util/mod.rs b/autopush_rs/src/util/mod.rs index 92bad25a..c1cd09e7 100644 --- a/autopush_rs/src/util/mod.rs +++ b/autopush_rs/src/util/mod.rs @@ -24,21 +24,20 @@ pub use self::rc::RcObject; /// timeout) and otherwise the returned future will cancel `f` and resolve to an /// error if the `dur` timeout elapses before `f` resolves. pub fn timeout(f: F, dur: Option, handle: &Handle) -> MyFuture - where F: Future + 'static, - F::Error: Into, +where + F: Future + 'static, + F::Error: Into, { let dur = match dur { Some(dur) => dur, None => return Box::new(f.map_err(|e| e.into())), }; let timeout = Timeout::new(dur, handle).into_future().flatten(); - Box::new(f.select2(timeout).then(|res| { - match res { - Ok(Either::A((item, _timeout))) => Ok(item), - Err(Either::A((e, _timeout))) => Err(e.into()), - Ok(Either::B(((), _item))) => Err("timed out".into()), - Err(Either::B((e, _item))) => Err(e.into()), - } + Box::new(f.select2(timeout).then(|res| match res { + Ok(Either::A((item, _timeout))) => Ok(item), + Err(Either::A((e, _timeout))) => Err(e.into()), + Ok(Either::B(((), _item))) => Err("timed out".into()), + Err(Either::B((e, _item))) => Err(e.into()), })) } @@ -50,11 +49,12 @@ pub fn init_logging(json: bool) { let mut builder = env_logger::LogBuilder::new(); if env::var("RUST_LOG").is_ok() { - builder.parse(&env::var("RUST_LOG").unwrap()); + builder.parse(&env::var("RUST_LOG").unwrap()); } - builder.target(env_logger::LogTarget::Stdout) - .format(maybe_json_record); + builder.target(env_logger::LogTarget::Stdout).format( + maybe_json_record, + ); if builder.init().is_ok() { LOG_JSON.store(json, Ordering::SeqCst); @@ -82,9 +82,11 @@ fn maybe_json_record(record: &LogRecord) -> String { line: record.location().line(), }).unwrap() } else { - format!("{}:{}: {}", - record.level(), - record.location().module_path(), - record.args()) + format!( + "{}:{}: {}", + record.level(), + record.location().module_path(), + record.args() + ) } } diff --git a/autopush_rs/src/util/rc.rs b/autopush_rs/src/util/rc.rs index 6855efa0..45ad2783 100644 --- a/autopush_rs/src/util/rc.rs +++ b/autopush_rs/src/util/rc.rs @@ -33,9 +33,7 @@ impl Sink for RcObject { type SinkItem = T::SinkItem; type SinkError = T::SinkError; - fn start_send(&mut self, msg: T::SinkItem) - -> StartSend - { + fn start_send(&mut self, msg: T::SinkItem) -> StartSend { self.0.borrow_mut().start_send(msg) } diff --git a/autopush_rs/src/util/send_all.rs b/autopush_rs/src/util/send_all.rs index 0f4e51c6..23aef8de 100644 --- a/autopush_rs/src/util/send_all.rs +++ b/autopush_rs/src/util/send_all.rs @@ -10,9 +10,10 @@ pub struct MySendAll { } impl MySendAll - where U: Sink, - T: Stream, - T::Error: From, +where + U: Sink, + T: Stream, + T::Error: From, { #[allow(unused)] pub fn new(t: T, u: U) -> MySendAll { @@ -24,20 +25,24 @@ impl MySendAll } fn sink_mut(&mut self) -> &mut U { - self.sink.as_mut().take() - .expect("Attempted to poll MySendAll after completion") + self.sink.as_mut().take().expect( + "Attempted to poll MySendAll after completion", + ) } fn stream_mut(&mut self) -> &mut Fuse { - self.stream.as_mut().take() - .expect("Attempted to poll MySendAll after completion") + self.stream.as_mut().take().expect( + "Attempted to poll MySendAll after completion", + ) } fn take_result(&mut self) -> (T, U) { - let sink = self.sink.take() - .expect("Attempted to poll MySendAll after completion"); - let fuse = self.stream.take() - .expect("Attempted to poll MySendAll after completion"); + let sink = self.sink.take().expect( + "Attempted to poll MySendAll after completion", + ); + let fuse = self.stream.take().expect( + "Attempted to poll MySendAll after completion", + ); (fuse.into_inner(), sink) } @@ -45,16 +50,17 @@ impl MySendAll debug_assert!(self.buffered.is_none()); if let AsyncSink::NotReady(item) = try!(self.sink_mut().start_send(item)) { self.buffered = Some(item); - return Ok(Async::NotReady) + return Ok(Async::NotReady); } Ok(Async::Ready(())) } } impl Future for MySendAll - where U: Sink, - T: Stream, - T::Error: From, +where + U: Sink, + T: Stream, + T::Error: From, { type Item = (T, U); type Error = T::Error; @@ -71,11 +77,11 @@ impl Future for MySendAll Async::Ready(Some(item)) => try_ready!(self.try_start_send(item)), Async::Ready(None) => { try_ready!(self.sink_mut().poll_complete()); - return Ok(Async::Ready(self.take_result())) + return Ok(Async::Ready(self.take_result())); } Async::NotReady => { try_ready!(self.sink_mut().poll_complete()); - return Ok(Async::NotReady) + return Ok(Async::NotReady); } } }