Skip to content

Commit

Permalink
Merge pull request apache#17036 from [BEAM-12164] Convert all static …
Browse files Browse the repository at this point in the history
…instances to be transient in the connector in order to enable concurrent testing

* Convert all static instances to be transient in the connector in order to enable concurrent testing

* Initialized fields to null

* nullness

* Suppress uninitialized warnings

* Remove resetting dao factory fields in SpannerChangeStreamErrorTest.java

* Add validation package
  • Loading branch information
nancyxu123 authored and yeandy committed Mar 10, 2022
1 parent ddc5624 commit 197e86e
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@
* Factory class for creating instances that will handle each type of record within a change stream
* query. The instances created are all singletons.
*/
// static fields are un-initialized, because we start them during the first fetch call (with the
// singleton pattern)
@SuppressWarnings("initialization.static.fields.uninitialized")
// transient fields are un-initialized, because we start them during the first fetch call (with the
// singleton pattern).
@SuppressWarnings("initialization.fields.uninitialized")
public class ActionFactory implements Serializable {

private static final long serialVersionUID = -4060958761369602619L;
private static DataChangeRecordAction dataChangeRecordActionInstance;
private static HeartbeatRecordAction heartbeatRecordActionInstance;
private static ChildPartitionsRecordAction childPartitionsRecordActionInstance;
private static QueryChangeStreamAction queryChangeStreamActionInstance;
private static DetectNewPartitionsAction detectNewPartitionsActionInstance;
private transient DataChangeRecordAction dataChangeRecordActionInstance;
private transient HeartbeatRecordAction heartbeatRecordActionInstance;
private transient ChildPartitionsRecordAction childPartitionsRecordActionInstance;
private transient QueryChangeStreamAction queryChangeStreamActionInstance;
private transient DetectNewPartitionsAction detectNewPartitionsActionInstance;

/**
* Creates and returns a singleton instance of an action class capable of processing {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@
* Factory class to create data access objects to perform change stream queries and access the
* metadata tables. The instances created are all singletons.
*/
// static fields are un-initialized, because we start them during the first fetch call (with the
// transient fields are un-initialized, because we start them during the first fetch call (with the
// singleton pattern)
// nullness checks for metadata instance and database are handled in the constructor
@SuppressWarnings({"initialization.static.fields.uninitialized", "nullness"})
@SuppressWarnings({"initialization.fields.uninitialized", "nullness"})
public class DaoFactory implements Serializable {

private static final long serialVersionUID = 7929063669009832487L;

private static PartitionMetadataAdminDao partitionMetadataAdminDao;
private static PartitionMetadataDao partitionMetadataDaoInstance;
private static ChangeStreamDao changeStreamDaoInstance;
private transient PartitionMetadataAdminDao partitionMetadataAdminDao;
private transient PartitionMetadataDao partitionMetadataDaoInstance;
private transient ChangeStreamDao changeStreamDaoInstance;

private final SpannerConfig changeStreamSpannerConfig;
private final SpannerConfig metadataSpannerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
* Factory class for creating instances that will map a struct to a connector model. The instances
* created are all singletons.
*/
// static fields are un-initialized, because we start them during the first fetch call (with the
// transient fields are un-initialized, because we start them during the first fetch call (with the
// singleton pattern)
@SuppressWarnings("initialization.static.fields.uninitialized")
@SuppressWarnings("initialization.fields.uninitialized")
public class MapperFactory implements Serializable {

private static final long serialVersionUID = -813434573067800902L;

private static ChangeStreamRecordMapper changeStreamRecordMapperInstance;
private static PartitionMetadataMapper partitionMetadataMapperInstance;
private transient ChangeStreamRecordMapper changeStreamRecordMapperInstance;
private transient PartitionMetadataMapper partitionMetadataMapperInstance;

/**
* Creates and returns a singleton instance of a mapper class capable of transforming a {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down Expand Up @@ -102,7 +101,6 @@ public void tearDown() throws NoSuchFieldException, IllegalAccessException {
serviceHelper.reset();
serviceHelper.stop();
mockSpannerService.reset();
resetDaoFactoryFields();
}

@Test
Expand Down Expand Up @@ -427,21 +425,6 @@ private SpannerConfig getSpannerConfig() {
.withDatabaseId(TEST_DATABASE);
}

private static void resetDaoFactoryFields() throws NoSuchFieldException, IllegalAccessException {
java.lang.reflect.Field partitionMetadataAdminDaoField =
DaoFactory.class.getDeclaredField("partitionMetadataAdminDao");
partitionMetadataAdminDaoField.setAccessible(true);
partitionMetadataAdminDaoField.set(null, null);
java.lang.reflect.Field partitionMetadataDaoInstanceField =
DaoFactory.class.getDeclaredField("partitionMetadataDaoInstance");
partitionMetadataDaoInstanceField.setAccessible(true);
partitionMetadataDaoInstanceField.set(null, null);
java.lang.reflect.Field changeStreamDaoInstanceField =
DaoFactory.class.getDeclaredField("changeStreamDaoInstance");
changeStreamDaoInstanceField.setAccessible(true);
changeStreamDaoInstanceField.set(null, null);
}

private static final ResultSetMetadata PARTITION_METADATA_RESULT_SET_METADATA =
ResultSetMetadata.newBuilder()
.setRowType(
Expand Down

0 comments on commit 197e86e

Please sign in to comment.