From 914dab53c1b172ab1854be6a772d9934348eb03f Mon Sep 17 00:00:00 2001 From: Xuecheng Zhang Date: Wed, 11 Dec 2019 17:29:58 +0800 Subject: [PATCH] *: cherry pick #378 and #416 (#418) --- Makefile | 40 ++++++++++++++++++++----- dm/worker/worker_test.go | 2 +- pkg/streamer/file_test.go | 15 ++++++---- pkg/streamer/hub_test.go | 4 ++- pkg/streamer/reader_test.go | 2 +- pkg/utils/storage.go | 39 ------------------------ pkg/utils/storage_unix.go | 55 ++++++++++++++++++++++++++++++++++ pkg/utils/storage_windows.go | 42 ++++++++++++++++++++++++++ relay/writer/file_test.go | 2 +- relay/writer/file_util_test.go | 4 +-- 10 files changed, 147 insertions(+), 58 deletions(-) create mode 100644 pkg/utils/storage_unix.go create mode 100644 pkg/utils/storage_windows.go diff --git a/Makefile b/Makefile index dbe7d016d0..ce741ea2a0 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,10 @@ GO := GO111MODULE=on go GOBUILD := CGO_ENABLED=0 $(GO) build GOTEST := CGO_ENABLED=1 $(GO) test PACKAGES := $$(go list ./... | grep -vE 'tests|cmd|vendor|pbmock') +PACKAGES_RELAY := $$(go list ./... | grep 'github.com/pingcap/dm/relay') +PACKAGES_SYNCER := $$(go list ./... | grep 'github.com/pingcap/dm/syncer') +PACKAGES_PKG_BINLOG := $$(go list ./... | grep 'github.com/pingcap/dm/pkg/binlog') +PACKAGES_OTHERS := $$(go list ./... | grep -vE 'tests|cmd|vendor|pbmock|github.com/pingcap/dm/relay|github.com/pingcap/dm/syncer|github.com/pingcap/dm/pkg/binlog') FILES := $$(find . -name "*.go" | grep -vE "vendor") TOPDIRS := $$(ls -d */ | grep -vE "vendor") SHELL := /usr/bin/env bash @@ -17,10 +21,15 @@ FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/pingcap/dm/" FAILPOINT := bin/failpoint-ctl RACE_FLAG = +TEST_RACE_FLAG = -race ifeq ("$(WITH_RACE)", "1") RACE_FLAG = -race GOBUILD = CGO_ENABLED=1 $(GO) build endif +ifeq ("$(WITH_RACE)", "0") + TEST_RACE_FLAG = + GOTEST = CGO_ENABLED=0 $(GO) test +endif FAILPOINT_ENABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) enable >/dev/null) FAILPOINT_DISABLE := $$(find $(FAILPOINT_DIR) | xargs $(FAILPOINT) disable >/dev/null) @@ -34,7 +43,7 @@ ifeq ($(ARCH), $(LINUX)) LDFLAGS += -X "github.com/pingcap/dm/dm/master.SampleConfigFile=$(shell cat dm/master/dm-master.toml | base64 -w 0)" else LDFLAGS += -X "github.com/pingcap/dm/dm/worker.SampleConfigFile=$(shell cat dm/worker/dm-worker.toml | base64)" - LDFLAGS += -X "github.com/pingcap/dm/dm/master.SampleConfigFile=$(shell cat dm/master/dm-master.toml | base64)" + LDFLAGS += -X "github.com/pingcap/dm/dm/master.SampleConfigFile=$(shell cat dm/master/dm-master.toml | base64)" endif .PHONY: build test unit_test dm_integration_test_build integration_test \ @@ -56,15 +65,32 @@ dmctl: test: unit_test integration_test -unit_test: +define run_unit_test + @echo "running unit test for packages:" $(1) bash -x ./tests/wait_for_mysql.sh mkdir -p $(TEST_DIR) which $(FAILPOINT) >/dev/null 2>&1 || $(GOBUILD) -o $(FAILPOINT) github.com/pingcap/failpoint/failpoint-ctl $(FAILPOINT_ENABLE) @export log_level=error; \ - $(GOTEST) -covermode=atomic -coverprofile="$(TEST_DIR)/cov.unit_test.out" -race $(PACKAGES) \ + $(GOTEST) -covermode=atomic -coverprofile="$(TEST_DIR)/cov.$(2).out" $(TEST_RACE_FLAG) $(1) \ || { $(FAILPOINT_DISABLE); exit 1; } $(FAILPOINT_DISABLE) +endef + +unit_test: + $(call run_unit_test,$(PACKAGES),unit_test) + +unit_test_relay: + $(call run_unit_test,$(PACKAGES_RELAY),unit_test_relay) + +unit_test_syncer: + $(call run_unit_test,$(PACKAGES_SYNCER),unit_test_syncer) + +unit_test_pkg_binlog: + $(call run_unit_test,$(PACKAGES_PKG_BINLOG),unit_test_pkg_binlog) + +unit_test_others: + $(call run_unit_test,$(PACKAGES_OTHERS),unit_test_others) check: fmt lint vet terror_check @@ -95,11 +121,11 @@ terror_check: dm_integration_test_build: which $(FAILPOINT) >/dev/null 2>&1 || $(GOBUILD) -o $(FAILPOINT) github.com/pingcap/failpoint/failpoint-ctl $(FAILPOINT_ENABLE) - $(GOTEST) -c -race -cover -covermode=atomic \ + $(GOTEST) -c $(TEST_RACE_FLAG) -cover -covermode=atomic \ -coverpkg=github.com/pingcap/dm/... \ -o bin/dm-worker.test github.com/pingcap/dm/cmd/dm-worker \ || { $(FAILPOINT_DISABLE); exit 1; } - $(GOTEST) -c -race -cover -covermode=atomic \ + $(GOTEST) -c $(TEST_RACE_FLAG) -cover -covermode=atomic \ -coverpkg=github.com/pingcap/dm/... \ -o bin/dm-master.test github.com/pingcap/dm/cmd/dm-master \ || { $(FAILPOINT_DISABLE); exit 1; } @@ -107,7 +133,7 @@ dm_integration_test_build: -coverpkg=github.com/pingcap/dm/... \ -o bin/dmctl.test github.com/pingcap/dm/cmd/dm-ctl \ || { $(FAILPOINT_DISABLE); exit 1; } - $(GOTEST) -c -race -cover -covermode=atomic \ + $(GOTEST) -c $(TEST_RACE_FLAG) -cover -covermode=atomic \ -coverpkg=github.com/pingcap/dm/... \ -o bin/dm-tracer.test github.com/pingcap/dm/cmd/dm-tracer \ || { $(FAILPOINT_DISABLE); exit 1; } @@ -140,7 +166,7 @@ coverage_fix_cover_mode: coverage: coverage_fix_cover_mode GO111MODULE=off go get github.com/zhouqiang-cl/gocovmerge gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out" - grep -vE ".*.pb.go|.*.__failpoint_binding__.go" $(TEST_DIR)/cov.unit_test.out > $(TEST_DIR)/unit_test.out + gocovmerge "$(TEST_DIR)"/cov.unit_test*.out | grep -vE ".*.pb.go|.*.__failpoint_binding__.go" > $(TEST_DIR)/unit_test.out ifeq ("$(JenkinsCI)", "1") GO111MODULE=off go get github.com/mattn/goveralls @goveralls -coverprofile=$(TEST_DIR)/all_cov.out -service=jenkins-ci -repotoken $(COVERALLS_TOKEN) diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go index 6de376694f..30dec19cb4 100644 --- a/dm/worker/worker_test.go +++ b/dm/worker/worker_test.go @@ -167,7 +167,7 @@ func (t *testServer) TestTaskAutoResume(c *C) { c.Assert(err, IsNil) // check task in paused state - c.Assert(utils.WaitSomething(10, 100*time.Millisecond, func() bool { + c.Assert(utils.WaitSomething(100, 100*time.Millisecond, func() bool { for _, st := range s.worker.QueryStatus(taskName) { if st.Name == taskName && st.Stage == pb.Stage_Paused { return true diff --git a/pkg/streamer/file_test.go b/pkg/streamer/file_test.go index c3d43c80be..23ae78ddee 100644 --- a/pkg/streamer/file_test.go +++ b/pkg/streamer/file_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "os" "path/filepath" + "regexp" "sync" "time" @@ -218,7 +219,7 @@ func (t *testFileSuite) TestGetFirstBinlogName(c *C) { // sub directory not exist name, err := getFirstBinlogName(baseDir, uuid) - c.Assert(err, ErrorMatches, ".*no such file or directory.*") + c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") c.Assert(name, Equals, "") // empty directory @@ -271,7 +272,7 @@ func (t *testFileSuite) TestFileSizeUpdated(c *C) { // file not exists cmp, err := fileSizeUpdated(filePath, latestSize) - c.Assert(err, ErrorMatches, ".*no such file or directory.*") + c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") c.Assert(cmp, Equals, 0) // create and write the file @@ -313,7 +314,7 @@ func (t *testUtilSuite) TestRelaySubDirUpdated(c *C) { // a. relay log dir not exist upNotExist, err := relaySubDirUpdated(ctx, watcherInterval, "/not-exists-directory", "/not-exists-filepath", "not-exists-file", 0) - c.Assert(err, ErrorMatches, ".*no such file or directory.*") + c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") c.Assert(upNotExist, Equals, "") // create relay log dir @@ -334,7 +335,7 @@ func (t *testUtilSuite) TestRelaySubDirUpdated(c *C) { // c. latest file path not exist upNotExist, err = relaySubDirUpdated(ctx, watcherInterval, subDir, "/no-exists-filepath", relayFiles[0], 0) - c.Assert(err, ErrorMatches, ".*no such file or directory.*") + c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") c.Assert(upNotExist, Equals, "") // 1. file increased when adding watching @@ -476,7 +477,7 @@ func (t *testFileSuite) TestNeedSwitchSubDir(c *C) { currentUUID = UUIDs[0] needSwitch, needReParse, nextUUID, nextBinlogName, err = needSwitchSubDir( relayDir, currentUUID, latestFilePath, latestFileSize, UUIDs) - c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*no such file or directory.*", UUIDs[1])) + c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*(no such file or directory|The system cannot find the file specified).*", UUIDs[1])) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(nextUUID, Equals, "") @@ -493,7 +494,9 @@ func (t *testFileSuite) TestNeedSwitchSubDir(c *C) { latestFilePath = filepath.Join(relayDir, UUIDs[0], "mysql-bin.000001") needSwitch, needReParse, nextUUID, nextBinlogName, err = needSwitchSubDir( relayDir, currentUUID, latestFilePath, latestFileSize, UUIDs) - c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*no such file or directory.*", latestFilePath)) + c.Assert(err, ErrorMatches, fmt.Sprintf( + ".*%s.*(no such file or directory|The system cannot find the path specified).*", + regexp.QuoteMeta(latestFilePath))) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(nextUUID, Equals, "") diff --git a/pkg/streamer/hub_test.go b/pkg/streamer/hub_test.go index 39683dd576..ab302914b7 100644 --- a/pkg/streamer/hub_test.go +++ b/pkg/streamer/hub_test.go @@ -14,6 +14,8 @@ package streamer import ( + "path/filepath" + . "github.com/pingcap/check" ) @@ -49,7 +51,7 @@ func (t *testHubSuite) TestRelayLogInfo(c *C) { // string representation rli3.UUID = "c6ae5afe-c7a3-11e8-a19d-0242ac130006.000001" - c.Assert(rli3.String(), Equals, rli3.UUID+"/"+rli3.Filename) + c.Assert(rli3.String(), Equals, filepath.Join(rli3.UUID, rli3.Filename)) } func (t *testHubSuite) TestRelayLogInfoHub(c *C) { diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index 27de8eacee..a08c10cc20 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -82,7 +82,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { // relay log file not exists, failed needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) - c.Assert(err, ErrorMatches, ".*no such file or directory.*") + c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the path specified).*") // empty relay log file, failed, got EOF err = os.MkdirAll(relayDir, 0700) diff --git a/pkg/utils/storage.go b/pkg/utils/storage.go index 7251db8894..a4c7611b35 100644 --- a/pkg/utils/storage.go +++ b/pkg/utils/storage.go @@ -13,48 +13,9 @@ package utils -import ( - "reflect" - - "golang.org/x/sys/unix" - - "github.com/pingcap/dm/pkg/terror" -) - // StorageSize represents the storage's capacity and available size // Learn from tidb-binlog source code. type StorageSize struct { Capacity uint64 Available uint64 } - -// GetStorageSize gets storage's capacity and available size -func GetStorageSize(dir string) (size StorageSize, err error) { - var stat unix.Statfs_t - - err = unix.Statfs(dir, &stat) - if err != nil { - return size, terror.ErrStatFileSize.Delegate(err) - } - - // When container is run in MacOS, `bsize` obtained by `statfs` syscall is not the fundamental block size, - // but the `iosize` (optimal transfer block size) instead, it's usually 1024 times larger than the `bsize`. - // for example `4096 * 1024`. To get the correct block size, we should use `frsize`. But `frsize` isn't - // guaranteed to be supported everywhere, so we need to check whether it's supported before use it. - // For more details, please refer to: https://github.com/docker/for-mac/issues/2136 - bSize := uint64(stat.Bsize) - field := reflect.ValueOf(&stat).Elem().FieldByName("Frsize") - if field.IsValid() { - if field.Kind() == reflect.Uint64 { - bSize = field.Uint() - } else { - bSize = uint64(field.Int()) - } - } - - // Available blocks * size per block = available space in bytes - size.Available = stat.Bavail * bSize - size.Capacity = stat.Blocks * bSize - - return -} diff --git a/pkg/utils/storage_unix.go b/pkg/utils/storage_unix.go new file mode 100644 index 0000000000..7c1a723c4c --- /dev/null +++ b/pkg/utils/storage_unix.go @@ -0,0 +1,55 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !windows + +package utils + +import ( + "reflect" + + "golang.org/x/sys/unix" + + "github.com/pingcap/dm/pkg/terror" +) + +// GetStorageSize gets storage's capacity and available size +func GetStorageSize(dir string) (size StorageSize, err error) { + var stat unix.Statfs_t + + err = unix.Statfs(dir, &stat) + if err != nil { + return size, terror.ErrStatFileSize.Delegate(err) + } + + // When container is run in MacOS, `bsize` obtained by `statfs` syscall is not the fundamental block size, + // but the `iosize` (optimal transfer block size) instead, it's usually 1024 times larger than the `bsize`. + // for example `4096 * 1024`. To get the correct block size, we should use `frsize`. But `frsize` isn't + // guaranteed to be supported everywhere, so we need to check whether it's supported before use it. + // For more details, please refer to: https://github.com/docker/for-mac/issues/2136 + bSize := uint64(stat.Bsize) + field := reflect.ValueOf(&stat).Elem().FieldByName("Frsize") + if field.IsValid() { + if field.Kind() == reflect.Uint64 { + bSize = field.Uint() + } else { + bSize = uint64(field.Int()) + } + } + + // Available blocks * size per block = available space in bytes + size.Available = stat.Bavail * bSize + size.Capacity = stat.Blocks * bSize + + return +} diff --git a/pkg/utils/storage_windows.go b/pkg/utils/storage_windows.go new file mode 100644 index 0000000000..68a1ecdfcb --- /dev/null +++ b/pkg/utils/storage_windows.go @@ -0,0 +1,42 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +package utils + +import ( + "syscall" + "unsafe" + + "github.com/pingcap/dm/pkg/terror" +) + +var ( + kernel32 = syscall.MustLoadDLL("kernel32.dll") + getDiskFreeSpaceExW = kernel32.MustFindProc("GetDiskFreeSpaceExW") +) + +// GetStorageSize gets storage's capacity and available size +func GetStorageSize(dir string) (size StorageSize, err error) { + r, _, e := getDiskFreeSpaceExW.Call( + uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(dir))), + uintptr(unsafe.Pointer(&size.Available)), + uintptr(unsafe.Pointer(&size.Capacity)), + 0, + ) + if r == 0 { + err = terror.ErrStatFileSize.Delegate(e) + } + return +} diff --git a/relay/writer/file_test.go b/relay/writer/file_test.go index b19197c565..97c5ffba28 100644 --- a/relay/writer/file_test.go +++ b/relay/writer/file_test.go @@ -357,7 +357,7 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check // write again, duplicate, but we already rotated and new binlog file not created _, err = w4.WriteEvent(rotateEv) - c.Assert(err, check.ErrorMatches, ".*no such file or directory.*") + c.Assert(err, check.ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") // cfg.Filename should contain both one FormatDescriptionEvent and one RotateEvent, next file should be empty filename1 = filepath.Join(cfg.RelayDir, cfg.Filename) diff --git a/relay/writer/file_util_test.go b/relay/writer/file_util_test.go index 0c1703357b..a75996efc2 100644 --- a/relay/writer/file_util_test.go +++ b/relay/writer/file_util_test.go @@ -42,7 +42,7 @@ func (t *testFileUtilSuite) TestCheckBinlogHeaderExist(c *check.C) { // file not exists filename := filepath.Join(c.MkDir(), "test-mysql-bin.000001") exist, err := checkBinlogHeaderExist(filename) - c.Assert(err, check.ErrorMatches, ".*no such file or directory.*") + c.Assert(err, check.ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") c.Assert(exist, check.IsFalse) // empty file @@ -99,7 +99,7 @@ func (t *testFileUtilSuite) TestCheckFormatDescriptionEventExist(c *check.C) { // file not exists filename := filepath.Join(c.MkDir(), "test-mysql-bin.000001") exist, err := checkFormatDescriptionEventExist(filename) - c.Assert(err, check.ErrorMatches, ".*no such file or directory.*") + c.Assert(err, check.ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") c.Assert(exist, check.IsFalse) // empty file