Skip to content

Commit

Permalink
feat: introduce java.time variables and methods (#2271)
Browse files Browse the repository at this point in the history
* feat: introduce `java.time` variables and methods

* ignore package private public interface changes
  • Loading branch information
diegomarquezp authored Nov 21, 2024
1 parent a5f70a9 commit 7edfd9c
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 80 deletions.
14 changes: 14 additions & 0 deletions google-cloud-pubsub/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<differenceType>7005</differenceType>
<!--Ignore changes in this class because it's package private-->
<className>com/google/cloud/pubsub/v1/MessageDispatcher$Builder</className>
<method>*(org.threeten.bp.Duration)</method>
<to>*(java.time.Duration)</to>
</difference>
<difference>
<differenceType>7005</differenceType>
<!--Ignore changes in this class because it's package private-->
<className>com/google/cloud/pubsub/v1/StreamingSubscriberConnection$Builder</className>
<method>*(org.threeten.bp.Duration)</method>
<to>*(java.time.Duration)</to>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -48,9 +51,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;
import org.threeten.bp.temporal.ChronoUnit;

/**
* Dispatches messages to a message receiver while handling the messages acking and lease
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -73,7 +74,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;

/**
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/publisher">publisher</a>, that is
Expand Down Expand Up @@ -198,7 +198,7 @@ private Publisher(Builder builder) throws IOException {
// key?
retrySettingsBuilder
.setMaxAttempts(Integer.MAX_VALUE)
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
.setTotalTimeoutDuration(Duration.ofNanos(Long.MAX_VALUE));
}

PublisherStubSettings.Builder stubSettings =
Expand Down Expand Up @@ -740,7 +740,7 @@ public static final class Builder {
private static final double DEFAULT_MULTIPLIER = 4;
static final BatchingSettings DEFAULT_BATCHING_SETTINGS =
BatchingSettings.newBuilder()
.setDelayThreshold(DEFAULT_DELAY_THRESHOLD)
.setDelayThresholdDuration(DEFAULT_DELAY_THRESHOLD)
.setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD)
.setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD)
.setFlowControlSettings(
Expand All @@ -750,13 +750,13 @@ public static final class Builder {
.build();
static final RetrySettings DEFAULT_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setTotalTimeout(DEFAULT_TOTAL_TIMEOUT)
.setInitialRetryDelay(DEFAULT_INITIAL_RETRY_DELAY)
.setTotalTimeoutDuration(DEFAULT_TOTAL_TIMEOUT)
.setInitialRetryDelayDuration(DEFAULT_INITIAL_RETRY_DELAY)
.setRetryDelayMultiplier(DEFAULT_MULTIPLIER)
.setMaxRetryDelay(DEFAULT_MAX_RETRY_DELAY)
.setInitialRpcTimeout(DEFAULT_INITIAL_RPC_TIMEOUT)
.setMaxRetryDelayDuration(DEFAULT_MAX_RETRY_DELAY)
.setInitialRpcTimeoutDuration(DEFAULT_INITIAL_RPC_TIMEOUT)
.setRpcTimeoutMultiplier(DEFAULT_MULTIPLIER)
.setMaxRpcTimeout(DEFAULT_MAX_RPC_TIMEOUT)
.setMaxRpcTimeoutDuration(DEFAULT_MAX_RPC_TIMEOUT)
.build();
static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
private static final int THREADS_PER_CPU = 5;
Expand Down Expand Up @@ -876,9 +876,9 @@ public Builder setBatchingSettings(BatchingSettings batchingSettings) {
/** Configures the Publisher's retry parameters. */
public Builder setRetrySettings(RetrySettings retrySettings) {
Preconditions.checkArgument(
retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0);
retrySettings.getTotalTimeoutDuration().compareTo(MIN_TOTAL_TIMEOUT) >= 0);
Preconditions.checkArgument(
retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0);
retrySettings.getInitialRpcTimeoutDuration().compareTo(MIN_RPC_TIMEOUT) >= 0);
this.retrySettings = retrySettings;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.opentelemetry.api.trace.Span;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -68,7 +69,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/** Implementation of {@link AckProcessor} based on Cloud Pub/Sub streaming pull. */
final class StreamingSubscriberConnection extends AbstractApiService implements AckProcessor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package com.google.cloud.pubsub.v1;

import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeDuration;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiService;
import com.google.api.core.BetaApi;
import com.google.api.core.CurrentMillisClock;
import com.google.api.core.InternalApi;
import com.google.api.core.ObsoleteApi;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
Expand Down Expand Up @@ -55,7 +58,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/subscriber">subscriber</a> that is
Expand Down Expand Up @@ -98,24 +100,37 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private static final int MAX_INBOUND_METADATA_SIZE =
4 * 1024 * 1024; // 4MB API maximum metadata size

@InternalApi static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
@InternalApi
static final java.time.Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD =
java.time.Duration.ofMinutes(60);

@InternalApi
static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY =
Duration.ofMinutes(1);
static final java.time.Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY =
java.time.Duration.ofMinutes(1);

@InternalApi static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION = Duration.ofMinutes(0);
@InternalApi static final Duration DEFAULT_MAX_ACK_DEADLINE_EXTENSION = Duration.ofSeconds(0);
@InternalApi
static final java.time.Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION =
java.time.Duration.ofMinutes(0);

@InternalApi static final Duration MIN_STREAM_ACK_DEADLINE = Duration.ofSeconds(10);
@InternalApi static final Duration MAX_STREAM_ACK_DEADLINE = Duration.ofSeconds(600);
@InternalApi
static final java.time.Duration DEFAULT_MAX_ACK_DEADLINE_EXTENSION =
java.time.Duration.ofSeconds(0);

@InternalApi static final Duration STREAM_ACK_DEADLINE_DEFAULT = Duration.ofSeconds(60);
@InternalApi
static final java.time.Duration MIN_STREAM_ACK_DEADLINE = java.time.Duration.ofSeconds(10);

@InternalApi
static final Duration STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT = Duration.ofSeconds(60);
static final java.time.Duration MAX_STREAM_ACK_DEADLINE = java.time.Duration.ofSeconds(600);

@InternalApi static final Duration ACK_EXPIRATION_PADDING_DEFAULT = Duration.ofSeconds(5);
@InternalApi
static final java.time.Duration STREAM_ACK_DEADLINE_DEFAULT = java.time.Duration.ofSeconds(60);

@InternalApi
static final java.time.Duration STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT =
java.time.Duration.ofSeconds(60);

@InternalApi
static final java.time.Duration ACK_EXPIRATION_PADDING_DEFAULT = java.time.Duration.ofSeconds(5);

private static final Logger logger = Logger.getLogger(Subscriber.class.getName());

Expand All @@ -124,10 +139,10 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;
private final Duration maxAckExtensionPeriod;
private final Duration maxDurationPerAckExtension;
private final java.time.Duration maxAckExtensionPeriod;
private final java.time.Duration maxDurationPerAckExtension;
private final boolean maxDurationPerAckExtensionDefaultUsed;
private final Duration minDurationPerAckExtension;
private final java.time.Duration minDurationPerAckExtension;
private final boolean minDurationPerAckExtensionDefaultUsed;

// The ExecutorProvider used to generate executors for processing messages.
Expand Down Expand Up @@ -490,10 +505,10 @@ public static final class Builder {
private MessageReceiver receiver;
private MessageReceiverWithAckResponse receiverWithAckResponse;

private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private Duration minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION;
private java.time.Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private java.time.Duration minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION;
private boolean minDurationPerAckExtensionDefaultUsed = true;
private Duration maxDurationPerAckExtension = DEFAULT_MAX_ACK_DEADLINE_EXTENSION;
private java.time.Duration maxDurationPerAckExtension = DEFAULT_MAX_ACK_DEADLINE_EXTENSION;
private boolean maxDurationPerAckExtensionDefaultUsed = true;

private boolean useLegacyFlowControl = false;
Expand All @@ -505,7 +520,7 @@ public static final class Builder {
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_INBOUND_METADATA_SIZE)
.setKeepAliveTime(Duration.ofMinutes(5))
.setKeepAliveTimeDuration(java.time.Duration.ofMinutes(5))
.build();
private HeaderProvider headerProvider = new NoHeaderProvider();
private CredentialsProvider credentialsProvider =
Expand Down Expand Up @@ -596,6 +611,15 @@ public Builder setUseLegacyFlowControl(boolean value) {
return this;
}

/**
* This method is obsolete. Use {@link #setMaxAckExtensionPeriodDuration(java.time.Duration)}
* instead.
*/
@ObsoleteApi("Use setMaxAckExtensionPeriodDuration(java.time.Duration) instead")
public Builder setMaxAckExtensionPeriod(org.threeten.bp.Duration maxAckExtensionPeriod) {
return setMaxAckExtensionPeriodDuration(toJavaTimeDuration(maxAckExtensionPeriod));
}

/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
Expand All @@ -605,12 +629,22 @@ public Builder setUseLegacyFlowControl(boolean value) {
*
* <p>A zero duration effectively disables auto deadline extensions.
*/
public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
public Builder setMaxAckExtensionPeriodDuration(java.time.Duration maxAckExtensionPeriod) {
Preconditions.checkArgument(maxAckExtensionPeriod.toMillis() >= 0);
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
return this;
}

/**
* This method is obsolete. Use {@link
* #setMaxDurationPerAckExtensionDuration(java.time.Duration)} instead.
*/
@ObsoleteApi("Use setMaxDurationPerAckExtensionDuration(java.time.Duration) instead")
public Builder setMaxDurationPerAckExtension(
org.threeten.bp.Duration maxDurationPerAckExtension) {
return setMaxDurationPerAckExtensionDuration(toJavaTimeDuration(maxDurationPerAckExtension));
}

/**
* Set the upper bound for a single mod ack extention period.
*
Expand All @@ -621,7 +655,8 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
*
* <p>MaxDurationPerAckExtension configuration can be disabled by specifying a zero duration.
*/
public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) {
public Builder setMaxDurationPerAckExtensionDuration(
java.time.Duration maxDurationPerAckExtension) {
// If a non-default min is set, make sure min is less than max
Preconditions.checkArgument(
maxDurationPerAckExtension.toMillis() >= 0
Expand All @@ -633,6 +668,16 @@ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension
return this;
}

/**
* This method is obsolete. Use {@link
* #setMinDurationPerAckExtensionDuration(java.time.Duration)} instead.
*/
@ObsoleteApi("Use setMinDurationPerAckExtensionDuration(java.time.Duration) instead")
public Builder setMinDurationPerAckExtension(
org.threeten.bp.Duration minDurationPerAckExtension) {
return setMinDurationPerAckExtensionDuration(toJavaTimeDuration(minDurationPerAckExtension));
}

/**
* Set the lower bound for a single mod ack extention period.
*
Expand All @@ -643,7 +688,8 @@ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension
*
* <p>MinDurationPerAckExtension configuration can be disabled by specifying a zero duration.
*/
public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) {
public Builder setMinDurationPerAckExtensionDuration(
java.time.Duration minDurationPerAckExtension) {
// If a non-default max is set, make sure min is less than max
Preconditions.checkArgument(
minDurationPerAckExtension.toMillis() >= 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.threeten.bp.Duration;

/**
* A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.SettableFuture;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
Expand All @@ -32,8 +34,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

/**
* Fake implementation of {@link ScheduledExecutorService} that allows tests control the reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.threeten.bp.Duration;

public class MessageDispatcherTest {
private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data");
Expand Down
Loading

0 comments on commit 7edfd9c

Please sign in to comment.