temporary merge PR 32743 (#32776)
This is a temporary (squashed) commit of #32743 till ed4feb6,
to be reverted and replaced by the final version of this PR
Hendrik Muhs authored Aug 10, 2018
1 parent fd708ed commit 1a00847
Showing 24 changed files with 729 additions and 425 deletions.
6 changes: 3 additions & 3 deletions x-pack/docs/en/rest-api/rollup/get-job.asciidoc
@@ -99,7 +99,7 @@ Which will yield the following response:
"stats" : {
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"documents_indexed" : 0,
"trigger_count" : 0
}
}
@@ -219,7 +219,7 @@ Which will yield the following response:
"stats" : {
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"documents_indexed" : 0,
"trigger_count" : 0
}
},
@@ -268,7 +268,7 @@ Which will yield the following response:
"stats" : {
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"documents_indexed" : 0,
"trigger_count" : 0
}
}
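The stats key shown above is renamed from rollups_indexed to documents_indexed. The IndexerJobStats class below keeps the old key as a deprecated alias via ParseField.withDeprecation, so stats serialized under the old name still parse. A minimal sketch of that aliasing, using only API calls that appear in this commit (the class name RenamedStatsField is illustrative, not part of the commit):

import org.elasticsearch.common.ParseField;

public class RenamedStatsField {
    public static void main(String[] args) {
        // New preferred key with the pre-rename key kept as a deprecated alias,
        // mirroring NUM_OUTPUT_DOCUMENTS in IndexerJobStats below.
        ParseField outputDocs = new ParseField("documents_indexed").withDeprecation("rollups_indexed");

        // Responses are always written with the preferred (new) name.
        System.out.println(outputDocs.getPreferredName()); // prints: documents_indexed
    }
}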
IndexerJobStats.java (formerly RollupJobStats.java, moved to the org.elasticsearch.xpack.core.indexing package)
@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
-package org.elasticsearch.xpack.core.rollup.job;
+package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -13,7 +13,6 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

@@ -24,45 +23,46 @@
* and are only for external monitoring/reference. Statistics are not persisted with the job, so if the
* allocated task is shutdown/restarted on a different node all the stats will reset.
*/
-public class RollupJobStats implements ToXContentObject, Writeable {
+public class IndexerJobStats implements ToXContentObject, Writeable {

public static final ParseField NAME = new ParseField("job_stats");

private static ParseField NUM_PAGES = new ParseField("pages_processed");
-private static ParseField NUM_DOCUMENTS = new ParseField("documents_processed");
-private static ParseField NUM_ROLLUPS = new ParseField("rollups_indexed");
+private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
+// BWC for RollupJobStats
+private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed").withDeprecation("rollups_indexed");
private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");

private long numPages = 0;
-private long numDocuments = 0;
-private long numRollups = 0;
+private long numInputDocuments = 0;
+private long numOuputDocuments = 0;
private long numInvocations = 0;

-public static final ConstructingObjectParser<RollupJobStats, Void> PARSER =
+public static final ConstructingObjectParser<IndexerJobStats, Void> PARSER =
new ConstructingObjectParser<>(NAME.getPreferredName(),
-args -> new RollupJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));
+args -> new IndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));

static {
PARSER.declareLong(constructorArg(), NUM_PAGES);
-PARSER.declareLong(constructorArg(), NUM_DOCUMENTS);
-PARSER.declareLong(constructorArg(), NUM_ROLLUPS);
+PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
+PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
}

-public RollupJobStats() {
+public IndexerJobStats() {
}

-public RollupJobStats(long numPages, long numDocuments, long numRollups, long numInvocations) {
+public IndexerJobStats(long numPages, long numDocuments, long numOuputDocuments, long numInvocations) {
this.numPages = numPages;
-this.numDocuments = numDocuments;
-this.numRollups = numRollups;
+this.numInputDocuments = numDocuments;
+this.numOuputDocuments = numOuputDocuments;
this.numInvocations = numInvocations;
}

-public RollupJobStats(StreamInput in) throws IOException {
+public IndexerJobStats(StreamInput in) throws IOException {
this.numPages = in.readVLong();
-this.numDocuments = in.readVLong();
-this.numRollups = in.readVLong();
+this.numInputDocuments = in.readVLong();
+this.numOuputDocuments = in.readVLong();
this.numInvocations = in.readVLong();
}

@@ -71,15 +71,15 @@ public long getNumPages() {
}

public long getNumDocuments() {
-return numDocuments;
+return numInputDocuments;
}

public long getNumInvocations() {
return numInvocations;
}

-public long getNumRollups() {
-return numRollups;
+public long getOutputDocuments() {
+return numOuputDocuments;
}

public void incrementNumPages(long n) {
@@ -89,28 +89,28 @@ public void incrementNumPages(long n) {

public void incrementNumDocuments(long n) {
assert(n >= 0);
-numDocuments += n;
+numInputDocuments += n;
}

public void incrementNumInvocations(long n) {
assert(n >= 0);
numInvocations += n;
}

-public void incrementNumRollups(long n) {
+public void incrementNumOutputDocuments(long n) {
assert(n >= 0);
-numRollups += n;
+numOuputDocuments += n;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(numPages);
-out.writeVLong(numDocuments);
-out.writeVLong(numRollups);
+out.writeVLong(numInputDocuments);
+out.writeVLong(numOuputDocuments);
out.writeVLong(numInvocations);
}

-public static RollupJobStats fromXContent(XContentParser parser) {
+public static IndexerJobStats fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
} catch (IOException e) {
@@ -122,8 +122,8 @@ public static RollupJobStats fromXContent(XContentParser parser) {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NUM_PAGES.getPreferredName(), numPages);
-builder.field(NUM_DOCUMENTS.getPreferredName(), numDocuments);
-builder.field(NUM_ROLLUPS.getPreferredName(), numRollups);
+builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments);
+builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments);
builder.field(NUM_INVOCATIONS.getPreferredName(), numInvocations);
builder.endObject();
return builder;
@@ -139,18 +139,16 @@ public boolean equals(Object other) {
return false;
}

-RollupJobStats that = (RollupJobStats) other;
+IndexerJobStats that = (IndexerJobStats) other;

return Objects.equals(this.numPages, that.numPages)
-&& Objects.equals(this.numDocuments, that.numDocuments)
-&& Objects.equals(this.numRollups, that.numRollups)
+&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
+&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations);
}

@Override
public int hashCode() {
-return Objects.hash(numPages, numDocuments, numRollups, numInvocations);
+return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations);
}

}
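A minimal usage sketch (not part of this commit) showing how an indexer implementation drives the renamed counters; the class name IndexerJobStatsExample and the literal counts are illustrative assumptions:

import org.elasticsearch.xpack.core.indexing.IndexerJobStats;

public class IndexerJobStatsExample {
    public static void main(String[] args) {
        IndexerJobStats stats = new IndexerJobStats();

        // One trigger fires and one search page is processed ...
        stats.incrementNumInvocations(1);
        stats.incrementNumPages(1);

        // ... reading 500 input documents and writing 50 output documents.
        stats.incrementNumDocuments(500);
        stats.incrementNumOutputDocuments(50);

        System.out.println(stats.getNumDocuments());    // prints: 500
        System.out.println(stats.getOutputDocuments()); // prints: 50
    }
}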

Another class moved from the org.elasticsearch.xpack.core.rollup.job package to org.elasticsearch.xpack.core.indexing
@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
-package org.elasticsearch.xpack.core.rollup.job;
+package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
Iteration.java (new file)
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.action.index.IndexRequest;

import java.util.List;

/**
* Result object to hold the result of 1 iteration of iterative indexing.
* Acts as an interface between the implementation and the generic indexer.
*/
public class Iteration<JobPosition> {

private final boolean isDone;
private final JobPosition position;
private final List<IndexRequest> toIndex;

/**
* Constructor for the result of 1 iteration.
*
* @param toIndex the list of requests to be indexed
* @param position the extracted, persistable position of the job required for the search phase
* @param isDone true if source is exhausted and job should go to sleep
*
* Note: toIndex.empty() != isDone due to possible filtering in the specific implementation
*/
public Iteration(List<IndexRequest> toIndex, JobPosition position, boolean isDone) {
this.toIndex = toIndex;
this.position = position;
this.isDone = isDone;
}

/**
* Returns true if this indexing iteration is done and job should go into sleep mode.
*/
public boolean isDone() {
return isDone;
}

/**
* Return the position of the job, a generic to be passed to the next query construction.
*
* @return the position
*/
public JobPosition getPosition() {
return position;
}

/**
* List of requests to be passed to bulk indexing.
*
* @return List of index requests.
*/
public List<IndexRequest> getToIndex() {
return toIndex;
}
}
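A brief sketch (not part of this commit) of how a concrete indexer implementation could build an Iteration result for one search cycle; the Long position type, the index name "rollup-example", the "_doc" type, and the document source are illustrative assumptions:

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.xpack.core.indexing.Iteration;

import java.util.Collections;
import java.util.List;

public class IterationExample {

    // One iteration that produced a single document to index and is not yet done.
    static Iteration<Long> oneBucket(long nextPosition) {
        IndexRequest request = new IndexRequest("rollup-example", "_doc")
                .source(Collections.singletonMap("documents_indexed", 1));
        List<IndexRequest> toIndex = Collections.singletonList(request);
        return new Iteration<>(toIndex, nextPosition, false);
    }

    // One iteration that found nothing more to read: keep the last position and go to sleep.
    static Iteration<Long> exhausted(long lastPosition) {
        return new Iteration<>(Collections.<IndexRequest>emptyList(), lastPosition, true);
    }

    public static void main(String[] args) {
        Iteration<Long> iteration = exhausted(42L);
        System.out.println(iteration.isDone());                // prints: true
        System.out.println(iteration.getPosition());           // prints: 42
        System.out.println(iteration.getToIndex().isEmpty());  // prints: true
    }
}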
