Miscellaneous inspection changes from IntelliJ
srowen authored and tomwhite committed Mar 10, 2016
1 parent f9e8fab commit ec172ba
Showing 14 changed files with 126 additions and 75 deletions.
@@ -12,6 +12,7 @@
* the specific language governing permissions and limitations under the
* License.
*/
+
package com.cloudera.dataflow.spark;

import java.io.ByteArrayInputStream;
@@ -42,7 +43,7 @@ private T deserialize() {
try {
return coder.decode(new ByteArrayInputStream(bcast.value()), new Coder.Context(true));
} catch (IOException e) {
throw new RuntimeException("Error deserializing broadcast variable", e);
throw new IllegalStateException("Error deserializing broadcast variable", e);
}
}
}
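The recurring change in this commit replaces bare RuntimeException with the more specific IllegalStateException: an IOException from a purely in-memory stream signals a broken invariant, not real I/O. A minimal standalone sketch of the pattern (the SerializeExample class is hypothetical, not part of the commit):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    final class SerializeExample {
      private SerializeExample() {
      }

      // A ByteArrayOutputStream never touches the filesystem or network, so an
      // IOException here means a broken invariant; IllegalStateException says
      // that more precisely than a bare RuntimeException.
      static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
          out.writeObject(value);
        } catch (IOException e) {
          throw new IllegalStateException("Error encoding value: " + value, e);
        }
        return baos.toByteArray();
      }
    }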
@@ -12,6 +12,7 @@
* the specific language governing permissions and limitations under the
* License.
*/
+
package com.cloudera.dataflow.spark;

import java.io.ByteArrayInputStream;
@@ -26,7 +27,10 @@
/**
* Serialization utility class.
*/
-public class CoderHelpers {
+public final class CoderHelpers {
+private CoderHelpers() {
+}
+
/**
* Utility method for serializing an object using the specified coder.
*
@@ -39,7 +43,7 @@ static <T> byte[] toByteArray(T value, Coder<T> coder) {
try {
coder.encode(value, baos, new Coder.Context(true));
} catch (IOException e) {
throw new RuntimeException("Error encoding value: " + value, e);
throw new IllegalStateException("Error encoding value: " + value, e);
}
return baos.toByteArray();
}
@@ -51,7 +55,7 @@ static <T> byte[] toByteArray(T value, Coder<T> coder) {
* @param coder Coder to serialize with.
* @return List of bytes representing serialized objects.
*/
-static <T> List<byte[]> toByteArrays(Iterable<T> values, final Coder<T> coder) {
+static <T> List<byte[]> toByteArrays(Iterable<T> values, Coder<T> coder) {
List<byte[]> res = Lists.newLinkedList();
for (T value : values) {
res.add(toByteArray(value, coder));
@@ -72,7 +76,7 @@ static <T> T fromByteArray(byte[] serialized, Coder<T> coder) {
try {
return coder.decode(bais, new Coder.Context(true));
} catch (IOException e) {
throw new RuntimeException("Error decoding bytes for coder: " + coder, e);
throw new IllegalStateException("Error decoding bytes for coder: " + coder, e);
}
}

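CoderHelpers above also gets the standard utility-class treatment: the class is made final and given a private constructor, so it can be neither subclassed nor instantiated. The same fix in isolation, on a hypothetical StringUtils class:

    // Hypothetical utility class showing the shape CoderHelpers is moved to:
    // `final` forbids subclassing, the private constructor forbids
    // instantiation, and only static members remain.
    public final class StringUtils {
      private StringUtils() {
      }

      public static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
      }
    }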
@@ -79,7 +79,7 @@ public Iterable<O> call(Iterator<I> iter) throws Exception {

private class ProcCtxt<I, O> extends DoFn<I, O>.ProcessContext {

-private List<O> outputs = new LinkedList<>();
+private final List<O> outputs = new LinkedList<>();
private I element;

public ProcCtxt(DoFn<I, O> fn) {
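The one change in this file is IntelliJ's "field may be final" inspection: a field assigned only at its declaration (or in every constructor) is marked final, as also happens in several files below. A small illustrative sketch with a hypothetical Collector class:

    import java.util.LinkedList;
    import java.util.List;

    class Collector<T> {
      // Was: private List<T> items = new LinkedList<>();
      // `final` documents that the reference never changes and makes the
      // compiler reject accidental reassignment.
      private final List<T> items = new LinkedList<>();

      void add(T item) {
        items.add(item);
      }
    }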
@@ -12,6 +12,7 @@
* the specific language governing permissions and limitations under the
* License.
*/
+
package com.cloudera.dataflow.spark;

import java.util.List;
@@ -111,7 +112,8 @@ <T> BroadcastHelper<T> getBroadcastHelper(PObject<T> value) {
public <T> T get(PObject<T> value) {
if (pobjects.containsKey(value)) {
return (T) pobjects.get(value);
-} else if (rdds.containsKey(value)) {
+}
+if (rdds.containsKey(value)) {
JavaRDDLike rdd = rdds.get(value);
//TODO: need same logic from get() method below here for serialization of bytes
T res = (T) Iterables.getOnlyElement(rdd.collect());
@@ -133,6 +135,7 @@ public <T> Iterable<T> get(PCollection<T> pcollection) {
JavaRDDLike bytes = rdd.map(CoderHelpers.toByteFunction(coder));
List clientBytes = bytes.collect();
return Iterables.transform(clientBytes, new Function<byte[], T>() {
+@Override
public T apply(byte[] bytes) {
return (T) CoderHelpers.fromByteArray(bytes, coder);
}
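Two inspections apply in this file: the redundant `else` is dropped when the preceding branch already returns, and `@Override` is added to the anonymous class's method. A combined sketch with hypothetical types (the commit itself works with Guava's Function and Dataflow's PObject/PCollection):

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    class LookupExample {
      private final Map<String, Object> primary = new HashMap<>();
      private final Map<String, Object> secondary = new HashMap<>();

      // An `else if` after a `return` is redundant; dropping the `else`
      // flattens the method without changing behavior.
      Object get(String key) {
        if (primary.containsKey(key)) {
          return primary.get(key);
        }
        if (secondary.containsKey(key)) {
          return secondary.get(key);
        }
        return null;
      }

      // `@Override` on an anonymous class's method makes the compiler verify
      // that the signature really overrides the interface method.
      static Function<byte[], String> utf8Decoder() {
        return new Function<byte[], String>() {
          @Override
          public String apply(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
          }
        };
      }
    }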
@@ -12,6 +12,7 @@
* the specific language governing permissions and limitations under the
* License.
*/
+
package com.cloudera.dataflow.spark;

import java.util.Collection;
@@ -36,8 +37,8 @@
import scala.Tuple2;

/**
-* DoFunctions ignore side outputs. MultiDoFunctions deal with side outputs by enrishing the
-* undelrying data with multiple TupleTags.
+* DoFunctions ignore side outputs. MultiDoFunctions deal with side outputs by enriching the
+* underlying data with multiple TupleTags.
*
* @param <I> Input type for DoFunction.
* @param <O> Output type for DoFunction.
@@ -52,7 +53,7 @@ class MultiDoFnFunction<I, O> implements PairFlatMapFunction<Iterator<I>, TupleT
private final TupleTag<?> mMainOutputTag;
private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;

-public MultiDoFnFunction(
+MultiDoFnFunction(
DoFn<I, O> fn,
SparkRuntimeContext runtimeContext,
TupleTag<O> mainOutputTag,
@@ -74,6 +75,7 @@ public Iterable<Tuple2<TupleTag<?>, Object>> call(Iterator<I> iter) throws Excep
mFunction.finishBundle(ctxt);
return Iterables.transform(ctxt.outputs.entries(),
new Function<Map.Entry<TupleTag<?>, Object>, Tuple2<TupleTag<?>, Object>>() {
+@Override
public Tuple2<TupleTag<?>, Object> apply(Map.Entry<TupleTag<?>, Object> input) {
return new Tuple2<TupleTag<?>, Object>(input.getKey(), input.getValue());
}
@@ -82,10 +84,10 @@ public Tuple2<TupleTag<?>, Object> apply(Map.Entry<TupleTag<?>, Object> input) {

private class ProcCtxt<I, O> extends DoFn<I, O>.ProcessContext {

-private Multimap<TupleTag<?>, Object> outputs = LinkedListMultimap.create();
+private final Multimap<TupleTag<?>, Object> outputs = LinkedListMultimap.create();
private I element;

-public ProcCtxt(DoFn<I, O> fn) {
+ProcCtxt(DoFn<I, O> fn) {
fn.super();
}

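The visibility changes here follow from MultiDoFnFunction being package-private: a `public` constructor on a package-private class grants no extra access, so it is reduced to package-private to match. In isolation, on a hypothetical Worker class:

    // Hypothetical sketch: Worker is package-private, so a public constructor
    // would be misleading; package-private matches the class's real visibility.
    class Worker {
      private final String name;

      Worker(String name) { // was: public Worker(String name)
        this.name = name;
      }
    }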
@@ -1,10 +1,24 @@
+/*
+* Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+*
+* Cloudera, Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"). You may not use this file except in
+* compliance with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+* CONDITIONS OF ANY KIND, either express or implied. See the License for
+* the specific language governing permissions and limitations under the
+* License.
+*/
+
package com.cloudera.dataflow.spark;

import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;


public interface SparkPipelineOptions extends PipelineOptions {
@Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")
@Default.String("local[1]")
@@ -1,8 +1,26 @@
+/*
+* Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+*
+* Cloudera, Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"). You may not use this file except in
+* compliance with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+* CONDITIONS OF ANY KIND, either express or implied. See the License for
+* the specific language governing permissions and limitations under the
+* License.
+*/
+
package com.cloudera.dataflow.spark;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

-public class SparkPipelineOptionsFactory {
+public final class SparkPipelineOptionsFactory {
+private SparkPipelineOptionsFactory() {
+}
+
public static SparkPipelineOptions create() {
return PipelineOptionsFactory.create(SparkPipelineOptions.class);
}
@@ -44,7 +44,7 @@ public class SparkPipelineRunner extends PipelineRunner<EvaluationResult> {

private static final Logger LOG = Logger.getLogger(SparkPipelineRunner.class.getName());
/** Options used in this pipeline runner.*/
-private SparkPipelineOptions mOptions;
+private final SparkPipelineOptions mOptions;

/**
* Creates and returns a new SparkPipelineRunner with default options. In particular, against a
@@ -12,6 +12,7 @@
* the specific language governing permissions and limitations under the
* License.
*/
+
package com.cloudera.dataflow.spark;

import java.io.Serializable;
@@ -37,11 +38,11 @@ class SparkRuntimeContext implements Serializable {
/**
* An accumulator that is a map from names to aggregators.
*/
-private Accumulator<NamedAggregators> accum;
+private final Accumulator<NamedAggregators> accum;
/**
* Map of names to dataflow aggregators.
*/
-private Map<String, Aggregator> aggregators = new HashMap<>();
+private final Map<String, Aggregator> aggregators = new HashMap<>();

public SparkRuntimeContext(JavaSparkContext jsc, Pipeline pipeline) {
this.accum = jsc.accumulator(new NamedAggregators(), new AggAccumParam());