redo(ticdc): support gcs scheme in redo log #7993

Merged · 6 commits · Jan 4, 2023
2 changes: 1 addition & 1 deletion cdc/api/v2/model.go
@@ -341,7 +341,7 @@ func GetDefaultReplicaConfig() *ReplicaConfig {
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
- FlushIntervalInMs: 1000,
+ FlushIntervalInMs: config.MinFlushIntervalInMs,
Storage: "",
},
}
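For context, a minimal sketch (not part of this diff) of a consistent config that points redo logs at GCS, which is what this PR enables; the bucket, prefix, and Level value are illustrative, and the gcs:// URI form is assumed to follow TiDB's external-storage convention.

func exampleGCSConsistentConfig() *config.ConsistentConfig {
	// Hypothetical values; only MaxLogSize and FlushIntervalInMs mirror the defaults above.
	return &config.ConsistentConfig{
		Level:             "eventual",
		MaxLogSize:        64,
		FlushIntervalInMs: config.MinFlushIntervalInMs,
		Storage:           "gcs://redo-bucket/changefeed-prefix", // assumed gcs:// URI form
	}
}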
3 changes: 2 additions & 1 deletion cdc/owner/changefeed.go
@@ -33,6 +33,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
redoCfg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/prometheus/client_golang/prometheus"
@@ -674,7 +675,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
log.Warn("changefeed is removed, but state is not complete", zap.Any("state", c.state))
return
}
- if !redo.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
+ if !redoCfg.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
return
}
// when removing a paused changefeed, the redo manager is nil, create a new one
5 changes: 3 additions & 2 deletions cdc/owner/changefeed_test.go
@@ -484,8 +484,9 @@ func TestRemoveChangefeed(t *testing.T) {
info := ctx.ChangefeedVars().Info
dir := t.TempDir()
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: config.MinFlushIntervalInMs,
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
34 changes: 0 additions & 34 deletions cdc/redo/applier.go

This file was deleted.

2 changes: 1 addition & 1 deletion cdc/redo/convert.go → cdc/redo/common/convert.go
@@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- package redo
+ package common

import (
"bytes"
@@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- package redo
+ package common

import (
"testing"
75 changes: 0 additions & 75 deletions cdc/redo/common/redo.go

This file was deleted.

40 changes: 40 additions & 0 deletions cdc/redo/common/redo_meta.go
@@ -0,0 +1,40 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate msgp

package common

import (
"github.com/pingcap/tiflow/cdc/model"
)

// LogMeta is used for store meta info.
type LogMeta struct {
CheckpointTs uint64 `msg:"checkpointTs"`
ResolvedTs uint64 `msg:"resolvedTs"`
}

// ParseMeta parses meta.
func ParseMeta(metas []*LogMeta, checkpointTs, resolvedTs *model.Ts) {
*checkpointTs = 0
*resolvedTs = 0
for _, meta := range metas {
if *checkpointTs < meta.CheckpointTs {
*checkpointTs = meta.CheckpointTs
}
if *resolvedTs < meta.ResolvedTs {
*resolvedTs = meta.ResolvedTs
}
}
}
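A hedged usage sketch of the new ParseMeta helper (the sample values are made up): it folds a slice of LogMeta records into the largest checkpointTs and resolvedTs observed.

func exampleParseMeta() (checkpointTs, resolvedTs model.Ts) {
	metas := []*LogMeta{
		{CheckpointTs: 100, ResolvedTs: 120},
		{CheckpointTs: 110, ResolvedTs: 115},
	}
	// ParseMeta keeps the maximum of each field across all metas.
	ParseMeta(metas, &checkpointTs, &resolvedTs)
	return checkpointTs, resolvedTs // 110, 120
}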
File renamed without changes.
File renamed without changes.
144 changes: 0 additions & 144 deletions cdc/redo/common/util.go
@@ -14,156 +14,12 @@
package common

import (
"context"
"fmt"
"net/url"
"path/filepath"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

const (
// RedoLogFileFormatV1 was used before v6.1.0, which doesn't contain namespace information
// layout: captureID_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s"
// RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information
// layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s"
)

// InitS3storage init a storage used for s3,
// s3URI should be like s3URI="s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/"
var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
if len(uri.Host) == 0 {
return nil, cerror.WrapChangefeedUnretryableErr(cerror.ErrS3StorageInitialize, errors.Errorf("please specify the bucket for s3 in %v", uri))
}

prefix := strings.Trim(uri.Path, "/")
s3 := &backuppb.S3{Bucket: uri.Host, Prefix: prefix}
options := &storage.BackendOptions{}
storage.ExtractQueryParameters(&uri, &options.S3)
if err := options.S3.Apply(s3); err != nil {
return nil, cerror.WrapChangefeedUnretryableErr(cerror.ErrS3StorageInitialize, err)
}

// we should set this to true, since br set it by default in parseBackend
s3.ForcePathStyle = true
backend := &backuppb.StorageBackend{
Backend: &backuppb.StorageBackend_S3{S3: s3},
}
s3storage, err := storage.New(ctx, backend, &storage.ExternalStorageOptions{
SendCredentials: false,
HTTPClient: nil,
S3Retryer: DefaultS3Retryer(),
})
if err != nil {
return nil, cerror.WrapChangefeedUnretryableErr(cerror.ErrS3StorageInitialize, err)
}

return s3storage, nil
}
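A hedged example of how InitS3storage is typically invoked, mirroring the URI shape documented in its comment; the bucket and endpoint below are placeholder values, not taken from the PR.

func exampleInitS3(ctx context.Context) (storage.ExternalStorage, error) {
	// Parse an S3 URI of the documented form and build the external storage from it.
	u, err := url.Parse("s3://logbucket/test-changefeed?endpoint=http://127.0.0.1:9000/")
	if err != nil {
		return nil, err
	}
	return InitS3storage(ctx, *u)
}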

// logFormat2ParseFormat converts redo log file name format to the space separated
// format, which can be read and parsed by sscanf. Besides remove the suffix `%s`
// which is used as file name extension, since we will parse extension first.
func logFormat2ParseFormat(fmtStr string) string {
return strings.TrimSuffix(strings.ReplaceAll(fmtStr, "_", " "), "%s")
}

// ParseLogFileName extract the commitTs, fileType from log fileName
func ParseLogFileName(name string) (uint64, string, error) {
ext := filepath.Ext(name)
if ext == MetaEXT {
return 0, DefaultMetaFileType, nil
}

// if .sort, the name should be like
// fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT
if ext == SortLogEXT {
name = strings.TrimSuffix(name, SortLogEXT)
ext = filepath.Ext(name)
}
if ext != LogEXT && ext != TmpEXT {
return 0, "", nil
}

var commitTs uint64
var captureID, namespace, changefeedID, fileType, uid string
// if the namespace is not default, the log looks like:
// fmt.Sprintf("%s_%s_%s_%s_%d_%s%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
// otherwise it looks like:
// fmt.Sprintf("%s_%s_%s_%d_%s%s", w.cfg.captureID,
// w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
var (
vars []any
formatStr string
)
if len(strings.Split(name, "_")) == 6 {
formatStr = logFormat2ParseFormat(RedoLogFileFormatV2)
vars = []any{&captureID, &namespace, &changefeedID, &fileType, &commitTs, &uid}
} else {
formatStr = logFormat2ParseFormat(RedoLogFileFormatV1)
vars = []any{&captureID, &changefeedID, &fileType, &commitTs, &uid}
}
name = strings.ReplaceAll(name, "_", " ")
_, err := fmt.Sscanf(name, formatStr, vars...)
if err != nil {
return 0, "", errors.Annotatef(err, "bad log name: %s", name)
}
return commitTs, fileType, nil
}
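An illustration of the naming convention the two formats above describe, assuming the log file extension constant is ".log"; every identifier in the sample name is made up.

func exampleLogFileName() (uint64, string, error) {
	// Build a v6.1.0+ (V2) file name, then parse the commit ts and file type back out.
	name := fmt.Sprintf(RedoLogFileFormatV2,
		"capture-1", "default", "cf-1", "row", uint64(437435), "2b8f3a", ".log")
	// name == "capture-1_default_cf-1_row_437435_2b8f3a.log"
	return ParseLogFileName(name) // 437435, "row", nil
}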

// retryerWithLog wraps the client.DefaultRetryer, and logs when retrying.
type retryerWithLog struct {
client.DefaultRetryer
}

func isDeadlineExceedError(err error) bool {
return strings.Contains(err.Error(), "context deadline exceeded")
}

func (rl retryerWithLog) ShouldRetry(r *request.Request) bool {
if isDeadlineExceedError(r.Error) {
return false
}
return rl.DefaultRetryer.ShouldRetry(r)
}

func (rl retryerWithLog) RetryRules(r *request.Request) time.Duration {
backoffTime := rl.DefaultRetryer.RetryRules(r)
if backoffTime > 0 {
log.Warn("failed to request s3, retrying", zap.Error(r.Error), zap.Duration("backoff", backoffTime))
}
return backoffTime
}

// DefaultS3Retryer is the default s3 retryer, maybe this function
// should be extracted to another place.
func DefaultS3Retryer() request.Retryer {
return retryerWithLog{
DefaultRetryer: client.DefaultRetryer{
NumMaxRetries: 3,
MinRetryDelay: 1 * time.Second,
MinThrottleDelay: 2 * time.Second,
},
}
}

// FilterChangefeedFiles return the files that match to the changefeed.
func FilterChangefeedFiles(files []string, changefeedID model.ChangeFeedID) []string {
var (