Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize warning header de-duplication #37725

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1038,16 +1038,20 @@ public <T> void writeCollection(Collection<T> collection, Writer<T> writer) thro
}
}

/**
* Writes a list of strings
*/
public void writeStringList(List<String> list) throws IOException {
public void writeStringCollection(final Collection<String> list) throws IOException {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I factor this change out to #37768.

writeVInt(list.size());
for (String string: list) {
this.writeString(string);
}
}

/**
* Writes a list of strings
*/
public void writeStringList(List<String> list) throws IOException {
writeStringCollection(list);
}

/**
* Writes a list of {@link NamedWriteable} objects.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void deprecated(final Set<ThreadContext> threadContexts, final String message, f
while (iterator.hasNext()) {
try {
final ThreadContext next = iterator.next();
next.addResponseHeader("Warning", warningHeaderValue, DeprecationLogger::extractWarningValueFromWarningHeader);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether we need DeprecationLogger::extractWarningValueFromWarningHeader at all now? It seems to be used in some tests and an assert in DeprecationLogger but I think we can get rid of it there as well?

Then we can also get rid of ThreadContext#addResponseHeader(final String key, final String value, final Function<String, String> uniqueValue) and just have #addResponseHeader(final String key, final String value)?

In order to keep this PR small this could be done in a follow-up though as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was going to remove the dead code in a follow-up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect. Then I'll approve now.

next.addResponseHeader("Warning", warningHeaderValue);
} catch (final IllegalStateException e) {
// ignored; it should be removed shortly
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -258,11 +259,11 @@ public Map<String, String> getHeaders() {
* @return Never {@code null}.
*/
public Map<String, List<String>> getResponseHeaders() {
Map<String, List<String>> responseHeaders = threadLocal.get().responseHeaders;
Map<String, Set<String>> responseHeaders = threadLocal.get().responseHeaders;
HashMap<String, List<String>> map = new HashMap<>(responseHeaders.size());

for (Map.Entry<String, List<String>> entry : responseHeaders.entrySet()) {
map.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
for (Map.Entry<String, Set<String>> entry : responseHeaders.entrySet()) {
map.put(entry.getKey(), Collections.unmodifiableList(new ArrayList<>(entry.getValue())));
}

return Collections.unmodifiableMap(map);
Expand Down Expand Up @@ -405,7 +406,7 @@ default void restore() {
private static final class ThreadContextStruct {
private final Map<String, String> requestHeaders;
private final Map<String, Object> transientHeaders;
private final Map<String, List<String>> responseHeaders;
private final Map<String, Set<String>> responseHeaders;
private final boolean isSystemContext;
private long warningHeadersSize; //saving current warning headers' size not to recalculate the size with every new warning header
private ThreadContextStruct(StreamInput in) throws IOException {
Expand All @@ -416,7 +417,23 @@ private ThreadContextStruct(StreamInput in) throws IOException {
}

this.requestHeaders = requestHeaders;
this.responseHeaders = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
this.responseHeaders = in.readMap(StreamInput::readString, input -> {
final int size = input.readVInt();
if (size == 0) {
return Collections.emptySet();
} else if (size == 1) {
return Collections.singleton(input.readString());
} else {
// use a linked hash set to preserve order
final LinkedHashSet<String> values = new LinkedHashSet<>(size);
for (int i = 0; i < size; i++) {
final String value = input.readString();
final boolean exists = values.add(value);
assert exists == false : value;
}
return values;
}
});
this.transientHeaders = Collections.emptyMap();
isSystemContext = false; // we never serialize this it's a transient flag
this.warningHeadersSize = 0L;
Expand All @@ -430,7 +447,7 @@ private ThreadContextStruct setSystemContext() {
}

private ThreadContextStruct(Map<String, String> requestHeaders,
Map<String, List<String>> responseHeaders,
Map<String, Set<String>> responseHeaders,
Map<String, Object> transientHeaders, boolean isSystemContext) {
this.requestHeaders = requestHeaders;
this.responseHeaders = responseHeaders;
Expand All @@ -440,7 +457,7 @@ private ThreadContextStruct(Map<String, String> requestHeaders,
}

private ThreadContextStruct(Map<String, String> requestHeaders,
Map<String, List<String>> responseHeaders,
Map<String, Set<String>> responseHeaders,
Map<String, Object> transientHeaders, boolean isSystemContext,
long warningHeadersSize) {
this.requestHeaders = requestHeaders;
Expand Down Expand Up @@ -481,19 +498,18 @@ private ThreadContextStruct putHeaders(Map<String, String> headers) {
}
}

private ThreadContextStruct putResponseHeaders(Map<String, List<String>> headers) {
private ThreadContextStruct putResponseHeaders(Map<String, Set<String>> headers) {
assert headers != null;
if (headers.isEmpty()) {
return this;
}
final Map<String, List<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
final Map<String, Set<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
for (Map.Entry<String, Set<String>> entry : headers.entrySet()) {
String key = entry.getKey();
final List<String> existingValues = newResponseHeaders.get(key);
final Set<String> existingValues = newResponseHeaders.get(key);
if (existingValues != null) {
List<String> newValues = Stream.concat(entry.getValue().stream(),
existingValues.stream()).distinct().collect(Collectors.toList());
newResponseHeaders.put(key, Collections.unmodifiableList(newValues));
Set<String> newValues = Stream.concat(entry.getValue().stream(), existingValues.stream()).collect(Collectors.toSet());
newResponseHeaders.put(key, Collections.unmodifiableSet(newValues));
} else {
newResponseHeaders.put(key, entry.getValue());
}
Expand Down Expand Up @@ -523,20 +539,20 @@ private ThreadContextStruct putResponse(final String key, final String value, fi
}
}

final Map<String, List<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
final List<String> existingValues = newResponseHeaders.get(key);
final Map<String, Set<String>> newResponseHeaders;
final Set<String> existingValues = responseHeaders.get(key);
if (existingValues != null) {
final Set<String> existingUniqueValues = existingValues.stream().map(uniqueValue).collect(Collectors.toSet());
assert existingValues.size() == existingUniqueValues.size() :
"existing values: [" + existingValues + "], existing unique values [" + existingUniqueValues + "]";
if (existingUniqueValues.contains(uniqueValue.apply(value))) {
if (existingValues.contains(uniqueValue.apply(value))) {
return this;
}
final List<String> newValues = new ArrayList<>(existingValues);
// preserve insertion order
final LinkedHashSet<String> newValues = new LinkedHashSet<>(existingValues);
newValues.add(value);
newResponseHeaders.put(key, Collections.unmodifiableList(newValues));
newResponseHeaders = new HashMap<>(responseHeaders);
newResponseHeaders.put(key, Collections.unmodifiableSet(newValues));
} else {
newResponseHeaders.put(key, Collections.singletonList(value));
newResponseHeaders = new HashMap<>(responseHeaders);
newResponseHeaders.put(key, Collections.singleton(value));
}

//check if we can add another warning header - if max count within limits
Expand Down Expand Up @@ -588,7 +604,7 @@ private void writeTo(StreamOutput out, Map<String, String> defaultHeaders) throw
out.writeString(entry.getValue());
}

out.writeMapOfLists(responseHeaders, StreamOutput::writeString, StreamOutput::writeString);
out.writeMap(responseHeaders, StreamOutput::writeString, StreamOutput::writeStringCollection);
}
}

Expand Down