Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add await support for networking.k8s.io/v1 variant of ingress #1795

Merged
merged 10 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## HEAD (Unreleased)
- Add await support for networking.k8s.io/v1 variant of ingress (https://github.com/pulumi/pulumi-kubernetes/pull/1795)

- Schematize overlay types (https://github.com/pulumi/pulumi-kubernetes/pull/1793)

Expand Down
18 changes: 13 additions & 5 deletions provider/pkg/await/awaiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ const (
coreV1ServiceAccount = "v1/ServiceAccount"
extensionsV1Beta1Deployment = "extensions/v1beta1/Deployment"
extensionsV1Beta1Ingress = "extensions/v1beta1/Ingress"
networkingV1Ingress = "networking.k8s.io/v1/Ingress"
networkingV1Beta1Ingress = "networking.k8s.io/v1beta1/Ingress"
rbacAuthorizationV1ClusterRole = "rbac.authorization.k8s.io/v1/ClusterRole"
rbacAuthorizationV1ClusterRoleBinding = "rbac.authorization.k8s.io/v1/ClusterRoleBinding"
rbacAuthorizationV1Role = "rbac.authorization.k8s.io/v1/Role"
Expand Down Expand Up @@ -150,6 +152,12 @@ var deploymentAwaiter = awaitSpec{
awaitDeletion: untilAppsDeploymentDeleted,
}

var ingressAwaiter = awaitSpec{
awaitCreation: awaitIngressInit,
awaitRead: awaitIngressRead,
awaitUpdate: awaitIngressUpdate,
}

var jobAwaiter = awaitSpec{
awaitCreation: func(c createAwaitConfig) error {
return makeJobInitAwaiter(c).Await()
Expand Down Expand Up @@ -224,11 +232,11 @@ var awaiters = map[string]awaitSpec{
awaitCreation: untilCoreV1ServiceAccountInitialized,
},
extensionsV1Beta1Deployment: deploymentAwaiter,
extensionsV1Beta1Ingress: {
awaitCreation: awaitIngressInit,
awaitRead: awaitIngressRead,
awaitUpdate: awaitIngressUpdate,
},

extensionsV1Beta1Ingress: ingressAwaiter,
networkingV1Beta1Ingress: ingressAwaiter,
networkingV1Ingress: ingressAwaiter,

rbacAuthorizationV1ClusterRole: { /* NONE */ },
rbacAuthorizationV1ClusterRoleBinding: { /* NONE */ },
rbacAuthorizationV1Role: { /* NONE */ },
Expand Down
126 changes: 86 additions & 40 deletions provider/pkg/await/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
logger "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
networkingv1b1 "k8s.io/api/networking/v1beta1"
networkingv1 "k8s.io/api/networking/v1"
networkingv1beta1 "k8s.io/api/networking/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -217,7 +218,7 @@ func (iia *ingressInitAwaiter) await(
select {
case <-iia.config.ctx.Done():
// On cancel, check one last time if the ingress is ready.
if iia.ingressReady && iia.checkIfEndpointsReady() {
if _, ready := iia.checkIfEndpointsReady(); ready && iia.ingressReady {
return nil
}
return &cancellationError{
Expand All @@ -226,7 +227,7 @@ func (iia *ingressInitAwaiter) await(
}
case <-timeout:
// On timeout, check one last time if the ingress is ready.
if iia.ingressReady && iia.checkIfEndpointsReady() {
if _, ready := iia.checkIfEndpointsReady(); ready && iia.ingressReady {
return nil
}
return &timeoutError{
Expand Down Expand Up @@ -290,64 +291,103 @@ func (iia *ingressInitAwaiter) processIngressEvent(event watch.Event) {
}

iia.ingress = ingress
obj, err := decodeIngress(ingress)
if err != nil {

// To the best of my knowledge, this works across all known ingress api version variations.
ingressesRaw, ok := openapi.Pluck(ingress.Object, "status", "loadBalancer", "ingress")
if !ok {
logger.V(3).Infof("Unable to decode Ingress object from unstructured: %#v", ingress)
return
}

logger.V(3).Infof("Received status for ingress %q: %#v", inputIngressName, obj.Status)
ingresses, ok := ingressesRaw.([]interface{})
if !ok {
logger.V(3).Infof("Unexpected ingress object structure from unstructured: %#v", ingress)
return
}

// Update status of ingress object so that we can check success.
iia.ingressReady = len(obj.Status.LoadBalancer.Ingress) > 0
iia.ingressReady = len(ingresses) > 0

logger.V(3).Infof("Waiting for ingress %q to update .status.loadBalancer with hostname/IP",
inputIngressName)
}

func decodeIngress(u *unstructured.Unstructured) (*networkingv1b1.Ingress, error) {
func decodeIngress(u *unstructured.Unstructured, to interface{}) error {
b, err := u.MarshalJSON()
if err != nil {
return nil, err
return err
}
var obj networkingv1b1.Ingress
err = json.Unmarshal(b, &obj)
err = json.Unmarshal(b, to)
if err != nil {
return nil, err
return err
}

return &obj, nil
return nil
}

func (iia *ingressInitAwaiter) checkIfEndpointsReady() bool {
obj, err := decodeIngress(iia.ingress)
if err != nil {
logger.V(3).Infof("Unable to decode Ingress object from unstructured: %#v", iia.ingress)
return false
}
func (iia *ingressInitAwaiter) checkIfEndpointsReady() (string, bool) {
apiVersion := iia.ingress.GetAPIVersion()
switch apiVersion {
case "extensions/v1beta1", "networking.k8s.io/v1beta1":
var obj networkingv1beta1.Ingress

for _, rule := range obj.Spec.Rules {
if rule.HTTP == nil {
iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s",
".spec.rules[*].http", obj.Name))
return false
if err := decodeIngress(iia.ingress, &obj); err != nil {
logger.V(3).Infof("Unable to decode Ingress object from unstructured: %#v", iia.ingress)
return apiVersion, false
}
for _, path := range rule.HTTP.Paths {
// Ignore ExternalName services
if iia.knownExternalNameServices.Has(path.Backend.ServiceName) {
continue
}

if !iia.knownEndpointObjects.Has(path.Backend.ServiceName) {
iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.ServiceName)))
for _, rule := range obj.Spec.Rules {
if rule.HTTP == nil {
iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s",
".spec.rules[*].http", obj.Name))
return apiVersion, false
}
for _, path := range rule.HTTP.Paths {
// Ignore ExternalName services
if path.Backend.ServiceName != "" && iia.knownExternalNameServices.Has(path.Backend.ServiceName) {
continue
}

if path.Backend.ServiceName != "" && !iia.knownEndpointObjects.Has(path.Backend.ServiceName) {
iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.ServiceName)))
return apiVersion, false
}
}
}
case "networking.k8s.io/v1":
var obj networkingv1.Ingress
if err := decodeIngress(iia.ingress, &obj); err != nil {
logger.V(3).Infof("Unable to decode Ingress object from unstructured: %#v", iia.ingress)
return apiVersion, false
}

return false
for _, rule := range obj.Spec.Rules {
if rule.HTTP == nil {
iia.config.logStatus(diag.Error, fmt.Sprintf("expected value %q is unset for ingress: %s",
".spec.rules[*].http", obj.Name))
return apiVersion, false
}
for _, path := range rule.HTTP.Paths {
// TODO: Should we worry about "resource" backends?
if path.Backend.Service == nil {
continue
}

// Ignore ExternalName services
if path.Backend.Service.Name != "" && iia.knownExternalNameServices.Has(path.Backend.Service.Name) {
continue
}

if path.Backend.Service.Name != "" && !iia.knownEndpointObjects.Has(path.Backend.Service.Name) {
iia.config.logStatus(diag.Info, fmt.Sprintf("No matching service found for ingress rule: %s",
expectedIngressPath(rule.Host, path.Path, path.Backend.Service.Name)))
return apiVersion, false
}
}
}
}

return true
return apiVersion, true
}

// expectedIngressPath is a helper to print a useful error message.
Expand Down Expand Up @@ -403,10 +443,15 @@ func (iia *ingressInitAwaiter) processEndpointEvent(event watch.Event, settledCh
func (iia *ingressInitAwaiter) errorMessages() []string {
messages := make([]string, 0)

if !iia.checkIfEndpointsReady() {
messages = append(messages,
if apiVersion, ready := iia.checkIfEndpointsReady(); !ready {
field := ".spec.rules[].http.paths[].backend.serviceName"
switch apiVersion {
case "networking.k8s.io/v1":
field = ".spec.rules[].http.paths[].backend.service.name"
}
messages = append(messages, fmt.Sprintf(
"Ingress has at least one rule that does not target any Service. "+
"Field '.spec.rules[].http.paths[].backend.serviceName' may not match any active Service")
"Field '%v' may not match any active Service", field))
}

if !iia.ingressReady {
Expand All @@ -419,11 +464,12 @@ func (iia *ingressInitAwaiter) errorMessages() []string {
}

func (iia *ingressInitAwaiter) checkAndLogStatus() bool {
success := iia.ingressReady && iia.checkIfEndpointsReady()
_, ready := iia.checkIfEndpointsReady()
success := iia.ingressReady && ready
if success {
iia.config.logStatus(diag.Info,
fmt.Sprintf("%sIngress initialization complete", cmdutil.EmojiOr("✅ ", "")))
} else if iia.checkIfEndpointsReady() {
} else if ready {
iia.config.logStatus(diag.Info, "[2/3] Waiting for update of .status.loadBalancer with hostname/IP")
}

Expand Down
68 changes: 67 additions & 1 deletion provider/pkg/await/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ func Test_Extensions_Ingress(t *testing.T) {
settled <- struct{}{}
},
},
{
description: "Should succeed when Ingress (networking/v1) is allocated an IP address and all paths match an existing Endpoint",
ingressInput: initializedIngressV1,
do: func(ingresses, services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
// API server passes initialized ingress and endpoint objects back.
ingresses <- watchAddedEvent(initializedIngress("default", "foo", "foo-4setj4y6"))
endpoints <- watchAddedEvent(initializedEndpoint("default", "foo-4setj4y6"))

// Mark endpoint objects as having settled. Success.
settled <- struct{}{}
},
},
{
description: "Should succeed when Ingress is allocated an IP address and path references an ExternalName Service",
ingressInput: initializedIngress,
Expand Down Expand Up @@ -158,6 +170,15 @@ func Test_Extensions_Ingress_Read(t *testing.T) {
"Field '.spec.rules[].http.paths[].backend.serviceName' may not match any active Service",
},
},
{
description: "Read should fail if not all Ingress (networking/v1) paths match existing Endpoints",
ingressInput: ingressInput,
ingress: initializedIngressV1,
expectedSubErrors: []string{
"Ingress has at least one rule that does not target any Service. " +
"Field '.spec.rules[].http.paths[].backend.service.name' may not match any active Service",
},
},
{
description: "Read should succeed when Ingress is allocated an IP address and Service is type ExternalName",
ingressInput: ingressInput,
Expand Down Expand Up @@ -257,7 +278,7 @@ func ingressInput(namespace, name, targetService string) *unstructured.Unstructu

func initializedIngress(namespace, name, targetService string) *unstructured.Unstructured {
obj, err := decodeUnstructured(fmt.Sprintf(`{
"apiVersion": "extensions/v1beta1",
"apiVersion": "networking.k8s.io/v1beta1",
"kind": "Ingress",
"metadata": {
"name": "%s",
Expand Down Expand Up @@ -296,6 +317,51 @@ func initializedIngress(namespace, name, targetService string) *unstructured.Uns
return obj
}

func initializedIngressV1(namespace, name, targetService string) *unstructured.Unstructured {
obj, err := decodeUnstructured(fmt.Sprintf(`{
"apiVersion": "networking.k8s.io/v1",
"kind": "Ingress",
"metadata": {
"name": "%s",
"namespace": "%s"
},
"spec": {
"rules": [
{
"http": {
"paths": [
{
"backend": {
"service": {
"name": "%s",
"port": {
"number": 80
}
}
},
"path": "/nginx"
}
]
}
}
]
},
"status": {
"loadBalancer": {
"ingress": [
{
"hostname": "localhost"
}
]
}
}
}`, name, namespace, targetService))
if err != nil {
panic(err)
}
return obj
}

func initializedIngressUnspecifiedPath(namespace, name, targetService string) *unstructured.Unstructured {
obj, err := decodeUnstructured(fmt.Sprintf(`{
"apiVersion": "extensions/v1beta1",
Expand Down
4 changes: 2 additions & 2 deletions provider/pkg/clients/unstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1b1 "k8s.io/api/networking/v1beta1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -35,7 +35,7 @@ func FromUnstructured(obj *unstructured.Unstructured) (metav1.Object, error) {
case kinds.Job:
output = new(batchv1.Job)
case kinds.Ingress:
output = new(networkingv1b1.Ingress)
output = new(networkingv1.Ingress)
case kinds.PersistentVolume:
output = new(corev1.PersistentVolume)
case kinds.PersistentVolumeClaim:
Expand Down
1 change: 0 additions & 1 deletion tests/sdk/go/helm-local/step1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func main() {
return err
}


return nil
})
}
Loading