From 1bacb2dd4a0ecb2ab071893390cfef7dcfd086d3 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Mon, 28 Sep 2020 15:59:24 +0800 Subject: [PATCH 1/8] add dumpling unit test --- dumpling/dumpling.go | 1 - dumpling/dumpling_test.go | 132 ++++++++++++++++++++++++++++++++++++++ dumpling/util_test.go | 9 --- 3 files changed, 132 insertions(+), 10 deletions(-) create mode 100644 dumpling/dumpling_test.go diff --git a/dumpling/dumpling.go b/dumpling/dumpling.go index b0fc123d11..25c14a4f43 100644 --- a/dumpling/dumpling.go +++ b/dumpling/dumpling.go @@ -85,7 +85,6 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) { failpoint.Inject("dumpUnitProcessForever", func() { m.logger.Info("dump unit runs forever", zap.String("failpoint", "dumpUnitProcessForever")) <-ctx.Done() - failpoint.Return() }) // NOTE: remove output dir before start dumping diff --git a/dumpling/dumpling_test.go b/dumpling/dumpling_test.go new file mode 100644 index 0000000000..3f3e4ee2dc --- /dev/null +++ b/dumpling/dumpling_test.go @@ -0,0 +1,132 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dumpling + +import ( + "context" + "os" + "strconv" + "sync" + "testing" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/failpoint" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testDumplingSuite{}) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testDumplingSuite struct { + cfg *config.SubTaskConfig +} + +func getDBConfigFromEnv() config.DBConfig { + host := os.Getenv("MYSQL_HOST") + if host == "" { + host = "127.0.0.1" + } + port, _ := strconv.Atoi(os.Getenv("MYSQL_PORT")) + if port == 0 { + port = 3306 + } + user := os.Getenv("MYSQL_USER") + if user == "" { + user = "root" + } + pswd := os.Getenv("MYSQL_PSWD") + return config.DBConfig{ + Host: host, + User: user, + Password: pswd, + Port: port, + } +} + +func (d *testDumplingSuite) SetUpSuite(c *C) { + dir := c.MkDir() + d.cfg = &config.SubTaskConfig{ + Name: "dumplint_ut", + From: getDBConfigFromEnv(), + LoaderConfig: config.LoaderConfig{ + Dir: dir, + }, + } + c.Assert(log.InitLogger(&log.Config{}), IsNil) +} + +func (d *testDumplingSuite) TestDumpling(c *C) { + dumpling := NewDumpling(d.cfg) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := dumpling.Init(ctx) + c.Assert(err, IsNil) + resultCh := make(chan pb.ProcessResult, 1) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + dumpling.Process(ctx, resultCh) + }() + wg.Wait() + c.Assert(len(resultCh), Equals, 1) + result := <-resultCh + c.Assert(result.IsCanceled, IsFalse) + c.Assert(len(result.Errors), Equals, 0) + + c.Assert(failpoint.Enable("github.com/pingcap/dm/dumpling/dumpUnitProcessWithError", `return("unknown error")`), IsNil) + // nolint:errcheck + defer failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessWithError") + + // return error + wg.Add(1) + go func() { + defer wg.Done() + dumpling.Process(ctx, resultCh) + }() + wg.Wait() + c.Assert(len(resultCh), Equals, 1) + result = <-resultCh + c.Assert(result.IsCanceled, IsFalse) + c.Assert(len(result.Errors), Equals, 1) + c.Assert(result.Errors[0].Message, Equals, "unknown error") + + // nolint:errcheck + failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessWithError") + + c.Assert(failpoint.Enable("github.com/pingcap/dm/dumpling/dumpUnitProcessForever", `return("unknown error")`), IsNil) + // nolint:errcheck + defer failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessForever") + + // cancel + wg.Add(1) + go func() { + defer wg.Done() + dumpling.Process(ctx, resultCh) + }() + cancel() + wg.Wait() + c.Assert(len(resultCh), Equals, 1) + result = <-resultCh + c.Assert(result.IsCanceled, IsTrue) + c.Assert(len(result.Errors), Equals, 0) +} diff --git a/dumpling/util_test.go b/dumpling/util_test.go index 285c931d18..4359737ea8 100644 --- a/dumpling/util_test.go +++ b/dumpling/util_test.go @@ -15,7 +15,6 @@ package dumpling import ( "strings" - "testing" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/log" @@ -23,14 +22,6 @@ import ( . "github.com/pingcap/check" ) -var _ = Suite(&testDumplingSuite{}) - -func TestSuite(t *testing.T) { - TestingT(t) -} - -type testDumplingSuite struct{} - func (m *testDumplingSuite) TestParseArgs(c *C) { logger := log.L() From 6fd5aacfea52d219642e6db7843ae7dffd597015 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 30 Sep 2020 12:02:29 +0800 Subject: [PATCH 2/8] address comment --- dumpling/dumpling_test.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/dumpling/dumpling_test.go b/dumpling/dumpling_test.go index 3f3e4ee2dc..3ee95efc8d 100644 --- a/dumpling/dumpling_test.go +++ b/dumpling/dumpling_test.go @@ -81,13 +81,7 @@ func (d *testDumplingSuite) TestDumpling(c *C) { c.Assert(err, IsNil) resultCh := make(chan pb.ProcessResult, 1) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - dumpling.Process(ctx, resultCh) - }() - wg.Wait() + dumpling.Process(ctx, resultCh) c.Assert(len(resultCh), Equals, 1) result := <-resultCh c.Assert(result.IsCanceled, IsFalse) @@ -98,12 +92,7 @@ func (d *testDumplingSuite) TestDumpling(c *C) { defer failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessWithError") // return error - wg.Add(1) - go func() { - defer wg.Done() - dumpling.Process(ctx, resultCh) - }() - wg.Wait() + dumpling.Process(ctx, resultCh) c.Assert(len(resultCh), Equals, 1) result = <-resultCh c.Assert(result.IsCanceled, IsFalse) @@ -118,6 +107,7 @@ func (d *testDumplingSuite) TestDumpling(c *C) { defer failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessForever") // cancel + var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() From dc024e8f15f6b533ac471bba2d27708a334bd7e0 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 30 Sep 2020 13:27:36 +0800 Subject: [PATCH 3/8] address comment --- dumpling/dumpling.go | 6 ++++++ dumpling/dumpling_test.go | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/dumpling/dumpling.go b/dumpling/dumpling.go index 25c14a4f43..957987671a 100644 --- a/dumpling/dumpling.go +++ b/dumpling/dumpling.go @@ -85,6 +85,7 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) { failpoint.Inject("dumpUnitProcessForever", func() { m.logger.Info("dump unit runs forever", zap.String("failpoint", "dumpUnitProcessForever")) <-ctx.Done() + failpoint.Return() }) // NOTE: remove output dir before start dumping @@ -100,6 +101,11 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) { return } + failpoint.Inject("dumpUnitProcessCancel", func() { + m.logger.Info("mock dump unit cancel", zap.String("failpoint", "dumpUnitProcessCancel")) + <-ctx.Done() + }) + newCtx, cancel := context.WithCancel(ctx) err = export.Dump(newCtx, m.dumpConfig) cancel() diff --git a/dumpling/dumpling_test.go b/dumpling/dumpling_test.go index 3ee95efc8d..d674450063 100644 --- a/dumpling/dumpling_test.go +++ b/dumpling/dumpling_test.go @@ -102,9 +102,9 @@ func (d *testDumplingSuite) TestDumpling(c *C) { // nolint:errcheck failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessWithError") - c.Assert(failpoint.Enable("github.com/pingcap/dm/dumpling/dumpUnitProcessForever", `return("unknown error")`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/dm/dumpling/dumpUnitProcessCancel", `return("unknown error")`), IsNil) // nolint:errcheck - defer failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessForever") + defer failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessCancel") // cancel var wg sync.WaitGroup From 753b373b1d67dd3207b3d9c64bef95b2b60a56ef Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 30 Sep 2020 14:02:39 +0800 Subject: [PATCH 4/8] debug ci --- dumpling/dumpling_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dumpling/dumpling_test.go b/dumpling/dumpling_test.go index d674450063..310ac6cd2c 100644 --- a/dumpling/dumpling_test.go +++ b/dumpling/dumpling_test.go @@ -85,7 +85,7 @@ func (d *testDumplingSuite) TestDumpling(c *C) { c.Assert(len(resultCh), Equals, 1) result := <-resultCh c.Assert(result.IsCanceled, IsFalse) - c.Assert(len(result.Errors), Equals, 0) + c.Assert(len(result.Errors), Equals, 0, Commentf("error: %v", result.Errors[0])) c.Assert(failpoint.Enable("github.com/pingcap/dm/dumpling/dumpUnitProcessWithError", `return("unknown error")`), IsNil) // nolint:errcheck From 410b77571c77fd83ebb02e43735ff6bf58e4264f Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 30 Sep 2020 14:16:00 +0800 Subject: [PATCH 5/8] debug ci --- dumpling/dumpling_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dumpling/dumpling_test.go b/dumpling/dumpling_test.go index 310ac6cd2c..d4a456a00c 100644 --- a/dumpling/dumpling_test.go +++ b/dumpling/dumpling_test.go @@ -85,7 +85,7 @@ func (d *testDumplingSuite) TestDumpling(c *C) { c.Assert(len(resultCh), Equals, 1) result := <-resultCh c.Assert(result.IsCanceled, IsFalse) - c.Assert(len(result.Errors), Equals, 0, Commentf("error: %v", result.Errors[0])) + c.Assert(len(result.Errors), Equals, 0, Commentf("errrors: %v", result.Errors)) c.Assert(failpoint.Enable("github.com/pingcap/dm/dumpling/dumpUnitProcessWithError", `return("unknown error")`), IsNil) // nolint:errcheck From 7fc54dcadf5e6bbd65af45212dc729df092e8839 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 30 Sep 2020 17:46:11 +0800 Subject: [PATCH 6/8] address comment --- dumpling/dumpling_test.go | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/dumpling/dumpling_test.go b/dumpling/dumpling_test.go index d4a456a00c..beb1965319 100644 --- a/dumpling/dumpling_test.go +++ b/dumpling/dumpling_test.go @@ -17,8 +17,8 @@ import ( "context" "os" "strconv" - "sync" "testing" + "time" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" @@ -63,7 +63,7 @@ func getDBConfigFromEnv() config.DBConfig { func (d *testDumplingSuite) SetUpSuite(c *C) { dir := c.MkDir() d.cfg = &config.SubTaskConfig{ - Name: "dumplint_ut", + Name: "dumpling_ut", From: getDBConfigFromEnv(), LoaderConfig: config.LoaderConfig{ Dir: dir, @@ -74,7 +74,7 @@ func (d *testDumplingSuite) SetUpSuite(c *C) { func (d *testDumplingSuite) TestDumpling(c *C) { dumpling := NewDumpling(d.cfg) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err := dumpling.Init(ctx) @@ -85,7 +85,7 @@ func (d *testDumplingSuite) TestDumpling(c *C) { c.Assert(len(resultCh), Equals, 1) result := <-resultCh c.Assert(result.IsCanceled, IsFalse) - c.Assert(len(result.Errors), Equals, 0, Commentf("errrors: %v", result.Errors)) + c.Assert(len(result.Errors), Equals, 0, Commentf("errors: %v", result.Errors)) c.Assert(failpoint.Enable("github.com/pingcap/dm/dumpling/dumpUnitProcessWithError", `return("unknown error")`), IsNil) // nolint:errcheck @@ -107,16 +107,10 @@ func (d *testDumplingSuite) TestDumpling(c *C) { defer failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessCancel") // cancel - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - dumpling.Process(ctx, resultCh) - }() - cancel() - wg.Wait() + dumpling.Process(ctx, resultCh) c.Assert(len(resultCh), Equals, 1) result = <-resultCh c.Assert(result.IsCanceled, IsTrue) - c.Assert(len(result.Errors), Equals, 0) + c.Assert(len(result.Errors), Equals, 1) + c.Assert(result.Errors[0].String(), Matches, ".*context deadline exceeded.*") } From 465044584ef59f2bfb0a6bfbf98886eb07ce7722 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 9 Oct 2020 11:07:58 +0800 Subject: [PATCH 7/8] address comment --- dumpling/dumpling_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dumpling/dumpling_test.go b/dumpling/dumpling_test.go index beb1965319..0f329cf2a4 100644 --- a/dumpling/dumpling_test.go +++ b/dumpling/dumpling_test.go @@ -74,7 +74,7 @@ func (d *testDumplingSuite) SetUpSuite(c *C) { func (d *testDumplingSuite) TestDumpling(c *C) { dumpling := NewDumpling(d.cfg) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() err := dumpling.Init(ctx) @@ -107,7 +107,9 @@ func (d *testDumplingSuite) TestDumpling(c *C) { defer failpoint.Disable("github.com/pingcap/dm/dumpling/dumpUnitProcessCancel") // cancel - dumpling.Process(ctx, resultCh) + ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel2() + dumpling.Process(ctx2, resultCh) c.Assert(len(resultCh), Equals, 1) result = <-resultCh c.Assert(result.IsCanceled, IsTrue) From bff9d70af0df30a79c39746e787c96a1449ffac4 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 9 Oct 2020 12:56:46 +0800 Subject: [PATCH 8/8] remove commentf --- dumpling/dumpling_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dumpling/dumpling_test.go b/dumpling/dumpling_test.go index 0f329cf2a4..42f9378ef6 100644 --- a/dumpling/dumpling_test.go +++ b/dumpling/dumpling_test.go @@ -85,7 +85,7 @@ func (d *testDumplingSuite) TestDumpling(c *C) { c.Assert(len(resultCh), Equals, 1) result := <-resultCh c.Assert(result.IsCanceled, IsFalse) - c.Assert(len(result.Errors), Equals, 0, Commentf("errors: %v", result.Errors)) + c.Assert(len(result.Errors), Equals, 0) c.Assert(failpoint.Enable("github.com/pingcap/dm/dumpling/dumpUnitProcessWithError", `return("unknown error")`), IsNil) // nolint:errcheck