Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-6488] Portable Flink runner support for running cross-language … #7709

Merged
merged 3 commits into from
Feb 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
*/
package org.apache.beam.runners.fnexecution.control;

import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
Expand Down Expand Up @@ -66,25 +66,17 @@
* EnvironmentFactory} for environment management. Note that returned {@link StageBundleFactory
* stage bundle factories} are not thread-safe. Instead, a new stage factory should be created for
* each client. {@link DefaultJobBundleFactory} initializes the Environment lazily when the forStage
* is called for a stage. This factory is not capable of handling mixed types of environment.
* is called for a stage.
*/
@ThreadSafe
public class DefaultJobBundleFactory implements JobBundleFactory {
private static final Logger LOG = LoggerFactory.getLogger(DefaultJobBundleFactory.class);

private final IdGenerator stageIdGenerator;
private final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
// Using environment as the initialization marker.
private Environment environment;
private ExecutorService executor;
private GrpcFnServer<FnApiControlClientPoolService> controlServer;
private GrpcFnServer<GrpcLoggingService> loggingServer;
private GrpcFnServer<ArtifactRetrievalService> retrievalServer;
private GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
private GrpcFnServer<GrpcDataService> dataServer;
private GrpcFnServer<GrpcStateService> stateServer;
private MapControlClientPool clientPool;
private EnvironmentFactory environmentFactory;
private final Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap;
private final ExecutorService executor;
private final MapControlClientPool clientPool;
private final IdGenerator stageIdGenerator;

public static DefaultJobBundleFactory create(
JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap) {
Expand All @@ -94,42 +86,42 @@ public static DefaultJobBundleFactory create(
DefaultJobBundleFactory(
JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryMap) {
IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
this.environmentFactoryProviderMap = environmentFactoryMap;
this.executor = Executors.newCachedThreadPool();
this.clientPool = MapControlClientPool.create();
this.stageIdGenerator = stageIdGenerator;
this.environmentCache =
createEnvironmentCache(
environment -> {
synchronized (this) {
checkAndInitialize(jobInfo, environmentFactoryMap, environment);
}
return environmentFactory.createEnvironment(environment);
});
createEnvironmentCache(serverFactory -> createServerInfo(jobInfo, serverFactory));
}

@VisibleForTesting
DefaultJobBundleFactory(
EnvironmentFactory environmentFactory,
Map<String, EnvironmentFactory.Provider> environmentFactoryMap,
IdGenerator stageIdGenerator,
GrpcFnServer<FnApiControlClientPoolService> controlServer,
GrpcFnServer<GrpcLoggingService> loggingServer,
GrpcFnServer<ArtifactRetrievalService> retrievalServer,
GrpcFnServer<StaticGrpcProvisionService> provisioningServer,
GrpcFnServer<GrpcDataService> dataServer,
GrpcFnServer<GrpcStateService> stateServer)
throws Exception {
GrpcFnServer<GrpcStateService> stateServer) {
this.environmentFactoryProviderMap = environmentFactoryMap;
this.executor = Executors.newCachedThreadPool();
this.clientPool = MapControlClientPool.create();
this.stageIdGenerator = stageIdGenerator;
this.controlServer = controlServer;
this.loggingServer = loggingServer;
this.retrievalServer = retrievalServer;
this.provisioningServer = provisioningServer;
this.dataServer = dataServer;
this.stateServer = stateServer;
this.environmentCache =
createEnvironmentCache(env -> environmentFactory.createEnvironment(env));
ServerInfo serverInfo =
new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder()
.setControlServer(controlServer)
.setLoggingServer(loggingServer)
.setRetrievalServer(retrievalServer)
.setProvisioningServer(provisioningServer)
.setDataServer(dataServer)
.setStateServer(stateServer)
.build();
this.environmentCache = createEnvironmentCache(serverFactory -> serverInfo);
}

private LoadingCache<Environment, WrappedSdkHarnessClient> createEnvironmentCache(
ThrowingFunction<Environment, RemoteEnvironment> environmentCreator) {
ThrowingFunction<ServerFactory, ServerInfo> serverInfoCreator) {
return CacheBuilder.newBuilder()
.removalListener(
(RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
Expand All @@ -145,8 +137,21 @@ private LoadingCache<Environment, WrappedSdkHarnessClient> createEnvironmentCach
new CacheLoader<Environment, WrappedSdkHarnessClient>() {
@Override
public WrappedSdkHarnessClient load(Environment environment) throws Exception {
EnvironmentFactory.Provider environmentFactoryProvider =
environmentFactoryProviderMap.get(environment.getUrn());
ServerFactory serverFactory = environmentFactoryProvider.getServerFactory();
ServerInfo serverInfo = serverInfoCreator.apply(serverFactory);

EnvironmentFactory environmentFactory =
environmentFactoryProvider.createEnvironmentFactory(
serverInfo.getControlServer(),
serverInfo.getLoggingServer(),
serverInfo.getRetrievalServer(),
serverInfo.getProvisioningServer(),
clientPool,
stageIdGenerator);
return WrappedSdkHarnessClient.wrapping(
environmentCreator.apply(environment), dataServer);
environmentFactory.createEnvironment(environment), serverInfo);
}
});
}
Expand All @@ -161,12 +166,13 @@ public StageBundleFactory forStage(ExecutableStage executableStage) {
ProcessBundleDescriptors.fromExecutableStage(
stageIdGenerator.getId(),
executableStage,
dataServer.getApiServiceDescriptor(),
stateServer.getApiServiceDescriptor());
wrappedClient.getServerInfo().getDataServer().getApiServiceDescriptor(),
wrappedClient.getServerInfo().getStateServer().getApiServiceDescriptor());
} catch (IOException e) {
throw new RuntimeException(e);
}
return SimpleStageBundleFactory.create(wrappedClient, processBundleDescriptor, stateServer);
return SimpleStageBundleFactory.create(
wrappedClient, processBundleDescriptor, wrappedClient.getServerInfo().getStateServer());
}

@Override
Expand All @@ -176,14 +182,6 @@ public void close() throws Exception {
environmentCache.invalidateAll();
environmentCache.cleanUp();

// Tear down common servers.
stateServer.close();
dataServer.close();
controlServer.close();
loggingServer.close();
retrievalServer.close();
provisioningServer.close();

executor.shutdown();
}

Expand Down Expand Up @@ -269,85 +267,114 @@ protected static class WrappedSdkHarnessClient implements AutoCloseable {

private final RemoteEnvironment environment;
private final SdkHarnessClient client;
private final ServerInfo serverInfo;

static WrappedSdkHarnessClient wrapping(
RemoteEnvironment environment, GrpcFnServer<GrpcDataService> dataServer) {
static WrappedSdkHarnessClient wrapping(RemoteEnvironment environment, ServerInfo serverInfo) {
SdkHarnessClient client =
SdkHarnessClient.usingFnApiClient(
environment.getInstructionRequestHandler(), dataServer.getService());
return new WrappedSdkHarnessClient(environment, client);
environment.getInstructionRequestHandler(), serverInfo.getDataServer().getService());
return new WrappedSdkHarnessClient(environment, client, serverInfo);
}

private WrappedSdkHarnessClient(RemoteEnvironment environment, SdkHarnessClient client) {
private WrappedSdkHarnessClient(
RemoteEnvironment environment, SdkHarnessClient client, ServerInfo serverInfo) {
this.environment = environment;
this.client = client;
this.serverInfo = serverInfo;
}

/** Returns the {@link SdkHarnessClient} that was created for the wrapped environment. */
SdkHarnessClient getClient() {
  return this.client;
}

/** Returns the {@link ServerInfo} (the set of gRPC servers) held by this wrapper. */
ServerInfo getServerInfo() {
  return this.serverInfo;
}

/**
 * Closes the wrapped remote environment and every gRPC server held by {@code serverInfo}.
 *
 * <p>All resources are declared in a single try-with-resources statement so that a failure
 * while closing one of them (for example the environment) can no longer prevent the remaining
 * ones from being closed. Resources are closed in reverse declaration order, i.e. the
 * environment first, followed by the provisioning, retrieval, logging, control, data and state
 * servers.
 *
 * @throws Exception the first failure raised while closing a resource; failures from the
 *     remaining resources are attached to it as suppressed exceptions
 */
@Override
public void close() throws Exception {
  try (AutoCloseable stateServer = serverInfo.getStateServer();
      AutoCloseable dataServer = serverInfo.getDataServer();
      AutoCloseable controlServer = serverInfo.getControlServer();
      AutoCloseable loggingServer = serverInfo.getLoggingServer();
      AutoCloseable retrievalServer = serverInfo.getRetrievalServer();
      AutoCloseable provisioningServer = serverInfo.getProvisioningServer();
      // Declared last so that it is closed first, before the servers are torn down.
      AutoCloseable envCloser = environment) {
    // Intentionally empty: try-with-resources is used only to guarantee that every resource
    // is closed even if closing an earlier one throws.
  }
  // TODO: Wait for executor shutdown?
}
}

@GuardedBy("this")
private void checkAndInitialize(
JobInfo jobInfo,
Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap,
Environment environment)
private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory)
throws IOException {
Preconditions.checkNotNull(environment, "Environment can not be null");
if (this.environment != null) {
Preconditions.checkArgument(
this.environment.getUrn().equals(environment.getUrn()),
"Unsupported: Mixing environment types (%s, %s) is not supported for a job.",
this.environment.getUrn(),
environment.getUrn());
// Nothing to do. Already initialized.
return;
}

EnvironmentFactory.Provider environmentFactoryProvider =
environmentFactoryProviderMap.get(environment.getUrn());
ServerFactory serverFactory = environmentFactoryProvider.getServerFactory();
Preconditions.checkNotNull(serverFactory, "serverFactory can not be null");

this.clientPool = MapControlClientPool.create();
this.executor = Executors.newCachedThreadPool();
this.controlServer =
GrpcFnServer<FnApiControlClientPoolService> controlServer =
GrpcFnServer.allocatePortAndCreateFor(
FnApiControlClientPoolService.offeringClientsToPool(
clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
serverFactory);
this.loggingServer =
GrpcFnServer<GrpcLoggingService> loggingServer =
GrpcFnServer.allocatePortAndCreateFor(
GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
this.retrievalServer =
GrpcFnServer<ArtifactRetrievalService> retrievalServer =
GrpcFnServer.allocatePortAndCreateFor(
BeamFileSystemArtifactRetrievalService.create(), serverFactory);
this.provisioningServer =
GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
GrpcFnServer.allocatePortAndCreateFor(
StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);
this.dataServer =
GrpcFnServer<GrpcDataService> dataServer =
GrpcFnServer.allocatePortAndCreateFor(
GrpcDataService.create(executor, OutboundObserverFactory.serverDirect()),
serverFactory);
this.stateServer =
GrpcFnServer<GrpcStateService> stateServer =
GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);

this.environmentFactory =
environmentFactoryProvider.createEnvironmentFactory(
controlServer,
loggingServer,
retrievalServer,
provisioningServer,
clientPool,
stageIdGenerator);
this.environment = environment;
ServerInfo serverInfo =
new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder()
.setControlServer(controlServer)
.setLoggingServer(loggingServer)
.setRetrievalServer(retrievalServer)
.setProvisioningServer(provisioningServer)
.setDataServer(dataServer)
.setStateServer(stateServer)
.build();
return serverInfo;
}

/**
 * A value type grouping the six {@link GrpcFnServer} instances (control, logging, artifact
 * retrieval, provisioning, data and state) that are handed around together.
 *
 * <p>Declared as an {@code @AutoValue} class: only the abstract accessors and the builder
 * contract are written here; the concrete implementation is expected to be generated.
 */
@AutoValue
public abstract static class ServerInfo {
/** Returns the server exposing the {@link FnApiControlClientPoolService}. */
abstract GrpcFnServer<FnApiControlClientPoolService> getControlServer();

/** Returns the server exposing the {@link GrpcLoggingService}. */
abstract GrpcFnServer<GrpcLoggingService> getLoggingServer();

/** Returns the server exposing the {@link ArtifactRetrievalService}. */
abstract GrpcFnServer<ArtifactRetrievalService> getRetrievalServer();

/** Returns the server exposing the {@link StaticGrpcProvisionService}. */
abstract GrpcFnServer<StaticGrpcProvisionService> getProvisioningServer();

/** Returns the server exposing the {@link GrpcDataService}. */
abstract GrpcFnServer<GrpcDataService> getDataServer();

/** Returns the server exposing the {@link GrpcStateService}. */
abstract GrpcFnServer<GrpcStateService> getStateServer();

/** Returns a {@link Builder} for deriving a modified copy of this value. */
abstract Builder toBuilder();

/** Builder for {@link ServerInfo}; one setter per server property. */
@AutoValue.Builder
abstract static class Builder {
/** Sets the control server. */
abstract Builder setControlServer(GrpcFnServer<FnApiControlClientPoolService> server);

/** Sets the logging server. */
abstract Builder setLoggingServer(GrpcFnServer<GrpcLoggingService> server);

/** Sets the artifact retrieval server. */
abstract Builder setRetrievalServer(GrpcFnServer<ArtifactRetrievalService> server);

/** Sets the provisioning server. */
abstract Builder setProvisioningServer(GrpcFnServer<StaticGrpcProvisionService> server);

/** Sets the data server. */
abstract Builder setDataServer(GrpcFnServer<GrpcDataService> server);

/** Sets the state server. */
abstract Builder setStateServer(GrpcFnServer<GrpcStateService> server);

/** Builds the {@link ServerInfo} from the values set on this builder. */
abstract ServerInfo build();
}
}
}
Loading