Reactive support for Pub/Sub subscription #1461
Conversation
Some review.
Good stuff, but more questions than answers 😄
@GetMapping(value = "/getmessages", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<? super String> getMessages() {
	Flux<ConvertedAcknowledgeablePubsubMessage<String>> flux
			= FluxSubscriber.createPolledFlux(this.template, String.class);
Doesn't look like propagating conversion logic is a good idea in terms of Reactive Streams.
You just need to request data and then decide downstream, in .map(), what to convert it into.
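To illustrate the suggestion, here is a minimal Reactor sketch (the byte[] source is illustrative, not the actual template API): the publisher emits raw payloads, and the conversion decision lives downstream in map().

```java
import java.nio.charset.StandardCharsets;

import reactor.core.publisher.Flux;

public class MapConversionSketch {
	public static void main(String[] args) {
		// The source emits raw byte[] payloads; the conversion decision
		// lives downstream in map(), not inside the publisher itself.
		Flux<byte[]> rawMessages = Flux.just(
				"first".getBytes(StandardCharsets.UTF_8),
				"second".getBytes(StandardCharsets.UTF_8));

		Flux<String> converted = rawMessages
				.map(payload -> new String(payload, StandardCharsets.UTF_8));

		converted.subscribe(System.out::println);
	}
}
```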
 * @author Elena Felder
 * @since 1.2
 */
public class FluxSubscriber {
The name of the class is misleading. There is already a Subscriber abstraction in Reactive Streams, and Flux simply deals with those as is.
Essentially what you have here is a factory, so please reconsider the class name; I believe something like PubSub must be included in it.
Something like PubSubSubscriberFactory?
Then we can move the .subscribe() methods into this, as we discussed before for the refactoring.
On the other hand, we already have a SubscriberFactory, so I could just add the Flux creation logic into the existing DefaultSubscriberFactory.
You can, if reactor-core is a hard dependency, not an optional one.
Hmm, good point -- it should probably be optional, so it should not sit in a core class. Reactive is not the most common use case.
LOGGER.info("Current remaining demand: " + remainingDemand);
List<ConvertedAcknowledgeablePubsubMessage<T>> messages
		= this.subscriberOperations.pullAndConvert("reactiveSubscription",
M-m-m. I think this is not intentional.
Why do you use an explicit, non-configurable subscription name?
Definitely not intentional; I'll move it into a parameter.
return Flux.create(sink -> {
	sink.onRequest((numRequested) -> {
		// request must be non-blocking.
		executorService.submit(new PubSubPullTask<T>(numRequested, sink, subscriberOperations, targetType));
I think that's not your responsibility to do this. What you essentially have is simply covered by subscribeOn().
Although, since the underlying API is really not reactive (PullResponse):
this.receivedMessages_.add(input.readMessage(ReceivedMessage.parser(), extensionRegistry));
does it really make sense to promote this from the Framework perspective?..
The problem is that I am making a blocking pull call here, so if I request on the same thread, the code will violate the Reactive Streams rule that request must be non-obstructing.
There might be a way to do the pull asynchronously without your own ExecutorService.
See here: https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java#L213
subscriber.pullCallable().futureCall()
It's hard for me to object, but I think your thread spawning per subscriber is still a bad idea.
Consider using something like Schedulers.elastic().
I will try to pull someone from our Reactor team to see what they think on the matter...
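As a sketch of the subscribeOn() point: defer() delays a blocking call until subscription time, and subscribeOn() moves it off the caller's thread without a hand-rolled ExecutorService. Here blockingPull() is a hypothetical stand-in for a blocking Pub/Sub pull, and boundedElastic() is current Reactor's replacement for the Schedulers.elastic() mentioned above.

```java
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnSketch {

	// Hypothetical stand-in for a blocking Pub/Sub pull call.
	static List<String> blockingPull() {
		return Arrays.asList("m1", "m2");
	}

	public static void main(String[] args) {
		// defer() postpones the blocking call until subscription time;
		// subscribeOn() runs that call on an elastic worker thread.
		Flux<String> messages = Flux
				.defer(() -> Flux.fromIterable(blockingPull()))
				.subscribeOn(Schedulers.boundedElastic());

		System.out.println(messages.collectList().block());
	}
}
```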
Oh! futureCall() -- that's an answer! Right, we need to add it into the PubSubSubscriberOperations API and use it from this Flux trick.
We simply need to have a flatMap(e -> Mono.fromFuture()) and so on with flatMapIterable().
futureCall() sounded good in theory, but now we have potentially multiple outstanding calls to PubSub with additional callbacks being spawned.
Flux making an additional request when the data is 75% or so processed, combined with PubSub synchronous pull timing out after 10 minutes makes for messy logic.
I'd almost prefer to take a "poll interval" parameter, accumulate total outstanding demand in the custom flux, and then make "return_immediately=true" requests at an interval.
int numReceived = messages.size();
messages.forEach(m -> {
	LOGGER.info("Sending message to subscriber: " + m.getPayload());
	sink.next(m);
I would send the whole list as a single event, then have a flatMapIterable() before returning from createPolledFlux().
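The batch-then-flatMapIterable() idea can be sketched with plain Reactor values (the Integer batches here stand in for pulled message lists):

```java
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;

public class BatchFlatMapSketch {
	public static void main(String[] args) {
		// Each pull emits one List (a batch) into the sink...
		Flux<List<Integer>> batches = Flux.just(
				Arrays.asList(1, 2, 3),
				Arrays.asList(4, 5));

		// ...and flatMapIterable() unrolls the batches into
		// individual messages before the Flux reaches the end user.
		Flux<Integer> messages = batches.flatMapIterable(batch -> batch);

		messages.subscribe(System.out::println);
	}
}
```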
Codecov Report
@@ Coverage Diff @@
## master #1461 +/- ##
============================================
- Coverage 78.94% 68.8% -10.14%
+ Complexity 1989 1692 -297
============================================
Files 244 246 +2
Lines 6526 6572 +46
Branches 662 667 +5
============================================
- Hits 5152 4522 -630
- Misses 1094 1755 +661
- Partials 280 295 +15
Continue to review full report at Codecov.
if (remainingDemand > 0) {
	// Ensure requested messages get fulfilled eventually.
	produceMessages(subscriptionName, remainingDemand, sink, subscriberOperations);
Between the need to keep onRequest() non-blocking and attempting to fulfill outstanding demand, the code relying on gRPC's futureCall() became messy, with potentially too many simultaneous calls to Pub/Sub. And adding callbacks to ApiFuture requires a separate executor anyway (I reused the former ackExecutor in the latest implementation).
I would prefer to go back to maintaining a thread pool (but a shared one, not one per flux) in PubSubReactiveFactory, and using non-blocking polling at a customizable interval to satisfy remaining demand.
@artembilan @meltsufin What do you think?
Yeah, the call stack can get huge with this recursive call. So, it might make sense to go back to the synchronous pull with a pooled executor. It doesn't look like we're able to get away from providing our own executor anyway. WDYT @artembilan?
I would say that's not our problem to deal with the remainder here and die in a stack overflow.
Please see the Subscription.request() JavaDocs:
* A {@link Publisher} can send less than is requested if the stream ends but
* then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}.
So, our goal here is to produce exactly the requested amount or less, and then onComplete(): FluxSink.complete().
Not sure why you have gone a bit different way...
Also, rule number 1 of the Publisher specification: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md#specification
The total number of onNext´s signalled by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.
I think you are missing the fact that our createPolledPublisher() is just finite, and it's already out of our scope to keep it hot and constantly updating.
Is a cold publisher useful in a Pub/Sub context? If the requirement is "get up to N messages and complete", then the client code can just use generate:
Flux.<List<AcknowledgeablePubsubMessage>>generate(
		sink -> sink.next(subscriberOperations.pull(subscriptionName, demand, true)))
	.flatMapIterable(Function.identity());
It seems to me, based on the discussion in reactor/reactor-core#314, that onRequest() was added to support our exact use case of an unlimited flux that still responds to backpressure.
Off-topic, if I start a metal band, "We Die in the Stack Overflow" will be its name.
Hm... Then I have no answer or my Reactive knowledge is limited...
I wonder if we can bring someone who is an expert in the area to give us some feedback...
Well, everything I learned about Reactive I learned in the last month, so I am hardly an expert. Could we maybe set up a meeting with Reactive folks?
Thinking about this a bit more, I'm not sure what's wrong with just returning whatever we get from Pub/Sub with return-immediately=true, and not trying to fulfill the full demand. If Pub/Sub returns less than the demand, we just call onComplete, assuming there are no more messages for the time being. The client can always resume its requests.
What's our outstanding question for the experts? I think the specification is fairly precise.
Ok, just talked to @elefeint. I see why it's bad to terminate the stream when the demand is unmet by Pub/Sub. It doesn't work for infinite streams. So, it looks like it's better to keep the subscription open knowing that Pub/Sub can eventually provide more messages. We just need to poll it on a regular basis. Ideally, Pub/Sub would support true reactive streaming itself, but for the time being we need to simulate it by polling.
Hey, it'd be great to meet if you folks want to introduce us to this in a Zoom call or somewhere, and we can pair-review together!
public void setAckExecutor(Executor ackExecutor) {
	Assert.notNull(ackExecutor, "ackExecutor can't be null.");
	this.ackExecutor = ackExecutor;
public void setCallbackExecutor(Executor callbackExecutor) {
Do you realize that this is a breaking change? We just can't rename an API just because.
Please consider deprecating the existing method in favor of a new one.
And let's keep it here at least for the next version!
Point well taken. If this is the direction in which this code will go, I'll add the proper deprecation (I hope it does not go there, though -- per the discussion below, I would prefer to revert the changes to PubSubSubscriberTemplate and manage multithreading in PubSubReactiveFactory instead).
if (remainingDemand > 0) {
	// Ensure requested messages get fulfilled eventually.
	produceMessages(subscriptionName, remainingDemand, sink, subscriberOperations);
From in-person review with @smaldini @artembilan @meltsufin
PubSubReactiveSubscription reactiveSubscription = new PubSubReactiveSubscription(subscriptionName);

return Flux.interval(Duration.ofMillis(delayInMilliseconds))
onBackpressureDrop()
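For context, onBackpressureDrop() here refers to Reactor's overflow strategy for Flux.interval, which otherwise signals an overflow error when ticks outpace downstream demand. A minimal sketch (take(3) is only there to keep the example finite):

```java
import java.time.Duration;

import reactor.core.publisher.Flux;

public class IntervalDropSketch {
	public static void main(String[] args) {
		// Flux.interval() errors on overflow if the downstream cannot keep
		// up with the ticks; onBackpressureDrop() discards excess ticks
		// instead, which fits a "poll on each tick" design.
		Flux<Long> ticks = Flux.interval(Duration.ofMillis(50))
				.onBackpressureDrop()
				.take(3);

		System.out.println(ticks.collectList().block());
	}
}
```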
return Flux.interval(Duration.ofMillis(delayInMilliseconds))
		.doOnRequest(reactiveSubscription::addDemand)
		.map(t -> reactiveSubscription.pullMessages())
Collapse map() and flatMap() into one operation.
public Publisher<AcknowledgeablePubsubMessage> createPolledPublisher(
		String subscriptionName, int delayInMilliseconds) {

	PubSubReactiveSubscription reactiveSubscription = new PubSubReactiveSubscription(subscriptionName);
Avoid creating the subscription object outside of flux creation.
Bottom line: use Flux.create for more control over the process.
…to make it global
…spring-integration
@smaldini @artembilan @meltsufin I've changed the Pub/Sub flux creation implementation as we discussed last Monday. I still owe tests.
Nothing critical.
The solution looks great! 👍
...main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java
...ubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java
}

/**
 * Creates an infinite stream {@link Publisher} of {@link AcknowledgeablePubsubMessage} objects.
Method JavaDocs must be in the imperative style: https://github.com/spring-projects/spring-framework/wiki/Code-Style#javadoc-formatting
 * @param pollingPeriod how frequently to poll the source subscription in case of unlimited demand.
 * @return infinite stream of {@link AcknowledgeablePubsubMessage} objects.
 */
public Publisher<AcknowledgeablePubsubMessage> createPolledPublisher(String subscriptionName, long pollingPeriod) {
I think our (Spring) programming model is to expect Reactive Streams interfaces as method arguments, but to return explicit Reactor types: Flux and Mono.
No reason to narrow it if you deal internally only with Reactor.
	messages.forEach(sink::next);
}

return messages;
I think we may end up with a contract that just returns the number of messages we have pulled, not the whole list of messages.
<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-core</artifactId>
	<optional>true</optional>
I believe it can be non-optional here already.
This is end-user code, so that is fully OK to have it as a hard dependency.
return Flux.from(flux)
		.doOnNext(message -> {
			System.out.println("Received a message: " + message.getPubsubMessage().getMessageId());
			message.ack();
I think we need to document this manual ack'ing or maybe consider an auto-ack mode as well...
Customizable ack mode is a good idea. Would it make sense to move (properly, with deprecation) AckMode from o.s.c.gcp.pubsub.integration up a level, or to simply use a boolean autoAck parameter when creating the Flux?
I think a boolean autoAck is fully enough, but let's really consider this improvement once we are done with the basics in this PR!
I created #1517 to track; it will be a small follow-up PR.
docs/src/main/asciidoc/pubsub.adoc
The `Flux` then represents an infinite stream of GCP Pub/Sub messages coming in through the specified subscription.
The full range of Project Reactor operations can be applied to the stream.
For example, the infinite stream can become a finite one with the `limitRequest` operation:
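A minimal sketch of the `limitRequest` idea, where `Flux.range` stands in for the infinite Pub/Sub stream:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class LimitRequestSketch {
	public static void main(String[] args) {
		// limitRequest(5) caps the total upstream demand at 5 and
		// completes the stream once those 5 elements have been emitted.
		List<Integer> firstFive = Flux.range(1, Integer.MAX_VALUE)
				.limitRequest(5)
				.collectList()
				.block();

		System.out.println(firstFive); // [1, 2, 3, 4, 5]
	}
}
```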
I think we should also use simpler language, like "If you only want to fetch 5 messages from Pub/Sub, you can use a finite stream...".
 * @param pollingPeriod how frequently to poll the source subscription in case of unlimited demand.
 * @return infinite stream of {@link AcknowledgeablePubsubMessage} objects.
 */
public Flux<AcknowledgeablePubsubMessage> createPolledFlux(String subscriptionName, long pollingPeriod) {
Units for pollingPeriod should be clarified. I would name the parameter pollingPeriodMs and clarify the units in the javadoc.
 * @return number of messages retrieved
 */
protected int pullToSink(long demand, boolean block) {
	int numMessagesToPull = demand > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) demand;
Why not just allow only int demand as a parameter and let the caller deal with the conversion?
The demand comes from the FluxSink.onRequest() parameter, which is a long. Ultimately, there is tension between Reactor (which operates on long demand) and Pub/Sub (which operates on int demand), so it has to get resolved somewhere in my code.
Or did you mean to simply move the conversion from PubSubBlockingLimitedDemandPullTask.run() up to PubSubReactiveFactory.createPolledFlux()?
Right, hand the responsibility to the caller.
I looked at the code again -- the responsibility for converting from long to int belongs with the polling task, as close to the Pub/Sub polling operation as possible. Consider a request of Integer.MAX_VALUE + 1,000. If we convert to int before creating PubSubBlockingLimitedDemandPullTask, then only a demand of Integer.MAX_VALUE will be fulfilled, leaving the extra demand of 1,000 unmet.
Leaving the conversion responsibility with PubSubBlockingLimitedDemandPullTask allows it to think in terms of long internally, subtracting up to Integer.MAX_VALUE messages at each poll.
I think the conversion belongs in createPolledFlux. How would the demand of Integer.MAX_VALUE + 1,000 be fulfilled if you cap it at Integer.MAX_VALUE here? Automatically changing demand > Integer.MAX_VALUE to Integer.MAX_VALUE seems like a bug waiting to happen. If the flux ever requests a demand greater than Integer.MAX_VALUE, and it's not the special Long.MAX_VALUE, we should probably just throw an unsupported-operation exception in createPolledFlux.
...ubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java
private static final Log LOGGER = LogFactory.getLog(PubSubReactiveFactory.class);

private PubSubSubscriberOperations subscriberOperations;
final
		this.subscriptionName, numMessagesToPull, !block);

if (!this.sink.isCancelled()) {
	messages.forEach(sink::next);
I wonder what is wrong with sending the whole List<AcknowledgeablePubsubMessage> batch to the sink as a single event, and then using flatMapIterable(Function.identity()) before returning the Flux to the end user...
flatMapIterable changes demand, which could look unintuitive to the consumer (if the subscription requests 3 messages, flatMapIterable() will cause 256 to be requested instead).
Also (this is not a good reason, just relevant): when I was testing, I ran into an issue with additional demand not being propagated from the HTTP request to the Pub/Sub flux, even though the initial request was fulfilled. I'd like to reproduce a minimal case to understand whether this behaves as intended or is a bug.
OK. Then let's defer it to @smaldini.
I'm not sure what to do with this situation then 😄
Hi! Great work @elefeint! We were also thinking of implementing our own Reactor Pub/Sub client due to improper back-pressure semantics, which are giving us quite a lot of headache, and thankfully @bsideup pointed this ticket to us. About this specific messages.forEach(sink::next) line, I have a question: if the demand is a big number, wouldn't you be fetching a gigantic batch of messages from Pub/Sub and choking the consumer via over-production? I would rather cap it with Math.min(demand, maxDemand) (e.g., maxDemand=100 by default) for each request(demand) and wait for any follow-up request()s that the consumer will signal at its convenience. Otherwise my impression is that we will be periodically bombarding the consumer with messages regardless of its availability. Is there a detail I am missing?
If the consumer is specifying a large demand, isn't that a signal that it can handle that number of messages? Certainly, throttling can be implemented, but it may be unintuitive to consumers that do want a larger batch.
Most of the time consumers do not specify anything, and a simple subscribe() just triggers a request(Long.MAX_VALUE). Though in the case of a message queue poller, "apparently the consumer is asking for a cargo ship load of messages" does not sound like a practical assumption to me. Consider the following case:
- There are 1e6 messages in the queue.
- request(unbounded) triggers a continuous messages.forEach(sink::next) sequence.
- Scheduler threads start working on the initial tasks, and the rest quickly get piled up in the work queue of the ScheduledExecutorService.
- After a while, the majority of the messages time out, the work queue starts choking memory, and every task the threads pick up turns out to be obsolete.
This is the problem I have been dealing with for the last couple of months at work. Let me know if there is a way I can work around this.
@vy I understand now. As a practical matter, would adding the limitRequest() or limitRate() operators to the invocation chain work?
@smaldini The conversation about the default behavior of subscribe() is an interesting one.
Would it make sense to open an issue on reactor-core? Unbounded demand is not the most common use case, so perhaps a .subscribeWithInitialDemand() operator makes sense?
@elefeint I suppose a combination of limit{Request,Rate}() would do the trick, though it still expects an explicit action from the consumer side. I would still favor shipping a solution with decent defaults, minimizing any effort on the user side.
I think subscribeWithInitialDemand() is sort of against Reactor's fluent API encouraging composition. Isn't it just a shortcut for .limitRequest(n).subscribe()?
Nevertheless, @smaldini, we would really appreciate your insights on the general back-pressure issue. (@simonbasle, is this something you can help us out with?)
FYI, limitRate only, not limitRequest. limitRequest will cut the requests after the threshold specified as an argument.
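A minimal illustration of the distinction (Flux.range stands in for the message stream): limitRate rewrites an unbounded downstream request into bounded upstream requests without truncating the stream, whereas limitRequest would cut it off.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class LimitRateSketch {
	public static void main(String[] args) {
		// limitRate(100) turns the downstream's unbounded request into
		// upstream requests of at most 100 (refilled once 75% is consumed,
		// by default), while the stream itself still delivers every element.
		List<Integer> all = Flux.range(1, 1000)
				.limitRate(100)
				.collectList()
				.block();

		System.out.println(all.size()); // 1000
	}
}
```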
@vy Would you mind creating an issue in this project for adding the maxMessagesPerPoll property? I've been thinking about this some more, and for the unbounded demand scenario it makes sense to provide a configurable limit, especially if it's combined with a configurable project-wide default for pollingPeriodMs.
The firehose of messages would likely still be the default, but smoothing message arrival by getting N messages per poll seems valuable.
Co-Authored-By: elefeint <41136058+elefeint@users.noreply.github.com>
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success

These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* http://docs.oracle.com/javaee/7/api/ with 1 occurrence migrated to https://docs.oracle.com/javaee/7/api/ (result 200).
* http://docs.oracle.com/javase/7/docs/api/ with 1 occurrence migrated to https://docs.oracle.com/javase/7/docs/api/ (result 200).
* http://docs.spring.io/spring/docs/4.1.x/javadoc-api/ with 1 occurrence migrated to https://docs.spring.io/spring/docs/4.1.x/javadoc-api/ (result 200).
* http://maven.apache.org/xsd/maven-4.0.0.xsd with 51 occurrences migrated to https://maven.apache.org/xsd/maven-4.0.0.xsd (result 200).
* http://stackoverflow.com/questions/1593051/how-to-programmatically-determine-the-current-checked-out-git-branch with 1 occurrence migrated to https://stackoverflow.com/questions/1593051/how-to-programmatically-determine-the-current-checked-out-git-branch (result 200).
* http://stackoverflow.com/questions/29300806/a-bash-script-to-check-if-a-string-is-present-in-a-comma-separated-list-of-strin with 1 occurrence migrated to https://stackoverflow.com/questions/29300806/a-bash-script-to-check-if-a-string-is-present-in-a-comma-separated-list-of-strin (result 200).
* http://www.apache.org/licenses/LICENSE-2.0 with 2 occurrences migrated to https://www.apache.org/licenses/LICENSE-2.0 (result 200).
* http://fasterxml.github.com/jackson-core/javadoc/2.0.0/ with 1 occurrence migrated to https://fasterxml.github.com/jackson-core/javadoc/2.0.0/ (result 301).
* http://www.spring.io with 5 occurrences migrated to https://www.spring.io (result 301).

# Ignored

These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 102 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 51 occurrences
from in-person review
```java
protected static class ReactivePubSubAutoconfiguration {

	@Bean
	@ConditionalOnMissingBean
```
Should there be a property preventing this object from being instantiated?
For what reason?
This lightweight object doesn't look like it adds any overhead to the target application.
Right, it's not necessary to keep it off by default, because it's just a wrapper around a `SubscriberTemplate`.
But I am also going to add a default scheduler bean to the autoconfiguration and extract reactive configuration into a separate file, at which point it will make sense to add an overridable `matchIfMissing=true` property, `spring.cloud.gcp.pubsub.reactive.enabled`.
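An overridable, on-by-default switch could look like the following sketch. The factory class and bean names here are illustrative stand-ins; only the property name comes from the discussion above.

```java
@Configuration
@ConditionalOnProperty(
		value = "spring.cloud.gcp.pubsub.reactive.enabled",
		matchIfMissing = true) // enabled unless explicitly set to false
protected static class ReactivePubSubAutoconfiguration {

	@Bean
	@ConditionalOnMissingBean
	public ReactiveFactory reactiveFactory(PubSubSubscriberTemplate subscriberTemplate) {
		// ReactiveFactory is a hypothetical name for the class under review.
		return new ReactiveFactory(subscriberTemplate);
	}
}
```

With `matchIfMissing = true`, users who never set the property still get the bean, while `spring.cloud.gcp.pubsub.reactive.enabled=false` turns the whole configuration off.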
```java
// unlimited demand
Disposable task = Schedulers.single().schedulePeriodically(
		new PubSubNonBlockingUnlimitedDemandPullTask(subscriptionName, sink), 0, pollingPeriodMs, TimeUnit.MILLISECONDS);
sink.onCancel(task::dispose);
```
Pass `task` directly to `sink.onCancel()` instead of the `task::dispose` method reference.
```java
 * @param pollingPeriodMs how frequently to poll the source subscription in case of unlimited demand, in milliseconds.
 * @return infinite stream of {@link AcknowledgeablePubsubMessage} objects.
 */
public Flux<AcknowledgeablePubsubMessage> createPolledFlux(String subscriptionName, long pollingPeriodMs) {
```
rename to poll(subscriptionName, pollingPeriodMs)
```java
if (numRequested == Long.MAX_VALUE) {
	// unlimited demand
	Disposable task = Schedulers.single().schedulePeriodically(
			new PubSubNonBlockingUnlimitedDemandPullTask(subscriptionName, sink), 0, pollingPeriodMs, TimeUnit.MILLISECONDS);
```
remove PubSub prefix
```java
	demand -= pullToSink(intDemand, true);
}
catch (DeadlineExceededException e) {
	LOGGER.trace("Blocking pull timed out due to empty subscription " + this.subscriptionName + "; retrying.");
```
add isTraceEnabled()
No reason: this is in the `catch()` block, so it only runs in case of an exception.
It's not a big deal to build a logging message when the exception occurs.
This exception is actually expected, so it may happen quite often.
Every 10 minutes by default, which is negligible at computer speed. On the other hand, we might as well add `isTraceEnabled()` for consistency.
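The trade-off being debated — guarding message construction with a level check — can be demonstrated with `java.util.logging` as a stand-in for the PR's actual logger. Without the guard, the string concatenation runs even when the trace level is disabled; with the guard, it is skipped entirely.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class TraceGuardDemo {
    // Counts how many times the log message string is actually built.
    static int buildCount = 0;

    static String expensiveMessage() {
        buildCount++;
        return "Blocking pull timed out due to empty subscription; retrying.";
    }

    public static void main(String[] args) {
        Logger logger = Logger.getLogger("demo");
        logger.setLevel(Level.INFO); // FINEST ("trace") is disabled

        // Unguarded call: the argument is evaluated even though nothing is logged.
        logger.finest(expensiveMessage());

        // Guarded call: the expensive concatenation is skipped entirely.
        if (logger.isLoggable(Level.FINEST)) {
            logger.finest(expensiveMessage());
        }

        // Only the unguarded call built the message.
        System.out.println(buildCount);
    }
}
```

For an exception that fires every 10 minutes, either form is cheap; the guard mainly matters on hot paths, which is why "for consistency" is the remaining argument.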
```java
	sink.onCancel(task::dispose);
}
else {
	Schedulers.single().schedule(new PubSubBlockingLimitedDemandPullTask(subscriptionName, numRequested, sink));
```
- Use `elastic()` and create a single worker (don't call the scheduler directly).
- Create the worker before the request.
- Also, kill the task when the sink is cancelled -- 10 minutes is too long.
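Taken together, the two demand branches and the cancellation concern can be sketched with plain `java.util.concurrent` primitives. Class and method names here (`DemandSketch`, `pullToSink`, `schedulePull`) are illustrative stand-ins, not the PR's actual API: unlimited demand (`Long.MAX_VALUE`) schedules a periodic non-blocking pull, bounded demand submits a one-shot pull for the requested count, and cancellation disposes the outstanding task — all on one reusable worker.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DemandSketch {
    // Stand-in for emitting pulled messages into the Flux sink.
    static final AtomicInteger emitted = new AtomicInteger();

    // Stand-in for a Pub/Sub pull; pretends one message arrived.
    static int pullToSink(int maxMessages) {
        emitted.incrementAndGet();
        return 1;
    }

    static Future<?> schedulePull(ScheduledExecutorService worker, long numRequested, long pollingPeriodMs) {
        if (numRequested == Long.MAX_VALUE) {
            // Unlimited demand: periodic non-blocking pull (cf. schedulePeriodically).
            return worker.scheduleAtFixedRate(() -> pullToSink(100), 0, pollingPeriodMs, TimeUnit.MILLISECONDS);
        }
        // Bounded demand: one-shot pull for exactly the requested count.
        return worker.submit(() -> pullToSink((int) numRequested));
    }

    public static void main(String[] args) throws Exception {
        // One worker reused for both demand modes, as suggested in the review.
        ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();

        Future<?> unlimited = schedulePull(worker, Long.MAX_VALUE, 10);
        Thread.sleep(50);
        unlimited.cancel(true); // cf. sink.onCancel(task::dispose)

        Future<?> bounded = schedulePull(worker, 5, 10);
        bounded.get(1, TimeUnit.SECONDS); // completes after a single pull

        worker.shutdown();
        System.out.println(emitted.get() >= 2);
    }
}
```

Cancelling the `Future` immediately, rather than letting a blocked pull run out its deadline, is the stdlib analogue of the "10 minutes is too long" point above.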
Reuse the worker for both unlimited and limited demand.
autoconfigure the scheduler
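Autoconfiguring the scheduler could look roughly like the sketch below. The bean name and the choice of `Schedulers.newElastic` are assumptions for illustration, not necessarily what the PR ended up with.

```java
@Bean
@ConditionalOnMissingBean(name = "pubSubReactiveScheduler")
public Scheduler pubSubReactiveScheduler() {
	// A dedicated elastic scheduler, so long blocking pulls don't
	// starve other work sharing a global scheduler.
	return Schedulers.newElastic("pubSubReactive");
}
```

An `@ConditionalOnMissingBean` guard lets applications swap in their own `Scheduler` without fighting the autoconfiguration.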
...es/spring-cloud-gcp-pubsub-reactive-sample/src/main/java/com/example/ReactiveController.java
Changes look good!
Would it be possible to add an integration test for the sample, like we have for other samples?
...a/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java
...a/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java
Co-Authored-By: elefeint <41136058+elefeint@users.noreply.github.com>
I'll add an integration test in a follow-up PR -- I also want to change the sample so users can send messages through a web UI, without having `gcloud` installed or multi-tasking in a third command-line window.
This commit updates URLs to prefer the https protocol.
Nice job @elefeint and team!
The polling Pub/Sub flux is demand-sensitive, implementing the pull/push strategy.
Unbounded/bounded demand is treated differently: for unbounded demand, polling frequency is controlled by the `pollingPeriodMs` parameter passed in when creating the `Flux`.

Fixes #186.