diff --git a/README.md b/README.md index 7ee2d85..1e7e41f 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,13 @@ QueryBuilder builder = new QueryBuilder<>();
builder.oneOrMore()
+ +
times(#ofTimes)
+ +

Defines that a specific number of events should match this pattern.

+
builder.time(3)
+ +
where(Matcher)
diff --git a/core/src/main/java/com/github/fhuss/kafka/streams/cep/CEPStream.java b/core/src/main/java/com/github/fhuss/kafka/streams/cep/CEPStream.java index 88f0701..f8a5a5c 100644 --- a/core/src/main/java/com/github/fhuss/kafka/streams/cep/CEPStream.java +++ b/core/src/main/java/com/github/fhuss/kafka/streams/cep/CEPStream.java @@ -17,7 +17,7 @@ package com.github.fhuss.kafka.streams.cep; import com.github.fhuss.kafka.streams.cep.pattern.Pattern; -import com.github.fhuss.kafka.streams.cep.pattern.PredicateBuilder; +import com.github.fhuss.kafka.streams.cep.pattern.PatternBuilder; import org.apache.kafka.streams.kstream.KStream; /** @@ -34,7 +34,7 @@ public interface CEPStream { * @param builder the builder to builder the pattern to query. * @return a {@link KStream} that contains only those records that satisfy the given pattern with unmodified keys and new values. */ - default KStream> query(final String queryName, final PredicateBuilder builder) { + default KStream> query(final String queryName, final PatternBuilder builder) { return query(queryName, builder, null); } @@ -46,7 +46,7 @@ default KStream> query(final String queryName, final Predicat * @return a {@link KStream} that contains only those records that satisfy the given pattern with unmodified keys and new values. */ default KStream> query(final String queryName, - final PredicateBuilder builder, + final PatternBuilder builder, final Queried queried) { return query(queryName, builder.build(), queried); } diff --git a/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/Pattern.java b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/Pattern.java index 1291110..2728b34 100644 --- a/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/Pattern.java +++ b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/Pattern.java @@ -25,9 +25,7 @@ public class Pattern implements Iterable> { public enum Cardinality { - ZERO_OR_MORE(-2), ONE_OR_MORE(-1), - OPTIONAL(0), ONE(1); private int val; @@ -59,6 +57,10 @@ public int value() { private Selected selected; + private boolean isOptional = false; + + private int times = 1; + /** * Creates a new {@link Pattern} instance. **/ @@ -105,24 +107,40 @@ private Pattern(final int level, final String name, final Selected selected, fin this.selected = selected == null ? Selected.withStrictContiguity() : selected; } - public SelectBuilder select() { - return new SelectBuilder<>(this); + public StageBuilder select() { + return new StageBuilder<>(this); } - public SelectBuilder select(final Selected selected) { + public StageBuilder select(final Selected selected) { this.selected = selected; - return new SelectBuilder<>(this); + return new StageBuilder<>(this); } - public SelectBuilder select(final String name) { + public StageBuilder select(final String name) { this.name = name; - return new SelectBuilder<>(this); + return new StageBuilder<>(this); } - public SelectBuilder select(final String name, final Selected selected) { + public StageBuilder select(final String name, final Selected selected) { this.name = name; this.selected = selected; - return new SelectBuilder<>(this); + return new StageBuilder<>(this); + } + + boolean isOptional() { + return isOptional; + } + + void setOptional(boolean optional) { + isOptional = optional; + } + + void setTimes(int times) { + this.times = times; + } + + int getTimes() { + return times; } @SuppressWarnings("unchecked") @@ -158,6 +176,8 @@ void orPredicate(final Matcher predicate) { this.predicate = Matcher.or(this.predicate, predicate); } + + void setCardinality(final Cardinality cardinality) { this.cardinality = cardinality; } @@ -208,7 +228,7 @@ public Iterator> iterator() { private static class PatternIterator implements Iterator> { private Pattern current; - public PatternIterator(Pattern pattern) { + PatternIterator(Pattern pattern) { this.current = pattern; } diff --git a/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/PatternBuilder.java b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/PatternBuilder.java new file mode 100644 index 0000000..eda70dc --- /dev/null +++ b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/PatternBuilder.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.fhuss.kafka.streams.cep.pattern; + +import java.util.concurrent.TimeUnit; + +public class PatternBuilder { + + private final Pattern pattern; + + /** + * Creates a new {@link PatternBuilder} instance. + * + * @param pattern the current pattern + */ + PatternBuilder(final Pattern pattern) { + this.pattern = pattern; + } + + public PatternBuilder and(final SimpleMatcher predicate) { + this.pattern.andPredicate(predicate); + return this; + } + + public PatternBuilder and(final StatefulMatcher predicate) { + this.pattern.andPredicate(predicate); + return this; + } + + public PatternBuilder and(final SequenceMatcher predicate) { + this.pattern.andPredicate(predicate); + return this; + } + + public PatternBuilder or(final SimpleMatcher predicate) { + this.pattern.orPredicate(predicate); + return this; + } + + public PatternBuilder or(final StatefulMatcher predicate) { + this.pattern.orPredicate(predicate); + return this; + } + + public PatternBuilder or(final SequenceMatcher predicate) { + this.pattern.orPredicate(predicate); + return this; + } + + public PatternBuilder fold(final String state, final Aggregator aggregator) { + this.pattern.addStateAggregator(new StateAggregator<>(state, aggregator)); + return this; + } + + public PatternBuilder within(long time, TimeUnit unit) { + this.pattern.setWindow(time, unit); + return this; + } + + public Pattern then() { + return new Pattern<>(pattern); + } + + public Pattern build() { + return pattern; + } +} diff --git a/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/PredicateBuilder.java b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/PredicateBuilder.java index e310907..05b82e3 100644 --- a/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/PredicateBuilder.java +++ b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/PredicateBuilder.java @@ -16,66 +16,36 @@ */ package com.github.fhuss.kafka.streams.cep.pattern; - -import java.util.concurrent.TimeUnit; - public class PredicateBuilder { - private final Pattern pattern; + protected final Pattern pattern; /** - * Creates a new {@link PredicateBuilder} instance. - * @param pattern + * Creates a new {@link PredicateBuilder} instance. + * + * @param pattern the current pattern */ PredicateBuilder(final Pattern pattern) { this.pattern = pattern; } - public PredicateBuilder and(final SimpleMatcher predicate) { + public PatternBuilder where(final SimpleMatcher predicate) { this.pattern.andPredicate(predicate); - return this; + return new PatternBuilder<>(this.pattern); } - public PredicateBuilder and(final StatefulMatcher predicate) { + public PatternBuilder where(final StatefulMatcher predicate) { this.pattern.andPredicate(predicate); - return this; + return new PatternBuilder<>(this.pattern); } - public PredicateBuilder and(final SequenceMatcher predicate) { + public PatternBuilder where(final SequenceMatcher predicate) { this.pattern.andPredicate(predicate); - return this; - } - - public PredicateBuilder or(final SimpleMatcher predicate) { - this.pattern.orPredicate(predicate); - return this; - } - - public PredicateBuilder or(final StatefulMatcher predicate) { - this.pattern.orPredicate(predicate); - return this; + return new PatternBuilder<>(this.pattern); } - public PredicateBuilder or(final SequenceMatcher predicate) { - this.pattern.orPredicate(predicate); + public PredicateBuilder optional() { + this.pattern.setOptional(true); return this; } - - public PredicateBuilder fold(final String state, final Aggregator aggregator) { - this.pattern.addStateAggregator(new StateAggregator<>(state, aggregator)); - return this; - } - - public PredicateBuilder within(long time, TimeUnit unit) { - this.pattern.setWindow(time, unit); - return this; - } - - public Pattern then() { - return new Pattern<>(pattern); - } - - public Pattern build() { - return pattern; - } } diff --git a/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/QueryBuilder.java b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/QueryBuilder.java index de6df86..17d9cd7 100644 --- a/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/QueryBuilder.java +++ b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/QueryBuilder.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -24,9 +24,9 @@ public class QueryBuilder { /** * Creates a new stage with no name. * - * @return a new {@link SelectBuilder}. + * @return a new {@link StageBuilder}. */ - public SelectBuilder select() { + public StageBuilder select() { return select(DEFAULT_SELECT_STRATEGY); } @@ -34,22 +34,22 @@ public SelectBuilder select() { * Creates a new stage with the specified name. * * @param name the stage name. - * @return a new {@link SelectBuilder}. + * @return a new {@link StageBuilder}. */ - public SelectBuilder select(final String name) { + public StageBuilder select(final String name) { return select(name, DEFAULT_SELECT_STRATEGY); } /** * Creates a new stage with no name. * - * @return a new {@link SelectBuilder}. + * @return a new {@link StageBuilder}. */ - public SelectBuilder select(final Selected selected) { - return new SelectBuilder<>(new Pattern<>(selected)); + public StageBuilder select(final Selected selected) { + return new StageBuilder<>(new Pattern<>(selected)); } - public SelectBuilder select(final String name, final Selected selected) { - return new SelectBuilder<>(new Pattern<>(name, selected)); + public StageBuilder select(final String name, final Selected selected) { + return new StageBuilder<>(new Pattern<>(name, selected)); } } diff --git a/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/SelectBuilder.java b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/StageBuilder.java similarity index 51% rename from core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/SelectBuilder.java rename to core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/StageBuilder.java index 0b0a29f..14befab 100644 --- a/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/SelectBuilder.java +++ b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/StageBuilder.java @@ -16,60 +16,47 @@ */ package com.github.fhuss.kafka.streams.cep.pattern; -public class SelectBuilder { +public class StageBuilder extends PredicateBuilder { - private final Pattern pattern; - - SelectBuilder(final Pattern pattern) { - this.pattern = pattern; + /** + * Creates a new {@link StageBuilder} instance. + * @param pattern + */ + StageBuilder(final Pattern pattern) { + super(pattern); } - public SelectBuilder optional() { - pattern.setCardinality(Pattern.Cardinality.OPTIONAL); + public PredicateBuilder oneOrMore() { + this.pattern.setCardinality(Pattern.Cardinality.ONE_OR_MORE); return this; } - public SelectBuilder oneOrMore() { - pattern.setCardinality(Pattern.Cardinality.ONE_OR_MORE); + public PredicateBuilder zeroOrMore() { + this.pattern.setCardinality(Pattern.Cardinality.ONE_OR_MORE); + this.pattern.setOptional(true); return this; } - public SelectBuilder zeroOrMore() { - pattern.setCardinality(Pattern.Cardinality.ZERO_OR_MORE); + public PredicateBuilder times(int times) { + this.pattern.setTimes(times); return this; } @Deprecated - public SelectBuilder skipTillNextMatch() { + public StageBuilder skipTillNextMatch() { this.pattern.setStrategy(Strategy.SKIP_TIL_NEXT_MATCH); return this; } @Deprecated - public SelectBuilder skipTillAnyMatch() { + public StageBuilder skipTillAnyMatch() { this.pattern.setStrategy(Strategy.SKIP_TIL_ANY_MATCH); return this; } @Deprecated - public SelectBuilder strictContiguity() { + public StageBuilder strictContiguity() { this.pattern.setStrategy(Strategy.STRICT_CONTIGUITY); return this; } - - public PredicateBuilder where(final SimpleMatcher predicate) { - this.pattern.andPredicate(predicate); - return new PredicateBuilder<>(this.pattern); - } - - public PredicateBuilder where(final StatefulMatcher predicate) { - this.pattern.andPredicate(predicate); - return new PredicateBuilder<>(this.pattern); - } - - public PredicateBuilder where(final SequenceMatcher predicate) { - this.pattern.andPredicate(predicate); - return new PredicateBuilder<>(this.pattern); - } - } diff --git a/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/StagesFactory.java b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/StagesFactory.java index d970993..94fa9aa 100644 --- a/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/StagesFactory.java +++ b/core/src/main/java/com/github/fhuss/kafka/streams/cep/pattern/StagesFactory.java @@ -98,7 +98,7 @@ private List> buildStages(final Stage.StateType type, Matcher.and(new Matcher.TopicPredicate<>(selected.getTopic()), currentPattern.getPredicate()) : currentPattern.getPredicate(); - EdgeOperation operation = Arrays.asList(ONE, OPTIONAL).contains(cardinality) ? EdgeOperation.BEGIN : EdgeOperation.TAKE; + EdgeOperation operation = cardinality.equals(ONE) ? EdgeOperation.BEGIN : EdgeOperation.TAKE; stage.addEdge(new Stage.Edge<>(operation, predicate, successorStage)); Matcher ignore = null; @@ -114,18 +114,6 @@ private List> buildStages(final Stage.StateType type, stage.addEdge(new Stage.Edge<>(EdgeOperation.IGNORE, ignore, null)); } - if (cardinality.equals(OPTIONAL)) { - if (successorPattern == null && successorStage.isFinalState()) { - throw new InvalidPatternException( - "Cannot define a pattern with an optional final stage"); - } - - Matcher successorPredicate = successorPattern.getPredicate(); - // proceed = successor_begin && !take - Matcher skip = Matcher.and(successorPredicate, Matcher.not(predicate)); - stage.addEdge(new Stage.Edge<>(EdgeOperation.SKIP_PROCEED, skip, successorStage)); - } - if (operation.equals(EdgeOperation.TAKE) ) { if (successorPattern == null && successorStage.isFinalState()) { @@ -153,16 +141,33 @@ private List> buildStages(final Stage.StateType type, List> stages = new ArrayList<>(); stages.add(stage); // we need to introduce a required state - if (hasMandatoryState) { - final Stage internalStage = new Stage<>(nextStageId(), currentPattern.getName(), type); - internalStage.addEdge(new Stage.Edge<>(EdgeOperation.BEGIN, currentPattern.getPredicate(), stage)); - if (ignore != null) { - internalStage.addEdge(new Stage.Edge<>(EdgeOperation.IGNORE, ignore, null)); + int times = currentPattern.getTimes(); + if (hasMandatoryState || times > 1) { + do { + final Stage internalStage = new Stage<>(nextStageId(), currentPattern.getName(), type); + internalStage.addEdge(new Stage.Edge<>(EdgeOperation.BEGIN, predicate, stage)); + if (ignore != null) { + internalStage.addEdge(new Stage.Edge<>(EdgeOperation.IGNORE, ignore, null)); + } + internalStage.setWindow(windowLengthMs); // Pushing the time window early + internalStage.setAggregates(currentPattern.getAggregates()); + stages.add(internalStage); + stage = internalStage; + } while (--times > 1); + } + + if (currentPattern.isOptional()) { + if (successorPattern == null && successorStage.isFinalState()) { + throw new InvalidPatternException( + "Cannot define a pattern with an optional final stage"); } - internalStage.setWindow(windowLengthMs); // Pushing the time window early - internalStage.setAggregates(currentPattern.getAggregates()); - stages.add(internalStage); + + Matcher successorPredicate = successorPattern.getPredicate(); + // proceed = successor_begin && !take + Matcher skip = Matcher.and(successorPredicate, Matcher.not(predicate)); + stage.addEdge(new Stage.Edge<>(EdgeOperation.SKIP_PROCEED, skip, successorStage)); } + return stages; } diff --git a/core/src/test/java/com/github/fhuss/kafka/streams/cep/nfa/NFATest.java b/core/src/test/java/com/github/fhuss/kafka/streams/cep/nfa/NFATest.java index 39caf63..5e60928 100644 --- a/core/src/test/java/com/github/fhuss/kafka/streams/cep/nfa/NFATest.java +++ b/core/src/test/java/com/github/fhuss/kafka/streams/cep/nfa/NFATest.java @@ -51,8 +51,9 @@ public class NFATest { private Event ev3 = new Event<>("ev3", "C", System.currentTimeMillis(), "test", 0, 2); private Event ev4 = new Event<>("ev4", "C", System.currentTimeMillis(), "test", 0, 3); private Event ev5 = new Event<>("ev5", "D", System.currentTimeMillis(), "test", 0, 4); - - private Event ev6 = new Event<>("ev6", "D", System.currentTimeMillis(), "test", 0, 5); + private Event ev6 = new Event<>("ev6", "C", System.currentTimeMillis(), "test", 0, 5); + private Event ev7 = new Event<>("ev7", "D", System.currentTimeMillis(), "test", 0, 6); + private Event ev8 = new Event<>("ev8", "E", System.currentTimeMillis(), "test", 0, 7); private AtomicInteger offset; @@ -155,6 +156,234 @@ public void testNFAGivenSequenceCondition() { assertEquals(expected, s.get(0)); } + /** + * Pattern : (A;C{3} E) / Events : A1, C3, C4, C6, E8 + * + * R1: A1, C3, C4, E8(matched) + * R2: _ + */ + @Test + public void testNFAGivenExpectingOccurrencesStage() { + + Pattern pattern = new QueryBuilder() + .select("first") + .where(TestMatcher.isEqualTo("A")) + .then() + .select("second") + .times(3) + .where(TestMatcher.isEqualTo("C")) + .then() + .select("latest") + .where(TestMatcher.isEqualTo("E")) + .build(); + + final NFA nfa = newNFA(pattern, Serdes.String(), Serdes.String()); + + List> s = simulate(nfa, ev1, ev3, ev4, ev6, ev8); + assertEquals(1, s.size()); + + assertNFA(nfa, 2, 1); + + Sequence expected = Sequence.newBuilder() + .add("latest", ev8) + .add("second", ev6) + .add("second", ev4) + .add("second", ev3) + .add("first", ev1) + .build(true); + + assertEquals(expected, s.get(0)); + } + + /** + * Pattern : (A;C*; E) / Events : A1, D5 + * + * R1: A1, D5(matched) + * R2: _ + */ + @Test + public void testNFAGivenZeroOrMoreStageWhenNoMatchingInputs() { + + Pattern pattern = new QueryBuilder() + .select("first") + .where(TestMatcher.isEqualTo("A")) + .then() + .select("second") + .zeroOrMore() + .where(TestMatcher.isEqualTo("C")) + .then() + .select("latest") + .where(TestMatcher.isEqualTo("D")) + .build(); + + final NFA nfa = newNFA(pattern, Serdes.String(), Serdes.String()); + + List> s = simulate(nfa, ev1, ev5); + assertEquals(1, s.size()); + + assertNFA(nfa, 2, 1); + + Sequence expected = Sequence.newBuilder() + .add("latest", ev5) + .add("first", ev1) + .build(true); + + assertEquals(expected, s.get(0)); + } + + /** + * Pattern : (A;C*; E) / Events : A1, C3, C4, D5 + * + * R1: A1, C3, C4, D5(matched) + * R2: _ + */ + @Test + public void testNFAGivenZeroOrMoreStageWhenMatchingInputs() { + + Pattern pattern = new QueryBuilder() + .select("first") + .where(TestMatcher.isEqualTo("A")) + .then() + .select("second") + .zeroOrMore() + .where(TestMatcher.isEqualTo("C")) + .then() + .select("latest") + .where(TestMatcher.isEqualTo("D")) + .build(); + + final NFA nfa = newNFA(pattern, Serdes.String(), Serdes.String()); + + List> s = simulate(nfa, ev1, ev3, ev4, ev5); + assertEquals(1, s.size()); + + assertNFA(nfa, 2, 1); + + Sequence expected = Sequence.newBuilder() + .add("latest", ev5) + .add("second", ev4) + .add("second", ev3) + .add("first", ev1) + .build(true); + + assertEquals(expected, s.get(0)); + } + + /** + * Pattern : (A;C{2}?; E) / Events : A1, D5 + * + * R1: A1, D5(matched) + * R2: _ + */ + @Test + public void testNFAGivenOptionalExpectingOccurrenceStageWhenNoMatchingInputs() { + + Pattern pattern = new QueryBuilder() + .select("first") + .where(TestMatcher.isEqualTo("A")) + .then() + .select("second") + .times(2) + .optional() + .where(TestMatcher.isEqualTo("C")) + .then() + .select("latest") + .where(TestMatcher.isEqualTo("D")) + .build(); + + final NFA nfa = newNFA(pattern, Serdes.String(), Serdes.String()); + + List> s = simulate(nfa, ev1, ev5); + assertEquals(1, s.size()); + + assertNFA(nfa, 2, 1); + + Sequence expected = Sequence.newBuilder() + .add("latest", ev5) + .add("first", ev1) + .build(true); + + assertEquals(expected, s.get(0)); + } + + /** + * Pattern : (A;C{2}?; E) / Events : A1, C3, C4, D5 + * + * R1: A1, C3, C4, D5(matched) + * R2: _ + */ + @Test + public void testNFAGivenOptionalExpectingOccurrenceStageWhenMatchingInputs() { + + Pattern pattern = new QueryBuilder() + .select("first") + .where(TestMatcher.isEqualTo("A")) + .then() + .select("second") + .times(2) + .optional() + .where(TestMatcher.isEqualTo("C")) + .then() + .select("latest") + .where(TestMatcher.isEqualTo("D")) + .build(); + + final NFA nfa = newNFA(pattern, Serdes.String(), Serdes.String()); + + List> s = simulate(nfa, ev1, ev3, ev4, ev5); + assertEquals(1, s.size()); + + assertNFA(nfa, 2, 1); + + Sequence expected = Sequence.newBuilder() + .add("latest", ev5) + .add("second", ev4) + .add("second", ev3) + .add("first", ev1) + .build(true); + + assertEquals(expected, s.get(0)); + } + + /** + * Pattern : (A;C{3} E) / Events : A1, C3, C4, D5, C6, E8 + * + * R1: A1, C3, C4, C6, E7 (matched) + * R2: _ + */ + @Test + public void testNFAGivenExpectingOccurrencesStageWhenSkipTilNextMatchContiguityIsUsed() { + + Pattern pattern = new QueryBuilder() + .select("first") + .where(TestMatcher.isEqualTo("A")) + .then() + .select("second", Selected.withSkipTilNextMatch()) + .times(3) + .where(TestMatcher.isEqualTo("C")) + .then() + .select("latest") + .where(TestMatcher.isEqualTo("E")) + .build(); + + final NFA nfa = newNFA(pattern, Serdes.String(), Serdes.String()); + + List> s = simulate(nfa, ev1, ev3, ev4, ev5, ev6, ev8); + assertEquals(1, s.size()); + + assertNFA(nfa, 2, 1); + + Sequence expected = Sequence.newBuilder() + .add("latest", ev8) + .add("second", ev6) + .add("second", ev4) + .add("second", ev3) + .add("first", ev1) + .build(true); + + assertEquals(expected, s.get(0)); + } + /** * Pattern : (A;B;C) / Events : A1, C3 * @@ -567,7 +796,7 @@ public void testNFAGivenSkipTillAnyMatchOnLatestStage() { .build(); final NFA nfa = newNFA(pattern, Serdes.String(), Serdes.String()); - List> s = simulate(nfa, ev1, ev2, ev3, ev5, ev6); + List> s = simulate(nfa, ev1, ev2, ev3, ev5, ev7); Assert.assertEquals(4, nfa.getRuns()); @@ -598,7 +827,7 @@ public void testNFAGivenSkipTillAnyMatchOnLatestStage() { .add("first", ev1) .add("second", ev2) .add("three", ev3) - .add("latest", ev6) + .add("latest", ev7) .build(false); assertEquals(expected2, s.get(1)); diff --git a/core/src/test/java/com/github/fhuss/kafka/streams/cep/processor/CEPProcessorTest.java b/core/src/test/java/com/github/fhuss/kafka/streams/cep/processor/CEPProcessorTest.java index 88bd813..0bd90b9 100644 --- a/core/src/test/java/com/github/fhuss/kafka/streams/cep/processor/CEPProcessorTest.java +++ b/core/src/test/java/com/github/fhuss/kafka/streams/cep/processor/CEPProcessorTest.java @@ -18,7 +18,7 @@ import com.github.fhuss.kafka.streams.cep.nfa.Stages; import com.github.fhuss.kafka.streams.cep.pattern.Pattern; -import com.github.fhuss.kafka.streams.cep.pattern.PredicateBuilder; +import com.github.fhuss.kafka.streams.cep.pattern.PatternBuilder; import com.github.fhuss.kafka.streams.cep.pattern.QueryBuilder; import com.github.fhuss.kafka.streams.cep.pattern.StagesFactory; import com.github.fhuss.kafka.streams.cep.state.AggregatesStore; @@ -49,7 +49,7 @@ public class CEPProcessorTest { private static final String DEFAULT_STRING_VALUE = "value"; private static final String TEST_QUERY = "test-query"; - private static final PredicateBuilder PATTERN = new QueryBuilder() + private static final PatternBuilder PATTERN = new QueryBuilder() .select() .where((event) -> true);