From b26e8212de2b2bb7ad580412ce93eb67e70e170a Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Tue, 29 Oct 2019 16:00:47 -0700 Subject: [PATCH 1/2] [BEAM-8515, BEAM-2699] Add equals/hashCode to ValueProvider classes. --- .../beam/sdk/options/ValueProvider.java | 39 +++++++++++++++++++ .../beam/sdk/options/ValueProviderTest.java | 6 ++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java index 92f06442d2179..903b4a8241a03 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java @@ -37,6 +37,7 @@ import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; @@ -100,6 +101,17 @@ public boolean isAccessible() { public String toString() { return String.valueOf(value); } + + @Override + public boolean equals(Object other) { + return other instanceof StaticValueProvider + && Objects.equals(value, ((StaticValueProvider) other).value); + } + + @Override + public int hashCode() { + return Objects.hashCode(value); + } } /** @@ -159,6 +171,18 @@ public String toString() { .add("translator", translator.getClass().getSimpleName()) .toString(); } + + @Override + public boolean equals(Object other) { + return other instanceof NestedValueProvider + && Objects.equals(value, ((NestedValueProvider) other).value) + && Objects.equals(translator, ((NestedValueProvider) other).translator); + } + + @Override + public int hashCode() { + return Objects.hash(value, translator); + } } /** @@ -265,6 +289,21 @@ public String toString() { .add("default", defaultValue) .toString(); } + + @Override + public boolean equals(Object other) { + return other instanceof RuntimeValueProvider + && Objects.equals(klass, ((RuntimeValueProvider) other).klass) + && Objects.equals(methodName, ((RuntimeValueProvider) other).methodName) + && Objects.equals(propertyName, ((RuntimeValueProvider) other).propertyName) + && Objects.equals(defaultValue, ((RuntimeValueProvider) other).defaultValue) + && Objects.equals(optionsId, ((RuntimeValueProvider) other).optionsId); + } + + @Override + public int hashCode() { + return Objects.hash(klass, methodName, propertyName, defaultValue, optionsId); + } } /** For internal use only; no backwards compatibility guarantees. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java index 6d410f747d35d..6c85ffda7a862 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java @@ -93,6 +93,7 @@ public void testStaticValueProvider() { assertEquals("foo", provider.get()); assertTrue(provider.isAccessible()); assertEquals("foo", provider.toString()); + assertEquals(provider, StaticValueProvider.of("foo")); } @Test @@ -120,6 +121,7 @@ public void testDefaultRuntimeProvider() { TestOptions options = PipelineOptionsFactory.as(TestOptions.class); ValueProvider provider = options.getBar(); assertFalse(provider.isAccessible()); + assertEquals(provider, options.getBar()); } @Test @@ -232,11 +234,13 @@ public void testSerializeDeserializeWithArg() throws Exception { @Test public void testNestedValueProviderStatic() throws Exception { + SerializableFunction function = from -> from + "bar"; ValueProvider svp = StaticValueProvider.of("foo"); - ValueProvider nvp = NestedValueProvider.of(svp, from -> from + "bar"); + ValueProvider nvp = NestedValueProvider.of(svp, function); assertTrue(nvp.isAccessible()); assertEquals("foobar", nvp.get()); assertEquals("foobar", nvp.toString()); + assertEquals(nvp, NestedValueProvider.of(svp, function)); } @Test From 6a900d298f771249a75a4aeced68462ca3d2d94f Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Tue, 29 Oct 2019 16:03:47 -0700 Subject: [PATCH 2/2] [BEAM-7230] Cache DataSource instances based upon equivalent DataSourceConfiguration instances. --- .../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 94 +++++++++---------- .../apache/beam/sdk/io/jdbc/JdbcIOTest.java | 19 ++++ 2 files changed, 65 insertions(+), 48 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 286136d94970c..9162448750612 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -935,10 +936,7 @@ public static class Write extends PTransform, PDone> { /** See {@link WriteVoid#withDataSourceConfiguration(DataSourceConfiguration)}. */ public Write withDataSourceConfiguration(DataSourceConfiguration config) { - return new Write( - inner - .withDataSourceConfiguration(config) - .withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config))); + return new Write(inner.withDataSourceConfiguration(config)); } /** See {@link WriteVoid#withDataSourceProviderFn(SerializableFunction)}. */ @@ -1369,79 +1367,79 @@ public void process(ProcessContext c) { } } - /** Wraps a {@link DataSourceConfiguration} to provide a {@link PoolingDataSource}. */ + /** + * Wraps a {@link DataSourceConfiguration} to provide a {@link PoolingDataSource}. + * + *

At most a single {@link DataSource} instance will be constructed during pipeline execution + * for each unique {@link DataSourceConfiguration} within the pipeline. + */ public static class PoolableDataSourceProvider implements SerializableFunction, HasDisplayData { - private static PoolableDataSourceProvider instance; - private static transient DataSource source; - private static SerializableFunction dataSourceProviderFn; + private static final ConcurrentHashMap instances = + new ConcurrentHashMap<>(); + private final DataSourceProviderFromDataSourceConfiguration config; private PoolableDataSourceProvider(DataSourceConfiguration config) { - dataSourceProviderFn = DataSourceProviderFromDataSourceConfiguration.of(config); + this.config = new DataSourceProviderFromDataSourceConfiguration(config); } - public static synchronized SerializableFunction of( - DataSourceConfiguration config) { - if (instance == null) { - instance = new PoolableDataSourceProvider(config); - } - return instance; + public static SerializableFunction of(DataSourceConfiguration config) { + return new PoolableDataSourceProvider(config); } @Override public DataSource apply(Void input) { - return buildDataSource(input); - } - - static synchronized DataSource buildDataSource(Void input) { - if (source == null) { - DataSource basicSource = dataSourceProviderFn.apply(input); - DataSourceConnectionFactory connectionFactory = - new DataSourceConnectionFactory(basicSource); - PoolableConnectionFactory poolableConnectionFactory = - new PoolableConnectionFactory(connectionFactory, null); - GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); - poolConfig.setMaxTotal(1); - poolConfig.setMinIdle(0); - poolConfig.setMinEvictableIdleTimeMillis(10000); - poolConfig.setSoftMinEvictableIdleTimeMillis(30000); - GenericObjectPool connectionPool = - new GenericObjectPool(poolableConnectionFactory, poolConfig); - poolableConnectionFactory.setPool(connectionPool); - poolableConnectionFactory.setDefaultAutoCommit(false); - poolableConnectionFactory.setDefaultReadOnly(false); - source = new PoolingDataSource(connectionPool); - } - return source; + return instances.computeIfAbsent( + config.config, + ignored -> { + DataSource basicSource = config.apply(input); + DataSourceConnectionFactory connectionFactory = + new DataSourceConnectionFactory(basicSource); + PoolableConnectionFactory poolableConnectionFactory = + new PoolableConnectionFactory(connectionFactory, null); + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); + poolConfig.setMaxTotal(1); + poolConfig.setMinIdle(0); + poolConfig.setMinEvictableIdleTimeMillis(10000); + poolConfig.setSoftMinEvictableIdleTimeMillis(30000); + GenericObjectPool connectionPool = + new GenericObjectPool(poolableConnectionFactory, poolConfig); + poolableConnectionFactory.setPool(connectionPool); + poolableConnectionFactory.setDefaultAutoCommit(false); + poolableConnectionFactory.setDefaultReadOnly(false); + return new PoolingDataSource(connectionPool); + }); } @Override public void populateDisplayData(DisplayData.Builder builder) { - if (dataSourceProviderFn instanceof HasDisplayData) { - ((HasDisplayData) dataSourceProviderFn).populateDisplayData(builder); - } + config.populateDisplayData(builder); } } - private static class DataSourceProviderFromDataSourceConfiguration + /** + * Wraps a {@link DataSourceConfiguration} to provide a {@link DataSource}. + * + *

At most a single {@link DataSource} instance will be constructed during pipeline execution + * for each unique {@link DataSourceConfiguration} within the pipeline. + */ + public static class DataSourceProviderFromDataSourceConfiguration implements SerializableFunction, HasDisplayData { + private static final ConcurrentHashMap instances = + new ConcurrentHashMap<>(); private final DataSourceConfiguration config; - private static DataSourceProviderFromDataSourceConfiguration instance; private DataSourceProviderFromDataSourceConfiguration(DataSourceConfiguration config) { this.config = config; } public static SerializableFunction of(DataSourceConfiguration config) { - if (instance == null) { - instance = new DataSourceProviderFromDataSourceConfiguration(config); - } - return instance; + return new DataSourceProviderFromDataSourceConfiguration(config); } @Override public DataSource apply(Void input) { - return config.buildDatasource(); + return instances.computeIfAbsent(config, (config) -> config.buildDatasource()); } @Override diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index ba6ae6231a55c..4811507440b71 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.jdbc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -57,6 +58,7 @@ import org.apache.beam.sdk.io.common.DatabaseTestHelper; import org.apache.beam.sdk.io.common.NetworkTestHelper; import org.apache.beam.sdk.io.common.TestRow; +import org.apache.beam.sdk.io.jdbc.JdbcIO.PoolableDataSourceProvider; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.transforms.Select; import org.apache.beam.sdk.testing.ExpectedLogs; @@ -66,6 +68,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Wait; +import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -916,4 +919,20 @@ public void testWriteWithEmptyPCollection() { pipeline.run(); } + + @Test + public void testSerializationAndCachingOfPoolingDataSourceProvider() { + SerializableFunction provider = + PoolableDataSourceProvider.of( + JdbcIO.DataSourceConfiguration.create( + "org.apache.derby.jdbc.ClientDriver", + "jdbc:derby://localhost:" + port + "/target/beam")); + SerializableFunction deserializedProvider = + SerializableUtils.ensureSerializable(provider); + + // Assert that that same instance is being returned even when there are multiple provider + // instances with the same configuration. Also check that the deserialized provider was + // able to produce an instance. + assertSame(provider.apply(null), deserializedProvider.apply(null)); + } }