-
Notifications
You must be signed in to change notification settings - Fork 95
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: exactly once support v3 (#1022)
* Added new interfaces * `AckReplyConsumerWithResponse` * `MessageReceiverWithAckResponse` * Added `AckResponse` enum * Changed `MessageDispatcher` and `StreamingSubscriberConnection` to use builder pattern * Keeps track of whether exactly-once delivery is enabled for a subscription by looking at the subscription's SubscriptionProperties * Set the minimum ack deadline to 60 secs if exactly-once is known to be turned on. * Add new min-lease-extension parameter. If the user sets this, it overrides the auto-set param * Changed `AckId` information for Modacks and Acks to use a new `AckRequestData` object that also includes the message future (if applicable) Minor Updates: * Added `Mockito` dependency for mocking in unit tests
- Loading branch information
Showing
17 changed files
with
2,490 additions
and
464 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
25 changes: 25 additions & 0 deletions
25
...e-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* | ||
* Copyright 2022 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.pubsub.v1; | ||
|
||
import java.util.concurrent.Future; | ||
|
||
public interface AckReplyConsumerWithResponse { | ||
Future<AckResponse> ack(); | ||
|
||
Future<AckResponse> nack(); | ||
} |
85 changes: 85 additions & 0 deletions
85
google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Copyright 2022 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.pubsub.v1; | ||
|
||
import com.google.api.core.SettableApiFuture; | ||
import java.util.Optional; | ||
|
||
public class AckRequestData { | ||
private final String ackId; | ||
private final Optional<SettableApiFuture<AckResponse>> messageFuture; | ||
|
||
protected AckRequestData(Builder builder) { | ||
this.ackId = builder.ackId; | ||
this.messageFuture = builder.messageFuture; | ||
} | ||
|
||
public String getAckId() { | ||
return ackId; | ||
} | ||
|
||
public SettableApiFuture<AckResponse> getMessageFutureIfExists() { | ||
return this.messageFuture.orElse(null); | ||
} | ||
|
||
public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) { | ||
if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) { | ||
switch (ackResponse) { | ||
case SUCCESSFUL: | ||
if (setResponseOnSuccess) { | ||
this.messageFuture.get().set(ackResponse); | ||
} | ||
break; | ||
case INVALID: | ||
case OTHER: | ||
case PERMISSION_DENIED: | ||
case FAILED_PRECONDITION: | ||
// Non-succesful messages will get set for both acks, nacks, and modacks | ||
this.messageFuture.get().set(ackResponse); | ||
break; | ||
} | ||
} | ||
return this; | ||
} | ||
|
||
public boolean hasMessageFuture() { | ||
return this.messageFuture.isPresent(); | ||
} | ||
|
||
public static Builder newBuilder(String ackId) { | ||
return new Builder(ackId); | ||
} | ||
|
||
/** Builder of {@link AckRequestData AckRequestData}. */ | ||
protected static final class Builder { | ||
private final String ackId; | ||
private Optional<SettableApiFuture<AckResponse>> messageFuture = Optional.empty(); | ||
|
||
protected Builder(String ackId) { | ||
this.ackId = ackId; | ||
} | ||
|
||
public Builder setMessageFuture(SettableApiFuture<AckResponse> messageFuture) { | ||
this.messageFuture = Optional.of(messageFuture); | ||
return this; | ||
} | ||
|
||
public AckRequestData build() { | ||
return new AckRequestData(this); | ||
} | ||
} | ||
} |
25 changes: 25 additions & 0 deletions
25
google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* | ||
* Copyright 2021 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.pubsub.v1; | ||
|
||
public enum AckResponse { | ||
PERMISSION_DENIED, | ||
FAILED_PRECONDITION, | ||
SUCCESSFUL, | ||
INVALID, | ||
OTHER | ||
} |
Oops, something went wrong.