Skip to content

Commit

Permalink
Liberalize StreamOutput#writeStringList (elastic#37768)
Browse files Browse the repository at this point in the history
In some cases we only have a string collection instead of a string list
that we want to serialize out. We have a convenience method for writing
a list of strings, but no such method for writing a collection of
strings. Yet, a list of strings is a collection of strings, so we can
simply liberalize StreamOutput#writeStringList to be more generous in
the collections that it accepts and write out collections of strings
too. On the other side, we do not have a convenience method for reading
a list of strings. This commit addresses both of these issues.
  • Loading branch information
jasontedor authored Jan 23, 2019
1 parent 1c2ae91 commit 169cb38
Show file tree
Hide file tree
Showing 29 changed files with 114 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ public void readFrom(StreamInput in) throws IOException {
name = in.readString();

if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
indexPatterns = in.readList(StreamInput::readString);
indexPatterns = in.readStringList();
} else {
indexPatterns = Collections.singletonList(in.readString());
}
Expand Down Expand Up @@ -495,7 +495,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(cause);
out.writeString(name);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeStringList(indexPatterns);
out.writeStringCollection(indexPatterns);
} else {
out.writeString(indexPatterns.size() > 0 ? indexPatterns.get(0) : "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public static IndexTemplateMetaData readFrom(StreamInput in) throws IOException
Builder builder = new Builder(in.readString());
builder.order(in.readInt());
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
builder.patterns(in.readList(StreamInput::readString));
builder.patterns(in.readStringList());
} else {
builder.patterns(Collections.singletonList(in.readString()));
}
Expand Down Expand Up @@ -224,7 +224,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeInt(order);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeStringList(patterns);
out.writeStringCollection(patterns);
} else {
out.writeString(patterns.get(0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,12 +946,26 @@ public <T extends Streamable> List<T> readStreamableList(Supplier<T> constructor
}

/**
* Reads a list of objects
* Reads a list of objects. The list is expected to have been written using {@link StreamOutput#writeList(List)} or
* {@link StreamOutput#writeStreamableList(List)}.
*
* @return the list of objects
* @throws IOException if an I/O exception occurs reading the list
*/
public <T> List<T> readList(Writeable.Reader<T> reader) throws IOException {
public <T> List<T> readList(final Writeable.Reader<T> reader) throws IOException {
return readCollection(reader, ArrayList::new);
}

/**
* Reads a list of strings. The list is expected to have been written using {@link StreamOutput#writeStringCollection(Collection)}.
*
* @return the list of strings
* @throws IOException if an I/O exception occurs reading the list
*/
public List<String> readStringList() throws IOException {
return readList(StreamInput::readString);
}

/**
* Reads a set of objects
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,23 +1048,27 @@ public void writeList(List<? extends Writeable> list) throws IOException {
}

/**
* Writes a collection of generic objects via a {@link Writer}
* Writes a collection of objects via a {@link Writer}.
*
* @param collection the collection of objects
* @throws IOException if an I/O exception occurs writing the collection
*/
public <T> void writeCollection(Collection<T> collection, Writer<T> writer) throws IOException {
public <T> void writeCollection(final Collection<T> collection, final Writer<T> writer) throws IOException {
writeVInt(collection.size());
for (T val: collection) {
for (final T val: collection) {
writer.write(this, val);
}
}

/**
* Writes a list of strings
* Writes a collection of a strings. The corresponding collection can be read from a stream input using
* {@link StreamInput#readList(Writeable.Reader)}.
*
* @param collection the collection of strings
* @throws IOException if an I/O exception occurs writing the collection
*/
public void writeStringList(List<String> list) throws IOException {
writeVInt(list.size());
for (String string: list) {
this.writeString(string);
}
public void writeStringCollection(final Collection<String> collection) throws IOException {
writeCollection(collection, StreamOutput::writeString);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public final class RecoveryResponse extends TransportResponse {

RecoveryResponse(StreamInput in) throws IOException {
super(in);
phase1FileNames = in.readList(StreamInput::readString);
phase1FileNames = in.readStringList();
phase1FileSizes = in.readList(StreamInput::readVLong);
phase1ExistingFileNames = in.readList(StreamInput::readString);
phase1ExistingFileNames = in.readStringList();
phase1ExistingFileSizes = in.readList(StreamInput::readVLong);
phase1TotalSize = in.readVLong();
phase1ExistingTotalSize = in.readVLong();
Expand All @@ -76,9 +76,9 @@ public final class RecoveryResponse extends TransportResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringList(phase1FileNames);
out.writeStringCollection(phase1FileNames);
out.writeCollection(phase1FileSizes, StreamOutput::writeVLong);
out.writeStringList(phase1ExistingFileNames);
out.writeStringCollection(phase1ExistingFileNames);
out.writeCollection(phase1ExistingFileSizes, StreamOutput::writeVLong);
out.writeVLong(phase1TotalSize);
out.writeVLong(phase1ExistingTotalSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public PluginInfo(final StreamInput in) throws IOException {
}
this.classname = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
extendedPlugins = in.readList(StreamInput::readString);
extendedPlugins = in.readStringList();
} else {
extendedPlugins = Collections.emptyList();
}
Expand All @@ -128,7 +128,7 @@ public void writeTo(final StreamOutput out) throws IOException {
}
out.writeString(classname);
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeStringList(extendedPlugins);
out.writeStringCollection(extendedPlugins);
}
out.writeBoolean(hasNativeController);
if (out.getVersion().onOrAfter(Version.V_6_0_0_beta2) && out.getVersion().before(Version.V_6_3_0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class InternalComposite
public InternalComposite(StreamInput in) throws IOException {
super(in);
this.size = in.readVInt();
this.sourceNames = in.readList(StreamInput::readString);
this.sourceNames = in.readStringList();
this.formats = new ArrayList<>(sourceNames.size());
for (int i = 0; i < sourceNames.size(); i++) {
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
Expand All @@ -90,7 +90,7 @@ public InternalComposite(StreamInput in) throws IOException {
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVInt(size);
out.writeStringList(sourceNames);
out.writeStringCollection(sourceNames);
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
for (DocValueFormat format : formats) {
out.writeNamedWriteable(format);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
}
}
if (in.readBoolean()) {
stats = in.readList(StreamInput::readString);
stats = in.readStringList();
}
suggestBuilder = in.readOptionalWriteable(SuggestBuilder::new);
terminateAfter = in.readVInt();
Expand Down Expand Up @@ -311,7 +311,7 @@ public void writeTo(StreamOutput out) throws IOException {
boolean hasStats = stats != null;
out.writeBoolean(hasStats);
if (hasStats) {
out.writeStringList(stats);
out.writeStringCollection(stats);
}
out.writeOptionalWriteable(suggestBuilder);
out.writeVInt(terminateAfter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.common.io.stream;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
Expand All @@ -39,6 +41,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -293,15 +296,27 @@ public int hashCode() {

}

final int length = randomIntBetween(0, 16);
final Collection<FooBar> fooBars = new ArrayList<>(length);
runWriteReadCollectionTest(
() -> new FooBar(randomInt(), randomInt()), StreamOutput::writeCollection, in -> in.readList(FooBar::new));
}

public void testStringCollection() throws IOException {
runWriteReadCollectionTest(() -> randomUnicodeOfLength(16), StreamOutput::writeStringCollection, StreamInput::readStringList);
}

private <T> void runWriteReadCollectionTest(
final Supplier<T> supplier,
final CheckedBiConsumer<StreamOutput, Collection<T>, IOException> writer,
final CheckedFunction<StreamInput, Collection<T>, IOException> reader) throws IOException {
final int length = randomIntBetween(0, 10);
final Collection<T> collection = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
fooBars.add(new FooBar(randomInt(), randomInt()));
collection.add(supplier.get());
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeCollection(fooBars);
writer.accept(out, collection);
try (StreamInput in = out.bytes().streamInput()) {
assertThat(fooBars, equalTo(in.readList(FooBar::new)));
assertThat(collection, equalTo(reader.apply(in)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public AutoFollowPattern(String remoteCluster,

public AutoFollowPattern(StreamInput in) throws IOException {
remoteCluster = in.readString();
leaderIndexPatterns = in.readList(StreamInput::readString);
leaderIndexPatterns = in.readStringList();
followIndexPattern = in.readOptionalString();
maxReadRequestOperationCount = in.readOptionalVInt();
maxReadRequestSize = in.readOptionalWriteable(ByteSizeValue::new);
Expand Down Expand Up @@ -350,7 +350,7 @@ public TimeValue getPollTimeout() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(remoteCluster);
out.writeStringList(leaderIndexPatterns);
out.writeStringCollection(leaderIndexPatterns);
out.writeOptionalString(followIndexPattern);
out.writeOptionalVInt(maxReadRequestOperationCount);
out.writeOptionalWriteable(maxReadRequestSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public Request(StreamInput in) throws IOException {
super(in);
name = in.readString();
remoteCluster = in.readString();
leaderIndexPatterns = in.readList(StreamInput::readString);
leaderIndexPatterns = in.readStringList();
followIndexNamePattern = in.readOptionalString();
maxReadRequestOperationCount = in.readOptionalVInt();
maxReadRequestSize = in.readOptionalWriteable(ByteSizeValue::new);
Expand All @@ -294,7 +294,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
out.writeString(remoteCluster);
out.writeStringList(leaderIndexPatterns);
out.writeStringCollection(leaderIndexPatterns);
out.writeOptionalString(followIndexNamePattern);
out.writeOptionalVInt(maxReadRequestOperationCount);
out.writeOptionalWriteable(maxReadRequestSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
failedIndexes = in.readList(StreamInput::readString);
failedIndexes = in.readStringList();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringList(failedIndexes);
out.writeStringCollection(failedIndexes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public void readFrom(StreamInput in) throws IOException {
timeout = in.readOptionalTimeValue();
charset = in.readOptionalString();
format = in.readBoolean() ? in.readEnum(FileStructure.Format.class) : null;
columnNames = in.readBoolean() ? in.readList(StreamInput::readString) : null;
columnNames = in.readBoolean() ? in.readStringList() : null;
hasHeaderRow = in.readOptionalBoolean();
delimiter = in.readBoolean() ? (char) in.readVInt() : null;
quote = in.readBoolean() ? (char) in.readVInt() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
jobId = in.readString();
expandedJobsIds = in.readList(StreamInput::readString);
expandedJobsIds = in.readStringList();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoJobs = in.readBoolean();
}
Expand All @@ -94,7 +94,7 @@ public Request(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeStringList(expandedJobsIds);
out.writeStringCollection(expandedJobsIds);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(allowNoJobs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public DatafeedParams(StreamInput in) throws IOException {
timeout = TimeValue.timeValueMillis(in.readVLong());
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
jobId = in.readOptionalString();
datafeedIndices = in.readList(StreamInput::readString);
datafeedIndices = in.readStringList();
}
}

Expand Down Expand Up @@ -274,7 +274,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(timeout.millis());
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeOptionalString(jobId);
out.writeStringList(datafeedIndices);
out.writeStringCollection(datafeedIndices);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,14 @@ public DatafeedConfig(StreamInput in) throws IOException {
this.queryDelay = in.readOptionalTimeValue();
this.frequency = in.readOptionalTimeValue();
if (in.readBoolean()) {
this.indices = Collections.unmodifiableList(in.readList(StreamInput::readString));
this.indices = Collections.unmodifiableList(in.readStringList());
} else {
this.indices = null;
}
// This consumes the list of types if there was one.
if (in.getVersion().before(Version.V_7_0_0)) {
if (in.readBoolean()) {
in.readList(StreamInput::readString);
in.readStringList();
}
}
if (in.getVersion().before(Version.V_6_6_0)) {
Expand Down Expand Up @@ -408,15 +408,15 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalTimeValue(frequency);
if (indices != null) {
out.writeBoolean(true);
out.writeStringList(indices);
out.writeStringCollection(indices);
} else {
out.writeBoolean(false);
}
// Write the now removed types to prior versions.
// An empty list is expected
if (out.getVersion().before(Version.V_7_0_0)) {
out.writeBoolean(true);
out.writeStringList(Collections.emptyList());
out.writeStringCollection(Collections.emptyList());
}
if (out.getVersion().before(Version.V_6_6_0)) {
out.writeNamedWriteable(getParsedQuery());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ public DatafeedUpdate(StreamInput in) throws IOException {
this.queryDelay = in.readOptionalTimeValue();
this.frequency = in.readOptionalTimeValue();
if (in.readBoolean()) {
this.indices = in.readList(StreamInput::readString);
this.indices = in.readStringList();
} else {
this.indices = null;
}
// This consumes the list of types if there was one.
if (in.getVersion().before(Version.V_7_0_0)) {
if (in.readBoolean()) {
in.readList(StreamInput::readString);
in.readStringList();
}
}
this.query = in.readOptionalNamedWriteable(QueryBuilder.class);
Expand Down Expand Up @@ -148,15 +148,15 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalTimeValue(frequency);
if (indices != null) {
out.writeBoolean(true);
out.writeStringList(indices);
out.writeStringCollection(indices);
} else {
out.writeBoolean(false);
}
// Write the now removed types to prior versions.
// An empty list is expected
if (out.getVersion().before(Version.V_7_0_0)) {
out.writeBoolean(true);
out.writeStringList(Collections.emptyList());
out.writeStringCollection(Collections.emptyList());
}
out.writeOptionalNamedWriteable(query);
out.writeOptionalWriteable(aggregations);
Expand Down
Loading

0 comments on commit 169cb38

Please sign in to comment.