Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
lblackstone committed Jan 16, 2020
1 parent e88d24e commit cffa7a6
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 61 deletions.
47 changes: 25 additions & 22 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,8 @@ func (k *kubeProvider) Invoke(ctx context.Context,
req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {

// Important: Some invoke logic is intended to run during preview, and the Kubernetes provider
// inputs may not have resolved yet. Rather than returning an error here, any invoke logic must
// not assume that a cluster is accessible.
if k.clusterUnreachable {
glog.V(3).Infof(
"configured Kubernetes cluster is unreachable. Invoke call logic may operate in a degraded state.")
}
// inputs may not have resolved yet. Any invoke logic that depends on an active cluster must check
// k.clusterUnreachable and handle that condition appropriately.

// Always fail.
tok := req.GetTok()
Expand All @@ -404,13 +400,8 @@ func (k *kubeProvider) StreamInvoke(
req *pulumirpc.InvokeRequest, server pulumirpc.ResourceProvider_StreamInvokeServer) error {

// Important: Some invoke logic is intended to run during preview, and the Kubernetes provider
// inputs may not have resolved yet. Rather than returning an error here, any invoke logic must
// not assume that a cluster is accessible.
if k.clusterUnreachable {
glog.V(3).Infof(
"configured Kubernetes cluster is unreachable. StreamInvoke call logic may operate " +
"in a degraded state.")
}
// inputs may not have resolved yet. Any invoke logic that depends on an active cluster must check
// k.clusterUnreachable and handle that condition appropriately.

// Unmarshal arguments.
tok := req.GetTok()
Expand Down Expand Up @@ -440,13 +431,17 @@ func (k *kubeProvider) StreamInvoke(
// expected to never terminate, and users of the various SDKs need a way to tell the
// provider to stop streaming and reclaim the resources associated with the stream.
//
// Still, we implement this cancellation also for `list`, primarily for coompleteness. We'd
// Still, we implement this cancellation also for `list`, primarily for completeness. We'd
// like to avoid an unpleasant and non-actionable error that would appear on a `Send` on a
// client that is no longer accepting requests. This also helps to guard against the
// possibility that some dark corner of gRPC signals cancellation by accident, e.g., during
// shutdown.
//

if k.clusterUnreachable {
return fmt.Errorf("configured Kubernetes cluster is unreachable")
}

namespace := ""
if args["namespace"].HasValue() {
namespace = args["namespace"].StringValue()
Expand Down Expand Up @@ -537,6 +532,10 @@ func (k *kubeProvider) StreamInvoke(
// Set up resource watcher.
//

if k.clusterUnreachable {
return fmt.Errorf("configured Kubernetes cluster is unreachable")
}

namespace := ""
if args["namespace"].HasValue() {
namespace = args["namespace"].StringValue()
Expand Down Expand Up @@ -617,6 +616,10 @@ func (k *kubeProvider) StreamInvoke(
// Set up log stream for Pod.
//

if k.clusterUnreachable {
return fmt.Errorf("configured Kubernetes cluster is unreachable")
}

namespace := "default"
if args["namespace"].HasValue() {
namespace = args["namespace"].StringValue()
Expand Down Expand Up @@ -1136,15 +1139,15 @@ func (k *kubeProvider) Create(
// {inputs: {...}, live: {...}}. This is important both for `Diff` and for `Update`. See
// comments in those methods for details.
//
urn := resource.URN(req.GetUrn())
label := fmt.Sprintf("%s.Create(%s)", k.label(), urn)
glog.V(9).Infof("%s executing", label)

// Create requires a connection to a k8s cluster, so bail out immediately if it is unreachable.
if k.clusterUnreachable {
return nil, fmt.Errorf("configured Kubernetes cluster is unreachable")
}

urn := resource.URN(req.GetUrn())
label := fmt.Sprintf("%s.Create(%s)", k.label(), urn)
glog.V(9).Infof("%s executing", label)

// Parse inputs
newResInputs, err := plugin.UnmarshalProperties(req.GetProperties(), plugin.MarshalOptions{
Label: fmt.Sprintf("%s.properties", label),
Expand Down Expand Up @@ -1474,15 +1477,15 @@ func (k *kubeProvider) Update(
// discovery client is completely dynamic.)
// - [ ] Support server-side apply, when it comes out.
//
urn := resource.URN(req.GetUrn())
label := fmt.Sprintf("%s.Update(%s)", k.label(), urn)
glog.V(9).Infof("%s executing", label)

// Update requires a connection to a k8s cluster, so bail out immediately if it is unreachable.
if k.clusterUnreachable {
return nil, fmt.Errorf("configured Kubernetes cluster is unreachable")
}

urn := resource.URN(req.GetUrn())
label := fmt.Sprintf("%s.Update(%s)", k.label(), urn)
glog.V(9).Infof("%s executing", label)

// Obtain old properties, create a Kubernetes `unstructured.Unstructured`.
oldState, err := plugin.UnmarshalProperties(req.GetOlds(), plugin.MarshalOptions{
Label: fmt.Sprintf("%s.olds", label), KeepUnknowns: true, SkipNulls: true, KeepSecrets: true,
Expand Down
7 changes: 0 additions & 7 deletions tests/integration/provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,5 @@ func TestProvider(t *testing.T) {
assert.NotEqual(t, providerNsName.(string), namespacedPodNamespace.(string))
assert.Equal(t, ns2Name.(string), namespacedPodNamespace.(string))
},
EditDirs: []integration.EditDir{
{
Dir: "step2",
Additive: false,
ExpectFailure: true,
},
},
})
}
32 changes: 0 additions & 32 deletions tests/integration/provider/step2/index.ts

This file was deleted.

0 comments on commit cffa7a6

Please sign in to comment.