Skip to content

Commit

Permalink
EQL: Introduce sequence internal paging (#58859)
Browse files Browse the repository at this point in the history
Refactor sequence matching classes in order to decouple querying from
results consumption (and matching).
Rename some classes to better convey their intent.

Introduce internal pagination of the sequence algorithm, that is, getting the
data in slices and, if needed, moving forward in order to find more
matches until either the dataset is consumed or the desired number of
results is found.
  • Loading branch information
costin authored Jul 2, 2020
1 parent f42d55d commit bcf2c11
Show file tree
Hide file tree
Showing 17 changed files with 543 additions and 345 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.execution.assembler;

import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;

import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;

/**
 * {@link QueryRequest} that "boxes" another request between a lower and an upper boundary.
 *
 * The search source obtained from the original request is wrapped in a bool query that adds a
 * range filter on the timestamp field and, when a tiebreaker field name is supplied, a second
 * range filter on the tiebreaker field. References to the range builders are retained so their
 * bounds can be changed in place by {@link #between(Ordinal, Ordinal)} or cleared by
 * {@link #next(Ordinal)} without rebuilding the query.
 *
 * NOTE(review): the {@link SearchSourceBuilder} returned by the original request is modified
 * directly; whether the original request observes that change depends on whether
 * {@code searchSource()} hands out a shared instance - confirm with the QueryRequest implementations.
 */
public class BoxedQueryRequest implements QueryRequest {

    // kept as fields so the bounds can be updated after the query has been assembled
    private final RangeQueryBuilder timestampRange;
    // null when no tiebreaker field was specified
    private final RangeQueryBuilder tiebreakerRange;

    private final SearchSourceBuilder searchSource;

    /**
     * @param original   request whose search source gets the range filters added
     * @param timestamp  name of the timestamp field used for the range filter
     * @param tiebreaker name of the tiebreaker field, or {@code null} if there is none
     */
    public BoxedQueryRequest(QueryRequest original, String timestamp, String tiebreaker) {
        searchSource = original.searchSource();

        // setup range queries and preserve their reference to simplify the update
        timestampRange = rangeQuery(timestamp).timeZone("UTC").format("epoch_millis");
        BoolQueryBuilder filter = boolQuery().filter(timestampRange);
        if (tiebreaker != null) {
            tiebreakerRange = rangeQuery(tiebreaker);
            filter.filter(tiebreakerRange);
        } else {
            tiebreakerRange = null;
        }

        // add ranges to existing query; BoolQueryBuilder#must rejects a null clause, so when the
        // source has no query yet only the range filter(s) are installed
        if (searchSource.query() != null) {
            filter.must(searchSource.query());
        }
        searchSource.query(filter);
    }

    @Override
    public SearchSourceBuilder searchSource() {
        return searchSource;
    }

    /**
     * Drops the range bounds previously set and positions the search after the given ordinal
     * using {@code search_after}.
     */
    @Override
    public void next(Ordinal ordinal) {
        // reset existing constraints
        timestampRange.gte(null).lte(null);
        if (tiebreakerRange != null) {
            tiebreakerRange.gte(null).lte(null);
        }
        // and leave only search_after
        searchSource.searchAfter(ordinal.toArray());
    }

    /**
     * Restricts the request to the inclusive interval delimited by the two ordinals.
     *
     * NOTE(review): the tiebreaker bounds are applied as a separate range, independent of the
     * timestamp bounds - confirm this matches the intended ordering semantics.
     *
     * @return this request, to allow chaining
     */
    public BoxedQueryRequest between(Ordinal begin, Ordinal end) {
        timestampRange.gte(begin.timestamp()).lte(end.timestamp());

        if (tiebreakerRange != null) {
            tiebreakerRange.gte(begin.tiebreaker()).lte(end.tiebreaker());
        }

        return this;
    }

    @Override
    public String toString() {
        return searchSource.toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,100 +7,83 @@
package org.elasticsearch.xpack.eql.execution.assembler;

import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.sequence.Ordinal;
import org.elasticsearch.xpack.eql.util.ReversedIterator;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;

import java.util.List;

public class Criterion implements QueryRequest {

private final SearchSourceBuilder searchSource;
private final List<HitExtractor> keyExtractors;
private final HitExtractor timestampExtractor;
private final HitExtractor tiebreakerExtractor;

// search after markers
private Ordinal startMarker;
private Ordinal stopMarker;

private boolean reverse;

//TODO: should accept QueryRequest instead of another SearchSourceBuilder
public Criterion(SearchSourceBuilder searchSource,
List<HitExtractor> searchAfterExractors,
HitExtractor timestampExtractor,
HitExtractor tiebreakerExtractor,
boolean reverse) {
this.searchSource = searchSource;
this.keyExtractors = searchAfterExractors;
this.timestampExtractor = timestampExtractor;
this.tiebreakerExtractor = tiebreakerExtractor;

this.startMarker = null;
this.stopMarker = null;
public class Criterion<Q extends QueryRequest> {

private final int stage;
private final Q queryRequest;
private final List<HitExtractor> keys;
private final HitExtractor timestamp;
private final HitExtractor tiebreaker;

private final boolean reverse;

Criterion(int stage,
Q queryRequest,
List<HitExtractor> keys,
HitExtractor timestamp,
HitExtractor tiebreaker,
boolean reverse) {
this.stage = stage;
this.queryRequest = queryRequest;
this.keys = keys;
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;

this.reverse = reverse;
}

@Override
public SearchSourceBuilder searchSource() {
return searchSource;
public int stage() {
return stage;
}

public List<HitExtractor> keyExtractors() {
return keyExtractors;
boolean reverse() {
return reverse;
}

public HitExtractor timestampExtractor() {
return timestampExtractor;
public Q queryRequest() {
return queryRequest;
}

public HitExtractor tiebreakerExtractor() {
return tiebreakerExtractor;
/**
 * Builds the sequence key of the given hit by running every key extractor against it,
 * in declaration order. When there are no key extractors, {@link SequenceKey#NONE} is returned.
 */
public SequenceKey key(SearchHit hit) {
    // nothing to extract - fall back to the shared "no key" instance
    if (keys.isEmpty()) {
        return SequenceKey.NONE;
    }

    Object[] values = new Object[keys.size()];
    int position = 0;
    for (HitExtractor extractor : keys) {
        values[position++] = extractor.extract(hit);
    }
    return new SequenceKey(values);
}

@SuppressWarnings({ "unchecked" })
public Ordinal ordinal(SearchHit hit) {

Object ts = timestampExtractor.extract(hit);
Object ts = timestamp.extract(hit);
if (ts instanceof Number == false) {
throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts);
}

long timestamp = ((Number) ts).longValue();
Comparable<Object> tiebreaker = null;
Comparable<Object> tbreaker = null;

if (tiebreakerExtractor != null) {
Object tb = tiebreakerExtractor.extract(hit);
if (tiebreaker != null) {
Object tb = tiebreaker.extract(hit);
if (tb instanceof Comparable == false) {
throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb);
}
tiebreaker = (Comparable<Object>) tb;
tbreaker = (Comparable<Object>) tb;
}
return new Ordinal(timestamp, tiebreaker);
}

public void startMarker(Ordinal ordinal) {
startMarker = ordinal;
}

public void stopMarker(Ordinal ordinal) {
stopMarker = ordinal;
}

public Ordinal nextMarker() {
return startMarker.compareTo(stopMarker) < 1 ? startMarker : stopMarker;
}

public Criterion useMarker(Ordinal marker) {
searchSource.searchAfter(marker.toArray());
return this;
}

public Iterable<SearchHit> iterable(List<SearchHit> hits) {
return () -> reverse ? new ReversedIterator<>(hits) : hits.iterator();
return new Ordinal(timestamp, tbreaker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.Expressions;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.ql.util.Check;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -48,27 +47,51 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
Limit limit) {
FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry();

List<Criterion> criteria = new ArrayList<>(plans.size() - 1);

boolean descending = direction == OrderDirection.DESC;

// fields
HitExtractor tsExtractor = timestampExtractor(hitExtractor(timestamp, extractorRegistry));
HitExtractor tbExtractor = Expressions.isPresent(tiebreaker) ? hitExtractor(tiebreaker, extractorRegistry) : null;
// NB: since there's no aliasing inside EQL, the attribute name is the same as the underlying field name
String timestampName = Expressions.name(timestamp);
String tiebreakerName = Expressions.isPresent(tiebreaker) ? Expressions.name(tiebreaker) : null;

// secondary criteria
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(plans.size() - 1);

// build a criterion for each query
for (int i = 0; i < plans.size() - 1; i++) {
for (int i = 0; i < plans.size(); i++) {
List<Attribute> keys = listOfKeys.get(i);
// fields
HitExtractor tsExtractor = timestampExtractor(hitExtractor(timestamp, extractorRegistry));
HitExtractor tbExtractor = Expressions.isPresent(tiebreaker) ? hitExtractor(tiebreaker, extractorRegistry) : null;
List<HitExtractor> keyExtractors = hitExtractors(keys, extractorRegistry);

PhysicalPlan query = plans.get(i);
// search query
// TODO: this could be generalized into an exec only query
Check.isTrue(query instanceof EsQueryExec, "Expected a query but got [{}]", query.getClass());
QueryRequest request = ((EsQueryExec) query).queryRequest(session);
// base query remains descending, the rest need to flip
criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor, i > 0 && descending));
if (query instanceof EsQueryExec) {
QueryRequest original = ((EsQueryExec) query).queryRequest(session);

BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName);
Criterion<BoxedQueryRequest> criterion =
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i> 0 && descending);
criteria.add(criterion);
} else {
// until
if (i != plans.size() - 1) {
throw new EqlIllegalArgumentException("Expected a query but got [{}]", query.getClass());
} else {
criteria.add(null);
}
}
}
return new SequenceRuntime(criteria, new BasicQueryClient(session), maxSpan, limit);

int completionStage = criteria.size() - 1;
Matcher matcher = new Matcher(completionStage, maxSpan, limit);

TumblingWindow w = new TumblingWindow(new BasicQueryClient(session),
criteria.subList(0, completionStage),
criteria.get(completionStage),
matcher);

return w;
}

private HitExtractor timestampExtractor(HitExtractor hitExtractor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

package org.elasticsearch.xpack.eql.execution.assembler;

import org.elasticsearch.xpack.eql.execution.sequence.Ordinal;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;

import java.util.Objects;

class KeyAndOrdinal {
public class KeyAndOrdinal {
final SequenceKey key;
final Ordinal ordinal;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.assembler;

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import org.elasticsearch.xpack.eql.execution.sequence.Sequence;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine;
import org.elasticsearch.xpack.eql.session.Payload;

import java.util.List;

/**
 * Feeds search hits, stage by stage, into a {@link SequenceStateMachine} and exposes
 * the sequences completed so far as a {@link Payload}.
 */
class Matcher {

    // NB: just like in a list, this represents the total number of stages yet counting starts at 0
    private final int numberOfStages;
    private final SequenceStateMachine stateMachine;

    Matcher(int numberOfStages, TimeValue maxSpan, Limit limit) {
        this.numberOfStages = numberOfStages;
        this.stateMachine = new SequenceStateMachine(numberOfStages, maxSpan, limit);
    }

    /**
     * Match hits for the given stage.
     * Hits of stage 0 start new sequences; hits of any other stage are handed to the state
     * machine for matching.
     *
     * @return false if the process needs to be stopped (the state machine reports the limit
     *         as reached), true otherwise
     */
    boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, SearchHit>> hits) {
        final boolean firstStage = stage == 0;

        for (Tuple<KeyAndOrdinal, SearchHit> entry : hits) {
            KeyAndOrdinal keyAndOrdinal = entry.v1();
            SearchHit searchHit = entry.v2();

            if (firstStage) {
                // every hit of the first stage opens a new sequence
                stateMachine.trackSequence(new Sequence(keyAndOrdinal.key, numberOfStages, keyAndOrdinal.ordinal, searchHit));
                continue;
            }

            stateMachine.match(stage, keyAndOrdinal.key, keyAndOrdinal.ordinal, searchHit);

            // early skip in case of reaching the limit
            if (stateMachine.reachedLimit()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Placeholder: the markers are currently ignored and false is always returned.
     */
    boolean until(Iterable<Ordinal> markers) {
        // no-op so far
        return false;
    }

    /**
     * Delegates to the state machine to tell whether the given stage has candidates.
     */
    public boolean hasCandidates(int stage) {
        return stateMachine.hasCandidates(stage);
    }

    /**
     * Wraps the sequences completed so far into a payload.
     *
     * @param startTime start time in epoch milliseconds, used to compute the elapsed time
     */
    Payload payload(long startTime) {
        List<Sequence> completedSequences = stateMachine.completeSequences();
        // elapsed time is measured after the completed sequences have been collected
        long elapsedMillis = System.currentTimeMillis() - startTime;
        // NOTE(review): the meaning of the 'false' argument is defined by SequencePayload - confirm there
        return new SequencePayload(completedSequences, false, new TimeValue(elapsedMillis));
    }
}
Loading

0 comments on commit bcf2c11

Please sign in to comment.