Skip to content

Commit

Permalink
*: wait for binlog recovering when using HTTP API (#13740) (#13892)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored and sre-bot committed Dec 6, 2019
1 parent 050a356 commit 0201879
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 34 deletions.
30 changes: 27 additions & 3 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ const (
)

// For query string
const qTableID = "table_id"
const qLimit = "limit"
const (
qTableID = "table_id"
qLimit = "limit"
qOperation = "op"
qSeconds = "seconds"
)

const (
headerContentType = "Content-Type"
Expand Down Expand Up @@ -666,7 +670,27 @@ func (h configReloadHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

// ServeHTTP recovers binlog service.
func (h binlogRecover) ServeHTTP(w http.ResponseWriter, req *http.Request) {
binloginfo.DisableSkipBinlogFlag()
op := req.FormValue(qOperation)
switch op {
case "reset":
binloginfo.ResetSkippedCommitterCounter()
case "nowait":
binloginfo.DisableSkipBinlogFlag()
case "status":
default:
sec, err := strconv.ParseInt(req.FormValue(qSeconds), 10, 64)
if sec <= 0 || err != nil {
sec = 1800
}
binloginfo.DisableSkipBinlogFlag()
timeout := time.Duration(sec) * time.Second
err = binloginfo.WaitBinlogRecover(timeout)
if err != nil {
writeError(w, err)
return
}
}
writeData(w, binloginfo.GetBinlogStatus())
}

// ServeHTTP handles request of list a database or table's schemas.
Expand Down
68 changes: 68 additions & 0 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
Expand Down Expand Up @@ -237,6 +238,73 @@ func (ts *HTTPHandlerTestSuite) TestGetRegionByIDWithError(c *C) {
defer resp.Body.Close()
}

func (ts *HTTPHandlerTestSuite) TestBinlogRecover(c *C) {
ts.startServer(c)
defer ts.stopServer(c)
binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

// Invalid operation will use the default operation.
binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc&seconds=1"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
binloginfo.AddOneSkippedCommitter()
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc&seconds=1"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusBadRequest)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)
binloginfo.RemoveOneSkippedCommitter()

binloginfo.AddOneSkippedCommitter()
c.Assert(binloginfo.SkippedCommitterCount(), Equals, int32(1))
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=reset"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.SkippedCommitterCount(), Equals, int32(0))

binloginfo.EnableSkipBinlogFlag()
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=nowait"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

// Only the first should work.
binloginfo.EnableSkipBinlogFlag()
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=nowait&op=reset"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=status"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
}

func (ts *HTTPHandlerTestSuite) TestRegionsFromMeta(c *C) {
ts.startServer(c)
defer ts.stopServer(c)
Expand Down
101 changes: 94 additions & 7 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,83 @@ var statusListener = func(_ BinlogStatus) error {
return nil
}

// EnableSkipBinlogFlag enables the skipBinlog flag.
// NOTE: it is used *ONLY* for test.
func EnableSkipBinlogFlag() {
atomic.StoreUint32(&skipBinlog, 1)
logutil.Logger(context.Background()).Warn("[binloginfo] enable the skipBinlog flag")
}

// DisableSkipBinlogFlag disable the skipBinlog flag.
func DisableSkipBinlogFlag() {
atomic.StoreUint32(&skipBinlog, 0)
logutil.Logger(context.Background()).Warn("[binloginfo] disable the skipBinlog flag")
}

// IsBinlogSkipped gets the skipBinlog flag.
func IsBinlogSkipped() bool {
return atomic.LoadUint32(&skipBinlog) > 0
}

// BinlogRecoverStatus is used for display the binlog recovered status after some operations.
type BinlogRecoverStatus struct {
Skipped bool
SkippedCommitterCounter int32
}

// GetBinlogStatus returns the binlog recovered status.
func GetBinlogStatus() *BinlogRecoverStatus {
return &BinlogRecoverStatus{
Skipped: IsBinlogSkipped(),
SkippedCommitterCounter: SkippedCommitterCount(),
}
}

var skippedCommitterCounter int32

// WaitBinlogRecover returns when all committing transaction finished.
func WaitBinlogRecover(timeout time.Duration) error {
logutil.Logger(context.Background()).Warn("[binloginfo] start waiting for binlog recovering")
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
start := time.Now()
for {
select {
case <-ticker.C:
if atomic.LoadInt32(&skippedCommitterCounter) == 0 {
logutil.Logger(context.Background()).Warn("[binloginfo] binlog recovered")
return nil
}
if time.Since(start) > timeout {
logutil.Logger(context.Background()).Warn("[binloginfo] waiting for binlog recovering timed out",
zap.Duration("duration", timeout))
return errors.New("timeout")
}
}
}
}

// SkippedCommitterCount returns the number of alive committers whick skipped the binlog writing.
func SkippedCommitterCount() int32 {
return atomic.LoadInt32(&skippedCommitterCounter)
}

// ResetSkippedCommitterCounter is used to reset the skippedCommitterCounter.
func ResetSkippedCommitterCounter() {
atomic.StoreInt32(&skippedCommitterCounter, 0)
logutil.Logger(context.Background()).Warn("[binloginfo] skippedCommitterCounter is reset to 0")
}

// AddOneSkippedCommitter adds one committer to skippedCommitterCounter.
func AddOneSkippedCommitter() {
atomic.AddInt32(&skippedCommitterCounter, 1)
}

// RemoveOneSkippedCommitter removes one committer from skippedCommitterCounter.
func RemoveOneSkippedCommitter() {
atomic.AddInt32(&skippedCommitterCounter, -1)
}

// SetIgnoreError sets the ignoreError flag, this function called when TiDB start
// up and find config.Binlog.IgnoreError is true.
func SetIgnoreError(on bool) {
Expand Down Expand Up @@ -147,16 +218,32 @@ func RegisterStatusListener(listener func(BinlogStatus) error) {
statusListener = listener
}

// WriteResult is used for the returned chan of WriteBinlog.
type WriteResult struct {
skipped bool
err error
}

// Skipped if true stands for the binlog writing is skipped.
func (wr *WriteResult) Skipped() bool {
return wr.skipped
}

// GetError gets the error of WriteBinlog.
func (wr *WriteResult) GetError() error {
return wr.err
}

// WriteBinlog writes a binlog to Pump.
func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
func (info *BinlogInfo) WriteBinlog(clusterID uint64) *WriteResult {
skip := atomic.LoadUint32(&skipBinlog)
if skip > 0 {
metrics.CriticalErrorCounter.Add(1)
return nil
return &WriteResult{true, nil}
}

if info.Client == nil {
return errors.New("pumps client is nil")
return &WriteResult{false, errors.New("pumps client is nil")}
}

// it will retry in PumpsClient if write binlog fail.
Expand All @@ -177,18 +264,18 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
logutil.Logger(context.Background()).Warn("update binlog status failed", zap.Error(err))
}
}
return nil
return &WriteResult{true, nil}
}

if strings.Contains(err.Error(), "received message larger than max") {
// This kind of error is not critical, return directly.
return errors.Errorf("binlog data is too large (%s)", err.Error())
return &WriteResult{false, errors.Errorf("binlog data is too large (%s)", err.Error())}
}

return terror.ErrCritical.GenWithStackByArgs(err)
return &WriteResult{false, terror.ErrCritical.GenWithStackByArgs(err)}
}

return nil
return &WriteResult{false, nil}
}

// SetDDLBinlog sets DDL binlog in the kv.Transaction.
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ func (s *testBinlogSuite) TestMaxRecvSize(c *C) {
},
Client: s.client,
}
err := info.WriteBinlog(1)
binlogWR := info.WriteBinlog(1)
err := binlogWR.GetError()
c.Assert(err, NotNil)
c.Assert(terror.ErrCritical.Equal(err), IsFalse, Commentf("%v", err))
}
Expand Down
51 changes: 30 additions & 21 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,19 +1076,9 @@ func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte
return c.doActionOnKeys(bo, actionPessimisticRollback{}, keys)
}

func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error {
err := c.execute(ctx)
if err != nil {
c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
} else {
c.txn.commitTS = c.commitTS
c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
}
return errors.Trace(err)
}

// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
var binlogSkipped bool
defer func() {
// Always clean up all written keys if the txn does not commit.
c.mu.RLock()
Expand All @@ -1112,12 +1102,22 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
c.cleanWg.Done()
}()
}
c.txn.commitTS = c.commitTS
if binlogSkipped {
binloginfo.RemoveOneSkippedCommitter()
} else {
if err != nil {
c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
} else {
c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
}
}
}()

binlogChan := c.prewriteBinlog()
prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)
start := time.Now()
err := c.prewriteKeys(prewriteBo, c.keys)
err = c.prewriteKeys(prewriteBo, c.keys)
commitDetail := c.getDetail()
commitDetail.PrewriteTime = time.Since(start)
if prewriteBo.totalSleep > 0 {
Expand All @@ -1127,9 +1127,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
commitDetail.Mu.Unlock()
}
if binlogChan != nil {
binlogErr := <-binlogChan
if binlogErr != nil {
return errors.Trace(binlogErr)
binlogWriteResult := <-binlogChan
if binlogWriteResult != nil {
binlogSkipped = binlogWriteResult.Skipped()
binlogErr := binlogWriteResult.GetError()
if binlogErr != nil {
return binlogErr
}
}
}
if err != nil {
Expand Down Expand Up @@ -1219,20 +1223,24 @@ func (c *twoPhaseCommitter) checkSchemaValid() error {
return nil
}

func (c *twoPhaseCommitter) prewriteBinlog() chan error {
func (c *twoPhaseCommitter) prewriteBinlog() chan *binloginfo.WriteResult {
if !c.shouldWriteBinlog() {
return nil
}
ch := make(chan error, 1)
ch := make(chan *binloginfo.WriteResult, 1)
go func() {
binInfo := c.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo)
bin := binInfo.Data
bin.StartTs = int64(c.startTS)
if bin.Tp == binlog.BinlogType_Prewrite {
bin.PrewriteKey = c.keys[0]
}
err := binInfo.WriteBinlog(c.store.clusterID)
ch <- errors.Trace(err)
wr := binInfo.WriteBinlog(c.store.clusterID)
if wr.Skipped() {
binInfo.Data.PrewriteValue = nil
binloginfo.AddOneSkippedCommitter()
}
ch <- wr
}()
return ch
}
Expand All @@ -1246,7 +1254,8 @@ func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int
binInfo.Data.CommitTs = commitTS
binInfo.Data.PrewriteValue = nil
go func() {
err := binInfo.WriteBinlog(c.store.clusterID)
binlogWriteResult := binInfo.WriteBinlog(c.store.clusterID)
err := binlogWriteResult.GetError()
if err != nil {
logutil.Logger(context.Background()).Error("failed to write binlog",
zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
// latches disabled
// pessimistic transaction should also bypass latch.
if txn.store.txnLatches == nil || txn.IsPessimistic() {
err = committer.executeAndWriteFinishBinlog(ctx)
err = committer.execute(ctx)
logutil.Logger(ctx).Debug("[kv] txnLatches disabled, 2pc directly", zap.Error(err))
return errors.Trace(err)
}
Expand All @@ -315,7 +315,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
if lock.IsStale() {
return kv.ErrWriteConflictInTiDB.FastGenByArgs(txn.startTS)
}
err = committer.executeAndWriteFinishBinlog(ctx)
err = committer.execute(ctx)
if err == nil {
lock.SetCommitTS(committer.commitTS)
}
Expand Down

0 comments on commit 0201879

Please sign in to comment.