diff --git a/driver/etcddriver.go b/driver/etcddriver.go index cc7dd27..912db01 100644 --- a/driver/etcddriver.go +++ b/driver/etcddriver.go @@ -21,11 +21,13 @@ type EtcdDriver struct { nodeID string serviceName string - cli *clientv3.Client - lease int64 - nodes *sync.Map - leaseID clientv3.LeaseID - logger dlog.Logger + cli *clientv3.Client + nodes *sync.Map + logger dlog.Logger + + leaseTimeout int64 + leaseID clientv3.LeaseID + lease clientv3.Lease ctx context.Context cancel context.CancelFunc @@ -47,13 +49,12 @@ func newEtcdDriver(cli *clientv3.Client) *EtcdDriver { // 设置key value,绑定租约 func (e *EtcdDriver) putKeyWithLease(ctx context.Context, key, val string) (clientv3.LeaseID, error) { //设置租约时间,最少5s - if e.lease < etcdDefaultLease { - e.lease = etcdDefaultLease + if e.leaseTimeout < etcdDefaultLease { + e.leaseTimeout = etcdDefaultLease } - subCtx, cancel := context.WithTimeout(ctx, etcdBusinessTimeout) defer cancel() - resp, err := e.cli.Grant(subCtx, e.lease) + resp, err := e.lease.Grant(ctx, e.leaseTimeout) if err != nil { return 0, err } @@ -120,19 +121,19 @@ func (e *EtcdDriver) getServices() []string { return addrs } -func (e *EtcdDriver) keepAlive(ctx context.Context, nodeID string) (<-chan *clientv3.LeaseKeepAliveResponse, error) { +func (e *EtcdDriver) createLease(ctx context.Context, nodeID string) (<-chan *clientv3.LeaseKeepAliveResponse, error) { var err error + e.lease = clientv3.NewLease(e.cli) e.leaseID, err = e.putKeyWithLease(ctx, nodeID, nodeID) if err != nil { e.logger.Errorf("putKeyWithLease error: %v", err) return nil, err } - - return e.cli.KeepAlive(ctx, e.leaseID) + return e.lease.KeepAlive(ctx, e.leaseID) } func (e *EtcdDriver) revoke(ctx context.Context) { - _, err := e.cli.Lease.Revoke(ctx, e.leaseID) + _, err := e.lease.Revoke(ctx, e.leaseID) if err != nil { e.logger.Infof("lease revoke error: %v", err) } @@ -140,7 +141,7 @@ func (e *EtcdDriver) revoke(ctx context.Context) { func (e *EtcdDriver) heartBeat(ctx context.Context) { label: - leaseCh, err := e.keepAlive(ctx, e.nodeID) + leaseCh, err := e.createLease(ctx, e.nodeID) if err != nil { e.logger.Errorf("keep alive error, %v", err) return @@ -152,24 +153,16 @@ label: e.logger.Infof("driver stopped") return } - case _, ok := <-leaseCh: + case resp, ok := <-leaseCh: { // if lease timeout, goto top of // this function to keepalive if !ok { + e.logger.Errorf("extend lease error, release") goto label } - } - case <-time.After(etcdBusinessTimeout): - { - e.logger.Errorf("ectd cli keepalive timeout") - return - } - case <-time.After(time.Duration(e.lease/2) * (time.Second)): - { - // if near to nodes time, - // renew the lease - goto label + + e.logger.Infof("leaseID=%0x", resp.ID) } } } @@ -209,7 +202,7 @@ func (e *EtcdDriver) withOption(opt Option) (err error) { switch opt.Type() { case OptionTypeTimeout: { - e.lease = int64(opt.(TimeoutOption).timeout.Seconds()) + e.leaseTimeout = int64(opt.(TimeoutOption).timeout.Seconds()) } case OptionTypeLogger: {