diff --git a/.changelog/17235.txt b/.changelog/17235.txt new file mode 100644 index 000000000000..3356b715ef31 --- /dev/null +++ b/.changelog/17235.txt @@ -0,0 +1,3 @@ +```release-note:bug +peering: Fix issue where peer streams could incorrectly deregister services in various scenarios. +``` diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index 20ceac148929..45f66add78b7 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -10,6 +10,7 @@ import ( newproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" @@ -338,7 +339,7 @@ func (s *Server) handleUpdateService( for _, nodeSnap := range snap.Nodes { // First register the node - skip the unchanged ones changed := true - if storedNode, ok := storedNodesMap[nodeSnap.Node.ID]; ok { + if storedNode, ok := storedNodesMap[nodeSnap.Node.Node]; ok { if storedNode.IsSame(nodeSnap.Node) { changed = false } @@ -354,7 +355,7 @@ func (s *Server) handleUpdateService( // Then register all services on that node - skip the unchanged ones for _, svcSnap := range nodeSnap.Services { changed = true - if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.ID, svcSnap.Service.ID)]; ok { + if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.Node, svcSnap.Service.ID)]; ok { if storedSvcInst.IsSame(svcSnap.Service) { changed = false } @@ -374,7 +375,7 @@ func (s *Server) handleUpdateService( for _, svcSnap := range nodeSnap.Services { for _, c := range svcSnap.Checks { changed := true - if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.ID, svcSnap.Service.ID, c.CheckID)]; ok { + if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.Node, svcSnap.Service.ID, c.CheckID)]; ok { if chk.IsSame(c) { changed = false } @@ -512,8 +513,10 @@ func (s *Server) handleUpdateService( // Delete any nodes that do not have any other services registered on them. for node := range unusedNodes { - nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()) - _, ns, err := s.GetStore().NodeServices(nil, node, nodeMeta, peerName) + // The wildcard is used here so that all services, regardless of namespace are returned + // by the following query. Without this, the node might accidentally be cleaned up early. + wildcardNSMeta := acl.NewEnterpriseMetaWithPartition(sn.PartitionOrDefault(), acl.WildcardName) + _, ns, err := s.GetStore().NodeServiceList(nil, node, &wildcardNSMeta, peerName) if err != nil { return fmt.Errorf("failed to query services on node: %w", err) } @@ -526,10 +529,10 @@ func (s *Server) handleUpdateService( err = s.Backend.CatalogDeregister(&structs.DeregisterRequest{ Node: node, PeerName: peerName, - EnterpriseMeta: *nodeMeta, + EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()), }) if err != nil { - ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node) + ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", sn.PartitionOrDefault(), peerName, node) return fmt.Errorf("failed to deregister node %q: %w", ident, err) } } @@ -632,31 +635,35 @@ type nodeCheckIdentity struct { checkID string } -func makeNodeSvcInstID(nodeID types.NodeID, serviceID string) nodeSvcInstIdentity { +func makeNodeSvcInstID(node string, serviceID string) nodeSvcInstIdentity { return nodeSvcInstIdentity{ - nodeID: string(nodeID), + nodeID: node, serviceID: serviceID, } } -func makeNodeCheckID(nodeID types.NodeID, serviceID string, checkID types.CheckID) nodeCheckIdentity { +func makeNodeCheckID(node string, serviceID string, checkID types.CheckID) nodeCheckIdentity { return nodeCheckIdentity{ serviceID: serviceID, checkID: string(checkID), - nodeID: string(nodeID), + nodeID: node, } } -func buildStoredMap(storedInstances structs.CheckServiceNodes) (map[types.NodeID]*structs.Node, map[nodeSvcInstIdentity]*structs.NodeService, map[nodeCheckIdentity]*structs.HealthCheck) { - nodesMap := map[types.NodeID]*structs.Node{} +func buildStoredMap(storedInstances structs.CheckServiceNodes) ( + map[string]*structs.Node, + map[nodeSvcInstIdentity]*structs.NodeService, + map[nodeCheckIdentity]*structs.HealthCheck, +) { + nodesMap := map[string]*structs.Node{} svcInstMap := map[nodeSvcInstIdentity]*structs.NodeService{} checksMap := map[nodeCheckIdentity]*structs.HealthCheck{} for _, csn := range storedInstances { - nodesMap[csn.Node.ID] = csn.Node - svcInstMap[makeNodeSvcInstID(csn.Node.ID, csn.Service.ID)] = csn.Service + nodesMap[csn.Node.Node] = csn.Node + svcInstMap[makeNodeSvcInstID(csn.Node.Node, csn.Service.ID)] = csn.Service for _, chk := range csn.Checks { - checksMap[makeNodeCheckID(csn.Node.ID, csn.Service.ID, chk.CheckID)] = chk + checksMap[makeNodeCheckID(csn.Node.Node, csn.Service.ID, chk.CheckID)] = chk } } return nodesMap, svcInstMap, checksMap diff --git a/agent/grpc-external/services/peerstream/server.go b/agent/grpc-external/services/peerstream/server.go index 8a5e33e93c5e..a67da563a039 100644 --- a/agent/grpc-external/services/peerstream/server.go +++ b/agent/grpc-external/services/peerstream/server.go @@ -119,7 +119,7 @@ type StateStore interface { ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) - NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error) + NodeServiceList(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServiceList, error) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error) diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index ed5809e4554e..cc1ff1b41de4 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -1591,7 +1591,11 @@ func Test_ExportedServicesCount(t *testing.T) { mst, err := srv.Tracker.Connected(peerID) require.NoError(t, err) - services := []string{"web", "api", "mongo"} + services := []string{ + structs.NewServiceName("web", nil).String(), + structs.NewServiceName("api", nil).String(), + structs.NewServiceName("mongo", nil).String(), + } update := cache.UpdateEvent{ CorrelationID: subExportedServiceList, Result: &pbpeerstream.ExportedServiceList{ @@ -1935,36 +1939,30 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test } } -func Test_processResponse_ExportedServiceUpdates(t *testing.T) { - srv, store := newTestServer(t, func(c *Config) { - backend := c.Backend.(*testStreamBackend) - backend.leader = func() bool { - return false - } - }) - - type testCase struct { - name string - seed []*structs.RegisterRequest - input *pbpeerstream.ExportedService - expect map[string]structs.CheckServiceNodes - exportedServices []string - } - - peerName := "billing" - peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5" - remoteMeta := pbcommon.NewEnterpriseMetaFromStructs(*structs.DefaultEnterpriseMetaInPartition("billing-ap")) - - // "api" service is imported from the billing-ap partition, corresponding to the billing peer. - // Locally it is stored to the default partition. - defaultMeta := *acl.DefaultEnterpriseMeta() - apiSN := structs.NewServiceName("api", &defaultMeta) +type PeeringProcessResponse_testCase struct { + name string + seed []*structs.RegisterRequest + inputServiceName structs.ServiceName + input *pbpeerstream.ExportedService + expect map[structs.ServiceName]structs.CheckServiceNodes + exportedServices []string +} +func processResponse_ExportedServiceUpdates( + t *testing.T, + srv *testServer, + store *state.Store, + localEntMeta acl.EnterpriseMeta, + peerName string, + tests []PeeringProcessResponse_testCase, +) { // create a peering in the state store + peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5" require.NoError(t, store.PeeringWrite(31, &pbpeering.PeeringWriteRequest{ Peering: &pbpeering.Peering{ - ID: peerID, - Name: peerName, + ID: peerID, + Name: peerName, + Partition: localEntMeta.PartitionOrDefault(), }, })) @@ -1972,7 +1970,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { mst, err := srv.Tracker.Connected(peerID) require.NoError(t, err) - run := func(t *testing.T, tc testCase) { + run := func(t *testing.T, tc PeeringProcessResponse_testCase) { // Seed the local catalog with some data to reconcile against. // and increment the tracker's imported services count var serviceNames []structs.ServiceName @@ -1986,14 +1984,14 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { in := &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedService, - ResourceID: apiSN.String(), + ResourceID: tc.inputServiceName.String(), Nonce: "1", Operation: pbpeerstream.Operation_OPERATION_UPSERT, Resource: makeAnyPB(t, tc.input), } // Simulate an update arriving for billing/api. - _, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, in) + _, err = srv.processResponse(peerName, localEntMeta.PartitionOrDefault(), mst, in) require.NoError(t, err) if len(tc.exportedServices) > 0 { @@ -2006,63 +2004,81 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { } // Simulate an update arriving for billing/api. - _, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, resp) + _, err = srv.processResponse(peerName, localEntMeta.PartitionOrDefault(), mst, resp) require.NoError(t, err) // Test the count and contents separately to ensure the count code path is hit. require.Equal(t, mst.GetImportedServicesCount(), len(tc.exportedServices)) require.ElementsMatch(t, mst.ImportedServices, tc.exportedServices) } - _, allServices, err := srv.GetStore().ServiceList(nil, &defaultMeta, peerName) + wildcardNS := acl.NewEnterpriseMetaWithPartition(localEntMeta.PartitionOrDefault(), acl.WildcardName) + _, allServices, err := srv.GetStore().ServiceList(nil, &wildcardNS, peerName) require.NoError(t, err) // This ensures that only services specified under tc.expect are stored. It includes // all exported services plus their sidecar proxies. for _, svc := range allServices { - _, ok := tc.expect[svc.Name] + _, ok := tc.expect[svc] require.True(t, ok) } for svc, expect := range tc.expect { - t.Run(svc, func(t *testing.T) { - _, got, err := srv.GetStore().CheckServiceNodes(nil, svc, &defaultMeta, peerName) + t.Run(svc.String(), func(t *testing.T) { + _, got, err := srv.GetStore().CheckServiceNodes(nil, svc.Name, &svc.EnterpriseMeta, peerName) require.NoError(t, err) requireEqualInstances(t, expect, got) }) } } - tt := []testCase{ + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func Test_processResponse_ExportedServiceUpdates(t *testing.T) { + peerName := "billing" + localEntMeta := *acl.DefaultEnterpriseMeta() + + remoteMeta := *structs.DefaultEnterpriseMetaInPartition("billing-ap") + pbRemoteMeta := pbcommon.NewEnterpriseMetaFromStructs(remoteMeta) + + apiLocalSN := structs.NewServiceName("api", &localEntMeta) + redisLocalSN := structs.NewServiceName("redis", &localEntMeta) + tests := []PeeringProcessResponse_testCase{ { name: "upsert two service instances to the same node", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, @@ -2071,42 +2087,42 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, }, }, }, - expect: map[string]structs.CheckServiceNodes{ - "api": { + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", // The remote billing-ap partition is overwritten for all resources with the local default. - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), // The name of the peer "billing" is attached as well. PeerName: peerName, @@ -2114,21 +2130,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2137,27 +2153,27 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2167,7 +2183,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "deleting a service with an empty exported service event", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2176,7 +2192,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2194,41 +2210,43 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, - input: &pbpeerstream.ExportedService{}, - expect: map[string]structs.CheckServiceNodes{ - "api": {}, + inputServiceName: structs.NewServiceName("api", &remoteMeta), + input: &pbpeerstream.ExportedService{}, + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): {}, }, }, { name: "upsert two service instances to different nodes", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, @@ -2237,60 +2255,60 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: &pbservice.Node{ ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2", Node: "node-bar", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-bar-check", Node: "node-bar", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-bar", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, }, }, }, - expect: map[string]structs.CheckServiceNodes{ - "api": { + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2", Node: "node-bar", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-bar-check", Node: "node-bar", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-bar", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2301,7 +2319,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: "node-foo", // The remote billing-ap partition is overwritten for all resources with the local default. - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), // The name of the peer "billing" is attached as well. PeerName: peerName, @@ -2309,21 +2327,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2333,7 +2351,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "deleting one service name from a node does not delete other service names", - exportedServices: []string{"api", "redis"}, + exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2342,7 +2360,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2366,7 +2384,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2384,37 +2402,38 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), // Nil input is for the "api" service. input: &pbpeerstream.ExportedService{}, - expect: map[string]structs.CheckServiceNodes{ - "api": {}, + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): {}, // Existing redis service was not affected by deletion. - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2432,7 +2451,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2456,7 +2475,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2-sidecar-proxy", Service: "redis-sidecar-proxy", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2480,7 +2499,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2504,7 +2523,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1-sidecar-proxy", Service: "api-sidecar-proxy", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2523,68 +2542,69 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), // Nil input is for the "api" service. input: &pbpeerstream.ExportedService{}, - exportedServices: []string{"redis"}, - expect: map[string]structs.CheckServiceNodes{ + exportedServices: []string{redisLocalSN.String()}, + expect: map[structs.ServiceName]structs.CheckServiceNodes{ // Existing redis service was not affected by deletion. - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, }, }, - "redis-sidecar-proxy": { + structs.NewServiceName("redis-sidecar-proxy", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2-sidecar-proxy", Service: "redis-sidecar-proxy", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "redis-2-sidecar-proxy-check", ServiceID: "redis-2-sidecar-proxy", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2594,7 +2614,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "service checks are cleaned up when not present in a response", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2603,7 +2623,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2621,19 +2641,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ @@ -2642,20 +2663,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, - expect: map[string]structs.CheckServiceNodes{ + expect: map[structs.ServiceName]structs.CheckServiceNodes{ // Service check should be gone - "api": { + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{}, @@ -2665,7 +2686,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "node checks are cleaned up when not present in a response", - exportedServices: []string{"api", "redis"}, + exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2674,7 +2695,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2698,7 +2719,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2716,19 +2737,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ @@ -2737,27 +2759,27 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, }, }, }, - expect: map[string]structs.CheckServiceNodes{ + expect: map[structs.ServiceName]structs.CheckServiceNodes{ // Node check should be gone - "api": { + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2765,24 +2787,24 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, }, }, - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2790,7 +2812,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2800,7 +2822,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "replacing a service instance on a node cleans up the old instance", - exportedServices: []string{"api", "redis"}, + exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2809,7 +2831,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2833,7 +2855,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2851,20 +2873,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, // New service ID and checks for the api service. Service: &pbservice.NodeService{ ID: "new-api-v2", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ @@ -2883,19 +2906,19 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, - expect: map[string]structs.CheckServiceNodes{ - "api": { + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "new-api-v2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2908,24 +2931,24 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "new-api-v2-check", ServiceID: "new-api-v2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, }, }, - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2938,7 +2961,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2947,12 +2970,13 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - run(t, tc) - }) - } + srv, store := newTestServer(t, func(c *Config) { + backend := c.Backend.(*testStreamBackend) + backend.leader = func() bool { + return false + } + }) + processResponse_ExportedServiceUpdates(t, srv, store, localEntMeta, peerName, tests) } // TestLogTraceProto tests that all PB trace log helpers redact the diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index f36ca055d102..61f8831ec78b 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -369,9 +369,8 @@ func (s *MutableStatus) SetImportedServices(serviceNames []structs.ServiceName) defer s.mu.Unlock() s.ImportedServices = make([]string, len(serviceNames)) - for i, sn := range serviceNames { - s.ImportedServices[i] = sn.Name + s.ImportedServices[i] = sn.String() } } @@ -389,7 +388,7 @@ func (s *MutableStatus) SetExportedServices(serviceNames []structs.ServiceName) s.ExportedServices = make([]string, len(serviceNames)) for i, sn := range serviceNames { - s.ExportedServices[i] = sn.Name + s.ExportedServices[i] = sn.String() } }