diff --git a/drainer/metrics.go b/drainer/metrics.go index 572345e96..971f36968 100644 --- a/drainer/metrics.go +++ b/drainer/metrics.go @@ -69,6 +69,15 @@ var ( Help: "save checkpoint tso of drainer.", }) + checkpointDelayHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "checkpoint_delay_seconds", + Help: "How much the downstream checkpoint lag behind", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 22), + }) + executeHistogram = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: "binlog", @@ -123,6 +132,7 @@ func init() { registry.MustRegister(ddlJobsCounter) registry.MustRegister(errorCount) registry.MustRegister(checkpointTSOGauge) + registry.MustRegister(checkpointDelayHistogram) registry.MustRegister(eventCounter) registry.MustRegister(executeHistogram) registry.MustRegister(binlogReachDurationHistogram) diff --git a/drainer/syncer.go b/drainer/syncer.go index 90fc55d12..2f6e7dfad 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -221,6 +221,8 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) { lastSaveTS = ts eventCounter.WithLabelValues("savepoint").Add(1) } + delay := oracle.GetPhysical(time.Now()) - oracle.ExtractPhysical(uint64(ts)) + checkpointDelayHistogram.Observe(float64(delay) / 1e3) } }