Merge pull request #9927 from lukecwik/beam7230
[BEAM-7230, BEAM-2699, BEAM-8515] Cache DataSource instances based upon DataSourceConfiguration
lukecwik authored Oct 30, 2019
2 parents aa47745 + 6a900d2 commit b02bc8a
Showing 4 changed files with 109 additions and 49 deletions.
ValueProvider.java
@@ -37,6 +37,7 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
@@ -100,6 +101,17 @@ public boolean isAccessible() {
public String toString() {
return String.valueOf(value);
}

@Override
public boolean equals(Object other) {
return other instanceof StaticValueProvider
&& Objects.equals(value, ((StaticValueProvider) other).value);
}

@Override
public int hashCode() {
return Objects.hashCode(value);
}
}

/**
@@ -159,6 +171,18 @@ public String toString() {
.add("translator", translator.getClass().getSimpleName())
.toString();
}

@Override
public boolean equals(Object other) {
return other instanceof NestedValueProvider
&& Objects.equals(value, ((NestedValueProvider) other).value)
&& Objects.equals(translator, ((NestedValueProvider) other).translator);
}

@Override
public int hashCode() {
return Objects.hash(value, translator);
}
}

/**
@@ -265,6 +289,21 @@ public String toString() {
.add("default", defaultValue)
.toString();
}

@Override
public boolean equals(Object other) {
return other instanceof RuntimeValueProvider
&& Objects.equals(klass, ((RuntimeValueProvider) other).klass)
&& Objects.equals(methodName, ((RuntimeValueProvider) other).methodName)
&& Objects.equals(propertyName, ((RuntimeValueProvider) other).propertyName)
&& Objects.equals(defaultValue, ((RuntimeValueProvider) other).defaultValue)
&& Objects.equals(optionsId, ((RuntimeValueProvider) other).optionsId);
}

@Override
public int hashCode() {
return Objects.hash(klass, methodName, propertyName, defaultValue, optionsId);
}
}

/** <b>For internal use only; no backwards compatibility guarantees.</b> */
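The value-based equals/hashCode added above is what lets objects wrapping ValueProviders (such as JdbcIO's DataSourceConfiguration later in this change) act as map keys. A minimal sketch of the effect (the cache map is illustrative, not Beam API; it assumes beam-sdks-java-core on the classpath):

import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

public class ValueProviderEqualitySketch {
  public static void main(String[] args) {
    ValueProvider<String> a = StaticValueProvider.of("jdbc:derby://localhost/beam");
    ValueProvider<String> b = StaticValueProvider.of("jdbc:derby://localhost/beam");

    // With the equals/hashCode added in this commit, providers wrapping equal values
    // collide on the same key, so a cache keyed on them is deduplicated.
    ConcurrentHashMap<ValueProvider<String>, Object> cache = new ConcurrentHashMap<>();
    cache.computeIfAbsent(a, ignored -> new Object());
    cache.computeIfAbsent(b, ignored -> new Object());
    System.out.println(cache.size()); // prints 1 after this change; identity equality gave 2
  }
}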
ValueProviderTest.java
@@ -93,6 +93,7 @@ public void testStaticValueProvider() {
assertEquals("foo", provider.get());
assertTrue(provider.isAccessible());
assertEquals("foo", provider.toString());
assertEquals(provider, StaticValueProvider.of("foo"));
}

@Test
@@ -120,6 +121,7 @@ public void testDefaultRuntimeProvider() {
TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
ValueProvider<String> provider = options.getBar();
assertFalse(provider.isAccessible());
assertEquals(provider, options.getBar());
}

@Test
@@ -232,11 +234,13 @@ public void testSerializeDeserializeWithArg() throws Exception {

@Test
public void testNestedValueProviderStatic() throws Exception {
SerializableFunction<String, String> function = from -> from + "bar";
ValueProvider<String> svp = StaticValueProvider.of("foo");
ValueProvider<String> nvp = NestedValueProvider.of(svp, from -> from + "bar");
ValueProvider<String> nvp = NestedValueProvider.of(svp, function);
assertTrue(nvp.isAccessible());
assertEquals("foobar", nvp.get());
assertEquals("foobar", nvp.toString());
assertEquals(nvp, NestedValueProvider.of(svp, function));
}

@Test
JdbcIO.java
@@ -32,6 +32,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -935,10 +936,7 @@ public static class Write<T> extends PTransform<PCollection<T>, PDone> {

/** See {@link WriteVoid#withDataSourceConfiguration(DataSourceConfiguration)}. */
public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
return new Write(
inner
.withDataSourceConfiguration(config)
.withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config)));
return new Write(inner.withDataSourceConfiguration(config));
}

/** See {@link WriteVoid#withDataSourceProviderFn(SerializableFunction)}. */
@@ -1369,79 +1367,79 @@ public void process(ProcessContext c) {
}
}

/** Wraps a {@link DataSourceConfiguration} to provide a {@link PoolingDataSource}. */
/**
* Wraps a {@link DataSourceConfiguration} to provide a {@link PoolingDataSource}.
*
* <p>At most a single {@link DataSource} instance will be constructed during pipeline execution
* for each unique {@link DataSourceConfiguration} within the pipeline.
*/
public static class PoolableDataSourceProvider
implements SerializableFunction<Void, DataSource>, HasDisplayData {
private static PoolableDataSourceProvider instance;
private static transient DataSource source;
private static SerializableFunction<Void, DataSource> dataSourceProviderFn;
private static final ConcurrentHashMap<DataSourceConfiguration, DataSource> instances =
new ConcurrentHashMap<>();
private final DataSourceProviderFromDataSourceConfiguration config;

private PoolableDataSourceProvider(DataSourceConfiguration config) {
dataSourceProviderFn = DataSourceProviderFromDataSourceConfiguration.of(config);
this.config = new DataSourceProviderFromDataSourceConfiguration(config);
}

public static synchronized SerializableFunction<Void, DataSource> of(
DataSourceConfiguration config) {
if (instance == null) {
instance = new PoolableDataSourceProvider(config);
}
return instance;
public static SerializableFunction<Void, DataSource> of(DataSourceConfiguration config) {
return new PoolableDataSourceProvider(config);
}

@Override
public DataSource apply(Void input) {
return buildDataSource(input);
}

static synchronized DataSource buildDataSource(Void input) {
if (source == null) {
DataSource basicSource = dataSourceProviderFn.apply(input);
DataSourceConnectionFactory connectionFactory =
new DataSourceConnectionFactory(basicSource);
PoolableConnectionFactory poolableConnectionFactory =
new PoolableConnectionFactory(connectionFactory, null);
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(1);
poolConfig.setMinIdle(0);
poolConfig.setMinEvictableIdleTimeMillis(10000);
poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
GenericObjectPool connectionPool =
new GenericObjectPool(poolableConnectionFactory, poolConfig);
poolableConnectionFactory.setPool(connectionPool);
poolableConnectionFactory.setDefaultAutoCommit(false);
poolableConnectionFactory.setDefaultReadOnly(false);
source = new PoolingDataSource(connectionPool);
}
return source;
return instances.computeIfAbsent(
config.config,
ignored -> {
DataSource basicSource = config.apply(input);
DataSourceConnectionFactory connectionFactory =
new DataSourceConnectionFactory(basicSource);
PoolableConnectionFactory poolableConnectionFactory =
new PoolableConnectionFactory(connectionFactory, null);
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(1);
poolConfig.setMinIdle(0);
poolConfig.setMinEvictableIdleTimeMillis(10000);
poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
GenericObjectPool connectionPool =
new GenericObjectPool(poolableConnectionFactory, poolConfig);
poolableConnectionFactory.setPool(connectionPool);
poolableConnectionFactory.setDefaultAutoCommit(false);
poolableConnectionFactory.setDefaultReadOnly(false);
return new PoolingDataSource(connectionPool);
});
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
if (dataSourceProviderFn instanceof HasDisplayData) {
((HasDisplayData) dataSourceProviderFn).populateDisplayData(builder);
}
config.populateDisplayData(builder);
}
}

private static class DataSourceProviderFromDataSourceConfiguration
/**
* Wraps a {@link DataSourceConfiguration} to provide a {@link DataSource}.
*
* <p>At most a single {@link DataSource} instance will be constructed during pipeline execution
* for each unique {@link DataSourceConfiguration} within the pipeline.
*/
public static class DataSourceProviderFromDataSourceConfiguration
implements SerializableFunction<Void, DataSource>, HasDisplayData {
private static final ConcurrentHashMap<DataSourceConfiguration, DataSource> instances =
new ConcurrentHashMap<>();
private final DataSourceConfiguration config;
private static DataSourceProviderFromDataSourceConfiguration instance;

private DataSourceProviderFromDataSourceConfiguration(DataSourceConfiguration config) {
this.config = config;
}

public static SerializableFunction<Void, DataSource> of(DataSourceConfiguration config) {
if (instance == null) {
instance = new DataSourceProviderFromDataSourceConfiguration(config);
}
return instance;
return new DataSourceProviderFromDataSourceConfiguration(config);
}

@Override
public DataSource apply(Void input) {
return config.buildDatasource();
return instances.computeIfAbsent(config, (config) -> config.buildDatasource());
}

@Override
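For readers skimming the diff, a self-contained sketch of the idiom both providers above now share: a static ConcurrentHashMap keyed on a value-equal, serializable configuration, so every provider copy deserialized into the same JVM resolves to at most one instance per configuration. The class names below are stand-ins, not the real JdbcIO types:

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for DataSourceConfiguration: serializable and value-equal, so it works as a map key.
class ConnConfig implements Serializable {
  final String url;

  ConnConfig(String url) {
    this.url = url;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof ConnConfig && Objects.equals(url, ((ConnConfig) o).url);
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(url);
  }
}

// Stand-in for the cached providers: the map is static, so it is not serialized with the
// provider and is shared by every copy loaded into this JVM.
class CachedResourceProvider implements Serializable {
  private static final ConcurrentHashMap<ConnConfig, Object> INSTANCES = new ConcurrentHashMap<>();
  private final ConnConfig config;

  CachedResourceProvider(ConnConfig config) {
    this.config = config;
  }

  Object get() {
    // Builds the expensive resource once per distinct configuration.
    return INSTANCES.computeIfAbsent(config, c -> new Object());
  }
}

Two CachedResourceProvider instances built from equal ConnConfig values return the same object from get(), which is what the new assertSame check in JdbcIOTest below verifies against the real provider.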
JdbcIOTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.jdbc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@@ -57,6 +58,7 @@
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.jdbc.JdbcIO.PoolableDataSourceProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.testing.ExpectedLogs;
@@ -66,6 +68,7 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
@@ -917,4 +920,20 @@ public void testWriteWithEmptyPCollection() {

pipeline.run();
}

@Test
public void testSerializationAndCachingOfPoolingDataSourceProvider() {
SerializableFunction<Void, DataSource> provider =
PoolableDataSourceProvider.of(
JdbcIO.DataSourceConfiguration.create(
"org.apache.derby.jdbc.ClientDriver",
"jdbc:derby://localhost:" + port + "/target/beam"));
SerializableFunction<Void, DataSource> deserializedProvider =
SerializableUtils.ensureSerializable(provider);

// Assert that the same instance is being returned even when there are multiple provider
// instances with the same configuration. Also check that the deserialized provider was
// able to produce an instance.
assertSame(provider.apply(null), deserializedProvider.apply(null));
}
}
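For context, a hedged sketch of how a pipeline could opt into the pooled, cached provider exercised by the test above; the driver, JDBC URL (Derby's default port 1527), table name, and element type are placeholders and assume a reachable Derby instance like the test's:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;

public class PooledJdbcWriteSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    JdbcIO.DataSourceConfiguration config =
        JdbcIO.DataSourceConfiguration.create(
            "org.apache.derby.jdbc.ClientDriver", "jdbc:derby://localhost:1527/target/beam");

    pipeline
        .apply(Create.of(KV.of(1, "one"), KV.of(2, "two")))
        .apply(
            JdbcIO.<KV<Integer, String>>write()
                // Workers sharing this JVM reuse one pooled DataSource per distinct configuration.
                .withDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(config))
                .withStatement("insert into test_table values(?, ?)")
                .withPreparedStatementSetter(
                    (element, statement) -> {
                      statement.setInt(1, element.getKey());
                      statement.setString(2, element.getValue());
                    }));

    pipeline.run().waitUntilFinish();
  }
}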
