From 150b98238c880d48eb65e6711a68229592786dff Mon Sep 17 00:00:00 2001 From: Ian Date: Sat, 13 Jul 2019 12:19:15 +0800 Subject: [PATCH] refine gc strategy of pump (#646) (#663) --- pump/config.go | 13 +++---- pump/metrics.go | 8 +++++ pump/server.go | 77 ++++++++++++++++++++++++++--------------- pump/storage/storage.go | 13 +++++-- 4 files changed, 74 insertions(+), 37 deletions(-) diff --git a/pump/config.go b/pump/config.go index aaaac6462..203cd0738 100644 --- a/pump/config.go +++ b/pump/config.go @@ -50,12 +50,13 @@ type Config struct { Socket string `toml:"socket" json:"socket"` EtcdURLs string `toml:"pd-urls" json:"pd-urls"` EtcdDialTimeout time.Duration - DataDir string `toml:"data-dir" json:"data-dir"` - HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"` - GC int `toml:"gc" json:"gc"` - LogFile string `toml:"log-file" json:"log-file"` - LogRotate string `toml:"log-rotate" json:"log-rotate"` - Security security.Config `toml:"security" json:"security"` + DataDir string `toml:"data-dir" json:"data-dir"` + HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"` + // pump only stores binlog events whose ts >= current time - GC(day) + GC int `toml:"gc" json:"gc"` + LogFile string `toml:"log-file" json:"log-file"` + LogRotate string `toml:"log-rotate" json:"log-rotate"` + Security security.Config `toml:"security" json:"security"` GenFakeBinlogInterval int `toml:"gen-binlog-interval" json:"gen-binlog-interval"` diff --git a/pump/metrics.go b/pump/metrics.go index 9e6bd6f41..83ce1a0f5 100644 --- a/pump/metrics.go +++ b/pump/metrics.go @@ -28,6 +28,14 @@ var ( Name: "loss_binlog_count", Help: "Total loss binlog count", }) + + detectedDrainerBinlogPurged = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "binlog", + Subsystem: "pump", + Name: "detected_drainer_binlog_purge_count", + Help: "binlog purge count > 0 means some unread binlog was purged", + }, []string{"id"}) ) var registry = prometheus.NewRegistry() diff --git a/pump/server.go b/pump/server.go index 461486cd3..2ffc11547 100644 --- a/pump/server.go +++ b/pump/server.go @@ -3,7 +3,6 @@ package pump import ( "encoding/json" "fmt" - "math" "net" "net/http" "net/url" @@ -24,7 +23,7 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" - "github.com/pingcap/tipb/go-binlog" + binlog "github.com/pingcap/tipb/go-binlog" pb "github.com/pingcap/tipb/go-binlog" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -35,13 +34,13 @@ import ( "google.golang.org/grpc/credentials" ) -var notifyDrainerTimeout = time.Second * 10 - -// GlobalConfig is global config of pump -var GlobalConfig *globalConfig - -const ( - pdReconnTimes = 30 +var ( + notifyDrainerTimeout = time.Second * 10 + pdReconnTimes = 30 + earlyAlertGC = 20 * time.Hour + detectDrainerCheckpointInterval = 10 * time.Minute + // GlobalConfig is global config of pump + GlobalConfig *globalConfig ) // Server implements the gRPC interface, @@ -268,6 +267,11 @@ func (s *Server) PullBinlogs(in *binlog.PullBinlogReq, stream binlog.Pump_PullBi // don't use pos.Suffix now, use offset like last commitTS last := in.StartFrom.Offset + gcTS := s.storage.GetGCTS() + if last <= gcTS { + log.Errorf("drainer request a purged binlog (gc ts = %d), request %+v, some binlog events may be loss", gcTS, in) + } + ctx, cancel := context.WithCancel(s.ctx) defer cancel() binlogs := s.storage.PullCommitBinlog(ctx, last) @@ -372,6 +376,9 @@ func (s *Server) Start() error { s.wg.Add(1) go s.printServerInfo() + s.wg.Add(1) + go s.detectDrainerCheckpoint() + // register pump with gRPC server and start to serve listeners binlog.RegisterPumpServer(s.gs, s) @@ -483,6 +490,29 @@ func (s *Server) genForwardBinlog() { } } +func (s *Server) detectDrainerCheckpoint() { + defer s.wg.Done() + + ticker := time.NewTicker(detectDrainerCheckpointInterval) + defer ticker.Stop() + + for { + select { + case <-s.ctx.Done(): + log.Info("detect drainer checkpoint routine exit") + return + case <-ticker.C: + gcTS := s.storage.GetGCTS() + alertGCMS := earlyAlertGC.Nanoseconds() / 1000 / 1000 + alertGCTS := gcTS + int64(oracle.EncodeTSO(alertGCMS)) + + log.Infof("use gc ts %d to detect drainer checkpoint", gcTS) + // detect whether the binlog before drainer's checkpoint had been purged + s.detectDrainerCheckPoints(s.ctx, alertGCTS) + } + } +} + func (s *Server) printServerInfo() { defer s.wg.Done() @@ -519,43 +549,34 @@ func (s *Server) gcBinlogFile() { continue } - safeTSO, err := s.getSafeGCTSOForDrainers() - if err != nil { - log.Warn("get save gc tso for drainers failed: %+v", err) - continue - } - log.Infof("get safe ts for drainers success, ts: %d", safeTSO) - millisecond := time.Now().Add(-s.gcDuration).UnixNano() / 1000 / 1000 gcTS := int64(oracle.EncodeTSO(millisecond)) - if safeTSO < gcTS { - gcTS = safeTSO - } + log.Infof("send gc request to storage, ts: %d", gcTS) - s.storage.GCTS(gcTS) + s.storage.GC(gcTS) } } -func (s *Server) getSafeGCTSOForDrainers() (int64, error) { +func (s *Server) detectDrainerCheckPoints(ctx context.Context, gcTS int64) { pumpNode := s.node.(*pumpNode) - drainers, err := pumpNode.Nodes(s.ctx, "drainers") + drainers, err := pumpNode.Nodes(ctx, "drainers") if err != nil { - return 0, errors.Trace(err) + log.Error("fail to query status of drainers: %v", err) + return } - var minTSO int64 = math.MaxInt64 for _, drainer := range drainers { if drainer.State == node.Offline { continue } - if drainer.MaxCommitTS < minTSO { - minTSO = drainer.MaxCommitTS + if drainer.MaxCommitTS < gcTS { + log.Errorf("drainer(%s) checkpoint(max commit ts = %d) is older than pump gc ts(%d), some binlogs are purged", drainer.NodeID, drainer.MaxCommitTS, gcTS) + // will add test when binlog have failpoint + detectedDrainerBinlogPurged.WithLabelValues(drainer.NodeID).Inc() } } - - return minTSO, nil } func (s *Server) startMetrics() { diff --git a/pump/storage/storage.go b/pump/storage/storage.go index a27224194..fd23077cd 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -49,7 +49,9 @@ type Storage interface { WriteBinlog(binlog *pb.Binlog) error // delete <= ts - GCTS(ts int64) + GC(ts int64) + + GetGCTS() int64 MaxCommitTS() int64 @@ -533,8 +535,13 @@ func (a *Append) Close() error { return err } -// GCTS implement Storage.GCTS -func (a *Append) GCTS(ts int64) { +// GetGCTS implement Storage.GetGCTS +func (a *Append) GetGCTS() int64 { + return atomic.LoadInt64(&a.gcTS) +} + +// GC implement Storage.GC +func (a *Append) GC(ts int64) { lastTS := atomic.LoadInt64(&a.gcTS) if ts <= lastTS { log.Infof("ignore gc ts: %d, last gc ts: %d", ts, lastTS)