Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: wait for binlog recovering when using HTTP API (#13740) #13892

Merged
merged 2 commits into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ const (
)

// For query string
const qTableID = "table_id"
const qLimit = "limit"
const (
qTableID = "table_id"
qLimit = "limit"
qOperation = "op"
qSeconds = "seconds"
)

const (
headerContentType = "Content-Type"
Expand Down Expand Up @@ -666,7 +670,27 @@ func (h configReloadHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

// ServeHTTP recovers binlog service.
func (h binlogRecover) ServeHTTP(w http.ResponseWriter, req *http.Request) {
binloginfo.DisableSkipBinlogFlag()
op := req.FormValue(qOperation)
switch op {
case "reset":
binloginfo.ResetSkippedCommitterCounter()
case "nowait":
binloginfo.DisableSkipBinlogFlag()
case "status":
default:
sec, err := strconv.ParseInt(req.FormValue(qSeconds), 10, 64)
if sec <= 0 || err != nil {
sec = 1800
}
binloginfo.DisableSkipBinlogFlag()
timeout := time.Duration(sec) * time.Second
err = binloginfo.WaitBinlogRecover(timeout)
if err != nil {
writeError(w, err)
return
}
}
writeData(w, binloginfo.GetBinlogStatus())
}

// ServeHTTP handles request of list a database or table's schemas.
Expand Down
68 changes: 68 additions & 0 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
Expand Down Expand Up @@ -237,6 +238,73 @@ func (ts *HTTPHandlerTestSuite) TestGetRegionByIDWithError(c *C) {
defer resp.Body.Close()
}

func (ts *HTTPHandlerTestSuite) TestBinlogRecover(c *C) {
ts.startServer(c)
defer ts.stopServer(c)
binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

// Invalid operation will use the default operation.
binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc&seconds=1"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
binloginfo.AddOneSkippedCommitter()
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc&seconds=1"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusBadRequest)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)
binloginfo.RemoveOneSkippedCommitter()

binloginfo.AddOneSkippedCommitter()
c.Assert(binloginfo.SkippedCommitterCount(), Equals, int32(1))
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=reset"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.SkippedCommitterCount(), Equals, int32(0))

binloginfo.EnableSkipBinlogFlag()
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=nowait"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

// Only the first should work.
binloginfo.EnableSkipBinlogFlag()
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=nowait&op=reset"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=status"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
}

func (ts *HTTPHandlerTestSuite) TestRegionsFromMeta(c *C) {
ts.startServer(c)
defer ts.stopServer(c)
Expand Down
101 changes: 94 additions & 7 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,83 @@ var statusListener = func(_ BinlogStatus) error {
return nil
}

// EnableSkipBinlogFlag enables the skipBinlog flag.
// NOTE: it is used *ONLY* for test.
func EnableSkipBinlogFlag() {
atomic.StoreUint32(&skipBinlog, 1)
logutil.Logger(context.Background()).Warn("[binloginfo] enable the skipBinlog flag")
}

// DisableSkipBinlogFlag disable the skipBinlog flag.
func DisableSkipBinlogFlag() {
atomic.StoreUint32(&skipBinlog, 0)
logutil.Logger(context.Background()).Warn("[binloginfo] disable the skipBinlog flag")
}

// IsBinlogSkipped gets the skipBinlog flag.
func IsBinlogSkipped() bool {
return atomic.LoadUint32(&skipBinlog) > 0
}

// BinlogRecoverStatus is used for display the binlog recovered status after some operations.
type BinlogRecoverStatus struct {
Skipped bool
SkippedCommitterCounter int32
}

// GetBinlogStatus returns the binlog recovered status.
func GetBinlogStatus() *BinlogRecoverStatus {
return &BinlogRecoverStatus{
Skipped: IsBinlogSkipped(),
SkippedCommitterCounter: SkippedCommitterCount(),
}
}

var skippedCommitterCounter int32

// WaitBinlogRecover returns when all committing transaction finished.
func WaitBinlogRecover(timeout time.Duration) error {
logutil.Logger(context.Background()).Warn("[binloginfo] start waiting for binlog recovering")
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
start := time.Now()
for {
select {
case <-ticker.C:
if atomic.LoadInt32(&skippedCommitterCounter) == 0 {
logutil.Logger(context.Background()).Warn("[binloginfo] binlog recovered")
return nil
}
if time.Since(start) > timeout {
logutil.Logger(context.Background()).Warn("[binloginfo] waiting for binlog recovering timed out",
zap.Duration("duration", timeout))
return errors.New("timeout")
}
}
}
}

// SkippedCommitterCount returns the number of alive committers whick skipped the binlog writing.
func SkippedCommitterCount() int32 {
return atomic.LoadInt32(&skippedCommitterCounter)
}

// ResetSkippedCommitterCounter is used to reset the skippedCommitterCounter.
func ResetSkippedCommitterCounter() {
atomic.StoreInt32(&skippedCommitterCounter, 0)
logutil.Logger(context.Background()).Warn("[binloginfo] skippedCommitterCounter is reset to 0")
}

// AddOneSkippedCommitter adds one committer to skippedCommitterCounter.
func AddOneSkippedCommitter() {
atomic.AddInt32(&skippedCommitterCounter, 1)
}

// RemoveOneSkippedCommitter removes one committer from skippedCommitterCounter.
func RemoveOneSkippedCommitter() {
atomic.AddInt32(&skippedCommitterCounter, -1)
}

// SetIgnoreError sets the ignoreError flag, this function called when TiDB start
// up and find config.Binlog.IgnoreError is true.
func SetIgnoreError(on bool) {
Expand Down Expand Up @@ -147,16 +218,32 @@ func RegisterStatusListener(listener func(BinlogStatus) error) {
statusListener = listener
}

// WriteResult is used for the returned chan of WriteBinlog.
type WriteResult struct {
skipped bool
err error
}

// Skipped if true stands for the binlog writing is skipped.
func (wr *WriteResult) Skipped() bool {
return wr.skipped
}

// GetError gets the error of WriteBinlog.
func (wr *WriteResult) GetError() error {
return wr.err
}

// WriteBinlog writes a binlog to Pump.
func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
func (info *BinlogInfo) WriteBinlog(clusterID uint64) *WriteResult {
skip := atomic.LoadUint32(&skipBinlog)
if skip > 0 {
metrics.CriticalErrorCounter.Add(1)
return nil
return &WriteResult{true, nil}
}

if info.Client == nil {
return errors.New("pumps client is nil")
return &WriteResult{false, errors.New("pumps client is nil")}
}

// it will retry in PumpsClient if write binlog fail.
Expand All @@ -177,18 +264,18 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
logutil.Logger(context.Background()).Warn("update binlog status failed", zap.Error(err))
}
}
return nil
return &WriteResult{true, nil}
}

if strings.Contains(err.Error(), "received message larger than max") {
// This kind of error is not critical, return directly.
return errors.Errorf("binlog data is too large (%s)", err.Error())
return &WriteResult{false, errors.Errorf("binlog data is too large (%s)", err.Error())}
}

return terror.ErrCritical.GenWithStackByArgs(err)
return &WriteResult{false, terror.ErrCritical.GenWithStackByArgs(err)}
}

return nil
return &WriteResult{false, nil}
}

// SetDDLBinlog sets DDL binlog in the kv.Transaction.
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ func (s *testBinlogSuite) TestMaxRecvSize(c *C) {
},
Client: s.client,
}
err := info.WriteBinlog(1)
binlogWR := info.WriteBinlog(1)
err := binlogWR.GetError()
c.Assert(err, NotNil)
c.Assert(terror.ErrCritical.Equal(err), IsFalse, Commentf("%v", err))
}
Expand Down
51 changes: 30 additions & 21 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,19 +1076,9 @@ func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte
return c.doActionOnKeys(bo, actionPessimisticRollback{}, keys)
}

func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error {
err := c.execute(ctx)
if err != nil {
c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
} else {
c.txn.commitTS = c.commitTS
c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
}
return errors.Trace(err)
}

// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
var binlogSkipped bool
defer func() {
// Always clean up all written keys if the txn does not commit.
c.mu.RLock()
Expand All @@ -1112,12 +1102,22 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
c.cleanWg.Done()
}()
}
c.txn.commitTS = c.commitTS
if binlogSkipped {
binloginfo.RemoveOneSkippedCommitter()
} else {
if err != nil {
c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
} else {
c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
}
}
}()

binlogChan := c.prewriteBinlog()
prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)
start := time.Now()
err := c.prewriteKeys(prewriteBo, c.keys)
err = c.prewriteKeys(prewriteBo, c.keys)
commitDetail := c.getDetail()
commitDetail.PrewriteTime = time.Since(start)
if prewriteBo.totalSleep > 0 {
Expand All @@ -1127,9 +1127,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
commitDetail.Mu.Unlock()
}
if binlogChan != nil {
binlogErr := <-binlogChan
if binlogErr != nil {
return errors.Trace(binlogErr)
binlogWriteResult := <-binlogChan
if binlogWriteResult != nil {
binlogSkipped = binlogWriteResult.Skipped()
binlogErr := binlogWriteResult.GetError()
if binlogErr != nil {
return binlogErr
}
}
}
if err != nil {
Expand Down Expand Up @@ -1219,20 +1223,24 @@ func (c *twoPhaseCommitter) checkSchemaValid() error {
return nil
}

func (c *twoPhaseCommitter) prewriteBinlog() chan error {
func (c *twoPhaseCommitter) prewriteBinlog() chan *binloginfo.WriteResult {
if !c.shouldWriteBinlog() {
return nil
}
ch := make(chan error, 1)
ch := make(chan *binloginfo.WriteResult, 1)
go func() {
binInfo := c.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo)
bin := binInfo.Data
bin.StartTs = int64(c.startTS)
if bin.Tp == binlog.BinlogType_Prewrite {
bin.PrewriteKey = c.keys[0]
}
err := binInfo.WriteBinlog(c.store.clusterID)
ch <- errors.Trace(err)
wr := binInfo.WriteBinlog(c.store.clusterID)
if wr.Skipped() {
binInfo.Data.PrewriteValue = nil
binloginfo.AddOneSkippedCommitter()
}
ch <- wr
}()
return ch
}
Expand All @@ -1246,7 +1254,8 @@ func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int
binInfo.Data.CommitTs = commitTS
binInfo.Data.PrewriteValue = nil
go func() {
err := binInfo.WriteBinlog(c.store.clusterID)
binlogWriteResult := binInfo.WriteBinlog(c.store.clusterID)
err := binlogWriteResult.GetError()
if err != nil {
logutil.Logger(context.Background()).Error("failed to write binlog",
zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
// latches disabled
// pessimistic transaction should also bypass latch.
if txn.store.txnLatches == nil || txn.IsPessimistic() {
err = committer.executeAndWriteFinishBinlog(ctx)
err = committer.execute(ctx)
logutil.Logger(ctx).Debug("[kv] txnLatches disabled, 2pc directly", zap.Error(err))
return errors.Trace(err)
}
Expand All @@ -315,7 +315,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
if lock.IsStale() {
return kv.ErrWriteConflictInTiDB.FastGenByArgs(txn.startTS)
}
err = committer.executeAndWriteFinishBinlog(ctx)
err = committer.execute(ctx)
if err == nil {
lock.SetCommitTS(committer.commitTS)
}
Expand Down