Skip to content

Commit

Permalink
Use serializable exception in GCP listeners (#33657)
Browse files Browse the repository at this point in the history
We used TimeoutException here but that's not serializable. This commit
switches to a serializable exception so that we can test for the
exception type on the remote side.
  • Loading branch information
jasontedor committed Sep 13, 2018
1 parent 9864c86 commit 9a03d8b
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;

Expand All @@ -33,7 +34,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand All @@ -53,7 +53,8 @@ public interface GlobalCheckpointListener {
* Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null and an
* instance of {@link IndexShardClosedException }. If the listener timed out waiting for notification then the exception will be
* non-null and an instance of {@link TimeoutException}. If the global checkpoint is updated, the exception will be null.
* non-null and an instance of {@link ElasticsearchTimeoutException}. If the global checkpoint is updated, the exception will be
* null.
*
* @param globalCheckpoint the updated global checkpoint
* @param e if non-null, the shard is closed or the listener timed out
Expand Down Expand Up @@ -96,8 +97,8 @@ public interface GlobalCheckpointListener {
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
* is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be
* notified after if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for
* the timeout means no timeout will be associated to the listener.
* notified after if the timeout elapses. In this case, the listener will be notified with a {@link ElasticsearchTimeoutException}.
* Passing null for the timeout means no timeout will be associated to the listener.
*
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
Expand Down Expand Up @@ -140,7 +141,7 @@ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpoint
removed = listeners != null && listeners.remove(listener) != null;
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
final ElasticsearchTimeoutException e = new ElasticsearchTimeoutException(timeout.getStringRep());
logger.trace("global checkpoint listener timed out", e);
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e));
}
Expand Down Expand Up @@ -225,7 +226,7 @@ private void notifyListener(final GlobalCheckpointListener listener, final long
} else if (e instanceof IndexShardClosedException) {
logger.warn("error notifying global checkpoint listener of closed shard", caught);
} else {
assert e instanceof TimeoutException : e;
assert e instanceof ElasticsearchTimeoutException : e;
logger.warn("error notifying global checkpoint listener of timeout", caught);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand All @@ -42,7 +43,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -512,10 +512,11 @@ public void testTimeout() throws InterruptedException {
try {
notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertThat(e, instanceOf(TimeoutException.class));
assertThat(e, instanceOf(ElasticsearchTimeoutException.class));
assertThat(e, hasToString(containsString(timeout.getStringRep())));
final ArgumentCaptor<String> message = ArgumentCaptor.forClass(String.class);
final ArgumentCaptor<TimeoutException> t = ArgumentCaptor.forClass(TimeoutException.class);
final ArgumentCaptor<ElasticsearchTimeoutException> t =
ArgumentCaptor.forClass(ElasticsearchTimeoutException.class);
verify(mockLogger).trace(message.capture(), t.capture());
assertThat(message.getValue(), equalTo("global checkpoint listener timed out"));
assertThat(t.getValue(), hasToString(containsString(timeout.getStringRep())));
Expand Down Expand Up @@ -547,7 +548,7 @@ public void testTimeoutNotificationUsesExecutor() throws InterruptedException {
try {
notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertThat(e, instanceOf(TimeoutException.class));
assertThat(e, instanceOf(ElasticsearchTimeoutException.class));
} finally {
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.index.shard;

import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
Expand Down Expand Up @@ -87,7 +88,6 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -702,7 +702,7 @@ public void testGlobalCheckpointListenerTimeout() throws InterruptedException {
notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertNotNull(e);
assertThat(e, instanceOf(TimeoutException.class));
assertThat(e, instanceOf(ElasticsearchTimeoutException.class));
assertThat(e.getMessage(), equalTo(timeout.getStringRep()));
} finally {
latch.countDown();
Expand Down

0 comments on commit 9a03d8b

Please sign in to comment.