Skip to content

Commit

Permalink
[BEAM-579] Integrate NamedAggregators into Spark sink system
Browse files Browse the repository at this point in the history
This closes #867
  • Loading branch information
Sela committed Aug 26, 2016
2 parents 2046783 + 226dea2 commit f346c87
Show file tree
Hide file tree
Showing 18 changed files with 611 additions and 16 deletions.
6 changes: 6 additions & 0 deletions runners/spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<spark.version>1.6.2</spark.version>
<hadoop.version>2.2.0</hadoop.version>
<kafka.version>0.8.2.1</kafka.version>
<dropwizard.metrics.version>3.1.2</dropwizard.metrics.version>
</properties>

<profiles>
Expand Down Expand Up @@ -231,6 +232,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${dropwizard.metrics.version}</version>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
+ "execution is stopped")
@Default.Long(-1)
Long getTimeout();
void setTimeout(Long batchInterval);
void setTimeout(Long timeoutMillis);

@Description("Batch interval for Spark streaming in milliseconds.")
@Default.Long(1000)
Long getBatchIntervalMillis();
void setBatchIntervalMillis(Long batchInterval);

@Description("Enable/disable sending aggregator values to Spark's metric sinks")
@Default.Boolean(true)
Boolean getEnableSparkSinks();
void setEnableSparkSinks(Boolean enableSparkSinks);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@

package org.apache.beam.runners.spark.aggregators;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;

import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -69,6 +74,22 @@ public <T> T getValue(String name, Class<T> typeClass) {
return typeClass.cast(mNamedAggregators.get(name).render());
}

/**
 * Renders every aggregator held by this instance.
 *
 * <p>Each state's {@code render()} result is computed while the returned map is built, so
 * the result is an immutable snapshot rather than a live view.
 *
 * @return an immutable map from aggregator name to its <b>rendered</b> value
 */
public Map<String, ?> renderAll() {
  // Converts a single aggregator state into its rendered (output) value.
  final Function<State<?, ?, ?>, Object> renderState =
      new Function<State<?, ?, ?>, Object>() {

        @Override
        public Object apply(State<?, ?, ?> aggregatorState) {
          return aggregatorState.render();
        }
      };

  // transformValues only yields a lazy view; copyOf materializes it into a snapshot.
  final Map<String, Object> renderedView =
      Maps.transformValues(mNamedAggregators, renderState);
  return ImmutableMap.copyOf(renderedView);
}

/**
* Merges another NamedAggregators instance with this instance.
*
Expand Down Expand Up @@ -116,6 +137,7 @@ public String toString() {
* @param <OutputT> Output data type
*/
public interface State<InputT, InterT, OutputT> extends Serializable {

/**
* @param element new element to update state
*/
Expand All @@ -133,16 +155,16 @@ public interface State<InputT, InterT, OutputT> extends Serializable {
/**
* =&gt; combineFunction in data flow.
*/
public static class CombineFunctionState<InputT, InterT, OutpuT>
implements State<InputT, InterT, OutpuT> {
public static class CombineFunctionState<InputT, InterT, OutputT>
implements State<InputT, InterT, OutputT> {

private Combine.CombineFn<InputT, InterT, OutpuT> combineFn;
private Combine.CombineFn<InputT, InterT, OutputT> combineFn;
private Coder<InputT> inCoder;
private SparkRuntimeContext ctxt;
private transient InterT state;

public CombineFunctionState(
Combine.CombineFn<InputT, InterT, OutpuT> combineFn,
Combine.CombineFn<InputT, InterT, OutputT> combineFn,
Coder<InputT> inCoder,
SparkRuntimeContext ctxt) {
this.combineFn = combineFn;
Expand All @@ -157,7 +179,7 @@ public void update(InputT element) {
}

@Override
public State<InputT, InterT, OutpuT> merge(State<InputT, InterT, OutpuT> other) {
public State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> other) {
this.state = combineFn.mergeAccumulators(ImmutableList.of(current(), other.current()));
return this;
}
Expand All @@ -168,12 +190,12 @@ public InterT current() {
}

@Override
public OutpuT render() {
public OutputT render() {
return combineFn.extractOutput(state);
}

@Override
public Combine.CombineFn<InputT, InterT, OutpuT> getCombineFn() {
public Combine.CombineFn<InputT, InterT, OutputT> getCombineFn() {
return combineFn;
}

Expand All @@ -192,7 +214,7 @@ private void writeObject(ObjectOutputStream oos) throws IOException {
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
ctxt = (SparkRuntimeContext) ois.readObject();
combineFn = (Combine.CombineFn<InputT, InterT, OutpuT>) ois.readObject();
combineFn = (Combine.CombineFn<InputT, InterT, OutputT>) ois.readObject();
inCoder = (Coder<InputT>) ois.readObject();
try {
state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.spark.aggregators.metrics;

import com.codahale.metrics.Metric;

import org.apache.beam.runners.spark.aggregators.NamedAggregators;

/**
 * Adapts a {@link NamedAggregators} instance to codahale's {@link Metric} interface.
 *
 * <p>The class declares no metric values of its own; it only carries the wrapped
 * aggregators, which are reachable through the package-private
 * {@link #getNamedAggregators()} accessor. Instances are created via
 * {@link #of(NamedAggregators)}.
 */
public class AggregatorMetric implements Metric {

  /** The aggregators wrapped by this metric. */
  private final NamedAggregators aggregators;

  /** Use {@link #of(NamedAggregators)} to create instances. */
  private AggregatorMetric(NamedAggregators aggregators) {
    this.aggregators = aggregators;
  }

  /**
   * Creates a metric wrapping the given aggregators.
   *
   * @param namedAggregators the aggregators to wrap
   * @return a new {@link AggregatorMetric} holding {@code namedAggregators}
   */
  public static AggregatorMetric of(NamedAggregators namedAggregators) {
    return new AggregatorMetric(namedAggregators);
  }

  /**
   * @return the wrapped {@link NamedAggregators} instance
   */
  NamedAggregators getNamedAggregators() {
    return aggregators;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.spark.aggregators.metrics;

import com.codahale.metrics.MetricRegistry;

import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.spark.metrics.source.Source;

/**
 * A Spark {@link Source} exposing a single {@link AggregatorMetric} that wraps an
 * underlying {@link NamedAggregators} instance.
 *
 * <p>The metric is registered in this source's own {@link MetricRegistry} under the same
 * name that the source itself reports from {@link #sourceName()}.
 */
public class AggregatorMetricSource implements Source {

  /** Used both as the source name and as the name of the registered metric. */
  private static final String SOURCE_NAME = "NamedAggregators";

  /** Registry holding the single aggregator metric. */
  private final MetricRegistry metricRegistry;

  /**
   * Builds a source whose registry contains one {@link AggregatorMetric} for the given
   * aggregators.
   *
   * @param aggregators the aggregators to expose through this source
   */
  public AggregatorMetricSource(final NamedAggregators aggregators) {
    final MetricRegistry registry = new MetricRegistry();
    registry.register(SOURCE_NAME, AggregatorMetric.of(aggregators));
    this.metricRegistry = registry;
  }

  /**
   * @return the name of this source
   */
  @Override
  public String sourceName() {
    return SOURCE_NAME;
  }

  /**
   * @return the registry containing the aggregator metric
   */
  @Override
  public MetricRegistry metricRegistry() {
    return metricRegistry;
  }
}
Loading

0 comments on commit f346c87

Please sign in to comment.