From 197e86ed60933ace334a9d04cf37fcd46900d047 Mon Sep 17 00:00:00 2001 From: nancyxu123 Date: Wed, 9 Mar 2022 09:30:45 -0800 Subject: [PATCH] Merge pull request #17036 from [BEAM-12164] Convert all static instances to be transient in the connector in order to enable concurrent testing * Convert all static instances to be transient in the connector in order to enable concurrent testing * Initialized fields to null * nullness * Suppress uninitialized warnings * Remove resetting dao factory fields in SpannerChangeStreamErrorTest.java * Add validation package --- .../changestreams/action/ActionFactory.java | 16 ++++++++-------- .../spanner/changestreams/dao/DaoFactory.java | 10 +++++----- .../changestreams/mapper/MapperFactory.java | 8 ++++---- .../SpannerChangeStreamErrorTest.java | 17 ----------------- 4 files changed, 17 insertions(+), 34 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java index c6cd7256ff601..5277c368f7c6e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java @@ -29,17 +29,17 @@ * Factory class for creating instances that will handle each type of record within a change stream * query. The instances created are all singletons. */ -// static fields are un-initialized, because we start them during the first fetch call (with the -// singleton pattern) -@SuppressWarnings("initialization.static.fields.uninitialized") +// transient fields are un-initialized, because we start them during the first fetch call (with the +// singleton pattern). +@SuppressWarnings("initialization.fields.uninitialized") public class ActionFactory implements Serializable { private static final long serialVersionUID = -4060958761369602619L; - private static DataChangeRecordAction dataChangeRecordActionInstance; - private static HeartbeatRecordAction heartbeatRecordActionInstance; - private static ChildPartitionsRecordAction childPartitionsRecordActionInstance; - private static QueryChangeStreamAction queryChangeStreamActionInstance; - private static DetectNewPartitionsAction detectNewPartitionsActionInstance; + private transient DataChangeRecordAction dataChangeRecordActionInstance; + private transient HeartbeatRecordAction heartbeatRecordActionInstance; + private transient ChildPartitionsRecordAction childPartitionsRecordActionInstance; + private transient QueryChangeStreamAction queryChangeStreamActionInstance; + private transient DetectNewPartitionsAction detectNewPartitionsActionInstance; /** * Creates and returns a singleton instance of an action class capable of processing {@link diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java index 28655f69a26a8..7c94720c7875b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java @@ -27,17 +27,17 @@ * Factory class to create data access objects to perform change stream queries and access the * metadata tables. The instances created are all singletons. */ -// static fields are un-initialized, because we start them during the first fetch call (with the +// transient fields are un-initialized, because we start them during the first fetch call (with the // singleton pattern) // nullness checks for metadata instance and database are handled in the constructor -@SuppressWarnings({"initialization.static.fields.uninitialized", "nullness"}) +@SuppressWarnings({"initialization.fields.uninitialized", "nullness"}) public class DaoFactory implements Serializable { private static final long serialVersionUID = 7929063669009832487L; - private static PartitionMetadataAdminDao partitionMetadataAdminDao; - private static PartitionMetadataDao partitionMetadataDaoInstance; - private static ChangeStreamDao changeStreamDaoInstance; + private transient PartitionMetadataAdminDao partitionMetadataAdminDao; + private transient PartitionMetadataDao partitionMetadataDaoInstance; + private transient ChangeStreamDao changeStreamDaoInstance; private final SpannerConfig changeStreamSpannerConfig; private final SpannerConfig metadataSpannerConfig; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java index 52274635c9221..6ad51d23a1be1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java @@ -23,15 +23,15 @@ * Factory class for creating instances that will map a struct to a connector model. The instances * created are all singletons. */ -// static fields are un-initialized, because we start them during the first fetch call (with the +// transient fields are un-initialized, because we start them during the first fetch call (with the // singleton pattern) -@SuppressWarnings("initialization.static.fields.uninitialized") +@SuppressWarnings("initialization.fields.uninitialized") public class MapperFactory implements Serializable { private static final long serialVersionUID = -813434573067800902L; - private static ChangeStreamRecordMapper changeStreamRecordMapperInstance; - private static PartitionMetadataMapper partitionMetadataMapperInstance; + private transient ChangeStreamRecordMapper changeStreamRecordMapperInstance; + private transient PartitionMetadataMapper partitionMetadataMapperInstance; /** * Creates and returns a singleton instance of a mapper class capable of transforming a {@link diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java index 050993391b78b..4789c6c567515 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java @@ -54,7 +54,6 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.TestPipeline; @@ -102,7 +101,6 @@ public void tearDown() throws NoSuchFieldException, IllegalAccessException { serviceHelper.reset(); serviceHelper.stop(); mockSpannerService.reset(); - resetDaoFactoryFields(); } @Test @@ -427,21 +425,6 @@ private SpannerConfig getSpannerConfig() { .withDatabaseId(TEST_DATABASE); } - private static void resetDaoFactoryFields() throws NoSuchFieldException, IllegalAccessException { - java.lang.reflect.Field partitionMetadataAdminDaoField = - DaoFactory.class.getDeclaredField("partitionMetadataAdminDao"); - partitionMetadataAdminDaoField.setAccessible(true); - partitionMetadataAdminDaoField.set(null, null); - java.lang.reflect.Field partitionMetadataDaoInstanceField = - DaoFactory.class.getDeclaredField("partitionMetadataDaoInstance"); - partitionMetadataDaoInstanceField.setAccessible(true); - partitionMetadataDaoInstanceField.set(null, null); - java.lang.reflect.Field changeStreamDaoInstanceField = - DaoFactory.class.getDeclaredField("changeStreamDaoInstance"); - changeStreamDaoInstanceField.setAccessible(true); - changeStreamDaoInstanceField.set(null, null); - } - private static final ResultSetMetadata PARTITION_METADATA_RESULT_SET_METADATA = ResultSetMetadata.newBuilder() .setRowType(