Skip to content

Commit

Permalink
Backport of Fix issue with peer stream node cleanup. into release/1.15.x (#17247)
Browse files Browse the repository at this point in the history

* backport of commit 61a281a

* backport of commit 28a83da

* backport of commit 4feb116

---------

Co-authored-by: Derek Menteer <derek.menteer@hashicorp.com>
  • Loading branch information
hc-github-team-consul-core and hashi-derek authored May 8, 2023
1 parent 9cba115 commit 3a1ea22
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 168 deletions.
3 changes: 3 additions & 0 deletions .changelog/17235.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
peering: Fix issue where peer streams could incorrectly deregister services in various scenarios.
```
39 changes: 23 additions & 16 deletions agent/grpc-external/services/peerstream/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
newproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -338,7 +339,7 @@ func (s *Server) handleUpdateService(
for _, nodeSnap := range snap.Nodes {
// First register the node - skip the unchanged ones
changed := true
if storedNode, ok := storedNodesMap[nodeSnap.Node.ID]; ok {
if storedNode, ok := storedNodesMap[nodeSnap.Node.Node]; ok {
if storedNode.IsSame(nodeSnap.Node) {
changed = false
}
Expand All @@ -354,7 +355,7 @@ func (s *Server) handleUpdateService(
// Then register all services on that node - skip the unchanged ones
for _, svcSnap := range nodeSnap.Services {
changed = true
if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.ID, svcSnap.Service.ID)]; ok {
if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.Node, svcSnap.Service.ID)]; ok {
if storedSvcInst.IsSame(svcSnap.Service) {
changed = false
}
Expand All @@ -374,7 +375,7 @@ func (s *Server) handleUpdateService(
for _, svcSnap := range nodeSnap.Services {
for _, c := range svcSnap.Checks {
changed := true
if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.ID, svcSnap.Service.ID, c.CheckID)]; ok {
if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.Node, svcSnap.Service.ID, c.CheckID)]; ok {
if chk.IsSame(c) {
changed = false
}
Expand Down Expand Up @@ -512,8 +513,10 @@ func (s *Server) handleUpdateService(

// Delete any nodes that do not have any other services registered on them.
for node := range unusedNodes {
nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault())
_, ns, err := s.GetStore().NodeServices(nil, node, nodeMeta, peerName)
// The wildcard is used here so that all services, regardless of namespace are returned
// by the following query. Without this, the node might accidentally be cleaned up early.
wildcardNSMeta := acl.NewEnterpriseMetaWithPartition(sn.PartitionOrDefault(), acl.WildcardName)
_, ns, err := s.GetStore().NodeServiceList(nil, node, &wildcardNSMeta, peerName)
if err != nil {
return fmt.Errorf("failed to query services on node: %w", err)
}
Expand All @@ -526,10 +529,10 @@ func (s *Server) handleUpdateService(
err = s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: node,
PeerName: peerName,
EnterpriseMeta: *nodeMeta,
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()),
})
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node)
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", sn.PartitionOrDefault(), peerName, node)
return fmt.Errorf("failed to deregister node %q: %w", ident, err)
}
}
Expand Down Expand Up @@ -632,31 +635,35 @@ type nodeCheckIdentity struct {
checkID string
}

func makeNodeSvcInstID(nodeID types.NodeID, serviceID string) nodeSvcInstIdentity {
func makeNodeSvcInstID(node string, serviceID string) nodeSvcInstIdentity {
return nodeSvcInstIdentity{
nodeID: string(nodeID),
nodeID: node,
serviceID: serviceID,
}
}

func makeNodeCheckID(nodeID types.NodeID, serviceID string, checkID types.CheckID) nodeCheckIdentity {
func makeNodeCheckID(node string, serviceID string, checkID types.CheckID) nodeCheckIdentity {
return nodeCheckIdentity{
serviceID: serviceID,
checkID: string(checkID),
nodeID: string(nodeID),
nodeID: node,
}
}

func buildStoredMap(storedInstances structs.CheckServiceNodes) (map[types.NodeID]*structs.Node, map[nodeSvcInstIdentity]*structs.NodeService, map[nodeCheckIdentity]*structs.HealthCheck) {
nodesMap := map[types.NodeID]*structs.Node{}
func buildStoredMap(storedInstances structs.CheckServiceNodes) (
map[string]*structs.Node,
map[nodeSvcInstIdentity]*structs.NodeService,
map[nodeCheckIdentity]*structs.HealthCheck,
) {
nodesMap := map[string]*structs.Node{}
svcInstMap := map[nodeSvcInstIdentity]*structs.NodeService{}
checksMap := map[nodeCheckIdentity]*structs.HealthCheck{}

for _, csn := range storedInstances {
nodesMap[csn.Node.ID] = csn.Node
svcInstMap[makeNodeSvcInstID(csn.Node.ID, csn.Service.ID)] = csn.Service
nodesMap[csn.Node.Node] = csn.Node
svcInstMap[makeNodeSvcInstID(csn.Node.Node, csn.Service.ID)] = csn.Service
for _, chk := range csn.Checks {
checksMap[makeNodeCheckID(csn.Node.ID, csn.Service.ID, chk.CheckID)] = chk
checksMap[makeNodeCheckID(csn.Node.Node, csn.Service.ID, chk.CheckID)] = chk
}
}
return nodesMap, svcInstMap, checksMap
Expand Down
2 changes: 1 addition & 1 deletion agent/grpc-external/services/peerstream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type StateStore interface {
ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error)
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error)
NodeServiceList(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServiceList, error)
CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error)
Expand Down
Loading

0 comments on commit 3a1ea22

Please sign in to comment.