diff --git a/server/http_handler.go b/server/http_handler.go index bc362104a5fb6..b69eef27ef4eb 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -76,8 +76,12 @@ const ( ) // For query string -const qTableID = "table_id" -const qLimit = "limit" +const ( + qTableID = "table_id" + qLimit = "limit" + qOperation = "op" + qSeconds = "seconds" +) const ( headerContentType = "Content-Type" @@ -666,7 +670,27 @@ func (h configReloadHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) // ServeHTTP recovers binlog service. func (h binlogRecover) ServeHTTP(w http.ResponseWriter, req *http.Request) { - binloginfo.DisableSkipBinlogFlag() + op := req.FormValue(qOperation) + switch op { + case "reset": + binloginfo.ResetSkippedCommitterCounter() + case "nowait": + binloginfo.DisableSkipBinlogFlag() + case "status": + default: + sec, err := strconv.ParseInt(req.FormValue(qSeconds), 10, 64) + if sec <= 0 || err != nil { + sec = 1800 + } + binloginfo.DisableSkipBinlogFlag() + timeout := time.Duration(sec) * time.Second + err = binloginfo.WaitBinlogRecover(timeout) + if err != nil { + writeError(w, err) + return + } + } + writeData(w, binloginfo.GetBinlogStatus()) } // ServeHTTP handles request of list a database or table's schemas. diff --git a/server/http_handler_test.go b/server/http_handler_test.go index 060744990baa5..2420e354e1ba2 100644 --- a/server/http_handler_test.go +++ b/server/http_handler_test.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/helper" @@ -237,6 +238,73 @@ func (ts *HTTPHandlerTestSuite) TestGetRegionByIDWithError(c *C) { defer resp.Body.Close() } +func (ts *HTTPHandlerTestSuite) TestBinlogRecover(c *C) { + ts.startServer(c) + defer ts.stopServer(c) + binloginfo.EnableSkipBinlogFlag() + c.Assert(binloginfo.IsBinlogSkipped(), Equals, true) + resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover")) + c.Assert(err, IsNil) + defer resp.Body.Close() + c.Assert(resp.StatusCode, Equals, http.StatusOK) + c.Assert(binloginfo.IsBinlogSkipped(), Equals, false) + + // Invalid operation will use the default operation. + binloginfo.EnableSkipBinlogFlag() + c.Assert(binloginfo.IsBinlogSkipped(), Equals, true) + resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc")) + c.Assert(err, IsNil) + defer resp.Body.Close() + c.Assert(resp.StatusCode, Equals, http.StatusOK) + c.Assert(binloginfo.IsBinlogSkipped(), Equals, false) + + binloginfo.EnableSkipBinlogFlag() + c.Assert(binloginfo.IsBinlogSkipped(), Equals, true) + resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc&seconds=1")) + c.Assert(err, IsNil) + defer resp.Body.Close() + c.Assert(resp.StatusCode, Equals, http.StatusOK) + c.Assert(binloginfo.IsBinlogSkipped(), Equals, false) + + binloginfo.EnableSkipBinlogFlag() + c.Assert(binloginfo.IsBinlogSkipped(), Equals, true) + binloginfo.AddOneSkippedCommitter() + resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc&seconds=1")) + c.Assert(err, IsNil) + defer resp.Body.Close() + c.Assert(resp.StatusCode, Equals, http.StatusBadRequest) + c.Assert(binloginfo.IsBinlogSkipped(), Equals, false) + binloginfo.RemoveOneSkippedCommitter() + + binloginfo.AddOneSkippedCommitter() + c.Assert(binloginfo.SkippedCommitterCount(), Equals, int32(1)) + resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=reset")) + c.Assert(err, IsNil) + defer resp.Body.Close() + c.Assert(resp.StatusCode, Equals, http.StatusOK) + c.Assert(binloginfo.SkippedCommitterCount(), Equals, int32(0)) + + binloginfo.EnableSkipBinlogFlag() + resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=nowait")) + c.Assert(err, IsNil) + defer resp.Body.Close() + c.Assert(resp.StatusCode, Equals, http.StatusOK) + c.Assert(binloginfo.IsBinlogSkipped(), Equals, false) + + // Only the first should work. + binloginfo.EnableSkipBinlogFlag() + resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=nowait&op=reset")) + c.Assert(err, IsNil) + defer resp.Body.Close() + c.Assert(resp.StatusCode, Equals, http.StatusOK) + c.Assert(binloginfo.IsBinlogSkipped(), Equals, false) + + resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=status")) + c.Assert(err, IsNil) + defer resp.Body.Close() + c.Assert(resp.StatusCode, Equals, http.StatusOK) +} + func (ts *HTTPHandlerTestSuite) TestRegionsFromMeta(c *C) { ts.startServer(c) defer ts.stopServer(c) diff --git a/sessionctx/binloginfo/binloginfo.go b/sessionctx/binloginfo/binloginfo.go index 6df2f20382973..638da8dd42b73 100644 --- a/sessionctx/binloginfo/binloginfo.go +++ b/sessionctx/binloginfo/binloginfo.go @@ -113,12 +113,83 @@ var statusListener = func(_ BinlogStatus) error { return nil } +// EnableSkipBinlogFlag enables the skipBinlog flag. +// NOTE: it is used *ONLY* for test. +func EnableSkipBinlogFlag() { + atomic.StoreUint32(&skipBinlog, 1) + logutil.Logger(context.Background()).Warn("[binloginfo] enable the skipBinlog flag") +} + // DisableSkipBinlogFlag disable the skipBinlog flag. func DisableSkipBinlogFlag() { atomic.StoreUint32(&skipBinlog, 0) logutil.Logger(context.Background()).Warn("[binloginfo] disable the skipBinlog flag") } +// IsBinlogSkipped gets the skipBinlog flag. +func IsBinlogSkipped() bool { + return atomic.LoadUint32(&skipBinlog) > 0 +} + +// BinlogRecoverStatus is used for display the binlog recovered status after some operations. +type BinlogRecoverStatus struct { + Skipped bool + SkippedCommitterCounter int32 +} + +// GetBinlogStatus returns the binlog recovered status. +func GetBinlogStatus() *BinlogRecoverStatus { + return &BinlogRecoverStatus{ + Skipped: IsBinlogSkipped(), + SkippedCommitterCounter: SkippedCommitterCount(), + } +} + +var skippedCommitterCounter int32 + +// WaitBinlogRecover returns when all committing transaction finished. +func WaitBinlogRecover(timeout time.Duration) error { + logutil.Logger(context.Background()).Warn("[binloginfo] start waiting for binlog recovering") + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + start := time.Now() + for { + select { + case <-ticker.C: + if atomic.LoadInt32(&skippedCommitterCounter) == 0 { + logutil.Logger(context.Background()).Warn("[binloginfo] binlog recovered") + return nil + } + if time.Since(start) > timeout { + logutil.Logger(context.Background()).Warn("[binloginfo] waiting for binlog recovering timed out", + zap.Duration("duration", timeout)) + return errors.New("timeout") + } + } + } +} + +// SkippedCommitterCount returns the number of alive committers whick skipped the binlog writing. +func SkippedCommitterCount() int32 { + return atomic.LoadInt32(&skippedCommitterCounter) +} + +// ResetSkippedCommitterCounter is used to reset the skippedCommitterCounter. +func ResetSkippedCommitterCounter() { + atomic.StoreInt32(&skippedCommitterCounter, 0) + logutil.Logger(context.Background()).Warn("[binloginfo] skippedCommitterCounter is reset to 0") +} + +// AddOneSkippedCommitter adds one committer to skippedCommitterCounter. +func AddOneSkippedCommitter() { + atomic.AddInt32(&skippedCommitterCounter, 1) +} + +// RemoveOneSkippedCommitter removes one committer from skippedCommitterCounter. +func RemoveOneSkippedCommitter() { + atomic.AddInt32(&skippedCommitterCounter, -1) +} + // SetIgnoreError sets the ignoreError flag, this function called when TiDB start // up and find config.Binlog.IgnoreError is true. func SetIgnoreError(on bool) { @@ -147,16 +218,32 @@ func RegisterStatusListener(listener func(BinlogStatus) error) { statusListener = listener } +// WriteResult is used for the returned chan of WriteBinlog. +type WriteResult struct { + skipped bool + err error +} + +// Skipped if true stands for the binlog writing is skipped. +func (wr *WriteResult) Skipped() bool { + return wr.skipped +} + +// GetError gets the error of WriteBinlog. +func (wr *WriteResult) GetError() error { + return wr.err +} + // WriteBinlog writes a binlog to Pump. -func (info *BinlogInfo) WriteBinlog(clusterID uint64) error { +func (info *BinlogInfo) WriteBinlog(clusterID uint64) *WriteResult { skip := atomic.LoadUint32(&skipBinlog) if skip > 0 { metrics.CriticalErrorCounter.Add(1) - return nil + return &WriteResult{true, nil} } if info.Client == nil { - return errors.New("pumps client is nil") + return &WriteResult{false, errors.New("pumps client is nil")} } // it will retry in PumpsClient if write binlog fail. @@ -177,18 +264,18 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error { logutil.Logger(context.Background()).Warn("update binlog status failed", zap.Error(err)) } } - return nil + return &WriteResult{true, nil} } if strings.Contains(err.Error(), "received message larger than max") { // This kind of error is not critical, return directly. - return errors.Errorf("binlog data is too large (%s)", err.Error()) + return &WriteResult{false, errors.Errorf("binlog data is too large (%s)", err.Error())} } - return terror.ErrCritical.GenWithStackByArgs(err) + return &WriteResult{false, terror.ErrCritical.GenWithStackByArgs(err)} } - return nil + return &WriteResult{false, nil} } // SetDDLBinlog sets DDL binlog in the kv.Transaction. diff --git a/sessionctx/binloginfo/binloginfo_test.go b/sessionctx/binloginfo/binloginfo_test.go index a10b064811d9a..dead446276f9a 100644 --- a/sessionctx/binloginfo/binloginfo_test.go +++ b/sessionctx/binloginfo/binloginfo_test.go @@ -270,7 +270,8 @@ func (s *testBinlogSuite) TestMaxRecvSize(c *C) { }, Client: s.client, } - err := info.WriteBinlog(1) + binlogWR := info.WriteBinlog(1) + err := binlogWR.GetError() c.Assert(err, NotNil) c.Assert(terror.ErrCritical.Equal(err), IsFalse, Commentf("%v", err)) } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 72a1e345ef6ab..aafca4ae34397 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -1076,19 +1076,9 @@ func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte return c.doActionOnKeys(bo, actionPessimisticRollback{}, keys) } -func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error { - err := c.execute(ctx) - if err != nil { - c.writeFinishBinlog(binlog.BinlogType_Rollback, 0) - } else { - c.txn.commitTS = c.commitTS - c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS)) - } - return errors.Trace(err) -} - // execute executes the two-phase commit protocol. -func (c *twoPhaseCommitter) execute(ctx context.Context) error { +func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { + var binlogSkipped bool defer func() { // Always clean up all written keys if the txn does not commit. c.mu.RLock() @@ -1112,12 +1102,22 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { c.cleanWg.Done() }() } + c.txn.commitTS = c.commitTS + if binlogSkipped { + binloginfo.RemoveOneSkippedCommitter() + } else { + if err != nil { + c.writeFinishBinlog(binlog.BinlogType_Rollback, 0) + } else { + c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS)) + } + } }() binlogChan := c.prewriteBinlog() prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars) start := time.Now() - err := c.prewriteKeys(prewriteBo, c.keys) + err = c.prewriteKeys(prewriteBo, c.keys) commitDetail := c.getDetail() commitDetail.PrewriteTime = time.Since(start) if prewriteBo.totalSleep > 0 { @@ -1127,9 +1127,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { commitDetail.Mu.Unlock() } if binlogChan != nil { - binlogErr := <-binlogChan - if binlogErr != nil { - return errors.Trace(binlogErr) + binlogWriteResult := <-binlogChan + if binlogWriteResult != nil { + binlogSkipped = binlogWriteResult.Skipped() + binlogErr := binlogWriteResult.GetError() + if binlogErr != nil { + return binlogErr + } } } if err != nil { @@ -1219,11 +1223,11 @@ func (c *twoPhaseCommitter) checkSchemaValid() error { return nil } -func (c *twoPhaseCommitter) prewriteBinlog() chan error { +func (c *twoPhaseCommitter) prewriteBinlog() chan *binloginfo.WriteResult { if !c.shouldWriteBinlog() { return nil } - ch := make(chan error, 1) + ch := make(chan *binloginfo.WriteResult, 1) go func() { binInfo := c.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo) bin := binInfo.Data @@ -1231,8 +1235,12 @@ func (c *twoPhaseCommitter) prewriteBinlog() chan error { if bin.Tp == binlog.BinlogType_Prewrite { bin.PrewriteKey = c.keys[0] } - err := binInfo.WriteBinlog(c.store.clusterID) - ch <- errors.Trace(err) + wr := binInfo.WriteBinlog(c.store.clusterID) + if wr.Skipped() { + binInfo.Data.PrewriteValue = nil + binloginfo.AddOneSkippedCommitter() + } + ch <- wr }() return ch } @@ -1246,7 +1254,8 @@ func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int binInfo.Data.CommitTs = commitTS binInfo.Data.PrewriteValue = nil go func() { - err := binInfo.WriteBinlog(c.store.clusterID) + binlogWriteResult := binInfo.WriteBinlog(c.store.clusterID) + err := binlogWriteResult.GetError() if err != nil { logutil.Logger(context.Background()).Error("failed to write binlog", zap.Error(err)) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 357b81b191969..13eb05f265a6b 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -297,7 +297,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { // latches disabled // pessimistic transaction should also bypass latch. if txn.store.txnLatches == nil || txn.IsPessimistic() { - err = committer.executeAndWriteFinishBinlog(ctx) + err = committer.execute(ctx) logutil.Logger(ctx).Debug("[kv] txnLatches disabled, 2pc directly", zap.Error(err)) return errors.Trace(err) } @@ -315,7 +315,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { if lock.IsStale() { return kv.ErrWriteConflictInTiDB.FastGenByArgs(txn.startTS) } - err = committer.executeAndWriteFinishBinlog(ctx) + err = committer.execute(ctx) if err == nil { lock.SetCommitTS(committer.commitTS) }