Skip to content

Commit

Permalink
Unify pod and service fault injection commands
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Chacin <pablochacin@gmail.com>
  • Loading branch information
pablochacin committed Oct 19, 2023
1 parent 9ad5ada commit b08a9e9
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 179 deletions.
87 changes: 15 additions & 72 deletions pkg/disruptors/commads.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func buildGrpcFaultCmd(

// TODO: make port mandatory
if fault.Port != intstr.NullValue {
cmd = append(cmd, "-t", fmt.Sprint(fault.Port))
cmd = append(cmd, "-t", fault.Port.Str())
}

if fault.AverageDelay > 0 {
Expand Down Expand Up @@ -78,7 +78,7 @@ func buildHTTPFaultCmd(

// TODO: make port mandatory
if fault.Port != intstr.NullValue {
cmd = append(cmd, "-t", fmt.Sprint(fault.Port))
cmd = append(cmd, "-t", fault.Port.Str())
}

if fault.AverageDelay > 0 {
Expand Down Expand Up @@ -131,21 +131,25 @@ type PodHTTPFaultCommand struct {

// Commands return the command for injecting a HttpFault in a Pod
func (c PodHTTPFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) {
if !utils.HasPort(pod, c.fault.Port) {
return VisitCommands{}, fmt.Errorf("pod %q does not expose port %q", pod.Name, c.fault.Port.Str())
if utils.HasHostNetwork(pod) {
return VisitCommands{}, fmt.Errorf("fault cannot be safely injected because pod %q uses hostNetwork", pod.Name)
}

if utils.HasHostNetwork(pod) {
return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
// find the container port for fault injection
port, err := utils.FindPort(c.fault.Port, pod)
if err != nil {
return VisitCommands{}, err
}
podFault := c.fault
podFault.Port = port

targetAddress, err := utils.PodIP(pod)
if err != nil {
return VisitCommands{}, err
}

return VisitCommands{
Exec: buildHTTPFaultCmd(targetAddress, c.fault, c.duration, c.options),
Exec: buildHTTPFaultCmd(targetAddress, podFault, c.duration, c.options),
Cleanup: buildCleanupCmd(),
}, nil
}
Expand All @@ -159,76 +163,15 @@ type PodGrpcFaultCommand struct {

// Commands return the command for injecting a GrpcFault in a Pod
func (c PodGrpcFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) {
if !utils.HasPort(pod, c.fault.Port) {
return VisitCommands{}, fmt.Errorf("pod %q does not expose port %q", pod.Name, c.fault.Port.Str())
}

targetAddress, err := utils.PodIP(pod)
if err != nil {
return VisitCommands{}, err
}

return VisitCommands{
Exec: buildGrpcFaultCmd(targetAddress, c.fault, c.duration, c.options),
Cleanup: buildCleanupCmd(),
}, nil
}

// ServiceHTTPFaultCommand implements the PodVisitCommands interface for injecting HttpFaults in a Pod
type ServiceHTTPFaultCommand struct {
service corev1.Service
fault HTTPFault
duration time.Duration
options HTTPDisruptionOptions
}

// Commands return the command for injecting a HttpFault in a Service
func (c ServiceHTTPFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) {
port, err := utils.MapPort(c.service, c.fault.Port, pod)
if err != nil {
return VisitCommands{}, err
}

if utils.HasHostNetwork(pod) {
return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
return VisitCommands{}, fmt.Errorf("fault cannot be safely injected because pod %q uses hostNetwork", pod.Name)
}

// copy fault to change target port for the pod
podFault := c.fault
podFault.Port = port

targetAddress, err := utils.PodIP(pod)
if err != nil {
return VisitCommands{}, err
}

return VisitCommands{
Exec: buildHTTPFaultCmd(targetAddress, podFault, c.duration, c.options),
Cleanup: buildCleanupCmd(),
}, nil
}

// Cleanup defines the command to execute for cleaning up if command execution fails
func (c ServiceHTTPFaultCommand) Cleanup(_ corev1.Pod) []string {
return buildCleanupCmd()
}

// ServiceGrpcFaultCommand implements the PodVisitCommands interface for injecting a
// GrpcFault in a Service
type ServiceGrpcFaultCommand struct {
service corev1.Service
fault GrpcFault
duration time.Duration
options GrpcDisruptionOptions
}

// Commands return the VisitCommands for injecting a GrpcFault in a Service
func (c ServiceGrpcFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) {
port, err := utils.MapPort(c.service, c.fault.Port, pod)
// find the container port for fault injection
port, err := utils.FindPort(c.fault.Port, pod)
if err != nil {
return VisitCommands{}, err
}

podFault := c.fault
podFault.Port = port

Expand All @@ -238,7 +181,7 @@ func (c ServiceGrpcFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error)
}

return VisitCommands{
Exec: buildGrpcFaultCmd(targetAddress, podFault, c.duration, c.options),
Exec: buildGrpcFaultCmd(targetAddress, c.fault, c.duration, c.options),
Cleanup: buildCleanupCmd(),
}, nil
}
25 changes: 20 additions & 5 deletions pkg/disruptors/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/grafana/xk6-disruptor/pkg/kubernetes"
"github.com/grafana/xk6-disruptor/pkg/kubernetes/helpers"
"github.com/grafana/xk6-disruptor/pkg/utils"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -83,9 +84,16 @@ func (d *serviceDisruptor) InjectHTTPFaults(
duration time.Duration,
options HTTPDisruptionOptions,
) error {
command := ServiceHTTPFaultCommand{
service: d.service,
fault: fault,
// Map service port to a target pod port
port, err := utils.GetTargetPort(d.service, fault.Port)
if err != nil {
return err
}
podFault := fault
podFault.Port = port

command := PodHTTPFaultCommand{
fault: podFault,
duration: duration,
options: options,
}
Expand All @@ -105,8 +113,15 @@ func (d *serviceDisruptor) InjectGrpcFaults(
duration time.Duration,
options GrpcDisruptionOptions,
) error {
command := ServiceGrpcFaultCommand{
service: d.service,
// Map service port to a target pod port
port, err := utils.GetTargetPort(d.service, fault.Port)
if err != nil {
return err
}
podFault := fault
podFault.Port = port

command := PodGrpcFaultCommand{
fault: fault,
duration: duration,
options: options,
Expand Down
58 changes: 20 additions & 38 deletions pkg/utils/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,51 @@ import (

"github.com/grafana/xk6-disruptor/pkg/types/intstr"
corev1 "k8s.io/api/core/v1"
k8sintstr "k8s.io/apimachinery/pkg/util/intstr"
)

// getTargetPort returns the ServicePort object that corresponds to the given port searching for a match
// for the port number or port name
func getTargetPort(service corev1.Service, svcPort intstr.IntOrString) (corev1.ServicePort, error) {
// GetTargetPort returns the target port for the fiven service port
func GetTargetPort(service corev1.Service, svcPort intstr.IntOrString) (intstr.IntOrString, error) {
// Handle default port mapping
// TODO: make port required
if svcPort == intstr.NullValue || (svcPort.IsInt() && svcPort.Int32() == 0) {
if len(service.Spec.Ports) > 1 {
return corev1.ServicePort{}, fmt.Errorf("no port selected and service exposes more than one service")
return intstr.NullValue, fmt.Errorf("no port selected and service exposes more than one service")
}
return service.Spec.Ports[0], nil
return intstr.IntOrString(service.Spec.Ports[0].TargetPort.String()), nil
}

for _, p := range service.Spec.Ports {
if p.Port == svcPort.Int32() || p.Name == svcPort.Str() {
return p, nil
if p.Name == svcPort.Str() || (svcPort.IsInt() && p.Port == svcPort.Int32()) {
return intstr.IntOrString(p.TargetPort.String()), nil
}
}
return corev1.ServicePort{}, fmt.Errorf("the service does not expose the given svcPort: %s", svcPort)
}

// MapPort returns the port in the Pod that maps to the given service port
func MapPort(service corev1.Service, port intstr.IntOrString, pod corev1.Pod) (intstr.IntOrString, error) {
svcPort, err := getTargetPort(service, port)
if err != nil {
return intstr.NullValue, err
}
return intstr.NullValue, fmt.Errorf("the service does not expose the given svcPort: %s", svcPort)
}

switch svcPort.TargetPort.Type {
case k8sintstr.String:
// FindPort returns the port in the Pod that maps to the given port by port number or name
func FindPort(port intstr.IntOrString, pod corev1.Pod) (intstr.IntOrString, error) {
switch port.Type() {
case intstr.StringValue:
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == svcPort.TargetPort.StrVal {
return intstr.FromInt32(port.ContainerPort), nil
for _, p := range container.Ports {
if p.Name == port.Str() {
return intstr.FromInt32(p.ContainerPort), nil
}
}
}

case k8sintstr.Int:
case intstr.IntValue:
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.ContainerPort == svcPort.TargetPort.IntVal {
return intstr.FromInt32(port.ContainerPort), nil
for _, p := range container.Ports {
if p.ContainerPort == port.Int32() {
return intstr.FromInt32(p.ContainerPort), nil
}
}
}
}

return intstr.NullValue, fmt.Errorf("pod %q does match port %q for service %q", pod.Name, port.Str(), service.Name)
}

// HasPort verifies if a pods listen to the given port
func HasPort(pod corev1.Pod, port intstr.IntOrString) bool {
for _, container := range pod.Spec.Containers {
for _, containerPort := range container.Ports {
if containerPort.ContainerPort == port.Int32() {
return true
}
}
}
return false
return intstr.NullValue, fmt.Errorf("pod %q does exports port %q", pod.Name, port.Str())
}

// HasHostNetwork returns whether a pod has HostNetwork enabled, i.e. it shares the host's network namespace.
Expand Down
Loading

0 comments on commit b08a9e9

Please sign in to comment.