Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: use leader lease to determine tso service validity #1676

Merged
merged 9 commits into from
Aug 19, 2019
5 changes: 3 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
return errors.WithStack(err)
}
start := time.Now()
if err = s.validateRequest(request.GetHeader()); err != nil {
return err
// TSO uses leader lease to determine validity. No need to check leader here.
if request.GetHeader().GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tso.GetRespTS(count)
Expand Down
54 changes: 23 additions & 31 deletions server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/pd/pkg/etcdutil"
"github.com/pingcap/pd/pkg/logutil"
"github.com/pingcap/pd/server/kv"
"github.com/pingcap/pd/server/tso"
"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
Expand All @@ -36,6 +35,8 @@ import (
// The timeout to wait transfer etcd leader to complete.
const moveLeaderTimeout = 5 * time.Second

const leaderTickInterval = 50 * time.Millisecond
Luffbee marked this conversation as resolved.
Show resolved Hide resolved

// IsLeader returns whether the server is leader or not.
func (s *Server) IsLeader() bool {
// If server is not started. Both leaderID and ID could be 0.
Expand Down Expand Up @@ -216,21 +217,13 @@ func (s *Server) memberInfo() (member *pdpb.Member, marshalStr string) {
func (s *Server) campaignLeader() error {
log.Info("start to campaign leader", zap.String("campaign-leader-name", s.Name()))

lessor := clientv3.NewLease(s.client)
lease := NewLeaderLease(s.client)
defer func() {
lessor.Close()
lease.Close()
log.Info("exit campaign leader")
}()

start := time.Now()
ctx, cancel := context.WithTimeout(s.client.Ctx(), requestTimeout)
leaseResp, err := lessor.Grant(ctx, s.cfg.LeaderLease)
cancel()

if cost := time.Since(start); cost > slowRequestTime {
log.Warn("lessor grants too slow", zap.Duration("cost", cost))
}

err := lease.Grant(s.cfg.LeaderLease)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -239,7 +232,7 @@ func (s *Server) campaignLeader() error {
// The leader key must not exist, so the CreateRevision is 0.
resp, err := kv.NewSlowLogTxn(s.client).
If(clientv3.Compare(clientv3.CreateRevision(leaderKey), "=", 0)).
Then(clientv3.OpPut(leaderKey, s.memberValue, clientv3.WithLease(leaseResp.ID))).
Then(clientv3.OpPut(leaderKey, s.memberValue, clientv3.WithLease(lease.ID))).
Commit()
if err != nil {
return errors.WithStack(err)
Expand All @@ -248,15 +241,21 @@ func (s *Server) campaignLeader() error {
return errors.New("failed to campaign leader, other server may campaign ok")
}

// Make the leader keepalived.
ctx, cancel = context.WithCancel(s.serverLoopCtx)
// Start keepalive and enable TSO service.
// TSO service is strictly enabled/disabled by leader lease for 2 reasons:
// 1. lease based approach is not affected by thread pause, slow runtime schedule, etc.
// 2. load region could be slow. Based on lease we can recover TSO service faster.

ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
go lease.KeepAlive(ctx)
log.Info("campaign leader ok", zap.String("campaign-leader-name", s.Name()))

ch, err := lessor.KeepAlive(ctx, leaseResp.ID)
if err != nil {
return errors.WithStack(err)
log.Debug("sync timestamp for tso")
if err = s.tso.SyncTimestamp(lease); err != nil {
return err
}
log.Info("campaign leader ok", zap.String("campaign-leader-name", s.Name()))
defer s.tso.ResetTimestamp()

err = s.reloadConfigFromKV()
if err != nil {
Expand All @@ -269,29 +268,22 @@ func (s *Server) campaignLeader() error {
}
defer s.stopRaftCluster()

log.Debug("sync timestamp for tso")
if err = s.tso.SyncTimestamp(); err != nil {
return err
}
defer s.tso.ResetTimestamp()

s.enableLeader()
defer s.disableLeader()

CheckPDVersion(s.scheduleOpt)
log.Info("PD cluster leader is ready to serve", zap.String("leader-name", s.Name()))

tsTicker := time.NewTicker(tso.UpdateTimestampStep)
defer tsTicker.Stop()
leaderTicker := time.NewTicker(leaderTickInterval)
defer leaderTicker.Stop()

for {
select {
case _, ok := <-ch:
if !ok {
log.Info("keep alive channel is closed")
case <-leaderTicker.C:
if lease.IsExpired() {
log.Info("lease expired, leader step down")
return nil
}
case <-tsTicker.C:
if err = s.tso.UpdateTimestamp(); err != nil {
log.Info("failed to update timestamp")
return err
Expand Down
132 changes: 132 additions & 0 deletions server/lease.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"sync/atomic"
"time"

"github.com/pingcap/log"
"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// LeaderLease is used for renewing leadership of PD server.
type LeaderLease struct {
client *clientv3.Client
lease clientv3.Lease
ID clientv3.LeaseID
leaseTimeout time.Duration

expireTime atomic.Value
}

// NewLeaderLease creates a lease.
func NewLeaderLease(client *clientv3.Client) *LeaderLease {
return &LeaderLease{
client: client,
lease: clientv3.NewLease(client),
}
}

// Grant uses `lease.Grant` to initialize the lease and expireTime.
func (l *LeaderLease) Grant(leaseTimeout int64) error {
start := time.Now()
ctx, cancel := context.WithTimeout(l.client.Ctx(), requestTimeout)
leaseResp, err := l.lease.Grant(ctx, leaseTimeout)
cancel()
if cost := time.Since(start); cost > slowRequestTime {
log.Warn("lease grants too slow", zap.Duration("cost", cost))
}
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about returning this error before line 51?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

return errors.WithStack(err)
}
l.ID = leaseResp.ID
l.leaseTimeout = time.Duration(leaseTimeout) * time.Second
l.expireTime.Store(start.Add(time.Duration(leaseResp.TTL) * time.Second))
return nil
}

// Close releases the lease.
func (l *LeaderLease) Close() error {
return l.lease.Close()
Copy link
Contributor

@nolouch nolouch Aug 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we need Revoke the lease before close? actually Close do not try to release the lease.

}

// IsExpired checks if the lease is expired. If it returns true, current PD
// server should step down and try to re-elect again.
func (l *LeaderLease) IsExpired() bool {
return time.Now().After(l.expireTime.Load().(time.Time))
}

// KeepAlive auto renews the lease and update expireTime.
func (l *LeaderLease) KeepAlive(ctx context.Context) {
shafreeck marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(ctx)
defer cancel()
timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use 3 here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arbitrary value borrowed from etcd's keep alive code.


var maxExpire time.Time
for {
select {
case t := <-timeCh:
if t.After(maxExpire) {
maxExpire = t
l.expireTime.Store(t)
}
case <-time.After(l.leaseTimeout):
return
case <-ctx.Done():
return
}
}
}

// Periodically call `lease.KeepAliveOnce` and post back latest received expire time into the channel.
func (l *LeaderLease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time {
ch := make(chan time.Time)

go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
go func() {
start := time.Now()
ctx1, cancel := context.WithTimeout(ctx, time.Duration(l.leaseTimeout))
defer cancel()
res, err := l.lease.KeepAliveOnce(ctx1, l.ID)
if err != nil {
log.Warn("leader lease keep alive failed", zap.Error(err))
return
}
if res.TTL > 0 {
expire := start.Add(time.Duration(res.TTL) * time.Second)
select {
case ch <- expire:
case <-ctx1.Done():
}
}
}()

select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}()

return ch
}
12 changes: 11 additions & 1 deletion server/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,17 @@ const (
maxLogical = int64(1 << 18)
)

// Lease is an interface to determine if tso is in lease of providing service.
type Lease interface {
IsExpired() bool
}

// TimestampOracle is used to maintain the logic of tso.
type TimestampOracle struct {
// For tso, set after pd becomes leader.
ts atomic.Value
lastSavedTime time.Time
lease Lease

rootPath string
member string
Expand Down Expand Up @@ -100,7 +106,7 @@ func (t *TimestampOracle) saveTimestamp(ts time.Time) error {
}

// SyncTimestamp is used to synchronize the timestamp.
func (t *TimestampOracle) SyncTimestamp() error {
func (t *TimestampOracle) SyncTimestamp(lease Lease) error {
tsoCounter.WithLabelValues("sync").Inc()

last, err := t.loadTimestamp()
Expand Down Expand Up @@ -131,6 +137,7 @@ func (t *TimestampOracle) SyncTimestamp() error {
current := &atomicObject{
physical: next,
}
t.lease = lease
t.ts.Store(current)

return nil
Expand Down Expand Up @@ -238,6 +245,9 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
time.Sleep(UpdateTimestampStep)
continue
}
if t.lease == nil || t.lease.IsExpired() {
return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
}
return resp, nil
}
return resp, errors.New("can not get timestamp")
Expand Down