Skip to content

Commit

Permalink
apache#31 [euphoria-core] Implementation of accumulator API + integra…
Browse files Browse the repository at this point in the history
…tion with Flink executor
  • Loading branch information
vanekjar committed May 23, 2017
1 parent 0d4edf0 commit 0effeed
Show file tree
Hide file tree
Showing 74 changed files with 1,352 additions and 243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import cz.seznam.euphoria.core.client.dataset.windowing.TimeSliding;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryFunctor;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.io.Collector;
import cz.seznam.euphoria.core.client.io.StdoutSink;
import cz.seznam.euphoria.core.client.io.VoidSink;
import cz.seznam.euphoria.core.client.operator.FlatMap;
Expand Down Expand Up @@ -148,7 +148,7 @@ public void execute() throws Exception {
.by(Pair::getFirst, Pair::getFirst)
.using((Pair<String, Integer> left,
Pair<String, Integer> right,
Context<Double> context) -> {
Collector<Double> context) -> {
double score = rank(
longIntervalMillis, left.getSecond(),
shortItervalMillis, right.getSecond(),
Expand All @@ -170,7 +170,7 @@ public void execute() throws Exception {
.output();

FlatMap.of(output)
.using((Triple<Byte, Pair<String, Double>, Double> e, Context<String> c) -> {
.using((Triple<Byte, Pair<String, Double>, Double> e, Collector<String> c) -> {
Date now = new Date();
Date stamp = new Date(TimeSliding.getLabel(c).getEndMillis());
c.collect(now + ": " + stamp + ", " + e.getSecond().getFirst() + ", " + e.getSecond().getSecond());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryFunctor;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.io.Collector;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.io.ListDataSource;
import cz.seznam.euphoria.core.client.operator.FlatMap;
Expand All @@ -46,7 +46,7 @@ static Dataset<Pair<Long, String>> getInput(boolean test, URI uri, Flow flow)
.using(new UnaryFunctor<Pair<byte[], byte[]>, Pair<Long, String>>() {
private final SearchEventsParser parser = new SearchEventsParser();
@Override
public void apply(Pair<byte[], byte[]> pair, Context<Pair<Long, String>> context) {
public void apply(Pair<byte[], byte[]> pair, Collector<Pair<Long, String>> context) {
try {
SearchEventsParser.Query q = parser.parse(pair.getSecond());
if (q != null && q.query != null && !q.query.isEmpty()) {
Expand All @@ -69,7 +69,7 @@ public void apply(Pair<byte[], byte[]> pair, Context<Pair<Long, String>> context
.using(new UnaryFunctor<String, Pair<Long, String>>() {
SearchEventsParser parser = new SearchEventsParser();
@Override
public void apply(String line, Context<Pair<Long, String>> context) {
public void apply(String line, Collector<Pair<Long, String>> context) {
try {
SearchEventsParser.Query q = parser.parse(line);
if (q != null && q.query != null && !q.query.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.accumulators;

/**
* Accumulators collect values from user functions.
* Accumulators allow user to calculate statistics during the flow execution.
* <p>
* Accumulators are inspired by the Hadoop/MapReduce counters.
*/
public interface Accumulator {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.accumulators;

import cz.seznam.euphoria.core.util.Settings;

import java.io.Serializable;

/**
* Provides access to an accumulator backend service. It is intended to be
* implemented by third party to support different type of services.
*/
public interface AccumulatorProvider {

/**
* Get an existing instance of a counter or create a new one.
*
* @param name Unique name of the counter.
* @return Instance of a counter.
*/
Counter getCounter(String name);

/**
* Get an existing instance of a histogram or create a new one.
*
* @param name Unique name of the histogram.
* @return Instance of a histogram.
*/
Histogram getHistogram(String name);

/**
* Get an existing instance of a timer or create a new one.
*
* @param name Unique name of the timer.
* @return Instance of a timer.
*/
Timer getTimer(String name);

/**
* Creates a new instance of {@link AccumulatorProvider}
* initialized by given settings.
* <p>
* It is required this factory is thread-safe.
*/
@FunctionalInterface
interface Factory extends Serializable {
AccumulatorProvider create(Settings settings);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.accumulators;

/**
* Counter is a type of accumulator making a sum from integral numbers.
*/
public interface Counter extends Accumulator {

/**
* Increment counter by given value.
* @param value Value to be added to the counter.
*/
void increment(long value);

/**
* Increment counter by one.
*/
void increment();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.accumulators;

/**
* Histogram is a type of accumulator recording a distribution of different values.
*/
public interface Histogram extends Accumulator {

/**
* Add specified value.
* @param value Value to be added.
*/
void add(long value);

/**
* Add specified value multiple times.
* @param value Value to be added.
* @param times Number of occurrences to add.
*/
void add(long value, long times);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.accumulators;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
* Timer provides convenience API very similar to {@link Histogram}
* but extended by time unit support.
*/
public interface Timer extends Accumulator {

/**
* Add specific duration.
* @param duration Duration to be added.
*/
void add(Duration duration);

/**
* Add specific duration with given time unit
* @param duration Duration to be added.
* @param unit Time unit.
*/
default void add(long duration, TimeUnit unit) {
add(Duration.ofMillis(unit.toMillis(duration)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.accumulators;

import cz.seznam.euphoria.core.util.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/**
* Placeholder implementation of {@link AccumulatorProvider} that
* may be used in executors as a default.
*/
public class VoidAccumulatorProvider implements AccumulatorProvider {

private static final Logger LOG = LoggerFactory.getLogger(VoidAccumulatorProvider.class);

private final Map<String, Accumulator> accumulators = new HashMap<>();

private VoidAccumulatorProvider() {}

@Override
public Counter getCounter(String name) {
return getAccumulator(name, VoidCounter.class);
}

@Override
public Histogram getHistogram(String name) {
return getAccumulator(name, VoidHistogram.class);
}

@Override
public Timer getTimer(String name) {
return getAccumulator(name, VoidTimer.class);
}

private <ACC extends Accumulator> ACC getAccumulator(String name, Class<ACC> clz) {
try {
ACC acc = clz.getConstructor().newInstance();
if (accumulators.putIfAbsent(name, acc) == null) {
LOG.warn("Using accumulators with VoidAccumulatorProvider will have no effect");
}
return acc;
} catch (Exception e) {
throw new RuntimeException("Exception during accumulator initialization: " + clz, e);
}
}

public static Factory getFactory() {
return Factory.get();
}

// ------------------------

public static class Factory implements AccumulatorProvider.Factory {

private static final Factory INSTANCE = new Factory();

private static final AccumulatorProvider PROVIDER =
new VoidAccumulatorProvider();

private Factory() {}

@Override
public AccumulatorProvider create(Settings settings) {
return PROVIDER;
}

public static Factory get() {
return INSTANCE;
}
}

// ------------------------

public static class VoidCounter implements Counter {

@Override
public void increment(long value) {
// NOOP
}

@Override
public void increment() {
// NOOP
}
}

public static class VoidHistogram implements Histogram {

@Override
public void add(long value) {
// NOOP
}

@Override
public void add(long value, long times) {
// NOOP
}
}

public static class VoidTimer implements Timer {

@Override
public void add(Duration duration) {
// NOOP
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package cz.seznam.euphoria.core.client.dataset.windowing;

import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.io.Collector;
import cz.seznam.euphoria.core.client.triggers.TimeTrigger;
import cz.seznam.euphoria.core.client.triggers.Trigger;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -44,7 +44,7 @@ public static <T> TimeSliding<T> of(Duration duration, Duration step) {
* @throws ClassCastException if the context is not part of a
* time-sliding execution
*/
public static TimeInterval getLabel(Context<?> context) {
public static TimeInterval getLabel(Collector<?> context) {
return (TimeInterval) context.getWindow();
}

Expand Down
Loading

0 comments on commit 0effeed

Please sign in to comment.