diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index 66b551ab57b8..6fc6ddd6b660 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -32,6 +32,8 @@ type PNSExecutor struct { podName string namespace string + mu sync.RWMutex + containers map[string]string // container name -> container ID // ctrIDToPid maps a containerID to a process ID @@ -57,6 +59,7 @@ func NewPNSExecutor(clientset *kubernetes.Clientset, podName, namespace string) K8sAPIExecutor: delegate, podName: podName, namespace: namespace, + mu: sync.RWMutex{}, containers: make(map[string]string), ctrIDToPid: make(map[string]int), pidFileHandles: make(map[int]*os.File), @@ -151,8 +154,10 @@ func (p *PNSExecutor) Wait(ctx context.Context, containerNames, sidecarNames []s log.Info("container PID still unknown (maybe due to short running main container)") err := p.K8sAPIExecutor.Until(ctx, func(pod *corev1.Pod) bool { for _, c := range pod.Status.ContainerStatuses { + p.mu.Lock() containerID := execcommon.GetContainerID(c.ContainerID) p.containers[c.Name] = containerID + p.mu.Unlock() log.Infof("mapped container name %q to container ID %q", c.Name, containerID) } return p.haveContainers(allContainerNames) @@ -212,9 +217,11 @@ func (p *PNSExecutor) pollRootProcesses(ctx context.Context, containerNames []st } } -func (d *PNSExecutor) haveContainers(containerNames []string) bool { +func (p *PNSExecutor) haveContainers(containerNames []string) bool { + p.mu.RLock() + defer p.mu.RUnlock() for _, n := range containerNames { - if d.ctrIDToPid[d.containers[n]] == 0 { + if p.ctrIDToPid[p.containers[n]] == 0 { return false } } @@ -273,6 +280,8 @@ func (p *PNSExecutor) killContainer(containerName string, terminationGracePeriod // getContainerPID returns the pid associated with the container id. Returns error if it was unable // to be determined because no running root processes exist with that container ID func (p *PNSExecutor) getContainerPID(containerName string) (int, error) { + p.mu.RLock() + defer p.mu.RUnlock() containerID, ok := p.containers[containerName] if !ok { return 0, fmt.Errorf("container ID not found for container name %q", containerName) @@ -327,6 +336,8 @@ func (p *PNSExecutor) secureRootFiles() error { if err != nil { return err } + p.mu.Lock() + defer p.mu.Unlock() p.ctrIDToPid[containerID] = pid log.Infof("mapped pid %d to container ID %q", pid, containerID) containerName, err := containerNameForPID(pid)