Add PubSub publish timeout #8

Merged 2 commits on Apr 26, 2022
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ya-gcp"
version = "0.7.2"
version = "0.7.3"
authors = ["Renar Narubin <renar@standard.ai>"]
edition = "2018"
description = "APIs for using Google Cloud Platform services"
@@ -33,7 +33,7 @@ openssl = ["hyper-openssl"] # TODO maybe should be native-tls instead?
# an internal feature used by services running grpc
grpc = ["tonic", "prost", "prost-types", "tower", "derive_more"]

pubsub = ["grpc", "uuid", "async-stream", "pin-project", "async-channel"]
pubsub = ["grpc", "uuid", "async-stream", "pin-project", "async-channel", "tokio", "tokio/time"]
storage = ["tame-gcs"]

# whether to include service emulator implementations. useful for testing
14 changes: 12 additions & 2 deletions src/pubsub/mod.rs
@@ -13,7 +13,7 @@ use std::fmt::Display;
pub use ::tonic::Status as Error;

pub use client_builder::{BuildError, MakeConnection, PubSubConfig, Uri};
pub use publish_sink::{PublishError, PublishTopicSink, SinkError};
pub use publish_sink::{PublishConfig, PublishError, PublishTopicSink, SinkError};
pub use streaming_subscription::{
AcknowledgeError, AcknowledgeToken, ModifyAcknowledgeError, StreamSubscription,
StreamSubscriptionConfig,
@@ -57,7 +57,17 @@ where
///
/// See the type's [documentation](PublishTopicSink) for more details.
pub fn publish_topic_sink(&mut self, topic: ProjectTopicName) -> PublishTopicSink<C> {
PublishTopicSink::new(self.inner.clone(), topic)
self.publish_topic_sink_config(topic, PublishConfig::default())
}

/// Create a sink with non-default config
// TODO(major semver) get rid of this method, always require config
pub fn publish_topic_sink_config(
&mut self,
topic: ProjectTopicName,
config: PublishConfig,
) -> PublishTopicSink<C> {
PublishTopicSink::new(self.inner.clone(), topic, config)
}
}
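For reference, a minimal sketch of how the new method might be called, assuming an already-built `PublisherClient`; the helper function, the bounds on `C`, and the project/topic names are illustrative, not part of this diff:

```rust
use std::time::Duration;

use futures::{pin_mut, SinkExt};
use ya_gcp::pubsub::{api::PubsubMessage, ProjectTopicName, PublishConfig, PublisherClient};

// Hypothetical helper: assumes `client` was built elsewhere (e.g. via the client builder).
async fn publish_one<C>(mut client: PublisherClient<C>)
where
    C: ya_gcp::Connect + Clone + Send + Sync + 'static,
{
    // Start from the defaults and raise the publish timeout from 5s to 30s.
    let mut config = PublishConfig::default();
    config.timeout = Duration::from_secs(30);

    // "my-project" / "my-topic" are placeholders.
    let topic = ProjectTopicName::new("my-project", "my-topic");
    let sink = client.publish_topic_sink_config(topic, config);

    // The sink reserves the right to be !Unpin, so pin it before using SinkExt.
    pin_mut!(sink);
    sink.send(PubsubMessage {
        data: "hello, pubsub".into(),
        ..Default::default()
    })
    .await
    .expect("publish failed");
}
```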

61 changes: 54 additions & 7 deletions src/pubsub/publish_sink.rs
@@ -3,7 +3,7 @@ use crate::{
auth::grpc::{AuthGrpcService, OAuthTokenSource},
retry_policy::{exponential_backoff, ExponentialBackoff, RetryOperation, RetryPolicy},
};
use futures::{future::BoxFuture, ready, stream, Sink, SinkExt};
use futures::{future::BoxFuture, ready, stream, Sink, SinkExt, TryFutureExt};
use pin_project::pin_project;
use prost::Message;
use std::{
@@ -14,6 +14,7 @@ use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

const MB: usize = 1000 * 1000;
@@ -98,6 +99,23 @@
}
}

config_default! {
/// Configuration for a [publish sink](super::PublisherClient::publish_topic_sink)
/// request
#[derive(Debug, Clone, Copy, Eq, PartialEq, serde::Deserialize)]
#[non_exhaustive]
pub struct PublishConfig {
/// The amount of time to wait for a publish request to succeed before timing out
// TODO use 60sec default value from go lib, or 5 seconds plus backoff from java lib
// either of these should probably use time-based retries instead of count-based retry
// https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.20.0/pubsub/topic.go#L120
// https://github.com/googleapis/java-pubsub/blob/3a8c83b973a1dfbae2ca037125574d74034218ce/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L660
#[serde(with = "humantime_serde")]
@default(Duration::from_secs(5), "PublishConfig::default_timeout")
pub timeout: Duration,
}
}
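Since the struct derives serde::Deserialize and uses humantime_serde for the timeout, a config file can spell the duration as a human-readable string. A rough sketch, assuming serde_json is available; the exact serde behavior of the config_default! expansion is not shown in this diff:

```rust
use std::time::Duration;
use ya_gcp::pubsub::PublishConfig;

fn load_publish_config() -> PublishConfig {
    // humantime_serde parses strings like "30s", "500ms", or "1m 30s".
    let config: PublishConfig =
        serde_json::from_str(r#"{ "timeout": "30s" }"#).expect("invalid publish config");
    assert_eq!(config.timeout, Duration::from_secs(30));
    config
}
```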

/// A sink created by [`publish_topic_sink`] for publishing messages to a topic.
///
/// Messages will be sent to the pubsub service in the same order that they are submitted to this sink
@@ -154,6 +172,8 @@ pub struct PublishTopicSink<
/// the retry policy to use when a publish attempt fails
retry_policy: Retry,

config: PublishConfig,

// reserve the right to be !Unpin in the future without breaking changes
_pin: std::marker::PhantomPinned,
}
@@ -176,7 +196,11 @@ enum FlushState<ResponseSink: Sink<api::PubsubMessage>> {

impl<C> PublishTopicSink<C, ExponentialBackoff<PubSubRetryCheck>, Drain> {
/// Create a new `PublishTopicSink` with the default retry policy and no response sink
pub(super) fn new(client: ApiPublisherClient<C>, topic: ProjectTopicName) -> Self
pub(super) fn new(
client: ApiPublisherClient<C>,
topic: ProjectTopicName,
config: PublishConfig,
) -> Self
where
C: crate::Connect + Clone + Send + Sync + 'static,
{
@@ -188,6 +212,7 @@ impl<C> PublishTopicSink<C, ExponentialBackoff<PubSubRetryCheck>, Drain> {
PubSubRetryCheck::default(),
exponential_backoff::Config::default(),
),
config,
_pin: std::marker::PhantomPinned,
}
}
@@ -216,6 +241,7 @@ impl<C, Retry, ResponseSink: Sink<api::PubsubMessage>> PublishTopicSink<C, Retry
retry_policy: self.retry_policy,
client: self.client,
buffer: self.buffer,
config: self.config,
_pin: self._pin,
}
}
@@ -234,6 +260,7 @@ impl<C, Retry, ResponseSink: Sink<api::PubsubMessage>> PublishTopicSink<C, Retry
client: self.client,
buffer: self.buffer,
flush_state: self.flush_state,
config: self.config,
_pin: self._pin,
}
}
@@ -353,8 +380,13 @@
};

// construct the flush future
let flush_fut =
Self::flush(self.client, request, response_sink, self.retry_policy);
let flush_fut = Self::flush(
self.client,
request,
response_sink,
self.retry_policy,
self.config.timeout,
);

// transition the state
self.flush_state
@@ -372,6 +404,7 @@
mut request: api::PublishRequest,
mut response_sink: ResponseSink,
retry_policy: &mut Retry,
timeout: Duration,
) -> impl Future<Output = FlushOutput<ResponseSink, ResponseSink::Error>> {
// until Sink gets syntax sugar like generators, internal futures can't borrow (safely) and
// have to own their referents
@@ -388,11 +421,24 @@
// original to reuse for retries/errors/responses. On the bright side, message
// payloads are Bytes which are essentially Arc'ed, so alloc clones mostly apply to
// the attribute maps
break match client.publish(request.clone()).await {
Ok(response) => Ok(response.into_inner()),
let mut tonic_request = tonic::Request::new(request.clone());
// set the grpc-timeout header to inform the server of our deadline
tonic_request.set_timeout(timeout);

let publish = client.publish(tonic_request);

// apply a timeout to the publish
let publish_fut = tokio::time::timeout(timeout, publish).map_err(
Reviewer: I don't know all that much about grpc... do we need both a timeout on the client and server side?

Author (collaborator): Yes. Client side is the more obvious one, in case the server never responds, or a network problem, or whatever. Server side is more of a good convention; if the server takes too long and knows the client will time out, the server is allowed to free resources after that time, which can help responsiveness overall when under load.
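A rough, generic illustration of those two layers outside this crate (the helper names are made up; any tonic unary call could be wrapped this way):

```rust
use std::{future::Future, time::Duration};

/// Server side: attach the deadline to the request so it's sent as the
/// `grpc-timeout` header, letting the server stop work (and free resources)
/// once the deadline has passed.
fn with_server_deadline<T>(message: T, timeout: Duration) -> tonic::Request<T> {
    let mut request = tonic::Request::new(message);
    request.set_timeout(timeout);
    request
}

/// Client side: race the RPC against a local timer, so the caller gives up
/// even if the server or the network never responds at all.
async fn with_client_deadline<T>(
    rpc: impl Future<Output = Result<tonic::Response<T>, tonic::Status>>,
    timeout: Duration,
) -> Result<tonic::Response<T>, tonic::Status> {
    match tokio::time::timeout(timeout, rpc).await {
        Ok(result) => result,
        Err(_elapsed) => Err(tonic::Status::deadline_exceeded("request timed out")),
    }
}
```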

|tokio::time::error::Elapsed { .. }| {
Reviewer: If we hit the server-side timeout, does tokio return the same time::error::Elapsed error here?

Author (collaborator): A server timeout would result in an error from the wrapped publish future. That's why the match is on a double Result<Result<T, Status>, Status>. I'd have used flatten but it's not stable yet (rust-lang/rust#70142).
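A self-contained sketch of that nested shape (the stand-in rpc future and the 5-second duration are placeholders):

```rust
use std::time::Duration;

async fn nested_result_demo() -> Result<u64, tonic::Status> {
    // Stand-in for the publish RPC: a future that itself returns a Result.
    let rpc = async { Ok::<u64, tonic::Status>(42) };

    // The outer layer is the local timer, the inner layer is the server's answer,
    // so awaiting yields Result<Result<u64, Status>, Elapsed>...
    let outcome = tokio::time::timeout(Duration::from_secs(5), rpc)
        .await
        // ...and mapping Elapsed into a Status makes both layers share one error type.
        .map_err(|_elapsed| tonic::Status::deadline_exceeded("publish attempt timed out"));

    // One arm handles both failure paths, as in the diff above.
    match outcome {
        Ok(Ok(value)) => Ok(value),
        Err(status) | Ok(Err(status)) => Err(status),
    }
}
```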

tonic::Status::deadline_exceeded("publish attempt timed out")
},
);

break match publish_fut.await {
Ok(Ok(response)) => Ok(response.into_inner()),
// if the status code is retry worthy, poll the retry policy to backoff
// before retrying, or stop retrying if the backoff is exhausted.
Err(status) => match retry.check_retry(&request, &status) {
Err(status) | Ok(Err(status)) => match retry.check_retry(&request, &status) {
// if the retry is non-empty, await the backoff then loop back to retry
Some(sleep) => {
sleep.await;
@@ -703,6 +749,7 @@
PublishTopicSink::new(
dummy_client(),
ProjectTopicName::new("dummy-project", "dummy-topic"),
PublishConfig::default(),
)
}

10 changes: 9 additions & 1 deletion src/retry_policy/exponential_backoff.rs
@@ -95,7 +95,11 @@
debug!(
message = "retrying after error",
?error,
backoff_ms = %interval.as_millis()
backoff_ms = %interval.as_millis(),
remaining_attempts = match self.intervals.size_hint() {
(_lower_bound, Some(upper_bound)) => upper_bound,
(lower_bound, None) => lower_bound
Reviewer: I don't grok why the lower bound of the iterator would be indicative of the remaining attempts.

Author (collaborator): I guess it's not exactly indicative. I thought about having "unknown" if there's no upper bound. In practice this will never matter because the only possible iterator will always have an upper bound (.take(max_retries.unwrap_or(usize::MAX))).
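A quick standalone illustration of that point (the interval values here are arbitrary placeholders, not the crate's real backoff sequence):

```rust
fn remaining_attempts_demo() {
    // An endless interval sequence reports no upper bound on its own...
    let intervals = (0u32..).map(|attempt| 100u64 * 2u64.pow(attempt));
    assert_eq!(intervals.size_hint(), (usize::MAX, None));

    // ...but once capped with `.take(max_retries)`, the upper bound is always Some,
    // and it counts down as attempts are consumed.
    let mut capped = intervals.take(3);
    assert_eq!(capped.size_hint(), (3, Some(3)));

    capped.next();
    assert_eq!(capped.size_hint(), (2, Some(2)));
}
```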

},
);
return Some(self.sleeper.sleep(interval));
} else {
@@ -213,6 +217,10 @@ impl Iterator for ExponentialIter {
fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.iter.size_hint()
}
}

/// An infinite iterator whose values grow by some growth factor with every iteration, until
2 changes: 1 addition & 1 deletion tests/pubsub_client.rs
@@ -10,7 +10,7 @@ mod pubsub_client_tests {
use ya_gcp::{
pubsub::{
self, api::PubsubMessage, emulator::EmulatorClient, ProjectSubscriptionName,
ProjectTopicName, PublisherClient, SinkError, StreamSubscriptionConfig,
ProjectTopicName, PublishConfig, PublisherClient, SinkError, StreamSubscriptionConfig,
},
Connect,
};