Skip to content

Commit

Permalink
Merge pull request apache#17 from seznam/pete/16/dataset-getpartitioning
Browse files Browse the repository at this point in the history
Replace Dataset#getPartitioning() with Dataset#getNumPartitions()
  • Loading branch information
xitep authored Feb 3, 2017
2 parents e9d7a16 + f133030 commit f32ba6f
Show file tree
Hide file tree
Showing 41 changed files with 147 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@ public interface Dataset<T> extends Serializable {
Collection<Operator<?, ?>> getConsumers();

/**
* Retrieve partitioning for this dataset.
* The dataset might be partitioned by some other type
* (using some extraction function).
* Determines the parallelism of this data set - if known. Typically,
* a data set is split into multiple partitions which can be processed
* in parallel.
*
* @return {@code < 0} if the partition count is unknown, otherwise the
* count of partitions of this dataset (which can potentially
* be processed in parallel)
*/
<X> Partitioning<X> getPartitioning();

int getNumPartitions();

/**
* @return {@code true} if this is a bounded data set,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,13 @@ public class Datasets {
public static <IN, OUT> Dataset<OUT> createOutputFor(
Flow flow, Dataset<IN> input, Operator<IN, OUT> op) {

return new OutputDataset<OUT>(flow, (Operator) op, input.isBounded()) {
return new OutputDataset<OUT>(flow, op, input.isBounded()) {
@Override
@SuppressWarnings("unchecked")
public <X> Partitioning<X> getPartitioning() {
if (op instanceof PartitioningAware) {
// only partitioning aware operators change the partitioning
PartitioningAware<IN> pa = (PartitioningAware<IN>) op;
return (Partitioning<X>) pa.getPartitioning();
}
return input.getPartitioning();
public int getNumPartitions() {
// only partitioning aware operators can change the partition count
return (op instanceof PartitioningAware)
? ((PartitioningAware) op).getPartitioning().getNumPartitions()
: input.getNumPartitions();
}
};
}
Expand All @@ -70,13 +67,8 @@ public static <T> Dataset<T> createInputFromSource(

return new InputDataset<T>(flow, source, source.isBounded()) {
@Override
public <X> Partitioning<X> getPartitioning() {
return new Partitioning<X>() {
@Override
public int getNumPartitions() {
return source.getPartitions().size();
}
};
public int getNumPartitions() {
return source.getPartitions().size();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.dataset;
package cz.seznam.euphoria.core.client.dataset.partitioning;

/**
* Default partitioner used in {@link Partitioning}. It is has its own type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.dataset;
package cz.seznam.euphoria.core.client.dataset.partitioning;

/**
* Partitioner by hash of input.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.dataset;
package cz.seznam.euphoria.core.client.dataset.partitioning;

/**
* Partitioning by hashcode of input.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.dataset;
package cz.seznam.euphoria.core.client.dataset.partitioning;

import java.io.Serializable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.dataset;
package cz.seznam.euphoria.core.client.dataset.partitioning;

import java.io.Serializable;

Expand All @@ -23,7 +23,7 @@
* @param <T> the type of elements this partitioning scheme is able to handle
*/
public interface Partitioning<T> extends Serializable {

Partitioner DEFAULT_PARTITIONER = new DefaultPartitioner();

/** @return the actual partitioner */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ private Flow(String name, Settings settings) {
this.settings = cloneSettings(settings);
}

/**
* Creates a new (anonymous) Flow.
*
* @return a new flow with an undefined name,
* i.e. either not named at all or with a system generated name
*/
public static Flow create() {
return create(null);
}

/**
* Creates a new Flow.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import cz.seznam.euphoria.core.annotation.operator.Derived;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
Expand Down Expand Up @@ -70,7 +70,7 @@ public static class WindowingBuilder<IN, KEY>

WindowingBuilder(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor) {
// define default partitioning
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.client.dataset.Partitioner;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;

import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import cz.seznam.euphoria.core.annotation.operator.Recommended;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.flow.Flow;
Expand Down Expand Up @@ -71,7 +71,7 @@ public static class WindowingBuilder<IN, ELEM>
UnaryFunction<IN, ELEM> mapper /* optional */) {

// define default partitioning
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import cz.seznam.euphoria.core.annotation.operator.Recommended;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
Expand Down Expand Up @@ -141,9 +141,8 @@ public static class WindowingBuilder<LEFT, RIGHT, KEY, OUT>
BinaryFunctor<LEFT, RIGHT, OUT> joinFunc) {

// define default partitioning
super(new DefaultPartitioning<>(Math.max(
left.getPartitioning().getNumPartitions(),
right.getPartitioning().getNumPartitions())));
super(new DefaultPartitioning<>(
Math.max(left.getNumPartitions(), right.getNumPartitions())));

this.name = Objects.requireNonNull(name);
this.left = Objects.requireNonNull(left);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.client.dataset.Partitioner;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;

import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.flow.Flow;
Expand Down Expand Up @@ -144,7 +144,7 @@ public static class DatasetBuilder4<IN, KEY, VALUE, OUT>
ReduceFunction<VALUE, OUT> reducer) {

// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import cz.seznam.euphoria.core.annotation.operator.Basic;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
Expand Down Expand Up @@ -158,7 +158,7 @@ public static class DatasetBuilder5<
CombinableReduceFunction<STATE> stateCombiner)
{
// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import cz.seznam.euphoria.core.annotation.operator.Derived;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Partitioner;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
Expand Down Expand Up @@ -175,8 +175,7 @@ public Partitioner<Byte> getPartitioner() {
}
@Override
public int getNumPartitions() {
return numPartitions > 0
? numPartitions : input.getPartitioning().getNumPartitions();
return numPartitions > 0 ? numPartitions : input.getNumPartitions();
}
});
this.reducer = reducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import cz.seznam.euphoria.core.annotation.operator.Basic;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.flow.Flow;

import java.util.Objects;
Expand Down Expand Up @@ -55,7 +55,7 @@ public static class OutputBuilder<IN>
private final Dataset<IN> input;
OutputBuilder(String name, Dataset<IN> input) {
// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.client.dataset.Partitioner;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.flow.Flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.flow.Flow;
Expand Down Expand Up @@ -49,16 +49,6 @@ protected StateAwareWindowWiseSingleInputOperator(
this.output = createOutput(input);
}

protected StateAwareWindowWiseSingleInputOperator(
String name,
Flow flow,
Dataset<IN> input,
UnaryFunction<KIN, KEY> extractor,
Windowing<WIN, W> windowing /* optional */,
UnaryFunction<WIN, Long> eventTimeAssigner /* optional */) {
this(name, flow, input, extractor, windowing, eventTimeAssigner, input.getPartitioning());
}

@Override
public Collection<Dataset<IN>> listInputs() {
return Collections.singletonList(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

import cz.seznam.euphoria.core.annotation.operator.Derived;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.dataset.windowing.Batch;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.flow.Flow;
Expand Down Expand Up @@ -76,7 +75,7 @@ public static class ByBuilder2<IN, KEY>
private UnaryFunction<IN, Long> valueExtractor = e -> 1L;
ByBuilder2(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor) {
// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.Partitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.flow.Flow;
Expand Down Expand Up @@ -170,8 +170,7 @@ public static class WindowByBuilder<IN, K, V, S extends Comparable<S>>
UnaryFunction<IN, V> valueFn,
UnaryFunction<IN, S> scoreFn)
{
super(new DefaultPartitioning<>(
input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = requireNonNull(name);
this.input = requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public static class OfBuilder {
this.name = name;
}

public <IN> OutputBuilder<IN> of(Dataset<IN> left, Dataset<IN> right)
{
public <IN> OutputBuilder<IN> of(Dataset<IN> left, Dataset<IN> right) {
if (right.getFlow() != left.getFlow()) {
throw new IllegalArgumentException("Pass inputs from the same flow");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.HashPartitioner;
import cz.seznam.euphoria.core.client.dataset.HashPartitioning;
import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioner;
import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Time;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.util.Pair;
Expand Down
Loading

0 comments on commit f32ba6f

Please sign in to comment.