Skip to content

Commit

Permalink
fix discovery overwriting dynamic resources (#44786)
Browse files Browse the repository at this point in the history
When the resource already exists, check its origin as well as
discovery group.
If it's not of discovery origin, then don't update it.
If it's not in the same discovery group, and its discovery group is not
blank, then don't update it.
  • Loading branch information
GavinFrazar authored Jul 30, 2024
1 parent 1a687ce commit de6f7bd
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 42 deletions.
15 changes: 8 additions & 7 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,17 @@ func (s *Server) getCurrentDatabases() map[string]types.Database {
func (s *Server) onDatabaseCreate(ctx context.Context, database types.Database) error {
s.Log.Debugf("Creating database %s.", database.GetName())
err := s.AccessPoint.CreateDatabase(ctx, database)
// If the database already exists but has an empty discovery group, update it.
if trace.IsAlreadyExists(err) && s.updatesEmptyDiscoveryGroup(
func() (types.ResourceWithLabels, error) {
// If the database already exists but has cloud origin and an empty
// discovery group, then update it.
if err != nil {
err := s.resolveCreateErr(err, types.OriginCloud, func() (types.ResourceWithLabels, error) {
return s.AccessPoint.GetDatabase(ctx, database.GetName())
}) {
})
if err != nil {
return trace.Wrap(err)
}
return trace.Wrap(s.onDatabaseUpdate(ctx, database, nil))
}
if err != nil {
return trace.Wrap(err)
}
err = s.emitUsageEvents(map[string]*usageeventsv1.ResourceCreateEvent{
databaseEventPrefix + database.GetName(): {
ResourceType: types.DiscoveredResourceDatabase,
Expand Down
38 changes: 33 additions & 5 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -1683,14 +1683,42 @@ func splitMatchers[T types.Matcher](matchers []T, matcherTypeCheck func(string)
return
}

func (s *Server) updatesEmptyDiscoveryGroup(getter func() (types.ResourceWithLabels, error)) bool {
if s.DiscoveryGroup == "" {
return false
func (s *Server) resolveCreateErr(createErr error, discoveryOrigin string, getter func() (types.ResourceWithLabels, error)) error {
// We can only resolve the error if we have a discovery group configured
// and the error is that the resource already exists.
if s.DiscoveryGroup == "" || !trace.IsAlreadyExists(createErr) {
return trace.Wrap(createErr)
}

old, err := getter()
if err != nil {
return false
return trace.NewAggregate(createErr, err)
}

// Check that the registered resource origin matches the origin we want.
oldOrigin, err := types.GetOrigin(old)
if err != nil {
return trace.NewAggregate(createErr, err)
}
if oldOrigin != discoveryOrigin {
return trace.Wrap(createErr,
"not updating because the resource origin indicates that it is not managed by auto-discovery",
)
}

// Check that the registered resource's discovery group is blank or matches
// this server's discovery group.
// We check if the old group is empty because that's a special case where
// the old/new groups don't match but we still want to update the resource.
// In this way, discovery agents with a discovery_group essentially claim
// the resources they discover that used to be (or currently are) discovered
// by an agent that did not have a discovery_group configured.
oldDiscoveryGroup, _ := old.GetLabel(types.TeleportInternalDiscoveryGroupName)
return oldDiscoveryGroup == ""
if oldDiscoveryGroup != "" && oldDiscoveryGroup != s.DiscoveryGroup {
return trace.Wrap(createErr,
"not updating because the resource is in a different discovery group",
)
}

return nil
}
125 changes: 105 additions & 20 deletions lib/srv/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2786,12 +2786,7 @@ func TestGCPVMDiscovery(t *testing.T) {
// TestServer_onCreate tests the update of the discovery_group of a resource
// when a resource already exists with the same name but an empty discovery_group.
func TestServer_onCreate(t *testing.T) {
_, awsRedshiftDB := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "test")
_, awsRedshiftDBEmptyDiscoveryGroup := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "" /* empty discovery group */)
accessPoint := &fakeAccessPoint{
kube: mustConvertEKSToKubeCluster(t, eksMockClusters[0], "" /* empty discovery group */),
database: awsRedshiftDBEmptyDiscoveryGroup,
}
accessPoint := &fakeAccessPoint{}
s := &Server{
Config: &Config{
DiscoveryGroup: "test-cluster",
Expand All @@ -2801,31 +2796,106 @@ func TestServer_onCreate(t *testing.T) {
}

t.Run("onCreate update kube", func(t *testing.T) {
// With cloud origin and an empty discovery group, it should update.
accessPoint.kube = mustConvertEKSToKubeCluster(t, eksMockClusters[0], "" /* empty discovery group */)
err := s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster"))
require.NoError(t, err)
require.True(t, accessPoint.updateKube)
require.True(t, accessPoint.updatedKube)

// Reset the updated flag and set the registered kube cluster to have
// non-cloud origin. It should not update.
accessPoint.updatedKube = false
accessPoint.kube.SetOrigin(types.OriginDynamic)
err = s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster"))
require.Error(t, err)
require.False(t, accessPoint.updatedKube)

// Reset the update flag.
accessPoint.updateKube = false
// Reset the updated flag and set the registered kube cluster to have
// an empty origin. It should not update.
accessPoint.updatedKube = false
accessPoint.kube.SetOrigin("")
err = s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster"))
require.Error(t, err)
require.False(t, accessPoint.updatedKube)

// Reset the update flag and set the registered kube cluster to have
// a non-empty discovery group. It should not update.
accessPoint.updatedKube = false
accessPoint.kube = mustConvertEKSToKubeCluster(t, eksMockClusters[0], "nonEmpty")
// Update the kube cluster with non-empty discovery group.
err = s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster"))
require.Error(t, err)
require.False(t, accessPoint.updateKube)
require.False(t, accessPoint.updatedKube)
})

t.Run("onCreate update database", func(t *testing.T) {
_, awsRedshiftDB := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "test")
_, awsRedshiftDBEmptyDiscoveryGroup := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "" /* empty discovery group */)

// With cloud origin and an empty discovery group, it should update.
accessPoint.database = awsRedshiftDBEmptyDiscoveryGroup
err := s.onDatabaseCreate(context.Background(), awsRedshiftDB)
require.NoError(t, err)
require.True(t, accessPoint.updateDatabase)
require.True(t, accessPoint.updatedDatabase)

// Reset the update flag.
accessPoint.updateDatabase = false
// Reset the updated flag and set the db to empty discovery group
// but non-cloud origin. It should not update.
accessPoint.updatedDatabase = false
accessPoint.database.SetOrigin(types.OriginDynamic)
err = s.onDatabaseCreate(context.Background(), awsRedshiftDB)
require.Error(t, err)
require.False(t, accessPoint.updatedDatabase)

// Reset the updated flag and set the db to empty discovery group
// but empty origin. It should not update.
accessPoint.updatedDatabase = false
accessPoint.database.SetOrigin("")
err = s.onDatabaseCreate(context.Background(), awsRedshiftDB)
require.Error(t, err)
require.False(t, accessPoint.updatedDatabase)

// Reset the updated flag and set the registered db to have a non-empty
// discovery group. It should not update.
accessPoint.updatedDatabase = false
accessPoint.database = awsRedshiftDB
// Update the db with non-empty discovery group.
err = s.onDatabaseCreate(context.Background(), awsRedshiftDB)
require.Error(t, err)
require.False(t, accessPoint.updateDatabase)
require.False(t, accessPoint.updatedDatabase)
})

t.Run("onCreate update app", func(t *testing.T) {
kubeSvc := newMockKubeService("service1", "ns1", "",
map[string]string{"test-label": "testval"}, nil,
[]corev1.ServicePort{{Port: 42, Name: "http", Protocol: corev1.ProtocolTCP}})

// With kube origin and empty discovery group, it should update.
accessPoint.app = mustConvertKubeServiceToApp(t, "" /*empty discovery group*/, "http", kubeSvc, kubeSvc.Spec.Ports[0])
err := s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0]))
require.NoError(t, err)
require.True(t, accessPoint.updatedApp)

// Reset the updated flag and set the app to empty discovery group
// but non-cloud origin. It should not update.
accessPoint.updatedApp = false
accessPoint.app.SetOrigin(types.OriginDynamic)
err = s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0]))
require.Error(t, err)
require.False(t, accessPoint.updatedApp)

// Reset the updated flag and set the app to empty discovery group
// but non-cloud origin. It should not update.
accessPoint.updatedApp = false
accessPoint.app.SetOrigin("")
err = s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0]))
require.Error(t, err)
require.False(t, accessPoint.updatedApp)

// Reset the updated flag and set the app to non-empty discovery group.
// It should not update.
accessPoint.updatedApp = false
accessPoint.app = mustConvertKubeServiceToApp(t, "nonEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0])
err = s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0]))
require.Error(t, err)
require.False(t, accessPoint.updatedApp)
})
}

Expand Down Expand Up @@ -2924,10 +2994,12 @@ type fakeAccessPoint struct {
ping func(context.Context) (proto.PingResponse, error)
enrollEKSClusters func(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error)

updateKube bool
updateDatabase bool
updatedKube bool
updatedDatabase bool
updatedApp bool
kube types.KubeCluster
database types.Database
app types.Application
upsertedServerInfos chan types.ServerInfo
reports map[string][]discoveryconfig.Status
}
Expand Down Expand Up @@ -2974,7 +3046,7 @@ func (f *fakeAccessPoint) CreateDatabase(ctx context.Context, database types.Dat
}

func (f *fakeAccessPoint) UpdateDatabase(ctx context.Context, database types.Database) error {
f.updateDatabase = true
f.updatedDatabase = true
return nil
}

Expand All @@ -2984,7 +3056,20 @@ func (f *fakeAccessPoint) CreateKubernetesCluster(ctx context.Context, cluster t

// UpdateKubernetesCluster updates existing kubernetes cluster resource.
func (f *fakeAccessPoint) UpdateKubernetesCluster(ctx context.Context, cluster types.KubeCluster) error {
f.updateKube = true
f.updatedKube = true
return nil
}

func (f *fakeAccessPoint) GetApp(ctx context.Context, name string) (types.Application, error) {
return f.app, nil
}

func (f *fakeAccessPoint) CreateApp(ctx context.Context, _ types.Application) error {
return trace.AlreadyExists("already exists")
}

func (f *fakeAccessPoint) UpdateApp(ctx context.Context, _ types.Application) error {
f.updatedApp = true
return nil
}

Expand Down
11 changes: 7 additions & 4 deletions lib/srv/discovery/kube_services_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,14 @@ func (s *Server) onAppCreate(ctx context.Context, app types.Application) error {
// In this case, we need to update the resource with the
// discovery group label to ensure the user doesn't have to manually delete
// the resource.
if trace.IsAlreadyExists(err) {
return trace.Wrap(s.onAppUpdate(ctx, app, nil))
}
if err != nil {
return trace.Wrap(err)
err := s.resolveCreateErr(err, types.OriginDiscoveryKubernetes, func() (types.ResourceWithLabels, error) {
return s.AccessPoint.GetApp(ctx, app.GetName())
})
if err != nil {
return trace.Wrap(err)
}
return trace.Wrap(s.onAppUpdate(ctx, app, nil))
}
err = s.emitUsageEvents(map[string]*usageeventsv1.ResourceCreateEvent{
appEventPrefix + app.GetName(): {
Expand Down
12 changes: 6 additions & 6 deletions lib/srv/discovery/kube_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ func (s *Server) onKubeCreate(ctx context.Context, kubeCluster types.KubeCluster
s.Log.Debugf("Creating kube_cluster %s.", kubeCluster.GetName())
err := s.AccessPoint.CreateKubernetesCluster(ctx, kubeCluster)
// If the kube already exists but has an empty discovery group, update it.
if trace.IsAlreadyExists(err) && s.updatesEmptyDiscoveryGroup(
func() (types.ResourceWithLabels, error) {
if err != nil {
err := s.resolveCreateErr(err, types.OriginCloud, func() (types.ResourceWithLabels, error) {
return s.AccessPoint.GetKubernetesCluster(ctx, kubeCluster.GetName())
}) {
})
if err != nil {
return trace.Wrap(err)
}
return trace.Wrap(s.onKubeUpdate(ctx, kubeCluster, nil))
}
if err != nil {
return trace.Wrap(err)
}
err = s.emitUsageEvents(map[string]*usageeventsv1.ResourceCreateEvent{
kubeEventPrefix + kubeCluster.GetName(): {
ResourceType: types.DiscoveredResourceKubernetes,
Expand Down

0 comments on commit de6f7bd

Please sign in to comment.