Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve response headers on cluster update task #31421

Merged
merged 1 commit into from
Jun 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ public <T> void submitStateUpdateTasks(final String source,
return;
}
final ThreadContext threadContext = threadPool.getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.junit.BeforeClass;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -177,6 +178,8 @@ public void testThreadContext() throws InterruptedException {

try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) {
final Map<String, String> expectedHeaders = Collections.singletonMap("test", "test");
final Map<String, List<String>> expectedResponseHeaders = Collections.singletonMap("testResponse",
Arrays.asList("testResponse"));
threadPool.getThreadContext().putHeader(expectedHeaders);

final TimeValue ackTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000));
Expand All @@ -187,6 +190,8 @@ public void testThreadContext() throws InterruptedException {
public ClusterState execute(ClusterState currentState) {
assertTrue(threadPool.getThreadContext().isSystemContext());
assertEquals(Collections.emptyMap(), threadPool.getThreadContext().getHeaders());
threadPool.getThreadContext().addResponseHeader("testResponse", "testResponse");
assertEquals(expectedResponseHeaders, threadPool.getThreadContext().getResponseHeaders());

if (randomBoolean()) {
return ClusterState.builder(currentState).build();
Expand All @@ -201,13 +206,15 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Exception e) {
assertFalse(threadPool.getThreadContext().isSystemContext());
assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders());
assertEquals(expectedResponseHeaders, threadPool.getThreadContext().getResponseHeaders());
latch.countDown();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
assertFalse(threadPool.getThreadContext().isSystemContext());
assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders());
assertEquals(expectedResponseHeaders, threadPool.getThreadContext().getResponseHeaders());
latch.countDown();
}

Expand All @@ -229,20 +236,23 @@ public TimeValue timeout() {
public void onAllNodesAcked(@Nullable Exception e) {
assertFalse(threadPool.getThreadContext().isSystemContext());
assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders());
assertEquals(expectedResponseHeaders, threadPool.getThreadContext().getResponseHeaders());
latch.countDown();
}

@Override
public void onAckTimeout() {
assertFalse(threadPool.getThreadContext().isSystemContext());
assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders());
assertEquals(expectedResponseHeaders, threadPool.getThreadContext().getResponseHeaders());
latch.countDown();
}

});

assertFalse(threadPool.getThreadContext().isSystemContext());
assertEquals(expectedHeaders, threadPool.getThreadContext().getHeaders());
assertEquals(Collections.emptyMap(), threadPool.getThreadContext().getResponseHeaders());
}

latch.await();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a short timeout here, I was just running the test without the fix and after the test itself throws an AssertionError it doesn't finish but simply hangs otherwise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using this pattern all over the place (at least in the distributed systems tests). There are people in the team that prefer it this way rather than having a random timeout on the latch. The important thing is that the assertion trips and that the test fails.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason we don't use timeouts is that if they happen you don't get any info other than the time out. This way, the suite times out and you get a thread dump which helps (sometimes) to see deadlocks and where things are stuck.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are people in the team that prefer it this way rather than having a random timeout on the latch

Then I guess I'm fine with it, it just made failing tests hang locally for me for quiet a while (I guess there are some hard timeouts after all).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the clarification, Boaz.

Expand Down