Skip to content

Commit

Permalink
Fix endless await for services without selectors/endpoints (#3142)
Browse files Browse the repository at this point in the history
Our current await logic for services expects the service to have _some_
endpoints, and all of those endpoints must be ready. This doesn't allow
for the valid case where the service doesn't have any endpoints, and in
those cases we will end up waiting forever.

This changes our logic to instead expect 0 non-ready endpoints. This is
equivalent to the current behavior when there are non-zero endpoints
while also allowing the zero-endpoint case.

We also short-circuit the endpoint logic when the service has no
selector, since the endpoint will need to be manually configured.

Fixes #605.
Fixes #799.
Refs #2824.
  • Loading branch information
blampe authored Aug 13, 2024
1 parent 5d981f9 commit 3fb2976
Show file tree
Hide file tree
Showing 26 changed files with 372 additions and 360 deletions.
16 changes: 0 additions & 16 deletions .github/dependabot.yml

This file was deleted.

7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@

- Updated Kubernetes schemas and libraries to v1.31.0. (https://github.com/pulumi/pulumi-kubernetes/pull/3144)

### Fixed

- `Services` with selectors targeting 0 `Pods` will no longer hang indefinitely.
(https://github.com/pulumi/pulumi-kubernetes/issues/605)
- `Services` without selectors will no longer hang indefinitely.
(https://github.com/pulumi/pulumi-kubernetes/issues/799)

## 4.16.0 (August 7, 2024)

### Added
Expand Down
8 changes: 4 additions & 4 deletions provider/cmd/pulumi-resource-kubernetes/schema.json

Large diffs are not rendered by default.

54 changes: 33 additions & 21 deletions provider/pkg/await/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
logger "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -83,7 +84,8 @@ type serviceInitAwaiter struct {
config awaitConfig
service *unstructured.Unstructured
serviceReady bool
endpointsReady bool
endpointsReady bool // True if the service has at least 1 ready endpoint.
endpointsPending bool // True if any endpoints are non-ready.
endpointsSettled bool
serviceType string
}
Expand All @@ -103,6 +105,7 @@ func makeServiceInitAwaiter(c awaitConfig) *serviceInitAwaiter {
service: c.currentOutputs,
serviceReady: false,
endpointsReady: false,
endpointsPending: false,
endpointsSettled: false,
serviceType: t,
}
Expand Down Expand Up @@ -240,7 +243,7 @@ func (sia *serviceInitAwaiter) await(
select {
case <-sia.config.ctx.Done():
// On cancel, check one last time if the service is ready.
if sia.serviceReady && sia.endpointsReady {
if sia.serviceReady && !sia.endpointsPending {
return nil
}
return &cancellationError{
Expand All @@ -249,7 +252,7 @@ func (sia *serviceInitAwaiter) await(
}
case <-timeout:
// On timeout, check one last time if the service is ready.
if sia.serviceReady && sia.endpointsReady {
if sia.serviceReady && !sia.endpointsPending {
return nil
}
return &timeoutError{
Expand Down Expand Up @@ -314,33 +317,25 @@ func (sia *serviceInitAwaiter) processEndpointEvent(event watch.Event, settledCh
inputServiceName := sia.config.currentOutputs.GetName()

// Get endpoint object.
endpoint, isUnstructured := event.Object.(*unstructured.Unstructured)
obj, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
logger.V(3).Infof("Endpoint watch received unknown object type %q",
reflect.TypeOf(endpoint))
reflect.TypeOf(obj))
return
}

// Ignore if it's not one of the endpoint objects created by the service.
//
// NOTE: Because the client pool is per-namespace, the endpointName can be used as an
// ID, as it's guaranteed by the API server to be unique.
if endpoint.GetName() != inputServiceName {
if obj.GetName() != inputServiceName {
return
}

// Start over, prove that service is ready.
sia.endpointsReady = false

// Update status of endpoint objects so we can check success.
if event.Type == watch.Added || event.Type == watch.Modified {
subsets, hasTargets := openapi.Pluck(endpoint.Object, "subsets")
targets, targetsIsSlice := subsets.([]any)
endpointTargetsPod := hasTargets && targetsIsSlice && len(targets) > 0

sia.endpointsReady = endpointTargetsPod
} else if event.Type == watch.Deleted {
sia.endpointsReady = false
var endpoints corev1.Endpoints
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &endpoints)
if err != nil {
return
}

// Every time we get an update to one of our endpoints objects, give it a few seconds
Expand All @@ -350,6 +345,19 @@ func (sia *serviceInitAwaiter) processEndpointEvent(event watch.Event, settledCh
time.Sleep(10 * time.Second)
settledCh <- struct{}{}
}()

// Start over, prove that the service is ready.
sia.endpointsReady = false
sia.endpointsPending = false

if event.Type == watch.Deleted {
return
}

for _, subset := range endpoints.Subsets {
sia.endpointsReady = sia.endpointsReady || len(subset.Addresses) > 0
sia.endpointsPending = sia.endpointsPending || len(subset.NotReadyAddresses) > 0
}
}

func (sia *serviceInitAwaiter) errorMessages() []string {
Expand Down Expand Up @@ -383,11 +391,15 @@ func (sia *serviceInitAwaiter) isExternalNameService() bool {
return sia.serviceType == string(v1.ServiceTypeExternalName)
}

func (sia *serviceInitAwaiter) isWithoutSelector() bool {
selector, _, _ := unstructured.NestedMap(sia.service.Object, "spec", "selector")
return len(selector) == 0
}

// shouldWaitForPods determines whether to wait for Pods to be ready before marking the Service ready.
func (sia *serviceInitAwaiter) shouldWaitForPods() bool {
// For these special cases, skip the wait for Pod logic.
if sia.isExternalNameService() || sia.isHeadlessService() {
sia.endpointsReady = true
if sia.isExternalNameService() || sia.isHeadlessService() || sia.isWithoutSelector() {
return false
}

Expand All @@ -399,7 +411,7 @@ func (sia *serviceInitAwaiter) checkAndLogStatus() bool {
return sia.serviceReady
}

success := sia.serviceReady && sia.endpointsSettled && sia.endpointsReady
success := sia.serviceReady && sia.endpointsSettled && !sia.endpointsPending
if success {
sia.config.logger.LogStatus(diag.Info,
fmt.Sprintf("%sService initialization complete", cmdutil.EmojiOr("✅ ", "")))
Expand Down
127 changes: 105 additions & 22 deletions provider/pkg/await/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ func Test_Core_Service(t *testing.T) {
subErrors: []string{
"Service does not target any Pods. Selected Pods may not be ready, or " +
"field '.spec.selector' may not match labels on any Pods",
"Service was not allocated an IP address; does your cloud provider support this?"}},
"Service was not allocated an IP address; does your cloud provider support this?",
},
},
},
{
description: "Should succeed when unrelated Service fails",
Expand Down Expand Up @@ -104,7 +106,22 @@ func Test_Core_Service(t *testing.T) {
subErrors: []string{
"Service does not target any Pods. Selected Pods may not be ready, or " +
"field '.spec.selector' may not match labels on any Pods",
"Service was not allocated an IP address; does your cloud provider support this?"}},
"Service was not allocated an IP address; does your cloud provider support this?",
},
},
},
{
description: "Should succeed with no endpoints",
serviceInput: serviceInput,
do: func(services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// API server passes initialized service back.
services <- watchAddedEvent(initializedService("default", "foo-4setj4y6"))

// Pass uninitialized endpoint objects. Mark them as settled.
endpoints <- watchAddedEvent(
emptyEndpoint("default", "foo-4setj4y6"))
settled <- struct{}{}
},
},
{
description: "Should fail if Endpoints have not initialized",
Expand All @@ -125,7 +142,9 @@ func Test_Core_Service(t *testing.T) {
object: initializedService("default", "foo-4setj4y6"),
subErrors: []string{
"Service does not target any Pods. Selected Pods may not be ready, or " +
"field '.spec.selector' may not match labels on any Pods"}},
"field '.spec.selector' may not match labels on any Pods",
},
},
},
{
description: "Should fail if Service is not allocated an IP address",
Expand All @@ -146,7 +165,8 @@ func Test_Core_Service(t *testing.T) {
object: serviceInput("default", "foo-4setj4y6"),
subErrors: []string{
"Service was not allocated an IP address; does your cloud provider support this?",
}},
},
},
},

{
Expand Down Expand Up @@ -212,6 +232,13 @@ func Test_Core_Service(t *testing.T) {
timeout <- time.Now()
},
},
{
description: "Should succeed if no selector",
serviceInput: serviceWithoutSelector,
do: func(services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
services <- watchAddedEvent(serviceWithoutSelector("default", "foo-4setj4y6"))
},
},
}

for _, test := range tests {
Expand Down Expand Up @@ -245,7 +272,8 @@ func Test_Core_Service_Read(t *testing.T) {
endpoint: uninitializedEndpoint,
expectedSubErrors: []string{
"Service does not target any Pods. Selected Pods may not be ready, or " +
"field '.spec.selector' may not match labels on any Pods"},
"field '.spec.selector' may not match labels on any Pods",
},
},
{
description: "Read should succeed if Service does target Pods",
Expand Down Expand Up @@ -287,21 +315,23 @@ func Test_Core_Service_Read(t *testing.T) {
}

for _, test := range tests {
awaiter := makeServiceInitAwaiter(mockAwaitConfig(test.serviceInput("default", "foo-4setj4y6")))
service := test.service("default", "foo-4setj4y6")

var err error
if test.endpoint != nil {
endpoint := test.endpoint("default", "foo-4setj4y6")
err = awaiter.read(service, unstructuredList(*endpoint), test.version)
} else {
err = awaiter.read(service, unstructuredList(), test.version)
}
if test.expectedSubErrors != nil {
assert.Equal(t, test.expectedSubErrors, err.(*initializationError).SubErrors(), test.description)
} else {
assert.Nil(t, err, test.description)
}
t.Run(test.description, func(t *testing.T) {
awaiter := makeServiceInitAwaiter(mockAwaitConfig(test.serviceInput("default", "foo-4setj4y6")))
service := test.service("default", "foo-4setj4y6")

var err error
if test.endpoint != nil {
endpoint := test.endpoint("default", "foo-4setj4y6")
err = awaiter.read(service, unstructuredList(*endpoint), test.version)
} else {
err = awaiter.read(service, unstructuredList(), test.version)
}
if test.expectedSubErrors != nil {
assert.Equal(t, test.expectedSubErrors, err.(*initializationError).SubErrors(), test.description)
} else {
assert.Nil(t, err, test.description)
}
})
}
}

Expand Down Expand Up @@ -393,6 +423,44 @@ func initializedService(namespace, name string) *unstructured.Unstructured {
return obj
}

func serviceWithoutSelector(namespace, name string) *unstructured.Unstructured {
obj, _ := decodeUnstructured(fmt.Sprintf(`{
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": "%s",
"namespace": "%s"
},
"spec": {
"ports": [
{
"port": 6379
}
]
}
}`, name, namespace))
return obj
}

func emptyEndpoint(namespace, name string) *unstructured.Unstructured {
obj, err := decodeUnstructured(
fmt.Sprintf(`{
"apiVersion": "v1",
"kind": "Endpoints",
"metadata": {
"labels": {
"app": "%s"
},
"name": "%s",
"namespace": "%s"
}
}`, name, name, namespace))
if err != nil {
panic(err)
}
return obj
}

func uninitializedEndpoint(namespace, name string) *unstructured.Unstructured {
obj, err := decodeUnstructured(
fmt.Sprintf(`{
Expand All @@ -405,7 +473,22 @@ func uninitializedEndpoint(namespace, name string) *unstructured.Unstructured {
"name": "%s",
"namespace": "%s"
},
"subsets": null
"subsets": [
{
"notReadyAddresses": [
{
"ip": "35.192.99.34"
}
],
"ports": [
{
"name": "https",
"port": 443,
"protocol": "TCP"
}
]
}
]
}`, name, name, namespace))
if err != nil {
panic(err)
Expand Down Expand Up @@ -496,7 +579,6 @@ func headlessEmptyServiceInput(namespace, name string) *unstructured.Unstructure
panic(err)
}
return obj

}

func headlessEmptyServiceOutput1(namespace, name string) *unstructured.Unstructured {
Expand Down Expand Up @@ -620,6 +702,7 @@ func headlessNonemptyServiceOutput(namespace, name string) *unstructured.Unstruc
}
return obj
}

func unstructuredList(us ...unstructured.Unstructured) *unstructured.UnstructuredList {
return &unstructured.UnstructuredList{Items: us}
}
14 changes: 3 additions & 11 deletions provider/pkg/gen/additionalComments.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,10 @@ Or (for Jobs): The Pod succeeded ('.status.phase' set to "Succeeded").
1. Service object exists.
2. Related Endpoint objects are created. Each time we get an update, wait 10 seconds
for any stragglers.
3. The endpoints objects target some number of living objects (unless the Service is
an "empty headless" Service [1] or a Service with '.spec.type: ExternalName').
3. There are no "not ready" endpoints -- unless the Service is an "empty
headless" Service [1], a Service with '.spec.type: ExternalName', or a Service
without a selector.
4. External IP address is allocated (if Service has '.spec.type: LoadBalancer').
Known limitations:
Services targeting ReplicaSets (and, by extension, Deployments,
StatefulSets, etc.) with '.spec.replicas' set to 0 are not handled, and will time
out. To work around this limitation, set 'pulumi.com/skipAwait: "true"' on
'.metadata.annotations' for the Service. Work to handle this case is in progress [2].
[1] https://kubernetes.io/docs/concepts/services-networking/service/#headless-services
[2] https://github.com/pulumi/pulumi-kubernetes/pull/703
`
case kinds.StatefulSet:
comment += `
Expand Down
Loading

0 comments on commit 3fb2976

Please sign in to comment.