Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support ticdc data dir #1372

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0e0dbc2
add a data dir field in cdc spec.
3AceShowHand May 19, 2021
d33d050
update go mod file.
3AceShowHand May 19, 2021
6b79d4e
add data dir into ticdc.
3AceShowHand May 19, 2021
1421709
add debug log.
3AceShowHand May 19, 2021
b49a641
remove debug.
3AceShowHand May 20, 2021
d77600c
add some testcase for ticdc data_dir.
3AceShowHand May 20, 2021
6a7c430
add data_dir by version.
3AceShowHand May 21, 2021
6439c29
add comment about CDCScript.WithDataDir.
3AceShowHand May 22, 2021
106d7d1
set data dir to '', if cluster version lower than 4.0.13
3AceShowHand May 24, 2021
32af2e9
fix panic on cdc data dir.
3AceShowHand May 24, 2021
9b6294e
add debug log .
3AceShowHand May 24, 2021
edb5736
tiny fix on adjust.
3AceShowHand May 24, 2021
6704135
check version on scale out.
3AceShowHand May 24, 2021
dabb1a3
fix conflicts at scale-out
3AceShowHand May 24, 2021
fed2886
fix scale_out issue.
3AceShowHand May 24, 2021
d98c095
fix cdc data dir is empty when display in confirm.
3AceShowHand May 24, 2021
122f040
do not throw error if data dir set in cdc, just ignore it.
3AceShowHand May 24, 2021
157291f
fix confirm
3AceShowHand May 24, 2021
e3db218
one more try.
3AceShowHand May 25, 2021
4f29ae0
try to fix rename.
3AceShowHand May 25, 2021
4bfb002
try to fix scale-out issue.
3AceShowHand May 25, 2021
76e1caa
warn for wrong data-dir use in cdc.
3AceShowHand May 25, 2021
cd32e8f
move warn to init config.
3AceShowHand May 25, 2021
1d26c06
refine log warn message.
3AceShowHand May 25, 2021
3c1938e
rename cdc start script folder.
3AceShowHand May 25, 2021
8312c3e
move warn to adjust..
3AceShowHand May 25, 2021
bd9b47a
tiny adjust.
3AceShowHand May 25, 2021
c2919f9
make warn red.
3AceShowHand May 25, 2021
ea70a06
move warn out of loop.
3AceShowHand May 25, 2021
d9ffcc4
remove cdc data dir in check cluster.
3AceShowHand May 25, 2021
45191d1
add mkdir.
3AceShowHand May 26, 2021
047a073
remove log.
3AceShowHand May 26, 2021
99dc83d
Merge branch 'master' into ling.jin/support-ticdc-data-dir
AstroProfundis May 26, 2021
0691dc3
Merge branch 'master' into ling.jin/support-ticdc-data-dir
3AceShowHand May 26, 2021
34e2030
Merge branch 'ling.jin/support-ticdc-data-dir' of https://github.com/…
3AceShowHand May 26, 2021
69a1116
support data-dir .
3AceShowHand May 27, 2021
102370e
add testcase for cdc script initialization by version.
3AceShowHand May 27, 2021
25d4b9c
tiny change.
3AceShowHand May 27, 2021
940aa46
fix.
3AceShowHand May 27, 2021
1a2bd76
add ignore .
3AceShowHand May 28, 2021
476a3c5
Merge branch 'master' into ling.jin/support-ticdc-data-dir
3AceShowHand May 28, 2021
c2ec4db
Merge branch 'master' into ling.jin/support-ticdc-data-dir
3AceShowHand May 31, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/gizak/termui/v3 v3.1.0
github.com/google/flatbuffers v2.0.0+incompatible // indirect
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
github.com/mattn/go-sqlite3 v2.0.2+incompatible // indirect
github.com/mitchellh/go-wordwrap v1.0.0 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion components/client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,9 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v1.12.0 h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w=
github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v2.0.0+incompatible h1:dicJ2oXwypfwUGnB2/TYWYEKiuk9eYQlQO/AnOHl5mI=
github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down
7 changes: 7 additions & 0 deletions embed/templates/scripts/run_cdc.sh.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ exec bin/cdc server \
--addr "0.0.0.0:{{.Port}}" \
--advertise-addr "{{.IP}}:{{.Port}}" \
--pd "{{template "PDList" .Endpoints}}" \
{{- if .DataDir}}
{{- if .DataDirEnabled}}
--data-dir="{{.DataDir}}" \
{{- else}}
--sort-dir="{{.DataDir}}/tmp/sorter" \
{{- end}}
{{- end}}
{{- if .TLSEnabled}}
--ca tls/ca.crt \
--cert tls/cdc.crt \
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/manager/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (m *Manager) CheckCluster(clusterOrTopoName string, opt CheckOptions, gOpt
opt.IdentityFile = m.specManager.Path(clusterName, "ssh", "id_rsa")

topo = *metadata.Topology
topo.AdjustByVersion(metadata.Version)
} else { // check before cluster is deployed
topoFileName := clusterOrTopoName

Expand Down
19 changes: 11 additions & 8 deletions pkg/cluster/manager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,17 @@ func (m *Manager) Deploy(
base.GlobalOptions.SSHType = sshType
}

if topo, ok := topo.(*spec.Specification); ok && !opt.NoLabels {
// Check if TiKV's label set correctly
lbs, err := topo.LocationLabels()
if err != nil {
return err
}
if err := spec.CheckTiKVLabels(lbs, topo); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
if topo, ok := topo.(*spec.Specification); ok {
topo.AdjustByVersion(clusterVersion)
if !opt.NoLabels {
// Check if TiKV's label set correctly
lbs, err := topo.LocationLabels()
if err != nil {
return err
}
if err := spec.CheckTiKVLabels(lbs, topo); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
}
}
}

Expand Down
34 changes: 19 additions & 15 deletions pkg/cluster/manager/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func (m *Manager) ScaleOut(

topo := metadata.GetTopology()
base := metadata.GetBaseMeta()

// Inherit existing global configuration. We must assign the inherited values before unmarshalling
// because some default value rely on the global options and monitored options.
newPart := topo.NewPart()
Expand All @@ -77,6 +76,9 @@ func (m *Manager) ScaleOut(
!errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) {
return err
}
if newPartTopo, ok := newPart.(*spec.Specification); ok {
newPartTopo.AdjustByVersion(base.Version)
}

if err := validateNewTopo(newPart); err != nil {
return err
Expand All @@ -89,21 +91,23 @@ func (m *Manager) ScaleOut(
}
spec.ExpandRelativeDir(mergedTopo)

if topo, ok := topo.(*spec.Specification); ok && !opt.NoLabels {
if topo, ok := mergedTopo.(*spec.Specification); ok {
// Check if TiKV's label set correctly
pdList := topo.BaseTopo().MasterList
tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir))
if err != nil {
return err
}
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
lbs, placementRule, err := pdClient.GetLocationLabels()
if err != nil {
return err
}
if !placementRule {
if err := spec.CheckTiKVLabels(lbs, mergedTopo.(*spec.Specification)); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
if !opt.NoLabels {
pdList := topo.BaseTopo().MasterList
tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir))
if err != nil {
return err
}
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
lbs, placementRule, err := pdClient.GetLocationLabels()
if err != nil {
return err
}
if !placementRule {
if err := spec.CheckTiKVLabels(lbs, mergedTopo.(*spec.Specification)); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/cluster/manager/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ func (m *Manager) Upgrade(name string, clusterVersion string, opt operator.Optio

// Deploy component
tb := task.NewBuilder()

// for some component, dataDirs might need to be created due to upgrade
// eg: TiCDC support DataDir since v4.0.13
tb = tb.Mkdir(topo.BaseTopo().GlobalOptions.User, inst.GetHost(), dataDirs...)

if inst.IsImported() {
switch inst.ComponentName() {
case spec.ComponentPrometheus, spec.ComponentGrafana, spec.ComponentAlertmanager:
Expand Down
23 changes: 14 additions & 9 deletions pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type CDCSpec struct {
Patched bool `yaml:"patched,omitempty"`
Port int `yaml:"port" default:"8300"`
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
LogDir string `yaml:"log_dir,omitempty"`
Offline bool `yaml:"offline,omitempty"`
GCTTL int64 `yaml:"gc-ttl,omitempty" validate:"gc-ttl:editable"`
Expand Down Expand Up @@ -84,7 +85,7 @@ func (c *CDCComponent) Instances() []Instance {
ins := make([]Instance, 0, len(c.Topology.CDCServers))
for _, s := range c.Topology.CDCServers {
s := s
ins = append(ins, &CDCInstance{BaseInstance{
instance := &CDCInstance{BaseInstance{
InstanceSpec: s,
Name: c.Name(),
Host: s.Host,
Expand All @@ -103,7 +104,12 @@ func (c *CDCComponent) Instances() []Instance {
UptimeFn: func(tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.Port, tlsCfg)
},
}, c.Topology})
}, c.Topology}
if s.DataDir != "" {
instance.Dirs = append(instance.Dirs, s.DataDir)
}

ins = append(ins, instance)
}
return ins
}
Expand Down Expand Up @@ -151,11 +157,10 @@ func (i *CDCInstance) InitConfig(
globalConfig := topo.ServerConfigs.CDC
instanceConfig := spec.Config

configFileSupported := false
if semver.Compare(clusterVersion, "v4.0.13") >= 0 && clusterVersion != "v5.0.0-rc" {
configFileSupported = true
} else if len(globalConfig)+len(instanceConfig) > 0 {
return perrs.New("server_config is only supported with TiCDC version v4.0.13 or later")
if semver.Compare(clusterVersion, "v4.0.13") == -1 {
if len(globalConfig)+len(instanceConfig) > 0 {
return perrs.New("server_config is only supported with TiCDC version v4.0.13 or later")
}
}

cfg := scripts.NewCDCScript(
Expand All @@ -167,8 +172,8 @@ func (i *CDCInstance) InitConfig(
spec.TZ,
).WithPort(spec.Port).WithNumaNode(spec.NumaNode).AppendEndpoints(topo.Endpoints(deployUser)...)

if configFileSupported {
cfg = cfg.WithConfigFileEnabled()
if len(paths.Data) != 0 {
cfg = cfg.PatchByVersion(clusterVersion, paths.Data[0])
}

fp := filepath.Join(paths.Cache, fmt.Sprintf("run_cdc_%s_%d.sh", i.GetHost(), i.GetPort()))
Expand Down
11 changes: 11 additions & 0 deletions pkg/cluster/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiup/pkg/logger/log"
"github.com/pingcap/tiup/pkg/meta"
clientv3 "go.etcd.io/etcd/client/v3"
"golang.org/x/mod/semver"
)

const (
Expand Down Expand Up @@ -376,6 +377,16 @@ func (s *Specification) GetPDList() []string {
return pdList
}

// AdjustByVersion modify the spec by cluster version.
func (s *Specification) AdjustByVersion(clusterVersion string) {
// CDC does not support data dir for version below v4.0.13, and also v5.0.0-rc, set it to empty.
if semver.Compare(clusterVersion, "v4.0.13") == -1 || clusterVersion == "v5.0.0-rc" {
for _, server := range s.CDCServers {
server.DataDir = ""
}
}
}

// GetDashboardAddress returns the cluster's dashboard addr
func (s *Specification) GetDashboardAddress(tlsCfg *tls.Config, pdList ...string) (string, error) {
pc := api.NewPDClient(pdList, statusQueryTimeout, tlsCfg)
Expand Down
67 changes: 64 additions & 3 deletions pkg/cluster/spec/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/BurntSushi/toml"
. "github.com/pingcap/check"
"github.com/pingcap/tiup/pkg/cluster/template/scripts"
"golang.org/x/mod/semver"
"gopkg.in/yaml.v2"
)

Expand All @@ -37,6 +38,7 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) {
// Test with without global DataDir.
topo := new(Specification)
topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.1", Port: 22})
topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.3", Port: 22})
data, err := yaml.Marshal(topo)
c.Assert(err, IsNil)

Expand All @@ -46,6 +48,7 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) {
c.Assert(err, IsNil)
c.Assert(topo.GlobalOptions.DataDir, Equals, "data")
c.Assert(topo.TiKVServers[0].DataDir, Equals, "data")
c.Assert(topo.CDCServers[0].DataDir, Equals, "data")

// Can keep the default value.
data, err = yaml.Marshal(topo)
Expand All @@ -55,21 +58,27 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) {
c.Assert(err, IsNil)
c.Assert(topo.GlobalOptions.DataDir, Equals, "data")
c.Assert(topo.TiKVServers[0].DataDir, Equals, "data")
c.Assert(topo.CDCServers[0].DataDir, Equals, "data")

// Test with global DataDir.
topo = new(Specification)
topo.GlobalOptions.DataDir = "/gloable_data"
topo.GlobalOptions.DataDir = "/global_data"
topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.1", Port: 22})
topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.2", Port: 33, DataDir: "/my_data"})
topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.3", Port: 22})
topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.4", Port: 22, DataDir: "/cdc_data"})
data, err = yaml.Marshal(topo)
c.Assert(err, IsNil)

topo = new(Specification)
err = yaml.Unmarshal(data, topo)
c.Assert(err, IsNil)
c.Assert(topo.GlobalOptions.DataDir, Equals, "/gloable_data")
c.Assert(topo.TiKVServers[0].DataDir, Equals, "/gloable_data/tikv-22")
c.Assert(topo.GlobalOptions.DataDir, Equals, "/global_data")
c.Assert(topo.TiKVServers[0].DataDir, Equals, "/global_data/tikv-22")
c.Assert(topo.TiKVServers[1].DataDir, Equals, "/my_data")

c.Assert(topo.CDCServers[0].DataDir, Equals, "/global_data/cdc-22")
c.Assert(topo.CDCServers[1].DataDir, Equals, "/cdc_data")
}

func (s *metaSuiteTopo) TestGlobalOptions(c *C) {
Expand All @@ -86,6 +95,9 @@ tidb_servers:
pd_servers:
- host: 172.16.5.53
data_dir: "pd-data"
cdc_servers:
- host: 172.16.5.233
data_dir: "cdc-data"
`), &topo)
c.Assert(err, IsNil)
c.Assert(topo.GlobalOptions.User, Equals, "test1")
Expand All @@ -96,6 +108,10 @@ pd_servers:
c.Assert(topo.PDServers[0].SSHPort, Equals, 220)
c.Assert(topo.PDServers[0].DeployDir, Equals, "test-deploy/pd-2379")
c.Assert(topo.PDServers[0].DataDir, Equals, "pd-data")

c.Assert(topo.CDCServers[0].SSHPort, Equals, 220)
c.Assert(topo.CDCServers[0].DeployDir, Equals, "test-deploy/cdc-8300")
c.Assert(topo.CDCServers[0].DataDir, Equals, "cdc-data")
}

func (s *metaSuiteTopo) TestDataDirAbsolute(c *C) {
Expand All @@ -109,11 +125,19 @@ pd_servers:
data_dir: "pd-data"
- host: 172.16.5.54
client_port: 12379
cdc_servers:
- host: 172.16.5.233
data_dir: "cdc-data"
- host: 172.16.5.234
port: 23333
`), &topo)
c.Assert(err, IsNil)

c.Assert(topo.PDServers[0].DataDir, Equals, "pd-data")
c.Assert(topo.PDServers[1].DataDir, Equals, "/test-data/pd-12379")

c.Assert(topo.CDCServers[0].DataDir, Equals, "cdc-data")
c.Assert(topo.CDCServers[1].DataDir, Equals, "/test-data/cdc-23333")
}

func (s *metaSuiteTopo) TestGlobalConfig(c *C) {
Expand Down Expand Up @@ -684,6 +708,43 @@ tiflash_servers:
}
}

func (s *metaSuiteTopo) TestTiCDCDataDir(c *C) {
spec := &Specification{}
err := yaml.Unmarshal([]byte(`
cdc_servers:
- host: 172.16.6.191
data_dir: /tidb-data/cdc-8300
`), spec)
c.Assert(err, IsNil)

cdcComp := FindComponent(spec, ComponentCDC)
instances := cdcComp.Instances()
c.Assert(len(instances), Equals, 1)

checkByVersion := func(version string) {
ins := instances[0].(*CDCInstance)
cfg := scripts.NewCDCScript(ins.GetHost(), "", "", false, 0, "").
PatchByVersion(version, ins.DataDir())

// DataDir support since v4.0.13
checker := Equals
if semver.Compare(version, "v4.0.13") >= 0 && version != "v5.0.0-rc" {
checker = Not(checker)
c.Assert(len(cfg.DataDir), checker, 0)

// TiCDC support --data-dir since v4.0.14 and v5.0.3
expected := semver.Compare(version, "v4.0.14") >= 0 || semver.Compare(version, "v5.0.3") >= 0
c.Assert(cfg.DataDirEnabled, Equals, expected)
}
}

checkByVersion("v4.0.12")
checkByVersion("v4.0.13")
checkByVersion("v5.0.0-rc")
checkByVersion("v4.0.14")
checkByVersion("v5.0.3")
}

func (s *metaSuiteTopo) TestTiFlashUsersSettings(c *C) {
spec := &Specification{}
err := yaml.Unmarshal([]byte(`
Expand Down
Loading