Skip to content

Commit

Permalink
[BEAM-12164]: Add SDF for reading change stream records
Browse files Browse the repository at this point in the history
Adds ReadChangeStreamPartitionDoFn, which is an SDF to read partitions
from change streams and process them accordingly. This component
receives a change stream name, a partition, a start time and an end time
to query. It then initiates a change stream query with the received
parameters.

Within a change stream, 3 types of records can be received:

1. A Data record
2. A Heartbeat record
3. A Child partitions record

Upon receiving #1, the function updates the watermark with the record's
commit timestamp and emits the record into the output PCollection.
Upon receiving #2, the function updates the watermark with the record's
timestamp, but it does not emit any record into the PCollection.
Upon receiving #3, the function updates the watermark with the record's
timestamp and writes the new child partitions into the metadata table.
These partitions will later be scheduled by the DetectNewPartitions
component.

Once the change stream query for the element partition finishes, it
marks the partition as finished in the metadata table and terminates.
  • Loading branch information
thiagotnunes committed Jan 14, 2022
1 parent 8724555 commit c074b0f
Show file tree
Hide file tree
Showing 18 changed files with 2,403 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams;

import com.google.cloud.Timestamp;
import java.math.BigDecimal;

/** Util class to manage timestamp conversions. */
public class TimestampConverter {

  /** The number of microseconds in a {@link Timestamp#MAX_VALUE}. */
  public static final long MAX_MICROS = timestampToMicros(Timestamp.MAX_VALUE);

  /**
   * Converts a {@link Timestamp} to its number of microseconds. Note there is precision loss here:
   * any sub-microsecond part of the nanoseconds component is dropped.
   *
   * <p>The nanoseconds component is floored to whole microseconds before being added to the
   * seconds, so the result is never later than the given timestamp. This also holds for timestamps
   * with negative seconds, where truncating the combined value toward zero would instead round
   * forward in time.
   *
   * @param timestamp the timestamp to be converted
   * @return the number of microseconds in the given timestamp
   */
  public static long timestampToMicros(Timestamp timestamp) {
    final BigDecimal seconds = BigDecimal.valueOf(timestamp.getSeconds());
    // Floor the nanos to whole microseconds *before* combining with the seconds. Because the
    // scaled seconds are already an integral number of microseconds, the sum stays integral and
    // the final longValue() call has no fractional part left to truncate toward zero.
    final BigDecimal micros = BigDecimal.valueOf(Math.floorDiv(timestamp.getNanos(), 1_000));

    return seconds.scaleByPowerOfTen(6).add(micros).longValue();
  }

  /**
   * Creates a {@link Timestamp} from a number of milliseconds. Note that microseconds and
   * nanoseconds will always be zeroed here.
   *
   * @param millis the number of milliseconds
   * @return a timestamp with the given milliseconds
   * @throws ArithmeticException if converting the given milliseconds to microseconds overflows a
   *     {@code long}
   */
  public static Timestamp timestampFromMillis(long millis) {
    // multiplyExact fails loudly instead of silently wrapping around to an unrelated instant.
    return Timestamp.ofTimeMicroseconds(Math.multiplyExact(millis, 1_000L));
  }

  /**
   * Zeroes nanoseconds from the given {@link Timestamp} (precision is lost). The timestamp returned
   * will be precise up to microseconds only and is never later than the given timestamp.
   *
   * @param timestamp the timestamp to be truncated
   * @return the timestamp with microseconds precision
   */
  public static Timestamp truncateNanos(Timestamp timestamp) {
    return Timestamp.ofTimeMicroseconds(timestampToMicros(timestamp));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;

/**
 * Factory class for creating instances that will handle each type of record within a change stream
 * query. The instances created are all singletons.
 *
 * <p>The singletons are held in static fields, so they are shared by every {@link ActionFactory}
 * instance loaded by the same class loader. Each instance is created lazily on the first call to
 * its factory method; arguments passed on later calls are ignored and the already created instance
 * is returned.
 *
 * <p>Creation is guarded by the class-level lock. Locking on the factory instance would not be
 * enough, because distinct factory instances (for example deserialized copies) share the same
 * static fields.
 */
// static fields are un-initialized, because we start them during the first fetch call (with the
// singleton pattern)
@SuppressWarnings("initialization.static.fields.uninitialized")
public class ActionFactory implements Serializable {

  private static final long serialVersionUID = -4060958761369602619L;
  private static DataChangeRecordAction dataChangeRecordActionInstance;
  private static HeartbeatRecordAction heartbeatRecordActionInstance;
  private static ChildPartitionsRecordAction childPartitionsRecordActionInstance;
  private static QueryChangeStreamAction queryChangeStreamActionInstance;

  /**
   * Creates and returns a singleton instance of an action class capable of processing {@link
   * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord}s.
   *
   * <p>This method is thread safe.
   *
   * @return singleton instance of the {@link DataChangeRecordAction}
   */
  public DataChangeRecordAction dataChangeRecordAction() {
    // Lock on the class, not on this: the guarded field is static.
    synchronized (ActionFactory.class) {
      if (dataChangeRecordActionInstance == null) {
        dataChangeRecordActionInstance = new DataChangeRecordAction();
      }
      return dataChangeRecordActionInstance;
    }
  }

  /**
   * Creates and returns a singleton instance of an action class capable of processing {@link
   * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord}s. This method is thread
   * safe.
   *
   * @param metrics metrics gathering class; only used when the singleton is first created
   * @return singleton instance of the {@link HeartbeatRecordAction}
   */
  public HeartbeatRecordAction heartbeatRecordAction(ChangeStreamMetrics metrics) {
    // Lock on the class, not on this: the guarded field is static.
    synchronized (ActionFactory.class) {
      if (heartbeatRecordActionInstance == null) {
        heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics);
      }
      return heartbeatRecordActionInstance;
    }
  }

  /**
   * Creates and returns a singleton instance of an action class capable of processing {@link
   * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord}s. This method is
   * thread safe.
   *
   * @param partitionMetadataDao DAO class to access the Connector's metadata tables; only used
   *     when the singleton is first created
   * @param metrics metrics gathering class; only used when the singleton is first created
   * @return singleton instance of the {@link ChildPartitionsRecordAction}
   */
  public ChildPartitionsRecordAction childPartitionsRecordAction(
      PartitionMetadataDao partitionMetadataDao, ChangeStreamMetrics metrics) {
    // Lock on the class, not on this: the guarded field is static.
    synchronized (ActionFactory.class) {
      if (childPartitionsRecordActionInstance == null) {
        childPartitionsRecordActionInstance =
            new ChildPartitionsRecordAction(partitionMetadataDao, metrics);
      }
      return childPartitionsRecordActionInstance;
    }
  }

  /**
   * Creates and returns a single instance of an action class capable of performing a change stream
   * query for a given partition. It uses the {@link DataChangeRecordAction}, {@link
   * HeartbeatRecordAction} and {@link ChildPartitionsRecordAction} to dispatch the necessary
   * processing depending on the type of record received. This method is thread safe.
   *
   * <p>All arguments are only used when the single instance is first created.
   *
   * @param changeStreamDao DAO class to perform a change stream query
   * @param partitionMetadataDao DAO class to access the Connector's metadata tables
   * @param changeStreamRecordMapper mapper class to transform change stream records into the
   *     Connector's domain models
   * @param partitionMetadataMapper mapper class to transform partition metadata rows into the
   *     Connector's domain models
   * @param dataChangeRecordAction action class to process {@link
   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord}s
   * @param heartbeatRecordAction action class to process {@link
   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord}s
   * @param childPartitionsRecordAction action class to process {@link
   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord}s
   * @return single instance of the {@link QueryChangeStreamAction}
   */
  public QueryChangeStreamAction queryChangeStreamAction(
      ChangeStreamDao changeStreamDao,
      PartitionMetadataDao partitionMetadataDao,
      ChangeStreamRecordMapper changeStreamRecordMapper,
      PartitionMetadataMapper partitionMetadataMapper,
      DataChangeRecordAction dataChangeRecordAction,
      HeartbeatRecordAction heartbeatRecordAction,
      ChildPartitionsRecordAction childPartitionsRecordAction) {
    // Lock on the class, not on this: the guarded field is static.
    synchronized (ActionFactory.class) {
      if (queryChangeStreamActionInstance == null) {
        queryChangeStreamActionInstance =
            new QueryChangeStreamAction(
                changeStreamDao,
                partitionMetadataDao,
                changeStreamRecordMapper,
                partitionMetadataMapper,
                dataChangeRecordAction,
                heartbeatRecordAction,
                childPartitionsRecordAction);
      }
      return queryChangeStreamActionInstance;
    }
  }
}
Loading

0 comments on commit c074b0f

Please sign in to comment.