Skip to content

Commit

Permalink
refactor: change constructors visibility (apache#81)
Browse files Browse the repository at this point in the history
* refactor: make action constructors package private

* refactor: change visibility of dao constructors

* refactor: change visibility of mapper constructors
  • Loading branch information
thiagotnunes authored Nov 19, 2021
1 parent 4e50773 commit 74fe36c
Show file tree
Hide file tree
Showing 14 changed files with 32 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,12 +49,13 @@ public class ChildPartitionsRecordAction {
private final PartitionMetadataDao partitionMetadataDao;
private final ChangeStreamMetrics metrics;

public ChildPartitionsRecordAction(
ChildPartitionsRecordAction(
PartitionMetadataDao partitionMetadataDao, ChangeStreamMetrics metrics) {
this.partitionMetadataDao = partitionMetadataDao;
this.metrics = metrics;
}

@VisibleForTesting
public Optional<ProcessContinuation> run(
PartitionMetadata partition,
ChildPartitionsRecord record,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,6 +43,7 @@ public class DataChangeRecordAction {
private static final Logger LOG = LoggerFactory.getLogger(DataChangeRecordAction.class);
private static final Tracer TRACER = Tracing.getTracer();

@VisibleForTesting
public Optional<ProcessContinuation> run(
PartitionMetadata partition,
DataChangeRecord record,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,10 +44,11 @@ public class HeartbeatRecordAction {
private static final Tracer TRACER = Tracing.getTracer();
private final ChangeStreamMetrics metrics;

public HeartbeatRecordAction(ChangeStreamMetrics metrics) {
HeartbeatRecordAction(ChangeStreamMetrics metrics) {
this.metrics = metrics;
}

@VisibleForTesting
public Optional<ProcessContinuation> run(
PartitionMetadata partition,
HeartbeatRecord record,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -65,7 +66,7 @@ public class QueryChangeStreamAction {
private final HeartbeatRecordAction heartbeatRecordAction;
private final ChildPartitionsRecordAction childPartitionsRecordAction;

public QueryChangeStreamAction(
QueryChangeStreamAction(
ChangeStreamDao changeStreamDao,
PartitionMetadataDao partitionMetadataDao,
ChangeStreamRecordMapper changeStreamRecordMapper,
Expand All @@ -80,6 +81,7 @@ public QueryChangeStreamAction(
this.childPartitionsRecordAction = childPartitionsRecordAction;
}

@VisibleForTesting
public ProcessContinuation run(
PartitionMetadata partition,
RestrictionTracker<OffsetRange, Long> tracker,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ChangeStreamDao {
private final RpcPriority rpcPriority;
private final String jobName;

public ChangeStreamDao(
ChangeStreamDao(
String changeStreamName,
DatabaseClient databaseClient,
RpcPriority rpcPriority,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ChangeStreamResultSet implements AutoCloseable {
private Duration totalStreamDuration;
private long numberOfRecordsRead;

public ChangeStreamResultSet(ResultSet resultSet) {
ChangeStreamResultSet(ResultSet resultSet) {
this.resultSet = resultSet;
this.totalStreamDuration = Duration.ZERO;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ChangeStreamResultSetMetadata {
private final Duration totalStreamDuration;
private final long numberOfRecordsRead;

public ChangeStreamResultSetMetadata(
ChangeStreamResultSetMetadata(
Timestamp queryStartedAt,
Timestamp recordStreamStartedAt,
Timestamp recordStreamEndedAt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public DaoFactory(
this.jobName = jobName;
}

// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized PartitionMetadataAdminDao getPartitionMetadataAdminDao() {
if (partitionMetadataAdminDao == null) {
DatabaseAdminClient databaseAdminClient =
Expand All @@ -77,7 +76,6 @@ public synchronized PartitionMetadataAdminDao getPartitionMetadataAdminDao() {
return partitionMetadataAdminDao;
}

// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized PartitionMetricsAdminDao getPartitionMetricsAdminDao() {
if (partitionMetricsAdminDao == null) {
DatabaseAdminClient databaseAdminClient =
Expand All @@ -92,7 +90,6 @@ public synchronized PartitionMetricsAdminDao getPartitionMetricsAdminDao() {
return partitionMetricsAdminDao;
}

// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized PartitionMetadataDao getPartitionMetadataDao() {
final SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(metadataSpannerConfig);
if (partitionMetadataDaoInstance == null) {
Expand All @@ -106,7 +103,6 @@ public synchronized PartitionMetadataDao getPartitionMetadataDao() {
return partitionMetadataDaoInstance;
}

// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized ChangeStreamDao getChangeStreamDao() {
final SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(changeStreamSpannerConfig);
if (changeStreamDaoInstance == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class PartitionMetadataAdminDao {
private final String databaseId;
private final String tableName;

public PartitionMetadataAdminDao(
PartitionMetadataAdminDao(
DatabaseAdminClient databaseAdminClient,
String instanceId,
String databaseId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class PartitionMetadataDao {
private final DatabaseClient databaseClient;
private final PartitionMetadataMapper mapper;

public PartitionMetadataDao(
PartitionMetadataDao(
String metadataTableName,
String metricsTableName,
DatabaseClient databaseClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class PartitionMetricsAdminDao {
private final String databaseId;
private final String tableName;

public PartitionMetricsAdminDao(
PartitionMetricsAdminDao(
DatabaseAdminClient databaseAdminClient,
String instanceId,
String databaseId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
// TODO: Add java docs
public class ChangeStreamRecordMapper {

ChangeStreamRecordMapper() {}

public List<ChangeStreamRecord> toChangeStreamRecords(
PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
return row.getStructList(0).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

public class PartitionMetadataMapper {

PartitionMetadataMapper() {}

public PartitionMetadata from(ResultSet resultSet) {
return PartitionMetadata.newBuilder()
.setPartitionToken(resultSet.getString(COLUMN_PARTITION_TOKEN))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStruct;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Struct;
Expand Down Expand Up @@ -50,8 +52,8 @@ public class ChangeStreamRecordMapperTest {

@Before
public void setUp() {
this.mapper = new ChangeStreamRecordMapper();
this.partition =
mapper = new ChangeStreamRecordMapper();
partition =
PartitionMetadata.newBuilder()
.setPartitionToken("partitionToken")
.setParentTokens(Sets.newHashSet("parentToken"))
Expand All @@ -63,14 +65,13 @@ public void setUp() {
.setScheduledAt(Timestamp.ofTimeMicroseconds(13L))
.setRunningAt(Timestamp.ofTimeMicroseconds(14L))
.build();
this.resultSetMetadata =
new ChangeStreamResultSetMetadata(
Timestamp.ofTimeMicroseconds(1L),
Timestamp.ofTimeMicroseconds(2L),
Timestamp.ofTimeMicroseconds(3L),
Timestamp.ofTimeMicroseconds(4L),
Duration.millis(100),
10_000L);
resultSetMetadata = mock(ChangeStreamResultSetMetadata.class);
when(resultSetMetadata.getQueryStartedAt()).thenReturn(Timestamp.ofTimeMicroseconds(1L));
when(resultSetMetadata.getRecordStreamStartedAt()).thenReturn(Timestamp.ofTimeMicroseconds(2L));
when(resultSetMetadata.getRecordStreamEndedAt()).thenReturn(Timestamp.ofTimeMicroseconds(3L));
when(resultSetMetadata.getRecordReadAt()).thenReturn(Timestamp.ofTimeMicroseconds(4L));
when(resultSetMetadata.getTotalStreamDuration()).thenReturn(Duration.millis(100));
when(resultSetMetadata.getNumberOfRecordsRead()).thenReturn(10_000L);
}

@Test
Expand Down

0 comments on commit 74fe36c

Please sign in to comment.