Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unload partitions when not publishing for better performance #1286

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;

@AutoValue
public abstract class SinglePartitionPublisherBuilder {
private static final Duration DEFAULT_UNLOAD_PERIOD = Duration.ofMinutes(5);

// Required parameters.
abstract TopicPath topic();

Expand All @@ -37,12 +41,16 @@ public abstract class SinglePartitionPublisherBuilder {

abstract BatchingSettings batchingSettings();

// Optional parameters.
abstract Duration unloadPeriod();

// For testing.
abstract PublisherBuilder.Builder underlyingBuilder();

public static Builder newBuilder() {
return new AutoValue_SinglePartitionPublisherBuilder.Builder()
.setUnderlyingBuilder(PublisherBuilder.builder());
.setUnderlyingBuilder(PublisherBuilder.builder())
.setUnloadPeriod(DEFAULT_UNLOAD_PERIOD);
}

@AutoValue.Builder
Expand All @@ -57,6 +65,9 @@ public abstract static class Builder {

public abstract Builder setBatchingSettings(BatchingSettings batchingSettings);

// Optional parameters.
public abstract Builder setUnloadPeriod(Duration unloadPeriod);

// For testing.
@VisibleForTesting
abstract Builder setUnderlyingBuilder(PublisherBuilder.Builder underlyingBuilder);
Expand All @@ -72,7 +83,11 @@ public Publisher<MessageMetadata> build() throws ApiException {
.setPartition(builder.partition())
.setStreamFactory(builder.streamFactory())
.setBatching(builder.batchingSettings());
return new SinglePartitionPublisher(publisherBuilder.build(), builder.partition());
Partition partition = builder.partition();
Duration unloadPeriod = builder.unloadPeriod();
return new UnloadingPublisher(
() -> new SinglePartitionPublisher(publisherBuilder.build(), partition),
AlarmFactory.create(unloadPeriod));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.Publisher;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;

/** A publisher which tears down connections when inactive. */
class UnloadingPublisher extends AbstractApiService implements Publisher<MessageMetadata> {
private final Supplier<Publisher<MessageMetadata>> supplier;
private final Future<?> alarmFuture;

@GuardedBy("this")
private Optional<Publisher<MessageMetadata>> publisher = Optional.empty();

@GuardedBy("this")
private boolean sawPublish = false;

UnloadingPublisher(Supplier<Publisher<MessageMetadata>> supplier, AlarmFactory unloadAlarm) {
this.supplier = supplier;
this.alarmFuture = unloadAlarm.newAlarm(this::onUnloadAlarm);
}

@Override
protected void doStart() {
notifyStarted();
}

@Override
protected synchronized void doStop() {
alarmFuture.cancel(false);
if (!publisher.isPresent()) {
notifyStopped();
return;
}
publisher
.get()
.addListener(
new Listener() {
@Override
public void terminated(State from) {
notifyStopped();
}
},
SystemExecutors.getFuturesExecutor());
publisher.get().stopAsync();
}

@Override
public synchronized ApiFuture<MessageMetadata> publish(Message message) {
sawPublish = true;
return getPublisher().publish(message);
}

private synchronized Publisher<MessageMetadata> getPublisher() {
if (!publisher.isPresent()) {
publisher = Optional.of(supplier.get());
publisher
.get()
.addListener(
new Listener() {
@Override
public void failed(State from, Throwable failure) {
notifyFailed(failure);
}
},
SystemExecutors.getFuturesExecutor());
publisher.get().startAsync().awaitRunning();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to block the publish() call here, while the stream is connecting?

In Go, I allowed the wire publisher to accept messages while it is starting up. The messages just get queued until the stream is connected.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this issue already exists- so I'm okay with blocking here. We should as a refactor fix this eventually.

}
return publisher.get();
}

@Override
public synchronized void cancelOutstandingPublishes() {
if (publisher.isPresent()) {
publisher.get().cancelOutstandingPublishes();
}
}

@Override
public synchronized void flush() throws IOException {
if (publisher.isPresent()) {
publisher.get().flush();
}
}

private synchronized void onUnloadAlarm() {
if (publisher.isPresent() && !sawPublish) {
publisher.get().stopAsync().awaitTerminated();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the above, waiting here may block publish() due to the lock held.

publisher = Optional.empty();
}
sawPublish = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,69 +17,39 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.RETURNS_SELF;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
import com.google.protobuf.ByteString;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Spy;

@RunWith(JUnit4.class)
public class SinglePartitionPublisherTest {
abstract static class FakeOffsetPublisher extends FakeApiService implements Publisher<Offset> {}

@Spy private FakeOffsetPublisher underlying;
@Mock private PublishStreamFactory streamFactory;

private Publisher<MessageMetadata> pub;

@Before
public void setUp() {
initMocks(this);

TopicPath topic =
TopicPath.newBuilder()
.setName(TopicName.of("abc"))
.setProject(ProjectNumber.of(123))
.setLocation(CloudZone.of(CloudRegion.of("us-central1"), 'a'))
.build();
Partition partition = Partition.of(3);

PublisherBuilder.Builder mockBuilder = mock(PublisherBuilder.Builder.class, RETURNS_SELF);
when(mockBuilder.build()).thenReturn(underlying);

when(mockBuilder.setTopic(topic)).thenReturn(mockBuilder);
this.pub =
SinglePartitionPublisherBuilder.newBuilder()
.setTopic(topic)
.setPartition(partition)
.setUnderlyingBuilder(mockBuilder)
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build())
.setStreamFactory(streamFactory)
.build();
this.pub = new SinglePartitionPublisher(underlying, partition);
this.pub.startAsync().awaitRunning();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.testing.RetryingConnectionHelpers.whenFailed;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.protobuf.ByteString;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;

@RunWith(JUnit4.class)
public class UnloadingPublisherTest {
private static final Message MESSAGE =
Message.builder().setData(ByteString.copyFromUtf8("abc")).build();

@Rule public MockitoRule mockito = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);

@Mock private Supplier<Publisher<MessageMetadata>> supplier;

abstract static class FakePublisher extends FakeApiService
implements Publisher<MessageMetadata> {};

@Spy private FakePublisher underlying1;
@Spy private FakePublisher underlying2;

@Mock private AlarmFactory alarmFactory;
private Runnable onAlarm;

private UnloadingPublisher publisher;

@Before
public void setUp() {
when(alarmFactory.newAlarm(any()))
.thenAnswer(
args -> {
onAlarm = args.getArgument(0);
return SettableApiFuture.create();
});
publisher = new UnloadingPublisher(supplier, alarmFactory);
Objects.requireNonNull(onAlarm);
publisher.startAsync().awaitRunning();
}

@Test
public void loadOnPublish() {
doReturn(underlying1).when(supplier).get();
SettableApiFuture<MessageMetadata> future = SettableApiFuture.create();
doReturn(future).when(underlying1).publish(MESSAGE);

assertThat(publisher.publish(MESSAGE)).isSameInstanceAs(future);
}

@Test
public void underlyingFailureFails() throws Exception {
doReturn(underlying1).when(supplier).get();
doReturn(SettableApiFuture.create()).when(underlying1).publish(MESSAGE);
ApiFuture<?> unused = publisher.publish(MESSAGE);

Future<Void> failed = whenFailed(publisher);
underlying1.fail(new IllegalStateException("bad"));
failed.get();
}

@Test
public void twoAlarmsWithoutPublishShutsDown() throws Exception {
doReturn(underlying1).when(supplier).get();
doReturn(SettableApiFuture.create()).when(underlying1).publish(MESSAGE);
ApiFuture<?> future1 = publisher.publish(MESSAGE);

onAlarm.run();

assertThat(underlying1.isRunning()).isTrue();

ApiFuture<?> future2 = publisher.publish(MESSAGE);
onAlarm.run();
assertThat(underlying1.isRunning()).isTrue();

onAlarm.run();
assertThat(underlying1.isRunning()).isFalse();
}
}