Skip to content

Commit

Permalink
#! Sanitze javadocs
Browse files Browse the repository at this point in the history
  • Loading branch information
Novotnik, Petr committed Feb 2, 2017
1 parent de0e610 commit 86b188d
Show file tree
Hide file tree
Showing 86 changed files with 932 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@

/**
* Annotation marking an operator a {@code Basic} operator.
* A basic operator is such operator that an executor *must* implement
* in order to be able to run any flow.
* Such an operator *must* be implemented (natively) by an executor
* in order to run a flow.
*/
@Documented
@Target(ElementType.TYPE)
public @interface Basic {

/** State complexity, use {@code StateComplexity.<value>}. */
int state();
/** @return the state complexity */
StateComplexity state();

/** Number of global repartition operations. */
/** @return number of global repartition operations */
int repartitions();

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
@Target(ElementType.TYPE)
public @interface Derived {

/** State complexity, use {@code StateComplexity.<value>}. */
int state();
/** @return the state complexity */
StateComplexity state();

/** Number of global repartition operations. */
/** @return the number of global repartition operations */
int repartitions();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
@Target(ElementType.TYPE)
public @interface Recommended {

/** Textual documentation of the reason of the recommendation. */
/**
* @return a human readable explanation why the annotated operator is recommendation
* for native implementation by an executor
*/
String reason();

/** State complexity, use {@code StateComplexity.<value>}. */
int state();
/** @return the state complexity */
StateComplexity state();

/** Number of global repartition operations. */
/** @return the number of global repartition operations */
int repartitions();

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,26 @@
package cz.seznam.euphoria.core.annotation.operator;

/**
* Space complexity of operator's state in each window depending on size of input.
* Space complexity of an operator's state in each window
* depending on the size of input.
*/
public class StateComplexity {
public enum StateComplexity {

/** The size of state will be O(N) in the size of input. */
public static final int LINEAR = 1;
LINEAR,

/** The size of state will be sub-linear but not constant. */
public static final int SUBLINEAR = 2;
SUBLINEAR,

/** The size of state will be O(1) in the size of input. */
public static final int CONSTANT = 3;
CONSTANT,

/** There is no state in this operator. */
public static final int ZERO = 4;
ZERO,

/**
* The size of state will be O(1) if the passed function
* is `combinable` (commutative, associative), otherwise it will be O(N).
*/
public static final int CONSTANT_IF_COMBINABLE = 5;

// do not construct this object
private StateComplexity() { }

CONSTANT_IF_COMBINABLE,
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.PartitioningAware;

import java.io.Serializable;
import java.net.URI;
import java.util.Collection;

/**
* A dataset abstraction.
*
* @param <T> type of elements of this data set
*/
public interface Dataset<T> extends Serializable {

/**
* Retrieve Flow associated with this dataset.
* @return the flow associated with this data set
*/
Flow getFlow();

Expand All @@ -40,15 +41,20 @@ public interface Dataset<T> extends Serializable {
* This might be null, if this dataset has no explicit source,
* it is calculated. If this method returns null, getProducer returns non null
* and vice versa.
*
* @return this dataset's explicit source - if any
*/
DataSource<T> getSource();

/** Retrieve operator that produced this dataset (if any). */
/**
* @return the operator that produced this dataset - if any
*/
Operator<?, T> getProducer();

/**
* Retrieve collection of consumers of this dataset.
* This returns the list of currently known consumers (this can chnage
*
* @return the list of currently known consumers (this can change
* if another consumer is added to the flow).
*/
Collection<Operator<?, ?>> getConsumers();
Expand All @@ -61,34 +67,52 @@ public interface Dataset<T> extends Serializable {
<X> Partitioning<X> getPartitioning();


/** Is this a bounded dataset? */
/**
* @return {@code true} if this is a bounded data set,
* {@code false} if it is unbounded.
*/
boolean isBounded();


default void persist(URI uri) throws Exception {
persist(getFlow().createOutput(uri));
}

/** Persist this dataset. */
/**
* Persist this dataset.
*
* @param sink the sink to use to persist this data set's data to
*/
void persist(DataSink<T> sink);


default void checkpoint(URI uri) throws Exception {
checkpoint(getFlow().createOutput(uri));
}

/** Checkpoint this dataset. */
/**
* Checkpoint this dataset.
*
* @param sink the sink to use to checkpoint this data set's data to
*/
void checkpoint(DataSink<T> sink);


/** Retrieve output sink for this dataset. */
/**
* Retrieve output sink for this dataset.
*
* @return {@code null} if there is no explicitly set sink this
* data set is supposed to be persisted to, otherwise the
* sink provided through {@link #persist(DataSink)}.
*/
default DataSink<T> getOutputSink() {
return null;
}

/** Retrieve checkpoint sink for this dataset. */
/**
* Retrieve checkpoint sink for this dataset.
*
* @return {@code null} if no checkpoint sink has been defined,
* otherwise the sink provided through {@link #checkpoint(DataSink)}
*/
default DataSink<T> getCheckpointSink() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,18 @@
*/
public class Datasets {

/** Create output dataset for given operator. */
/**
* Create output dataset for given operator.
*
* @param <IN> the type of elements of the input dataset
* @param <OUT> the type of elements in the output dataset
*
* @param flow the flow to associate the output dataset with
* @param input the input dataset the output dataset is indirectly derived from
* @param op the operator producing the output dataset
*
* @return a dataset representing the output of the given operator
*/
@SuppressWarnings("unchecked")
public static <IN, OUT> Dataset<OUT> createOutputFor(
Flow flow, Dataset<IN> input, Operator<IN, OUT> op) {
Expand All @@ -44,7 +55,16 @@ public <X> Partitioning<X> getPartitioning() {
};
}

/** Create dataset from {@code DataSource}. */
/**
* Create dataset from {@code DataSource}.
*
* @param <T> the type of elements in the dataset
*
* @param flow the flow to associate the dataset with
* @param source the source producing the returned dataset
*
* @return a dataset representing the given source
*/
public static <T> Dataset<T> createInputFromSource(
Flow flow, DataSource<T> source) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public interface Partitioner<T> extends Serializable {
/**
* Retrieve ID of partition. The ID of partition is then taken modulo
* the number of partitions.
*
* @param element the element to assign a partition to
*
* @return a partition of the element
*/
int getPartition(T element);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@

/**
* Partitioning of a dataset.
*
* @param <T> the type of elements this partitioning scheme is able to handle
*/
public interface Partitioning<T> extends Serializable {

Partitioner DEFAULT_PARTITIONER = new DefaultPartitioner();

/** Retrieve partitioner for dataset. */
/** @return the actual partitioner */
default Partitioner<T> getPartitioner() {
return DEFAULT_PARTITIONER;
}

default int getNumPartitions() {
return -1;
}

/**
* @return true if the default partitioner is used - e.g. no other has been explicitly set.
* Should not be called after distribution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ private Session(long gapDurationMillis) {

/**
* Early results will be triggered periodically until the window is finally closed.
*
* @param <T> the type of elements dealt with
*
* @param timeout the period after which to periodically trigger windows
*
* @return this instance (for method chaining purposes)
*/
public <T> Session<T> earlyTriggering(Duration timeout) {
this.earlyTriggeringPeriod = Objects.requireNonNull(timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public static <T> Time<T> of(Duration duration) {

/**
* Early results will be triggered periodically until the window is finally closed.
*
* @param <T> the type of elements dealt with
*
* @param timeout the period after which to periodically trigger windows
*
* @return this instance (for method chaining purposes)
*/
public <T> Time<T> earlyTriggering(Duration timeout) {
this.earlyTriggeringPeriod = Objects.requireNonNull(timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,24 @@ public static <T> TimeSliding<T> of(Duration duration, Duration step) {
return new TimeSliding<>(duration.toMillis(), step.toMillis());
}

/** Helper method to extract window label from context. */
/**
* Helper method to extract window label from context.
*
* @param context the execution context
*
* @return the {@link TimeInterval} window of this execution
*
* @throws ClassCastException if the context is not part of a
* time-sliding execution
*/
public static TimeInterval getLabel(Context<?> context) {
return (TimeInterval) context.getWindow();
}

private final long duration;
private final long slide;

private TimeSliding(
long duration,
long slide) {

private TimeSliding(long duration, long slide) {
this.duration = duration;
this.slide = slide;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ public interface Windowing<T, W extends Window> extends Serializable {
* The element will always have assigned old window, which can be reused
* by this windowing.
* The default windowing assigned on input is derived from batch windowing.
*
* @param el The element to which windows should be assigned.
*
* @return set of windows to be assign this element into, never {@code null}.
*/
Set<W> assignWindowsToElement(WindowedElement<?, T> el);

/**
* Retrieve instance of {@link Trigger} associated with the current windowing
* strategy.
* @return a {@link Trigger} associated with the current windowing strategy
*/
Trigger<W> getTrigger();
}
Loading

0 comments on commit 86b188d

Please sign in to comment.