diff --git a/Cargo.toml b/Cargo.toml index 6a9836b..4459f68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ya-gcp" -version = "0.7.2" +version = "0.7.3" authors = ["Renar Narubin "] edition = "2018" description = "APIs for using Google Cloud Platform services" @@ -33,7 +33,7 @@ openssl = ["hyper-openssl"] # TODO maybe should be native-tls instead? # an internal feature used by services running grpc grpc = ["tonic", "prost", "prost-types", "tower", "derive_more"] -pubsub = ["grpc", "uuid", "async-stream", "pin-project", "async-channel"] +pubsub = ["grpc", "uuid", "async-stream", "pin-project", "async-channel", "tokio", "tokio/time"] storage = ["tame-gcs"] # whether to include service emulator implementations. useful for testing diff --git a/src/pubsub/mod.rs b/src/pubsub/mod.rs index fb0ba06..abc1a2d 100644 --- a/src/pubsub/mod.rs +++ b/src/pubsub/mod.rs @@ -13,7 +13,7 @@ use std::fmt::Display; pub use ::tonic::Status as Error; pub use client_builder::{BuildError, MakeConnection, PubSubConfig, Uri}; -pub use publish_sink::{PublishError, PublishTopicSink, SinkError}; +pub use publish_sink::{PublishConfig, PublishError, PublishTopicSink, SinkError}; pub use streaming_subscription::{ AcknowledgeError, AcknowledgeToken, ModifyAcknowledgeError, StreamSubscription, StreamSubscriptionConfig, @@ -57,7 +57,17 @@ where /// /// See the type's [documentation](PublishTopicSink) for more details. pub fn publish_topic_sink(&mut self, topic: ProjectTopicName) -> PublishTopicSink { - PublishTopicSink::new(self.inner.clone(), topic) + self.publish_topic_sink_config(topic, PublishConfig::default()) + } + + /// Create a sink with non-default config + // TODO(major semver) get rid of this method, always require config + pub fn publish_topic_sink_config( + &mut self, + topic: ProjectTopicName, + config: PublishConfig, + ) -> PublishTopicSink { + PublishTopicSink::new(self.inner.clone(), topic, config) } } diff --git a/src/pubsub/publish_sink.rs b/src/pubsub/publish_sink.rs index e971862..11bb0ae 100644 --- a/src/pubsub/publish_sink.rs +++ b/src/pubsub/publish_sink.rs @@ -3,7 +3,7 @@ use crate::{ auth::grpc::{AuthGrpcService, OAuthTokenSource}, retry_policy::{exponential_backoff, ExponentialBackoff, RetryOperation, RetryPolicy}, }; -use futures::{future::BoxFuture, ready, stream, Sink, SinkExt}; +use futures::{future::BoxFuture, ready, stream, Sink, SinkExt, TryFutureExt}; use pin_project::pin_project; use prost::Message; use std::{ @@ -14,6 +14,7 @@ use std::{ future::Future, pin::Pin, task::{Context, Poll}, + time::Duration, }; const MB: usize = 1000 * 1000; @@ -98,6 +99,23 @@ impl From> for tonic::Status { } } +config_default! { + /// Configuration for a [publish sink](super::PublisherClient::publish_topic_sink) + /// request + #[derive(Debug, Clone, Copy, Eq, PartialEq, serde::Deserialize)] + #[non_exhaustive] + pub struct PublishConfig { + /// The amount of time to wait for a publish request to succeed before timing out + // TODO use 60sec default value from go lib, or 5 seconds plus backoff from java lib + // either of these should probably use time-based retries instead of count-based retry + // https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.20.0/pubsub/topic.go#L120 + // https://github.com/googleapis/java-pubsub/blob/3a8c83b973a1dfbae2ca037125574d74034218ce/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L660 + #[serde(with = "humantime_serde")] + @default(Duration::from_secs(5), "PublishConfig::default_timeout") + pub timeout: Duration, + } +} + /// A sink created by [`publish_topic_sink`] for publishing messages to a topic. /// /// Messages will be sent to the pubsub service in the same order that they are submitted to this sink @@ -154,6 +172,8 @@ pub struct PublishTopicSink< /// the retry policy to use when a publish attempt fails retry_policy: Retry, + config: PublishConfig, + // reserve the right to be !Unpin in the future without breaking changes _pin: std::marker::PhantomPinned, } @@ -176,7 +196,11 @@ enum FlushState> { impl PublishTopicSink, Drain> { /// Create a new `PublishTopicSink` with the default retry policy and no response sink - pub(super) fn new(client: ApiPublisherClient, topic: ProjectTopicName) -> Self + pub(super) fn new( + client: ApiPublisherClient, + topic: ProjectTopicName, + config: PublishConfig, + ) -> Self where C: crate::Connect + Clone + Send + Sync + 'static, { @@ -188,6 +212,7 @@ impl PublishTopicSink, Drain> { PubSubRetryCheck::default(), exponential_backoff::Config::default(), ), + config, _pin: std::marker::PhantomPinned, } } @@ -216,6 +241,7 @@ impl> PublishTopicSink> PublishTopicSink impl Future> { // until Sink gets syntax sugar like generators, internal futures can't borrow (safely) and // have to own their referents @@ -388,11 +421,24 @@ where // original to reuse for retries/errors/responses. On the bright side, message // payloads are Bytes which are essentially Arc'ed, so alloc clones mostly apply to // the attribute maps - break match client.publish(request.clone()).await { - Ok(response) => Ok(response.into_inner()), + let mut tonic_request = tonic::Request::new(request.clone()); + // set the grpc-timeout header to inform the server of our deadline + tonic_request.set_timeout(timeout); + + let publish = client.publish(tonic_request); + + // apply a timeout to the publish + let publish_fut = tokio::time::timeout(timeout, publish).map_err( + |tokio::time::error::Elapsed { .. }| { + tonic::Status::deadline_exceeded("publish attempt timed out") + }, + ); + + break match publish_fut.await { + Ok(Ok(response)) => Ok(response.into_inner()), // if the status code is retry worthy, poll the retry policy to backoff // before retrying, or stop retrying if the backoff is exhausted. - Err(status) => match retry.check_retry(&request, &status) { + Err(status) | Ok(Err(status)) => match retry.check_retry(&request, &status) { // if the retry is non-empty, await the backoff then loop back to retry Some(sleep) => { sleep.await; @@ -703,6 +749,7 @@ mod test { PublishTopicSink::new( dummy_client(), ProjectTopicName::new("dummy-project", "dummy-topic"), + PublishConfig::default(), ) } diff --git a/src/retry_policy/exponential_backoff.rs b/src/retry_policy/exponential_backoff.rs index d14c963..69da71d 100644 --- a/src/retry_policy/exponential_backoff.rs +++ b/src/retry_policy/exponential_backoff.rs @@ -95,7 +95,11 @@ where debug!( message = "retrying after error", ?error, - backoff_ms = %interval.as_millis() + backoff_ms = %interval.as_millis(), + remaining_attempts = match self.intervals.size_hint() { + (_lower_bound, Some(upper_bound)) => upper_bound, + (lower_bound, None) => lower_bound + }, ); return Some(self.sleeper.sleep(interval)); } else { @@ -213,6 +217,10 @@ impl Iterator for ExponentialIter { fn next(&mut self) -> Option { self.iter.next() } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } } /// An infinite iterator whose values grow by some growth factor with every iteration, until diff --git a/tests/pubsub_client.rs b/tests/pubsub_client.rs index b6cdb6d..b9b8af4 100644 --- a/tests/pubsub_client.rs +++ b/tests/pubsub_client.rs @@ -10,7 +10,7 @@ mod pubsub_client_tests { use ya_gcp::{ pubsub::{ self, api::PubsubMessage, emulator::EmulatorClient, ProjectSubscriptionName, - ProjectTopicName, PublisherClient, SinkError, StreamSubscriptionConfig, + ProjectTopicName, PublishConfig, PublisherClient, SinkError, StreamSubscriptionConfig, }, Connect, };