Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimization: Unified Sorter (#972) #1122

Merged
merged 4 commits into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ ignore:
- "*.toml"
- "*.md"
- "docs/.*"
- "testing_utils/.*"
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ MAC := "Darwin"
PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests|integration'
PACKAGES := $$($(PACKAGE_LIST))
PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||'
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor')
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor' | grep -vE 'kv_gen')
CDC_PKG := github.com/pingcap/ticdc
FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/pingcap/$(PROJECT)/"}|grep -v "github.com/pingcap/$(PROJECT)"; done)
FAILPOINT := bin/failpoint-ctl
Expand Down Expand Up @@ -105,6 +105,8 @@ integration_test_build: check_failpoint_ctl
-coverpkg=github.com/pingcap/ticdc/... \
-o bin/cdc.test github.com/pingcap/ticdc \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./main.go \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(FAILPOINT_DISABLE)

integration_test: integration_test_mysql
Expand Down Expand Up @@ -143,8 +145,8 @@ check: check-copyright fmt lint check-static tidy errdoc

coverage:
GO111MODULE=off go get github.com/wadey/gocovmerge
gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
ifeq ("$(JenkinsCI)", "1")
GO111MODULE=off go get github.com/mattn/goveralls
@goveralls -coverprofile=$(TEST_DIR)/all_cov.out -service=jenkins-ci -repotoken $(COVERALLS_TOKEN)
Expand All @@ -156,7 +158,7 @@ else
endif

check-static: tools/bin/golangci-lint
tools/bin/golangci-lint run --timeout 10m0s
tools/bin/golangci-lint run --timeout 10m0s --skip-files kv_gen

clean:
go clean -i ./...
Expand Down
2 changes: 2 additions & 0 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package cdc

import (
"github.com/pingcap/ticdc/cdc/puller/sorter"
"time"

"github.com/pingcap/ticdc/cdc/entry"
Expand All @@ -37,5 +38,6 @@ func init() {
puller.InitMetrics(registry)
sink.InitMetrics(registry)
entry.InitMetrics(registry)
sorter.InitMetrics(registry)
initProcessorMetrics(registry)
}
1 change: 1 addition & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type SortEngine string
const (
SortInMemory SortEngine = "memory"
SortInFile SortEngine = "file"
SortUnified SortEngine = "unified"
)

// FeedState represents the running state of a changefeed
Expand Down
18 changes: 11 additions & 7 deletions cdc/model/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate msgp

package model

import (
Expand All @@ -32,6 +34,7 @@ const (

// RegionFeedEvent from the kv layer.
// Only one of the event will be setted.
//msgp:ignore RegionFeedEvent
type RegionFeedEvent struct {
Val *RawKVEntry
Resolved *ResolvedSpan
Expand All @@ -53,25 +56,26 @@ func (e *RegionFeedEvent) GetValue() interface{} {

// ResolvedSpan guarantees all the KV value event
// with commit ts less than ResolvedTs has been emitted.
//msgp:ignore ResolvedSpan
type ResolvedSpan struct {
Span regionspan.ComparableSpan
ResolvedTs uint64
}

// RawKVEntry notify the KV operator
type RawKVEntry struct {
OpType OpType
Key []byte
OpType OpType `msg:"op_type"`
Key []byte `msg:"key"`
// nil for delete type
Value []byte
Value []byte `msg:"value"`
// nil for insert type
OldValue []byte
StartTs uint64
OldValue []byte `msg:"old_value"`
StartTs uint64 `msg:"start_ts"`
// Commit or resolved TS
CRTs uint64
CRTs uint64 `msg:"crts"`

// Additonal debug info
RegionID uint64
RegionID uint64 `msg:"region_id"`
}

func (v *RawKVEntry) String() string {
Expand Down
Loading