Skip to content

Commit

Permalink
enabling {multi-payload, multi-urn}-environment
Browse files Browse the repository at this point in the history
  • Loading branch information
ihji committed Feb 6, 2019
1 parent 3720385 commit 67bfa40
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,17 @@
* EnvironmentFactory} for environment management. Note that returned {@link StageBundleFactory
* stage bundle factories} are not thread-safe. Instead, a new stage factory should be created for
* each client. {@link DefaultJobBundleFactory} initializes the Environment lazily when the forStage
* is called for a stage. This factory is not capable of handling mixed types of environment.
* is called for a stage.
*/
@ThreadSafe
public class DefaultJobBundleFactory implements JobBundleFactory {
private static final Logger LOG = LoggerFactory.getLogger(DefaultJobBundleFactory.class);

private final IdGenerator stageIdGenerator;
private final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
private ExecutorService executor;
private String environmentURN;
private final Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap;
private final ExecutorService executor;
private final MapControlClientPool clientPool;
private final IdGenerator stageIdGenerator;

public static DefaultJobBundleFactory create(
JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap) {
Expand All @@ -85,24 +86,27 @@ public static DefaultJobBundleFactory create(
DefaultJobBundleFactory(
JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryMap) {
IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
this.environmentFactoryProviderMap = environmentFactoryMap;
this.executor = Executors.newCachedThreadPool();
this.clientPool = MapControlClientPool.create();
this.stageIdGenerator = stageIdGenerator;
this.environmentCache =
createEnvironmentCache(
environment -> createServerInfo(jobInfo, environmentFactoryMap, environment));
createEnvironmentCache(serverFactory -> createServerInfo(jobInfo, serverFactory));
}

@VisibleForTesting
DefaultJobBundleFactory(
EnvironmentFactory environmentFactory,
Map<String, EnvironmentFactory.Provider> environmentFactoryMap,
IdGenerator stageIdGenerator,
GrpcFnServer<FnApiControlClientPoolService> controlServer,
GrpcFnServer<GrpcLoggingService> loggingServer,
GrpcFnServer<ArtifactRetrievalService> retrievalServer,
GrpcFnServer<StaticGrpcProvisionService> provisioningServer,
GrpcFnServer<GrpcDataService> dataServer,
GrpcFnServer<GrpcStateService> stateServer) {
this.environmentFactoryProviderMap = environmentFactoryMap;
this.executor = Executors.newCachedThreadPool();
this.clientPool = MapControlClientPool.create();
this.stageIdGenerator = stageIdGenerator;
ServerInfo serverInfo =
new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder()
Expand All @@ -112,13 +116,12 @@ public static DefaultJobBundleFactory create(
.setProvisioningServer(provisioningServer)
.setDataServer(dataServer)
.setStateServer(stateServer)
.setEnvironmentFactory(environmentFactory)
.build();
this.environmentCache = createEnvironmentCache(env -> serverInfo);
this.environmentCache = createEnvironmentCache(serverFactory -> serverInfo);
}

private LoadingCache<Environment, WrappedSdkHarnessClient> createEnvironmentCache(
ThrowingFunction<Environment, ServerInfo> serverInfoCreator) {
ThrowingFunction<ServerFactory, ServerInfo> serverInfoCreator) {
return CacheBuilder.newBuilder()
.removalListener(
(RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
Expand All @@ -134,9 +137,21 @@ private LoadingCache<Environment, WrappedSdkHarnessClient> createEnvironmentCach
new CacheLoader<Environment, WrappedSdkHarnessClient>() {
@Override
public WrappedSdkHarnessClient load(Environment environment) throws Exception {
ServerInfo serverInfo = serverInfoCreator.apply(environment);
EnvironmentFactory.Provider environmentFactoryProvider =
environmentFactoryProviderMap.get(environment.getUrn());
ServerFactory serverFactory = environmentFactoryProvider.getServerFactory();
ServerInfo serverInfo = serverInfoCreator.apply(serverFactory);

EnvironmentFactory environmentFactory =
environmentFactoryProvider.createEnvironmentFactory(
serverInfo.getControlServer(),
serverInfo.getLoggingServer(),
serverInfo.getRetrievalServer(),
serverInfo.getProvisioningServer(),
clientPool,
stageIdGenerator);
return WrappedSdkHarnessClient.wrapping(
serverInfo.getEnvironmentFactory().createEnvironment(environment), serverInfo);
environmentFactory.createEnvironment(environment), serverInfo);
}
});
}
Expand Down Expand Up @@ -165,12 +180,12 @@ public void close() throws Exception {
// Tear down common servers.
for (WrappedSdkHarnessClient client : environmentCache.asMap().values()) {
ServerInfo serverInfo = client.getServerInfo();
serverInfo.getStateServer().close();
serverInfo.getDataServer().close();
serverInfo.getControlServer().close();
serverInfo.getLoggingServer().close();
serverInfo.getRetrievalServer().close();
serverInfo.getProvisioningServer().close();
try (AutoCloseable stateServer = serverInfo.getStateServer();
AutoCloseable dateServer = serverInfo.getDataServer();
AutoCloseable controlServer = serverInfo.getControlServer();
AutoCloseable loggingServer = serverInfo.getLoggingServer();
AutoCloseable retrievalServer = serverInfo.getRetrievalServer();
AutoCloseable provisioningServer = serverInfo.getProvisioningServer()) {}
}

// Clear the cache. This closes all active environments.
Expand Down Expand Up @@ -296,29 +311,10 @@ public void close() throws Exception {
}
}

private ServerInfo createServerInfo(
JobInfo jobInfo,
Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap,
Environment environment)
private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory)
throws IOException {
Preconditions.checkNotNull(environment, "Environment can not be null");
synchronized (this) {
if (this.environmentURN != null) {
Preconditions.checkArgument(
this.environmentURN.equals(environment.getUrn()),
"Unsupported: Mixing environment types (%s, %s) is not supported for a job.",
this.environmentURN,
environment.getUrn());
} else {
this.environmentURN = environment.getUrn();
}
}
Preconditions.checkNotNull(serverFactory, "serverFactory can not be null");

EnvironmentFactory.Provider environmentFactoryProvider =
environmentFactoryProviderMap.get(environment.getUrn());
ServerFactory serverFactory = environmentFactoryProvider.getServerFactory();

MapControlClientPool clientPool = MapControlClientPool.create();
GrpcFnServer<FnApiControlClientPoolService> controlServer =
GrpcFnServer.allocatePortAndCreateFor(
FnApiControlClientPoolService.offeringClientsToPool(
Expand All @@ -340,15 +336,6 @@ private ServerInfo createServerInfo(
GrpcFnServer<GrpcStateService> stateServer =
GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);

EnvironmentFactory environmentFactory =
environmentFactoryProvider.createEnvironmentFactory(
controlServer,
loggingServer,
retrievalServer,
provisioningServer,
clientPool,
stageIdGenerator);

ServerInfo serverInfo =
new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder()
.setControlServer(controlServer)
Expand All @@ -357,7 +344,6 @@ private ServerInfo createServerInfo(
.setProvisioningServer(provisioningServer)
.setDataServer(dataServer)
.setStateServer(stateServer)
.setEnvironmentFactory(environmentFactory)
.build();
return serverInfo;
}
Expand All @@ -377,8 +363,6 @@ public abstract static class ServerInfo {

abstract GrpcFnServer<GrpcStateService> getStateServer();

abstract EnvironmentFactory getEnvironmentFactory();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -395,8 +379,6 @@ abstract static class Builder {

abstract Builder setStateServer(GrpcFnServer<GrpcStateService> server);

abstract Builder setEnvironmentFactory(EnvironmentFactory factory);

abstract ServerInfo build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -82,6 +81,15 @@ public class DefaultJobBundleFactoryTest {
private final IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
private final InstructionResponse instructionResponse =
InstructionResponse.newBuilder().setInstructionId("instruction-id").build();
private final EnvironmentFactory.Provider envFactoryProvider =
(GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
GrpcFnServer<GrpcLoggingService> loggingServiceServer,
GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
ControlClientPool clientPool,
IdGenerator idGenerator) -> envFactory;
private final Map<String, EnvironmentFactory.Provider> envFactoryProviderMap =
ImmutableMap.of(environment.getUrn(), envFactoryProvider);

@Before
public void setUpMocks() throws Exception {
Expand All @@ -100,7 +108,7 @@ public void setUpMocks() throws Exception {
public void createsCorrectEnvironment() throws Exception {
try (DefaultJobBundleFactory bundleFactory =
new DefaultJobBundleFactory(
envFactory,
envFactoryProviderMap,
stageIdGenerator,
controlServer,
loggingServer,
Expand Down Expand Up @@ -174,7 +182,7 @@ public void createsMultipleEnvironmentOfSingleType() throws Exception {
}

@Test
public void failedCreatingMultipleEnvironmentFromMultipleTypes() throws Exception {
public void creatingMultipleEnvironmentFromMultipleTypes() throws Exception {
ServerFactory serverFactory = ServerFactory.createDefault();

Environment environmentA = Environment.newBuilder().setUrn("env:urn:a").build();
Expand Down Expand Up @@ -206,16 +214,17 @@ public void failedCreatingMultipleEnvironmentFromMultipleTypes() throws Exceptio
JobInfo.create("testJob", "testJob", "token", Struct.getDefaultInstance()),
environmentFactoryProviderMap)) {
bundleFactory.forStage(getExecutableStage(environmentB));
thrown.expectCause(Matchers.any(IllegalArgumentException.class));
bundleFactory.forStage(getExecutableStage(environmentA));
}
verify(envFactoryA).createEnvironment(environmentA);
verify(envFactoryB).createEnvironment(environmentB);
}

@Test
public void closesEnvironmentOnCleanup() throws Exception {
DefaultJobBundleFactory bundleFactory =
new DefaultJobBundleFactory(
envFactory,
envFactoryProviderMap,
stageIdGenerator,
controlServer,
loggingServer,
Expand All @@ -233,7 +242,7 @@ public void closesEnvironmentOnCleanup() throws Exception {
public void cachesEnvironment() throws Exception {
try (DefaultJobBundleFactory bundleFactory =
new DefaultJobBundleFactory(
envFactory,
envFactoryProviderMap,
stageIdGenerator,
controlServer,
loggingServer,
Expand All @@ -258,6 +267,9 @@ public void doesNotCacheDifferentEnvironments() throws Exception {
Environment envFoo = Environment.newBuilder().setUrn("dummy:urn:another").build();
RemoteEnvironment remoteEnvFoo = mock(RemoteEnvironment.class);
InstructionRequestHandler fooInstructionHandler = mock(InstructionRequestHandler.class);
Map<String, EnvironmentFactory.Provider> envFactoryProviderMapFoo =
ImmutableMap.of(
environment.getUrn(), envFactoryProvider, envFoo.getUrn(), envFactoryProvider);
when(envFactory.createEnvironment(envFoo)).thenReturn(remoteEnvFoo);
when(remoteEnvFoo.getInstructionRequestHandler()).thenReturn(fooInstructionHandler);
// Don't bother creating a distinct instruction response because we don't use it here.
Expand All @@ -266,7 +278,7 @@ public void doesNotCacheDifferentEnvironments() throws Exception {

try (DefaultJobBundleFactory bundleFactory =
new DefaultJobBundleFactory(
envFactory,
envFactoryProviderMapFoo,
stageIdGenerator,
controlServer,
loggingServer,
Expand Down

0 comments on commit 67bfa40

Please sign in to comment.