From e85e03360ab8b1657d47016b4d38365ebb30e077 Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Tue, 17 Apr 2018 17:06:21 -0700 Subject: [PATCH] Implement core of pure client-go-based Kubernetes provider This commit will introduce several important pieces that we will need to implement the gRPC provider interface: - [x] A discovery client, which "discovers" which version of the Kubernetes API a running instance of the API server supports and dynamically configures itself to use that API. This means that this one Kubernetes provider will work for (we believe) all versions of Kubernetes that support OpenAPI (rather than having one client per version, as is true of Terraform). It also frees us to use regular Kubernetes API objects, freeing us of the need to map "our" types to the underlying Kubernetes API (again, as is the case with Terraform. - [x] Simple utilities for creating pools of Kubernetes clients, parsing and creating canonical names for resources, and so on. - [x] A small validation library, which allows us to dynamically see which version of the Kubernetes API a particular API server supports, and then validate that some API object against it. In particular, this includes all Kubernetes API objects, even CRDs (which, again, is not supported by Terraform). - [x] A stub gRPC server. (This panics when you call it but we will fix that soon enough.) - [x] A simple build system that makes it easy to propagate version information into the client code. - [ ] Implementations of the gRPC server's core functions (e.g., `Update`, etc.) - [ ] Tests. Once we've finished bootstrapping we will port the tests for `pulumi/pulumi-kubernetes` to the new provider. Follow up commits will address the last, unfinished bullet points. --- Makefile | 53 +--- cmd/pulumi-resource-kubernetes/main.go | 9 +- cmd/pulumi-tfgen-kubernetes/main.go | 13 - pkg/provider/client.go | 245 ++++++++++++++++++ pkg/provider/openapi.go | 338 +++++++++++++++++++++++++ pkg/provider/provider.go | 119 +++++++++ pkg/provider/serve.go | 35 +++ resources.go | 101 -------- 8 files changed, 745 insertions(+), 168 deletions(-) delete mode 100644 cmd/pulumi-tfgen-kubernetes/main.go create mode 100644 pkg/provider/client.go create mode 100644 pkg/provider/openapi.go create mode 100644 pkg/provider/provider.go create mode 100644 pkg/provider/serve.go delete mode 100644 resources.go diff --git a/Makefile b/Makefile index 72be940cd2..21f3921dad 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -PROJECT_NAME := Kubernetes Package +PROJECT_NAME := Pulumi Kubernetes Resource Provider include build/common.mk PACK := kubernetes @@ -6,63 +6,18 @@ PACKDIR := pack PROJECT := github.com/pulumi/pulumi-kubernetes NODE_MODULE_NAME := @pulumi/kubernetes -TFGEN := pulumi-tfgen-${PACK} PROVIDER := pulumi-resource-${PACK} VERSION := $(shell scripts/get-version) +VERSION_FLAGS := -ldflags "-X github.com/pulumi/pulumi-kubernetes/pkg/version.Version=${VERSION}" + GOMETALINTERBIN=gometalinter GOMETALINTER=${GOMETALINTERBIN} --config=Gometalinter.json TESTPARALLELISM := 10 build:: - go install -ldflags "-X github.com/pulumi/pulumi-kubernetes/pkg/version.Version=${VERSION}" ${PROJECT}/cmd/${TFGEN} - go install -ldflags "-X github.com/pulumi/pulumi-kubernetes/pkg/version.Version=${VERSION}" ${PROJECT}/cmd/${PROVIDER} - for LANGUAGE in "nodejs" "python" ; do \ - $(TFGEN) $$LANGUAGE --out ${PACKDIR}/$$LANGUAGE/ || exit 3 ; \ - done - cd ${PACKDIR}/nodejs/ && \ - yarn install && \ - yarn link @pulumi/pulumi && \ - yarn run tsc - cp README.md LICENSE ${PACKDIR}/nodejs/package.json ${PACKDIR}/nodejs/yarn.lock ${PACKDIR}/nodejs/bin/ - cd ${PACKDIR}/python/ && \ - python setup.py clean --all 2>/dev/null && \ - rm -rf ./bin/ ../python.bin/ && cp -R . ../python.bin && mv ../python.bin ./bin && \ - sed -i.bak "s/\$${VERSION}/$(VERSION)/g" ./bin/setup.py && rm ./bin/setup.py.bak && \ - cd ./bin && python setup.py build sdist + go install $(VERSION_FLAGS) ${PROJECT}/cmd/${PROVIDER} lint:: $(GOMETALINTER) ./cmd/... resources.go | sort ; exit "$${PIPESTATUS[0]}" - -install:: - GOBIN=$(PULUMI_BIN) go install -ldflags "-X github.com/pulumi/pulumi-kubernetes/pkg/version.Version=${VERSION}" ${PROJECT}/cmd/${PROVIDER} - [ ! -e "$(PULUMI_NODE_MODULES)/$(NODE_MODULE_NAME)" ] || rm -rf "$(PULUMI_NODE_MODULES)/$(NODE_MODULE_NAME)" - mkdir -p "$(PULUMI_NODE_MODULES)/$(NODE_MODULE_NAME)" - cp -r pack/nodejs/bin/. "$(PULUMI_NODE_MODULES)/$(NODE_MODULE_NAME)" - rm -rf "$(PULUMI_NODE_MODULES)/$(NODE_MODULE_NAME)/node_modules" - cd "$(PULUMI_NODE_MODULES)/$(NODE_MODULE_NAME)" && \ - yarn install --offline --production && \ - (yarn unlink > /dev/null 2>&1 || true) && \ - yarn link - cd ${PACKDIR}/python && pip install --upgrade --user -e . - -test_all:: - PATH=$(PULUMI_BIN):$(PATH) go test -v -cover -timeout 1h -parallel ${TESTPARALLELISM} ./examples - -.PHONY: publish_tgz -publish_tgz: - $(call STEP_MESSAGE) - ./scripts/publish_tgz.sh - -.PHONY: publish_packages -publish_packages: - $(call STEP_MESSAGE) - ./scripts/publish_packages.sh - -# The travis_* targets are entrypoints for CI. -.PHONY: travis_cron travis_push travis_pull_request travis_api -travis_cron: all -travis_push: only_build publish_tgz only_test publish_packages -travis_pull_request: all -travis_api: all diff --git a/cmd/pulumi-resource-kubernetes/main.go b/cmd/pulumi-resource-kubernetes/main.go index 31097c9844..0344914f17 100644 --- a/cmd/pulumi-resource-kubernetes/main.go +++ b/cmd/pulumi-resource-kubernetes/main.go @@ -1,13 +1,12 @@ -// Copyright 2016-2018, Pulumi Corporation. All rights reserved. - package main import ( - kubernetes "github.com/pulumi/pulumi-kubernetes" + "github.com/pulumi/pulumi-kubernetes/pkg/provider" "github.com/pulumi/pulumi-kubernetes/pkg/version" - "github.com/pulumi/pulumi-terraform/pkg/tfbridge" ) +var providerName = "kubernetes" + func main() { - tfbridge.Main("kubernetes", version.Version, kubernetes.Provider()) + provider.Serve(providerName, version.Version) } diff --git a/cmd/pulumi-tfgen-kubernetes/main.go b/cmd/pulumi-tfgen-kubernetes/main.go deleted file mode 100644 index c2242b800e..0000000000 --- a/cmd/pulumi-tfgen-kubernetes/main.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2016-2018, Pulumi Corporation. All rights reserved. - -package main - -import ( - kubernetes "github.com/pulumi/pulumi-kubernetes" - "github.com/pulumi/pulumi-kubernetes/pkg/version" - "github.com/pulumi/pulumi-terraform/pkg/tfgen" -) - -func main() { - tfgen.Main("kubernetes", version.Version, kubernetes.Provider()) -} diff --git a/pkg/provider/client.go b/pkg/provider/client.go new file mode 100644 index 0000000000..388c3e318d --- /dev/null +++ b/pkg/provider/client.go @@ -0,0 +1,245 @@ +package provider + +import ( + "fmt" + "strings" + "sync" + + "github.com/golang/glog" + + "github.com/emicklei/go-restful-swagger12" + "github.com/googleapis/gnostic/OpenAPIv2" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + apiVers "k8s.io/apimachinery/pkg/version" + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" +) + +// -------------------------------------------------------------------------- + +// In-memory, caching Kubernetes discovery client. +// +// The Kubernetes discovery client "discovers" the API server's capabilities, and opaquely handles +// the mapping of unstructured property bag -> typed API objects, regardless (in theory) of the +// version of the API server, greatly simplifying the logic required to interface with the cluster. +// +// This code implements the in-memory caching mechanism for this client, so that we do not have to +// retrieve this information multiple times to satisfy some set of requests. + +// -------------------------------------------------------------------------- + +type memcachedDiscoveryClient struct { + cl discovery.DiscoveryInterface + lock sync.RWMutex + servergroups *metav1.APIGroupList + serverresources map[string]*metav1.APIResourceList + schemas map[string]*swagger.ApiDeclaration + schema *openapi_v2.Document +} + +var _ discovery.CachedDiscoveryInterface = &memcachedDiscoveryClient{} + +// NewMemcachedDiscoveryClient creates a new DiscoveryClient that +// caches results in memory +func NewMemcachedDiscoveryClient(cl discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface { + c := &memcachedDiscoveryClient{cl: cl} + c.Invalidate() + return c +} + +func (c *memcachedDiscoveryClient) Fresh() bool { + return true +} + +func (c *memcachedDiscoveryClient) Invalidate() { + c.lock.Lock() + defer c.lock.Unlock() + + c.servergroups = nil + c.serverresources = make(map[string]*metav1.APIResourceList) + c.schemas = make(map[string]*swagger.ApiDeclaration) +} + +func (c *memcachedDiscoveryClient) RESTClient() rest.Interface { + return c.cl.RESTClient() +} + +func (c *memcachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) { + c.lock.Lock() + defer c.lock.Unlock() + + var err error + if c.servergroups != nil { + return c.servergroups, nil + } + c.servergroups, err = c.cl.ServerGroups() + return c.servergroups, err +} + +func (c *memcachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { + c.lock.Lock() + defer c.lock.Unlock() + + var err error + if v := c.serverresources[groupVersion]; v != nil { + return v, nil + } + c.serverresources[groupVersion], err = c.cl.ServerResourcesForGroupVersion(groupVersion) + return c.serverresources[groupVersion], err +} + +func (c *memcachedDiscoveryClient) ServerResources() ([]*metav1.APIResourceList, error) { + return c.cl.ServerResources() +} + +func (c *memcachedDiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) { + return c.cl.ServerPreferredResources() +} + +func (c *memcachedDiscoveryClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) { + return c.cl.ServerPreferredNamespacedResources() +} + +func (c *memcachedDiscoveryClient) ServerVersion() (*apiVers.Info, error) { + return c.cl.ServerVersion() +} + +func (c *memcachedDiscoveryClient) SwaggerSchema(version schema.GroupVersion) (*swagger.ApiDeclaration, error) { + key := version.String() + + c.lock.Lock() + defer c.lock.Unlock() + + if c.schemas[key] != nil { + return c.schemas[key], nil + } + + schema, err := c.cl.SwaggerSchema(version) + if err != nil { + return nil, err + } + + c.schemas[key] = schema + return schema, nil +} + +func (c *memcachedDiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) { + c.lock.Lock() + defer c.lock.Unlock() + + if c.schema != nil { + return c.schema, nil + } + + schema, err := c.cl.OpenAPISchema() + if err != nil { + return nil, err + } + + c.schema = schema + return schema, nil +} + +// -------------------------------------------------------------------------- +// Client utilities. +// -------------------------------------------------------------------------- + +// clientForResource returns the ResourceClient for a given object +func clientForResource( + pool dynamic.ClientPool, disco discovery.DiscoveryInterface, obj runtime.Object, defNs string, +) (dynamic.ResourceInterface, error) { + gvk := obj.GetObjectKind().GroupVersionKind() + meta, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + + namespace := meta.GetNamespace() + if namespace == "" { + namespace = defNs + } + + return clientForGVK(pool, disco, gvk, namespace) +} + +// clientForResource returns the ResourceClient for a given object +func clientForGVK( + pool dynamic.ClientPool, disco discovery.DiscoveryInterface, gvk schema.GroupVersionKind, + namespace string, +) (dynamic.ResourceInterface, error) { + client, err := pool.ClientForGroupVersionKind(gvk) + if err != nil { + return nil, err + } + + resource, err := serverResourceForGVK(disco, gvk) + if err != nil { + return nil, err + } + + glog.V(3).Infof("Fetching client for %s namespace=%s", resource, namespace) + rc := client.Resource(resource, namespace) + return rc, nil +} + +func serverResourceForGVK( + disco discovery.ServerResourcesInterface, gvk schema.GroupVersionKind, +) (*metav1.APIResource, error) { + resources, err := disco.ServerResourcesForGroupVersion(gvk.GroupVersion().String()) + if err != nil { + return nil, fmt.Errorf("unable to fetch resource description for %s: %v", gvk.GroupVersion(), err) + } + + for _, r := range resources.APIResources { + if r.Kind == gvk.Kind { + glog.V(3).Infof("Using resource '%s' for %s", r.Name, gvk) + return &r, nil + } + } + + return nil, fmt.Errorf("Server is unable to handle %s", gvk) +} + +// resourceNameForGVK returns a lowercase plural form of a type, for +// human messages. Returns lowercased kind if discovery lookup fails. +func resourceNameForObj(disco discovery.ServerResourcesInterface, o runtime.Object) string { + return resourceNameForGVK(disco, o.GetObjectKind().GroupVersionKind()) +} + +// resourceNameForGVK returns a lowercase plural form of a type, for +// human messages. Returns lowercased kind if discovery lookup fails. +func resourceNameForGVK( + disco discovery.ServerResourcesInterface, gvk schema.GroupVersionKind, +) string { + rls, err := disco.ServerResourcesForGroupVersion(gvk.GroupVersion().String()) + if err != nil { + glog.V(3).Infof("Discovery failed for %s: %s, falling back to kind", gvk, err) + return strings.ToLower(gvk.Kind) + } + + for _, rl := range rls.APIResources { + if rl.Kind == gvk.Kind { + return rl.Name + } + } + + glog.V(3).Infof("Discovery failed to find %s, falling back to kind", gvk) + return strings.ToLower(gvk.Kind) +} + +// fqObjName returns "namespace.name" +func fqObjName(o metav1.Object) string { + return fqName(o.GetNamespace(), o.GetName()) +} + +// fqName returns "namespace.name" +func fqName(namespace, name string) string { + if namespace == "" { + return name + } + return fmt.Sprintf("%s.%s", namespace, name) +} diff --git a/pkg/provider/openapi.go b/pkg/provider/openapi.go new file mode 100644 index 0000000000..d93ba42474 --- /dev/null +++ b/pkg/provider/openapi.go @@ -0,0 +1,338 @@ +package provider + +import ( + "fmt" + "reflect" + "regexp" + + "github.com/emicklei/go-restful-swagger12" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" +) + +// -------------------------------------------------------------------------- + +// OpenAPI spec validation code. +// +// This code allows us to easily validate unstructured property bag objects against the OpenAPI spec +// exposed by the API server. The OpenAPI spec would typically be obtained from the API server, and +// it represents not only the spec of the Kubernetes version running the API server itself, but also +// the flags it was started with, (e.g., RBAC enabled or not, etc.). +// +// NOTE: This is largely copied from k8s.io/kubernetes/pkg/api/validation/schema.go, with some +// slight modifications, such as rebasing onto the newer versions of client-go/apimachinery object +// types. + +// -------------------------------------------------------------------------- + +// InvalidTypeError is returned when an invalid type is encountered +type InvalidTypeError struct { + ExpectedKind reflect.Kind + ObservedKind reflect.Kind + FieldName string +} + +func (i *InvalidTypeError) Error() string { + return fmt.Sprintf( + "expected type %s, for field %s, got %s", i.ExpectedKind.String(), i.FieldName, i.ObservedKind.String()) +} + +// NewInvalidTypeError creates an InvalidTypeError object +func NewInvalidTypeError(expected reflect.Kind, observed reflect.Kind, fieldName string) error { + return &InvalidTypeError{expected, observed, fieldName} +} + +// TypeNotFoundError is returned when specified type can not found in schema +type TypeNotFoundError string + +func (tnfe TypeNotFoundError) Error() string { + return fmt.Sprintf("couldn't find type: %s", string(tnfe)) +} + +// SwaggerSchema represents an OpenAPI/Swagger schema +type SwaggerSchema struct { + api *swagger.ApiDeclaration + delegate discovery.SwaggerSchemaInterface +} + +// NewSwaggerSchemaFor returns the SwaggerSchema object ready to validate objects of given +// GroupVersion +func NewSwaggerSchemaFor( + delegate discovery.SwaggerSchemaInterface, gv schema.GroupVersion, +) (*SwaggerSchema, error) { + glog.V(3).Infof("Fetching schema for %v", gv) + swagger, err := delegate.SwaggerSchema(gv) + if err != nil { + return nil, err + } + return &SwaggerSchema{api: swagger, delegate: delegate}, nil +} + +// validateList unpacks a list and validate every item in the list. It return nil if every item is +// ok. Otherwise it return an error list contain errors of every item. +func (s *SwaggerSchema) validateList(obj map[string]interface{}) []error { + items, exists := obj["items"] + if !exists { + return []error{fmt.Errorf("no items field in %#v", obj)} + } + return s.validateItems(items) +} + +func (s *SwaggerSchema) validateItems(items interface{}) []error { + allErrs := []error{} + itemList, ok := items.([]interface{}) + if !ok { + return append(allErrs, fmt.Errorf("items isn't a slice")) + } + for i, item := range itemList { + fields, ok := item.(map[string]interface{}) + if !ok { + allErrs = append(allErrs, fmt.Errorf("items[%d] isn't a map[string]interface{}", i)) + continue + } + groupVersion := fields["apiVersion"] + if groupVersion == nil { + allErrs = append(allErrs, fmt.Errorf("items[%d].apiVersion not set", i)) + continue + } + itemVersion, ok := groupVersion.(string) + if !ok { + allErrs = append(allErrs, fmt.Errorf("items[%d].apiVersion isn't string type", i)) + continue + } + if len(itemVersion) == 0 { + allErrs = append(allErrs, fmt.Errorf("items[%d].apiVersion is empty", i)) + } + kind := fields["kind"] + if kind == nil { + allErrs = append(allErrs, fmt.Errorf("items[%d].kind not set", i)) + continue + } + itemKind, ok := kind.(string) + if !ok { + allErrs = append(allErrs, fmt.Errorf("items[%d].kind isn't string type", i)) + continue + } + if len(itemKind) == 0 { + allErrs = append(allErrs, fmt.Errorf("items[%d].kind is empty", i)) + } + gv, err := schema.ParseGroupVersion(itemVersion) + if err != nil { + allErrs = append(allErrs, fmt.Errorf("items[%d].apiVersion is invalid", i)) + } + errs := s.ValidateObject(item, "", gv.Version+"."+itemKind) + if len(errs) >= 1 { + allErrs = append(allErrs, errs...) + } + } + + return allErrs +} + +// Validate is the primary entrypoint into this class +func (s *SwaggerSchema) Validate(obj *unstructured.Unstructured) []error { + if obj.IsList() { + return s.validateList(obj.UnstructuredContent()) + } + gvk := obj.GroupVersionKind() + return s.ValidateObject(obj.UnstructuredContent(), "", fmt.Sprintf("%s.%s", gvk.Version, gvk.Kind)) +} + +// ValidateObject validates a JSON object against the schema +func (s *SwaggerSchema) ValidateObject(obj interface{}, fieldName, typeName string) []error { + allErrs := []error{} + models := s.api.Models + model, ok := models.At(typeName) + + // Verify the api version matches. This is required for nested types with differing api versions + // because s.api only has schema for 1 api version (the parent object type's version). e.g. an + // extensions/v1beta1 Template embedding a /v1 Service requires the schema for the + // extensions/v1beta1 api to delegate to the schema for the /v1 api. Only do this for !ok objects + // so that cross ApiVersion vendored types take precedence. + if !ok && s.delegate != nil { + fields, mapOk := obj.(map[string]interface{}) + if !mapOk { + return append(allErrs, fmt.Errorf("field %s for %s: expected object of type map[string]interface{}, but the actual type is %T", fieldName, typeName, obj)) + } + if delegated, errs := s.delegateIfDifferentAPIVersion(&unstructured.Unstructured{Object: fields}); delegated { + allErrs = append(allErrs, errs...) + return allErrs + } + } + + if !ok { + return append(allErrs, TypeNotFoundError(typeName)) + } + properties := model.Properties + if len(properties.List) == 0 { + // The object does not have any sub-fields. + return nil + } + fields, ok := obj.(map[string]interface{}) + if !ok { + return append(allErrs, fmt.Errorf("field %s for %s: expected object of type map[string]interface{}, but the actual type is %T", fieldName, typeName, obj)) + } + if len(fieldName) > 0 { + fieldName = fieldName + "." + } + // handle required fields + for _, requiredKey := range model.Required { + if _, ok := fields[requiredKey]; !ok { + allErrs = append(allErrs, fmt.Errorf("field %s%s for %s is required", fieldName, requiredKey, typeName)) + } + } + for key, value := range fields { + details, ok := properties.At(key) + + // Special case for runtime.RawExtension and runtime.Objects because they always fail to validate + // This is because the actual values will be of some sub-type (e.g. Deployment) not the expected + // super-type (RawExtension) + if s.isGenericArray(details) { + errs := s.validateItems(value) + if len(errs) > 0 { + allErrs = append(allErrs, errs...) + } + continue + } + if !ok { + allErrs = append(allErrs, fmt.Errorf("found invalid field %s for %s", key, typeName)) + continue + } + if details.Type == nil && details.Ref == nil { + allErrs = append(allErrs, fmt.Errorf("could not find the type of %s%s from object %v", fieldName, key, details)) + } + var fieldType string + if details.Type != nil { + fieldType = *details.Type + } else { + fieldType = *details.Ref + } + if value == nil { + glog.V(3).Infof("Skipping nil field: %s%s", fieldName, key) + continue + } + errs := s.validateField(value, fieldName+key, fieldType, &details) + if len(errs) > 0 { + allErrs = append(allErrs, errs...) + } + } + return allErrs +} + +// delegateIfDifferentAPIVersion delegates the validation of an object if its ApiGroup does not +// match the current SwaggerSchema. +// +// First return value is true if the validation was delegated (by a different ApiGroup +// SwaggerSchema) Second return value is the result of the delegated validation if performed. +func (s *SwaggerSchema) delegateIfDifferentAPIVersion( + obj *unstructured.Unstructured, +) (bool, []error) { + // Never delegate objects in the same ApiVersion or we will get infinite recursion + if !s.isDifferentAPIVersion(obj) { + return false, nil + } + + // Delegate validation of this object to the correct SwaggerSchema for its ApiGroup + newSchema, err := NewSwaggerSchemaFor(s.delegate, obj.GroupVersionKind().GroupVersion()) + if err != nil { + return true, []error{err} + } + return true, newSchema.Validate(obj) +} + +// isDifferentAPIVersion Returns true if obj lives in a different ApiVersion than the SwaggerSchema +// does. The SwaggerSchema will not be able to process objects in different ApiVersions unless they +// are vendored. +func (s *SwaggerSchema) isDifferentAPIVersion(obj *unstructured.Unstructured) bool { + groupVersion := obj.GetAPIVersion() + return len(groupVersion) > 0 && s.api.ApiVersion != groupVersion +} + +// isGenericArray Returns true if p is an array of generic Objects - either RawExtension or Object. +func (s *SwaggerSchema) isGenericArray(p swagger.ModelProperty) bool { + return p.DataTypeFields.Type != nil && + *p.DataTypeFields.Type == "array" && + p.Items != nil && + p.Items.Ref != nil && + (*p.Items.Ref == "runtime.RawExtension" || *p.Items.Ref == "runtime.Object") +} + +// This matches type name in the swagger spec, such as "v1.Binding". +var versionRegexp = regexp.MustCompile(`^(v.+|unversioned)\..*`) + +func (s *SwaggerSchema) validateField(value interface{}, fieldName, fieldType string, fieldDetails *swagger.ModelProperty) []error { + allErrs := []error{} + if reflect.TypeOf(value) == nil { + return append(allErrs, fmt.Errorf("unexpected nil value for field %v", fieldName)) + } + // TODO: caesarxuchao: because we have multiple group/versions and objects may reference objects + // in other group, the commented out way of checking if a filedType is a type defined by us is + // outdated. We use a hacky way for now. + // + // TODO: the type name in the swagger spec is something like "v1.Binding", and the "v1" is + // generated from the package name, not the groupVersion of the type. We need to fix go-restful to + // embed the group name in the type name, otherwise we couldn't handle identically named types in + // different groups correctly. + if versionRegexp.MatchString(fieldType) { + // if strings.HasPrefix(fieldType, apiVersion) { + return s.ValidateObject(value, fieldName, fieldType) + } + switch fieldType { + case "string": + // Be loose about what we accept for 'string' since we use IntOrString in a couple of places + _, isString := value.(string) + _, isNumber := value.(float64) + _, isInteger := value.(int) + _, isInteger64 := value.(int64) + if !isString && !isNumber && !isInteger && !isInteger64 { + return append(allErrs, NewInvalidTypeError(reflect.String, reflect.TypeOf(value).Kind(), fieldName)) + } + case "array": + arr, ok := value.([]interface{}) + if !ok { + return append(allErrs, NewInvalidTypeError(reflect.Array, reflect.TypeOf(value).Kind(), fieldName)) + } + var arrType string + if fieldDetails.Items.Ref == nil && fieldDetails.Items.Type == nil { + return append(allErrs, NewInvalidTypeError(reflect.Array, reflect.TypeOf(value).Kind(), fieldName)) + } + if fieldDetails.Items.Ref != nil { + arrType = *fieldDetails.Items.Ref + } else { + arrType = *fieldDetails.Items.Type + } + for ix := range arr { + errs := s.validateField(arr[ix], fmt.Sprintf("%s[%d]", fieldName, ix), arrType, nil) + if len(errs) > 0 { + allErrs = append(allErrs, errs...) + } + } + case "uint64": + case "int64": + case "integer": + _, isNumber := value.(float64) + _, isInteger := value.(int) + _, isInteger64 := value.(int64) + if !isNumber && !isInteger && !isInteger64 { + return append(allErrs, NewInvalidTypeError(reflect.Int, reflect.TypeOf(value).Kind(), fieldName)) + } + case "float64": + if _, ok := value.(float64); !ok { + return append(allErrs, NewInvalidTypeError(reflect.Float64, reflect.TypeOf(value).Kind(), fieldName)) + } + case "boolean": + if _, ok := value.(bool); !ok { + return append(allErrs, NewInvalidTypeError(reflect.Bool, reflect.TypeOf(value).Kind(), fieldName)) + } + // API servers before release 1.3 produce swagger spec with `type: "any"` as the fallback type, + // while newer servers produce spec with `type: "object"`. We have both here so that kubectl can + // work with both old and new api servers. + case "object": + case "any": + default: + return append(allErrs, fmt.Errorf("unexpected type: %v", fieldType)) + } + return allErrs +} diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go new file mode 100644 index 0000000000..54665295b0 --- /dev/null +++ b/pkg/provider/provider.go @@ -0,0 +1,119 @@ +package provider + +import ( + "context" + "fmt" + + pbempty "github.com/golang/protobuf/ptypes/empty" + pulumirpc "github.com/pulumi/pulumi/sdk/proto/go" + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/clientcmd" +) + +// -------------------------------------------------------------------------- + +// Kubernetes resource provider. +// +// Implements functionality for the Pulumi Kubernetes Resource Provider. This code is responsible +// for producing sensible responses for the gRPC server to send back to a client when it requests +// something to do with the Kubernetes resources it's meant to manage. + +// -------------------------------------------------------------------------- + +type kube struct { + client discovery.CachedDiscoveryInterface + pool dynamic.ClientPool + module string + version string +} + +var _ pulumirpc.ResourceProviderServer = (*kube)(nil) + +func kubeProvider( + module, version string, kubeconfig clientcmd.ClientConfig, +) (pulumirpc.ResourceProviderServer, error) { + // Configure the discovery client. + conf, err := kubeconfig.ClientConfig() + if err != nil { + return nil, fmt.Errorf("Unable to read kubectl config: %v", err) + } + + disco, err := discovery.NewDiscoveryClientForConfig(conf) + if err != nil { + return nil, err + } + + // Cache the discovery information (OpenAPI schema, etc.) so we don't have to retrieve it for + // every request. + discoCache := NewMemcachedDiscoveryClient(disco) + mapper := discovery.NewDeferredDiscoveryRESTMapper(discoCache, dynamic.VersionInterfaces) + pathresolver := dynamic.LegacyAPIPathResolverFunc + + // Create client pool, reusing one client per API group (e.g., one each for core, extensions, + // apps, etc.) + pool := dynamic.NewClientPool(conf, mapper, pathresolver) + + return &kube{ + client: discoCache, + pool: pool, + module: module, + }, nil +} + +// Configure configures the resource provider with "globals" that control its behavior. +func (k *kube) Configure(context.Context, *pulumirpc.ConfigureRequest) (*pbempty.Empty, error) { + return &pbempty.Empty{}, nil +} + +// Invoke dynamically executes a built-in function in the provider. +func (k *kube) Invoke(context.Context, *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) { + panic("Invoke not implemented") +} + +// Check validates that the given property bag is valid for a resource of the given type and returns +// the inputs that should be passed to successive calls to Diff, Create, or Update for this +// resource. As a rule, the provider inputs returned by a call to Check should preserve the original +// representation of the properties as present in the program inputs. Though this rule is not +// required for correctness, violations thereof can negatively impact the end-user experience, as +// the provider inputs are using for detecting and rendering diffs. +func (k *kube) Check(ctx context.Context, req *pulumirpc.CheckRequest) (*pulumirpc.CheckResponse, error) { + panic("Check not implemented") +} + +// Diff checks what impacts a hypothetical update will have on the resource's properties. +func (k *kube) Diff(context.Context, *pulumirpc.DiffRequest) (*pulumirpc.DiffResponse, error) { + panic("Diff not implemented") +} + +// Create allocates a new instance of the provided resource and returns its unique ID afterwards. +// (The input ID must be blank.) If this call fails, the resource must not have been created (i.e., +// it is "transacational"). +func (k *kube) Create(context.Context, *pulumirpc.CreateRequest) (*pulumirpc.CreateResponse, error) { + panic("Create not implemented") +} + +// Read the current live state associated with a resource. Enough state must be include in the +// inputs to uniquely identify the resource; this is typically just the resource ID, but may also +// include some properties. +func (k *kube) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*pulumirpc.ReadResponse, error) { + panic("Read not implemented") +} + +// Update updates an existing resource with new values. +func (k *kube) Update(context.Context, *pulumirpc.UpdateRequest) (*pulumirpc.UpdateResponse, error) { + panic("Update not implemented") +} + +// Delete tears down an existing resource with the given ID. If it fails, the resource is assumed +// to still exist. +func (k *kube) Delete(context.Context, *pulumirpc.DeleteRequest) (*pbempty.Empty, error) { + panic("Delete not implemented") +} + +// GetPluginInfo returns generic information about this plugin, like its version. +func (k *kube) GetPluginInfo(context.Context, *pbempty.Empty) (*pulumirpc.PluginInfo, error) { + return &pulumirpc.PluginInfo{ + Version: k.version, + }, nil +} diff --git a/pkg/provider/serve.go b/pkg/provider/serve.go new file mode 100644 index 0000000000..f92d144833 --- /dev/null +++ b/pkg/provider/serve.go @@ -0,0 +1,35 @@ +package provider + +import ( + "os" + + "github.com/pulumi/pulumi/pkg/resource/provider" + "github.com/pulumi/pulumi/pkg/util/cmdutil" + lumirpc "github.com/pulumi/pulumi/sdk/proto/go" + "k8s.io/client-go/tools/clientcmd" + + // Load auth plugins. Removing this will likely cause compilation error. + _ "k8s.io/client-go/plugin/pkg/client/auth" +) + +// Serve launches the gRPC server for the Pulumi Kubernetes resource provider. +func Serve(providerName, version string) { + // Start gRPC service. + err := provider.Main( + providerName, func(host *provider.HostClient) (lumirpc.ResourceProviderServer, error) { + // Use client-go to resolve the final configuration values for the client. Typically these + // values would would reside in the $KUBECONFIG file, but can also be altered in several + // places, including in env variables, client-go default values, and (if we allowed it) CLI + // flags. + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig + kubeconfig := clientcmd.NewInteractiveDeferredLoadingClientConfig( + loadingRules, &clientcmd.ConfigOverrides{}, os.Stdin) + + return kubeProvider(providerName, version, kubeconfig) + }) + + if err != nil { + cmdutil.ExitError(err.Error()) + } +} diff --git a/resources.go b/resources.go deleted file mode 100644 index 10a9f3b2a9..0000000000 --- a/resources.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright 2016-2018, Pulumi Corporation. All rights reserved. - -package kubernetes - -import ( - "unicode" - - "github.com/hashicorp/terraform/helper/schema" - "github.com/pulumi/pulumi-terraform/pkg/tfbridge" - "github.com/pulumi/pulumi/pkg/tokens" - kubernetes "github.com/terraform-providers/terraform-provider-kubernetes/kubernetes" -) - -// all of the Kubernetes token components used below. -const ( - // packages: - kubernetesPkg = "kubernetes" - kubernetesCore = "" // All resources are placed in the root `kubernetes` namespace -) - -// kubernetesMember manufactures a type token for the Kubernetes package and the given module and type. -func kubernetesMember(mod string, mem string) tokens.ModuleMember { - return tokens.ModuleMember(kubernetesPkg + ":" + mod + ":" + mem) -} - -// kubernetesType manufactures a type token for the Kubernetes package and the given module and type. -func kubernetesType(mod string, typ string) tokens.Type { - return tokens.Type(kubernetesMember(mod, typ)) -} - -// kubernetesDataSource manufactures a standard resource token given a module and resource name. It automatically uses the Kubernetes -// package and names the file by simply lower casing the data source's first character. -func kubernetesDataSource(mod string, res string) tokens.ModuleMember { - fn := string(unicode.ToLower(rune(res[0]))) + res[1:] - return kubernetesMember(mod+"/"+fn, res) -} - -// kubernetesResource manufactures a standard resource token given a module and resource name. It automatically uses the Kubernetes -// package and names the file by simply lower casing the resource's first character. -func kubernetesResource(mod string, res string) tokens.Type { - fn := string(unicode.ToLower(rune(res[0]))) + res[1:] - return kubernetesType(mod+"/"+fn, res) -} - -// Provider returns additional overlaid schema and metadata associated with the Kubernetes package. -func Provider() tfbridge.ProviderInfo { - p := kubernetes.Provider().(*schema.Provider) - prov := tfbridge.ProviderInfo{ - P: p, - Name: "kubernetes", - Description: "A Pulumi package for creating and managing Kubernetes resources.", - Keywords: []string{"pulumi", "kubernetes"}, - Homepage: "https://pulumi.io/kubernetes", - Repository: "https://github.com/pulumi/pulumi-kubernetes", - Resources: map[string]*tfbridge.ResourceInfo{ - // TODO[pulumi/pulumi-kubernetes#10] Until we are auto-generating `metadata.name` with a random suffix by - // default, we must mark all Kubernetes resources as delete-before-replace. Without this, any change which - // forces a replace will re-use the same resource name, which will fail. - "kubernetes_config_map": {Tok: kubernetesResource(kubernetesCore, "ConfigMap"), DeleteBeforeReplace: true}, - "kubernetes_daemonset": {Tok: kubernetesResource(kubernetesCore, "DaemonSet"), DeleteBeforeReplace: true}, - "kubernetes_deployment": {Tok: kubernetesResource(kubernetesCore, "Deployment"), DeleteBeforeReplace: true}, - "kubernetes_horizontal_pod_autoscaler": {Tok: kubernetesResource(kubernetesCore, "HorizontalPodAutoscaler"), DeleteBeforeReplace: true}, - "kubernetes_ingress": {Tok: kubernetesResource(kubernetesCore, "Ingres"), DeleteBeforeReplace: true}, - "kubernetes_job": {Tok: kubernetesResource(kubernetesCore, "Job"), DeleteBeforeReplace: true}, - "kubernetes_limit_range": {Tok: kubernetesResource(kubernetesCore, "LimitRange"), DeleteBeforeReplace: true}, - "kubernetes_namespace": {Tok: kubernetesResource(kubernetesCore, "Namespace"), DeleteBeforeReplace: true}, - "kubernetes_persistent_volume": {Tok: kubernetesResource(kubernetesCore, "PersistentVolume"), DeleteBeforeReplace: true}, - "kubernetes_persistent_volume_claim": {Tok: kubernetesResource(kubernetesCore, "PersistentVolumeClaim"), DeleteBeforeReplace: true}, - "kubernetes_pod": {Tok: kubernetesResource(kubernetesCore, "Pod"), DeleteBeforeReplace: true}, - "kubernetes_replication_controller": {Tok: kubernetesResource(kubernetesCore, "ReplicationController"), DeleteBeforeReplace: true}, - "kubernetes_resource_quota": {Tok: kubernetesResource(kubernetesCore, "ResourceQuota"), DeleteBeforeReplace: true}, - "kubernetes_secret": {Tok: kubernetesResource(kubernetesCore, "Secret"), DeleteBeforeReplace: true}, - "kubernetes_service": {Tok: kubernetesResource(kubernetesCore, "Service"), DeleteBeforeReplace: true}, - "kubernetes_service_account": {Tok: kubernetesResource(kubernetesCore, "ServiceAccount"), DeleteBeforeReplace: true}, - "kubernetes_stateful_set": {Tok: kubernetesResource(kubernetesCore, "StatefulSet"), DeleteBeforeReplace: true}, - "kubernetes_storage_class": {Tok: kubernetesResource(kubernetesCore, "StorageClass"), DeleteBeforeReplace: true}, - }, - DataSources: map[string]*tfbridge.DataSourceInfo{ - "kubernetes_service": {Tok: kubernetesDataSource(kubernetesCore, "getService")}, - "kubernetes_storage_class": {Tok: kubernetesDataSource(kubernetesCore, "getStorageClass")}, - }, - Overlay: &tfbridge.OverlayInfo{ - Files: []string{}, - Modules: map[string]*tfbridge.OverlayInfo{}, - }, - JavaScript: &tfbridge.JavaScriptInfo{ - PeerDependencies: map[string]string{ - "@pulumi/pulumi": "^0.11.0-dev-168-g7e14a09b", - }, - }, - Python: &tfbridge.PythonInfo{ - Requires: map[string]string{ - "pulumi": ">=0.11.0", - }, - }, - } - - // TODO[pulumi/pulumi-kubernetes#10: Auto-populate `res.metadata.name` - - return prov -}