Skip to content

Commit

Permalink
Fix #16 - Add support for defining a number of occurrences for a pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed May 17, 2018
1 parent b3cef98 commit 9ca04d2
Show file tree
Hide file tree
Showing 10 changed files with 422 additions and 123 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ QueryBuilder<String, Integer> builder = new QueryBuilder<>();
<pre lang="java">builder.oneOrMore()</pre>
</td>
</tr>
<tr>
<td><pre lang="java">times(#ofTimes)</pre></td>
<td>
<p>Defines that a specific number of events should match this pattern.</p>
<pre lang="java">builder.time(3)</pre>
</td>
</tr>
<tr>
<td><pre lang="java">where(Matcher)</pre></td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.github.fhuss.kafka.streams.cep;

import com.github.fhuss.kafka.streams.cep.pattern.Pattern;
import com.github.fhuss.kafka.streams.cep.pattern.PredicateBuilder;
import com.github.fhuss.kafka.streams.cep.pattern.PatternBuilder;
import org.apache.kafka.streams.kstream.KStream;

/**
Expand All @@ -34,7 +34,7 @@ public interface CEPStream<K, V> {
* @param builder the builder to builder the pattern to query.
* @return a {@link KStream} that contains only those records that satisfy the given pattern with unmodified keys and new values.
*/
default KStream<K, Sequence<K, V>> query(final String queryName, final PredicateBuilder<K, V> builder) {
default KStream<K, Sequence<K, V>> query(final String queryName, final PatternBuilder<K, V> builder) {
return query(queryName, builder, null);
}

Expand All @@ -46,7 +46,7 @@ default KStream<K, Sequence<K, V>> query(final String queryName, final Predicat
* @return a {@link KStream} that contains only those records that satisfy the given pattern with unmodified keys and new values.
*/
default KStream<K, Sequence<K, V>> query(final String queryName,
final PredicateBuilder<K, V> builder,
final PatternBuilder<K, V> builder,
final Queried<K, V> queried) {
return query(queryName, builder.build(), queried);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
public class Pattern<K, V> implements Iterable<Pattern<K, V>> {

public enum Cardinality {
ZERO_OR_MORE(-2),
ONE_OR_MORE(-1),
OPTIONAL(0),
ONE(1);

private int val;
Expand Down Expand Up @@ -59,6 +57,10 @@ public int value() {

private Selected selected;

private boolean isOptional = false;

private int times = 1;

/**
* Creates a new {@link Pattern} instance.
**/
Expand Down Expand Up @@ -105,24 +107,40 @@ private Pattern(final int level, final String name, final Selected selected, fin
this.selected = selected == null ? Selected.withStrictContiguity() : selected;
}

public SelectBuilder<K, V> select() {
return new SelectBuilder<>(this);
public StageBuilder<K, V> select() {
return new StageBuilder<>(this);
}

public SelectBuilder<K, V> select(final Selected selected) {
public StageBuilder<K, V> select(final Selected selected) {
this.selected = selected;
return new SelectBuilder<>(this);
return new StageBuilder<>(this);
}

public SelectBuilder<K, V> select(final String name) {
public StageBuilder<K, V> select(final String name) {
this.name = name;
return new SelectBuilder<>(this);
return new StageBuilder<>(this);
}

public SelectBuilder<K, V> select(final String name, final Selected selected) {
public StageBuilder<K, V> select(final String name, final Selected selected) {
this.name = name;
this.selected = selected;
return new SelectBuilder<>(this);
return new StageBuilder<>(this);
}

boolean isOptional() {
return isOptional;
}

void setOptional(boolean optional) {
isOptional = optional;
}

void setTimes(int times) {
this.times = times;
}

int getTimes() {
return times;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -158,6 +176,8 @@ void orPredicate(final Matcher<K, V> predicate) {
this.predicate = Matcher.or(this.predicate, predicate);
}



void setCardinality(final Cardinality cardinality) {
this.cardinality = cardinality;
}
Expand Down Expand Up @@ -208,7 +228,7 @@ public Iterator<Pattern<K, V>> iterator() {
private static class PatternIterator<K, V> implements Iterator<Pattern<K, V>> {
private Pattern<K, V> current;

public PatternIterator(Pattern<K, V> pattern) {
PatternIterator(Pattern<K, V> pattern) {
this.current = pattern;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.fhuss.kafka.streams.cep.pattern;

import java.util.concurrent.TimeUnit;

public class PatternBuilder<K, V> {

private final Pattern<K, V> pattern;

/**
* Creates a new {@link PatternBuilder} instance.
*
* @param pattern the current pattern
*/
PatternBuilder(final Pattern<K, V> pattern) {
this.pattern = pattern;
}

public PatternBuilder<K, V> and(final SimpleMatcher<K, V> predicate) {
this.pattern.andPredicate(predicate);
return this;
}

public PatternBuilder<K, V> and(final StatefulMatcher<K, V> predicate) {
this.pattern.andPredicate(predicate);
return this;
}

public PatternBuilder<K, V> and(final SequenceMatcher<K, V> predicate) {
this.pattern.andPredicate(predicate);
return this;
}

public PatternBuilder<K, V> or(final SimpleMatcher<K, V> predicate) {
this.pattern.orPredicate(predicate);
return this;
}

public PatternBuilder<K, V> or(final StatefulMatcher<K, V> predicate) {
this.pattern.orPredicate(predicate);
return this;
}

public PatternBuilder<K, V> or(final SequenceMatcher<K, V> predicate) {
this.pattern.orPredicate(predicate);
return this;
}

public <T> PatternBuilder<K, V> fold(final String state, final Aggregator<K, V, T> aggregator) {
this.pattern.addStateAggregator(new StateAggregator<>(state, aggregator));
return this;
}

public PatternBuilder<K, V> within(long time, TimeUnit unit) {
this.pattern.setWindow(time, unit);
return this;
}

public Pattern<K, V> then() {
return new Pattern<>(pattern);
}

public Pattern<K, V> build() {
return pattern;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,36 @@
*/
package com.github.fhuss.kafka.streams.cep.pattern;


import java.util.concurrent.TimeUnit;

public class PredicateBuilder<K, V> {

private final Pattern<K, V> pattern;
protected final Pattern<K, V> pattern;

/**
* Creates a new {@link PredicateBuilder} instance.
* @param pattern
* Creates a new {@link PredicateBuilder} instance.
*
* @param pattern the current pattern
*/
PredicateBuilder(final Pattern<K, V> pattern) {
this.pattern = pattern;
}

public PredicateBuilder<K, V> and(final SimpleMatcher<K, V> predicate) {
public PatternBuilder<K, V> where(final SimpleMatcher<K, V> predicate) {
this.pattern.andPredicate(predicate);
return this;
return new PatternBuilder<>(this.pattern);
}

public PredicateBuilder<K, V> and(final StatefulMatcher<K, V> predicate) {
public PatternBuilder<K, V> where(final StatefulMatcher<K, V> predicate) {
this.pattern.andPredicate(predicate);
return this;
return new PatternBuilder<>(this.pattern);
}

public PredicateBuilder<K, V> and(final SequenceMatcher<K, V> predicate) {
public PatternBuilder<K, V> where(final SequenceMatcher<K, V> predicate) {
this.pattern.andPredicate(predicate);
return this;
}

public PredicateBuilder<K, V> or(final SimpleMatcher<K, V> predicate) {
this.pattern.orPredicate(predicate);
return this;
}

public PredicateBuilder<K, V> or(final StatefulMatcher<K, V> predicate) {
this.pattern.orPredicate(predicate);
return this;
return new PatternBuilder<>(this.pattern);
}

public PredicateBuilder<K, V> or(final SequenceMatcher<K, V> predicate) {
this.pattern.orPredicate(predicate);
public PredicateBuilder<K, V> optional() {
this.pattern.setOptional(true);
return this;
}

public <T> PredicateBuilder<K, V> fold(final String state, final Aggregator<K, V, T> aggregator) {
this.pattern.addStateAggregator(new StateAggregator<>(state, aggregator));
return this;
}

public PredicateBuilder<K, V> within(long time, TimeUnit unit) {
this.pattern.setWindow(time, unit);
return this;
}

public Pattern<K, V> then() {
return new Pattern<>(pattern);
}

public Pattern<K, V> build() {
return pattern;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Expand All @@ -24,32 +24,32 @@ public class QueryBuilder<K, V> {
/**
* Creates a new stage with no name.
*
* @return a new {@link SelectBuilder}.
* @return a new {@link StageBuilder}.
*/
public SelectBuilder<K, V> select() {
public StageBuilder<K, V> select() {
return select(DEFAULT_SELECT_STRATEGY);
}

/**
* Creates a new stage with the specified name.
*
* @param name the stage name.
* @return a new {@link SelectBuilder}.
* @return a new {@link StageBuilder}.
*/
public SelectBuilder<K, V> select(final String name) {
public StageBuilder<K, V> select(final String name) {
return select(name, DEFAULT_SELECT_STRATEGY);
}

/**
* Creates a new stage with no name.
*
* @return a new {@link SelectBuilder}.
* @return a new {@link StageBuilder}.
*/
public SelectBuilder<K, V> select(final Selected selected) {
return new SelectBuilder<>(new Pattern<>(selected));
public StageBuilder<K, V> select(final Selected selected) {
return new StageBuilder<>(new Pattern<>(selected));
}

public SelectBuilder<K, V> select(final String name, final Selected selected) {
return new SelectBuilder<>(new Pattern<>(name, selected));
public StageBuilder<K, V> select(final String name, final Selected selected) {
return new StageBuilder<>(new Pattern<>(name, selected));
}
}
Loading

0 comments on commit 9ca04d2

Please sign in to comment.