latest template and some documentation changes (#789)
* latest template and some documentation changes

* applied review comments

* applied review comments
aksharauke authored Mar 14, 2024
1 parent 6f34482 commit 8c104f1
Showing 3 changed files with 77 additions and 8 deletions.
71 changes: 68 additions & 3 deletions docs/reverse-replication/ReverseReplicationUserGuide.md
@@ -142,6 +142,16 @@ The progress of files created per shard is also captured in the shard_file_process_progress table.

To confirm that the records have indeed been written to the source database, the best approach is to check whether the record count on the source database matches the expected value. Note that verifying data takes more than just record count matching; the suggested tool for full validation is [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator).
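
As a quick sanity check, the same count query can be run on Spanner and on the source database and the results compared; the table name below is illustrative, and full validation still needs the tool linked above.

```
-- run on both Spanner and the source database; replace orders with your table name
select count(*) from orders
```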

#### Tracking which shards are lagging

The following sample SQL gives the shards that are yet to catch up in the writer job. The SQL needs to be run on the [metadata database](./RunnigReverseReplication.md#arguments) that is specified when launching the reverse replication flow. Replace the run_id with the relevant run identifier and the window interval with an appropriate value.

```
select w.shard, r.created_upto, w.file_start_interval,
  TIMESTAMP_DIFF(r.created_upto, w.file_start_interval, SECOND) as shard_lag
from shard_file_create_progress r, shard_file_process_progress w
where r.run_id = w.run_id
  and r.shard = w.shard
  and TIMESTAMP_DIFF(r.created_upto, w.file_start_interval, SECOND) > 10 -- give the window interval here, example 10
  and r.run_id = 'run-id' -- give the run identifier here
order by shard_lag DESC
```
### Troubleshooting

The following are some scenarios and how to handle them.
@@ -187,7 +197,7 @@ In this case, check if you observe the following:

1. The source database table does not have a primary key
2. The primary key value was not present in the change stream data
3. Check the shard_skipped_files table created in the metadata database. It contains the intervals for which no file was found in GCS, for the cases when no change record was generated for the interval. If a file is present in the shard_skipped_files table and also exists in GCS, this indicates a data loss scenario; please raise a bug.
3. When no data is written to Spanner for a given interval for a given shard, no file is created in GCS. In such a case, the interval is skipped by the writer Dataflow job. This can be verified in the logs by searching for the text `skipping the file`. If a file is marked as skipped in the logs but exists in GCS, this indicates a data loss scenario; please raise a bug.
4. Check the shard_file_process_progress table in the metadata database. If it is lagging, wait for the pipeline to catch up so that the data gets reverse replicated; a sample query to inspect per-shard progress is sketched below.
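
A minimal sketch of such a progress check, assuming the shard, file_start_interval and run_id columns that the lag query shown earlier reads from this table:

```
-- replace 'run-id' with the run identifier used when launching the flow
select shard, file_start_interval
from shard_file_process_progress
where run_id = 'run-id'
order by file_start_interval
```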


@@ -243,12 +253,67 @@ Note: Additional optional parameters for the reader job are [here](https://githu

Note: Additional optional parameters for the writer job are [here](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/gcs-to-sourcedb/README_GCS_to_Sourcedb.md#optional-parameters).

### Ignorable errors

Dataflow retries most of the errors. The following errors, if they show up in the Dataflow UI, can be ignored.

#### Reader job

1. A file not found exception like the one below. These are thrown when Dataflow workers auto-scale and the work gets reassigned among the workers.

```
java.io.FileNotFoundException: Rewrite from <GCS bucket name>/.temp-beam/<file name> to <GCS file path> has failed
```

2. Spanner DEADLINE_EXCEEDED exception.

3. A GC thrashing exception like the one below:

```
Shutting down JVM after 8 consecutive periods of measured GC thrashing.
```

4. GCS throttling can slow down writes for a while. The exceptions below occur during that period and can be ignored.

```
Operation ongoing in bundle process_bundle-<bundle number> for PTransform{id=Write To GCS/Write rows to output writeDynamic/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles-ptransform-46, name=Write To GCS/Write rows to output writeDynamic/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles-ptransform-46, state=process} for at least 05m20s without outputting or completing
```

```
java.io.IOException: Error executing batch GCS request
```

#### Writer job

1. To preserve ordering, the writer job processes files in increasing window intervals. If the reader job is lagging in creating files, the writer job waits for the expected file for a given window to be written to GCS. In such cases, the messages below are logged and can be ignored.

```
Operation ongoing in step Write to source for at least 05m00s without outputting or completing in state process-timers in thread DataflowWorkUnits-11418 with id 1891593
at java.base@11.0.20/java.lang.Thread.sleep(Native Method)
at app//com.google.cloud.teleport.v2.templates.utils.GCSReader.checkAndReturnIfFileExists
```

2. A large amount of logging can result in the message below. This does not stop the Dataflow job from processing.

```
Throttling logger worker. It used up its 30s quota for logs in only 17.465s
```

#### Common to both jobs

1. Ephemeral network glitches result in the ignorable error below.

```
StatusRuntimeException: UNAVAILABLE: ping timeout
```

## Reverse Replication Limitations

The following sections list the known limitations that exist currently with the Reverse Replication flows:

1. Currently only MySQL source database is supported.
2. Certain transformations are not supported; the section below lists them:
2. If forward migration and reverse replication are running in parallel, there is no mechanism to prevent forward migration of the data that was written to the source via the reverse replication flow. The impact of this is unnecessary processing of redundant data. The best practice is to start reverse replication post cutover, once forward migration has ended.
3. Certain transformations are not supported; the section below lists them:

### Reverse transformations
Reverse transformation cannot be supported out of the box for the following scenarios:
@@ -287,7 +352,7 @@ retention_period = '7d',
value_capture_type = 'NEW_ROW'
);
```
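
The full statement is truncated in this excerpt; for reference, a complete change stream definition of this shape could look like the sketch below, where the stream name and the FOR clause are illustrative rather than the guide's exact example.

```
CREATE CHANGE STREAM reverseReplicationStream
FOR ALL
OPTIONS (
  retention_period = '7d',
  value_capture_type = 'NEW_ROW'
);
```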

8. Reverse replication should start once the forward migration has ended and should not run in parallel with forward migration. This is to avoid reverse-replicated writes to the source flowing back into the forward migration jobs.

## Customize

1 change: 1 addition & 0 deletions docs/reverse-replication/RunnigReverseReplication.md
@@ -55,6 +55,7 @@ The script takes in multiple arguments to orchestrate the pipeline. They are:
- `sourceWriterTemplateLocation` : the dataflow template location for the Source writer job.
- `spannerReaderTemplateLocation`: the dataflow template location for the Spanner reader job.
- `startTimestamp`: Timestamp in RFC 3339 format from which the change stream should start reading changes; defaults to an empty string, which is equivalent to the current timestamp.
- `readerMaxWorkers`: Number of maximum workers for the reader job.
- `readerShardingCustomClassName`: the fully qualified custom class name for sharding logic.
- `readerShardingCustomJarPath` : the GCS path to custom jar for sharding logic.
- `readerShardingCustomParameters`: the custom parameters to be passed to the custom sharding logic implementation.
13 changes: 8 additions & 5 deletions reverse_replication/reverse-replication-runner.go
@@ -8,6 +8,7 @@ import (
"time"

"cloud.google.com/go/spanner"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"

dataflow "cloud.google.com/go/dataflow/apiv1beta3"
"cloud.google.com/go/storage"
@@ -55,6 +56,7 @@ var (
skipMetadataDatabaseCreation bool
networkTags string
runIdentifier string
readerMaxWorkers int
)

const (
@@ -88,8 +90,8 @@ func setupGlobalFlags() {
flag.StringVar(&serviceAccountEmail, "serviceAccountEmail", "", "The email address of the service account to run the job as.")
flag.IntVar(&readerWorkers, "readerWorkers", 5, "Number of workers for reader job.")
flag.IntVar(&writerWorkers, "writerWorkers", 5, "Number of workers for writer job.")
flag.StringVar(&spannerReaderTemplateLocation, "spannerReaderTemplateLocation", "gs://dataflow-templates-us-east7/2024-02-09-00_RC00/flex/Spanner_Change_Streams_to_Sharded_File_Sink", "The dataflow template location for the Spanner reader job.")
flag.StringVar(&sourceWriterTemplateLocation, "sourceWriterTemplateLocation", "gs://dataflow-templates-us-central2/2024-01-09-00_RC01/flex/GCS_to_Sourcedb", "The dataflow template location for the Source writer job.")
flag.StringVar(&spannerReaderTemplateLocation, "spannerReaderTemplateLocation", "gs://dataflow-templates-us-east7/2024-03-06-00_RC00/flex/Spanner_Change_Streams_to_Sharded_File_Sink", "The dataflow template location for the Spanner reader job.")
flag.StringVar(&sourceWriterTemplateLocation, "sourceWriterTemplateLocation", "gs://dataflow-templates-us-east7/2024-03-06-00_RC00/flex/GCS_to_Sourcedb", "The dataflow template location for the Source writer job.")
flag.StringVar(&jobsToLaunch, "jobsToLaunch", "both", "Whether to launch the spanner reader job or the source writer job or both. Default is both. Support values are both,reader,writer.")
flag.BoolVar(&skipChangeStreamCreation, "skipChangeStreamCreation", false, "Whether to skip the change stream creation. Default is false.")
flag.BoolVar(&skipMetadataDatabaseCreation, "skipMetadataDatabaseCreation", false, "Whether to skip Metadata database creation.Default is false.")
@@ -98,6 +100,7 @@ func setupGlobalFlags() {
flag.StringVar(&readerShardingCustomJarPath, "readerShardingCustomJarPath", "", "The GCS path to custom jar for sharding logic.")
flag.StringVar(&runIdentifier, "runIdentifier", "", "The run identifier for the Dataflow jobs.")
flag.StringVar(&readerShardingCustomParameters, "readerShardingCustomParameters", "", "Any custom parameters to be supplied to custom sharding class.")
flag.IntVar(&readerMaxWorkers, "readerMaxWorkers", 20, "Number of max workers for reader job.")

}

@@ -305,7 +308,7 @@ func main() {
readerParams["shardingCustomParameters"] = readerShardingCustomParameters
}
launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
JobName: fmt.Sprintf("%s-reader-%s", jobNamePrefix, runId),
JobName: fmt.Sprintf("%s-reader-%s-%s", jobNamePrefix, runId, utils.GenerateHashStr()),
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: spannerReaderTemplateLocation},
Parameters: readerParams,
Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
@@ -316,6 +319,7 @@ func main() {
Subnetwork: vpcSubnetwork,
IpConfiguration: workerIpAddressConfig,
ServiceAccountEmail: serviceAccountEmail,
MaxWorkers: int32(readerMaxWorkers),
},
}

@@ -331,7 +335,6 @@ func main() {
fmt.Printf("unable to launch reader job: %v \n REQUEST BODY: %+v\n", err, req)
return
}
//fmt.Println("Launched reader job: ", fmt.Sprintf("%s-reader-%s", jobNamePrefix, runId))
fmt.Println("Launched reader job: ", readerJobResponse.Job)
}

@@ -345,7 +348,7 @@ func main() {
}

launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
JobName: fmt.Sprintf("%s-writer-%s", jobNamePrefix, runId),
JobName: fmt.Sprintf("%s-writer-%s-%s", jobNamePrefix, runId, utils.GenerateHashStr()),
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: sourceWriterTemplateLocation},
Parameters: map[string]string{
"sourceShardsFilePath": sourceShardsFilePath,
