Skip to content

Commit

Permalink
Revert "refact etcd"
Browse files Browse the repository at this point in the history
This reverts commit 049bed1.
  • Loading branch information
dxyinme committed Dec 30, 2023
1 parent 049bed1 commit 1e30bdd
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions driver/etcddriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ func (e *EtcdDriver) putKeyWithLease(ctx context.Context, key, val string) (clie
e.lease = etcdDefaultLease
}

resp, err := e.cli.Grant(ctx, e.lease)
subCtx, cancel := context.WithTimeout(ctx, etcdBusinessTimeout)
defer cancel()
resp, err := e.cli.Grant(subCtx, e.lease)
if err != nil {
return 0, err
}
//注册服务并绑定租约
_, err = e.cli.Put(ctx, key, val, clientv3.WithLease(resp.ID))
_, err = e.cli.Put(subCtx, key, val, clientv3.WithLease(resp.ID))
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -118,14 +120,15 @@ func (e *EtcdDriver) getServices() []string {
return addrs
}

func (e *EtcdDriver) createLease(ctx context.Context, nodeID string) (*clientv3.LeaseKeepAliveResponse, error) {
func (e *EtcdDriver) keepAlive(ctx context.Context, nodeID string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
var err error
e.leaseID, err = e.putKeyWithLease(ctx, nodeID, nodeID)
if err != nil {
e.logger.Errorf("putKeyWithLease error: %v", err)
return nil, err
}
return e.cli.KeepAliveOnce(ctx, e.leaseID)

return e.cli.KeepAlive(ctx, e.leaseID)
}

func (e *EtcdDriver) revoke(ctx context.Context) {
Expand All @@ -137,7 +140,7 @@ func (e *EtcdDriver) revoke(ctx context.Context) {

func (e *EtcdDriver) heartBeat(ctx context.Context) {
label:
_, err := e.createLease(ctx, e.nodeID)
leaseCh, err := e.keepAlive(ctx, e.nodeID)
if err != nil {
e.logger.Errorf("keep alive error, %v", err)
return
Expand All @@ -149,16 +152,24 @@ label:
e.logger.Infof("driver stopped")
return
}
case <-time.After(time.Duration(e.lease) * (time.Second) / 2):
case _, ok := <-leaseCh:
{
// if near to nodes time,
// renew the lease
resp, err := e.cli.KeepAliveOnce(ctx, e.leaseID)
if err != nil {
e.logger.Errorf("keep alive error: %v", err)
// if lease timeout, goto top of
// this function to keepalive
if !ok {
goto label
}
e.logger.Infof("leaseID=%0x", resp.ID)
}
case <-time.After(etcdBusinessTimeout):
{
e.logger.Errorf("ectd cli keepalive timeout")
return
}
case <-time.After(time.Duration(e.lease/2) * (time.Second)):
{
// if near to nodes time,
// renew the lease
goto label
}
}
}
Expand Down

0 comments on commit 1e30bdd

Please sign in to comment.