Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pump/: Accelerat transaction status queries throught GetMvccByEncodeKey #632

Merged
merged 12 commits into from
Jun 18, 2019
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errors v0.11.1
github.com/pingcap/kvproto v0.0.0-20181109035735-8e3f33ac4929
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190411060258-7e41749fa69c
github.com/pingcap/pd v2.1.3+incompatible
Expand Down
25 changes: 25 additions & 0 deletions pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb-binlog/pkg/node"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb-binlog/pump/storage"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -81,6 +82,7 @@ type Server struct {
lastWriteBinlogUnixNano int64
pdCli pd.Client
cfg *Config
tiStore kv.Storage

writeBinlogCount int64
alivePullerCount int64
Expand Down Expand Up @@ -166,6 +168,7 @@ func NewServer(cfg *Config) (*Server, error) {
ctx: ctx,
cancel: cancel,
metrics: metrics,
tiStore: tiStore,
gcDuration: time.Duration(cfg.GC) * 24 * time.Hour,
pdCli: pdCli,
cfg: cfg,
Expand Down Expand Up @@ -597,6 +600,23 @@ func (s *Server) BinlogByTS(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "\n\n PrewriteValue: \n")
fmt.Fprint(w, prewriteValue.String())
}

if len(binlog.PrewriteKey) > 0 {
tikvStorage := s.tiStore.(tikv.Storage)
healper := storage.Helper{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not one Helper instance?

Copy link
Contributor Author

@july2993 july2993 Jun 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no performance cost and only use here so I think better just construct here, don't make Server struct complicate

july2993 marked this conversation as resolved.
Show resolved Hide resolved
Store: tikvStorage,
RegionCache: tikvStorage.GetRegionCache(),
}

resp, err := healper.GetMvccByEncodedKey(binlog.PrewriteKey)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can extract one function queryTransactionStatus, then just call it. the implementation output too many unless and old mvcc records

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for debug just keep all info?

if err != nil {
fmt.Fprintf(w, "GetMvccByEncodedKey failed: %s", err.Error())
return
}

fmt.Fprint(w, "\n\n GetMvccByEncodedKey: \n")
fmt.Fprint(w, resp.String())
}
}

// PumpStatus returns all pumps' status.
Expand Down Expand Up @@ -782,6 +802,11 @@ func (s *Server) Close() {
s.pdCli.Close()
}
log.Info("has closed pdCli")

if s.tiStore != nil {
s.tiStore.Close()
}
log.Info("has closed tiStore")
suzaku marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *Server) waitUntilCommitTSSaved(ctx context.Context, ts int64, checkInterval time.Duration) error {
Expand Down
60 changes: 60 additions & 0 deletions pump/storage/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"go.uber.org/zap"
)

// Helper is a middleware to get some information from tikv/pd.
type Helper struct {
Store tikv.Storage
RegionCache *tikv.RegionCache
}

// GetMvccByEncodedKey get the MVCC value by the specific encoded key.
func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just define a function GetMVCCByEncodedKey(tikv.Storage, kv.Key) instead? It seems like we have no other use for the Helper struct here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Copy from https://github.com/pingcap/tidb/blob/71def9c7263432c0dfa6a5960f6db824775177c9/store/helper/helper.go#L47
// we can use it directly if we upgrade to the latest version of TiDB dependency.

add comment in helper.go
and we may add other myself if not use TiDB code directly.

keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackoffer(context.Background(), 500), encodedKey)
if err != nil {
return nil, errors.Trace(err)
}

tikvReq := &tikvrpc.Request{
Type: tikvrpc.CmdMvccGetByKey,
MvccGetByKey: &kvrpcpb.MvccGetByKeyRequest{
Key: encodedKey,
},
}
kvResp, err := h.Store.SendReq(tikv.NewBackoffer(context.Background(), 500), tikvReq, keyLocation.Region, time.Minute)
if err != nil {
log.Info("get MVCC by encoded key failed",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.Info("get MVCC by encoded key failed",
log.Error("get MVCC by encoded key failed",

zap.Binary("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Binary("startKey", keyLocation.StartKey),
zap.Binary("endKey", keyLocation.EndKey),
zap.Reflect("kvResp", kvResp),
zap.Error(err))
return nil, errors.Trace(err)
}
return kvResp.MvccGetByKey, nil
}
123 changes: 93 additions & 30 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
pkgutil "github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -85,6 +86,7 @@ type Append struct {
metadata *leveldb.DB
sorter *sorter
tiStore kv.Storage
helper *Helper
tiLockResolver *tikv.LockResolver
latestTS int64

Expand Down Expand Up @@ -137,14 +139,26 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
return nil, errors.Trace(err)
}

tikvStorage, ok := tiStore.(tikv.Storage)
if !ok {
return nil, errors.New("not tikv.Storage")
}

helper := &Helper{
Store: tikvStorage,
RegionCache: tikvStorage.GetRegionCache(),
}

writeCh := make(chan *request, chanCapacity)

append = &Append{
dir: dir,
vlog: vlog,
metadata: metadata,
writeCh: writeCh,
options: options,
tiStore: tiStore,
helper: helper,
tiLockResolver: tiLockResolver,

close: make(chan struct{}),
Expand Down Expand Up @@ -344,22 +358,92 @@ func (a *Append) updateStatus() {
}
}

// Write a commit binlog myself if the status is committed,
// otherwise we can just ignore it, we will not get the commit binlog while iterator the kv by ts
july2993 marked this conversation as resolved.
Show resolved Hide resolved
func (a *Append) writeCBinlog(pbinlog *pb.Binlog, commitTS int64) error {
// write the commit binlog myself
cbinlog := new(pb.Binlog)
cbinlog.Tp = pb.BinlogType_Commit
cbinlog.StartTs = pbinlog.StartTs
cbinlog.CommitTs = commitTS

req := a.writeBinlog(cbinlog)
if req.err != nil {
return errors.Annotate(req.err, "writeBinlog failed")
}

// when writeBinlog return success, the pointer will be write to kv async,
// but we need to make sure it has been write to kv when we return true in the func, then we can get this commit binlog when
// we update maxCommitTS
// write the ts -> pointer to KV here to make sure it.
pointer, err := req.valuePointer.MarshalBinary()
if err != nil {
panic(err)
}

err = a.metadata.Put(encodeTSKey(req.ts()), pointer, nil)
if err != nil {
return errors.Annotate(req.err, "put into metadata failed")
}

return nil
}

func (a *Append) resolve(startTS int64) bool {
latestTS := atomic.LoadInt64(&a.latestTS)
if latestTS <= 0 {
return false
}

startSecond := oracle.ExtractPhysical(uint64(startTS)) / int64(time.Second/time.Millisecond)
maxSecond := oracle.ExtractPhysical(uint64(latestTS)) / int64(time.Second/time.Millisecond)

if maxSecond-startSecond <= maxTxnTimeoutSecond {
pbinlog, err := a.readBinlogByTS(startTS)
if err != nil {
log.Error(errors.ErrorStack(err))
return false
}

pbinlog, err := a.readBinlogByTS(startTS)
resp, err := a.helper.GetMvccByEncodedKey(pbinlog.PrewriteKey)
if err != nil {
log.Error(errors.ErrorStack(err))
log.Error("GetMvccByEncodedKey failed", zap.Error(err))
} else if resp.RegionError != nil {
log.Error("GetMvccByEncodedKey failed", zap.Stringer("RegionError", resp.RegionError))
} else if len(resp.Error) > 0 {
log.Error("GetMvccByEncodedKey failed", zap.String("Error", resp.Error))
} else {
for _, w := range resp.Info.Writes {
if !(int64(w.StartTs) == startTS) {
july2993 marked this conversation as resolved.
Show resolved Hide resolved
continue
}

if w.Type != kvrpcpb.Op_Rollback {
// Sanity checks
july2993 marked this conversation as resolved.
Show resolved Hide resolved
if int64(w.CommitTs) <= startTS {
log.Warn("op type not Rollback, but have unexpect commit ts",
july2993 marked this conversation as resolved.
Show resolved Hide resolved
zap.Int64("startTS", startTS),
zap.Uint64("commitTS", w.CommitTs))
return false
}

err := a.writeCBinlog(pbinlog, int64(w.CommitTs))
if err != nil {
log.Error("writeCBinlog failed", zap.Error(err))
return false
}
} else {
// Will get the same value as start ts if it's rollback, set to 0 for log
w.CommitTs = 0
july2993 marked this conversation as resolved.
Show resolved Hide resolved
}

log.Info("known txn is committed or rollback from tikv",
zap.Int64("start ts", startTS),
zap.Uint64("commit ts", w.CommitTs))
return true
}
}

startSecond := oracle.ExtractPhysical(uint64(startTS)) / int64(time.Second/time.Millisecond)
maxSecond := oracle.ExtractPhysical(uint64(latestTS)) / int64(time.Second/time.Millisecond)

if maxSecond-startSecond <= maxTxnTimeoutSecond {
return false
}

Expand All @@ -377,35 +461,14 @@ func (a *Append) resolve(startTS int64) bool {
// Write a commit binlog myself if the status is committed,
// otherwise we can just ignore it, we will not get the commit binlog while iterator the kv by ts
if status.IsCommitted() {
// write the commit binlog myself
cbinlog := new(pb.Binlog)
cbinlog.Tp = pb.BinlogType_Commit
cbinlog.StartTs = pbinlog.StartTs
cbinlog.CommitTs = int64(status.CommitTS())

req := a.writeBinlog(cbinlog)
if req.err != nil {
log.Error("writeBinlog failed", zap.Error(req.err))
return false
}

// when writeBinlog return success, the pointer will be write to kv async,
// but we need to make sure it has been write to kv when we return true in the func, then we can get this commit binlog when
// we update maxCommitTS
// write the ts -> pointer to KV here to make sure it.
pointer, err := req.valuePointer.MarshalBinary()
if err != nil {
panic(err)
}

err = a.metadata.Put(encodeTSKey(req.ts()), pointer, nil)
err := a.writeCBinlog(pbinlog, int64(status.CommitTS()))
if err != nil {
log.Error("put into metadata failed", zap.Error(req.err))
log.Error("writeCBinlog failed", zap.Error(err))
return false
}
}

log.Info("known txn is committed from tikv",
log.Info("known txn is committed or rollback from tikv",
zap.Int64("start ts", startTS),
zap.Uint64("commit ts", status.CommitTS()))
return true
Expand Down