Add PubSub publish timeout #8
@@ -3,7 +3,7 @@ use crate::{
     auth::grpc::{AuthGrpcService, OAuthTokenSource},
     retry_policy::{exponential_backoff, ExponentialBackoff, RetryOperation, RetryPolicy},
 };
-use futures::{future::BoxFuture, ready, stream, Sink, SinkExt};
+use futures::{future::BoxFuture, ready, stream, Sink, SinkExt, TryFutureExt};
 use pin_project::pin_project;
 use prost::Message;
 use std::{
@@ -14,6 +14,7 @@ use std::{
     future::Future,
     pin::Pin,
     task::{Context, Poll},
+    time::Duration,
 };

 const MB: usize = 1000 * 1000;
@@ -98,6 +99,23 @@ impl From<SinkError<Infallible>> for tonic::Status {
     }
 }

+config_default! {
+    /// Configuration for a [publish sink](super::PublisherClient::publish_topic_sink)
+    /// request
+    #[derive(Debug, Clone, Copy, Eq, PartialEq, serde::Deserialize)]
+    #[non_exhaustive]
+    pub struct PublishConfig {
+        /// The amount of time to wait for a publish request to succeed before timing out
+        // TODO use 60sec default value from go lib, or 5 seconds plus backoff from java lib
+        // either of these should probably use time-based retries instead of count-based retry
+        // https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.20.0/pubsub/topic.go#L120
+        // https://github.com/googleapis/java-pubsub/blob/3a8c83b973a1dfbae2ca037125574d74034218ce/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L660
+        #[serde(with = "humantime_serde")]
+        @default(Duration::from_secs(5), "PublishConfig::default_timeout")
+        pub timeout: Duration,
+    }
+}
+
 /// A sink created by [`publish_topic_sink`] for publishing messages to a topic.
 ///
 /// Messages will be sent to the pubsub service in the same order that they are submitted to this sink
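As a side note on the new `PublishConfig`: the `#[serde(with = "humantime_serde")]` attribute means the timeout deserializes from human-readable duration strings. Below is a minimal standalone sketch of that behavior (not part of this PR; `ExampleConfig`, `default_timeout`, and the use of serde_json are illustrative assumptions, since the real `PublishConfig` is generated by the crate's `config_default!` macro):

```rust
use std::time::Duration;

// Stand-in struct mirroring the shape of `PublishConfig` above.
#[derive(Debug, serde::Deserialize)]
struct ExampleConfig {
    // humantime_serde parses strings like "5s", "500ms", or "2min 30s"
    #[serde(with = "humantime_serde", default = "default_timeout")]
    timeout: Duration,
}

fn default_timeout() -> Duration {
    Duration::from_secs(5)
}

fn main() {
    // an explicit value overrides the default
    let cfg: ExampleConfig = serde_json::from_str(r#"{ "timeout": "30s" }"#).unwrap();
    assert_eq!(cfg.timeout, Duration::from_secs(30));

    // a missing field falls back to the 5-second default
    let cfg: ExampleConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(cfg.timeout, Duration::from_secs(5));
}
```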
@@ -154,6 +172,8 @@ pub struct PublishTopicSink<
     /// the retry policy to use when a publish attempt fails
     retry_policy: Retry,

+    config: PublishConfig,
+
     // reserve the right to be !Unpin in the future without breaking changes
     _pin: std::marker::PhantomPinned,
 }
@@ -176,7 +196,11 @@ enum FlushState<ResponseSink: Sink<api::PubsubMessage>> {

 impl<C> PublishTopicSink<C, ExponentialBackoff<PubSubRetryCheck>, Drain> {
     /// Create a new `PublishTopicSink` with the default retry policy and no response sink
-    pub(super) fn new(client: ApiPublisherClient<C>, topic: ProjectTopicName) -> Self
+    pub(super) fn new(
+        client: ApiPublisherClient<C>,
+        topic: ProjectTopicName,
+        config: PublishConfig,
+    ) -> Self
     where
         C: crate::Connect + Clone + Send + Sync + 'static,
     {
@@ -188,6 +212,7 @@ impl<C> PublishTopicSink<C, ExponentialBackoff<PubSubRetryCheck>, Drain> {
                 PubSubRetryCheck::default(),
                 exponential_backoff::Config::default(),
             ),
+            config,
             _pin: std::marker::PhantomPinned,
         }
     }
@@ -216,6 +241,7 @@ impl<C, Retry, ResponseSink: Sink<api::PubsubMessage>> PublishTopicSink<C, Retry
             retry_policy: self.retry_policy,
             client: self.client,
             buffer: self.buffer,
+            config: self.config,
             _pin: self._pin,
         }
     }
@@ -234,6 +260,7 @@ impl<C, Retry, ResponseSink: Sink<api::PubsubMessage>> PublishTopicSink<C, Retry
             client: self.client,
             buffer: self.buffer,
             flush_state: self.flush_state,
+            config: self.config,
             _pin: self._pin,
         }
     }
@@ -353,8 +380,13 @@ where
             };

             // construct the flush future
-            let flush_fut =
-                Self::flush(self.client, request, response_sink, self.retry_policy);
+            let flush_fut = Self::flush(
+                self.client,
+                request,
+                response_sink,
+                self.retry_policy,
+                self.config.timeout,
+            );

             // transition the state
             self.flush_state
@@ -372,6 +404,7 @@ where
         mut request: api::PublishRequest,
         mut response_sink: ResponseSink,
         retry_policy: &mut Retry,
+        timeout: Duration,
     ) -> impl Future<Output = FlushOutput<ResponseSink, ResponseSink::Error>> {
         // until Sink gets syntax sugar like generators, internal futures can't borrow (safely) and
         // have to own their referents
@@ -388,11 +421,24 @@ where
                 // original to reuse for retries/errors/responses. On the bright side, message
                 // payloads are Bytes which are essentially Arc'ed, so alloc clones mostly apply to
                 // the attribute maps
-                break match client.publish(request.clone()).await {
-                    Ok(response) => Ok(response.into_inner()),
+                let mut tonic_request = tonic::Request::new(request.clone());
+                // set the grpc-timeout header to inform the server of our deadline
+                tonic_request.set_timeout(timeout);
+
+                let publish = client.publish(tonic_request);
+
+                // apply a timeout to the publish
+                let publish_fut = tokio::time::timeout(timeout, publish).map_err(
+                    |tokio::time::error::Elapsed { .. }| {
+                        tonic::Status::deadline_exceeded("publish attempt timed out")
+                    },
+                );
+
+                break match publish_fut.await {
+                    Ok(Ok(response)) => Ok(response.into_inner()),
                     // if the status code is retry worthy, poll the retry policy to backoff
                     // before retrying, or stop retrying if the backoff is exhausted.
-                    Err(status) => match retry.check_retry(&request, &status) {
+                    Err(status) | Ok(Err(status)) => match retry.check_retry(&request, &status) {
                         // if the retry is non-empty, await the backoff then loop back to retry
                         Some(sleep) => {
                             sleep.await;

Review thread (on the tokio::time::timeout call):

Comment: if we hit the server side timeout, does tokio return the same time::error::Elapsed error here?

Reply: Server timeout would result in an error from the wrappee.
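To expand on that thread: tokio::time::timeout wraps the publish future and yields a nested Result, where the outer layer reflects the local timer and the inner layer is the RPC outcome, so a server-side timeout arrives as an inner tonic::Status rather than as Elapsed. The diff then uses TryFutureExt::map_err to turn the Elapsed case into a tonic::Status too, which is why the match can merge Err(status) | Ok(Err(status)). A standalone sketch of that shape (illustrative only; fake_publish and its String error are made-up stand-ins for client.publish and tonic::Status, assuming the tokio crate with the time and macros features):

```rust
use std::time::Duration;

// Stand-in for the wrapped RPC call; the real code awaits `client.publish(...)`
// and its error type is `tonic::Status`.
async fn fake_publish(succeed: bool) -> Result<&'static str, String> {
    if succeed {
        Ok("published")
    } else {
        Err("server reported DEADLINE_EXCEEDED".to_string())
    }
}

#[tokio::main]
async fn main() {
    // tokio::time::timeout yields Result<inner_output, Elapsed>, so with a
    // fallible inner future the caller sees:
    //   Ok(Ok(v))    -> the RPC completed in time, successfully
    //   Ok(Err(e))   -> the RPC completed in time, but with an error
    //                   (this is where a server-side timeout shows up)
    //   Err(Elapsed) -> the local timer fired before the RPC completed
    match tokio::time::timeout(Duration::from_secs(1), fake_publish(true)).await {
        Ok(Ok(msg)) => println!("success: {msg}"),
        Ok(Err(err)) => println!("rpc error: {err}"),
        Err(elapsed) => println!("client-side timeout: {elapsed}"),
    }
}
```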
@@ -703,6 +749,7 @@ mod test {
         PublishTopicSink::new(
             dummy_client(),
             ProjectTopicName::new("dummy-project", "dummy-topic"),
+            PublishConfig::default(),
         )
     }
Second changed file (exponential backoff retry policy):
@@ -95,7 +95,11 @@ where
             debug!(
                 message = "retrying after error",
                 ?error,
-                backoff_ms = %interval.as_millis()
+                backoff_ms = %interval.as_millis(),
+                remaining_attempts = match self.intervals.size_hint() {
+                    (_lower_bound, Some(upper_bound)) => upper_bound,
+                    (lower_bound, None) => lower_bound
+                },
             );
             return Some(self.sleeper.sleep(interval));
         } else {

Review thread (on the size_hint match):

Comment: I don't grok why the lower bound of the iterator would be indicative of the remaining attempts.

Reply: I guess it's not exactly indicative. I thought about having "unknown" if there's no upper bound. In practice this will never matter because the only possible iterator will always have an upper bound (…)
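To illustrate that thread: Iterator::size_hint returns (lower, Option<upper>), and only the upper bound, when present, is a real cap on the items remaining; for an unbounded iterator the lower bound is whatever the implementation chooses to report. A standalone sketch (illustrative only, not this crate's ExponentialIter; the remaining_attempts helper is made up for the example):

```rust
// Mirror of the match in the diff above: prefer the exact upper bound when the
// iterator knows it, otherwise fall back to the (less meaningful) lower bound.
fn remaining_attempts<I: Iterator>(iter: &I) -> usize {
    match iter.size_hint() {
        (_lower_bound, Some(upper_bound)) => upper_bound,
        (lower_bound, None) => lower_bound,
    }
}

fn main() {
    // a bounded backoff schedule: exactly three intervals left
    let bounded = [100u64, 200, 400].into_iter();
    assert_eq!(remaining_attempts(&bounded), 3);

    // an unbounded schedule: std::iter::repeat reports (usize::MAX, None), so
    // the lower-bound fallback is not a meaningful count of remaining attempts
    let unbounded = std::iter::repeat(100u64);
    assert_eq!(remaining_attempts(&unbounded), usize::MAX);
}
```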
@@ -213,6 +217,10 @@ impl Iterator for ExponentialIter {
     fn next(&mut self) -> Option<Self::Item> {
         self.iter.next()
     }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.iter.size_hint()
+    }
 }

 /// An infinite iterator whose values grow by some growth factor with every iteration, until
General discussion:

Comment: I don't know all that much about grpc... do we need both a timeout on the client and server side?

Reply: Yes. Client side is the more obvious one, in case the server never responds, there's a network problem, or whatever. Server side is more of a good convention: if the server takes too long and knows the client will time out, it is allowed to free resources after that time, which can help overall responsiveness under load.
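As a sketch of that convention (illustrative only; FakeRequest, FakeResponse, and fake_rpc are placeholders for the generated PubSub types and client.publish, assuming the tonic and tokio crates), the same duration is applied on both sides:

```rust
use std::time::Duration;

// Placeholder request/response types standing in for the generated API types.
struct FakeRequest;
struct FakeResponse;

// Placeholder for the real RPC call (`client.publish(...)` in the diff).
async fn fake_rpc(_req: tonic::Request<FakeRequest>) -> Result<FakeResponse, tonic::Status> {
    Ok(FakeResponse)
}

async fn call_with_both_deadlines(timeout: Duration) -> Result<FakeResponse, tonic::Status> {
    let mut request = tonic::Request::new(FakeRequest);

    // server side: set_timeout propagates the deadline as the grpc-timeout
    // header, so a well-behaved server can drop the work (and free resources)
    // once it knows the client has given up
    request.set_timeout(timeout);

    // client side: enforce the same deadline locally, so a dead connection or
    // an unresponsive server still resolves to an error here
    match tokio::time::timeout(timeout, fake_rpc(request)).await {
        Ok(result) => result,
        Err(_elapsed) => Err(tonic::Status::deadline_exceeded("publish attempt timed out")),
    }
}

#[tokio::main]
async fn main() {
    let _ = call_with_both_deadlines(Duration::from_secs(5)).await;
}
```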