Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd: add dial timeout and verify sink connection status #561

Merged
merged 5 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ type Capture struct {
}

// NewCapture returns a new Capture instance
func NewCapture(pdEndpoints []string, advertiseAddr string) (c *Capture, err error) {
func NewCapture(ctx context.Context, pdEndpoints []string, advertiseAddr string) (c *Capture, err error) {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: pdEndpoints,
Context: ctx,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: time.Second,
Expand Down
2 changes: 1 addition & 1 deletion cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {

func (s *Server) run(ctx context.Context) (err error) {
capture, err := NewCapture(
strings.Split(s.opts.pdEndpoints, ","), s.opts.advertiseAddr)
ctx, strings.Split(s.opts.pdEndpoints, ","), s.opts.advertiseAddr)
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,14 @@ func newMySQLSink(ctx context.Context, sinkURI *url.URL, dsn *dmysql.Config, fil
}
db, err := sql.Open("mysql", dsnStr)
if err != nil {
return nil, errors.Annotatef(err, "Open database connection failed, dsn: %s", dsnStr)
return nil, errors.Annotate(err, "Open database connection failed")
}
log.Info("Start mysql sink", zap.String("dsn", dsnStr))
err = db.PingContext(ctx)
if err != nil {
return nil, errors.Annotatef(err, "fail to open MySQL connection")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add dsnStr in the error message?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could leak the password.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yes. Seems dsnStr in L298 also needs to be removed

}

log.Info("Start mysql sink")

db.SetMaxIdleConns(params.workerCount)
db.SetMaxOpenConns(params.workerCount)
Expand Down
25 changes: 21 additions & 4 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/chzyer/readline"
_ "github.com/go-sql-driver/mysql" // mysql driver
"github.com/mattn/go-shellwords"
"github.com/pingcap/errors"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
Expand Down Expand Up @@ -44,6 +45,8 @@ var (
changefeedID string
captureID string
interval uint

defaultContextTimeoutDuration = 10 * time.Second
)

// cf holds changefeed id, which is used for output only
Expand Down Expand Up @@ -90,8 +93,9 @@ func newCliCommand() *cobra.Command {
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{cliPdAddr},
DialTimeout: 5 * time.Second,
DialTimeout: defaultContextTimeoutDuration,
DialOptions: []grpc.DialOption{
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: time.Second,
Expand All @@ -104,12 +108,25 @@ func newCliCommand() *cobra.Command {
},
})
if err != nil {
return err
// PD embeds an etcd server.
return errors.Annotate(err, "fail to open PD client")
}
cdcEtcdCli = kv.NewCDCEtcdClient(etcdCli)
pdCli, err = pd.NewClient([]string{cliPdAddr}, pd.SecurityOption{})
pdCli, err = pd.NewClient([]string{cliPdAddr}, pd.SecurityOption{},
pd.WithGRPCDialOptions(
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: time.Second,
Multiplier: 1.1,
Jitter: 0.1,
MaxDelay: 3 * time.Second,
},
MinConnectTimeout: 3 * time.Second,
}),
))
if err != nil {
return err
return errors.Annotate(err, "fail to open PD client")
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions cmd/client_capture.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package cmd

import (
"context"

_ "github.com/go-sql-driver/mysql" // mysql driver
"github.com/spf13/cobra"
)
Expand All @@ -24,7 +22,9 @@ func newListCaptureCommand() *cobra.Command {
Use: "list",
Short: "List all captures in TiCDC cluster",
RunE: func(cmd *cobra.Command, args []string) error {
captures, err := getAllCaptures(context.Background())
ctx, cancel := contextTimeout()
defer cancel()
captures, err := getAllCaptures(ctx)
if err != nil {
return err
}
Expand Down
40 changes: 28 additions & 12 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func newAdminChangefeedCommand() []*cobra.Command {
Use: "pause",
Short: "Pause a replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
ctx, cancel := contextTimeout()
defer cancel()
job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminStop,
Expand All @@ -53,7 +54,8 @@ func newAdminChangefeedCommand() []*cobra.Command {
Use: "resume",
Short: "Resume a paused replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
ctx, cancel := contextTimeout()
defer cancel()
job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminResume,
Expand All @@ -65,7 +67,8 @@ func newAdminChangefeedCommand() []*cobra.Command {
Use: "remove",
Short: "Remove a replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
ctx, cancel := contextTimeout()
defer cancel()
job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminRemove,
Expand All @@ -87,7 +90,9 @@ func newListChangefeedCommand() *cobra.Command {
Use: "list",
Short: "List all replication tasks (changefeeds) in TiCDC cluster",
RunE: func(cmd *cobra.Command, args []string) error {
_, raw, err := cdcEtcdCli.GetChangeFeeds(context.Background())
ctx, cancel := contextTimeout()
defer cancel()
_, raw, err := cdcEtcdCli.GetChangeFeeds(ctx)
if err != nil {
return err
}
Expand All @@ -106,23 +111,25 @@ func newQueryChangefeedCommand() *cobra.Command {
Use: "query",
Short: "Query information and status of a replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
info, err := cdcEtcdCli.GetChangeFeedInfo(context.Background(), changefeedID)
ctx, cancel := contextTimeout()
defer cancel()
info, err := cdcEtcdCli.GetChangeFeedInfo(ctx, changefeedID)
if err != nil && errors.Cause(err) != model.ErrChangeFeedNotExists {
return err
}
status, _, err := cdcEtcdCli.GetChangeFeedStatus(context.Background(), changefeedID)
status, _, err := cdcEtcdCli.GetChangeFeedStatus(ctx, changefeedID)
if err != nil && errors.Cause(err) != model.ErrChangeFeedNotExists {
return err
}
taskPositions, err := cdcEtcdCli.GetAllTaskPositions(context.Background(), changefeedID)
taskPositions, err := cdcEtcdCli.GetAllTaskPositions(ctx, changefeedID)
if err != nil && errors.Cause(err) != model.ErrChangeFeedNotExists {
return err
}
var count uint64
for _, pinfo := range taskPositions {
count += pinfo.Count
}
processorInfos, err := cdcEtcdCli.GetAllTaskStatus(context.Background(), changefeedID)
processorInfos, err := cdcEtcdCli.GetAllTaskStatus(ctx, changefeedID)
if err != nil {
return err
}
Expand All @@ -145,7 +152,8 @@ func newCreateChangefeedCommand() *cobra.Command {
Short: "Create a new replication task (changefeed)",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
ctx, cancel := contextTimeout()
defer cancel()
id := uuid.New().String()
if startTs == 0 {
ts, logical, err := pdCli.GetTS(ctx)
Expand Down Expand Up @@ -217,6 +225,10 @@ func newCreateChangefeedCommand() *cobra.Command {
if err != nil {
return err
}
err = verifySink(ctx, info.SinkURI, info.Config, info.Opts)
if err != nil {
return err
}
cmd.Printf("Create changefeed successfully!\nID: %s\nInfo: %s\n", id, d)
return cdcEtcdCli.SaveChangeFeedInfo(ctx, info, id)
},
Expand Down Expand Up @@ -245,6 +257,8 @@ func newStatisticsChangefeedCommand() *cobra.Command {
syscall.SIGTERM,
syscall.SIGQUIT)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
tick := time.NewTicker(time.Duration(interval) * time.Second)
lastTime := time.Now()
var lastCount uint64
Expand All @@ -253,25 +267,27 @@ func newStatisticsChangefeedCommand() *cobra.Command {
case sig := <-sc:
switch sig {
case syscall.SIGTERM:
cancel()
os.Exit(0)
default:
cancel()
os.Exit(1)
}
case <-tick.C:
now := time.Now()
status, _, err := cdcEtcdCli.GetChangeFeedStatus(context.Background(), changefeedID)
status, _, err := cdcEtcdCli.GetChangeFeedStatus(ctx, changefeedID)
if err != nil && errors.Cause(err) != model.ErrChangeFeedNotExists {
return err
}
taskPositions, err := cdcEtcdCli.GetAllTaskPositions(context.Background(), changefeedID)
taskPositions, err := cdcEtcdCli.GetAllTaskPositions(ctx, changefeedID)
if err != nil && errors.Cause(err) != model.ErrChangeFeedNotExists {
return err
}
var count uint64
for _, pinfo := range taskPositions {
count += pinfo.Count
}
ts, _, err := pdCli.GetTS(context.Background())
ts, _, err := pdCli.GetTS(ctx)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/client_meta.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package cmd

import (
"context"

"github.com/spf13/cobra"
)

Expand All @@ -22,7 +20,9 @@ func newDeleteMetaCommand() *cobra.Command {
Use: "delete",
Short: "Delete all meta data in etcd, confirm that you know what this command will do and use it at your own risk",
RunE: func(cmd *cobra.Command, args []string) error {
err := cdcEtcdCli.ClearAllCDCInfo(context.Background())
ctx, cancel := contextTimeout()
defer cancel()
err := cdcEtcdCli.ClearAllCDCInfo(ctx)
if err == nil {
cmd.Println("already truncate all meta in etcd!")
}
Expand Down
12 changes: 7 additions & 5 deletions cmd/client_processor.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package cmd

import (
"context"

_ "github.com/go-sql-driver/mysql" // mysql driver
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/model"
Expand All @@ -26,7 +24,9 @@ func newListProcessorCommand() *cobra.Command {
Use: "list",
Short: "List all processors in TiCDC cluster",
RunE: func(cmd *cobra.Command, args []string) error {
info, err := cdcEtcdCli.GetProcessors(context.Background())
ctx, cancel := contextTimeout()
defer cancel()
info, err := cdcEtcdCli.GetProcessors(ctx)
if err != nil {
return err
}
Expand All @@ -41,11 +41,13 @@ func newQueryProcessorCommand() *cobra.Command {
Use: "query",
Short: "Query information and status of a sub replication task (processor)",
RunE: func(cmd *cobra.Command, args []string) error {
_, status, err := cdcEtcdCli.GetTaskStatus(context.Background(), changefeedID, captureID)
ctx, cancel := contextTimeout()
defer cancel()
_, status, err := cdcEtcdCli.GetTaskStatus(ctx, changefeedID, captureID)
if err != nil && errors.Cause(err) != model.ErrTaskStatusNotExists {
return err
}
_, position, err := cdcEtcdCli.GetTaskPosition(context.Background(), changefeedID, captureID)
_, position, err := cdcEtcdCli.GetTaskPosition(ctx, changefeedID, captureID)
if err != nil && errors.Cause(err) != model.ErrTaskPositionNotExists {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/client_tso.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cmd

import (
"context"
"os"

"github.com/pingcap/tidb/store/tikv/oracle"
Expand All @@ -24,7 +23,9 @@ func newQueryTsoCommand() *cobra.Command {
Use: "query",
Short: "Get tso from PD",
RunE: func(cmd *cobra.Command, args []string) error {
ts, logic, err := pdCli.GetTS(context.Background())
ctx, cancel := contextTimeout()
defer cancel()
ts, logic, err := pdCli.GetTS(ctx)
if err != nil {
return err
}
Expand Down
31 changes: 31 additions & 0 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ import (
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/tidb/store/tikv"
"github.com/spf13/cobra"
"go.etcd.io/etcd/clientv3/concurrency"
)

// contextTimeout builds a context rooted at context.Background() that expires
// after defaultContextTimeoutDuration. The returned cancel function should be
// called by the caller once the context is no longer needed.
func contextTimeout() (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeoutDuration)
	return ctx, cancel
}

func getAllCaptures(ctx context.Context) ([]*capture, error) {
_, raw, err := cdcEtcdCli.GetCaptures(ctx)
if err != nil {
Expand Down Expand Up @@ -138,6 +143,32 @@ func verifyTables(ctx context.Context, cfg *filter.ReplicaConfig, startTs uint64
return
}

// verifySink checks that a sink can be constructed from the given URI,
// replica config and options, and then closed again.
//
// It builds a filter from cfg, creates a sink with sink.NewSink, closes it
// immediately, and finally performs a single non-blocking poll of the error
// channel handed to the sink. The first error encountered at any of these
// steps is returned; nil is returned otherwise.
func verifySink(
	ctx context.Context, sinkURI string, cfg *filter.ReplicaConfig, opts map[string]string,
) error {
	// Use a distinct local name so the filter package is not shadowed.
	eventFilter, err := filter.NewFilter(cfg)
	if err != nil {
		return err
	}

	sinkErrCh := make(chan error)
	probe, err := sink.NewSink(ctx, sinkURI, eventFilter, cfg, opts, sinkErrCh)
	if err != nil {
		return err
	}

	if closeErr := probe.Close(); closeErr != nil {
		return closeErr
	}

	// Non-blocking check: pick up an error only if one is ready to be
	// received right now; never wait for one.
	select {
	case asyncErr := <-sinkErrCh:
		return asyncErr
	default:
		return nil
	}
}

// strictDecodeFile decodes the toml file strictly. If any item in confFile file is not mapped
// into the Config struct, issue an error and stop the server from starting.
func strictDecodeFile(path, component string, cfg interface{}) error {
Expand Down
9 changes: 7 additions & 2 deletions tests/cli/run.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#!/bin/bash

set -e

CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
Expand Down Expand Up @@ -68,6 +66,13 @@ function run() {
exit 1
fi

# Make sure bad sink url fails at creating changefeed.
badsink=$(cdc cli changefeed create --start-ts=$start_ts --sink-uri="mysql://badsink" | grep -oE 'fail')
if [[ -z $badsink ]]; then
echo "[$(date)] <<<<< unexpect output got ${badsink} >>>>>"
exit 1
fi

cleanup_process $CDC_BINARY
}

Expand Down