Skip to content

Commit

Permalink
[close #232] Add https for tikv sink (#233)
Browse files Browse the repository at this point in the history
* add https for tikv-sink

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix ut

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix comment

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix comment

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* remove len check

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* add https for tikv-sink

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* .

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix check

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>
Co-authored-by: Ping Yu <yuping@pingcap.com>
  • Loading branch information
zeminzhou and pingyu authored Sep 22, 2022
1 parent b3bd7f2 commit 1b6667f
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 34 deletions.
1 change: 1 addition & 0 deletions cdc/cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (m *feedStateManager) PushAdminJob(job *model.AdminJob) {

func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
job := m.popAdminJob()

if job == nil || job.CfID != m.state.ID {
return false
}
Expand Down
16 changes: 13 additions & 3 deletions cdc/cdc/sink/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,17 @@ func (k *tikvSink) runWorker(ctx context.Context, workerIdx uint32) error {

func parseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config, []string, error) {
config := tikvconfig.DefaultConfig()
pdAddrPrefix := "http://"

if sinkURI.Query().Get("ca-path") != "" {
config.Security = tikvconfig.NewSecurity(
sinkURI.Query().Get("ca-path"),
sinkURI.Query().Get("cert-path"),
sinkURI.Query().Get("key-path"),
nil,
)
pdAddrPrefix = "https://"
}

pdAddr := strings.Split(sinkURI.Host, ",")
if len(pdAddr) > 0 {
Expand All @@ -439,11 +450,10 @@ func parseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config,
err = fmt.Errorf("Invalid pd addr: %v, err: %v", addr, err)
return nil, nil, cerror.WrapError(cerror.ErrTiKVInvalidConfig, err)
}
// TODO: support https
pdAddr[i] = "http://" + addr
pdAddr[i] = pdAddrPrefix + addr
}
} else {
pdAddr = append(pdAddr, "http://127.0.0.1:2379")
pdAddr = append(pdAddr, pdAddrPrefix+"127.0.0.1:2379")
}

s := sinkURI.Query().Get("concurrency")
Expand Down
34 changes: 24 additions & 10 deletions cdc/cdc/sink/tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,33 @@ func TestExtractRawKVEntry(t *testing.T) {

func TestTiKVSinkConfig(t *testing.T) {
defer testleak.AfterTestT(t)()

require := require.New(t)

uri := "tikv://127.0.0.1:1001,127.0.0.2:1002/?concurrency=10"
sinkURI, err := url.Parse(uri)
require.NoError(err)
cases := []string{
"tikv://127.0.0.1:1001,127.0.0.2:1002,127.0.0.1:1003/?concurrency=12",
"tikv://127.0.0.1:1001,127.0.0.1:1002/?concurrency=10&ca-path=./ca-cert.pem&cert-path=./client-cert.pem&key-path=./client-key",
}

opts := make(map[string]string)
_, pdAddr, err := parseTiKVUri(sinkURI, opts)
require.NoError(err)
require.Len(pdAddr, 2)
require.Equal([]string{"http://127.0.0.1:1001", "http://127.0.0.2:1002"}, pdAddr)
require.Equal("10", opts["concurrency"])
expected := []struct {
pdAddr []string
concurrency string
security tikvconfig.Security
}{
{[]string{"http://127.0.0.1:1001", "http://127.0.0.2:1002", "http://127.0.0.1:1003"}, "12", tikvconfig.Security{}},
{[]string{"https://127.0.0.1:1001", "https://127.0.0.1:1002"}, "10", tikvconfig.NewSecurity("./ca-cert.pem", "./client-cert.pem", "./client-key", nil)},
}

for i, uri := range cases {
sinkURI, err := url.Parse(uri)
require.NoError(err)

opts := make(map[string]string)
config, pdAddr, err := parseTiKVUri(sinkURI, opts)
require.NoError(err)
require.Equal(expected[i].pdAddr, pdAddr)
require.Equal(expected[i].concurrency, opts["concurrency"])
require.Equal(expected[i].security, config.Security)
}
}

func TestTiKVSinkBatcher(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions cdc/tests/integration_tests/_utils/check_sync_diff
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
# parameter 3: dst pd
# parameter 4: max check times

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
TLS_DIR=$(cd $CUR/../_certificates && pwd)

workdir=$1
UP_PD=$2
DOWN_PD=$3
Expand All @@ -15,7 +18,6 @@ fi
PWD=$(pwd)

if ! command -v rawkv_data &>/dev/null; then
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
cd $CUR/../../..
make rawkv_data
cd $PWD
Expand All @@ -26,7 +28,7 @@ cd $workdir
i=0
while [ $i -lt $check_time ]; do
rm -rf $workdir/rawkv_data/
rawkv_data checksum --src-pd $UP_PD --dst-pd $DOWN_PD
rawkv_data checksum --src-pd $UP_PD --dst-pd $DOWN_PD --ca-path=$TLS_DIR/ca.pem --cert-path=$TLS_DIR/client.pem --key-path=$TLS_DIR/client-key.pem
ret=$?
if [ "$ret" == 0 ]; then
echo "check diff successfully"
Expand Down
5 changes: 3 additions & 2 deletions cdc/tests/integration_tests/_utils/rawkv_op
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
# parameter 2: put/delete
# parameter 3: key count

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
TLS_DIR=$(cd $CUR/../_certificates && pwd)
set -e

if ! command -v rawkv_data &>/dev/null; then
echo "make rawkv_data"
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
PWD=$(pwd)
cd $CUR/../../..
make rakv_data
cd $PWD
fi

echo "run put data"
rawkv_data $2 --src-pd $1 --count $3
rawkv_data $2 --src-pd $1 --count $3 --ca-path=$TLS_DIR/ca.pem --cert-path=$TLS_DIR/client.pem --key-path=$TLD_DIR/client-key.pem
45 changes: 45 additions & 0 deletions cdc/tests/integration_tests/sink_tls/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
TLS_DIR=$(cd $CUR/../_certificates && pwd)
CDC_BINARY=tikv-cdc.test
SINK_TYPE=$1
UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1
DOWN_PD=https://$TLS_PD_HOST:$TLS_PD_PORT

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR
start_tls_tidb_cluster --workdir $WORK_DIR --tlsdir $TLS_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(tikv-cdc cli tso query --pd=$UP_PD)
sleep 10
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

case $SINK_TYPE in
tikv) SINK_URI="tikv://${TLS_PD_HOST}:${TLS_PD_PORT}/?ca-path=$TLS_DIR/ca.pem&cert-path=$TLS_DIR/client.pem&key-path=$TLS_DIR/client-key.pem" ;;
*) SINK_URI="" ;;
esac

tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri=$SINK_URI

rawkv_op $UP_PD put 10000
check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
rawkv_op $UP_PD delete 10000
check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
10 changes: 3 additions & 7 deletions cdc/tests/utils/rawkv_data/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,19 @@ func NewChecksumCommand() *cobra.Command {

func runChecksum(cmd *cobra.Command) error {
cfg := &Config{}
err := cfg.ParseFromFlags(cmd.Flags())
err := cfg.ParseFromFlags(cmd.Flags(), true)
if err != nil {
return err
}
ctx := context.Background()

if cfg.DstPD == "" {
return fmt.Errorf("Downstream cluster PD is not set")
}

srcCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2))
srcCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
defer srcCli.Close()

dstCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.DstPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2))
dstCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.DstPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.DstSec))
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions cdc/tests/utils/rawkv_data/gen_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ func NewDeleteCommand() *cobra.Command {

func runPointDelete(cmd *cobra.Command) error {
cfg := &Config{}
err := cfg.ParseFromFlags(cmd.Flags())
err := cfg.ParseFromFlags(cmd.Flags(), false)
if err != nil {
return err
}

ctx := context.Background()
cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2))
cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
Expand Down Expand Up @@ -110,18 +110,18 @@ func runPointDelete(cmd *cobra.Command) error {

func runPointPut(cmd *cobra.Command) error {
cfg := &Config{}
err := cfg.ParseFromFlags(cmd.Flags())
err := cfg.ParseFromFlags(cmd.Flags(), false)
if err != nil {
return err
}
ctx := context.Background()

cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2))
cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
defer cli.Close()
atomicCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2))
atomicCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
Expand Down
58 changes: 53 additions & 5 deletions cdc/tests/utils/rawkv_data/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package main
import (
"fmt"
"os"
"strings"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/config"
)

const (
Expand All @@ -30,23 +32,34 @@ const (
flagDstPD = "dst-pd"
flagCount = "count"
flagStartIndex = "start-index"
flagCAPath = "ca-path"
flagCertPath = "cert-path"
flagKeyPath = "key-path"
)

type Config struct {
SrcPD string `json:"src-pd"`
DstPD string `json:"dst_pd"`
StartIndex int `json:"start_index"`
Count int `json:"count"`
SrcPD string `json:"src-pd"`
DstPD string `json:"dst-pd"`
StartIndex int `json:"start-index"`
Count int `json:"count"`
CAPath string `json:"ca-path"`
CertPath string `json:"cert-path"`
KeyPath string `json:"key-path"`
SrcSec config.Security `json:"src-sec"`
DstSec config.Security `json:"dst-sec"`
}

func AddFlags(cmd *cobra.Command) {
cmd.PersistentFlags().String(flagSrcPD, "127.0.0.1:2379", "Upstream PD address")
cmd.PersistentFlags().String(flagDstPD, "", "Downstream PD address")
cmd.PersistentFlags().Int(flagStartIndex, 0, "The start index of generated keys")
cmd.PersistentFlags().Int(flagCount, 1000, "The number of key")
cmd.PersistentFlags().String(flagCAPath, "", "Path to CA certificate")
cmd.PersistentFlags().String(flagCertPath, "", "Path to client certificate")
cmd.PersistentFlags().String(flagKeyPath, "", "Path to client key")
}

func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet, requireDstPD bool) error {
var err error
if cfg.SrcPD, err = flags.GetString(flagSrcPD); err != nil {
return err
Expand All @@ -60,6 +73,41 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
if cfg.Count, err = flags.GetInt(flagCount); err != nil {
return err
}
if cfg.CAPath, err = flags.GetString(flagCAPath); err != nil {
return err
}
if cfg.CertPath, err = flags.GetString(flagCertPath); err != nil {
return err
}
if cfg.KeyPath, err = flags.GetString(flagKeyPath); err != nil {
return err
}

if cfg.SrcPD == "" {
return fmt.Errorf("Upstream cluster PD is not set")
}
if strings.HasPrefix(cfg.SrcPD, "https://") {
if cfg.CAPath == "" || cfg.CertPath == "" || cfg.KeyPath == "" {
return fmt.Errorf("CAPath/CertPath/KeyPath is not set")
}
cfg.SrcSec.ClusterSSLCA = cfg.CAPath
cfg.SrcSec.ClusterSSLCert = cfg.CertPath
cfg.SrcSec.ClusterSSLKey = cfg.KeyPath
}

if requireDstPD {
if cfg.DstPD == "" {
return fmt.Errorf("Downstream cluster PD is not set")
}
if strings.HasPrefix(cfg.DstPD, "https://") {
if cfg.CAPath == "" || cfg.CertPath == "" || cfg.KeyPath == "" {
return fmt.Errorf("CAPath/CertPath/KeyPath is not set")
}
cfg.DstSec.ClusterSSLCA = cfg.CAPath
cfg.DstSec.ClusterSSLCert = cfg.CertPath
cfg.DstSec.ClusterSSLKey = cfg.KeyPath
}
}
return nil
}

Expand Down

0 comments on commit 1b6667f

Please sign in to comment.