From b08a9e9fe1aaeb80e7301f60b240287bb8755457 Mon Sep 17 00:00:00 2001 From: Pablo Chacin Date: Thu, 19 Oct 2023 20:48:11 +0200 Subject: [PATCH] Unify pod and service fault injection commands Signed-off-by: Pablo Chacin --- pkg/disruptors/commads.go | 87 ++++------------------ pkg/disruptors/service.go | 25 +++++-- pkg/utils/kubernetes.go | 58 +++++---------- pkg/utils/kubernetes_test.go | 140 +++++++++++++++++++---------------- 4 files changed, 131 insertions(+), 179 deletions(-) diff --git a/pkg/disruptors/commads.go b/pkg/disruptors/commads.go index 0f1ae074..7dcde9a2 100644 --- a/pkg/disruptors/commads.go +++ b/pkg/disruptors/commads.go @@ -25,7 +25,7 @@ func buildGrpcFaultCmd( // TODO: make port mandatory if fault.Port != intstr.NullValue { - cmd = append(cmd, "-t", fmt.Sprint(fault.Port)) + cmd = append(cmd, "-t", fault.Port.Str()) } if fault.AverageDelay > 0 { @@ -78,7 +78,7 @@ func buildHTTPFaultCmd( // TODO: make port mandatory if fault.Port != intstr.NullValue { - cmd = append(cmd, "-t", fmt.Sprint(fault.Port)) + cmd = append(cmd, "-t", fault.Port.Str()) } if fault.AverageDelay > 0 { @@ -131,13 +131,17 @@ type PodHTTPFaultCommand struct { // Commands return the command for injecting a HttpFault in a Pod func (c PodHTTPFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { - if !utils.HasPort(pod, c.fault.Port) { - return VisitCommands{}, fmt.Errorf("pod %q does not expose port %q", pod.Name, c.fault.Port.Str()) + if utils.HasHostNetwork(pod) { + return VisitCommands{}, fmt.Errorf("fault cannot be safely injected because pod %q uses hostNetwork", pod.Name) } - if utils.HasHostNetwork(pod) { - return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) + // find the container port for fault injection + port, err := utils.FindPort(c.fault.Port, pod) + if err != nil { + return VisitCommands{}, err } + podFault := c.fault + podFault.Port = port targetAddress, err := utils.PodIP(pod) if err != nil { @@ -145,7 +149,7 @@ func (c PodHTTPFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { } return VisitCommands{ - Exec: buildHTTPFaultCmd(targetAddress, c.fault, c.duration, c.options), + Exec: buildHTTPFaultCmd(targetAddress, podFault, c.duration, c.options), Cleanup: buildCleanupCmd(), }, nil } @@ -159,76 +163,15 @@ type PodGrpcFaultCommand struct { // Commands return the command for injecting a GrpcFault in a Pod func (c PodGrpcFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { - if !utils.HasPort(pod, c.fault.Port) { - return VisitCommands{}, fmt.Errorf("pod %q does not expose port %q", pod.Name, c.fault.Port.Str()) - } - - targetAddress, err := utils.PodIP(pod) - if err != nil { - return VisitCommands{}, err - } - - return VisitCommands{ - Exec: buildGrpcFaultCmd(targetAddress, c.fault, c.duration, c.options), - Cleanup: buildCleanupCmd(), - }, nil -} - -// ServiceHTTPFaultCommand implements the PodVisitCommands interface for injecting HttpFaults in a Pod -type ServiceHTTPFaultCommand struct { - service corev1.Service - fault HTTPFault - duration time.Duration - options HTTPDisruptionOptions -} - -// Commands return the command for injecting a HttpFault in a Service -func (c ServiceHTTPFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { - port, err := utils.MapPort(c.service, c.fault.Port, pod) - if err != nil { - return VisitCommands{}, err - } - if utils.HasHostNetwork(pod) { - return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) + return VisitCommands{}, fmt.Errorf("fault cannot be safely injected because pod %q uses hostNetwork", pod.Name) } - // copy fault to change target port for the pod - podFault := c.fault - podFault.Port = port - - targetAddress, err := utils.PodIP(pod) - if err != nil { - return VisitCommands{}, err - } - - return VisitCommands{ - Exec: buildHTTPFaultCmd(targetAddress, podFault, c.duration, c.options), - Cleanup: buildCleanupCmd(), - }, nil -} - -// Cleanup defines the command to execute for cleaning up if command execution fails -func (c ServiceHTTPFaultCommand) Cleanup(_ corev1.Pod) []string { - return buildCleanupCmd() -} - -// ServiceGrpcFaultCommand implements the PodVisitCommands interface for injecting a -// GrpcFault in a Service -type ServiceGrpcFaultCommand struct { - service corev1.Service - fault GrpcFault - duration time.Duration - options GrpcDisruptionOptions -} - -// Commands return the VisitCommands for injecting a GrpcFault in a Service -func (c ServiceGrpcFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { - port, err := utils.MapPort(c.service, c.fault.Port, pod) + // find the container port for fault injection + port, err := utils.FindPort(c.fault.Port, pod) if err != nil { return VisitCommands{}, err } - podFault := c.fault podFault.Port = port @@ -238,7 +181,7 @@ func (c ServiceGrpcFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) } return VisitCommands{ - Exec: buildGrpcFaultCmd(targetAddress, podFault, c.duration, c.options), + Exec: buildGrpcFaultCmd(targetAddress, c.fault, c.duration, c.options), Cleanup: buildCleanupCmd(), }, nil } diff --git a/pkg/disruptors/service.go b/pkg/disruptors/service.go index d2938e2e..47bee3fe 100644 --- a/pkg/disruptors/service.go +++ b/pkg/disruptors/service.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/xk6-disruptor/pkg/kubernetes" "github.com/grafana/xk6-disruptor/pkg/kubernetes/helpers" + "github.com/grafana/xk6-disruptor/pkg/utils" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -83,9 +84,16 @@ func (d *serviceDisruptor) InjectHTTPFaults( duration time.Duration, options HTTPDisruptionOptions, ) error { - command := ServiceHTTPFaultCommand{ - service: d.service, - fault: fault, + // Map service port to a target pod port + port, err := utils.GetTargetPort(d.service, fault.Port) + if err != nil { + return err + } + podFault := fault + podFault.Port = port + + command := PodHTTPFaultCommand{ + fault: podFault, duration: duration, options: options, } @@ -105,8 +113,15 @@ func (d *serviceDisruptor) InjectGrpcFaults( duration time.Duration, options GrpcDisruptionOptions, ) error { - command := ServiceGrpcFaultCommand{ - service: d.service, + // Map service port to a target pod port + port, err := utils.GetTargetPort(d.service, fault.Port) + if err != nil { + return err + } + podFault := fault + podFault.Port = port + + command := PodGrpcFaultCommand{ fault: fault, duration: duration, options: options, diff --git a/pkg/utils/kubernetes.go b/pkg/utils/kubernetes.go index 8bb6369e..48c00a32 100644 --- a/pkg/utils/kubernetes.go +++ b/pkg/utils/kubernetes.go @@ -5,69 +5,51 @@ import ( "github.com/grafana/xk6-disruptor/pkg/types/intstr" corev1 "k8s.io/api/core/v1" - k8sintstr "k8s.io/apimachinery/pkg/util/intstr" ) -// getTargetPort returns the ServicePort object that corresponds to the given port searching for a match -// for the port number or port name -func getTargetPort(service corev1.Service, svcPort intstr.IntOrString) (corev1.ServicePort, error) { +// GetTargetPort returns the target port for the fiven service port +func GetTargetPort(service corev1.Service, svcPort intstr.IntOrString) (intstr.IntOrString, error) { // Handle default port mapping // TODO: make port required if svcPort == intstr.NullValue || (svcPort.IsInt() && svcPort.Int32() == 0) { if len(service.Spec.Ports) > 1 { - return corev1.ServicePort{}, fmt.Errorf("no port selected and service exposes more than one service") + return intstr.NullValue, fmt.Errorf("no port selected and service exposes more than one service") } - return service.Spec.Ports[0], nil + return intstr.IntOrString(service.Spec.Ports[0].TargetPort.String()), nil } for _, p := range service.Spec.Ports { - if p.Port == svcPort.Int32() || p.Name == svcPort.Str() { - return p, nil + if p.Name == svcPort.Str() || (svcPort.IsInt() && p.Port == svcPort.Int32()) { + return intstr.IntOrString(p.TargetPort.String()), nil } } - return corev1.ServicePort{}, fmt.Errorf("the service does not expose the given svcPort: %s", svcPort) -} -// MapPort returns the port in the Pod that maps to the given service port -func MapPort(service corev1.Service, port intstr.IntOrString, pod corev1.Pod) (intstr.IntOrString, error) { - svcPort, err := getTargetPort(service, port) - if err != nil { - return intstr.NullValue, err - } + return intstr.NullValue, fmt.Errorf("the service does not expose the given svcPort: %s", svcPort) +} - switch svcPort.TargetPort.Type { - case k8sintstr.String: +// FindPort returns the port in the Pod that maps to the given port by port number or name +func FindPort(port intstr.IntOrString, pod corev1.Pod) (intstr.IntOrString, error) { + switch port.Type() { + case intstr.StringValue: for _, container := range pod.Spec.Containers { - for _, port := range container.Ports { - if port.Name == svcPort.TargetPort.StrVal { - return intstr.FromInt32(port.ContainerPort), nil + for _, p := range container.Ports { + if p.Name == port.Str() { + return intstr.FromInt32(p.ContainerPort), nil } } } - case k8sintstr.Int: + case intstr.IntValue: for _, container := range pod.Spec.Containers { - for _, port := range container.Ports { - if port.ContainerPort == svcPort.TargetPort.IntVal { - return intstr.FromInt32(port.ContainerPort), nil + for _, p := range container.Ports { + if p.ContainerPort == port.Int32() { + return intstr.FromInt32(p.ContainerPort), nil } } } } - return intstr.NullValue, fmt.Errorf("pod %q does match port %q for service %q", pod.Name, port.Str(), service.Name) -} - -// HasPort verifies if a pods listen to the given port -func HasPort(pod corev1.Pod, port intstr.IntOrString) bool { - for _, container := range pod.Spec.Containers { - for _, containerPort := range container.Ports { - if containerPort.ContainerPort == port.Int32() { - return true - } - } - } - return false + return intstr.NullValue, fmt.Errorf("pod %q does exports port %q", pod.Name, port.Str()) } // HasHostNetwork returns whether a pod has HostNetwork enabled, i.e. it shares the host's network namespace. diff --git a/pkg/utils/kubernetes_test.go b/pkg/utils/kubernetes_test.go index a9a6694d..daf3d0dd 100644 --- a/pkg/utils/kubernetes_test.go +++ b/pkg/utils/kubernetes_test.go @@ -31,69 +31,43 @@ func buildServicWithPort(name string, portName string, port int32, target k8sint Build() } -func Test_ServicePortMapping(t *testing.T) { +func Test_FindPort(t *testing.T) { t.Parallel() testCases := []struct { title string - serviceName string - namespace string - service corev1.Service pod corev1.Pod - endpoints *corev1.Endpoints port intstr.IntOrString expectError bool expected intstr.IntOrString }{ { - title: "invalid Port option", - serviceName: "test-svc", - namespace: "test-ns", - service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromInt(8080)), + title: "Numeric port", pod: buildPodWithPort("pod-1", "http", 80), port: intstr.FromInt(80), - expectError: true, - expected: intstr.FromInt(0), - }, - { - title: "Numeric target port specified", - serviceName: "test-svc", - namespace: "test-ns", - service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromInt(80)), - pod: buildPodWithPort("pod-1", "http", 80), - port: intstr.FromInt(8080), expectError: false, expected: intstr.FromInt(80), }, { - title: "Named target port", - serviceName: "test-svc", - namespace: "test-ns", - service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromString("http")), + title: "Numeric port not exposed", pod: buildPodWithPort("pod-1", "http", 80), - port: intstr.FromInt32(8080), - expectError: false, - expected: intstr.FromInt(80), + port: intstr.FromInt(8080), + expectError: true, + expected: intstr.NullValue, }, { - title: "Default port mapping", - serviceName: "test-svc", - namespace: "test-ns", - service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromInt(80)), + title: "Named port", pod: buildPodWithPort("pod-1", "http", 80), - port: intstr.FromInt(0), + port: intstr.FromString("http"), expectError: false, expected: intstr.FromInt(80), }, { - title: "No target for mapping", - serviceName: "test-svc", - namespace: "test-ns", - service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromInt(80)), - pod: buildPodWithPort("pod-1", "http", 8080), - port: intstr.FromInt(8080), + title: "Named port not exposed port", + pod: buildPodWithPort("pod-1", "http", 80), + port: intstr.FromString("http2"), expectError: true, - expected: intstr.FromInt(0), + expected: intstr.NullValue, }, } @@ -103,7 +77,7 @@ func Test_ServicePortMapping(t *testing.T) { t.Run(tc.title, func(t *testing.T) { t.Parallel() - port, err := MapPort(tc.service, tc.port, tc.pod) + port, err := FindPort(tc.port, tc.pod) if !tc.expectError && err != nil { t.Errorf(" failed: %v", err) return @@ -126,46 +100,84 @@ func Test_ServicePortMapping(t *testing.T) { } } -func Test_ValidatePort(t *testing.T) { +func Test_GetTargetPort(t *testing.T) { t.Parallel() testCases := []struct { - title string - namespace string - pod corev1.Pod - targetPort intstr.IntOrString - expect bool + title string + + service corev1.Service + endpoints *corev1.Endpoints + port intstr.IntOrString + expectError bool + expected intstr.IntOrString }{ { - title: "Pods listen to the specified port", - namespace: "testns", - pod: builders.NewPodBuilder("test-pod-1"). - WithContainer(corev1.Container{Ports: []corev1.ContainerPort{{ContainerPort: 8080}}}). - WithNamespace("testns"). - Build(), - targetPort: intstr.FromInt(8080), - expect: true, + title: "Numeric service port specified", + service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromInt(80)), + port: intstr.FromInt(8080), + expectError: false, + expected: intstr.FromInt(80), }, { - title: "Pod doesn't listen to the specified port", - namespace: "testns", - pod: builders.NewPodBuilder("test-pod-2"). - WithContainer(corev1.Container{Ports: []corev1.ContainerPort{{ContainerPort: 9090}}}). - WithNamespace("testns"). - Build(), - targetPort: intstr.FromInt(8080), - expect: false, + title: "Named service port", + service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromInt(80)), + port: intstr.FromString("http"), + expectError: false, + expected: intstr.FromInt(80), + }, + { + title: "Named target port", + service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromString("http")), + port: intstr.FromInt(8080), + expectError: false, + expected: intstr.FromString("http"), + }, + { + title: "Default port mapping", + service: buildServicWithPort("test-svc", "http", 8080, k8sintstr.FromInt(80)), + port: intstr.FromInt(0), + expectError: false, + expected: intstr.FromInt(80), + }, + { + title: "Numeric port not exposed", + service: buildServicWithPort("test-svc", "http", 80, k8sintstr.FromInt(80)), + port: intstr.FromInt(8080), + expectError: true, + }, + { + title: "Named port not exposed", + service: buildServicWithPort("test-svc", "http", 80, k8sintstr.FromString("http")), + port: intstr.FromString("http2"), + expectError: true, }, } for _, tc := range testCases { tc := tc + t.Run(tc.title, func(t *testing.T) { t.Parallel() - validation := HasPort(tc.pod, tc.targetPort) - if validation != tc.expect { - t.Errorf("expected %t got %t", tc.expect, validation) + port, err := GetTargetPort(tc.service, tc.port) + if !tc.expectError && err != nil { + t.Errorf(" failed: %v", err) + return + } + + if tc.expectError && err == nil { + t.Errorf("should had failed") + return + } + + if tc.expectError && err != nil { + return + } + + if tc.expected != port { + t.Errorf("expected %q got %q", tc.expected.Str(), port.Str()) + return } }) }