From 00a34ba221d0e38db1bcbe2f67bdd20352f68a50 Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Fri, 8 Sep 2017 09:13:15 -0700 Subject: [PATCH] fix: use config hostname for websocket server, apply rustfmt, and update cargo deps Closes #1004 --- autopush_rs/Cargo.lock | 104 ++++++++--------- autopush_rs/__init__.py | 3 + autopush_rs/src/call.rs | 180 ++++++++++++++--------------- autopush_rs/src/client.rs | 188 ++++++++++++++++++------------- autopush_rs/src/errors.rs | 15 ++- autopush_rs/src/http.rs | 27 ++--- autopush_rs/src/protocol.rs | 6 +- autopush_rs/src/queue.rs | 12 +- autopush_rs/src/rt.rs | 56 ++++----- autopush_rs/src/server.rs | 128 +++++++++++---------- autopush_rs/src/util/mod.rs | 34 +++--- autopush_rs/src/util/rc.rs | 4 +- autopush_rs/src/util/send_all.rs | 40 ++++--- 13 files changed, 424 insertions(+), 373 deletions(-) diff --git a/autopush_rs/Cargo.lock b/autopush_rs/Cargo.lock index 541a1871..a94ea1f0 100644 --- a/autopush_rs/Cargo.lock +++ b/autopush_rs/Cargo.lock @@ -4,13 +4,13 @@ version = "0.1.0" dependencies = [ "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -30,14 +30,14 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "backtrace-sys 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "dbghelp-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-demangle 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -48,7 +48,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "gcc 0.3.53 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -103,7 +103,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "core-foundation-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -111,7 +111,7 @@ name = "core-foundation-sys" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -139,7 +139,7 @@ dependencies = [ [[package]] name = "dtoa" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -155,7 +155,7 @@ name = "error-chain" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "backtrace 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "backtrace 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -165,7 +165,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "futures" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -173,7 +173,7 @@ name = "futures-cpupool" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.6.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -194,7 +194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "base64 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -224,13 +224,13 @@ name = "iovec" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "itoa" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -259,7 +259,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "libc" -version = "0.2.29" +version = "0.2.30" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -305,7 +305,7 @@ dependencies = [ "iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "magenta 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "magenta-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -345,7 +345,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -360,7 +360,7 @@ name = "num_cpus" version = "1.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -371,7 +371,7 @@ dependencies = [ "bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "foreign-types 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "openssl-sys 0.9.17 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -381,7 +381,7 @@ version = "0.9.17" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "gcc 0.3.53 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "vcpkg 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -406,13 +406,13 @@ name = "rand" version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "magenta 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "redox_syscall" -version = "0.1.30" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -468,7 +468,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "core-foundation 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "core-foundation-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", "security-framework-sys 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -478,7 +478,7 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "core-foundation-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -488,12 +488,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "serde" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "serde_derive" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", @@ -512,13 +512,13 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", - "itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "itoa 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -573,8 +573,8 @@ version = "0.1.38" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", - "redox_syscall 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.31 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -584,7 +584,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -598,7 +598,7 @@ name = "tokio-dns-unofficial" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -610,7 +610,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -619,7 +619,7 @@ name = "tokio-proto" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", @@ -636,7 +636,7 @@ name = "tokio-service" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -644,7 +644,7 @@ name = "tokio-tls" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "native-tls 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -656,7 +656,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "native-tls 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-dns-unofficial 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -733,7 +733,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "rand 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -762,7 +762,7 @@ dependencies = [ [metadata] "checksum advapi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e06588080cb19d0acb6739808aafa5f26bfb2ca015b2b6370028b44cf7cb8a9a" -"checksum backtrace 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "72f9b4182546f4b04ebc4ab7f84948953a118bd6021a1b6a6c909e3e94f6be76" +"checksum backtrace 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "99f2ce94e22b8e664d95c57fff45b98a966c2252b60691d0b7aeeccd88d70983" "checksum backtrace-sys 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "afccc5772ba333abccdf60d55200fa3406f8c59dcf54d5f7998c9107d3799c7c" "checksum base64 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "96434f987501f0ed4eb336a411e0631ecd1afa11574fe148587adc4ff96143c9" "checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d" @@ -776,23 +776,23 @@ dependencies = [ "checksum crypt32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e34988f7e069e0b2f3bfc064295161e489b2d4e04a2e4248fb94360cdf00b4ec" "checksum custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "ef8ae57c4978a2acd8b869ce6b9ca1dfe817bff704c220209fdef2c0b75a01b9" "checksum dbghelp-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97590ba53bcb8ac28279161ca943a924d1fd4a8fb3fa63302591647c4fc5b850" -"checksum dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "80c8b71fd71146990a9742fc06dcbbde19161a267e0ad4e572c35162f4578c90" +"checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab" "checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b" "checksum error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" "checksum foreign-types 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3e4056b9bd47f8ac5ba12be771f77a0dae796d1bbaaf5fd0b9c2d38b69b8a29d" -"checksum futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "4b63a4792d4f8f686defe3b39b92127fea6344de5d38202b2ee5a11bbbf29d6a" +"checksum futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)" = "a82bdc62350ca9d7974c760e9665102fc9d740992a528c2254aa930e53b783c4" "checksum futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a283c84501e92cade5ea673a2a7ca44f71f209ccdd302a3e0896f50083d2c5ff" "checksum gcc 0.3.53 (registry+https://github.com/rust-lang/crates.io-index)" = "e8310f7e9c890398b0e80e301c4f474e9918d2b27fca8f48486ca775fa9ffc5a" "checksum httparse 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "af2f2dd97457e8fb1ae7c5a420db346af389926e36f43768b96f101546b04a07" "checksum hyper 0.11.2 (registry+https://github.com/rust-lang/crates.io-index)" = "641abc3e3fcf0de41165595f801376e01106bca1fd876dda937730e477ca004c" "checksum idna 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "014b298351066f1512874135335d62a789ffe78a9974f94b43ed5621951eaf7d" "checksum iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "29d062ee61fccdf25be172e70f34c9f6efc597e1fb8f6526e8437b2046ab26be" -"checksum itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2f404fbc66fd9aac13e998248505e7ecb2ad8e44ab6388684c5fb11c6c251c" +"checksum itoa 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ac17257442c2ed77dbc9fd555cf83c58b0c7f7d0e8f2ae08c0ac05c72842e1f6" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" "checksum lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3b37545ab726dd833ec6420aaba8231c5b320814b9029ad585555d2a03e94fbf" "checksum lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3b585b7a6811fb03aa10e74b278a0f00f8dd9b45dc681f148bb29fa5cb61859b" -"checksum libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)" = "8a014d9226c2cc402676fbe9ea2e15dd5222cd1dd57f576b5b283178c944a264" +"checksum libc 0.2.30 (registry+https://github.com/rust-lang/crates.io-index)" = "2370ca07ec338939e356443dac2296f581453c35fe1e3a3ed06023c49435f915" "checksum log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "880f77541efa6e5cc74e76910c9884d9859683118839d6a1dc3b11e63512565b" "checksum magenta 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4bf0336886480e671965f794bc9b6fce88503563013d1bfb7a502c81fe3ac527" "checksum magenta-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "40d014c7011ac470ae28e2f76a02bfea4a8480f73e701353b49ad7a8d75f4699" @@ -810,7 +810,7 @@ dependencies = [ "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" "checksum rand 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)" = "eb250fd207a4729c976794d03db689c9be1d634ab5a1c9da9492a13d8fecbcdf" -"checksum redox_syscall 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)" = "8312fba776a49cf390b7b62f3135f9b294d8617f7a7592cfd0ac2492b658cd7b" +"checksum redox_syscall 0.1.31 (registry+https://github.com/rust-lang/crates.io-index)" = "8dde11f18c108289bef24469638a04dce49da56084f2d50618b226e47eb04509" "checksum rustc-demangle 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "aee45432acc62f7b9a108cc054142dac51f979e69e71ddce7d6fc7adf29e817e" "checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084" "checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f" @@ -820,10 +820,10 @@ dependencies = [ "checksum security-framework 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "dfa44ee9c54ce5eecc9de7d5acbad112ee58755239381f687e564004ba4a2332" "checksum security-framework-sys 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "5421621e836278a0b139268f36eee0dc7e389b784dc3f79d8f11aabadf41bead" "checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac" -"checksum serde 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)" = "f7726f29ddf9731b17ff113c461e362c381d9d69433f79de4f3dd572488823e9" -"checksum serde_derive 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)" = "cf823e706be268e73e7747b147aa31c8f633ab4ba31f115efb57e5047c3a76dd" +"checksum serde 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)" = "7f61b753dd58ec5d4c735f794dbddde1f28b977f652afbcde89d75bc77902216" +"checksum serde_derive 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)" = "2a169fa5384d751ada1da9f3992b81830151a03c875e40dcb37c9fb31aafc68f" "checksum serde_derive_internals 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)" = "37aee4e0da52d801acfbc0cc219eb1eda7142112339726e427926a6f6ee65d3a" -"checksum serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "48b04779552e92037212c3615370f6bd57a40ebba7f20e554ff9f55e41a69a7b" +"checksum serde_json 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d243424e06f9f9c39e3cd36147470fd340db785825e367625f79298a6ac6b7ac" "checksum sha1 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cc30b1e1e8c40c121ca33b86c23308a090d19974ef001b4bf6e61fd1a0fb095c" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" diff --git a/autopush_rs/__init__.py b/autopush_rs/__init__.py index 42aa9a29..8888f7af 100644 --- a/autopush_rs/__init__.py +++ b/autopush_rs/__init__.py @@ -27,6 +27,9 @@ def __init__(self, conf, queue): cfg.close_handshake_timeout = conf.close_handshake_timeout cfg.max_connections = conf.max_connections cfg.open_handshake_timeout = 5 + if not conf._resolve_hostname: + raise Exception("Must set resolve_hostname to True") + cfg.host_ip = ffi_from_buffer(conf.hostname) cfg.port = conf.port cfg.ssl_cert = ffi_from_buffer(conf.ssl.cert) cfg.ssl_dh_param = ffi_from_buffer(conf.ssl.dh_param) diff --git a/autopush_rs/src/call.rs b/autopush_rs/src/call.rs index 687bc4eb..1782afb3 100644 --- a/autopush_rs/src/call.rs +++ b/autopush_rs/src/call.rs @@ -22,7 +22,6 @@ use libc::c_char; use serde::de; use serde::ser; use serde_json; -use time::Tm; use uuid::Uuid; use errors::*; @@ -46,35 +45,27 @@ pub struct PythonCall { } #[no_mangle] -pub extern "C" fn autopush_python_call_input_ptr(call: *mut AutopushPythonCall, - err: &mut AutopushError) - -> *const u8 -{ - unsafe { - (*call).inner.catch(err, |call| { - call.input.as_ptr() - }) - } +pub extern "C" fn autopush_python_call_input_ptr( + call: *mut AutopushPythonCall, + err: &mut AutopushError, +) -> *const u8 { + unsafe { (*call).inner.catch(err, |call| call.input.as_ptr()) } } #[no_mangle] -pub extern "C" fn autopush_python_call_input_len(call: *mut AutopushPythonCall, - err: &mut AutopushError) - -> usize -{ - unsafe { - (*call).inner.catch(err, |call| { - call.input.len() - }) - } +pub extern "C" fn autopush_python_call_input_len( + call: *mut AutopushPythonCall, + err: &mut AutopushError, +) -> usize { + unsafe { (*call).inner.catch(err, |call| call.input.len()) } } #[no_mangle] -pub extern "C" fn autopush_python_call_complete(call: *mut AutopushPythonCall, - input: *const c_char, - err: &mut AutopushError) - -> i32 -{ +pub extern "C" fn autopush_python_call_complete( + call: *mut AutopushPythonCall, + input: *const c_char, + err: &mut AutopushError, +) -> i32 { unsafe { (*call).inner.catch(err, |call| { let input = CStr::from_ptr(input).to_str().unwrap(); @@ -101,7 +92,8 @@ impl AutopushPythonCall { } fn _new(input: String, f: F) -> AutopushPythonCall - where F: FnOnce(&str) + Send + 'static, + where + F: FnOnce(&str) + Send + 'static, { AutopushPythonCall { inner: UnwindGuard::new(Inner { @@ -163,19 +155,14 @@ enum Call { timestamp: i64, }, - DropUser { - uaid: String, - }, + DropUser { uaid: String }, - MigrateUser { - uaid: String, - message_month: String, - }, + MigrateUser { uaid: String, message_month: String }, StoreMessages { message_month: String, messages: Vec, - } + }, } #[derive(Deserialize)] @@ -195,29 +182,25 @@ pub struct HelloResponse { #[derive(Deserialize)] #[serde(untagged)] pub enum RegisterResponse { - Success { - endpoint: String, - }, + Success { endpoint: String }, Error { error_msg: String, error: bool, status: u32, - } + }, } #[derive(Deserialize)] #[serde(untagged)] pub enum UnRegisterResponse { - Success { - success: bool, - }, + Success { success: bool }, Error { error_msg: String, error: bool, status: u32, - } + }, } #[derive(Deserialize)] @@ -254,21 +237,27 @@ pub struct StoreMessagesResponse { impl Server { - pub fn hello(&self, connected_at: &u64, uaid: Option<&Uuid>) - -> MyFuture - { + pub fn hello(&self, connected_at: &u64, uaid: Option<&Uuid>) -> MyFuture { let ms = *connected_at as i64; let (call, fut) = PythonCall::new(&Call::Hello { connected_at: ms, - uaid: if let Some(uuid) = uaid { Some(uuid.simple().to_string()) } else { None }, + uaid: if let Some(uuid) = uaid { + Some(uuid.simple().to_string()) + } else { + None + }, }); self.send_to_python(call); - return fut + return fut; } - pub fn register(&self, uaid: String, message_month: String, channel_id: String, key: Option) - -> MyFuture - { + pub fn register( + &self, + uaid: String, + message_month: String, + channel_id: String, + key: Option, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::Register { uaid: uaid, message_month: message_month, @@ -276,12 +265,16 @@ impl Server { key: key, }); self.send_to_python(call); - return fut + return fut; } - pub fn unregister(&self, uaid: String, message_month: String, channel_id: String, code: i32) - -> MyFuture - { + pub fn unregister( + &self, + uaid: String, + message_month: String, + channel_id: String, + code: i32, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::Unregister { uaid: uaid, message_month: message_month, @@ -289,12 +282,16 @@ impl Server { code: code, }); self.send_to_python(call); - return fut + return fut; } - pub fn check_storage(&self, uaid: String, message_month: String, include_topic: bool, timestamp: Option) - -> MyFuture - { + pub fn check_storage( + &self, + uaid: String, + message_month: String, + include_topic: bool, + timestamp: Option, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::CheckStorage { uaid: uaid, message_month: message_month, @@ -302,52 +299,62 @@ impl Server { timestamp: timestamp, }); self.send_to_python(call); - return fut + return fut; } - pub fn increment_storage(&self, uaid: String, message_month: String, timestamp: i64) - -> MyFuture - { + pub fn increment_storage( + &self, + uaid: String, + message_month: String, + timestamp: i64, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::IncStoragePosition { uaid: uaid, message_month: message_month, timestamp: timestamp, }); self.send_to_python(call); - return fut + return fut; } - pub fn delete_message(&self, message_month: String, notif: protocol::Notification) - -> MyFuture - { + pub fn delete_message( + &self, + message_month: String, + notif: protocol::Notification, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::DeleteMessage { message: notif, message_month: message_month, }); self.send_to_python(call); - return fut + return fut; } pub fn drop_user(&self, uaid: String) -> MyFuture { - let (call, fut) = PythonCall::new(&Call::DropUser { - uaid, - }); + let (call, fut) = PythonCall::new(&Call::DropUser { uaid }); self.send_to_python(call); - return fut + return fut; } - pub fn migrate_user(&self, uaid: String, message_month: String) -> MyFuture { + pub fn migrate_user( + &self, + uaid: String, + message_month: String, + ) -> MyFuture { let (call, fut) = PythonCall::new(&Call::MigrateUser { uaid, message_month, }); self.send_to_python(call); - return fut + return fut; } - pub fn store_messages(&self, uaid: String, message_month: String, mut messages: Vec) - -> MyFuture - { + pub fn store_messages( + &self, + uaid: String, + message_month: String, + mut messages: Vec, + ) -> MyFuture { for message in messages.iter_mut() { message.uaid = Some(uaid.clone()); } @@ -356,7 +363,7 @@ impl Server { messages, }); self.send_to_python(call); - return fut + return fut; } fn send_to_python(&self, call: PythonCall) { @@ -366,22 +373,19 @@ impl Server { impl PythonCall { fn new(input: &T) -> (PythonCall, MyFuture) - where T: ser::Serialize, - U: for<'de> de::Deserialize<'de> + 'static, + where + T: ser::Serialize, + U: for<'de> de::Deserialize<'de> + 'static, { let (tx, rx) = oneshot::channel(); let call = PythonCall { input: serde_json::to_string(input).unwrap(), - output: Box::new(|json: &str| { - drop(tx.send(json_or_error(json))); - }), + output: Box::new(|json: &str| { drop(tx.send(json_or_error(json))); }), }; - let rx = Box::new(rx.then(|res| { - match res { - Ok(Ok(s)) => Ok(serde_json::from_str(&s)?), - Ok(Err(e)) => Err(e), - Err(_) => Err("call canceled from python".into()), - } + let rx = Box::new(rx.then(|res| match res { + Ok(Ok(s)) => Ok(serde_json::from_str(&s)?), + Ok(Err(e)) => Err(e), + Err(_) => Err("call canceled from python".into()), })); (call, rx) } @@ -390,7 +394,7 @@ impl PythonCall { fn json_or_error(json: &str) -> Result { if let Ok(err) = serde_json::from_str::(json) { if err.error { - return Err(format!("python exception: {}", err.error_msg).into()) + return Err(format!("python exception: {}", err.error_msg).into()); } } Ok(json.to_string()) diff --git a/autopush_rs/src/client.rs b/autopush_rs/src/client.rs index 1bb6dd04..74b05317 100644 --- a/autopush_rs/src/client.rs +++ b/autopush_rs/src/client.rs @@ -8,7 +8,7 @@ use std::rc::Rc; use futures::AsyncSink; -use futures::future::{Either}; +use futures::future::Either; use futures::sync::mpsc; use futures::{Stream, Sink, Future, Poll, Async}; use tokio_core::reactor::Timeout; @@ -134,8 +134,8 @@ where data: ClientData { webpush: None, srv: srv.clone(), - ws: ws - } + ws: ws, + }, } } @@ -146,8 +146,8 @@ where fn transition(&mut self) -> Poll { let next_state = match self.state { ClientState::FinishSend(None, None) => { - return Err("Bad state, should not have nothing to do".into()) - }, + return Err("Bad state, should not have nothing to do".into()); + } ClientState::FinishSend(None, ref mut next_state) => { debug!("State: FinishSend w/next_state"); try_ready!(self.data.ws.poll_complete()); @@ -160,13 +160,13 @@ where match ret { AsyncSink::Ready => { ClientState::FinishSend(None, Some(next_state.take().unwrap())) - }, + } AsyncSink::NotReady(returned) => { *msg = Some(returned); return Ok(Async::NotReady); } } - }, + } ClientState::SendMessages(ref mut more_messages) => { debug!("State: SendMessages"); if more_messages.is_some() { @@ -174,19 +174,23 @@ where if let Some(message) = messages.pop() { ClientState::FinishSend( Some(ServerMessage::Notification(message)), - Some(Box::new(ClientState::SendMessages( - if messages.len() > 0 { Some(messages) } else { None } - ))) + Some(Box::new(ClientState::SendMessages(if messages.len() > 0 { + Some(messages) + } else { + None + }))), ) } else { - ClientState::SendMessages( - if messages.len() > 0 { Some(messages) } else { None } - ) + ClientState::SendMessages(if messages.len() > 0 { + Some(messages) + } else { + None + }) } } else { ClientState::WaitingForAcks } - }, + } ClientState::CheckStorage => { debug!("State: CheckStorage"); let webpush = self.data.webpush.as_ref().unwrap(); @@ -196,7 +200,7 @@ where webpush.flags.include_topic, webpush.unacked_stored_highest, )) - }, + } ClientState::IncrementStorage => { debug!("State: IncrementStorage"); let webpush = self.data.webpush.as_ref().unwrap(); @@ -205,29 +209,47 @@ where webpush.message_month.clone(), webpush.unacked_stored_highest.unwrap(), )) - }, + } ClientState::WaitingForHello(ref mut timeout) => { debug!("State: WaitingForHello"); let uaid = match try_ready!(self.data.input_with_timeout(timeout)) { - ClientMessage::Hello { uaid, use_webpush: Some(true), ..} => uaid, + ClientMessage::Hello { + uaid, + use_webpush: Some(true), + .. + } => uaid, _ => return Err("Invalid message, must be hello".into()), }; let ms_time = time::precise_time_ns() / 1000; ClientState::WaitingForProcessHello(self.data.srv.hello(&ms_time, uaid.as_ref())) - }, + } ClientState::WaitingForProcessHello(ref mut response) => { debug!("State: WaitingForProcessHello"); match try_ready!(response.poll()) { - call::HelloResponse { uaid: Some(uaid), message_month, reset_uaid, rotate_message_table } - => self.data.process_hello(uaid, message_month, reset_uaid, rotate_message_table), + call::HelloResponse { + uaid: Some(uaid), + message_month, + reset_uaid, + rotate_message_table, + } => { + self.data.process_hello( + uaid, + message_month, + reset_uaid, + rotate_message_table, + ) + } _ => return Err("Already connected elsewhere".into()), } - }, + } ClientState::WaitingForCheckStorage(ref mut response) => { debug!("State: WaitingForCheckStorage"); let (include_topic, mut messages, timestamp) = match try_ready!(response.poll()) { - call::CheckStorageResponse { include_topic, messages, timestamp } - => (include_topic, messages, timestamp), + call::CheckStorageResponse { + include_topic, + messages, + timestamp, + } => (include_topic, messages, timestamp), }; debug!("Got checkstorage response"); let webpush = self.data.webpush.as_mut().unwrap(); @@ -235,35 +257,35 @@ where webpush.unacked_stored_highest = timestamp; if messages.len() > 0 { webpush.flags.increment_storage = !include_topic; - webpush.unacked_stored_notifs.extend(messages.iter().cloned()); - let message = ServerMessage::Notification( - messages.pop().unwrap() + webpush.unacked_stored_notifs.extend( + messages.iter().cloned(), ); + let message = ServerMessage::Notification(messages.pop().unwrap()); ClientState::FinishSend( Some(message), - Some(Box::new(ClientState::SendMessages(Some(messages)))) + Some(Box::new(ClientState::SendMessages(Some(messages)))), ) } else { webpush.flags.check = false; ClientState::Await } - }, + } ClientState::WaitingForIncrementStorage(ref mut response) => { debug!("State: WaitingForIncrementStorage"); try_ready!(response.poll()); self.data.webpush.as_mut().unwrap().flags.increment_storage = false; ClientState::WaitingForAcks - }, + } ClientState::WaitingForMigrateUser(ref mut response) => { debug!("State: WaitingForMigrateUser"); let message_month = match try_ready!(response.poll()) { - call::MigrateUserResponse{ message_month} => message_month + call::MigrateUserResponse { message_month } => message_month, }; let webpush = self.data.webpush.as_mut().unwrap(); webpush.message_month = message_month; webpush.flags.rotate_message_table = false; ClientState::Await - }, + } ClientState::WaitingForRegister(channel_id, ref mut response) => { debug!("State: WaitingForRegister"); let msg = match try_ready!(response.poll()) { @@ -273,7 +295,7 @@ where status: 200, push_endpoint: endpoint, } - }, + } call::RegisterResponse::Error { error_msg, status, .. } => { debug!("Got unregister fail, error: {}", error_msg); ServerMessage::Register { @@ -289,17 +311,17 @@ where ClientState::Await }; ClientState::FinishSend(Some(msg), Some(Box::new(next_state))) - }, + } ClientState::WaitingForUnRegister(channel_id, ref mut response) => { debug!("State: WaitingForUnRegister"); let msg = match try_ready!(response.poll()) { - call::UnRegisterResponse::Success{ success } => { + call::UnRegisterResponse::Success { success } => { debug!("Got the unregister response"); ServerMessage::Unregister { channel_id: channel_id, status: if success { 200 } else { 500 }, } - }, + } call::UnRegisterResponse::Error { error_msg, status, .. } => { debug!("Got unregister fail, error: {}", error_msg); ServerMessage::Unregister { channel_id, status } @@ -311,47 +333,45 @@ where ClientState::Await }; ClientState::FinishSend(Some(msg), Some(Box::new(next_state))) - }, + } ClientState::WaitingForAcks => { debug!("State: WaitingForAcks"); if let Some(next_state) = self.data.determine_acked_state() { - return Ok(next_state.into()) + return Ok(next_state.into()); } match try_ready!(self.data.input()) { ClientMessage::Register { channel_id, key } => { self.data.process_register(channel_id, key) - }, + } ClientMessage::Unregister { channel_id, code } => { self.data.process_unregister(channel_id, code) - }, - ClientMessage::Ack { updates } => { - self.data.process_acks(updates) } - _ => return Err("Invalid state transition".into()) + ClientMessage::Ack { updates } => self.data.process_acks(updates), + _ => return Err("Invalid state transition".into()), } - }, + } ClientState::WaitingForDelete(ref mut response) => { debug!("State: WaitingForDelete"); try_ready!(response.poll()); ClientState::WaitingForAcks - }, + } ClientState::WaitingForDropUser(ref mut response) => { debug!("State: WaitingForDropUser"); try_ready!(response.poll()); ClientState::Done - }, + } ClientState::Await => { debug!("State: Await"); if self.data.webpush.as_ref().unwrap().flags.check { - return Ok(ClientState::CheckStorage.into()) + return Ok(ClientState::CheckStorage.into()); } match try_ready!(self.data.input_or_notif()) { Either::A(ClientMessage::Register { channel_id, key }) => { self.data.process_register(channel_id, key) - }, + } Either::A(ClientMessage::Unregister { channel_id, code }) => { self.data.process_unregister(channel_id, code) - }, + } Either::B(ServerNotification::Notification(notif)) => { let webpush = self.data.webpush.as_mut().unwrap(); webpush.unacked_direct_notifs.push(notif.clone()); @@ -360,16 +380,16 @@ where Some(ServerMessage::Notification(notif)), Some(Box::new(ClientState::WaitingForAcks)), ) - }, + } Either::B(ServerNotification::CheckStorage) => { let webpush = self.data.webpush.as_mut().unwrap(); webpush.flags.include_topic = true; webpush.flags.check = true; ClientState::Await - }, - _ => return Err("Invalid message".into()) + } + _ => return Err("Invalid message".into()), } - }, + } ClientState::ShutdownCleanup(ref mut err) => { debug!("State: ShutdownCleanup"); if let Some(err_obj) = err.take() { @@ -377,7 +397,7 @@ where }; self.data.shutdown(); ClientState::Done - }, + } ClientState::Done => { // We don't expect this to actually run, as this state will exit // the transition. Included for exhaustive matching. @@ -399,7 +419,7 @@ where let item = match self.ws.poll()? { Async::Ready(None) => return Err("Client dropped".into()), Async::Ready(Some(msg)) => Async::Ready(msg), - Async::NotReady => Async::NotReady + Async::NotReady => Async::NotReady, }; Ok(item) } @@ -429,13 +449,19 @@ where Async::Ready(Some(msg)) => Either::A(msg), Async::NotReady => return Ok(Async::NotReady), } - }, + } Err(_) => return Err("Unexpected error".into()), }; Ok(Async::Ready(item)) } - fn process_hello(&mut self, uaid: Uuid, message_month: String, reset_uaid: bool, rotate_message_table: bool) -> ClientState { + fn process_hello( + &mut self, + uaid: Uuid, + message_month: String, + reset_uaid: bool, + rotate_message_table: bool, + ) -> ClientState { let (tx, rx) = mpsc::unbounded(); let mut flags = ClientFlags::new(); flags.reset_uaid = reset_uaid; @@ -449,7 +475,9 @@ where unacked_stored_notifs: Vec::new(), unacked_stored_highest: None, }); - self.srv.connect_client(RegisteredClient { uaid: uaid, tx: tx }); + self.srv.connect_client( + RegisteredClient { uaid: uaid, tx: tx }, + ); let response = ServerMessage::Hello { uaid: uaid.hyphenated().to_string(), status: 200, @@ -493,14 +521,16 @@ where let mut fut: Option> = None; for notif in updates.iter() { if let Some(pos) = webpush.unacked_direct_notifs.iter().position(|v| { - v.channel_id == notif.channel_id && v.version == notif.version - }) { + v.channel_id == notif.channel_id && v.version == notif.version + }) + { webpush.unacked_direct_notifs.remove(pos); continue; }; if let Some(pos) = webpush.unacked_stored_notifs.iter().position(|v| { - v.channel_id == notif.channel_id && v.version == notif.version - }) { + v.channel_id == notif.channel_id && v.version == notif.version + }) + { let message_month = webpush.message_month.clone(); let n = webpush.unacked_stored_notifs.remove(pos); if n.topic.is_some() { @@ -508,9 +538,7 @@ where fut = Some(self.srv.delete_message(message_month, n)) } else { let my_fut = self.srv.delete_message(message_month, n); - fut = Some(Box::new(fut.take().unwrap().and_then(move |_| { - my_fut - }))); + fut = Some(Box::new(fut.take().unwrap().and_then(move |_| my_fut))); } } continue; @@ -533,15 +561,13 @@ where } else if all_acked && webpush.flags.check { Some(ClientState::CheckStorage) } else if all_acked && webpush.flags.rotate_message_table { - Some(ClientState::WaitingForMigrateUser( - self.srv.migrate_user( - webpush.uaid.simple().to_string(), - webpush.message_month.clone(), - ) - )) + Some(ClientState::WaitingForMigrateUser(self.srv.migrate_user( + webpush.uaid.simple().to_string(), + webpush.message_month.clone(), + ))) } else if all_acked && webpush.flags.reset_uaid { Some(ClientState::WaitingForDropUser( - self.srv.drop_user(webpush.uaid.simple().to_string()) + self.srv.drop_user(webpush.uaid.simple().to_string()), )) } else if all_acked && webpush.flags.none() { Some(ClientState::Await) @@ -562,14 +588,18 @@ where // here self.srv.disconnet_client(&webpush.uaid); if webpush.unacked_direct_notifs.len() > 0 { - self.srv.handle.spawn(self.srv.store_messages( - webpush.uaid.simple().to_string(), - webpush.message_month, - webpush.unacked_direct_notifs - ).then(|_| { - debug!("Finished saving unacked direct notifications"); - Ok(()) - })) + self.srv.handle.spawn( + self.srv + .store_messages( + webpush.uaid.simple().to_string(), + webpush.message_month, + webpush.unacked_direct_notifs, + ) + .then(|_| { + debug!("Finished saving unacked direct notifications"); + Ok(()) + }), + ) } }; } diff --git a/autopush_rs/src/errors.rs b/autopush_rs/src/errors.rs index abdddd1e..f7ff8e09 100644 --- a/autopush_rs/src/errors.rs +++ b/autopush_rs/src/errors.rs @@ -49,17 +49,20 @@ pub type MyFuture = Box>; pub trait FutureChainErr { fn chain_err(self, callback: F) -> MyFuture - where F: FnOnce() -> E + 'static, - E: Into; + where + F: FnOnce() -> E + 'static, + E: Into; } impl FutureChainErr for F - where F: Future + 'static, - F::Error: error::Error + Send + 'static, +where + F: Future + 'static, + F::Error: error::Error + Send + 'static, { fn chain_err(self, callback: C) -> MyFuture - where C: FnOnce() -> E + 'static, - E: Into, + where + C: FnOnce() -> E + 'static, + E: Into, { Box::new(self.then(|r| r.chain_err(callback))) } diff --git a/autopush_rs/src/http.rs b/autopush_rs/src/http.rs index 40877a60..e4ce6c7b 100644 --- a/autopush_rs/src/http.rs +++ b/autopush_rs/src/http.rs @@ -27,18 +27,18 @@ impl Service for Push { fn call(&self, req: hyper::Request) -> Self::Future { if *req.method() != Method::Put && *req.method() != Method::Post { println!("not a PUT: {}", req.method()); - return Box::new(err(hyper::Error::Method)) + return Box::new(err(hyper::Error::Method)); } if req.uri().path().len() == 0 { println!("empty uri path"); - return Box::new(err(hyper::Error::Incomplete)) + return Box::new(err(hyper::Error::Incomplete)); } let req_uaid = req.uri().path()[6..].to_string(); let uaid = match Uuid::parse_str(&req_uaid) { Ok(id) => id, Err(_) => { println!("uri not uuid: {}", req_uaid); - return Box::new(err(hyper::Error::Status)) + return Box::new(err(hyper::Error::Status)); } }; @@ -50,18 +50,19 @@ impl Service for Push { let s = String::from_utf8(body.to_vec()).unwrap(); if let Ok(msg) = serde_json::from_str(&s) { match srv.notify_client(uaid, msg) { - Ok(_) => return Ok(hyper::Response::new() - .with_status(hyper::StatusCode::Ok) - ), - _ => return Ok(hyper::Response::new() - .with_status(hyper::StatusCode::BadRequest) - .with_body("Unable to decode body payload") - ) + Ok(_) => return Ok(hyper::Response::new().with_status(hyper::StatusCode::Ok)), + _ => { + return Ok( + hyper::Response::new() + .with_status(hyper::StatusCode::BadRequest) + .with_body("Unable to decode body payload"), + ) + } } } - Ok(hyper::Response::new() - .with_status(hyper::StatusCode::NotFound) - ) + Ok(hyper::Response::new().with_status( + hyper::StatusCode::NotFound, + )) })) } } diff --git a/autopush_rs/src/protocol.rs b/autopush_rs/src/protocol.rs index c705658e..153c3be6 100644 --- a/autopush_rs/src/protocol.rs +++ b/autopush_rs/src/protocol.rs @@ -39,9 +39,7 @@ pub enum ClientMessage { code: Option, }, - Ack { - updates: Vec, - }, + Ack { updates: Vec }, } #[derive(Deserialize)] @@ -91,5 +89,5 @@ pub struct Notification { #[serde(skip_serializing_if = "Option::is_none")] data: Option, #[serde(skip_serializing_if = "Option::is_none")] - headers: Option> + headers: Option>, } diff --git a/autopush_rs/src/queue.rs b/autopush_rs/src/queue.rs index 28acc48a..453c4378 100644 --- a/autopush_rs/src/queue.rs +++ b/autopush_rs/src/queue.rs @@ -25,9 +25,7 @@ fn _assert_kinds() { } #[no_mangle] -pub extern "C" fn autopush_queue_new(err: &mut AutopushError) - -> *mut AutopushQueue -{ +pub extern "C" fn autopush_queue_new(err: &mut AutopushError) -> *mut AutopushQueue { rt::catch(err, || { let (tx, rx) = mpsc::channel(); @@ -39,10 +37,10 @@ pub extern "C" fn autopush_queue_new(err: &mut AutopushError) } #[no_mangle] -pub extern "C" fn autopush_queue_recv(queue: *mut AutopushQueue, - err: &mut AutopushError) - -> *mut AutopushPythonCall -{ +pub extern "C" fn autopush_queue_recv( + queue: *mut AutopushQueue, + err: &mut AutopushError, +) -> *mut AutopushPythonCall { rt::catch(err, || unsafe { let mut rx = (*queue).rx.lock().unwrap(); let msg = match *rx { diff --git a/autopush_rs/src/rt.rs b/autopush_rs/src/rt.rs index f982df27..d194b299 100644 --- a/autopush_rs/src/rt.rs +++ b/autopush_rs/src/rt.rs @@ -55,16 +55,14 @@ impl AutopushError { fn string(&self) -> Option<&str> { assert!(self.p1 != 0); assert!(self.p2 != 0); - let any: &Any = unsafe { - mem::transmute((self.p1, self.p2)) - }; + let any: &Any = unsafe { mem::transmute((self.p1, self.p2)) }; // Similar to what libstd does, only check for `&'static str` and // `String`. - any.downcast_ref::<&'static str>() - .map(|s| &s[..]) - .or_else(|| { + any.downcast_ref::<&'static str>().map(|s| &s[..]).or_else( + || { any.downcast_ref::().map(|s| &s[..]) - }) + }, + ) } fn assert_empty(&self) { @@ -83,7 +81,7 @@ impl AutopushError { /// Deallocates the internal `Box`, freeing the resources behind it. unsafe fn cleanup(&mut self) { - mem::transmute::<_, Box>((self.p1, self.p2)); + mem::transmute::<_, Box>((self.p1, self.p2)); self.p1 = 0; self.p2 = 0; } @@ -93,9 +91,7 @@ impl AutopushError { /// there is no error message. #[no_mangle] pub extern "C" fn autopush_error_msg_len(err: *const AutopushError) -> usize { - abort_on_panic(|| unsafe { - (*err).string().map(|s| s.len()).unwrap_or(0) - }) + abort_on_panic(|| unsafe { (*err).string().map(|s| s.len()).unwrap_or(0) }) } /// Returns the data pointer of the error message, if any. If not present @@ -112,9 +108,7 @@ pub extern "C" fn autopush_error_msg_ptr(err: *const AutopushError) -> *const u8 /// The error itself can continue to be reused for future function calls. #[no_mangle] pub unsafe extern "C" fn autopush_error_cleanup(err: *mut AutopushError) { - abort_on_panic(|| { - (&mut *err).cleanup(); - }); + abort_on_panic(|| { (&mut *err).cleanup(); }); } /// Helper structure to provide "unwind safety" to ensure we don't reuse values @@ -147,28 +141,32 @@ impl UnwindGuard { /// closure `f` will not be executed. This function will immediately return /// with the "null" return value to propagate the panic again. pub fn catch(&self, err: &mut AutopushError, f: F) -> R::AbiRet - where F: FnOnce(&T) -> R, - R: AbiInto, + where + F: FnOnce(&T) -> R, + R: AbiInto, { err.assert_empty(); if self.poisoned.get() { err.fill(Box::new(String::from("accessing poisoned object"))); - return R::null() + return R::null(); } // The usage of `AssertUnwindSafe` should be ok here because as // soon as we see this closure panic we'll disallow all further // access to the internals of `self`. let mut panicked = true; - let ret = catch(err, panic::AssertUnwindSafe(|| { - let ret = f(&self.inner); - panicked = false; - return ret - })); + let ret = catch( + err, + panic::AssertUnwindSafe(|| { + let ret = f(&self.inner); + panicked = false; + return ret; + }), + ); if panicked { self.poisoned.set(true); } - return ret + return ret; } } @@ -177,8 +175,9 @@ impl UnwindGuard { /// This is typically only used for constructors where there's no state /// persisted across calls. pub fn catch(err: &mut AutopushError, f: F) -> T::AbiRet - where F: panic::UnwindSafe + FnOnce() -> T, - T: AbiInto, +where + F: panic::UnwindSafe + FnOnce() -> T, + T: AbiInto, { err.assert_empty(); @@ -189,7 +188,7 @@ pub fn catch(err: &mut AutopushError, f: F) -> T::AbiRet err.p1 = ptrs.0; err.p2 = ptrs.1; T::null() - } + }, } } @@ -199,7 +198,8 @@ pub fn catch(err: &mut AutopushError, f: F) -> T::AbiRet /// This should be rarely used but is used when executing destructors in Rust, /// which should be infallible (and this is just a double-check that they are). pub fn abort_on_panic(f: F) -> R - where F: FnOnce() -> R, +where + F: FnOnce() -> R, { struct Bomb { active: bool, @@ -216,7 +216,7 @@ pub fn abort_on_panic(f: F) -> R let mut bomb = Bomb { active: true }; let r = f(); bomb.active = false; - return r + return r; } pub trait AbiInto { diff --git a/autopush_rs/src/server.rs b/autopush_rs/src/server.rs index 3b046f31..9be6340d 100644 --- a/autopush_rs/src/server.rs +++ b/autopush_rs/src/server.rs @@ -43,6 +43,7 @@ struct AutopushServerInner { #[repr(C)] pub struct AutopushServerOptions { pub debug: i32, + pub host_ip: *const c_char, pub port: u16, pub url: *const c_char, pub ssl_key: *const c_char, @@ -66,6 +67,7 @@ pub struct Server { pub struct ServerOptions { pub debug: bool, + pub host_ip: String, pub port: u16, pub url: String, pub ssl_key: Option, @@ -79,20 +81,16 @@ pub struct ServerOptions { } #[no_mangle] -pub extern "C" fn autopush_server_new(opts: *const AutopushServerOptions, - err: &mut AutopushError) - -> *mut AutopushServer -{ +pub extern "C" fn autopush_server_new( + opts: *const AutopushServerOptions, + err: &mut AutopushError, +) -> *mut AutopushServer { unsafe fn to_s<'a>(ptr: *const c_char) -> Option<&'a str> { if ptr.is_null() { - return None + return None; } let s = CStr::from_ptr(ptr).to_str().expect("invalid utf-8"); - if s.is_empty() { - None - } else { - Some(s) - } + if s.is_empty() { None } else { Some(s) } } unsafe fn ito_dur(seconds: u32) -> Option { @@ -107,8 +105,10 @@ pub extern "C" fn autopush_server_new(opts: *const AutopushServerOptions, if seconds == 0.0 { None } else { - Some(Duration::new(seconds as u64, - (seconds.fract() * 1_000_000_000.0) as u32)) + Some(Duration::new( + seconds as u64, + (seconds.fract() * 1_000_000_000.0) as u32, + )) } } @@ -119,15 +119,16 @@ pub extern "C" fn autopush_server_new(opts: *const AutopushServerOptions, let opts = ServerOptions { debug: opts.debug != 0, + host_ip: to_s(opts.host_ip).expect("hostname must be specified").to_string(), port: opts.port, url: to_s(opts.url).expect("url must be specified").to_string(), ssl_key: to_s(opts.ssl_key).map(PathBuf::from), ssl_cert: to_s(opts.ssl_cert).map(PathBuf::from), ssl_dh_param: to_s(opts.ssl_dh_param).map(PathBuf::from), - auto_ping_interval: fto_dur(opts.auto_ping_interval) - .expect("ping interval cannot be 0"), - auto_ping_timeout: fto_dur(opts.auto_ping_timeout) - .expect("ping timeout cannot be 0"), + auto_ping_interval: fto_dur(opts.auto_ping_interval).expect( + "ping interval cannot be 0", + ), + auto_ping_timeout: fto_dur(opts.auto_ping_timeout).expect("ping timeout cannot be 0"), close_handshake_timeout: ito_dur(opts.close_handshake_timeout), max_connections: if opts.max_connections == 0 { None @@ -148,14 +149,15 @@ pub extern "C" fn autopush_server_new(opts: *const AutopushServerOptions, } #[no_mangle] -pub extern "C" fn autopush_server_start(srv: *mut AutopushServer, - queue: *mut AutopushQueue, - err: &mut AutopushError) -> i32 { +pub extern "C" fn autopush_server_start( + srv: *mut AutopushServer, + queue: *mut AutopushQueue, + err: &mut AutopushError, +) -> i32 { unsafe { (*srv).inner.catch(err, |srv| { let tx = (*queue).tx(); - let (tx, thread) = Server::start(&srv.opts, tx) - .expect("failed to start server"); + let (tx, thread) = Server::start(&srv.opts, tx).expect("failed to start server"); srv.tx.set(Some(tx)); srv.thread.set(Some(thread)); }) @@ -163,8 +165,7 @@ pub extern "C" fn autopush_server_start(srv: *mut AutopushServer, } #[no_mangle] -pub extern "C" fn autopush_server_stop(srv: *mut AutopushServer, - err: &mut AutopushError) -> i32 { +pub extern "C" fn autopush_server_stop(srv: *mut AutopushServer, err: &mut AutopushError) -> i32 { unsafe { (*srv).inner.catch(err, |srv| { srv.stop().expect("tokio thread panicked"); @@ -204,9 +205,10 @@ impl Server { /// separate thread for the tokio reactor. The returned /// `AutopushServerInner` is a handle to the spawned thread and can be used /// to interact with it (e.g. shut it down). - fn start(opts: &Arc, tx: queue::Sender) - -> io::Result<(oneshot::Sender<()>, thread::JoinHandle<()>)> - { + fn start( + opts: &Arc, + tx: queue::Sender, + ) -> io::Result<(oneshot::Sender<()>, thread::JoinHandle<()>)> { let (donetx, donerx) = oneshot::channel(); let (inittx, initrx) = oneshot::channel(); @@ -234,8 +236,7 @@ impl Server { let push_listener = TcpListener::bind(&addr, &handle).unwrap(); let proto = Http::new(); let push_srv = push_listener.incoming().for_each(move |(socket, addr)| { - proto.bind_connection(&handle, socket, addr, - ::http::Push(srv.clone())); + proto.bind_connection(&handle, socket, addr, ::http::Push(srv.clone())); Ok(()) }); core.handle().spawn(push_srv.then(|res| { @@ -254,9 +255,7 @@ impl Server { } } - fn new(opts: &Arc, tx: queue::Sender) - -> io::Result<(Rc, Core)> - { + fn new(opts: &Arc, tx: queue::Sender) -> io::Result<(Rc, Core)> { let core = Core::new()?; let srv = Rc::new(Server { opts: opts.clone(), @@ -265,7 +264,7 @@ impl Server { handle: core.handle(), tx: tx, }); - let addr = format!("127.0.0.1:{}", srv.opts.port); + let addr = format!("{}:{}", srv.opts.host_ip, srv.opts.port); let ws_listener = TcpListener::bind(&addr.parse().unwrap(), &srv.handle)?; assert!(srv.opts.ssl_key.is_none(), "ssl not supported yet"); @@ -274,17 +273,20 @@ impl Server { let handle = core.handle(); let srv2 = srv.clone(); - let ws_srv = ws_listener.incoming() + let ws_srv = ws_listener + .incoming() .map_err(|e| Error::from(e)) - .for_each(move |(socket, addr)| { // Make sure we're not handling too many clients before we start the // websocket handshake. let max = srv.opts.max_connections.unwrap_or(u32::max_value()); if srv.open_connections.get() >= max { - info!("dropping {} as we already have too many open \ - connections", addr); - return Ok(()) + info!( + "dropping {} as we already have too many open \ + connections", + addr + ); + return Ok(()); } srv.open_connections.set(srv.open_connections.get() + 1); @@ -300,8 +302,7 @@ impl Server { // `Client` to start driving the internal state machine. let srv2 = srv.clone(); let client = ws.and_then(move |ws| { - PingManager::new(&srv2, ws) - .chain_err(|| "failed to make ping handler") + PingManager::new(&srv2, ws).chain_err(|| "failed to make ping handler") }).flatten(); let srv = srv.clone(); @@ -335,7 +336,12 @@ impl Server { /// namely its channel to send notifications back. pub fn connect_client(&self, client: RegisteredClient) { debug!("Connecting a client!"); - assert!(self.uaids.borrow_mut().insert(client.uaid, client).is_none()); + assert!( + self.uaids + .borrow_mut() + .insert(client.uaid, client) + .is_none() + ); } /// A notification has come for the uaid @@ -344,7 +350,9 @@ impl Server { if let Some(client) = uaids.get_mut(&uaid) { debug!("Found a client to deliver a notification to"); // TODO: Don't unwrap, handle error properly - (&client.tx).send(ServerNotification::Notification(notif)).unwrap(); + (&client.tx) + .unbounded_send(ServerNotification::Notification(notif)) + .unwrap(); info!("Dropped notification in queue"); return Ok(()); } @@ -386,9 +394,7 @@ enum CloseState { } impl PingManager { - fn new(srv: &Rc, socket: WebSocketStream) - -> io::Result - { + fn new(srv: &Rc, socket: WebSocketStream) -> io::Result { // The `socket` is itself a sink and a stream, and we've also got a sink // (`tx`) and a stream (`rx`) to send messages. Half of our job will be // doing all this proxying: reading messages from `socket` and sending @@ -441,7 +447,7 @@ impl Future for PingManager { if let CloseState::Exchange(ref mut client) = self.client { client.shutdown(); } - return Err("close handshake took too long".into()) + return Err("close handshake took too long".into()); } } TimeoutState::Ping(ref mut timeout) => { @@ -449,7 +455,7 @@ impl Future for PingManager { if let CloseState::Exchange(ref mut client) = self.client { client.shutdown(); } - return Err("pong not received within timeout".into()) + return Err("pong not received within timeout".into()); } } } @@ -514,7 +520,9 @@ impl WebpushSocket { } fn send_ping(&mut self) -> Poll<(), Error> - where T: Sink, Error: From + where + T: Sink, + Error: From, { if self.ping { match self.inner.start_send(Message::Ping(Vec::new()))? { @@ -527,8 +535,9 @@ impl WebpushSocket { } impl Stream for WebpushSocket - where T: Stream, - Error: From, +where + T: Stream, + Error: From, { type Item = ClientMessage; type Error = Error; @@ -538,12 +547,10 @@ impl Stream for WebpushSocket match try_ready!(self.inner.poll()) { Some(Message::Text(ref s)) => { let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?; - return Ok(Some(msg).into()) + return Ok(Some(msg).into()); } - Some(Message::Binary(_)) => { - return Err("binary messages not accepted".into()) - } + Some(Message::Binary(_)) => return Err("binary messages not accepted".into()), // sending a pong is already managed by lower layers, just go to // the next message @@ -568,19 +575,20 @@ impl Stream for WebpushSocket } impl Sink for WebpushSocket - where T: Sink, - Error: From, +where + T: Sink, + Error: From, { type SinkItem = ServerMessage; type SinkError = Error; - fn start_send(&mut self, msg: ServerMessage) - -> StartSend - { + fn start_send(&mut self, msg: ServerMessage) -> StartSend { if self.send_ping()?.is_not_ready() { - return Ok(AsyncSink::NotReady(msg)) + return Ok(AsyncSink::NotReady(msg)); } - let s = serde_json::to_string(&msg).chain_err(|| "failed to serialize")?; + let s = serde_json::to_string(&msg).chain_err( + || "failed to serialize", + )?; match self.inner.start_send(Message::Text(s))? { AsyncSink::Ready => Ok(AsyncSink::Ready), AsyncSink::NotReady(_) => Ok(AsyncSink::NotReady(msg)), diff --git a/autopush_rs/src/util/mod.rs b/autopush_rs/src/util/mod.rs index 92bad25a..c1cd09e7 100644 --- a/autopush_rs/src/util/mod.rs +++ b/autopush_rs/src/util/mod.rs @@ -24,21 +24,20 @@ pub use self::rc::RcObject; /// timeout) and otherwise the returned future will cancel `f` and resolve to an /// error if the `dur` timeout elapses before `f` resolves. pub fn timeout(f: F, dur: Option, handle: &Handle) -> MyFuture - where F: Future + 'static, - F::Error: Into, +where + F: Future + 'static, + F::Error: Into, { let dur = match dur { Some(dur) => dur, None => return Box::new(f.map_err(|e| e.into())), }; let timeout = Timeout::new(dur, handle).into_future().flatten(); - Box::new(f.select2(timeout).then(|res| { - match res { - Ok(Either::A((item, _timeout))) => Ok(item), - Err(Either::A((e, _timeout))) => Err(e.into()), - Ok(Either::B(((), _item))) => Err("timed out".into()), - Err(Either::B((e, _item))) => Err(e.into()), - } + Box::new(f.select2(timeout).then(|res| match res { + Ok(Either::A((item, _timeout))) => Ok(item), + Err(Either::A((e, _timeout))) => Err(e.into()), + Ok(Either::B(((), _item))) => Err("timed out".into()), + Err(Either::B((e, _item))) => Err(e.into()), })) } @@ -50,11 +49,12 @@ pub fn init_logging(json: bool) { let mut builder = env_logger::LogBuilder::new(); if env::var("RUST_LOG").is_ok() { - builder.parse(&env::var("RUST_LOG").unwrap()); + builder.parse(&env::var("RUST_LOG").unwrap()); } - builder.target(env_logger::LogTarget::Stdout) - .format(maybe_json_record); + builder.target(env_logger::LogTarget::Stdout).format( + maybe_json_record, + ); if builder.init().is_ok() { LOG_JSON.store(json, Ordering::SeqCst); @@ -82,9 +82,11 @@ fn maybe_json_record(record: &LogRecord) -> String { line: record.location().line(), }).unwrap() } else { - format!("{}:{}: {}", - record.level(), - record.location().module_path(), - record.args()) + format!( + "{}:{}: {}", + record.level(), + record.location().module_path(), + record.args() + ) } } diff --git a/autopush_rs/src/util/rc.rs b/autopush_rs/src/util/rc.rs index 6855efa0..45ad2783 100644 --- a/autopush_rs/src/util/rc.rs +++ b/autopush_rs/src/util/rc.rs @@ -33,9 +33,7 @@ impl Sink for RcObject { type SinkItem = T::SinkItem; type SinkError = T::SinkError; - fn start_send(&mut self, msg: T::SinkItem) - -> StartSend - { + fn start_send(&mut self, msg: T::SinkItem) -> StartSend { self.0.borrow_mut().start_send(msg) } diff --git a/autopush_rs/src/util/send_all.rs b/autopush_rs/src/util/send_all.rs index 0f4e51c6..23aef8de 100644 --- a/autopush_rs/src/util/send_all.rs +++ b/autopush_rs/src/util/send_all.rs @@ -10,9 +10,10 @@ pub struct MySendAll { } impl MySendAll - where U: Sink, - T: Stream, - T::Error: From, +where + U: Sink, + T: Stream, + T::Error: From, { #[allow(unused)] pub fn new(t: T, u: U) -> MySendAll { @@ -24,20 +25,24 @@ impl MySendAll } fn sink_mut(&mut self) -> &mut U { - self.sink.as_mut().take() - .expect("Attempted to poll MySendAll after completion") + self.sink.as_mut().take().expect( + "Attempted to poll MySendAll after completion", + ) } fn stream_mut(&mut self) -> &mut Fuse { - self.stream.as_mut().take() - .expect("Attempted to poll MySendAll after completion") + self.stream.as_mut().take().expect( + "Attempted to poll MySendAll after completion", + ) } fn take_result(&mut self) -> (T, U) { - let sink = self.sink.take() - .expect("Attempted to poll MySendAll after completion"); - let fuse = self.stream.take() - .expect("Attempted to poll MySendAll after completion"); + let sink = self.sink.take().expect( + "Attempted to poll MySendAll after completion", + ); + let fuse = self.stream.take().expect( + "Attempted to poll MySendAll after completion", + ); (fuse.into_inner(), sink) } @@ -45,16 +50,17 @@ impl MySendAll debug_assert!(self.buffered.is_none()); if let AsyncSink::NotReady(item) = try!(self.sink_mut().start_send(item)) { self.buffered = Some(item); - return Ok(Async::NotReady) + return Ok(Async::NotReady); } Ok(Async::Ready(())) } } impl Future for MySendAll - where U: Sink, - T: Stream, - T::Error: From, +where + U: Sink, + T: Stream, + T::Error: From, { type Item = (T, U); type Error = T::Error; @@ -71,11 +77,11 @@ impl Future for MySendAll Async::Ready(Some(item)) => try_ready!(self.try_start_send(item)), Async::Ready(None) => { try_ready!(self.sink_mut().poll_complete()); - return Ok(Async::Ready(self.take_result())) + return Ok(Async::Ready(self.take_result())); } Async::NotReady => { try_ready!(self.sink_mut().poll_complete()); - return Ok(Async::NotReady) + return Ok(Async::NotReady); } } }