From 98d27776d4310f11d46a1f96d12709cef6f552f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 17 Apr 2018 13:03:57 +0200 Subject: [PATCH] Chain head subscription (#126) * Start WebSockets server. * Expose non-working subscription. * Dummy subscription for testing. * Proper implementation with event loop. * Finalized pubsub. * Bump clap. * Fix yml. * Disable WS logs. * Remove stale TransactionHash mention * Fix build from nightly API change. * Don't panic on invalid port. * Bind server to random port. * Send only best blocks. --- Cargo.lock | 137 ++++++++++++++++++++--------- build.sh | 5 ++ demo/cli/Cargo.toml | 7 +- demo/cli/src/lib.rs | 45 +++++++--- polkadot/api/src/lib.rs | 3 +- polkadot/cli/src/cli.yml | 7 +- polkadot/cli/src/lib.rs | 81 ++++++++++++----- substrate/client/src/client.rs | 2 +- substrate/rpc-servers/Cargo.toml | 7 +- substrate/rpc-servers/src/lib.rs | 43 +++++++-- substrate/rpc/Cargo.toml | 8 +- substrate/rpc/src/author/error.rs | 2 + substrate/rpc/src/author/tests.rs | 3 +- substrate/rpc/src/chain/mod.rs | 68 ++++++++++++-- substrate/rpc/src/chain/tests.rs | 8 +- substrate/rpc/src/lib.rs | 9 +- substrate/rpc/src/metadata.rs | 46 ++++++++++ substrate/rpc/src/subscriptions.rs | 86 ++++++++++++++++++ 18 files changed, 462 insertions(+), 105 deletions(-) create mode 100644 substrate/rpc/src/metadata.rs create mode 100644 substrate/rpc/src/subscriptions.rs diff --git a/Cargo.lock b/Cargo.lock index aca5f344ef6bc..6a67beb3dfd6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16,6 +16,14 @@ name = "ansi_term" version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "app_dirs" version = "1.2.1" @@ -155,10 +163,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "clap" -version = "2.29.4" +version = "2.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", + "ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "atty 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -206,13 +214,15 @@ dependencies = [ name = "demo-cli" version = "0.1.0" dependencies = [ - "clap 2.29.4 (registry+https://github.com/rust-lang/crates.io-index)", + "clap 2.31.2 (registry+https://github.com/rust-lang/crates.io-index)", + "ctrlc 1.1.1 (git+https://github.com/paritytech/rust-ctrlc.git)", "demo-executor 0.1.0", "demo-primitives 0.1.0", "demo-runtime 0.1.0", "ed25519 0.1.0", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-client 0.1.0", @@ -223,6 +233,7 @@ dependencies = [ "substrate-rpc-servers 0.1.0", "substrate-runtime-io 0.1.0", "substrate-state-machine 0.1.0", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "triehash 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -650,12 +661,12 @@ dependencies = [ [[package]] name = "globset" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "regex 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -724,7 +735,7 @@ dependencies = [ [[package]] name = "hyper" -version = "0.11.17" +version = "0.11.25" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "base64 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -813,11 +824,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" -version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#d1e2cc48d962510328f7373509fb494a85dbeae8" +version = "8.0.2" +source = "git+https://github.com/paritytech/jsonrpc.git#9cfa451b0534281d83f7564bf41fda465a6e18c8" dependencies = [ "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -825,50 +836,64 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" -version = "8.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#d1e2cc48d962510328f7373509fb494a85dbeae8" +version = "8.0.1" +source = "git+https://github.com/paritytech/jsonrpc.git#9cfa451b0534281d83f7564bf41fda465a6e18c8" dependencies = [ - "hyper 0.11.17 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", - "jsonrpc-server-utils 8.0.0 (git+https://github.com/paritytech/jsonrpc.git)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.11.25 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", + "jsonrpc-server-utils 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)", "unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "jsonrpc-macros" -version = "8.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#d1e2cc48d962510328f7373509fb494a85dbeae8" +version = "8.0.1" +source = "git+https://github.com/paritytech/jsonrpc.git#9cfa451b0534281d83f7564bf41fda465a6e18c8" dependencies = [ - "jsonrpc-core 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", - "jsonrpc-pubsub 8.0.0 (git+https://github.com/paritytech/jsonrpc.git)", + "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", + "jsonrpc-pubsub 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "jsonrpc-pubsub" -version = "8.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#d1e2cc48d962510328f7373509fb494a85dbeae8" +version = "8.0.1" +source = "git+https://github.com/paritytech/jsonrpc.git#9cfa451b0534281d83f7564bf41fda465a6e18c8" dependencies = [ - "jsonrpc-core 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "jsonrpc-server-utils" -version = "8.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#d1e2cc48d962510328f7373509fb494a85dbeae8" +version = "8.0.1" +source = "git+https://github.com/paritytech/jsonrpc.git#9cfa451b0534281d83f7564bf41fda465a6e18c8" dependencies = [ "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "globset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "globset 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "jsonrpc-ws-server" +version = "8.0.0" +source = "git+https://github.com/paritytech/jsonrpc.git#9cfa451b0534281d83f7564bf41fda465a6e18c8" +dependencies = [ + "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", + "jsonrpc-server-utils 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "ws 0.7.5 (git+https://github.com/tomusdrw/ws-rs)", +] + [[package]] name = "keccak-hash" version = "0.1.0" @@ -1190,7 +1215,7 @@ name = "polkadot-cli" version = "0.1.0" dependencies = [ "app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "clap 2.29.4 (registry+https://github.com/rust-lang/crates.io-index)", + "clap 2.31.2 (registry+https://github.com/rust-lang/crates.io-index)", "ctrlc 1.1.1 (git+https://github.com/paritytech/rust-ctrlc.git)", "ed25519 0.1.0", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1661,6 +1686,11 @@ dependencies = [ "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "sha1" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "shell32-sys" version = "0.1.2" @@ -1872,8 +1902,9 @@ version = "0.1.0" dependencies = [ "assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", - "jsonrpc-macros 8.0.0 (git+https://github.com/paritytech/jsonrpc.git)", + "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", + "jsonrpc-macros 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", + "jsonrpc-pubsub 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-client 0.1.0", @@ -1881,14 +1912,18 @@ dependencies = [ "substrate-primitives 0.1.0", "substrate-runtime-support 0.1.0", "substrate-state-machine 0.1.0", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "substrate-rpc-servers" version = "0.1.0" dependencies = [ - "jsonrpc-core 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", - "jsonrpc-http-server 8.0.0 (git+https://github.com/paritytech/jsonrpc.git)", + "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", + "jsonrpc-http-server 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", + "jsonrpc-pubsub 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", + "jsonrpc-ws-server 8.0.0 (git+https://github.com/paritytech/jsonrpc.git)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-rpc 0.1.0", ] @@ -2451,6 +2486,22 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "ws" +version = "0.7.5" +source = "git+https://github.com/tomusdrw/ws-rs#f12d19c4c19422fc79af28a3181f598bc07ecd1e" +dependencies = [ + "byteorder 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", + "sha1 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "url 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ws2_32-sys" version = "0.2.1" @@ -2489,6 +2540,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [metadata] "checksum aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d6531d44de723825aa81398a6415283229725a00fa30713812ab9323faa82fc4" "checksum ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6b3568b48b7cefa6b8ce125f9bb4989e52fbcc29ebea88df04cc7c5f12f70455" +"checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" "checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6" "checksum app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e73a24bad9bd6a94d6395382a6c69fe071708ae4409f763c5475e14ee896313d" "checksum arrayvec 0.3.25 (registry+https://github.com/rust-lang/crates.io-index)" = "06f59fe10306bb78facd90d28c2038ad23ffaaefa85bac43c8a434cde383334f" @@ -2507,7 +2559,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "1b7db437d718977f6dc9b2e3fd6fc343c02ac6b899b73fdd2179163447bd9ce9" "checksum cc 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "deaf9ec656256bb25b404c51ef50097207b9cbb29c933d31f92cae5a8a0ffee0" "checksum cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d4c819a1287eb618df47cc647173c5c4c66ba19d888a6e50d605672aed3140de" -"checksum clap 2.29.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7b8f59bcebcfe4269b09f71dab0da15b355c75916a8f975d3876ce81561893ee" +"checksum clap 2.31.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f0f16b89cbb9ee36d87483dc939fe9f1e13c05898d56d7b230a0d4dff033a536" "checksum coco 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c06169f5beb7e31c7c67ebf5540b8b472d23e3eade3b2ec7d1f5b504a85f91bd" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" "checksum crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "24ce9782d4d5c53674646a6a4c1863a21a8fc0cb649b3c94dfc16e45071dea19" @@ -2546,7 +2598,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "0bab5b5e94f5c31fc764ba5dd9ad16568aae5d4825538c01d6bca680c9bf94a7" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" "checksum gcc 0.3.54 (registry+https://github.com/rust-lang/crates.io-index)" = "5e33ec290da0d127825013597dbdfc28bee4964690c7ce1166cbc2a7bd08b1bb" -"checksum globset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "464627f948c3190ae3d04b1bc6d7dca2f785bda0ac01278e6db129ad383dbeb6" +"checksum globset 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1e96ab92362c06811385ae9a34d2698e8a1160745e0c78fbb434a44c8de3fabc" "checksum hashdb 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d97be07c358c5b461268b4ce60304024c5fa5acfd4bd8cd743639f0252003cf5" "checksum heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1679e6ea370dee694f91f1dc469bf94cf8f52051d147aec3e1f9497c6fc22461" "checksum hex 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "459d3cf58137bb02ad4adeef5036377ff59f066dbb82517b7192e3a5462a2abc" @@ -2554,7 +2606,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum hex-literal-impl 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2ea76da4c7f1a54d01d54985566d3fdd960b2bbd7b970da024821c883c2d9631" "checksum httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c2f407128745b78abc95c0ffbe4e5d37427fdc0d45470710cfef8c44522a2e37" "checksum hyper 0.10.13 (registry+https://github.com/rust-lang/crates.io-index)" = "368cb56b2740ebf4230520e2b90ebb0461e69034d85d1945febd9b3971426db2" -"checksum hyper 0.11.17 (registry+https://github.com/rust-lang/crates.io-index)" = "7f4de6edd503089841ebfa88341e1c00fb19b6bf93d820d908db15960fd31226" +"checksum hyper 0.11.25 (registry+https://github.com/rust-lang/crates.io-index)" = "549dbb86397490ce69d908425b9beebc85bbaad25157d67479d4995bb56fdf9a" "checksum idna 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "014b298351066f1512874135335d62a789ffe78a9974f94b43ed5621951eaf7d" "checksum igd 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "356a0dc23a4fa0f8ce4777258085d00a01ea4923b2efd93538fc44bf5e1bda76" "checksum integer-sqrt 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8833702c315192502093b244e29c6ab9c55454adfe21b879a87a039ea8fe8520" @@ -2563,11 +2615,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum isatty 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "8f2a233726c7bb76995cec749d59582e5664823b7245d4970354408f1d79a7a2" "checksum itertools 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4833d6978da405305126af4ac88569b5d71ff758581ce5a987dbfa3755f694fc" "checksum itoa 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8324a32baf01e2ae060e9de58ed0bc2320c9a2833491ee36cd3b4c414de4db8c" -"checksum jsonrpc-core 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)" = "" -"checksum jsonrpc-http-server 8.0.0 (git+https://github.com/paritytech/jsonrpc.git)" = "" -"checksum jsonrpc-macros 8.0.0 (git+https://github.com/paritytech/jsonrpc.git)" = "" -"checksum jsonrpc-pubsub 8.0.0 (git+https://github.com/paritytech/jsonrpc.git)" = "" -"checksum jsonrpc-server-utils 8.0.0 (git+https://github.com/paritytech/jsonrpc.git)" = "" +"checksum jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)" = "" +"checksum jsonrpc-http-server 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)" = "" +"checksum jsonrpc-macros 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)" = "" +"checksum jsonrpc-pubsub 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)" = "" +"checksum jsonrpc-server-utils 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)" = "" +"checksum jsonrpc-ws-server 8.0.0 (git+https://github.com/paritytech/jsonrpc.git)" = "" "checksum keccak-hash 0.1.0 (git+https://github.com/paritytech/parity.git)" = "" "checksum keccak-hash 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1f300c1f149cd9ca5214eed24f6e713a597517420fb8b15499824aa916259ec1" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" @@ -2635,6 +2688,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)" = "f4ba7591cfe93755e89eeecdbcc668885624829b020050e6aec99c2a03bd3fd0" "checksum serde_derive_internals 0.19.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6e03f1c9530c3fb0a0a5c9b826bdd9246a5921ae995d75f512ac917fc4dd55b5" "checksum serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c9db7266c7d63a4c4b7fe8719656ccdd51acf1bed6124b174f933b009fb10bcb" +"checksum sha1 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cc30b1e1e8c40c121ca33b86c23308a090d19974ef001b4bf6e61fd1a0fb095c" "checksum shell32-sys 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9ee04b46101f57121c9da2b151988283b6beb79b34f5bb29a58ee48cb695122c" "checksum slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6dbdd334bd28d328dad1c41b0ea662517883d8880d8533895ef96c8003dec9c4" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" @@ -2688,6 +2742,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" "checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" "checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +"checksum ws 0.7.5 (git+https://github.com/tomusdrw/ws-rs)" = "" "checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" "checksum xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a66b7c2281ebde13cf4391d70d4c7e5946c3c25e72a7b859ca8f677dcd0b0c61" "checksum xml-rs 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7ec6c39eaa68382c8e31e35239402c0a9489d4141a8ceb0c716099a0b515b562" diff --git a/build.sh b/build.sh index 43d8deb8c1549..4c3c7a4b79f43 100755 --- a/build.sh +++ b/build.sh @@ -1,5 +1,10 @@ #!/bin/sh +# NOTE `cargo install wasm-gc` before running this script. + +set -e +export CARGO_INCREMENTAL=0 + cd demo/runtime/wasm && ./build.sh && cd ../../.. cd substrate/executor/wasm && ./build.sh && cd ../../.. cd substrate/test-runtime/wasm && ./build.sh && cd ../../.. diff --git a/demo/cli/Cargo.toml b/demo/cli/Cargo.toml index 7e024afb19c49..645429cb31679 100644 --- a/demo/cli/Cargo.toml +++ b/demo/cli/Cargo.toml @@ -6,12 +6,15 @@ description = "Substrate Demo node implementation in Rust." [dependencies] clap = { version = "2.27", features = ["yaml"] } +ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" } +ed25519 = { path = "../../substrate/ed25519" } env_logger = "0.4" +futures = "0.1.17" error-chain = "0.11" -log = "0.3" hex-literal = "0.1" +log = "0.3" +tokio-core = "0.1.12" triehash = "0.1" -ed25519 = { path = "../../substrate/ed25519" } substrate-client = { path = "../../substrate/client" } substrate-codec = { path = "../../substrate/codec" } substrate-runtime-io = { path = "../../substrate/runtime-io" } diff --git a/demo/cli/src/lib.rs b/demo/cli/src/lib.rs index 5b4c976c8c328..0330192b5816a 100644 --- a/demo/cli/src/lib.rs +++ b/demo/cli/src/lib.rs @@ -18,18 +18,21 @@ #![warn(missing_docs)] -extern crate env_logger; +extern crate ctrlc; extern crate ed25519; +extern crate env_logger; +extern crate futures; +extern crate tokio_core; extern crate triehash; -extern crate substrate_codec as codec; -extern crate substrate_runtime_io as runtime_io; -extern crate substrate_state_machine as state_machine; extern crate substrate_client as client; +extern crate substrate_codec as codec; extern crate substrate_primitives as primitives; extern crate substrate_rpc; extern crate substrate_rpc_servers as rpc; -extern crate demo_primitives; +extern crate substrate_runtime_io as runtime_io; +extern crate substrate_state_machine as state_machine; extern crate demo_executor; +extern crate demo_primitives; extern crate demo_runtime; #[macro_use] @@ -44,11 +47,12 @@ extern crate log; pub mod error; use std::sync::Arc; +use client::genesis; use codec::Slicable; -use runtime_io::with_externalities; use demo_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig, SessionConfig, StakingConfig, BuildExternalities}; -use client::genesis; +use futures::{Future, Sink, Stream}; + struct DummyPool; impl substrate_rpc::author::AuthorApi for DummyPool { @@ -128,15 +132,30 @@ pub fn run(args: I) -> error::Result<()> where (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) }; let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?); - - let address = "127.0.0.1:9933".parse().unwrap(); - let handler = rpc::rpc_handler(client.clone(), DummyPool, client); - let server = rpc::start_http(&address, handler)?; + let mut core = ::tokio_core::reactor::Core::new().expect("Unable to spawn event loop."); + + let _rpc_servers = { + let handler = || { + let chain = rpc::apis::chain::Chain::new(client.clone(), core.remote()); + rpc::rpc_handler(client.clone(), chain, DummyPool) + }; + let http_address = "127.0.0.1:9933".parse().unwrap(); + let ws_address = "127.0.0.1:9944".parse().unwrap(); + + ( + rpc::start_http(&http_address, handler())?, + rpc::start_ws(&ws_address, handler())? + ) + }; if let Some(_) = matches.subcommand_matches("validator") { info!("Starting validator."); - server.wait(); - return Ok(()); + let (exit_send, exit) = futures::sync::mpsc::channel(1); + ctrlc::CtrlC::set_handler(move || { + exit_send.clone().send(()).wait().expect("Error sending exit notification"); + }); + core.run(exit.into_future()).expect("Error running informant event loop"); + return Ok(()) } println!("No command given.\n"); diff --git a/polkadot/api/src/lib.rs b/polkadot/api/src/lib.rs index e9f422503de72..e178f1d547c24 100644 --- a/polkadot/api/src/lib.rs +++ b/polkadot/api/src/lib.rs @@ -355,7 +355,6 @@ impl BlockBuilder for ClientBlockBuilder #[cfg(test)] mod tests { use super::*; - use runtime_io::with_externalities; use keyring::Keyring; use codec::Slicable; use client::in_mem::Backend as InMemory; @@ -388,7 +387,7 @@ mod tests { ::client::new_in_mem( LocalDispatch::new(), || { - let mut storage = genesis_config.build_externalities(); + let storage = genesis_config.build_externalities(); let block = ::client::genesis::construct_genesis_block(&storage); (substrate_primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) } diff --git a/polkadot/cli/src/cli.yml b/polkadot/cli/src/cli.yml index 86ca40314a7a5..07d3c0ec66d13 100644 --- a/polkadot/cli/src/cli.yml +++ b/polkadot/cli/src/cli.yml @@ -40,7 +40,12 @@ args: - rpc-port: long: rpc-port value_name: PORT - help: Specify RPC server TCP port + help: Specify HTTP RPC server TCP port + takes_value: true + - ws-port: + long: ws-port + value_name: PORT + help: Specify WebSockets RPC server TCP port takes_value: true - bootnodes: long: bootnodes diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 95a432b25261f..7f82a9bc3cf83 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -47,8 +47,9 @@ extern crate log; pub mod error; mod informant; -use std::path::{Path, PathBuf}; +use std::io; use std::net::SocketAddr; +use std::path::{Path, PathBuf}; use futures::sync::mpsc; use futures::{Sink, Future, Stream}; use tokio_core::reactor; @@ -117,43 +118,76 @@ pub fn run(args: I) -> error::Result<()> where }); config.roles = role; - config.network.boot_nodes = matches - .values_of("bootnodes") - .map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect()); - config.network.config_path = Some(network_path(&base_path).to_string_lossy().into()); - config.network.net_config_path = config.network.config_path.clone(); - - let port = match matches.value_of("port") { - Some(port) => port.parse().expect("Invalid p2p port value specified."), - None => 30333, - }; - config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port)); - config.network.public_address = None; - config.network.client_version = format!("parity-polkadot/{}", crate_version!()); + { + config.network.boot_nodes = matches + .values_of("bootnodes") + .map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect()); + config.network.config_path = Some(network_path(&base_path).to_string_lossy().into()); + config.network.net_config_path = config.network.config_path.clone(); + + let port = match matches.value_of("port") { + Some(port) => port.parse().expect("Invalid p2p port value specified."), + None => 30333, + }; + config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port)); + config.network.public_address = None; + config.network.client_version = format!("parity-polkadot/{}", crate_version!()); + } config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect(); let service = service::Service::new(config)?; - let mut address: SocketAddr = "127.0.0.1:9933".parse().unwrap(); - if let Some(port) = matches.value_of("rpc-port") { - let rpc_port: u16 = port.parse().expect("Invalid RPC port value specified."); - address.set_port(rpc_port); - } - - let handler = rpc::rpc_handler(service.client(), service.transaction_pool(), service.client()); - let _server = rpc::start_http(&address, handler)?; - informant::start(&service, core.handle()); let (exit_send, exit) = mpsc::channel(1); ctrlc::CtrlC::set_handler(move || { exit_send.clone().send(()).wait().expect("Error sending exit notification"); }); + + let _rpc_servers = { + let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches)?; + let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?; + + let handler = || { + let chain = rpc::apis::chain::Chain::new(service.client(), core.remote()); + rpc::rpc_handler(service.client(), chain, service.transaction_pool()) + }; + ( + start_server(http_address, |address| rpc::start_http(address, handler())), + start_server(ws_address, |address| rpc::start_ws(address, handler())), + ) + }; + core.run(exit.into_future()).expect("Error running informant event loop"); Ok(()) } +fn start_server(mut address: SocketAddr, start: F) -> Result where + F: Fn(&SocketAddr) -> Result, +{ + start(&address) + .or_else(|e| match e.kind() { + io::ErrorKind::AddrInUse | + io::ErrorKind::PermissionDenied => { + warn!("Unable to bind server to {}. Trying random port.", address); + address.set_port(0); + start(&address) + }, + _ => Err(e), + }) +} + +fn parse_address(default: &str, port_param: &str, matches: &clap::ArgMatches) -> Result { + let mut address: SocketAddr = default.parse().unwrap(); + if let Some(port) = matches.value_of(port_param) { + let port: u16 = port.parse().ok().ok_or(format!("Invalid port for --{} specified.", port_param))?; + address.set_port(port); + } + + Ok(address) +} + fn keystore_path(base_path: &Path) -> PathBuf { let mut path = base_path.to_owned(); path.push("keystore"); @@ -183,6 +217,7 @@ fn default_base_path() -> PathBuf { fn init_logger(pattern: &str) { let mut builder = env_logger::LogBuilder::new(); // Disable info logging by default for some modules: + builder.filter(Some("ws"), log::LogLevelFilter::Warn); builder.filter(Some("hyper"), log::LogLevelFilter::Warn); // Enable info for others. builder.filter(None, log::LogLevelFilter::Info); diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 668213acf5cc0..0eba8d9fa56bd 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -33,7 +33,7 @@ use {error, in_mem, block_builder, runtime_io, bft}; pub type BlockchainEventStream = mpsc::UnboundedReceiver; /// Polkadot Client -pub struct Client where B: backend::Backend { +pub struct Client { backend: B, executor: E, import_notification_sinks: Mutex>>, diff --git a/substrate/rpc-servers/Cargo.toml b/substrate/rpc-servers/Cargo.toml index e242664da3db7..54ab77601116a 100644 --- a/substrate/rpc-servers/Cargo.toml +++ b/substrate/rpc-servers/Cargo.toml @@ -4,6 +4,9 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] +jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git" } +jsonrpc-http-server = { git = "https://github.com/paritytech/jsonrpc.git" } +jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc.git" } +jsonrpc-ws-server = { git = "https://github.com/paritytech/jsonrpc.git" } +log = "0.3" substrate-rpc = { path = "../rpc", version = "0.1" } -jsonrpc-core = { git="https://github.com/paritytech/jsonrpc.git" } -jsonrpc-http-server = { git="https://github.com/paritytech/jsonrpc.git" } diff --git a/substrate/rpc-servers/src/lib.rs b/substrate/rpc-servers/src/lib.rs index 85b46fc0d5cd9..0d0157553ec44 100644 --- a/substrate/rpc-servers/src/lib.rs +++ b/substrate/rpc-servers/src/lib.rs @@ -18,30 +18,42 @@ #[warn(missing_docs)] -extern crate substrate_rpc as apis; +pub extern crate substrate_rpc as apis; extern crate jsonrpc_core as rpc; extern crate jsonrpc_http_server as http; +extern crate jsonrpc_pubsub as pubsub; +extern crate jsonrpc_ws_server as ws; + +#[macro_use] +extern crate log; use std::io; +type Metadata = apis::metadata::Metadata; +type RpcHandler = pubsub::PubSubHandler; + /// Construct rpc `IoHandler` -pub fn rpc_handler(state: S, transaction_pool: T, chain: C) -> rpc::IoHandler where +pub fn rpc_handler( + state: S, + chain: C, + author: A, +) -> RpcHandler where S: apis::state::StateApi, - T: apis::author::AuthorApi, - C: apis::chain::ChainApi, + C: apis::chain::ChainApi, + A: apis::author::AuthorApi, { - let mut io = rpc::IoHandler::new(); + let mut io = pubsub::PubSubHandler::default(); io.extend_with(state.to_delegate()); - io.extend_with(transaction_pool.to_delegate()); io.extend_with(chain.to_delegate()); + io.extend_with(author.to_delegate()); io } /// Start HTTP server listening on given address. pub fn start_http( addr: &std::net::SocketAddr, - io: rpc::IoHandler, + io: RpcHandler, ) -> io::Result { http::ServerBuilder::new(io) .threads(4) @@ -49,3 +61,20 @@ pub fn start_http( .cors(http::DomainsValidation::Disabled) .start_http(addr) } + +/// Start WS server listening on given address. +pub fn start_ws( + addr: &std::net::SocketAddr, + io: RpcHandler, +) -> io::Result { + ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| Metadata::new(context.sender())) + .start(addr) + .map_err(|err| match err { + ws::Error(ws::ErrorKind::Io(io), _) => io, + ws::Error(ws::ErrorKind::ConnectionClosed, _) => io::ErrorKind::BrokenPipe.into(), + ws::Error(e, _) => { + error!("{}", e); + io::ErrorKind::Other.into() + } + }) +} diff --git a/substrate/rpc/Cargo.toml b/substrate/rpc/Cargo.toml index c4e3b1298af69..7aaea95700bb5 100644 --- a/substrate/rpc/Cargo.toml +++ b/substrate/rpc/Cargo.toml @@ -4,15 +4,17 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -parking_lot = "0.4" -log = "0.3" error-chain = "0.11" jsonrpc-core = { git="https://github.com/paritytech/jsonrpc.git" } jsonrpc-macros = { git="https://github.com/paritytech/jsonrpc.git" } +jsonrpc-pubsub = { git="https://github.com/paritytech/jsonrpc.git" } +log = "0.3" +parking_lot = "0.4" substrate-client = { path = "../client" } +substrate-executor = { path = "../executor" } substrate-primitives = { path = "../primitives" } substrate-state-machine = { path = "../state-machine" } -substrate-executor = { path = "../executor" } +tokio-core = "0.1.12" [dev-dependencies] assert_matches = "1.1" diff --git a/substrate/rpc/src/author/error.rs b/substrate/rpc/src/author/error.rs index 05c2da37ac5d8..ba37779383f2d 100644 --- a/substrate/rpc/src/author/error.rs +++ b/substrate/rpc/src/author/error.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +//! Authoring RPC module errors. + use client; use rpc; diff --git a/substrate/rpc/src/author/tests.rs b/substrate/rpc/src/author/tests.rs index fd9b9062e6464..5306664fe11e4 100644 --- a/substrate/rpc/src/author/tests.rs +++ b/substrate/rpc/src/author/tests.rs @@ -15,7 +15,6 @@ // along with Substrate. If not, see . use primitives::block; -use substrate_executor as executor; use super::*; use super::error::*; @@ -38,7 +37,7 @@ impl AsyncAuthorApi for DummyTxPool { #[test] fn submit_transaction_should_not_cause_error() { - let mut p = Arc::new(Mutex::new(DummyTxPool::default())); + let p = Arc::new(Mutex::new(DummyTxPool::default())); assert_matches!( AuthorApi::submit_extrinsic(&p, block::Extrinsic(vec![])), diff --git a/substrate/rpc/src/chain/mod.rs b/substrate/rpc/src/chain/mod.rs index e0c8c31f76368..c5c179837c3ed 100644 --- a/substrate/rpc/src/chain/mod.rs +++ b/substrate/rpc/src/chain/mod.rs @@ -17,12 +17,20 @@ //! Substrate blockchain API. use std::sync::Arc; + use primitives::block; -use client::{self, Client}; +use client::{self, Client, BlockchainEvents}; use state_machine; -mod error; +use jsonrpc_macros::pubsub; +use jsonrpc_pubsub::SubscriptionId; +use rpc::Result as RpcResult; +use rpc::futures::{Future, Sink, Stream}; +use tokio_core::reactor::Remote; + +use subscriptions::Subscriptions; +mod error; #[cfg(test)] mod tests; @@ -31,6 +39,8 @@ use self::error::{Result, ResultExt}; build_rpc_trait! { /// Polkadot blockchain API pub trait ChainApi { + type Metadata; + /// Get header of a relay chain block. #[rpc(name = "chain_getHeader")] fn header(&self, block::HeaderHash) -> Result>; @@ -38,19 +48,67 @@ build_rpc_trait! { /// Get hash of the head. #[rpc(name = "chain_getHead")] fn head(&self) -> Result; + + #[pubsub(name = "chain_newHead")] { + /// Hello subscription + #[rpc(name = "subscribe_newHead")] + fn subscribe_new_head(&self, Self::Metadata, pubsub::Subscriber); + + /// Unsubscribe from hello subscription. + #[rpc(name = "unsubscribe_newHead")] + fn unsubscribe_new_head(&self, SubscriptionId) -> RpcResult; + } } } -impl ChainApi for Arc> where +/// Chain API with subscriptions support. +pub struct Chain { + /// Substrate client. + client: Arc>, + /// Current subscriptions. + subscriptions: Subscriptions, +} + +impl Chain { + /// Create new Chain API RPC handler. + pub fn new(client: Arc>, remote: Remote) -> Self { + Chain { + client, + subscriptions: Subscriptions::new(remote), + } + } +} + +impl ChainApi for Chain where B: client::backend::Backend + Send + Sync + 'static, E: state_machine::CodeExecutor + Send + Sync + 'static, client::error::Error: From<<::State as state_machine::backend::Backend>::Error>, { + type Metadata = ::metadata::Metadata; + fn header(&self, hash: block::HeaderHash) -> Result> { - client::Client::header(self, &block::Id::Hash(hash)).chain_err(|| "Blockchain error") + self.client.header(&block::Id::Hash(hash)).chain_err(|| "Blockchain error") } fn head(&self) -> Result { - Ok(client::Client::info(self).chain_err(|| "Blockchain error")?.chain.best_hash) + Ok(self.client.info().chain_err(|| "Blockchain error")?.chain.best_hash) + } + + fn subscribe_new_head(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber) { + self.subscriptions.add(subscriber, |sink| { + let stream = self.client.import_notification_stream() + .filter(|notification| notification.is_new_best) + .map(|notification| Ok(notification.header)) + .map_err(|e| warn!("Block notification stream error: {:?}", e)); + sink + .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all(stream) + // we ignore the resulting Stream (if the first stream is over we are unsubscribed) + .map(|_| ()) + }); + } + + fn unsubscribe_new_head(&self, id: SubscriptionId) -> RpcResult { + Ok(self.subscriptions.cancel(id)) } } diff --git a/substrate/rpc/src/chain/tests.rs b/substrate/rpc/src/chain/tests.rs index ac120933ddab1..b44f107fac1cd 100644 --- a/substrate/rpc/src/chain/tests.rs +++ b/substrate/rpc/src/chain/tests.rs @@ -29,7 +29,13 @@ fn should_return_header() { digest: Default::default(), }; - let client = Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap()); + let core = ::tokio_core::reactor::Core::new().unwrap(); + let remote = core.remote(); + + let client = Chain { + client: Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap()), + subscriptions: Subscriptions::new(remote), + }; assert_matches!( ChainApi::header(&client, test_genesis_block.blake2_256().into()), diff --git a/substrate/rpc/src/lib.rs b/substrate/rpc/src/lib.rs index 99621988fae4d..ddc8accf94b8f 100644 --- a/substrate/rpc/src/lib.rs +++ b/substrate/rpc/src/lib.rs @@ -18,11 +18,13 @@ #![warn(missing_docs)] -extern crate parking_lot; extern crate jsonrpc_core as rpc; +extern crate jsonrpc_pubsub; +extern crate parking_lot; extern crate substrate_client as client; extern crate substrate_primitives as primitives; extern crate substrate_state_machine as state_machine; +extern crate tokio_core; #[macro_use] extern crate error_chain; @@ -39,6 +41,9 @@ extern crate assert_matches; #[cfg(test)] extern crate substrate_runtime_support as runtime_support; +mod subscriptions; + +pub mod author; pub mod chain; +pub mod metadata; pub mod state; -pub mod author; diff --git a/substrate/rpc/src/metadata.rs b/substrate/rpc/src/metadata.rs new file mode 100644 index 0000000000000..c40a6ad0542c1 --- /dev/null +++ b/substrate/rpc/src/metadata.rs @@ -0,0 +1,46 @@ +// Copyright 2017-2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! RPC Metadata +use std::sync::Arc; + +use jsonrpc_pubsub::{Session, PubSubMetadata}; + +/// RPC Metadata. +/// +/// Manages peristent session for transports that support it +/// and may contain some additional info extracted from specific transports +/// (like remote client IP address, request headers, etc) +#[derive(Default, Clone)] +pub struct Metadata { + session: Option>, +} + +impl ::rpc::Metadata for Metadata {} +impl PubSubMetadata for Metadata { + fn session(&self) -> Option> { + self.session.clone() + } +} + +impl Metadata { + /// Create new `Metadata` with session (Pub/Sub) support. + pub fn new(transport: ::rpc::futures::sync::mpsc::Sender) -> Self { + Metadata { + session: Some(Arc::new(Session::new(transport))), + } + } +} diff --git a/substrate/rpc/src/subscriptions.rs b/substrate/rpc/src/subscriptions.rs new file mode 100644 index 0000000000000..60536e5a6d801 --- /dev/null +++ b/substrate/rpc/src/subscriptions.rs @@ -0,0 +1,86 @@ +// Copyright 2017-2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::collections::HashMap; +use std::sync::atomic::{self, AtomicUsize}; + +use jsonrpc_macros::pubsub; +use jsonrpc_pubsub::SubscriptionId; +use parking_lot::Mutex; +use rpc::futures::sync::oneshot; +use rpc::futures::{Future, future}; +use tokio_core::reactor::Remote; + +type Id = u64; + +/// Subscriptions manager. +/// +/// Takes care of assigning unique subscription ids and +/// driving the sinks into completion. +#[derive(Debug)] +pub struct Subscriptions { + next_id: AtomicUsize, + active_subscriptions: Mutex>>, + event_loop: Remote, +} + +impl Subscriptions { + /// Creates new `Subscriptions` object. + pub fn new(event_loop: Remote) -> Self { + Subscriptions { + next_id: Default::default(), + active_subscriptions: Default::default(), + event_loop, + } + } + + /// Creates new subscription for given subscriber. + /// + /// Second parameter is a function that converts Subscriber sink into a future. + /// This future will be driven to completion bu underlying event loop + /// or will be cancelled in case #cancel is invoked. + pub fn add(&self, subscriber: pubsub::Subscriber, into_future: G) where + G: FnOnce(pubsub::Sink) -> R, + R: future::IntoFuture, + F: future::Future + Send + 'static, + { + let id = self.next_id.fetch_add(1, atomic::Ordering::AcqRel) as u64; + if let Ok(sink) = subscriber.assign_id(id.into()) { + let (tx, rx) = oneshot::channel(); + let future = into_future(sink) + .into_future() + .select(rx.map_err(|e| warn!("Error timeing out: {:?}", e))) + .map(|_| ()) + .map_err(|_| ()); + + self.active_subscriptions.lock().insert(id, tx); + self.event_loop.spawn(|_| future); + } + } + + /// Cancel subscription. + /// + /// Returns true if subscription existed or false otherwise. + pub fn cancel(&self, id: SubscriptionId) -> bool { + if let SubscriptionId::Number(id) = id { + if let Some(tx) = self.active_subscriptions.lock().remove(&id) { + let _ = tx.send(()); + return true; + } + } + false + } +}