From bcf2c1141302f3f98c85e82d2c501aa02c8540e9 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 2 Jul 2020 13:19:13 +0300 Subject: [PATCH] EQL: Introduce sequence internal paging (#58859) Refactor sequence matching classes in order to decouple querying from results consumption (and matching). Rename some classes to better convey their intent. Introduce internal pagination of sequence algorithm, that is getting the data in slices and, if needed, moving forward in order to find more matches until either the dataset is consumed or the number of results desired is found. --- .../assembler/BoxedQueryRequest.java | 71 +++++++ .../eql/execution/assembler/Criterion.java | 113 +++++------ .../execution/assembler/ExecutionManager.java | 49 +++-- .../execution/assembler/KeyAndOrdinal.java | 4 +- .../eql/execution/assembler/Matcher.java | 74 +++++++ .../execution/assembler/SequenceRuntime.java | 152 -------------- .../execution/assembler/TumblingWindow.java | 192 ++++++++++++++++++ .../{sequence => search}/Ordinal.java | 14 +- .../eql/execution/search/QueryRequest.java | 4 + .../execution/sequence/KeyToSequences.java | 39 ++-- .../xpack/eql/execution/sequence/Match.java | 1 + .../eql/execution/sequence/Sequence.java | 3 +- ...{SequenceFrame.java => SequenceGroup.java} | 33 ++- .../sequence/SequenceStateMachine.java | 31 +-- .../eql/execution/sequence/TimeOrdinal.java | 60 ------ .../xpack/eql/planner/Mapper.java | 3 +- ...ntimeTests.java => SequenceSpecTests.java} | 45 ++-- 17 files changed, 543 insertions(+), 345 deletions(-) create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java delete mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java create mode 100644 
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java rename x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/{sequence => search}/Ordinal.java (88%) rename x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/{SequenceFrame.java => SequenceGroup.java} (77%) delete mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TimeOrdinal.java rename x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/{SequenceRuntimeTests.java => SequenceSpecTests.java} (81%) diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java new file mode 100644 index 0000000000000..96adb76784b80 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.eql.execution.assembler; + +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.eql.execution.search.Ordinal; +import org.elasticsearch.xpack.eql.execution.search.QueryRequest; + +import static org.elasticsearch.index.query.QueryBuilders.boolQuery; +import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; + +public class BoxedQueryRequest implements QueryRequest { + + private final RangeQueryBuilder timestampRange; + private final RangeQueryBuilder tiebreakerRange; + + private final SearchSourceBuilder searchSource; + + public BoxedQueryRequest(QueryRequest original, String timestamp, String tiebreaker) { + searchSource = original.searchSource(); + + // setup range queries and preserve their reference to simplify the update + timestampRange = rangeQuery(timestamp).timeZone("UTC").format("epoch_millis"); + BoolQueryBuilder filter = boolQuery().filter(timestampRange); + if (tiebreaker != null) { + tiebreakerRange = rangeQuery(tiebreaker); + filter.filter(tiebreakerRange); + } else { + tiebreakerRange = null; + } + // add ranges to existing query + searchSource.query(filter.must(searchSource.query())); + } + + @Override + public SearchSourceBuilder searchSource() { + return searchSource; + } + + @Override + public void next(Ordinal ordinal) { + // reset existing constraints + timestampRange.gte(null).lte(null); + if (tiebreakerRange != null) { + tiebreakerRange.gte(null).lte(null); + } + // and leave only search_after + searchSource.searchAfter(ordinal.toArray()); + } + + public BoxedQueryRequest between(Ordinal begin, Ordinal end) { + timestampRange.gte(begin.timestamp()).lte(end.timestamp()); + + if (tiebreakerRange != null) { + tiebreakerRange.gte(begin.tiebreaker()).lte(end.tiebreaker()); + } + + return this; + } + + @Override + public String toString() { + 
return searchSource.toString(); + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java index 3f91f6c766137..e5b0ac5f66bf6 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java @@ -7,100 +7,83 @@ package org.elasticsearch.xpack.eql.execution.assembler; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; +import org.elasticsearch.xpack.eql.execution.search.Ordinal; import org.elasticsearch.xpack.eql.execution.search.QueryRequest; -import org.elasticsearch.xpack.eql.execution.sequence.Ordinal; -import org.elasticsearch.xpack.eql.util.ReversedIterator; +import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey; import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor; import java.util.List; -public class Criterion implements QueryRequest { - - private final SearchSourceBuilder searchSource; - private final List keyExtractors; - private final HitExtractor timestampExtractor; - private final HitExtractor tiebreakerExtractor; - - // search after markers - private Ordinal startMarker; - private Ordinal stopMarker; - - private boolean reverse; - - //TODO: should accept QueryRequest instead of another SearchSourceBuilder - public Criterion(SearchSourceBuilder searchSource, - List searchAfterExractors, - HitExtractor timestampExtractor, - HitExtractor tiebreakerExtractor, - boolean reverse) { - this.searchSource = searchSource; - this.keyExtractors = searchAfterExractors; - this.timestampExtractor = timestampExtractor; - this.tiebreakerExtractor = tiebreakerExtractor; - - this.startMarker = null; - 
this.stopMarker = null; +public class Criterion { + + private final int stage; + private final Q queryRequest; + private final List keys; + private final HitExtractor timestamp; + private final HitExtractor tiebreaker; + + private final boolean reverse; + + Criterion(int stage, + Q queryRequest, + List keys, + HitExtractor timestamp, + HitExtractor tiebreaker, + boolean reverse) { + this.stage = stage; + this.queryRequest = queryRequest; + this.keys = keys; + this.timestamp = timestamp; + this.tiebreaker = tiebreaker; + this.reverse = reverse; } - @Override - public SearchSourceBuilder searchSource() { - return searchSource; + public int stage() { + return stage; } - public List keyExtractors() { - return keyExtractors; + boolean reverse() { + return reverse; } - public HitExtractor timestampExtractor() { - return timestampExtractor; + public Q queryRequest() { + return queryRequest; } - public HitExtractor tiebreakerExtractor() { - return tiebreakerExtractor; + public SequenceKey key(SearchHit hit) { + SequenceKey key; + if (keys.isEmpty()) { + key = SequenceKey.NONE; + } else { + Object[] docKeys = new Object[keys.size()]; + for (int i = 0; i < docKeys.length; i++) { + docKeys[i] = keys.get(i).extract(hit); + } + key = new SequenceKey(docKeys); + } + return key; } @SuppressWarnings({ "unchecked" }) public Ordinal ordinal(SearchHit hit) { - Object ts = timestampExtractor.extract(hit); + Object ts = timestamp.extract(hit); if (ts instanceof Number == false) { throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts); } long timestamp = ((Number) ts).longValue(); - Comparable tiebreaker = null; + Comparable tbreaker = null; - if (tiebreakerExtractor != null) { - Object tb = tiebreakerExtractor.extract(hit); + if (tiebreaker != null) { + Object tb = tiebreaker.extract(hit); if (tb instanceof Comparable == false) { throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb); } - tiebreaker = (Comparable) tb; + 
tbreaker = (Comparable) tb; } - return new Ordinal(timestamp, tiebreaker); - } - - public void startMarker(Ordinal ordinal) { - startMarker = ordinal; - } - - public void stopMarker(Ordinal ordinal) { - stopMarker = ordinal; - } - - public Ordinal nextMarker() { - return startMarker.compareTo(stopMarker) < 1 ? startMarker : stopMarker; - } - - public Criterion useMarker(Ordinal marker) { - searchSource.searchAfter(marker.toArray()); - return this; - } - - public Iterable iterable(List hits) { - return () -> reverse ? new ReversedIterator<>(hits) : hits.iterator(); + return new Ordinal(timestamp, tbreaker); } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java index 795a96b7b619e..1ed781537fd1d 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java @@ -24,7 +24,6 @@ import org.elasticsearch.xpack.ql.expression.Expression; import org.elasticsearch.xpack.ql.expression.Expressions; import org.elasticsearch.xpack.ql.expression.Order.OrderDirection; -import org.elasticsearch.xpack.ql.util.Check; import java.util.ArrayList; import java.util.List; @@ -48,27 +47,51 @@ public Executable assemble(List> listOfKeys, Limit limit) { FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry(); - List criteria = new ArrayList<>(plans.size() - 1); - boolean descending = direction == OrderDirection.DESC; + + // fields + HitExtractor tsExtractor = timestampExtractor(hitExtractor(timestamp, extractorRegistry)); + HitExtractor tbExtractor = Expressions.isPresent(tiebreaker) ? 
hitExtractor(tiebreaker, extractorRegistry) : null; + // NB: since there's no aliasing inside EQL, the attribute name is the same as the underlying field name + String timestampName = Expressions.name(timestamp); + String tiebreakerName = Expressions.isPresent(tiebreaker) ? Expressions.name(tiebreaker) : null; + + // secondary criteria + List> criteria = new ArrayList<>(plans.size() - 1); // build a criterion for each query - for (int i = 0; i < plans.size() - 1; i++) { + for (int i = 0; i < plans.size(); i++) { List keys = listOfKeys.get(i); - // fields - HitExtractor tsExtractor = timestampExtractor(hitExtractor(timestamp, extractorRegistry)); - HitExtractor tbExtractor = Expressions.isPresent(tiebreaker) ? hitExtractor(tiebreaker, extractorRegistry) : null; List keyExtractors = hitExtractors(keys, extractorRegistry); PhysicalPlan query = plans.get(i); // search query - // TODO: this could be generalized into an exec only query - Check.isTrue(query instanceof EsQueryExec, "Expected a query but got [{}]", query.getClass()); - QueryRequest request = ((EsQueryExec) query).queryRequest(session); - // base query remains descending, the rest need to flip - criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor, i > 0 && descending)); + if (query instanceof EsQueryExec) { + QueryRequest original = ((EsQueryExec) query).queryRequest(session); + + BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName); + Criterion criterion = + new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i> 0 && descending); + criteria.add(criterion); + } else { + // until + if (i != plans.size() - 1) { + throw new EqlIllegalArgumentException("Expected a query but got [{}]", query.getClass()); + } else { + criteria.add(null); + } + } } - return new SequenceRuntime(criteria, new BasicQueryClient(session), maxSpan, limit); + + int completionStage = criteria.size() - 1; + Matcher matcher = new 
Matcher(completionStage, maxSpan, limit); + + TumblingWindow w = new TumblingWindow(new BasicQueryClient(session), + criteria.subList(0, completionStage), + criteria.get(completionStage), + matcher); + + return w; } private HitExtractor timestampExtractor(HitExtractor hitExtractor) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java index 8b769b90d5c9f..69d0048679418 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java @@ -6,12 +6,12 @@ package org.elasticsearch.xpack.eql.execution.assembler; -import org.elasticsearch.xpack.eql.execution.sequence.Ordinal; +import org.elasticsearch.xpack.eql.execution.search.Ordinal; import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey; import java.util.Objects; -class KeyAndOrdinal { +public class KeyAndOrdinal { final SequenceKey key; final Ordinal ordinal; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java new file mode 100644 index 0000000000000..95e6840bca8b5 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.eql.execution.assembler; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.eql.execution.search.Limit; +import org.elasticsearch.xpack.eql.execution.search.Ordinal; +import org.elasticsearch.xpack.eql.execution.sequence.Sequence; +import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine; +import org.elasticsearch.xpack.eql.session.Payload; + +import java.util.List; + +/** + * Executable tracking sequences at runtime. + */ +class Matcher { + + // NB: just like in a list, this represents the total number of stages yet counting starts at 0 + private final SequenceStateMachine stateMachine; + private final int numberOfStages; + + Matcher(int numberOfStages, TimeValue maxSpan, Limit limit) { + this.numberOfStages = numberOfStages; + this.stateMachine = new SequenceStateMachine(numberOfStages, maxSpan, limit); + } + + /** + * Match hits for the given stage. + * Returns false if the process needs to be stopped. 
+ */ + boolean match(int stage, Iterable> hits) { + for (Tuple tuple : hits) { + KeyAndOrdinal ko = tuple.v1(); + SearchHit hit = tuple.v2(); + + if (stage == 0) { + Sequence seq = new Sequence(ko.key, numberOfStages, ko.ordinal, hit); + stateMachine.trackSequence(seq); + } else { + stateMachine.match(stage, ko.key, ko.ordinal, hit); + + // early skip in case of reaching the limit + // check the last stage to avoid calling the state machine in other stages + if (stateMachine.reachedLimit()) { + return false; + } + } + } + return true; + } + + boolean until(Iterable markers) { + // no-op so far + + return false; + } + + + public boolean hasCandidates(int stage) { + return stateMachine.hasCandidates(stage); + } + + Payload payload(long startTime) { + List completed = stateMachine.completeSequences(); + TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime); + return new SequencePayload(completed, false, tookTime); + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java deleted file mode 100644 index 09284e2bfd5b8..0000000000000 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ - -package org.elasticsearch.xpack.eql.execution.assembler; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.xpack.eql.execution.search.Limit; -import org.elasticsearch.xpack.eql.execution.search.QueryClient; -import org.elasticsearch.xpack.eql.execution.sequence.Ordinal; -import org.elasticsearch.xpack.eql.execution.sequence.Sequence; -import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey; -import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine; -import org.elasticsearch.xpack.eql.session.Payload; -import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor; - -import java.util.List; - -import static org.elasticsearch.action.ActionListener.wrap; - -/** - * Executable tracking sequences at runtime. - */ -class SequenceRuntime implements Executable { - - private final Logger log = LogManager.getLogger(SequenceRuntime.class); - - private final List criteria; - // NB: just like in a list, this represents the total number of stages yet counting starts at 0 - private final int numberOfStages; - private final SequenceStateMachine stateMachine; - private final QueryClient queryClient; - - private long startTime; - - SequenceRuntime(List criteria, QueryClient queryClient, TimeValue maxSpan, Limit limit) { - this.criteria = criteria; - this.numberOfStages = criteria.size(); - this.queryClient = queryClient; - this.stateMachine = new SequenceStateMachine(numberOfStages, maxSpan, limit); - } - - @Override - public void execute(ActionListener listener) { - startTime = System.currentTimeMillis(); - log.info("Starting sequencing"); - queryStage(0, listener); - } - - private void queryStage(int stage, ActionListener listener) { - // sequencing is done, return results - if (hasFinished(stage)) { - 
listener.onResponse(sequencePayload()); - return; - } - - // else continue finding matches - Criterion currentCriterion = criteria.get(stage); - if (stage > 0) { - // FIXME: revisit this during pagination since the second criterion need to be limited to the range of the first one - // narrow by the previous stage timestamp marker - - Criterion previous = criteria.get(stage - 1); - // pass the next marker along - currentCriterion.useMarker(previous.nextMarker()); - } - - log.info("Querying stage {}", stage); - queryClient.query(currentCriterion, wrap(payload -> { - List hits = payload.values(); - - // nothing matches the query -> bail out - // FIXME: needs to be changed when doing pagination - if (hits.isEmpty()) { - listener.onResponse(sequencePayload()); - return; - } - - findMatches(stage, hits); - queryStage(stage + 1, listener); - }, listener::onFailure)); - } - - // hits are guaranteed to be non-empty - private void findMatches(int stage, List hits) { - // update criterion - Criterion criterion = criteria.get(stage); - - // break the results per key - // when dealing with descending order, queries outside the base are ASC (search_before) - // so look at the data in reverse (that is DESC) - Ordinal firstOrdinal = null, ordinal = null; - for (SearchHit hit : criterion.iterable(hits)) { - KeyAndOrdinal ko = key(hit, criterion); - - ordinal = ko.ordinal; - - if (firstOrdinal == null) { - firstOrdinal = ordinal; - } - - if (stage == 0) { - Sequence seq = new Sequence(ko.key, numberOfStages, ordinal, hit); - stateMachine.trackSequence(seq); - } else { - stateMachine.match(stage, ko.key, ordinal, hit); - - // early skip in case of reaching the limit - // check the last stage to avoid calling the state machine in other stages - if (stateMachine.reachedLimit()) { - break; - } - } - } - - criterion.startMarker(firstOrdinal); - criterion.stopMarker(ordinal); - } - - private KeyAndOrdinal key(SearchHit hit, Criterion criterion) { - List keyExtractors = 
criterion.keyExtractors(); - - SequenceKey key; - if (criterion.keyExtractors().isEmpty()) { - key = SequenceKey.NONE; - } else { - Object[] docKeys = new Object[keyExtractors.size()]; - for (int i = 0; i < docKeys.length; i++) { - docKeys[i] = keyExtractors.get(i).extract(hit); - } - key = new SequenceKey(docKeys); - } - - return new KeyAndOrdinal(key, criterion.ordinal(hit)); - } - - private Payload sequencePayload() { - List completed = stateMachine.completeSequences(); - TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime); - return new SequencePayload(completed, false, tookTime); - } - - private boolean hasFinished(int stage) { - return stage == numberOfStages; - } -} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java new file mode 100644 index 0000000000000..11bd5a38e66c1 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.eql.execution.assembler; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.eql.execution.search.Ordinal; +import org.elasticsearch.xpack.eql.execution.search.QueryClient; +import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey; +import org.elasticsearch.xpack.eql.session.Payload; +import org.elasticsearch.xpack.eql.util.ReversedIterator; + +import java.util.Iterator; +import java.util.List; + +import static org.elasticsearch.action.ActionListener.wrap; + +/** + * Time-based window encapsulating query creation and advancement. + * Since queries can return different number of results, to avoid creating incorrect sequences, + * all searches are 'boxed' to a base query. + * The base query is initially the first query - when no results are found, the next query gets promoted. + * + * This allows the window to find any follow-up results even if they are found outside the initial window + * of a base query. 
+ */ +public class TumblingWindow implements Executable { + + private final Logger log = LogManager.getLogger(Matcher.class); + + private final QueryClient client; + private final List> criteria; + private final Criterion until; + private final Matcher matcher; + // shortcut + private final int maxStages; + + private long startTime; + + private int baseStage = 0; + private Ordinal begin, end; + + public TumblingWindow(QueryClient client, + List> criteria, + Criterion until, + Matcher matcher) { + this.client = client; + + this.until = until; + this.criteria = criteria; + this.maxStages = criteria.size(); + + this.matcher = matcher; + } + + @Override + public void execute(ActionListener listener) { + log.info("Starting sequence window..."); + startTime = System.currentTimeMillis(); + advance(listener); + } + + + private void advance(ActionListener listener) { + // initialize + log.info("Querying base stage"); + Criterion base = criteria.get(baseStage); + + if (end != null) { + // pick up where we left of + base.queryRequest().next(end); + } + client.query(base.queryRequest(), wrap(p -> baseCriterion(p, listener), listener::onFailure)); + } + + private void baseCriterion(Payload p, ActionListener listener) { + Criterion base = criteria.get(baseStage); + List hits = p.values(); + + if (hits.isEmpty() == false) { + if (matcher.match(baseStage, wrapValues(base, hits)) == false) { + listener.onResponse(payload()); + return; + } + } + // empty or only one result means there aren't going to be any matches + // so move the window boxing to the next stage + if (hits.size() < 2) { + // if there are still candidates, advance the window base + if (matcher.hasCandidates(baseStage) && baseStage + 1 < maxStages) { + // swap window begin/end when changing directions + if (base.reverse() != criteria.get(baseStage + 1).reverse()) { + Ordinal temp = begin; + begin = end; + end = temp; + } + baseStage++; + advance(listener); + } + // there aren't going to be any matches so cancel 
search + else { + listener.onResponse(payload()); + } + return; + } + + // get borders for the rest of the queries + begin = base.ordinal(hits.get(0)); + end = base.ordinal(hits.get(hits.size() - 1)); + + // find until ordinals + //NB: not currently implemented + + // no more queries to run + if (baseStage + 1 < maxStages) { + secondaryCriterion(baseStage + 1, listener); + } else { + advance(listener); + } + } + + private void secondaryCriterion(int index, ActionListener listener) { + Criterion criterion = criteria.get(index); + log.info("Querying (secondary) stage {}", criterion.stage()); + + // first box the query + BoxedQueryRequest request = criterion.queryRequest(); + Criterion base = criteria.get(baseStage); + + // if the base has a different direction, swap begin/end + if (criterion.reverse() != base.reverse()) { + request.between(end, begin); + } else { + request.between(begin, end); + } + + client.query(request, wrap(p -> { + List hits = p.values(); + // no more results in this window so continue in another window + if (hits.isEmpty()) { + log.info("Advancing window..."); + advance(listener); + return; + } + // if the limit has been reached, return what's available + if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) { + listener.onResponse(payload()); + return; + } + + if (index + 1 < maxStages) { + secondaryCriterion(index + 1, listener); + } else { + advance(listener); + } + + }, listener::onFailure)); + } + + Iterable> wrapValues(Criterion criterion, List hits) { + return () -> { + final Iterator iter = criterion.reverse() ? 
new ReversedIterator<>(hits) : hits.iterator(); + + return new Iterator<>() { + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public Tuple next() { + SearchHit hit = iter.next(); + SequenceKey k = criterion.key(hit); + Ordinal o = criterion.ordinal(hit); + return new Tuple<>(new KeyAndOrdinal(k, o), hit); + } + }; + }; + } + + Payload payload() { + return matcher.payload(startTime); + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Ordinal.java similarity index 88% rename from x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java rename to x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Ordinal.java index 70f71e002394a..b61111638132b 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Ordinal.java @@ -4,20 +4,28 @@ * you may not use this file except in compliance with the Elastic License. 
*/ -package org.elasticsearch.xpack.eql.execution.sequence; +package org.elasticsearch.xpack.eql.execution.search; import java.util.Objects; public class Ordinal implements Comparable { - final long timestamp; - final Comparable tiebreaker; + private final long timestamp; + private final Comparable tiebreaker; public Ordinal(long timestamp, Comparable tiebreaker) { this.timestamp = timestamp; this.tiebreaker = tiebreaker; } + public long timestamp() { + return timestamp; + } + + public Comparable tiebreaker() { + return tiebreaker; + } + @Override public int hashCode() { return Objects.hash(timestamp, tiebreaker); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java index 6d4c6621c57ff..d4c16c57ca99e 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java @@ -11,4 +11,8 @@ public interface QueryRequest { SearchSourceBuilder searchSource(); + + default void next(Ordinal ordinal) { + searchSource().searchAfter(ordinal.toArray()); + } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java index 22a4d239d29f4..8173bbbaf585a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java @@ -6,9 +6,7 @@ package org.elasticsearch.xpack.eql.execution.sequence; -import java.util.ArrayList; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; /** Dedicated collection for mapping a key to a list of sequences */ @@ -17,32 +15,35 @@ class 
KeyToSequences { private final int listSize; - private final Map> keyToSequences; + /** for each key, associate the frame per state (determined by index) */ + private final Map keyToSequences; KeyToSequences(int listSize) { this.listSize = listSize; this.keyToSequences = new LinkedHashMap<>(); } - SequenceFrame frame(int stage, SequenceKey key) { - return frame(key).get(stage); + private SequenceGroup[] group(SequenceKey key) { + SequenceGroup[] groups = keyToSequences.get(key); + if (groups == null) { + groups = new SequenceGroup[listSize]; + keyToSequences.put(key, groups); + } + return groups; } - private List frame(SequenceKey key) { - List frames = keyToSequences.get(key); - if (frames == null) { - frames = new ArrayList<>(listSize); - keyToSequences.put(key, frames); - - for (int i = 0; i < listSize; i++) { - frames.add(new SequenceFrame()); - } - } - return frames; + SequenceGroup groupIfPresent(int stage, SequenceKey key) { + SequenceGroup[] groups = keyToSequences.get(key); + return groups == null ? null : groups[stage]; } - SequenceFrame frameIfPresent(int stage, SequenceKey key) { - List list = keyToSequences.get(key); - return list == null ? 
null : list.get(stage); + void add(int stage, Sequence sequence) { + SequenceKey key = sequence.key(); + SequenceGroup[] groups = group(key); + // create the group on demand + if (groups[stage] == null) { + groups[stage] = new SequenceGroup(key); + } + groups[stage].add(sequence); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java index 40b1e3d7d5d2a..533eebbc09383 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.execution.sequence; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.eql.execution.search.Ordinal; import java.util.Objects; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java index 80f6dba71fc87..b0f41c0aead4b 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java @@ -8,6 +8,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; +import org.elasticsearch.xpack.eql.execution.search.Ordinal; import org.elasticsearch.xpack.ql.util.Check; import java.text.NumberFormat; @@ -58,7 +59,7 @@ public Ordinal ordinal() { } public long startTimestamp() { - return matches[0].ordinal().timestamp; + return matches[0].ordinal().timestamp(); } public List hits() { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java 
b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java similarity index 77% rename from x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java rename to x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java index 90e63189d8fb3..08a8e7badb8cd 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java @@ -7,15 +7,19 @@ package org.elasticsearch.xpack.eql.execution.sequence; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.xpack.eql.execution.search.Ordinal; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.function.Predicate; import static org.elasticsearch.common.logging.LoggerMessageFormat.format; -/** List of sequences (typically in a stage) used for finding continuous events within a time-frame */ -public class SequenceFrame { +/** List of in-flight sequences for a given key. For fast lookup, typically associated with a stage. 
*/ +public class SequenceGroup { + + private final SequenceKey key; // NB: since the size varies significantly, use a LinkedList // Considering the order it might make sense to use a B-Tree+ for faster lookups which should work well with @@ -24,6 +28,10 @@ public class SequenceFrame { private Ordinal start, stop; + SequenceGroup(SequenceKey key) { + this.key = key; + } + public void add(Sequence sequence) { sequences.add(sequence); Ordinal ordinal = sequence.ordinal(); @@ -91,8 +99,27 @@ public List sequences() { return sequences; } + @Override + public int hashCode() { + return key.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + SequenceGroup other = (SequenceGroup) obj; + return Objects.equals(key, other.key); + } + @Override public String toString() { - return format(null, "[{}-{}]({} seqs)", start, stop, sequences.size()); + return format(null, "[{}][{}-{}]({} seqs)", key, start, stop, sequences.size()); } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java index df97f24f4f5fe..597b2e2fee1c4 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.eql.execution.search.Limit; +import org.elasticsearch.xpack.eql.execution.search.Ordinal; import java.util.LinkedList; import java.util.List; @@ -60,38 +61,37 @@ public void trackSequence(Sequence sequence) { SequenceKey key = sequence.key(); stageToKeys.keys(0).add(key); - 
SequenceFrame frame = keyToSequences.frame(0, key); - frame.add(sequence); + keyToSequences.add(0, sequence); } /** * Match the given hit (based on key and timestamp and potential tiebreaker) with any potential sequence from the previous * given stage. If that's the case, update the sequence and the rest of the references. */ - public boolean match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) { + public void match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) { int previousStage = stage - 1; // check key presence to avoid creating a collection - SequenceFrame frame = keyToSequences.frameIfPresent(previousStage, key); - if (frame == null || frame.isEmpty()) { - return false; + SequenceGroup group = keyToSequences.groupIfPresent(previousStage, key); + if (group == null || group.isEmpty()) { + return; } - Tuple before = frame.before(ordinal); + Tuple before = group.before(ordinal); if (before == null) { - return false; + return; } Sequence sequence = before.v1(); // eliminate the match and all previous values from the frame - frame.trim(before.v2() + 1); + group.trim(before.v2() + 1); // check maxspan before continuing the sequence - if (maxSpanInMillis > 0 && (ordinal.timestamp - sequence.startTimestamp() >= maxSpanInMillis)) { - return false; + if (maxSpanInMillis > 0 && (ordinal.timestamp() - sequence.startTimestamp() >= maxSpanInMillis)) { + return; } sequence.putMatch(stage, hit, ordinal); // remove the frame and keys early (as the key space is large) - if (frame.isEmpty()) { + if (group.isEmpty()) { stageToKeys.keys(previousStage).remove(key); } @@ -109,12 +109,15 @@ public boolean match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) } } else { stageToKeys.keys(stage).add(key); - keyToSequences.frame(stage, key).add(sequence); + keyToSequences.add(stage, sequence); } - return true; } public boolean reachedLimit() { return limitReached; } + + public boolean hasCandidates(int stage) { + return stage < completionStage && 
stageToKeys.keys(stage).isEmpty() == false; + } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TimeOrdinal.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TimeOrdinal.java deleted file mode 100644 index 8c255a28c5d33..0000000000000 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TimeOrdinal.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.eql.execution.sequence; - -import org.elasticsearch.xpack.ql.capabilities.Resolvable; -import org.elasticsearch.xpack.ql.expression.Attribute; - -import java.util.List; -import java.util.Objects; - -import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; - -/** - * Time ordinal for a given event. - * It is an internal construct that wraps the mandatory timestamp attribute and the optional application tiebreaker. 
- */ -public class TimeOrdinal implements Resolvable { - - private final Attribute timestamp; - private final Attribute tiebreaker; - - public TimeOrdinal(Attribute timestamp, Attribute tiebreaker) { - this.timestamp = timestamp; - this.tiebreaker = tiebreaker; - } - - @Override - public int hashCode() { - return Objects.hash(timestamp, tiebreaker); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - - if (obj == null || getClass() != obj.getClass()) { - return false; - } - - TimeOrdinal other = (TimeOrdinal) obj; - return Objects.equals(timestamp, other.timestamp) && - Objects.equals(tiebreaker, other.tiebreaker); - } - - @Override - public boolean resolved() { - return timestamp.resolved() && (tiebreaker == null || tiebreaker.resolved()); - } - - public List output() { - return tiebreaker == null ? singletonList(timestamp) : asList(timestamp, tiebreaker); - } -} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java index 5cfae03bd0d53..4b70b0ece6568 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java @@ -78,7 +78,8 @@ protected PhysicalPlan map(LogicalPlan p) { map(s.until().child()), s.timestamp(), s.tiebreaker(), - s.direction(), s.maxSpan()); + s.direction(), + s.maxSpan()); } if (p instanceof LocalRelation) { diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java similarity index 81% rename from x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java rename to 
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java index b48b0f0853294..32194612e16b3 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence; import org.elasticsearch.xpack.eql.execution.assembler.SeriesUtils.SeriesSpec; +import org.elasticsearch.xpack.eql.execution.search.QueryClient; import org.elasticsearch.xpack.eql.session.Payload; import org.elasticsearch.xpack.eql.session.Results; import org.elasticsearch.xpack.eql.session.Results.Type; @@ -29,11 +30,13 @@ import java.util.Map.Entry; import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static org.elasticsearch.action.ActionListener.wrap; import static org.elasticsearch.common.logging.LoggerMessageFormat.format; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -public class SequenceRuntimeTests extends ESTestCase { +public class SequenceSpecTests extends ESTestCase { private final List>> events; private final List> matches; @@ -76,11 +79,15 @@ public String extract(SearchHit hit) { } } - class TestCriterion extends Criterion { + class TestCriterion extends Criterion { private final int ordinal; + private boolean unused = true; - TestCriterion(int ordinal) { - super(SearchSourceBuilder.searchSource().size(ordinal), keyExtractors, tsExtractor, tbExtractor, false); + TestCriterion(final int ordinal) { + super(ordinal, + new BoxedQueryRequest(() -> SearchSourceBuilder.searchSource().query(matchAllQuery()).size(ordinal), "timestamp", null), + keyExtractors, + tsExtractor, tbExtractor, false); this.ordinal = ordinal; } @@ -93,6 +100,12 @@ public 
int hashCode() { return ordinal; } + public boolean use() { + boolean u = unused; + unused = false; + return u; + } + @Override public boolean equals(Object obj) { if (this == obj) { @@ -103,7 +116,7 @@ public boolean equals(Object obj) { return false; } - SequenceRuntimeTests.TestCriterion other = (SequenceRuntimeTests.TestCriterion) obj; + SequenceSpecTests.TestCriterion other = (SequenceSpecTests.TestCriterion) obj; return ordinal == other.ordinal; } @@ -157,7 +170,7 @@ public String toString() { } - public SequenceRuntimeTests(String testName, int lineNumber, SeriesSpec spec) { + public SequenceSpecTests(String testName, int lineNumber, SeriesSpec spec) { this.lineNumber = lineNumber; this.events = spec.eventsPerCriterion; this.matches = spec.matches; @@ -175,21 +188,29 @@ public static Iterable parameters() throws Exception { public void test() { int stages = events.size(); - List criteria = new ArrayList<>(stages); + List> criteria = new ArrayList<>(stages); // pass the items for each query through the Criterion for (int i = 0; i < stages; i++) { // set the index as size in the search source criteria.add(new TestCriterion(i)); } - + // convert the results through a test specific payload - SequenceRuntime runtime = new SequenceRuntime(criteria, (r, l) -> { - Map> evs = events.get(r.searchSource().size()); + Matcher matcher = new Matcher(stages, TimeValue.MINUS_ONE, null); + + QueryClient testClient = (r, l) -> { + int ordinal = r.searchSource().size(); + if (ordinal != Integer.MAX_VALUE) { + r.searchSource().size(Integer.MAX_VALUE); + } + Map> evs = ordinal != Integer.MAX_VALUE ? 
events.get(ordinal) : emptyMap(); l.onResponse(new TestPayload(evs)); - }, TimeValue.MINUS_ONE, null); + }; + + TumblingWindow window = new TumblingWindow(testClient, criteria, null, matcher); // finally make the assertion at the end of the listener - runtime.execute(wrap(this::checkResults, ex -> { + window.execute(wrap(this::checkResults, ex -> { throw ExceptionsHelper.convertToRuntime(ex); })); }