Skip to content

Commit

Permalink
apache#99 Sort operator support
Browse files Browse the repository at this point in the history
* #! - eclipse compilation

* #40 - Sort operator with in-memory RSBK sorting

* #40 - Spark SortTranslator

* #40 - Flink SortTranslator

* #40 - RangePartitioning

* #40 - PR#99
  • Loading branch information
horkyada authored and David Moravek committed Oct 5, 2018
1 parent ceb554a commit d58c055
Show file tree
Hide file tree
Showing 31 changed files with 1,208 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.dataset.partitioning;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

class RangePartitioner<T extends Comparable<? super T> & Serializable>
implements Partitioner<T> {

private final List<T> ranges;

public RangePartitioner(List<T> ranges) {
this.ranges = ranges;
}

@Override
public int getPartition(T element) {
int search = Collections.binarySearch(ranges, element);
if (search < 0) {
return ((-search) - 1);
}
return search;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.dataset.partitioning;

import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions;
import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Comparators;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/**
* Partitioning based on ranges simulating total order. The ranges are expected to be provided
* already sorted.
* @param <T> type of ranges - must be a subtype {@link Comparable} as well as {@link Serializable}
*/
public class RangePartitioning<T extends Comparable<? super T> & Serializable>
implements Partitioning<T> {

private final List<T> ranges;

public RangePartitioning(List<T> ranges) {
this.ranges = new ArrayList<>(ranges);
Preconditions.checkArgument(
Comparators.isInStrictOrder(ranges, Comparator.naturalOrder()),
"Ranges are expected to be sorted!");
}

@SuppressWarnings("unchecked")
public RangePartitioning(T ... ranges) {
this(Arrays.asList(ranges));
}

@Override
public Partitioner<T> getPartitioner() {
return new RangePartitioner<>(ranges);
}

@Override
public int getNumPartitions() {
return ranges.size() + 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ public final class GlobalWindowing<T>
implements Windowing<T, GlobalWindowing.Window> {

public static final class Window
extends cz.seznam.euphoria.core.client.dataset.windowing.Window
implements Comparable<Window> {
extends cz.seznam.euphoria.core.client.dataset.windowing.Window<GlobalWindowing.Window> {

static final Window INSTANCE = new Window();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import java.util.Collection;

public interface MergingWindowing<T, W extends Window & Comparable<W>>
public interface MergingWindowing<T, W extends Window<W>>
extends Windowing<T, W>
{
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package cz.seznam.euphoria.core.client.dataset.windowing;

public final class TimeInterval
extends Window
implements TimedWindow<TimeInterval>, Comparable<TimeInterval> {
extends Window<TimeInterval>
implements TimedWindow {

private final long startMillis;
private final long endMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* Extension to {@link cz.seznam.euphoria.core.client.dataset.windowing.Window}
* defining time based constraints on the implementor.
*/
public interface TimedWindow<W extends Window> {
public interface TimedWindow {

/**
* Defines the timestamp/watermark until this window is considered open.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
* thereby grouping input elements into chunks
* for further processing in small (micro-)batches.
* <p>
* Subclasses should implement {@code equals()} and {@code hashCode()} so that logically
* Subclasses should implement {@code equals()}, {@code hashCode()} and {@code compareTo()} so that logically
* same windows are treated the same.
*/
public abstract class Window implements Serializable {
public abstract class Window<T extends Window<T>> implements Serializable, Comparable<T> {

@Override
public abstract int hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ public BinaryFunctor<LEFT, RIGHT, OUT> getJoiner() {
OUT, JoinState, W> reduce;

name = getName() + "::ReduceStateByKey";
reduce = new ReduceStateByKey<>(
reduce = new ReduceStateByKey(
name,
flow,
union.output(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,4 @@ public ReduceFunction<VALUE, OUT> getReducer() {

return DAG.of(reduceByKey, format);
}




}
Loading

0 comments on commit d58c055

Please sign in to comment.