diff --git a/Cargo.lock b/Cargo.lock index c057425a..c580f205 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,8 +172,8 @@ dependencies = [ "async-lock 3.2.0", "async-task", "concurrent-queue", - "fastrand 2.1.1", - "futures-lite 2.2.0", + "fastrand", + "futures-lite", "slab", ] @@ -185,10 +185,10 @@ checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" dependencies = [ "async-channel 2.1.1", "async-executor", - "async-io 2.2.2", + "async-io", "async-lock 3.2.0", "blocking", - "futures-lite 2.2.0", + "futures-lite", "once_cell", ] @@ -286,26 +286,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "async-io" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" -dependencies = [ - "async-lock 2.8.0", - "autocfg 1.1.0", - "cfg-if", - "concurrent-queue", - "futures-lite 1.13.0", - "log", - "parking", - "polling 2.8.0", - "rustix 0.37.27", - "slab", - "socket2 0.4.7", - "waker-fn", -] - [[package]] name = "async-io" version = "2.2.2" @@ -316,10 +296,10 @@ dependencies = [ "cfg-if", "concurrent-queue", "futures-io", - "futures-lite 2.2.0", + "futures-lite", "parking", - "polling 3.3.1", - "rustix 0.38.37", + "polling", + "rustix", "slab", "tracing", "windows-sys 0.52.0", @@ -356,19 +336,21 @@ dependencies = [ [[package]] name = "async-process" -version = "1.8.1" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea6438ba0a08d81529c69b36700fa2f95837bfe3e776ab39cde9c14d9149da88" +checksum = "63255f1dc2381611000436537bbedfe83183faa303a5a0edaf191edef06526bb" dependencies = [ - "async-io 1.13.0", - "async-lock 2.8.0", + "async-channel 2.1.1", + "async-io", + "async-lock 3.2.0", "async-signal", + "async-task", "blocking", "cfg-if", - "event-listener 3.1.0", - "futures-lite 1.13.0", - "rustix 0.38.37", - "windows-sys 0.48.0", + "event-listener 5.3.1", + "futures-lite", + "rustix", + "tracing", ] [[package]] @@ -377,13 +359,13 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e47d90f65a225c4527103a8d747001fc56e375203592b25ad103e1ca13124c5" dependencies = [ - "async-io 2.2.2", + "async-io", "async-lock 2.8.0", "atomic-waker", "cfg-if", "futures-core", "futures-io", - "rustix 0.38.37", + "rustix", "signal-hook-registry", "slab", "windows-sys 0.48.0", @@ -391,21 +373,21 @@ dependencies = [ [[package]] name = "async-std" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615" dependencies = [ "async-attributes", "async-channel 1.9.0", "async-global-executor", - "async-io 1.13.0", - "async-lock 2.8.0", + "async-io", + "async-lock 3.2.0", "async-process", "crossbeam-utils", "futures-channel", "futures-core", "futures-io", - "futures-lite 1.13.0", + "futures-lite", "gloo-timers", "kv-log-macro", "log", @@ -456,6 +438,20 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "async-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90e661b6cb0a6eb34d02c520b052daa3aa9ac0cc02495c9d066bbce13ead132b" +dependencies = [ + "async-std", + "futures-io", + "futures-util", + "log", + "pin-project-lite", + "tungstenite 0.24.0", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -516,7 +512,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand 2.1.1", + "fastrand", "hex", "http 0.2.9", "hyper 0.14.28", @@ -569,7 +565,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "fastrand 2.1.1", + "fastrand", "http 0.2.9", "percent-encoding", "tracing", @@ -729,7 +725,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "bytes", - "fastrand 2.1.1", + "fastrand", "h2", "http 0.2.9", "http-body 0.4.5", @@ -1090,9 +1086,9 @@ dependencies = [ "async-channel 2.1.1", "async-lock 3.2.0", "async-task", - "fastrand 2.1.1", + "fastrand", "futures-io", - "futures-lite 2.2.0", + "futures-lite", "piper", "tracing", ] @@ -1601,9 +1597,9 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "event-listener" -version = "3.1.0" +version = "4.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d93877bcde0eb80ca09131a08d23f0a5c18a620b01db137dba666d18cd9b30c2" +checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" dependencies = [ "concurrent-queue", "parking", @@ -1612,9 +1608,9 @@ dependencies = [ [[package]] name = "event-listener" -version = "4.0.3" +version = "5.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" dependencies = [ "concurrent-queue", "parking", @@ -1658,15 +1654,6 @@ dependencies = [ "ascii_utils", ] -[[package]] -name = "fastrand" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" -dependencies = [ - "instant", -] - [[package]] name = "fastrand" version = "2.1.1" @@ -1810,28 +1797,13 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" -[[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "fastrand 1.9.0", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-lite" version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "445ba825b27408685aaecefd65178908c36c6e96aaf6d8599419d46e624192ba" dependencies = [ - "fastrand 2.1.1", + "fastrand", "futures-core", "futures-io", "parking", @@ -1925,9 +1897,9 @@ checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" [[package]] name = "gloo-timers" -version = "0.2.6" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" dependencies = [ "futures-channel", "futures-core", @@ -2531,26 +2503,6 @@ dependencies = [ "serde", ] -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if", -] - -[[package]] -name = "io-lifetimes" -version = "1.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" -dependencies = [ - "hermit-abi", - "libc", - "windows-sys 0.48.0", -] - [[package]] name = "ipnet" version = "2.9.0" @@ -2564,7 +2516,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" dependencies = [ "hermit-abi", - "rustix 0.38.37", + "rustix", "windows-sys 0.52.0", ] @@ -2741,12 +2693,6 @@ dependencies = [ "cc", ] -[[package]] -name = "linux-raw-sys" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" - [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -3260,7 +3206,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" dependencies = [ "atomic-waker", - "fastrand 2.1.1", + "fastrand", "futures-io", ] @@ -3270,22 +3216,6 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" -[[package]] -name = "polling" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" -dependencies = [ - "autocfg 1.1.0", - "bitflags 1.3.2", - "cfg-if", - "concurrent-queue", - "libc", - "log", - "pin-project-lite", - "windows-sys 0.48.0", -] - [[package]] name = "polling" version = "3.3.1" @@ -3295,7 +3225,7 @@ dependencies = [ "cfg-if", "concurrent-queue", "pin-project-lite", - "rustix 0.38.37", + "rustix", "tracing", "windows-sys 0.52.0", ] @@ -3942,20 +3872,6 @@ dependencies = [ "semver", ] -[[package]] -name = "rustix" -version = "0.37.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" -dependencies = [ - "bitflags 1.3.2", - "errno", - "io-lifetimes", - "libc", - "linux-raw-sys 0.3.8", - "windows-sys 0.48.0", -] - [[package]] name = "rustix" version = "0.38.37" @@ -3965,7 +3881,7 @@ dependencies = [ "bitflags 2.4.0", "errno", "libc", - "linux-raw-sys 0.4.14", + "linux-raw-sys", "windows-sys 0.52.0", ] @@ -4555,9 +4471,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b" dependencies = [ "cfg-if", - "fastrand 2.1.1", + "fastrand", "once_cell", - "rustix 0.38.37", + "rustix", "windows-sys 0.59.0", ] @@ -5218,12 +5134,6 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" -[[package]] -name = "waker-fn" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" - [[package]] name = "want" version = "0.3.0" @@ -5596,7 +5506,8 @@ checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" name = "wsclient" version = "0.1.0" dependencies = [ - "tungstenite 0.24.0", + "async-std", + "async-tungstenite", ] [[package]] diff --git a/crates/types/src/time.rs b/crates/types/src/time.rs index e9ef8ed9..6c4d6daa 100644 --- a/crates/types/src/time.rs +++ b/crates/types/src/time.rs @@ -38,7 +38,7 @@ fn base() -> NaiveDateTime { .unwrap() } -impl<'a> FromSql<'a> for TZTime { +impl FromSql<'_> for TZTime { #[allow(unused_variables)] fn from_sql(type_: &Type, raw: &[u8]) -> Result> { let t = types::timestamp_from_sql(raw)?; diff --git a/crates/wsclient/Cargo.toml b/crates/wsclient/Cargo.toml index f0810640..b0137e9e 100644 --- a/crates/wsclient/Cargo.toml +++ b/crates/wsclient/Cargo.toml @@ -5,4 +5,5 @@ edition = "2021" rust-version.workspace = true [dependencies] -tungstenite = "0.24.0" +async-tungstenite = { version = "0.28.0", features = ["async-std-runtime"] } +async-std = "1.13.0" diff --git a/crates/wsclient/src/lib.rs b/crates/wsclient/src/lib.rs index e23c2d34..48287007 100644 --- a/crates/wsclient/src/lib.rs +++ b/crates/wsclient/src/lib.rs @@ -1,5 +1,5 @@ -use std::net::TcpStream; -use tungstenite::{connect, stream::MaybeTlsStream, WebSocket}; +use async_std::net::TcpStream; +use async_tungstenite::{async_std::connect_async, tungstenite, WebSocketStream}; pub struct WsClient { uri: WsUri, @@ -18,10 +18,8 @@ impl WsClient { WsClient { uri } } - pub fn connect( - &self, - ) -> Result>, tungstenite::error::Error> { - let (ws_stream, _) = connect(self.uri.query_string())?; + pub async fn connect(&self) -> Result, tungstenite::error::Error> { + let (ws_stream, _) = connect_async(self.uri.query_string()).await?; Ok(ws_stream) } } diff --git a/services/graphql/src/main.rs b/services/graphql/src/main.rs index aa55e648..916b76a7 100644 --- a/services/graphql/src/main.rs +++ b/services/graphql/src/main.rs @@ -16,7 +16,7 @@ use axum::{ routing::get, Router, }; -use futures_util::stream::Stream; +use futures_util::{stream::Stream, StreamExt}; use httpclient::HttpClient as Client; use serde_json::json; use shutdown::shutdown_signal; @@ -205,7 +205,7 @@ impl Subscription { .to_string(); let ws_client = WsClient::new(uri, "gdp".to_string(), date, country, region, municipality); stream! { - let mut measure_socket = match ws_client.connect() { + let measure_socket = match ws_client.connect().await { Ok(ws) => { tracing::info!("graphql websocket connection created with measure"); ws @@ -215,9 +215,17 @@ impl Subscription { return; } }; + + // the async-graphql crate limits the scope of the websocket lifecycle to where its upgraded: + // https://github.com/async-graphql/async-graphql/issues/1022#issuecomment-1214541591 + // this means it cant send a close message to the measure service from inside the subscription + // todo: add on_close(closed) callback support to graphql subscription context so + // a close message can be written/sent to the measure service + let (_write, mut read) = measure_socket.split(); + loop { - match measure_socket.read() { - Ok(msg) => { + match read.next().await { + Some(Ok(msg)) => { match msg { tungstenite::Message::Text(text) => { let gdp: f64 = serde_json::from_str(&text).unwrap(); @@ -229,18 +237,22 @@ impl Subscription { } } } - Err(WsError::ConnectionClosed) => { - measure_socket.close(None).unwrap(); - tracing::info!("graphql received closed message from measure"); + Some(Err(e)) => { + match e { + WsError::ConnectionClosed => { + tracing::info!("graphql received closed message from measure"); + } + _ => { + tracing::info!("graphql message receipt failure from measure: {:?}", e); + } + } break; } - Err(e) => { - tracing::info!("graphql message receipt failure from measure: {:?}", e); + None => { + tracing::info!("graphql received closed message from measure"); break; } } - // throttle reads from measure service to avoid blocking close messages from client - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } } }