From f0d48f78b62800b9d7b8d0f9c6469f3912f47e58 Mon Sep 17 00:00:00 2001 From: flupec Date: Wed, 23 Jun 2021 14:43:32 +0400 Subject: [PATCH] Fixed deadlock, removed redundant channel on ping process --- pkg/fwdport/fwdport.go | 18 ++++++++-------- pkg/fwdservice/fwdservice.go | 41 ++++++++++++++++++++---------------- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/pkg/fwdport/fwdport.go b/pkg/fwdport/fwdport.go index 2dae885d..968467a0 100644 --- a/pkg/fwdport/fwdport.go +++ b/pkg/fwdport/fwdport.go @@ -99,10 +99,6 @@ type pingingDialer struct { pingTargetPodName string } -func (p pingingDialer) stopPing() { - p.pingStopChan <- struct{}{} -} - func (p pingingDialer) Dial(protocols ...string) (httpstream.Connection, string, error) { streamConn, streamProtocolVersion, dialErr := p.wrappedDialer.Dial(protocols...) if dialErr != nil { @@ -196,8 +192,8 @@ func (pfo *PortForwardOpts) PortForward() error { dialerWithPing := pingingDialer{ wrappedDialer: dialer, pingPeriod: time.Second * 30, - pingStopChan: make(chan struct{}), - pingTargetPodName: pfo.PodName, + pingStopChan: pfo.ManualStopChan, + pingTargetPodName: fmt.Sprintf("%s:%s", pfo.String(), pfo.PodPort), } var address []string @@ -217,7 +213,6 @@ func (pfo *PortForwardOpts) PortForward() error { if err = fw.ForwardPorts(); err != nil { log.Errorf("ForwardPorts error: %s", err.Error()) pfo.Stop() - dialerWithPing.stopPing() return err } @@ -345,12 +340,17 @@ func (pfo *PortForwardOpts) AddHosts() { pfo.HostFile.Unlock() } +// getBrothersInPodsAmount returns amount of port-forwards that proceeds on different ports under same pod +func (pfo *PortForwardOpts) getBrothersInPodsAmount() int { + return len(pfo.ServiceFwd.GetServicePodPortForwards(pfo.String())) +} + // removeHosts removes hosts /etc/hosts // associated with a forwarded pod func (pfo *PortForwardOpts) removeHosts() { - // We must not remove hosts entries if port-forwarding on one of the service ports is failing - if pfAmount := len(pfo.ServiceFwd.GetServicePodPortForwards(pfo.String())); pfAmount > 0 { + // We must not remove hosts entries if port-forwarding on one of the service ports is cancelled and others not + if pfo.getBrothersInPodsAmount() > 0 { return } diff --git a/pkg/fwdservice/fwdservice.go b/pkg/fwdservice/fwdservice.go index 837445d3..230e75e2 100644 --- a/pkg/fwdservice/fwdservice.go +++ b/pkg/fwdservice/fwdservice.go @@ -335,32 +335,31 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost } } -// AddServicePod +// AddServicePod adds PortForwardOpts to mapping func (svcFwd *ServiceFWD) AddServicePod(pfo *fwdport.PortForwardOpts) { + log.Debugf("ServiceForward: Add %s with %s port", pfo, pfo.PodPort) svcFwd.NamespaceServiceLock.Lock() - if _, found := svcFwd.PortForwards[pfo.String()]; !found { - portForwardsList := make([]*fwdport.PortForwardOpts, 0) - portForwardsList = append(portForwardsList, pfo) - svcFwd.PortForwards[pfo.String()] = portForwardsList + defer svcFwd.NamespaceServiceLock.Unlock() + if existPortForwards, found := svcFwd.PortForwards[pfo.String()]; !found { + svcFwd.PortForwards[pfo.String()] = []*fwdport.PortForwardOpts{pfo} } else { - portForwardList := svcFwd.PortForwards[pfo.String()] - if !svcFwd.contains(portForwardList, pfo) { - portForwardList = append(portForwardList, pfo) + if !svcFwd.contains(existPortForwards, pfo) { + existPortForwards = append(existPortForwards, pfo) + svcFwd.PortForwards[pfo.String()] = existPortForwards } } - svcFwd.NamespaceServiceLock.Unlock() } func (svcFwd *ServiceFWD) contains(portForwards []*fwdport.PortForwardOpts, pfo *fwdport.PortForwardOpts) bool { for _, pf := range portForwards { - if pfo.String() == pf.String() && pfo.PodPort == pf.PodPort { + if pfo.PodName == pf.PodName && pfo.Service == pf.Service && pfo.PodPort == pf.PodPort { return true } } return false } -// ListServicePodPortNames +// ListServicePodNames returns list of keys for mapping func (svcFwd *ServiceFWD) ListServicePodNames() []string { svcFwd.NamespaceServiceLock.Lock() currentPodNames := make([]string, 0, len(svcFwd.PortForwards)) @@ -379,8 +378,9 @@ func (svcFwd *ServiceFWD) GetServicePodPortForwards(servicePodName string) []*fw // RemoveServicePod removes all PortForwardOpts from mapping func (svcFwd *ServiceFWD) RemoveServicePod(servicePodName string) { - log.Debugf("Removing all pods from serviceFwd by key=%s", servicePodName) + log.Debugf("ServiceForward: Removing all pods by key=%s", servicePodName) svcFwd.removeServicePodPort(servicePodName, svcFwd.allMatch) + log.Debugf("ServiceForward: Done removing all pods by key=%s", servicePodName) } func (svcFwd *ServiceFWD) allMatch(_ *fwdport.PortForwardOpts) bool { @@ -389,32 +389,37 @@ func (svcFwd *ServiceFWD) allMatch(_ *fwdport.PortForwardOpts) bool { // removeServicePodPort removes PortForwardOpts from mapping according to filter function func (svcFwd *ServiceFWD) removeServicePodPort(servicePodName string, filter func(pfo *fwdport.PortForwardOpts) bool) { - svcFwd.NamespaceServiceLock.Lock() - defer svcFwd.NamespaceServiceLock.Unlock() if pods, found := svcFwd.PortForwards[servicePodName]; found { - stay := make([]*fwdport.PortForwardOpts, 0, len(pods)) + stay := make([]*fwdport.PortForwardOpts, 0) for _, pod := range pods { if filter(pod) { - pod.Stop() - <-pod.DoneChan + defer svcFwd.stop(pod) } else { stay = append(stay, pod) } } + svcFwd.NamespaceServiceLock.Lock() if len(stay) == 0 { delete(svcFwd.PortForwards, servicePodName) } else { svcFwd.PortForwards[servicePodName] = stay } + svcFwd.NamespaceServiceLock.Unlock() } } +func (svcFwd *ServiceFWD) stop(pfo *fwdport.PortForwardOpts) { + pfo.Stop() + <-pfo.DoneChan +} + // RemoveServicePodByPort removes PortForwardOpts from mapping by specified pod port func (svcFwd *ServiceFWD) RemoveServicePodByPort(servicePodName string, podPort string) { - log.Debugf("Removing all pods from serviceFwd by key=%s and port=%s", servicePodName, podPort) + log.Debugf("ServiceForward: Removing all pods by key=%s and port=%s", servicePodName, podPort) svcFwd.removeServicePodPort(servicePodName, func(pfo *fwdport.PortForwardOpts) bool { return pfo.PodPort == podPort }) + log.Debugf("ServiceForward: Done removing all pods by key=%s and port=%s", servicePodName, podPort) } func portSearch(portName string, containers []v1.Container) (string, bool) {