diff --git a/CHANGELOG.yml b/CHANGELOG.yml index af23adc0ab..3d0d4ba580 100644 --- a/CHANGELOG.yml +++ b/CHANGELOG.yml @@ -78,6 +78,12 @@ items: body: >- The OSS code-base will no longer report usage data to the proprietary collector at Ambassador Labs. The actual calls to the collector remain, but will be no-ops unless a proper collector client is installed using an extension point. + - type: feature + title: Add deployments, statefulSets, replicaSets to workloads Helm chart value + body: >- + The Helm chart value workloads now supports the kinds deployments.enabled, statefulSets.enabled, and replicaSets.enabled. + By default, all three are enabled, but can be disabled by setting the corresponding value to false. + When disabled, the traffic-manager will ignore workloads of a corresponding kind, and Telepresence will not be able to intercept them. - version: 2.20.2 date: 2024-10-21 notes: diff --git a/charts/telepresence/README.md b/charts/telepresence/README.md index 94f1a9d616..14a55b7677 100644 --- a/charts/telepresence/README.md +++ b/charts/telepresence/README.md @@ -102,6 +102,9 @@ The following tables lists the configurable parameters of the Telepresence chart | client.routing.allowConflictingSubnets | Allow the specified subnets to be routed even if they conflict with other routes on the local machine. | `[]` | | client.dns.excludeSuffixes | Suffixes for which the client DNS resolver will always fail (or fallback in case of the overriding resolver) | `[".com", ".io", ".net", ".org", ".ru"]` | | client.dns.includeSuffixes | Suffixes for which the client DNS resolver will always attempt to do a lookup. Includes have higher priority than excludes. | `[]` | +| workloads.deployments.enabled | Enable/Disable the support for Deployments. | `true` | +| workloads.replicaSets.enabled | Enable/Disable the support for ReplicaSets. | `true` | +| workloads.statefulSets.enabled | Enable/Disable the support for StatefulSets. | `true` | | workloads.argoRollouts.enabled | Enable/Disable the argo-rollouts integration. | `false` | ### RBAC diff --git a/charts/telepresence/templates/_helpers.tpl b/charts/telepresence/templates/_helpers.tpl index da3ce3b5bf..97e10c6282 100644 --- a/charts/telepresence/templates/_helpers.tpl +++ b/charts/telepresence/templates/_helpers.tpl @@ -88,7 +88,7 @@ RBAC rules required to create an intercept in a namespace; excludes any rules th - apiGroups: ["apps"] resources: ["deployments", "replicasets", "statefulsets"] verbs: ["get", "watch", "list"] -{{- if .Values.workloads.argoRollouts.enabled }} +{{- if and .Values.workloads .Values.workloads.argoRollouts .Values.workloads.argoRollouts.enabled }} - apiGroups: ["argoproj.io"] resources: ["rollouts"] verbs: ["get", "watch", "list"] diff --git a/charts/telepresence/templates/deployment.yaml b/charts/telepresence/templates/deployment.yaml index 676a587efe..d340dd7919 100644 --- a/charts/telepresence/templates/deployment.yaml +++ b/charts/telepresence/templates/deployment.yaml @@ -81,9 +81,26 @@ spec: value: {{ .grpc.maxReceiveSize }} {{- end }} {{- end }} - {{- if .workloads.argoRollouts }} - - name: ARGO_ROLLOUTS_ENABLED - value: {{ .workloads.argoRollouts.enabled | quote }} + {{- if .workloads }} + {{- with .workloads }} + - name: ENABLED_WORKLOAD_KINDS + value: >- + {{- if or (not .deployments) .deployments.enabled }} + Deployment + {{- end }} + {{- if or (not .statefulSets) .statefulSets.enabled }} + StatefulSet + {{- end }} + {{- if or (not .replicaSets) .replicaSets.enabled }} + ReplicaSet + {{- end }} + {{- if and .argoRollouts .argoRollouts.enabled }} + Rollout + {{- end }} + {{- end }} + {{- else }} + - name: ENABLED_WORKLOAD_KINDS + value: Deployment StatefulSet ReplicaSet {{- end }} {{- if .agentInjector.enabled }} {{- /* diff --git a/charts/telepresence/values.yaml b/charts/telepresence/values.yaml index dd85245a98..23c6e22ead 100644 --- a/charts/telepresence/values.yaml +++ b/charts/telepresence/values.yaml @@ -347,6 +347,12 @@ client: # Controls which workload kinds are recognized by Telepresence workloads: + deployments: + enabled: true + replicaSets: + enabled: true + statefulSets: + enabled: true argoRollouts: enabled: false diff --git a/cmd/traffic/cmd/manager/cluster/info.go b/cmd/traffic/cmd/manager/cluster/info.go index c76b22d7de..487bfab04a 100644 --- a/cmd/traffic/cmd/manager/cluster/info.go +++ b/cmd/traffic/cmd/manager/cluster/info.go @@ -129,6 +129,8 @@ func NewInfo(ctx context.Context) Info { } } + dlog.Infof(ctx, "Enabled support for the following workload kinds: %v", env.EnabledWorkloadKinds) + // make an attempt to create a service with ClusterIP that is out of range and then // check the error message for the correct range as suggested tin the second answer here: // https://stackoverflow.com/questions/44190607/how-do-you-find-the-cluster-service-cidr-of-a-kubernetes-cluster diff --git a/cmd/traffic/cmd/manager/managerutil/argorollouts.go b/cmd/traffic/cmd/manager/managerutil/argorollouts.go deleted file mode 100644 index da55341274..0000000000 --- a/cmd/traffic/cmd/manager/managerutil/argorollouts.go +++ /dev/null @@ -1,9 +0,0 @@ -package managerutil - -import ( - "context" -) - -func ArgoRolloutsEnabled(ctx context.Context) bool { - return GetEnv(ctx).ArgoRolloutsEnabled -} diff --git a/cmd/traffic/cmd/manager/managerutil/envconfig.go b/cmd/traffic/cmd/manager/managerutil/envconfig.go index 77cc0906e3..204047e03e 100644 --- a/cmd/traffic/cmd/manager/managerutil/envconfig.go +++ b/cmd/traffic/cmd/manager/managerutil/envconfig.go @@ -2,6 +2,7 @@ package managerutil import ( "context" + "fmt" "net/netip" "reflect" "strconv" @@ -18,6 +19,7 @@ import ( "github.com/datawire/k8sapi/pkg/k8sapi" "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" "github.com/telepresenceio/telepresence/v2/pkg/agentmap" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) // Env is the traffic-manager's environment. It does not define any defaults because all @@ -70,7 +72,7 @@ type Env struct { ClientDnsIncludeSuffixes []string `env:"CLIENT_DNS_INCLUDE_SUFFIXES, parser=split-trim, default="` ClientConnectionTTL time.Duration `env:"CLIENT_CONNECTION_TTL, parser=time.ParseDuration"` - ArgoRolloutsEnabled bool `env:"ARGO_ROLLOUTS_ENABLED, parser=bool, default=false"` + EnabledWorkloadKinds []workload.WorkloadKind `env:"ENABLED_WORKLOAD_KINDS, parser=split-trim, default=Deployment StatefulSet ReplicaSet"` // For testing only CompatibilityVersion *semver.Version `env:"COMPATIBILITY_VERSION, parser=version, default="` @@ -256,6 +258,25 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { }, Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(*semver.Version))) }, } + fhs[reflect.TypeOf([]workload.WorkloadKind{})] = envconfig.FieldTypeHandler{ + Parsers: map[string]func(string) (any, error){ + "split-trim": func(str string) (any, error) { //nolint:unparam // API requirement + if len(str) == 0 { + return nil, nil + } + ss := strings.Split(str, " ") + ks := make([]workload.WorkloadKind, len(ss)) + for i, s := range ss { + ks[i] = workload.WorkloadKind(s) + if !ks[i].IsValid() { + return nil, fmt.Errorf("invalid workload kind: %q", s) + } + } + return ks, nil + }, + }, + Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.([]workload.WorkloadKind))) }, + } return fhs } diff --git a/cmd/traffic/cmd/manager/managerutil/envconfig_test.go b/cmd/traffic/cmd/manager/managerutil/envconfig_test.go index 7dd401d4af..527d63b22f 100644 --- a/cmd/traffic/cmd/manager/managerutil/envconfig_test.go +++ b/cmd/traffic/cmd/manager/managerutil/envconfig_test.go @@ -13,6 +13,7 @@ import ( "github.com/datawire/k8sapi/pkg/k8sapi" "github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/managerutil" "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) func TestEnvconfig(t *testing.T) { @@ -53,6 +54,7 @@ func TestEnvconfig(t *testing.T) { PodCIDRStrategy: "auto", PodIP: netip.AddrFrom4([4]byte{203, 0, 113, 18}), ServerPort: 8081, + EnabledWorkloadKinds: []workload.WorkloadKind{workload.DeploymentWorkloadKind, workload.StatefulSetWorkloadKind, workload.ReplicaSetWorkloadKind}, } testcases := map[string]struct { @@ -65,12 +67,10 @@ func TestEnvconfig(t *testing.T) { }, "simple": { Input: map[string]string{ - "AGENT_REGISTRY": "ghcr.io/telepresenceio", - "ARGO_ROLLOUTS_ENABLED": "true", + "AGENT_REGISTRY": "ghcr.io/telepresenceio", }, Output: func(e *managerutil.Env) { e.AgentRegistry = "ghcr.io/telepresenceio" - e.ArgoRolloutsEnabled = true }, }, "complex": { diff --git a/cmd/traffic/cmd/manager/mutator/agent_injector.go b/cmd/traffic/cmd/manager/mutator/agent_injector.go index a47de6517d..20f128eaa0 100644 --- a/cmd/traffic/cmd/manager/mutator/agent_injector.go +++ b/cmd/traffic/cmd/manager/mutator/agent_injector.go @@ -28,6 +28,7 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/agentmap" "github.com/telepresenceio/telepresence/v2/pkg/maps" "github.com/telepresenceio/telepresence/v2/pkg/tracing" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) var podResource = meta.GroupVersionResource{Version: "v1", Group: "", Resource: "pods"} //nolint:gochecknoglobals // constant @@ -145,9 +146,19 @@ func (a *agentInjector) Inject(ctx context.Context, req *admission.AdmissionRequ return nil, nil } - supportedKinds := []string{"Deployment", "ReplicaSet", "StatefulSet"} - if managerutil.ArgoRolloutsEnabled(ctx) { - supportedKinds = append(supportedKinds, "Rollout") + enabledWorkloads := managerutil.GetEnv(ctx).EnabledWorkloadKinds + supportedKinds := make([]string, len(enabledWorkloads)) + for i, wlKind := range enabledWorkloads { + switch wlKind { + case workload.DeploymentWorkloadKind: + supportedKinds[i] = "Deployment" + case workload.ReplicaSetWorkloadKind: + supportedKinds[i] = "ReplicaSet" + case workload.StatefulSetWorkloadKind: + supportedKinds[i] = "StatefulSet" + case workload.RolloutWorkloadKind: + supportedKinds[i] = "Rollout" + } } wl, err := agentmap.FindOwnerWorkload(ctx, k8sapi.Pod(pod), supportedKinds) if err != nil { diff --git a/cmd/traffic/cmd/manager/mutator/agent_injector_test.go b/cmd/traffic/cmd/manager/mutator/agent_injector_test.go index 755d450246..ca648fc8c1 100644 --- a/cmd/traffic/cmd/manager/mutator/agent_injector_test.go +++ b/cmd/traffic/cmd/manager/mutator/agent_injector_test.go @@ -31,6 +31,7 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" "github.com/telepresenceio/telepresence/v2/pkg/agentmap" "github.com/telepresenceio/telepresence/v2/pkg/informer" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) const serviceAccountMountPath = "/var/run/secrets/kubernetes.io/serviceaccount" @@ -770,6 +771,8 @@ func TestTrafficAgentConfigGenerator(t *testing.T) { AgentImageTag: "2.14.0", AgentPort: 9900, AgentAppProtocolStrategy: appProtoStrategy, + + EnabledWorkloadKinds: []workload.WorkloadKind{workload.DeploymentWorkloadKind, workload.StatefulSetWorkloadKind, workload.ReplicaSetWorkloadKind}, } ctx := dlog.NewTestContext(t, false) @@ -1830,6 +1833,8 @@ func TestTrafficAgentInjector(t *testing.T) { AgentImageTag: "2.13.3", AgentPort: 9900, AgentInjectPolicy: agentconfig.WhenEnabled, + + EnabledWorkloadKinds: []workload.WorkloadKind{workload.DeploymentWorkloadKind, workload.StatefulSetWorkloadKind, workload.ReplicaSetWorkloadKind}, } ctx = managerutil.WithEnv(ctx, env) agentmap.GeneratorConfigFunc = env.GeneratorConfig @@ -1907,9 +1912,18 @@ func toAdmissionRequest(resource meta.GroupVersionResource, object any) *admissi } func generateForPod(t *testing.T, ctx context.Context, pod *core.Pod, gc agentmap.GeneratorConfig) (agentconfig.SidecarExt, error) { - supportedKinds := []string{"Deployment", "ReplicaSet", "StatefulSet"} - if managerutil.ArgoRolloutsEnabled(ctx) { - supportedKinds = append(supportedKinds, "Rollout") + supportedKinds := make([]string, 0, 4) + for _, wlKind := range managerutil.GetEnv(ctx).EnabledWorkloadKinds { + switch wlKind { + case workload.DeploymentWorkloadKind: + supportedKinds = append(supportedKinds, "Deployment") + case workload.ReplicaSetWorkloadKind: + supportedKinds = append(supportedKinds, "ReplicaSet") + case workload.StatefulSetWorkloadKind: + supportedKinds = append(supportedKinds, "StatefulSet") + case workload.RolloutWorkloadKind: + supportedKinds = append(supportedKinds, "Rollout") + } } wl, err := agentmap.FindOwnerWorkload(ctx, k8sapi.Pod(pod), supportedKinds) if err != nil { diff --git a/cmd/traffic/cmd/manager/mutator/watcher.go b/cmd/traffic/cmd/manager/mutator/watcher.go index 77b94a1f03..70bda356b2 100644 --- a/cmd/traffic/cmd/manager/mutator/watcher.go +++ b/cmd/traffic/cmd/manager/mutator/watcher.go @@ -560,19 +560,25 @@ func (c *configWatcher) StartWatchers(ctx context.Context) error { return err } } - for _, si := range c.dps { - if err := c.watchWorkloads(ctx, si); err != nil { - return err + if c.dps != nil { + for _, si := range c.dps { + if err := c.watchWorkloads(ctx, si); err != nil { + return err + } } } - for _, si := range c.rss { - if err := c.watchWorkloads(ctx, si); err != nil { - return err + if c.rss != nil { + for _, si := range c.rss { + if err := c.watchWorkloads(ctx, si); err != nil { + return err + } } } - for _, si := range c.sss { - if err := c.watchWorkloads(ctx, si); err != nil { - return err + if c.sss != nil { + for _, si := range c.sss { + if err := c.watchWorkloads(ctx, si); err != nil { + return err + } } } if c.rls != nil { @@ -834,22 +840,36 @@ func (c *configWatcher) Start(ctx context.Context) { c.svs = make([]cache.SharedIndexInformer, len(nss)) c.cms = make([]cache.SharedIndexInformer, len(nss)) - c.dps = make([]cache.SharedIndexInformer, len(nss)) - c.rss = make([]cache.SharedIndexInformer, len(nss)) - c.sss = make([]cache.SharedIndexInformer, len(nss)) + for _, wlKind := range env.EnabledWorkloadKinds { + switch wlKind { + case workload.DeploymentWorkloadKind: + c.dps = make([]cache.SharedIndexInformer, len(nss)) + case workload.ReplicaSetWorkloadKind: + c.rss = make([]cache.SharedIndexInformer, len(nss)) + case workload.StatefulSetWorkloadKind: + c.sss = make([]cache.SharedIndexInformer, len(nss)) + case workload.RolloutWorkloadKind: + c.rls = make([]cache.SharedIndexInformer, len(nss)) + } + } for i, ns := range nss { c.cms[i] = c.startConfigMap(ctx, ns) c.svs[i] = c.startServices(ctx, ns) - c.dps[i] = workload.StartDeployments(ctx, ns) - c.rss[i] = workload.StartReplicaSets(ctx, ns) - c.sss[i] = workload.StartStatefulSets(ctx, ns) + if c.dps != nil { + c.dps[i] = workload.StartDeployments(ctx, ns) + } + if c.rss != nil { + c.rss[i] = workload.StartReplicaSets(ctx, ns) + } + if c.sss != nil { + c.sss[i] = workload.StartStatefulSets(ctx, ns) + } c.startPods(ctx, ns) kf := informer.GetK8sFactory(ctx, ns) kf.Start(ctx.Done()) kf.WaitForCacheSync(ctx.Done()) } - if managerutil.ArgoRolloutsEnabled(ctx) { - c.rls = make([]cache.SharedIndexInformer, len(nss)) + if c.rls != nil { for i, ns := range nss { c.rls[i] = workload.StartRollouts(ctx, ns) rf := informer.GetArgoRolloutsFactory(ctx, ns) diff --git a/cmd/traffic/cmd/manager/service.go b/cmd/traffic/cmd/manager/service.go index f2b09f1d72..cc4ce73b34 100644 --- a/cmd/traffic/cmd/manager/service.go +++ b/cmd/traffic/cmd/manager/service.go @@ -32,6 +32,7 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/tracing" "github.com/telepresenceio/telepresence/v2/pkg/tunnel" "github.com/telepresenceio/telepresence/v2/pkg/version" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) // Clock is the mechanism used by the Manager state to get the current time. @@ -583,9 +584,19 @@ func (s *service) GetKnownWorkloadKinds(ctx context.Context, request *rpc.Sessio } ctx = managerutil.WithSessionInfo(ctx, request) dlog.Debugf(ctx, "GetKnownWorkloadKinds called") - kinds := []rpc.WorkloadInfo_Kind{rpc.WorkloadInfo_DEPLOYMENT, rpc.WorkloadInfo_REPLICASET, rpc.WorkloadInfo_STATEFULSET} - if managerutil.ArgoRolloutsEnabled(ctx) { - kinds = append(kinds, rpc.WorkloadInfo_ROLLOUT) + enabledWorkloadKinds := managerutil.GetEnv(ctx).EnabledWorkloadKinds + kinds := make([]rpc.WorkloadInfo_Kind, len(enabledWorkloadKinds)) + for i, wlKind := range enabledWorkloadKinds { + switch wlKind { + case workload.DeploymentWorkloadKind: + kinds[i] = rpc.WorkloadInfo_DEPLOYMENT + case workload.ReplicaSetWorkloadKind: + kinds[i] = rpc.WorkloadInfo_REPLICASET + case workload.StatefulSetWorkloadKind: + kinds[i] = rpc.WorkloadInfo_STATEFULSET + case workload.RolloutWorkloadKind: + kinds[i] = rpc.WorkloadInfo_ROLLOUT + } } return &rpc.KnownWorkloadKinds{Kinds: kinds}, nil } diff --git a/cmd/traffic/cmd/manager/state/state.go b/cmd/traffic/cmd/manager/state/state.go index e245401118..c75f0ecd7f 100644 --- a/cmd/traffic/cmd/manager/state/state.go +++ b/cmd/traffic/cmd/manager/state/state.go @@ -495,7 +495,7 @@ func (s *state) WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan } ns := client.Namespace ww, _ := s.workloadWatchers.LoadOrCompute(ns, func() (ww workload.Watcher) { - ww, err = workload.NewWatcher(s.backgroundCtx, ns, managerutil.ArgoRolloutsEnabled(ctx)) + ww, err = workload.NewWatcher(s.backgroundCtx, ns, managerutil.GetEnv(ctx).EnabledWorkloadKinds) return ww }) if err != nil { diff --git a/docs/reference/intercepts/sidecar.md b/docs/reference/intercepts/sidecar.md index e54c0df7b7..fcf9006527 100644 --- a/docs/reference/intercepts/sidecar.md +++ b/docs/reference/intercepts/sidecar.md @@ -9,7 +9,7 @@ The injection is triggered by a Kubernetes Mutating Webhook and will only happen once. The Traffic Agent is responsible for redirecting intercepted traffic to the developer's workstation. -The intercept will intercept all`tcp` and/or `udp` traffic to the +The intercept will intercept all `tcp` and/or `udp` traffic to the intercepted service and send all of that traffic down to the developer's workstation. This means that an intercept will affect all users of the intercepted service. @@ -21,6 +21,17 @@ Kubernetes has various Currently, Telepresence supports intercepting (installing a traffic-agent on) `Deployments`, `ReplicaSets`, `StatefulSets`, and `ArgoRollouts`. +### Disable workloads + +By default, traffic-manager will observe `Deployments`, `ReplicaSets` and `StatefulSets`. +Each workload used today adds certain overhead. If you are not intercepting a specific workload type, you can disable it to reduce that overhead. +That can be achieved by setting the Helm chart values `workloads..enabled=false` when installing the traffic-manager. +The following are the Helm chart values to disable the workload types: + +- `workloads.deployments.enabled=false` for `Deployments`, +- `workloads.replicaSets.enabled=false` for `ReplicaSets`, +- `workloads.statefulSets.enabled=false` for `StatefulSets`. + ### Enable ArgoRollouts In order to use `ArgoRollouts`, you must pass the Helm chart value `workloads.argoRollouts.enabled=true` when installing the traffic-manager. diff --git a/docs/release-notes.md b/docs/release-notes.md index 850c3befb2..c2b57e5c0d 100644 --- a/docs/release-notes.md +++ b/docs/release-notes.md @@ -30,6 +30,12 @@ See [Streaming Transitions from SPDY to WebSockets](https://kubernetes.io/blog/2 The OSS code-base will no longer report usage data to the proprietary collector at Ambassador Labs. The actual calls to the collector remain, but will be no-ops unless a proper collector client is installed using an extension point. +##
feature
Add deployments, statefulSets, replicaSets to workloads Helm chart value
+
+ +The Helm chart value workloads now supports the kinds deployments.enabled, statefulSets.enabled, and replicaSets.enabled. By default, all three are enabled, but can be disabled by setting the corresponding value to false. When disabled, the traffic-manager will ignore workloads of a corresponding kind, and Telepresence will not be able to intercept them. +
+ ## Version 2.20.2 (October 21) ##
bugfix
Crash in traffic-manager configured with agentInjector.enabled=false
diff --git a/docs/release-notes.mdx b/docs/release-notes.mdx index 2216b89545..a54156e1ed 100644 --- a/docs/release-notes.mdx +++ b/docs/release-notes.mdx @@ -28,6 +28,10 @@ See [Streaming Transitions from SPDY to WebSockets](https://kubernetes.io/blog/2 Make usage data collection configurable using an extension point, and default to no-ops The OSS code-base will no longer report usage data to the proprietary collector at Ambassador Labs. The actual calls to the collector remain, but will be no-ops unless a proper collector client is installed using an extension point. + + Add deployments, statefulSets, replicaSets to workloads Helm chart value + The Helm chart value workloads now supports the kinds deployments.enabled, statefulSets.enabled, and replicaSets.enabled. By default, all three are enabled, but can be disabled by setting the corresponding value to false. When disabled, the traffic-manager will ignore workloads of a corresponding kind, and Telepresence will not be able to intercept them. + ## Version 2.20.2 (October 21) Crash in traffic-manager configured with agentInjector.enabled=false diff --git a/integration_test/workload_configuration_test.go b/integration_test/workload_configuration_test.go new file mode 100644 index 0000000000..599e957ae9 --- /dev/null +++ b/integration_test/workload_configuration_test.go @@ -0,0 +1,135 @@ +package integration_test + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/telepresenceio/telepresence/v2/integration_test/itest" +) + +type workloadConfigurationSuite struct { + itest.Suite + itest.NamespacePair +} + +func (s *workloadConfigurationSuite) SuiteName() string { + return "WorkloadConfiguration" +} + +func init() { + itest.AddTrafficManagerSuite("-workload-configuration", func(h itest.NamespacePair) itest.TestingSuite { + return &workloadConfigurationSuite{Suite: itest.Suite{Harness: h}, NamespacePair: h} + }) +} + +func (s *workloadConfigurationSuite) disabledWorkloadKind(tp, wl string) { + ctx := s.Context() + require := s.Require() + + s.ApplyApp(ctx, wl, strings.ToLower(tp)+"/"+wl) + defer s.DeleteSvcAndWorkload(ctx, strings.ToLower(tp), wl) + + defer s.uninstallAgents(ctx, wl) + + s.TelepresenceConnect(ctx) + defer itest.TelepresenceDisconnectOk(ctx) + + // give it time for the workload to be detected (if it was going to be) + time.Sleep(6 * time.Second) + + list := itest.TelepresenceOk(ctx, "list") + require.Equal("No Workloads (Deployments, StatefulSets, ReplicaSets, or Rollouts)", list) + + _, stderr, err := itest.Telepresence(ctx, "intercept", wl) + require.Error(err) + require.Contains(stderr, fmt.Sprintf("connector.CreateIntercept: workload \"%s.%s\" not found", wl, s.NamespacePair.AppNamespace())) +} + +func (s *workloadConfigurationSuite) uninstallAgents(ctx context.Context, wl string) { + dfltCtx := itest.WithUser(ctx, "default") + itest.TelepresenceOk(dfltCtx, "connect", "--namespace", s.AppNamespace(), "--manager-namespace", s.ManagerNamespace()) + itest.TelepresenceOk(dfltCtx, "uninstall", "--agent", wl) + itest.TelepresenceDisconnectOk(dfltCtx) +} + +func (s *workloadConfigurationSuite) Test_DisabledReplicaSet() { + s.TelepresenceHelmInstallOK(s.Context(), true, "--set", "workloads.replicaSets.enabled=false") + defer s.TelepresenceHelmInstallOK(s.Context(), true, "--set", "workloads.replicaSets.enabled=true") + s.disabledWorkloadKind("ReplicaSet", "rs-echo") +} + +func (s *workloadConfigurationSuite) Test_DisabledStatefulSet() { + s.TelepresenceHelmInstallOK(s.Context(), true, "--set", "workloads.statefulSets.enabled=false") + defer s.TelepresenceHelmInstallOK(s.Context(), true, "--set", "workloads.statefulSets.enabled=true") + s.disabledWorkloadKind("StatefulSet", "ss-echo") +} + +func (s *workloadConfigurationSuite) Test_InterceptsDeploymentWithDisabledReplicaSets() { + ctx := s.Context() + require := s.Require() + + wl, tp := "echo-one", "Deployment" + s.ApplyApp(ctx, wl, strings.ToLower(tp)+"/"+wl) + defer s.DeleteSvcAndWorkload(ctx, strings.ToLower(tp), wl) + + s.TelepresenceHelmInstallOK(ctx, true, "--set", "workloads.replicaSets.enabled=false") + defer s.TelepresenceHelmInstallOK(ctx, true, "--set", "workloads.replicaSets.enabled=true") + + defer s.uninstallAgents(ctx, wl) + + s.TelepresenceConnect(ctx) + defer itest.TelepresenceDisconnectOk(ctx) + + require.Eventually( + func() bool { + stdout, _, err := itest.Telepresence(ctx, "list") + return err == nil && strings.Contains(stdout, fmt.Sprintf("%s: ready to intercept", wl)) + }, + 6*time.Second, // waitFor + 2*time.Second, // polling interval + ) + + stdout := itest.TelepresenceOk(ctx, "intercept", wl) + require.Contains(stdout, fmt.Sprintf("Using %s %s", tp, wl)) + + stdout = itest.TelepresenceOk(ctx, "list", "--intercepts") + require.Contains(stdout, fmt.Sprintf("%s: intercepted", wl)) + itest.TelepresenceOk(ctx, "leave", wl) +} + +func (s *workloadConfigurationSuite) Test_InterceptsReplicaSetWithDisabledDeployments() { + ctx := s.Context() + require := s.Require() + + wl, tp := "echo-one", "Deployment" + s.ApplyApp(ctx, wl, strings.ToLower(tp)+"/"+wl) + defer s.DeleteSvcAndWorkload(ctx, strings.ToLower(tp), wl) + + interceptableWl := s.KubectlOk(ctx, "get", "replicasets", "-l", fmt.Sprintf("app=%s", wl), "-o", "jsonpath={.items[*].metadata.name}") + + s.TelepresenceHelmInstallOK(ctx, true, "--set", "workloads.deployments.enabled=false") + defer s.TelepresenceHelmInstallOK(ctx, true, "--set", "workloads.deployments.enabled=true") + + defer s.uninstallAgents(ctx, interceptableWl) + + s.TelepresenceConnect(ctx) + defer itest.TelepresenceDisconnectOk(ctx) + + require.Eventually( + func() bool { + stdout, _, err := itest.Telepresence(ctx, "list") + return err == nil && strings.Contains(stdout, fmt.Sprintf("%s: ready to intercept", interceptableWl)) + }, + 6*time.Second, // waitFor + 2*time.Second, // polling interval + ) + + stdout := itest.TelepresenceOk(ctx, "intercept", interceptableWl) + require.Contains(stdout, fmt.Sprintf("Using %s %s", "ReplicaSet", interceptableWl)) + + stdout = itest.TelepresenceOk(ctx, "list", "--intercepts") + require.Contains(stdout, fmt.Sprintf("%s: intercepted", interceptableWl)) + itest.TelepresenceOk(ctx, "leave", interceptableWl) +} diff --git a/pkg/client/userd/trafficmgr/session.go b/pkg/client/userd/trafficmgr/session.go index d7cae2dc49..6e9daa0a20 100644 --- a/pkg/client/userd/trafficmgr/session.go +++ b/pkg/client/userd/trafficmgr/session.go @@ -1111,22 +1111,32 @@ func (s *session) localWorkloadsWatcher(ctx context.Context, namespace string, s ctx = informer.WithFactory(ctx, namespace) fc = informer.GetFactory(ctx, namespace) } - workload.StartDeployments(ctx, namespace) - workload.StartReplicaSets(ctx, namespace) - workload.StartStatefulSets(ctx, namespace) - kf := fc.GetK8sInformerFactory() - kf.Start(ctx.Done()) - rolloutsEnabled := slices.Index(knownWorkloadKinds.Kinds, manager.WorkloadInfo_ROLLOUT) >= 0 - if rolloutsEnabled { - workload.StartRollouts(ctx, namespace) - af := fc.GetArgoRolloutsInformerFactory() - af.Start(ctx.Done()) + enabledWorkloadKinds := make([]workload.WorkloadKind, len(knownWorkloadKinds.Kinds)) + for i, kind := range knownWorkloadKinds.Kinds { + switch kind { + case manager.WorkloadInfo_DEPLOYMENT: + enabledWorkloadKinds[i] = workload.DeploymentWorkloadKind + workload.StartDeployments(ctx, namespace) + case manager.WorkloadInfo_REPLICASET: + enabledWorkloadKinds[i] = workload.ReplicaSetWorkloadKind + workload.StartReplicaSets(ctx, namespace) + case manager.WorkloadInfo_STATEFULSET: + enabledWorkloadKinds[i] = workload.StatefulSetWorkloadKind + workload.StartStatefulSets(ctx, namespace) + case manager.WorkloadInfo_ROLLOUT: + enabledWorkloadKinds[i] = workload.RolloutWorkloadKind + workload.StartRollouts(ctx, namespace) + af := fc.GetArgoRolloutsInformerFactory() + af.Start(ctx.Done()) + } } - ww, err := workload.NewWatcher(ctx, namespace, rolloutsEnabled) + kf := fc.GetK8sInformerFactory() + kf.Start(ctx.Done()) + + ww, err := workload.NewWatcher(ctx, namespace, enabledWorkloadKinds) if err != nil { - workload.StartRollouts(ctx, namespace) return err } kf.WaitForCacheSync(ctx.Done()) diff --git a/pkg/workload/watcher.go b/pkg/workload/watcher.go index 5d8fc173cc..88e8023c0f 100644 --- a/pkg/workload/watcher.go +++ b/pkg/workload/watcher.go @@ -3,6 +3,7 @@ package workload import ( "context" "math" + "slices" "sync" "time" @@ -34,6 +35,19 @@ type WorkloadEvent struct { Workload k8sapi.Workload } +type WorkloadKind string + +const ( + DeploymentWorkloadKind WorkloadKind = "Deployment" + StatefulSetWorkloadKind WorkloadKind = "StatefulSet" + ReplicaSetWorkloadKind WorkloadKind = "ReplicaSet" + RolloutWorkloadKind WorkloadKind = "Rollout" +) + +func (w *WorkloadKind) IsValid() bool { + return w != nil && slices.Contains([]WorkloadKind{DeploymentWorkloadKind, StatefulSetWorkloadKind, ReplicaSetWorkloadKind, RolloutWorkloadKind}, *w) +} + func (e EventType) String() string { switch e { case EventTypeAdd: @@ -53,17 +67,17 @@ type Watcher interface { type watcher struct { sync.Mutex - namespace string - subscriptions map[uuid.UUID]chan<- []WorkloadEvent - timer *time.Timer - events []WorkloadEvent - rolloutsEnabled bool + namespace string + subscriptions map[uuid.UUID]chan<- []WorkloadEvent + timer *time.Timer + events []WorkloadEvent + enabledWorkloadKinds []WorkloadKind } -func NewWatcher(ctx context.Context, ns string, rolloutsEnabled bool) (Watcher, error) { +func NewWatcher(ctx context.Context, ns string, enabledWorkloadKinds []WorkloadKind) (Watcher, error) { w := new(watcher) w.namespace = ns - w.rolloutsEnabled = rolloutsEnabled + w.enabledWorkloadKinds = enabledWorkloadKinds w.subscriptions = make(map[uuid.UUID]chan<- []WorkloadEvent) w.timer = time.AfterFunc(time.Duration(math.MaxInt64), func() { w.Lock() @@ -92,14 +106,17 @@ func NewWatcher(ctx context.Context, ns string, rolloutsEnabled bool) (Watcher, return w, nil } -func hasValidReplicasetOwner(wl k8sapi.Workload, rolloutsEnabled bool) bool { +func hasValidReplicasetOwner(wl k8sapi.Workload, enabledWorkloadKinds []WorkloadKind) bool { for _, ref := range wl.GetOwnerReferences() { if ref.Controller != nil && *ref.Controller { switch ref.Kind { case "Deployment": - return true + if slices.Contains(enabledWorkloadKinds, DeploymentWorkloadKind) { + return true + } + case "Rollout": - if rolloutsEnabled { + if slices.Contains(enabledWorkloadKinds, RolloutWorkloadKind) { return true } } @@ -120,41 +137,47 @@ func (w *watcher) Subscribe(ctx context.Context) <-chan []WorkloadEvent { kf := informer.GetFactory(ctx, w.namespace) ai := kf.GetK8sInformerFactory().Apps().V1() dlog.Debugf(ctx, "workload.Watcher producing initial events for namespace %s", w.namespace) - if dps, err := ai.Deployments().Lister().Deployments(w.namespace).List(labels.Everything()); err == nil { - for _, obj := range dps { - if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) && !trafficManagerSelector.Matches(labels.Set(obj.Labels)) { - initialEvents = append(initialEvents, WorkloadEvent{ - Type: EventTypeAdd, - Workload: wl, - }) + if slices.Contains(w.enabledWorkloadKinds, DeploymentWorkloadKind) { + if dps, err := ai.Deployments().Lister().Deployments(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range dps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.enabledWorkloadKinds) && !trafficManagerSelector.Matches(labels.Set(obj.Labels)) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } } } } - if rps, err := ai.ReplicaSets().Lister().ReplicaSets(w.namespace).List(labels.Everything()); err == nil { - for _, obj := range rps { - if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { - initialEvents = append(initialEvents, WorkloadEvent{ - Type: EventTypeAdd, - Workload: wl, - }) + if slices.Contains(w.enabledWorkloadKinds, ReplicaSetWorkloadKind) { + if rps, err := ai.ReplicaSets().Lister().ReplicaSets(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range rps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.enabledWorkloadKinds) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } } } } - if sps, err := ai.StatefulSets().Lister().StatefulSets(w.namespace).List(labels.Everything()); err == nil { - for _, obj := range sps { - if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { - initialEvents = append(initialEvents, WorkloadEvent{ - Type: EventTypeAdd, - Workload: wl, - }) + if slices.Contains(w.enabledWorkloadKinds, StatefulSetWorkloadKind) { + if sps, err := ai.StatefulSets().Lister().StatefulSets(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range sps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.enabledWorkloadKinds) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } } } } - if w.rolloutsEnabled { + if slices.Contains(w.enabledWorkloadKinds, RolloutWorkloadKind) { ri := kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() if sps, err := ri.Rollouts().Lister().Rollouts(w.namespace).List(labels.Everything()); err == nil { for _, obj := range sps { - if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.enabledWorkloadKinds) { initialEvents = append(initialEvents, WorkloadEvent{ Type: EventTypeAdd, Workload: wl, @@ -244,24 +267,27 @@ func (w *watcher) watch(ix cache.SharedIndexInformer, ns string, hasValidControl func (w *watcher) addEventHandler(ctx context.Context, ns string) error { kf := informer.GetFactory(ctx, ns) hvc := func(wl k8sapi.Workload) bool { - return hasValidReplicasetOwner(wl, w.rolloutsEnabled) + return hasValidReplicasetOwner(wl, w.enabledWorkloadKinds) } ai := kf.GetK8sInformerFactory().Apps().V1() - if err := w.watch(ai.Deployments().Informer(), ns, hvc); err != nil { - return err - } - if err := w.watch(ai.ReplicaSets().Informer(), ns, hvc); err != nil { - return err - } - if err := w.watch(ai.StatefulSets().Informer(), ns, hvc); err != nil { - return err - } - if !w.rolloutsEnabled { - dlog.Infof(ctx, "Argo Rollouts is disabled, Argo Rollouts will not be watched") - } else { - ri := kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() - if err := w.watch(ri.Rollouts().Informer(), ns, hvc); err != nil { + for _, wlKind := range w.enabledWorkloadKinds { + var ssi cache.SharedIndexInformer + switch wlKind { + case DeploymentWorkloadKind: + ssi = ai.Deployments().Informer() + case ReplicaSetWorkloadKind: + ssi = ai.ReplicaSets().Informer() + case StatefulSetWorkloadKind: + ssi = ai.StatefulSets().Informer() + case RolloutWorkloadKind: + ri := kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() + ssi = ri.Rollouts().Informer() + default: + continue + } + + if err := w.watch(ssi, ns, hvc); err != nil { return err } } diff --git a/rpc/manager/manager.proto b/rpc/manager/manager.proto index 61a382d6a7..40fbd38397 100644 --- a/rpc/manager/manager.proto +++ b/rpc/manager/manager.proto @@ -779,8 +779,8 @@ service Manager { rpc ReviewIntercept(ReviewInterceptRequest) returns (google.protobuf.Empty); // GetKnownWorkloadKinds returns the known workload kinds - // that the manager can handle. This base set should always include Deployment, StatefulSet, and ReplicaSet, - // and it may include Rollout (Argo Rollouts) if the support for it is enabled. + // that the manager can handle. This set may include Deployment, StatefulSet, ReplicaSet, Rollout (Argo Rollouts) + // as configured in the manager's Helm values. rpc GetKnownWorkloadKinds(SessionInfo) returns (KnownWorkloadKinds); // LookupDNS performs a DNS lookup in the cluster. If the caller has intercepts diff --git a/rpc/manager/manager_grpc.pb.go b/rpc/manager/manager_grpc.pb.go index ca64eaf312..cfeb97d124 100644 --- a/rpc/manager/manager_grpc.pb.go +++ b/rpc/manager/manager_grpc.pb.go @@ -142,8 +142,8 @@ type ManagerClient interface { // error, and setting a human-readable status message. ReviewIntercept(ctx context.Context, in *ReviewInterceptRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) // GetKnownWorkloadKinds returns the known workload kinds - // that the manager can handle. This base set should always include Deployment, StatefulSet, and ReplicaSet, - // and it may include Rollout (Argo Rollouts) if the support for it is enabled. + // that the manager can handle. This set may include Deployment, StatefulSet, ReplicaSet, Rollout (Argo Rollouts) + // as configured in the manager's Helm values. GetKnownWorkloadKinds(ctx context.Context, in *SessionInfo, opts ...grpc.CallOption) (*KnownWorkloadKinds, error) // LookupDNS performs a DNS lookup in the cluster. If the caller has intercepts // active, the lookup will be performed from the intercepted pods. @@ -834,8 +834,8 @@ type ManagerServer interface { // error, and setting a human-readable status message. ReviewIntercept(context.Context, *ReviewInterceptRequest) (*emptypb.Empty, error) // GetKnownWorkloadKinds returns the known workload kinds - // that the manager can handle. This base set should always include Deployment, StatefulSet, and ReplicaSet, - // and it may include Rollout (Argo Rollouts) if the support for it is enabled. + // that the manager can handle. This set may include Deployment, StatefulSet, ReplicaSet, Rollout (Argo Rollouts) + // as configured in the manager's Helm values. GetKnownWorkloadKinds(context.Context, *SessionInfo) (*KnownWorkloadKinds, error) // LookupDNS performs a DNS lookup in the cluster. If the caller has intercepts // active, the lookup will be performed from the intercepted pods.