Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce java.time variables and methods #2271

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions google-cloud-pubsub/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<differenceType>7005</differenceType>
<!--Ignore changes in this class because it's package private-->
<className>com/google/cloud/pubsub/v1/MessageDispatcher$Builder</className>
<method>*(org.threeten.bp.Duration)</method>
<to>*(java.time.Duration)</to>
</difference>
<difference>
<differenceType>7005</differenceType>
<!--Ignore changes in this class because it's package private-->
<className>com/google/cloud/pubsub/v1/StreamingSubscriberConnection$Builder</className>
<method>*(org.threeten.bp.Duration)</method>
<to>*(java.time.Duration)</to>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -48,9 +51,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;
import org.threeten.bp.temporal.ChronoUnit;

/**
* Dispatches messages to a message receiver while handling the messages acking and lease
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -73,7 +74,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;

/**
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/publisher">publisher</a>, that is
Expand Down Expand Up @@ -198,7 +198,7 @@ private Publisher(Builder builder) throws IOException {
// key?
retrySettingsBuilder
.setMaxAttempts(Integer.MAX_VALUE)
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
.setTotalTimeoutDuration(Duration.ofNanos(Long.MAX_VALUE));
}

PublisherStubSettings.Builder stubSettings =
Expand Down Expand Up @@ -740,7 +740,7 @@ public static final class Builder {
private static final double DEFAULT_MULTIPLIER = 4;
static final BatchingSettings DEFAULT_BATCHING_SETTINGS =
BatchingSettings.newBuilder()
.setDelayThreshold(DEFAULT_DELAY_THRESHOLD)
.setDelayThresholdDuration(DEFAULT_DELAY_THRESHOLD)
.setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD)
.setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD)
.setFlowControlSettings(
Expand All @@ -750,13 +750,13 @@ public static final class Builder {
.build();
static final RetrySettings DEFAULT_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setTotalTimeout(DEFAULT_TOTAL_TIMEOUT)
.setInitialRetryDelay(DEFAULT_INITIAL_RETRY_DELAY)
.setTotalTimeoutDuration(DEFAULT_TOTAL_TIMEOUT)
.setInitialRetryDelayDuration(DEFAULT_INITIAL_RETRY_DELAY)
.setRetryDelayMultiplier(DEFAULT_MULTIPLIER)
.setMaxRetryDelay(DEFAULT_MAX_RETRY_DELAY)
.setInitialRpcTimeout(DEFAULT_INITIAL_RPC_TIMEOUT)
.setMaxRetryDelayDuration(DEFAULT_MAX_RETRY_DELAY)
.setInitialRpcTimeoutDuration(DEFAULT_INITIAL_RPC_TIMEOUT)
.setRpcTimeoutMultiplier(DEFAULT_MULTIPLIER)
.setMaxRpcTimeout(DEFAULT_MAX_RPC_TIMEOUT)
.setMaxRpcTimeoutDuration(DEFAULT_MAX_RPC_TIMEOUT)
.build();
static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
private static final int THREADS_PER_CPU = 5;
Expand Down Expand Up @@ -876,9 +876,9 @@ public Builder setBatchingSettings(BatchingSettings batchingSettings) {
/** Configures the Publisher's retry parameters. */
public Builder setRetrySettings(RetrySettings retrySettings) {
Preconditions.checkArgument(
retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0);
retrySettings.getTotalTimeoutDuration().compareTo(MIN_TOTAL_TIMEOUT) >= 0);
Preconditions.checkArgument(
retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0);
retrySettings.getInitialRpcTimeoutDuration().compareTo(MIN_RPC_TIMEOUT) >= 0);
this.retrySettings = retrySettings;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.opentelemetry.api.trace.Span;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -68,7 +69,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/** Implementation of {@link AckProcessor} based on Cloud Pub/Sub streaming pull. */
final class StreamingSubscriberConnection extends AbstractApiService implements AckProcessor {
Expand Down
michaelpri10 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package com.google.cloud.pubsub.v1;

import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeDuration;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiService;
import com.google.api.core.BetaApi;
import com.google.api.core.CurrentMillisClock;
import com.google.api.core.InternalApi;
import com.google.api.core.ObsoleteApi;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
Expand Down Expand Up @@ -55,7 +58,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/subscriber">subscriber</a> that is
Expand Down Expand Up @@ -98,24 +100,37 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private static final int MAX_INBOUND_METADATA_SIZE =
4 * 1024 * 1024; // 4MB API maximum metadata size

@InternalApi static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
@InternalApi
static final java.time.Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD =
java.time.Duration.ofMinutes(60);

@InternalApi
static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY =
Duration.ofMinutes(1);
static final java.time.Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY =
java.time.Duration.ofMinutes(1);

@InternalApi static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION = Duration.ofMinutes(0);
@InternalApi static final Duration DEFAULT_MAX_ACK_DEADLINE_EXTENSION = Duration.ofSeconds(0);
@InternalApi
static final java.time.Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION =
java.time.Duration.ofMinutes(0);

@InternalApi static final Duration MIN_STREAM_ACK_DEADLINE = Duration.ofSeconds(10);
@InternalApi static final Duration MAX_STREAM_ACK_DEADLINE = Duration.ofSeconds(600);
@InternalApi
static final java.time.Duration DEFAULT_MAX_ACK_DEADLINE_EXTENSION =
java.time.Duration.ofSeconds(0);

@InternalApi static final Duration STREAM_ACK_DEADLINE_DEFAULT = Duration.ofSeconds(60);
@InternalApi
static final java.time.Duration MIN_STREAM_ACK_DEADLINE = java.time.Duration.ofSeconds(10);

@InternalApi
static final Duration STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT = Duration.ofSeconds(60);
static final java.time.Duration MAX_STREAM_ACK_DEADLINE = java.time.Duration.ofSeconds(600);

@InternalApi static final Duration ACK_EXPIRATION_PADDING_DEFAULT = Duration.ofSeconds(5);
@InternalApi
static final java.time.Duration STREAM_ACK_DEADLINE_DEFAULT = java.time.Duration.ofSeconds(60);

@InternalApi
static final java.time.Duration STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT =
java.time.Duration.ofSeconds(60);

@InternalApi
static final java.time.Duration ACK_EXPIRATION_PADDING_DEFAULT = java.time.Duration.ofSeconds(5);

private static final Logger logger = Logger.getLogger(Subscriber.class.getName());

Expand All @@ -124,10 +139,10 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;
private final Duration maxAckExtensionPeriod;
private final Duration maxDurationPerAckExtension;
private final java.time.Duration maxAckExtensionPeriod;
private final java.time.Duration maxDurationPerAckExtension;
private final boolean maxDurationPerAckExtensionDefaultUsed;
private final Duration minDurationPerAckExtension;
private final java.time.Duration minDurationPerAckExtension;
private final boolean minDurationPerAckExtensionDefaultUsed;

// The ExecutorProvider used to generate executors for processing messages.
Expand Down Expand Up @@ -490,10 +505,10 @@ public static final class Builder {
private MessageReceiver receiver;
private MessageReceiverWithAckResponse receiverWithAckResponse;

private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private Duration minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION;
private java.time.Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private java.time.Duration minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION;
private boolean minDurationPerAckExtensionDefaultUsed = true;
private Duration maxDurationPerAckExtension = DEFAULT_MAX_ACK_DEADLINE_EXTENSION;
private java.time.Duration maxDurationPerAckExtension = DEFAULT_MAX_ACK_DEADLINE_EXTENSION;
private boolean maxDurationPerAckExtensionDefaultUsed = true;

private boolean useLegacyFlowControl = false;
Expand All @@ -505,7 +520,7 @@ public static final class Builder {
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_INBOUND_METADATA_SIZE)
.setKeepAliveTime(Duration.ofMinutes(5))
.setKeepAliveTimeDuration(java.time.Duration.ofMinutes(5))
.build();
private HeaderProvider headerProvider = new NoHeaderProvider();
private CredentialsProvider credentialsProvider =
Expand Down Expand Up @@ -596,6 +611,15 @@ public Builder setUseLegacyFlowControl(boolean value) {
return this;
}

/**
* This method is obsolete. Use {@link #setMaxAckExtensionPeriodDuration(java.time.Duration)}
* instead.
*/
@ObsoleteApi("Use setMaxAckExtensionPeriodDuration(java.time.Duration) instead")
public Builder setMaxAckExtensionPeriod(org.threeten.bp.Duration maxAckExtensionPeriod) {
return setMaxAckExtensionPeriodDuration(toJavaTimeDuration(maxAckExtensionPeriod));
}

/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
Expand All @@ -605,12 +629,22 @@ public Builder setUseLegacyFlowControl(boolean value) {
*
* <p>A zero duration effectively disables auto deadline extensions.
*/
public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
public Builder setMaxAckExtensionPeriodDuration(java.time.Duration maxAckExtensionPeriod) {
Preconditions.checkArgument(maxAckExtensionPeriod.toMillis() >= 0);
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
return this;
}

/**
* This method is obsolete. Use {@link
* #setMaxDurationPerAckExtensionDuration(java.time.Duration)} instead.
*/
@ObsoleteApi("Use setMaxDurationPerAckExtensionDuration(java.time.Duration) instead")
public Builder setMaxDurationPerAckExtension(
org.threeten.bp.Duration maxDurationPerAckExtension) {
return setMaxDurationPerAckExtensionDuration(toJavaTimeDuration(maxDurationPerAckExtension));
}

/**
* Set the upper bound for a single mod ack extention period.
*
Expand All @@ -621,7 +655,8 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
*
* <p>MaxDurationPerAckExtension configuration can be disabled by specifying a zero duration.
*/
public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) {
public Builder setMaxDurationPerAckExtensionDuration(
java.time.Duration maxDurationPerAckExtension) {
// If a non-default min is set, make sure min is less than max
Preconditions.checkArgument(
maxDurationPerAckExtension.toMillis() >= 0
Expand All @@ -633,6 +668,16 @@ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension
return this;
}

/**
* This method is obsolete. Use {@link
* #setMinDurationPerAckExtensionDuration(java.time.Duration)} instead.
*/
@ObsoleteApi("Use setMinDurationPerAckExtensionDuration(java.time.Duration) instead")
public Builder setMinDurationPerAckExtension(
org.threeten.bp.Duration minDurationPerAckExtension) {
return setMinDurationPerAckExtensionDuration(toJavaTimeDuration(minDurationPerAckExtension));
}

/**
* Set the lower bound for a single mod ack extention period.
*
Expand All @@ -643,7 +688,8 @@ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension
*
* <p>MinDurationPerAckExtension configuration can be disabled by specifying a zero duration.
*/
public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) {
public Builder setMinDurationPerAckExtensionDuration(
java.time.Duration minDurationPerAckExtension) {
// If a non-default max is set, make sure min is less than max
Preconditions.checkArgument(
minDurationPerAckExtension.toMillis() >= 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.threeten.bp.Duration;

/**
* A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.SettableFuture;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
Expand All @@ -32,8 +34,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

/**
* Fake implementation of {@link ScheduledExecutorService} that allows tests control the reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.threeten.bp.Duration;

public class MessageDispatcherTest {
private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data");
Expand Down
Loading
Loading