From 4fe292edd0b6d2881218b464e73fa2f620da5e1f Mon Sep 17 00:00:00 2001 From: zlong lin Date: Mon, 27 Jun 2022 13:46:35 +0800 Subject: [PATCH 1/4] 1 --- cluster/cluster.go | 7 +++++-- cluster/handler.go | 23 ++++++++++++++++++++--- internal/message/message.go | 7 +++++++ 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 2418e2cf..e55280f1 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -52,9 +52,12 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (* } resp := &clusterpb.RegisterResponse{} - for _, m := range c.members { + for k, m := range c.members { if m.memberInfo.ServiceAddr == req.MemberInfo.ServiceAddr { - return nil, fmt.Errorf("address %s has registered", req.MemberInfo.ServiceAddr) + // 节点异常崩溃,不会执行unregister,此时再次启动该节点,由于已存在注册信息,将再也无法成功注册,这里做个修改,先移除后重新注册 + c.members = append(c.members[:k], c.members[k+1:]...) + break + //return nil, fmt.Errorf("address %s has registered", req.MemberInfo.ServiceAddr) } } diff --git a/cluster/handler.go b/cluster/handler.go index 98cb2854..f74644dc 100644 --- a/cluster/handler.go +++ b/cluster/handler.go @@ -54,10 +54,27 @@ var ( type rpcHandler func(session *session.Session, msg *message.Message, noCopy bool) func cache() { - data, err := json.Marshal(map[string]interface{}{ + hrdata := map[string]interface{}{ "code": 200, - "sys": map[string]float64{"heartbeat": env.Heartbeat.Seconds()}, - }) + "sys": map[string]interface{}{"heartbeat": env.Heartbeat.Seconds()}, + } + if dict, ok := message.GetDictionary(); ok { + hrdata = map[string]interface{}{ + "code": 200, + "sys": map[string]interface{}{ + "heartbeat": env.Heartbeat.Seconds(), + "dict": dict, + }, + } + } + + // data, err := json.Marshal(map[string]interface{}{ + // "code": 200, + // "sys": map[string]float64{ + // "heartbeat": env.Heartbeat.Seconds(), + // }, + // }) + data, err := json.Marshal(hrdata) if err != nil { panic(err) } diff --git a/internal/message/message.go b/internal/message/message.go index 46632d7d..00942524 100644 --- a/internal/message/message.go +++ b/internal/message/message.go @@ -244,3 +244,10 @@ func SetDictionary(dict map[string]uint16) { codes[code] = r } } + +func GetDictionary() (map[string]uint16, bool) { + if len(routes) <= 0 { + return nil, false + } + return routes, true +} From 1675ee0b9233426fe97be5426a30c434897b758b Mon Sep 17 00:00:00 2001 From: zlong lin Date: Wed, 27 Jul 2022 17:42:40 +0800 Subject: [PATCH 2/4] add servertime on handshake --- cluster/handler.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cluster/handler.go b/cluster/handler.go index f74644dc..121c0b27 100644 --- a/cluster/handler.go +++ b/cluster/handler.go @@ -56,18 +56,21 @@ type rpcHandler func(session *session.Session, msg *message.Message, noCopy bool func cache() { hrdata := map[string]interface{}{ "code": 200, - "sys": map[string]interface{}{"heartbeat": env.Heartbeat.Seconds()}, + "sys": map[string]interface{}{ + "heartbeat": env.Heartbeat.Seconds(), + "servertime": time.Now().UTC().Unix(), + }, } if dict, ok := message.GetDictionary(); ok { hrdata = map[string]interface{}{ "code": 200, "sys": map[string]interface{}{ - "heartbeat": env.Heartbeat.Seconds(), - "dict": dict, + "heartbeat": env.Heartbeat.Seconds(), + "servertime": time.Now().UTC().Unix(), + "dict": dict, }, } } - // data, err := json.Marshal(map[string]interface{}{ // "code": 200, // "sys": map[string]float64{ From a82c6aba9e8a7a0890fd4a71d9a3ef8203ed2cb9 Mon Sep 17 00:00:00 2001 From: zlong lin Date: Sun, 21 Aug 2022 15:28:16 +0800 Subject: [PATCH 3/4] slice del bug fixed --- cluster/cluster.go | 12 ++++++++++-- cluster/handler.go | 8 ++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index f701e220..0f6ac146 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -55,7 +55,11 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (* for k, m := range c.members { if m.memberInfo.ServiceAddr == req.MemberInfo.ServiceAddr { // 节点异常崩溃,不会执行unregister,此时再次启动该节点,由于已存在注册信息,将再也无法成功注册,这里做个修改,先移除后重新注册 - c.members = append(c.members[:k], c.members[k+1:]...) + if k >= len(c.members)-1 { + c.members = c.members[:k] + } else { + c.members = append(c.members[:k], c.members[k+1:]...) + } break //return nil, fmt.Errorf("address %s has registered", req.MemberInfo.ServiceAddr) } @@ -190,7 +194,11 @@ func (c *cluster) delMember(addr string) { } } if index != -1 { - c.members = append(c.members[:index], c.members[index+1:]...) + if index >= len(c.members)-1 { + c.members = c.members[:index] + } else { + c.members = append(c.members[:index], c.members[index+1:]...) + } } c.mu.Unlock() } diff --git a/cluster/handler.go b/cluster/handler.go index 027e53c8..04f4de8e 100644 --- a/cluster/handler.go +++ b/cluster/handler.go @@ -160,7 +160,11 @@ func (h *LocalHandler) delMember(addr string) { for name, members := range h.remoteServices { for i, maddr := range members { if addr == maddr.ServiceAddr { - members = append(members[:i], members[i+1:]...) + if i == len(members)-1 { + members = members[:i] + } else { + members = append(members[:i], members[i+1:]...) + } } } if len(members) == 0 { @@ -212,7 +216,7 @@ func (h *LocalHandler) handle(conn net.Conn) { members := h.currentNode.cluster.remoteAddrs() for _, remote := range members { - log.Println("Notify remote server success", remote) + log.Println("Notify remote server", remote) pool, err := h.currentNode.rpcClient.getConnPool(remote) if err != nil { log.Println("Cannot retrieve connection pool for address", remote, err) From fd888e9e79f0e4620ad9512e2ce5f8fdd9d88894 Mon Sep 17 00:00:00 2001 From: zlong lin Date: Mon, 22 Aug 2022 09:53:25 +0800 Subject: [PATCH 4/4] bug fixed --- cluster/cluster.go | 2 +- cluster/handler.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 0f6ac146..627d3268 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -133,7 +133,7 @@ func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest // Register services to current node c.currentNode.handler.delMember(req.ServiceAddr) c.mu.Lock() - if index == len(c.members)-1 { + if index >= len(c.members)-1 { c.members = c.members[:index] } else { c.members = append(c.members[:index], c.members[index+1:]...) diff --git a/cluster/handler.go b/cluster/handler.go index 04f4de8e..6ef01ad3 100644 --- a/cluster/handler.go +++ b/cluster/handler.go @@ -160,7 +160,7 @@ func (h *LocalHandler) delMember(addr string) { for name, members := range h.remoteServices { for i, maddr := range members { if addr == maddr.ServiceAddr { - if i == len(members)-1 { + if i >= len(members)-1 { members = members[:i] } else { members = append(members[:i], members[i+1:]...)