Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: nodePort #2704 - publish ready endpoints #3447

Merged
merged 2 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,30 @@ func extractLoadBalancerTargets(svc *v1.Service, resolveLoadBalancerHostname boo
return targets
}

func isPodStatusReady(status v1.PodStatus) bool {
_, condition := getPodCondition(&status, v1.PodReady)
return condition != nil && condition.Status == v1.ConditionTrue
}

func getPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
if status == nil {
return -1, nil
}
return getPodConditionFromList(status.Conditions, conditionType)
}

func getPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
if conditions == nil {
return -1, nil
}
for i := range conditions {
if conditions[i].Type == conditionType {
return i, &conditions[i]
}
}
return -1, nil
}

func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targets, error) {
var (
internalIPs endpoint.Targets
Expand All @@ -615,19 +639,42 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe
return nil, err
}

var nodesReady []*v1.Node
var nodesRunning []*v1.Node
for _, v := range pods {
if v.Status.Phase == v1.PodRunning {
node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName)
if err != nil {
log.Debugf("Unable to find node where Pod %s is running", v.Spec.Hostname)
continue
}

if _, ok := nodesMap[node]; !ok {
nodesMap[node] = *new(struct{})
nodes = append(nodes, node)
nodesRunning = append(nodesRunning, node)

if isPodStatusReady(v.Status) {
nodesReady = append(nodesReady, node)
// Check pod not terminating
if v.GetDeletionTimestamp() == nil {
nefelim4ag marked this conversation as resolved.
Show resolved Hide resolved
nodes = append(nodes, node)
}
}
}
}
}

if len(nodes) > 0 {
// Works same as service endpoints
} else if len(nodesReady) > 0 {
// 2 level of panic modes as safe guard, because old wrong behavior can be used by someone
// Publish all endpoints not always a bad thing
log.Debugf("All pods in terminating state, use ready")
nodes = nodesReady
nefelim4ag marked this conversation as resolved.
Show resolved Hide resolved
} else {
log.Debugf("All pods not ready, use all running")
nodes = nodesRunning
nefelim4ag marked this conversation as resolved.
Show resolved Hide resolved
}
default:
nodes, err = sc.nodeInformer.Lister().List(labels.Everything())
if err != nil {
Expand Down
110 changes: 110 additions & 0 deletions source/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1630,7 +1630,9 @@ func TestServiceSourceNodePortServices(t *testing.T) {
podNames []string
nodeIndex []int
phases []v1.PodPhase
conditions []v1.PodCondition
labelSelector labels.Selector
deletionTimestamp []*metav1.Time
}{
{
title: "annotated NodePort services return an endpoint with IP addresses of the cluster's nodes",
Expand Down Expand Up @@ -1817,6 +1819,8 @@ func TestServiceSourceNodePortServices(t *testing.T) {
podNames: []string{"pod-0"},
nodeIndex: []int{1},
phases: []v1.PodPhase{v1.PodRunning},
conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionFalse}},
deletionTimestamp: []*metav1.Time{{}},
},
{
title: "annotated NodePort services with ExternalTrafficPolicy=Local and multiple pods on a single node return an endpoint with unique IP addresses of the cluster's nodes where pods is running only",
Expand Down Expand Up @@ -1859,6 +1863,110 @@ func TestServiceSourceNodePortServices(t *testing.T) {
podNames: []string{"pod-0", "pod-1"},
nodeIndex: []int{1, 1},
phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning},
conditions: []v1.PodCondition{
{Type: v1.PodReady, Status: v1.ConditionFalse},
{Type: v1.PodReady, Status: v1.ConditionFalse},
},
deletionTimestamp: []*metav1.Time{{},{}},
},
{
title: "annotated NodePort services with ExternalTrafficPolicy=Local return pods in Ready & Running state",
svcNamespace: "testing",
svcName: "foo",
svcType: v1.ServiceTypeNodePort,
svcTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
labels: map[string]string{},
annotations: map[string]string{
hostnameAnnotationKey: "foo.example.org.",
},
expected: []*endpoint.Endpoint{
{DNSName: "_foo._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV},
{DNSName: "foo.example.org", Targets: endpoint.Targets{"54.10.11.1"}, RecordType: endpoint.RecordTypeA},
},
nodes: []*v1.Node{{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.1"},
{Type: v1.NodeInternalIP, Address: "10.0.1.1"},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.2"},
{Type: v1.NodeInternalIP, Address: "10.0.1.2"},
},
},
}},
podNames: []string{"pod-0", "pod-1"},
nodeIndex: []int{0, 1},
phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning},
conditions: []v1.PodCondition{
{Type: v1.PodReady, Status: v1.ConditionTrue},
{Type: v1.PodReady, Status: v1.ConditionFalse},
},
deletionTimestamp: []*metav1.Time{{},{}},
},
{
title: "annotated NodePort services with ExternalTrafficPolicy=Local return pods in Ready & Running state & not in Terminating",
svcNamespace: "testing",
svcName: "foo",
svcType: v1.ServiceTypeNodePort,
svcTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
labels: map[string]string{},
annotations: map[string]string{
hostnameAnnotationKey: "foo.example.org.",
},
expected: []*endpoint.Endpoint{
{DNSName: "_foo._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV},
{DNSName: "foo.example.org", Targets: endpoint.Targets{"54.10.11.1"}, RecordType: endpoint.RecordTypeA},
},
nodes: []*v1.Node{{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.1"},
{Type: v1.NodeInternalIP, Address: "10.0.1.1"},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.2"},
{Type: v1.NodeInternalIP, Address: "10.0.1.2"},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "node3",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.3"},
{Type: v1.NodeInternalIP, Address: "10.0.1.3"},
},
},
}},
podNames: []string{"pod-0", "pod-1", "pod-2"},
nodeIndex: []int{0, 1, 2},
phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning, v1.PodRunning},
conditions: []v1.PodCondition{
{Type: v1.PodReady, Status: v1.ConditionTrue},
{Type: v1.PodReady, Status: v1.ConditionFalse},
{Type: v1.PodReady, Status: v1.ConditionTrue},
},
deletionTimestamp: []*metav1.Time{nil, nil, {}},
},
{
title: "access=private annotation NodePort services return an endpoint with private IP addresses of the cluster's nodes",
Expand Down Expand Up @@ -2150,9 +2258,11 @@ func TestServiceSourceNodePortServices(t *testing.T) {
Name: podname,
Labels: tc.labels,
Annotations: tc.annotations,
DeletionTimestamp: tc.deletionTimestamp[i],
},
Status: v1.PodStatus{
Phase: tc.phases[i],
Conditions: []v1.PodCondition{tc.conditions[i]},
},
}

Expand Down
Loading