Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Part 12] Calling Source::close, Sink::close as part of stage execution #163

Merged
merged 2 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,34 @@
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.sink.Sink;
import io.reactivex.mantis.remote.observable.RxMetrics;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;


public class SinkPublisher<T, R> implements WorkerPublisher<T, R> {
/**
* Implementation that publishes the results of a stage to a sink such as an SSE port.
* @param <T>
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add documentation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

*/
public class SinkPublisher<T> implements WorkerPublisher<T> {

private static final Logger logger = LoggerFactory.getLogger(SinkPublisher.class);
private SinkHolder<R> sinkHolder;
private PortSelector portSelector;
private Context context;
private Action0 observableTerminatedCallback;
private Action0 onSubscribeAction;
private Action0 onUnsubscribeAction;
private Action0 observableOnCompleteCallback;
private Action1<Throwable> observableOnErrorCallback;
private final SinkHolder<T> sinkHolder;
private final PortSelector portSelector;
private final Context context;
private final Action0 observableTerminatedCallback;
private final Action0 onSubscribeAction;
private final Action0 onUnsubscribeAction;
private final Action0 observableOnCompleteCallback;
private final Action1<Throwable> observableOnErrorCallback;
private Subscription eagerSubscription;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: separate the state from the rest of the fields.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

private Sink<T> sink;

public SinkPublisher(SinkHolder<R> sinkHolder,
public SinkPublisher(SinkHolder<T> sinkHolder,
PortSelector portSelector,
Context context,
Action0 observableTerminatedCallback, Action0 onSubscribeAction, Action0 onUnsubscribeAction,
Expand All @@ -61,9 +68,9 @@ public SinkPublisher(SinkHolder<R> sinkHolder,

@Override
@SuppressWarnings( {"unchecked", "rawtypes"})
public void start(StageConfig<T, R> stage,
Observable<Observable<R>> observablesToPublish) {
final Sink<R> sink = sinkHolder.getSinkAction();
public void start(StageConfig<?, T> stage,
Observable<Observable<T>> observablesToPublish) {
sink = sinkHolder.getSinkAction();

int sinkPort = -1;
if (sinkHolder.isPortRequested()) {
Expand All @@ -72,7 +79,7 @@ public void start(StageConfig<T, R> stage,

// apply transform

Observable<R> merged = Observable.merge(observablesToPublish);
Observable<T> merged = Observable.merge(observablesToPublish);
final Observable wrappedO = merged.lift(new MonitorOperator("worker_sink"));

Observable o = Observable
Expand Down Expand Up @@ -101,17 +108,24 @@ public void call() {
.share();
if (context.getWorkerInfo().getDurationType() == MantisJobDurationType.Perpetual) {
// eager subscribe, don't allow unsubscribe back
o.subscribe();
eagerSubscription = o.subscribe();
}
sink.init(context);
sink.call(context, new PortRequest(sinkPort),
o);
//o.lift(new DoOnRequestOperator("beforeShare")).share().lift(new DropOperator<>("sink_share")));
sink.call(context, new PortRequest(sinkPort), o);
}

@Override
public RxMetrics getMetrics() {return null;}

@Override
public void stop() {}
public void close() throws IOException {
try {
sink.close();
} finally {
if (eagerSubscription != null) {
eagerSubscription.unsubscribe();
eagerSubscription = null;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should sink also be set to null eventually?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static io.mantisrx.runtime.parameter.ParameterUtils.STAGE_CONCURRENCY;

import com.mantisrx.common.utils.Closeables;
import io.mantisrx.common.MantisGroup;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
Expand All @@ -42,6 +43,8 @@
import io.mantisrx.server.core.ServiceRegistry;
import io.reactivex.mantis.remote.observable.RxMetrics;
import io.reactivx.mantis.operators.GroupedObservableUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -85,7 +88,7 @@ private StageExecutors() {
}

@SuppressWarnings( {"rawtypes", "unchecked"})
public static void executeSingleStageJob(final SourceHolder source, final StageConfig stage,
public static Closeable executeSingleStageJob(final SourceHolder source, final StageConfig stage,
final SinkHolder sink, final PortSelector portSelector, RxMetrics rxMetrics,
final Context context, Action0 sinkObservableTerminatedCallback,
final int workerIndex,
Expand All @@ -112,17 +115,19 @@ public Observable start(StageConfig previousStage) {
}

@Override
public void stop() {}
public void close() throws IOException {
source.getSourceFunction().close();
}
};
// sink publisher with metrics
WorkerPublisher sinkPublisher = new SinkPublisher(sink, portSelector, context,
sinkObservableTerminatedCallback, onSinkSubscribe, onSinkUnsubscribe,
observableOnCompleteCallback, observableOnErrorCallback);
StageExecutors.executeIntermediate(sourceConsumer, stage, sinkPublisher, context);
return StageExecutors.executeIntermediate(sourceConsumer, stage, sinkPublisher, context);
}

@SuppressWarnings( {"rawtypes", "unchecked"})
public static void executeSource(final int workerIndex, final SourceHolder source, final StageConfig stage,
public static Closeable executeSource(final int workerIndex, final SourceHolder source, final StageConfig stage,
WorkerPublisher publisher, final Context context, final Observable<Integer> totalWorkerAtStageObservable) {
// create a consumer from passed in source
WorkerConsumer sourceConsumer = new WorkerConsumer() {
Expand All @@ -137,9 +142,11 @@ public Observable start(StageConfig stage) {
}

@Override
public void stop() {}
public void close() throws IOException {
source.getSourceFunction().close();
}
};
executeIntermediate(sourceConsumer, stage, publisher, context);
return executeIntermediate(sourceConsumer, stage, publisher, context);
}


Expand Down Expand Up @@ -452,7 +459,7 @@ private static <K, T, R> Observable<Observable<R>> setupGroupToScalarStage(Group
}

@SuppressWarnings( {"rawtypes", "unchecked"})
public static <T, R> void executeIntermediate(WorkerConsumer consumer,
public static <T, R> Closeable executeIntermediate(WorkerConsumer consumer,
final StageConfig<T, R> stage, WorkerPublisher publisher, final Context context) {
if (consumer == null) {
throw new IllegalArgumentException("consumer cannot be null");
Expand Down Expand Up @@ -513,17 +520,20 @@ else if (stage instanceof ScalarToGroup) {
}

publisher.start(stage, toSink);
// the ordering is important here as we want to first close the sinks so that the subscriptions
// are first cut off before closing the sources.
return Closeables.combine(publisher, consumer);
}

@SuppressWarnings( {"rawtypes", "unchecked"})
public static void executeSink(WorkerConsumer consumer, StageConfig stage, SinkHolder sink,
public static Closeable executeSink(WorkerConsumer consumer, StageConfig stage, SinkHolder sink,
PortSelector portSelector, RxMetrics rxMetrics, Context context,
Action0 sinkObservableCompletedCallback,
final Action0 onSinkSubscribe, final Action0 onSinkUnsubscribe,
Action0 observableOnCompleteCallback, Action1<Throwable> observableOnErrorCallback) {
WorkerPublisher sinkPublisher = new SinkPublisher(sink, portSelector, context,
sinkObservableCompletedCallback, onSinkSubscribe, onSinkUnsubscribe,
observableOnCompleteCallback, observableOnErrorCallback);
executeIntermediate(consumer, stage, sinkPublisher, context);
return executeIntermediate(consumer, stage, sinkPublisher, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
package io.mantisrx.runtime.executor;

import io.mantisrx.runtime.StageConfig;
import java.io.Closeable;
import rx.Observable;

/**
* Abstraction for executing source observables. Source observables can be of two forms:
* 1). ones that consume from previous jobs or act as direct sources somehow
* 2). ones that consume from workers in previous stages.
*
* @param <T> type of the data that the observable emits
*/
public interface WorkerConsumer<T> extends Closeable {

public interface WorkerConsumer<T, R> {

public Observable<Observable<T>> start(StageConfig<T, R> stage);

public void stop();

Observable<Observable<T>> start(StageConfig<T, ?> stage);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import io.reactivex.mantis.remote.observable.DynamicConnectionSet;
import io.reactivex.mantis.remote.observable.EndpointInjector;
import io.reactivex.mantis.remote.observable.reconciliator.Reconciliator;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;


public class WorkerConsumerRemoteObservable<T, R> implements WorkerConsumer<T, R> {
public class WorkerConsumerRemoteObservable<T, R> implements WorkerConsumer<T> {

private static final Logger logger = LoggerFactory.getLogger(WorkerConsumerRemoteObservable.class);

Expand All @@ -55,7 +55,7 @@ public WorkerConsumerRemoteObservable(String name,

@SuppressWarnings( {"rawtypes", "unchecked"})
@Override
public Observable<Observable<T>> start(StageConfig<T, R> stage) {
public Observable<Observable<T>> start(StageConfig<T, ?> stage) {
if (stage instanceof KeyToKey || stage instanceof KeyToScalar || stage instanceof GroupToScalar || stage instanceof GroupToGroup) {

logger.info("Remote connection to stage " + name + " is KeyedStage");
Expand Down Expand Up @@ -100,6 +100,7 @@ private void registerMetrics(Metrics metrics) {
}

@Override
public void stop() {}
public void close() throws IOException {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is there nothing to close for this executor?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing the Observable handles most of the behavior under the hood when the unsubscribe occurs. Or just depends on the whole JVM being blown away.

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@

import io.mantisrx.runtime.StageConfig;
import io.reactivex.mantis.remote.observable.RxMetrics;
import java.io.Closeable;
import rx.Observable;

/**
* WorkerPublisher is an abstraction for a sink operator execution.
* @param <T> type of the observable that's getting published
*/
public interface WorkerPublisher<T> extends Closeable {

public interface WorkerPublisher<T, R> {

public void start(StageConfig<T, R> stage, Observable<Observable<R>> observableToPublish);

public RxMetrics getMetrics();
void start(StageConfig<?, T> stage, Observable<Observable<T>> observableToPublish);

public void stop();
RxMetrics getMetrics();
}
Loading