Skip to content

Commit

Permalink
Fixed deadlock, removed redundant channel on ping process
Browse files Browse the repository at this point in the history
  • Loading branch information
flupec authored and Vasilii Avtaev committed Jun 23, 2021
1 parent f18be8d commit f0d48f7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 27 deletions.
18 changes: 9 additions & 9 deletions pkg/fwdport/fwdport.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ type pingingDialer struct {
pingTargetPodName string
}

func (p pingingDialer) stopPing() {
p.pingStopChan <- struct{}{}
}

func (p pingingDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
streamConn, streamProtocolVersion, dialErr := p.wrappedDialer.Dial(protocols...)
if dialErr != nil {
Expand Down Expand Up @@ -196,8 +192,8 @@ func (pfo *PortForwardOpts) PortForward() error {
dialerWithPing := pingingDialer{
wrappedDialer: dialer,
pingPeriod: time.Second * 30,
pingStopChan: make(chan struct{}),
pingTargetPodName: pfo.PodName,
pingStopChan: pfo.ManualStopChan,
pingTargetPodName: fmt.Sprintf("%s:%s", pfo.String(), pfo.PodPort),
}

var address []string
Expand All @@ -217,7 +213,6 @@ func (pfo *PortForwardOpts) PortForward() error {
if err = fw.ForwardPorts(); err != nil {
log.Errorf("ForwardPorts error: %s", err.Error())
pfo.Stop()
dialerWithPing.stopPing()
return err
}

Expand Down Expand Up @@ -345,12 +340,17 @@ func (pfo *PortForwardOpts) AddHosts() {
pfo.HostFile.Unlock()
}

// getBrothersInPodsAmount returns amount of port-forwards that proceeds on different ports under same pod
func (pfo *PortForwardOpts) getBrothersInPodsAmount() int {
return len(pfo.ServiceFwd.GetServicePodPortForwards(pfo.String()))
}

// removeHosts removes hosts /etc/hosts
// associated with a forwarded pod
func (pfo *PortForwardOpts) removeHosts() {

// We must not remove hosts entries if port-forwarding on one of the service ports is failing
if pfAmount := len(pfo.ServiceFwd.GetServicePodPortForwards(pfo.String())); pfAmount > 0 {
// We must not remove hosts entries if port-forwarding on one of the service ports is cancelled and others not
if pfo.getBrothersInPodsAmount() > 0 {
return
}

Expand Down
41 changes: 23 additions & 18 deletions pkg/fwdservice/fwdservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,32 +335,31 @@ func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost
}
}

// AddServicePod
// AddServicePod adds PortForwardOpts to mapping
func (svcFwd *ServiceFWD) AddServicePod(pfo *fwdport.PortForwardOpts) {
log.Debugf("ServiceForward: Add %s with %s port", pfo, pfo.PodPort)
svcFwd.NamespaceServiceLock.Lock()
if _, found := svcFwd.PortForwards[pfo.String()]; !found {
portForwardsList := make([]*fwdport.PortForwardOpts, 0)
portForwardsList = append(portForwardsList, pfo)
svcFwd.PortForwards[pfo.String()] = portForwardsList
defer svcFwd.NamespaceServiceLock.Unlock()
if existPortForwards, found := svcFwd.PortForwards[pfo.String()]; !found {
svcFwd.PortForwards[pfo.String()] = []*fwdport.PortForwardOpts{pfo}
} else {
portForwardList := svcFwd.PortForwards[pfo.String()]
if !svcFwd.contains(portForwardList, pfo) {
portForwardList = append(portForwardList, pfo)
if !svcFwd.contains(existPortForwards, pfo) {
existPortForwards = append(existPortForwards, pfo)
svcFwd.PortForwards[pfo.String()] = existPortForwards
}
}
svcFwd.NamespaceServiceLock.Unlock()
}

func (svcFwd *ServiceFWD) contains(portForwards []*fwdport.PortForwardOpts, pfo *fwdport.PortForwardOpts) bool {
for _, pf := range portForwards {
if pfo.String() == pf.String() && pfo.PodPort == pf.PodPort {
if pfo.PodName == pf.PodName && pfo.Service == pf.Service && pfo.PodPort == pf.PodPort {
return true
}
}
return false
}

// ListServicePodPortNames
// ListServicePodNames returns list of keys for mapping
func (svcFwd *ServiceFWD) ListServicePodNames() []string {
svcFwd.NamespaceServiceLock.Lock()
currentPodNames := make([]string, 0, len(svcFwd.PortForwards))
Expand All @@ -379,8 +378,9 @@ func (svcFwd *ServiceFWD) GetServicePodPortForwards(servicePodName string) []*fw

// RemoveServicePod removes all PortForwardOpts from mapping
func (svcFwd *ServiceFWD) RemoveServicePod(servicePodName string) {
log.Debugf("Removing all pods from serviceFwd by key=%s", servicePodName)
log.Debugf("ServiceForward: Removing all pods by key=%s", servicePodName)
svcFwd.removeServicePodPort(servicePodName, svcFwd.allMatch)
log.Debugf("ServiceForward: Done removing all pods by key=%s", servicePodName)
}

func (svcFwd *ServiceFWD) allMatch(_ *fwdport.PortForwardOpts) bool {
Expand All @@ -389,32 +389,37 @@ func (svcFwd *ServiceFWD) allMatch(_ *fwdport.PortForwardOpts) bool {

// removeServicePodPort removes PortForwardOpts from mapping according to filter function
func (svcFwd *ServiceFWD) removeServicePodPort(servicePodName string, filter func(pfo *fwdport.PortForwardOpts) bool) {
svcFwd.NamespaceServiceLock.Lock()
defer svcFwd.NamespaceServiceLock.Unlock()
if pods, found := svcFwd.PortForwards[servicePodName]; found {
stay := make([]*fwdport.PortForwardOpts, 0, len(pods))
stay := make([]*fwdport.PortForwardOpts, 0)
for _, pod := range pods {
if filter(pod) {
pod.Stop()
<-pod.DoneChan
defer svcFwd.stop(pod)
} else {
stay = append(stay, pod)
}
}
svcFwd.NamespaceServiceLock.Lock()
if len(stay) == 0 {
delete(svcFwd.PortForwards, servicePodName)
} else {
svcFwd.PortForwards[servicePodName] = stay
}
svcFwd.NamespaceServiceLock.Unlock()
}
}

func (svcFwd *ServiceFWD) stop(pfo *fwdport.PortForwardOpts) {
pfo.Stop()
<-pfo.DoneChan
}

// RemoveServicePodByPort removes PortForwardOpts from mapping by specified pod port
func (svcFwd *ServiceFWD) RemoveServicePodByPort(servicePodName string, podPort string) {
log.Debugf("Removing all pods from serviceFwd by key=%s and port=%s", servicePodName, podPort)
log.Debugf("ServiceForward: Removing all pods by key=%s and port=%s", servicePodName, podPort)
svcFwd.removeServicePodPort(servicePodName, func(pfo *fwdport.PortForwardOpts) bool {
return pfo.PodPort == podPort
})
log.Debugf("ServiceForward: Done removing all pods by key=%s and port=%s", servicePodName, podPort)
}

func portSearch(portName string, containers []v1.Container) (string, bool) {
Expand Down

0 comments on commit f0d48f7

Please sign in to comment.