Skip to content

Commit

Permalink
fix(executor): Fix concurrency error in PNS executor. Fixes #5250 (#5258
Browse files Browse the repository at this point in the history
)

* fix(executor): Fix concurrency error in PNS executor. Fixes #5250

Signed-off-by: Alex Collins <alex_collins@intuit.com>

* fix(executor): Fix concurrency error in PNS executor. Fixes #5250

Signed-off-by: Alex Collins <alex_collins@intuit.com>

* Revert "fix(executor): Fix concurrency error in PNS executor. Fixes #5250"

This reverts commit aa390c7.

Signed-off-by: Alex Collins <alex_collins@intuit.com>

* fix(executor): Fix concurrency error in PNS executor. Fixes #5250

Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec authored Mar 2, 2021
1 parent 9b538d9 commit 69c40c0
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions workflow/executor/pns/pns.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type PNSExecutor struct {
podName string
namespace string

mu sync.RWMutex

containers map[string]string // container name -> container ID

// ctrIDToPid maps a containerID to a process ID
Expand All @@ -57,6 +59,7 @@ func NewPNSExecutor(clientset *kubernetes.Clientset, podName, namespace string)
K8sAPIExecutor: delegate,
podName: podName,
namespace: namespace,
mu: sync.RWMutex{},
containers: make(map[string]string),
ctrIDToPid: make(map[string]int),
pidFileHandles: make(map[int]*os.File),
Expand Down Expand Up @@ -151,8 +154,10 @@ func (p *PNSExecutor) Wait(ctx context.Context, containerNames, sidecarNames []s
log.Info("container PID still unknown (maybe due to short running main container)")
err := p.K8sAPIExecutor.Until(ctx, func(pod *corev1.Pod) bool {
for _, c := range pod.Status.ContainerStatuses {
p.mu.Lock()
containerID := execcommon.GetContainerID(c.ContainerID)
p.containers[c.Name] = containerID
p.mu.Unlock()
log.Infof("mapped container name %q to container ID %q", c.Name, containerID)
}
return p.haveContainers(allContainerNames)
Expand Down Expand Up @@ -212,9 +217,11 @@ func (p *PNSExecutor) pollRootProcesses(ctx context.Context, containerNames []st
}
}

func (d *PNSExecutor) haveContainers(containerNames []string) bool {
func (p *PNSExecutor) haveContainers(containerNames []string) bool {
p.mu.RLock()
defer p.mu.RUnlock()
for _, n := range containerNames {
if d.ctrIDToPid[d.containers[n]] == 0 {
if p.ctrIDToPid[p.containers[n]] == 0 {
return false
}
}
Expand Down Expand Up @@ -273,6 +280,8 @@ func (p *PNSExecutor) killContainer(containerName string, terminationGracePeriod
// getContainerPID returns the pid associated with the container id. Returns error if it was unable
// to be determined because no running root processes exist with that container ID
func (p *PNSExecutor) getContainerPID(containerName string) (int, error) {
p.mu.RLock()
defer p.mu.RUnlock()
containerID, ok := p.containers[containerName]
if !ok {
return 0, fmt.Errorf("container ID not found for container name %q", containerName)
Expand Down Expand Up @@ -327,6 +336,8 @@ func (p *PNSExecutor) secureRootFiles() error {
if err != nil {
return err
}
p.mu.Lock()
defer p.mu.Unlock()
p.ctrIDToPid[containerID] = pid
log.Infof("mapped pid %d to container ID %q", pid, containerID)
containerName, err := containerNameForPID(pid)
Expand Down

0 comments on commit 69c40c0

Please sign in to comment.