diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/ChildPartitionsRecordAction.java index be2443152b39a..3aee4bf2efe26 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/ChildPartitionsRecordAction.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,12 +49,13 @@ public class ChildPartitionsRecordAction { private final PartitionMetadataDao partitionMetadataDao; private final ChangeStreamMetrics metrics; - public ChildPartitionsRecordAction( + ChildPartitionsRecordAction( PartitionMetadataDao partitionMetadataDao, ChangeStreamMetrics metrics) { this.partitionMetadataDao = partitionMetadataDao; this.metrics = metrics; } + @VisibleForTesting public Optional run( PartitionMetadata partition, ChildPartitionsRecord record, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/DataChangeRecordAction.java index 3852d07ff9866..e584dfa11a6a7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/DataChangeRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/DataChangeRecordAction.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,7 @@ public class DataChangeRecordAction { private static final Logger LOG = LoggerFactory.getLogger(DataChangeRecordAction.class); private static final Tracer TRACER = Tracing.getTracer(); + @VisibleForTesting public Optional run( PartitionMetadata partition, DataChangeRecord record, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/HeartbeatRecordAction.java index bc06e51ba27a3..54ac2bd3912b9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/HeartbeatRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/HeartbeatRecordAction.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +44,11 @@ public class HeartbeatRecordAction { private static final Tracer TRACER = Tracing.getTracer(); private final ChangeStreamMetrics metrics; - public HeartbeatRecordAction(ChangeStreamMetrics metrics) { + HeartbeatRecordAction(ChangeStreamMetrics metrics) { this.metrics = metrics; } + @VisibleForTesting public Optional run( PartitionMetadata partition, HeartbeatRecord record, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/QueryChangeStreamAction.java index 4ce43737d0ccc..7552cc844c2f0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/actions/QueryChangeStreamAction.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -65,7 +66,7 @@ public class QueryChangeStreamAction { private final HeartbeatRecordAction heartbeatRecordAction; private final ChildPartitionsRecordAction childPartitionsRecordAction; - public QueryChangeStreamAction( + QueryChangeStreamAction( ChangeStreamDao changeStreamDao, PartitionMetadataDao partitionMetadataDao, ChangeStreamRecordMapper changeStreamRecordMapper, @@ -80,6 +81,7 @@ public QueryChangeStreamAction( this.childPartitionsRecordAction = childPartitionsRecordAction; } + @VisibleForTesting public ProcessContinuation run( PartitionMetadata partition, RestrictionTracker tracker, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java index ea4da05da1c62..8abe45354acc8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java @@ -35,7 +35,7 @@ public class ChangeStreamDao { private final RpcPriority rpcPriority; private final String jobName; - public ChangeStreamDao( + ChangeStreamDao( String changeStreamName, DatabaseClient databaseClient, RpcPriority rpcPriority, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java index cb35264b8ec2e..5a09a76e74d61 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java @@ -32,7 +32,7 @@ public class ChangeStreamResultSet implements AutoCloseable { private Duration totalStreamDuration; private long numberOfRecordsRead; - public ChangeStreamResultSet(ResultSet resultSet) { + ChangeStreamResultSet(ResultSet resultSet) { this.resultSet = resultSet; this.totalStreamDuration = Duration.ZERO; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetMetadata.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetMetadata.java index 07cd13b82afe9..ef0acf2c21630 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetMetadata.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetMetadata.java @@ -28,7 +28,7 @@ public class ChangeStreamResultSetMetadata { private final Duration totalStreamDuration; private final long numberOfRecordsRead; - public ChangeStreamResultSetMetadata( + ChangeStreamResultSetMetadata( Timestamp queryStartedAt, Timestamp recordStreamStartedAt, Timestamp recordStreamEndedAt, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java index 1c6b9dfcf4e00..4f55d2034b2b2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java @@ -62,7 +62,6 @@ public DaoFactory( this.jobName = jobName; } - // TODO: See if synchronized is a bottleneck and refactor if so public synchronized PartitionMetadataAdminDao getPartitionMetadataAdminDao() { if (partitionMetadataAdminDao == null) { DatabaseAdminClient databaseAdminClient = @@ -77,7 +76,6 @@ public synchronized PartitionMetadataAdminDao getPartitionMetadataAdminDao() { return partitionMetadataAdminDao; } - // TODO: See if synchronized is a bottleneck and refactor if so public synchronized PartitionMetricsAdminDao getPartitionMetricsAdminDao() { if (partitionMetricsAdminDao == null) { DatabaseAdminClient databaseAdminClient = @@ -92,7 +90,6 @@ public synchronized PartitionMetricsAdminDao getPartitionMetricsAdminDao() { return partitionMetricsAdminDao; } - // TODO: See if synchronized is a bottleneck and refactor if so public synchronized PartitionMetadataDao getPartitionMetadataDao() { final SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(metadataSpannerConfig); if (partitionMetadataDaoInstance == null) { @@ -106,7 +103,6 @@ public synchronized PartitionMetadataDao getPartitionMetadataDao() { return partitionMetadataDaoInstance; } - // TODO: See if synchronized is a bottleneck and refactor if so public synchronized ChangeStreamDao getChangeStreamDao() { final SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(changeStreamSpannerConfig); if (changeStreamDaoInstance == null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java index b59194091accb..75db3e409b289 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java @@ -51,7 +51,7 @@ public class PartitionMetadataAdminDao { private final String databaseId; private final String tableName; - public PartitionMetadataAdminDao( + PartitionMetadataAdminDao( DatabaseAdminClient databaseAdminClient, String instanceId, String databaseId, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java index c9fd7d4792bb1..8f68fd453289a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java @@ -51,7 +51,7 @@ public class PartitionMetadataDao { private final DatabaseClient databaseClient; private final PartitionMetadataMapper mapper; - public PartitionMetadataDao( + PartitionMetadataDao( String metadataTableName, String metricsTableName, DatabaseClient databaseClient, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetricsAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetricsAdminDao.java index e9fd5ea56e0e0..86700a4a03afd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetricsAdminDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetricsAdminDao.java @@ -49,7 +49,7 @@ public class PartitionMetricsAdminDao { private final String databaseId; private final String tableName; - public PartitionMetricsAdminDao( + PartitionMetricsAdminDao( DatabaseAdminClient databaseAdminClient, String instanceId, String databaseId, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java index 132426fe5bdfe..515c57ba84b83 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java @@ -42,6 +42,8 @@ // TODO: Add java docs public class ChangeStreamRecordMapper { + ChangeStreamRecordMapper() {} + public List toChangeStreamRecords( PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) { return row.getStructList(0).stream() diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java index 60f65d492dccd..d54f3ed0c4094 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java @@ -35,6 +35,8 @@ public class PartitionMetadataMapper { + PartitionMetadataMapper() {} + public PartitionMetadata from(ResultSet resultSet) { return PartitionMetadata.newBuilder() .setPartitionToken(resultSet.getString(COLUMN_PARTITION_TOKEN)) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java index 7d304263877f4..a2128c38f9b17 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java @@ -19,6 +19,8 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStruct; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.cloud.Timestamp; import com.google.cloud.spanner.Struct; @@ -50,8 +52,8 @@ public class ChangeStreamRecordMapperTest { @Before public void setUp() { - this.mapper = new ChangeStreamRecordMapper(); - this.partition = + mapper = new ChangeStreamRecordMapper(); + partition = PartitionMetadata.newBuilder() .setPartitionToken("partitionToken") .setParentTokens(Sets.newHashSet("parentToken")) @@ -63,14 +65,13 @@ public void setUp() { .setScheduledAt(Timestamp.ofTimeMicroseconds(13L)) .setRunningAt(Timestamp.ofTimeMicroseconds(14L)) .build(); - this.resultSetMetadata = - new ChangeStreamResultSetMetadata( - Timestamp.ofTimeMicroseconds(1L), - Timestamp.ofTimeMicroseconds(2L), - Timestamp.ofTimeMicroseconds(3L), - Timestamp.ofTimeMicroseconds(4L), - Duration.millis(100), - 10_000L); + resultSetMetadata = mock(ChangeStreamResultSetMetadata.class); + when(resultSetMetadata.getQueryStartedAt()).thenReturn(Timestamp.ofTimeMicroseconds(1L)); + when(resultSetMetadata.getRecordStreamStartedAt()).thenReturn(Timestamp.ofTimeMicroseconds(2L)); + when(resultSetMetadata.getRecordStreamEndedAt()).thenReturn(Timestamp.ofTimeMicroseconds(3L)); + when(resultSetMetadata.getRecordReadAt()).thenReturn(Timestamp.ofTimeMicroseconds(4L)); + when(resultSetMetadata.getTotalStreamDuration()).thenReturn(Duration.millis(100)); + when(resultSetMetadata.getNumberOfRecordsRead()).thenReturn(10_000L); } @Test