Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Rollup] Re-factor Rollup Indexer into a generic indexer for re-usability #32743

Merged
merged 24 commits into from
Aug 29, 2018
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
bb39f55
refactor rollup job
Aug 8, 2018
5706389
cleanup, simplify and add documentation
Aug 9, 2018
b6d681a
refactor Jobstats
Aug 9, 2018
106f654
fix some namings and style
Aug 9, 2018
a3f53ee
remove RollupJobStats
Aug 9, 2018
b51853b
adapt tests for fieldname change
Aug 10, 2018
5b2794c
add documentation
Aug 10, 2018
2e858c0
add a simple test
Aug 10, 2018
a5f6d93
rename Iteration to IterationResult
Aug 13, 2018
57c9986
address review comments
Aug 13, 2018
8c339cc
Revert "adapt tests for fieldname change"
Aug 13, 2018
c58759e
fix test for deprecation warning
Aug 14, 2018
d521f9d
Revert "fix test for deprecation warning"
Aug 14, 2018
948e3c3
implement BWC based on request params
Aug 14, 2018
f7385b0
Add a yaml test, clean up REST action a bit, add back getNumRollups()
polyfractal Aug 20, 2018
4e8ab17
Change method name to onStartJob()
polyfractal Aug 21, 2018
b583070
Merge remote-tracking branch 'origin/master' into rollup-indexer-refa…
polyfractal Aug 21, 2018
7366c63
Merge remote-tracking branch 'origin/master' into rollup-indexer-refa…
polyfractal Aug 24, 2018
11d27aa
Make IndexerJobStats abstract, rename IterativeIndexer
polyfractal Aug 27, 2018
3f5f54a
Merge remote-tracking branch 'origin/master' into rollup-indexer-refa…
polyfractal Aug 27, 2018
243dfb1
Merge remote-tracking branch 'origin/master' into rollup-indexer-refa…
polyfractal Aug 27, 2018
0f02c19
address review comments
polyfractal Aug 27, 2018
6011a27
Use mock JobStats instead of Rollup's version
polyfractal Aug 27, 2018
cff5a92
Remove more uses of abstract class
polyfractal Aug 28, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public abstract class AbstractSerializingTestCase<T extends ToXContent & Writeab
* Generic test that creates new instance from the test instance and checks
* both for equality and asserts equality on the two instances.
*/
public final void testFromXContent() throws IOException {
public void testFromXContent() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned about removing the final keyword here. I don't like that someone could override the test method to make the test inconsistent across implementations of AbstractSerialzingTestCase. Also, I think the main execution of our tests should be testing the non-deprecated names so we should not be expecting warnings in these test methods, instead we should make these test methods use the non-deprecated path and test the deprecation in specific test methods on the JobWrapperSerializingTests class instead

Copy link
Author

@hendrikmuhs hendrikmuhs Aug 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my comment, this is caused by the handling of the deprecated name, which I think is not the way to go as we spam the log for no good reason, therefore I will revert this anyhow.

AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, this::createTestInstance, supportsUnknownFields(),
getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, true, getToXContentParams());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.rollup.job;
package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -13,7 +13,6 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

Expand All @@ -24,45 +23,47 @@
* and are only for external monitoring/reference. Statistics are not persisted with the job, so if the
* allocated task is shutdown/restarted on a different node all the stats will reset.
*/
public class RollupJobStats implements ToXContentObject, Writeable {
public class IndexerJobStats implements ToXContentObject, Writeable {

public static final ParseField NAME = new ParseField("job_stats");

private static ParseField NUM_PAGES = new ParseField("pages_processed");
private static ParseField NUM_DOCUMENTS = new ParseField("documents_processed");
private static ParseField NUM_ROLLUPS = new ParseField("rollups_indexed");
private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
// BWC for RollupJobStats
private static String ROLLUP_BWC_NUM_OUTPUT_DOCUMENTS = "rollups_indexed";
private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed").withDeprecation(ROLLUP_BWC_NUM_OUTPUT_DOCUMENTS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: you can pass the deprecated name as a second argument of the constructor here and this will only create one instance instead of creating an instance and then using it to create a second instance with withDeprecation(). Not a big thing at all and either way is fine but wanted to point it out as its how we have done this elsewhere

private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");

private long numPages = 0;
private long numDocuments = 0;
private long numRollups = 0;
private long numInputDocuments = 0;
private long numOuputDocuments = 0;
private long numInvocations = 0;

public static final ConstructingObjectParser<RollupJobStats, Void> PARSER =
public static final ConstructingObjectParser<IndexerJobStats, Void> PARSER =
new ConstructingObjectParser<>(NAME.getPreferredName(),
args -> new RollupJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));
args -> new IndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));

static {
PARSER.declareLong(constructorArg(), NUM_PAGES);
PARSER.declareLong(constructorArg(), NUM_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_ROLLUPS);
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
}

public RollupJobStats() {
public IndexerJobStats() {
}

public RollupJobStats(long numPages, long numDocuments, long numRollups, long numInvocations) {
public IndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) {
this.numPages = numPages;
this.numDocuments = numDocuments;
this.numRollups = numRollups;
this.numInputDocuments = numInputDocuments;
this.numOuputDocuments = numOuputDocuments;
this.numInvocations = numInvocations;
}

public RollupJobStats(StreamInput in) throws IOException {
public IndexerJobStats(StreamInput in) throws IOException {
this.numPages = in.readVLong();
this.numDocuments = in.readVLong();
this.numRollups = in.readVLong();
this.numInputDocuments = in.readVLong();
this.numOuputDocuments = in.readVLong();
this.numInvocations = in.readVLong();
}

Expand All @@ -71,15 +72,15 @@ public long getNumPages() {
}

public long getNumDocuments() {
return numDocuments;
return numInputDocuments;
}

public long getNumInvocations() {
return numInvocations;
}

public long getNumRollups() {
return numRollups;
public long getOutputDocuments() {
return numOuputDocuments;
}

public void incrementNumPages(long n) {
Expand All @@ -89,28 +90,28 @@ public void incrementNumPages(long n) {

public void incrementNumDocuments(long n) {
assert(n >= 0);
numDocuments += n;
numInputDocuments += n;
}

public void incrementNumInvocations(long n) {
assert(n >= 0);
numInvocations += n;
}

public void incrementNumRollups(long n) {
public void incrementNumOutputDocuments(long n) {
assert(n >= 0);
numRollups += n;
numOuputDocuments += n;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(numPages);
out.writeVLong(numDocuments);
out.writeVLong(numRollups);
out.writeVLong(numInputDocuments);
out.writeVLong(numOuputDocuments);
out.writeVLong(numInvocations);
}

public static RollupJobStats fromXContent(XContentParser parser) {
public static IndexerJobStats fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
} catch (IOException e) {
Expand All @@ -121,11 +122,20 @@ public static RollupJobStats fromXContent(XContentParser parser) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toUnwrappedXContent(builder, false);
builder.endObject();
return builder;
}

public XContentBuilder toUnwrappedXContent(XContentBuilder builder, boolean rollupBWC) throws IOException {
builder.field(NUM_PAGES.getPreferredName(), numPages);
builder.field(NUM_DOCUMENTS.getPreferredName(), numDocuments);
builder.field(NUM_ROLLUPS.getPreferredName(), numRollups);
builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments);
if (rollupBWC) {
builder.field(ROLLUP_BWC_NUM_OUTPUT_DOCUMENTS, numOuputDocuments);
} else {
builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments);
}
builder.field(NUM_INVOCATIONS.getPreferredName(), numInvocations);
builder.endObject();
return builder;
}

Expand All @@ -139,18 +149,16 @@ public boolean equals(Object other) {
return false;
}

RollupJobStats that = (RollupJobStats) other;
IndexerJobStats that = (IndexerJobStats) other;

return Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numDocuments, that.numDocuments)
&& Objects.equals(this.numRollups, that.numRollups)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations);
}

@Override
public int hashCode() {
return Objects.hash(numPages, numDocuments, numRollups, numInvocations);
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations);
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.rollup.job;
package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.action.index.IndexRequest;

import java.util.List;

/**
* Result object to hold the result of 1 iteration of iterative indexing.
* Acts as an interface between the implementation and the generic indexer.
*/
public class IterationResult<JobPosition> {

private final boolean isDone;
private final JobPosition position;
private final List<IndexRequest> toIndex;

/**
* Constructor for the result of 1 iteration.
*
* @param toIndex the list of requests to be indexed
* @param position the extracted, persistable position of the job required for the search phase
* @param isDone true if source is exhausted and job should go to sleep
*
* Note: toIndex.empty() != isDone due to possible filtering in the specific implementation
*/
public IterationResult(List<IndexRequest> toIndex, JobPosition position, boolean isDone) {
this.toIndex = toIndex;
this.position = position;
this.isDone = isDone;
}

/**
* Returns true if this indexing iteration is done and job should go into sleep mode.
*/
public boolean isDone() {
return isDone;
}

/**
* Return the position of the job, a generic to be passed to the next query construction.
*
* @return the position
*/
public JobPosition getPosition() {
return position;
}

/**
* List of requests to be passed to bulk indexing.
*
* @return List of index requests.
*/
public List<IndexRequest> getToIndex() {
return toIndex;
}
}
Loading