Skip to content

Commit

Permalink
refactor etcddriver
Browse files Browse the repository at this point in the history
  • Loading branch information
dxyinme committed Dec 30, 2023
1 parent e9568ed commit a576ac3
Showing 1 changed file with 20 additions and 27 deletions.
47 changes: 20 additions & 27 deletions driver/etcddriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ type EtcdDriver struct {
nodeID string
serviceName string

cli *clientv3.Client
lease int64
nodes *sync.Map
leaseID clientv3.LeaseID
logger dlog.Logger
cli *clientv3.Client
nodes *sync.Map
logger dlog.Logger

leaseTimeout int64
leaseID clientv3.LeaseID
lease clientv3.Lease

ctx context.Context
cancel context.CancelFunc
Expand All @@ -47,13 +49,12 @@ func newEtcdDriver(cli *clientv3.Client) *EtcdDriver {
// 设置key value,绑定租约
func (e *EtcdDriver) putKeyWithLease(ctx context.Context, key, val string) (clientv3.LeaseID, error) {
//设置租约时间,最少5s
if e.lease < etcdDefaultLease {
e.lease = etcdDefaultLease
if e.leaseTimeout < etcdDefaultLease {
e.leaseTimeout = etcdDefaultLease
}

subCtx, cancel := context.WithTimeout(ctx, etcdBusinessTimeout)
defer cancel()
resp, err := e.cli.Grant(subCtx, e.lease)
resp, err := e.lease.Grant(ctx, e.leaseTimeout)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -120,27 +121,27 @@ func (e *EtcdDriver) getServices() []string {
return addrs
}

func (e *EtcdDriver) keepAlive(ctx context.Context, nodeID string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
func (e *EtcdDriver) createLease(ctx context.Context, nodeID string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
var err error
e.lease = clientv3.NewLease(e.cli)
e.leaseID, err = e.putKeyWithLease(ctx, nodeID, nodeID)
if err != nil {
e.logger.Errorf("putKeyWithLease error: %v", err)
return nil, err
}

return e.cli.KeepAlive(ctx, e.leaseID)
return e.lease.KeepAlive(ctx, e.leaseID)
}

func (e *EtcdDriver) revoke(ctx context.Context) {
_, err := e.cli.Lease.Revoke(ctx, e.leaseID)
_, err := e.lease.Revoke(ctx, e.leaseID)
if err != nil {
e.logger.Infof("lease revoke error: %v", err)
}
}

func (e *EtcdDriver) heartBeat(ctx context.Context) {
label:
leaseCh, err := e.keepAlive(ctx, e.nodeID)
leaseCh, err := e.createLease(ctx, e.nodeID)
if err != nil {
e.logger.Errorf("keep alive error, %v", err)
return
Expand All @@ -152,24 +153,16 @@ label:
e.logger.Infof("driver stopped")
return
}
case _, ok := <-leaseCh:
case resp, ok := <-leaseCh:
{
// if lease timeout, goto top of
// this function to keepalive
if !ok {
e.logger.Errorf("extend lease error, release")
goto label
}
}
case <-time.After(etcdBusinessTimeout):
{
e.logger.Errorf("ectd cli keepalive timeout")
return
}
case <-time.After(time.Duration(e.lease/2) * (time.Second)):
{
// if near to nodes time,
// renew the lease
goto label

e.logger.Infof("leaseID=%0x", resp.ID)
}
}
}
Expand Down Expand Up @@ -209,7 +202,7 @@ func (e *EtcdDriver) withOption(opt Option) (err error) {
switch opt.Type() {
case OptionTypeTimeout:
{
e.lease = int64(opt.(TimeoutOption).timeout.Seconds())
e.leaseTimeout = int64(opt.(TimeoutOption).timeout.Seconds())
}
case OptionTypeLogger:
{
Expand Down

0 comments on commit a576ac3

Please sign in to comment.