-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Treat ack timeout more like a publish timeout #31303
Changes from 3 commits
82ecd90
4c9b6f8
750420b
087518e
08f15fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.ExceptionsHelper; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; | ||
import org.elasticsearch.cluster.ClusterChangedEvent; | ||
import org.elasticsearch.cluster.ClusterName; | ||
import org.elasticsearch.cluster.ClusterState; | ||
|
@@ -39,6 +40,7 @@ | |
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.util.concurrent.BaseFuture; | ||
import org.elasticsearch.discovery.Discovery; | ||
import org.elasticsearch.test.ESTestCase; | ||
import org.elasticsearch.test.MockLogAppender; | ||
import org.elasticsearch.test.junit.annotations.TestLogging; | ||
|
@@ -65,6 +67,7 @@ | |
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.BiConsumer; | ||
|
||
import static java.util.Collections.emptyMap; | ||
import static java.util.Collections.emptySet; | ||
|
@@ -680,6 +683,132 @@ public void onFailure(String source, Exception e) { | |
mockAppender.assertAllExpectationsMatched(); | ||
} | ||
|
||
public void testAcking() throws InterruptedException { | ||
final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); | ||
final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); | ||
final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); | ||
TimedMasterService timedMasterService = new TimedMasterService(Settings.builder().put("cluster.name", | ||
MasterServiceTests.class.getSimpleName()).build(), threadPool); | ||
ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) | ||
.nodes(DiscoveryNodes.builder() | ||
.add(node1) | ||
.add(node2) | ||
.add(node3) | ||
.localNodeId(node1.getId()) | ||
.masterNodeId(node1.getId())) | ||
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build(); | ||
final AtomicReference<BiConsumer<ClusterChangedEvent, Discovery.AckListener>> publisherRef = new AtomicReference<>(); | ||
timedMasterService.setClusterStatePublisher((cce, l) -> publisherRef.get().accept(cce, l)); | ||
timedMasterService.setClusterStateSupplier(() -> initialClusterState); | ||
timedMasterService.start(); | ||
|
||
|
||
// check that we don't time out before even committing the cluster state | ||
{ | ||
final CountDownLatch latch = new CountDownLatch(1); | ||
|
||
publisherRef.set((clusterChangedEvent, ackListener) -> { | ||
throw new Discovery.FailedToCommitClusterStateException("mock exception"); | ||
}); | ||
|
||
timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) { | ||
@Override | ||
public ClusterState execute(ClusterState currentState) { | ||
return ClusterState.builder(currentState).build(); | ||
} | ||
|
||
@Override | ||
public TimeValue ackTimeout() { | ||
return TimeValue.ZERO; | ||
} | ||
|
||
@Override | ||
public TimeValue timeout() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wonder if we want to rename this one to avoid confusion. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure about this one. Currently this is defined in |
||
return null; | ||
} | ||
|
||
@Override | ||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { | ||
fail(); | ||
} | ||
|
||
@Override | ||
protected Void newResponse(boolean acknowledged) { | ||
fail(); | ||
return null; | ||
} | ||
|
||
@Override | ||
public void onFailure(String source, Exception e) { | ||
latch.countDown(); | ||
} | ||
|
||
@Override | ||
public void onAckTimeout() { | ||
fail(); | ||
} | ||
}); | ||
|
||
latch.await(); | ||
} | ||
|
||
// check that we timeout if commit took too long | ||
{ | ||
final CountDownLatch latch = new CountDownLatch(1); | ||
|
||
final TimeValue ackTimeout = TimeValue.timeValueMillis(randomInt(100)); | ||
|
||
publisherRef.set((clusterChangedEvent, ackListener) -> { | ||
ackListener.onCommit(TimeValue.timeValueMillis(ackTimeout.millis() + randomInt(100))); | ||
ackListener.onNodeAck(node1, null); | ||
ackListener.onNodeAck(node2, null); | ||
ackListener.onNodeAck(node3, null); | ||
}); | ||
|
||
timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) { | ||
@Override | ||
public ClusterState execute(ClusterState currentState) { | ||
return ClusterState.builder(currentState).build(); | ||
} | ||
|
||
@Override | ||
public TimeValue ackTimeout() { | ||
return ackTimeout; | ||
} | ||
|
||
@Override | ||
public TimeValue timeout() { | ||
return null; | ||
} | ||
|
||
@Override | ||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { | ||
// ok | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we want to verify we ended up here, i.e., that the cluster state was committed? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. fixed in 08f15fb |
||
} | ||
|
||
@Override | ||
protected Void newResponse(boolean acknowledged) { | ||
fail(); | ||
return null; | ||
} | ||
|
||
@Override | ||
public void onFailure(String source, Exception e) { | ||
fail(); | ||
} | ||
|
||
@Override | ||
public void onAckTimeout() { | ||
latch.countDown(); | ||
} | ||
}); | ||
|
||
latch.await(); | ||
} | ||
|
||
timedMasterService.close(); | ||
} | ||
|
||
static class TimedMasterService extends MasterService { | ||
|
||
public volatile Long currentTimeOverride = null; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadocs please on what the commitTime means.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 087518e