From 0a3817f1e9363bfb9ba5f615047ff5bcae8e7384 Mon Sep 17 00:00:00 2001 From: Mir Shahriar Date: Mon, 26 Feb 2018 17:07:53 +0600 Subject: [PATCH] update validation call (#177) --- glide.lock | 22 +- pkg/cmds/create.go | 8 +- pkg/validator/validate.go | 27 +- .../apis/kubedb/v1alpha1/postgres_helpers.go | 4 +- .../elasticsearch/pkg/validator/validate.go | 51 +- .../memcached/pkg/validator/validate.go | 48 +- .../kubedb/mongodb/pkg/validator/validate.go | 54 +- .../kubedb/mysql/pkg/validator/validate.go | 54 +- .../kubedb/postgres/pkg/validator/validate.go | 48 +- .../kubedb/redis/pkg/validator/validate.go | 48 +- .../google.golang.org/appengine/appengine.go | 5 +- .../appengine/internal/api.go | 182 ++--- .../appengine/internal/api_classic.go | 16 +- .../appengine/internal/api_common.go | 39 +- .../appengine/internal/api_pre17.go | 682 ++++++++++++++++++ .../appengine/internal/identity_classic.go | 48 +- .../appengine/internal/identity_vm.go | 6 +- .../appengine/internal/main_vm.go | 6 +- 18 files changed, 1213 insertions(+), 135 deletions(-) create mode 100644 vendor/google.golang.org/appengine/internal/api_pre17.go diff --git a/glide.lock b/glide.lock index 7a14ce9e8..bfcea3144 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: a0bbe9041555b0f2f28bf12a06e143bef8b5de12e1a09976d1266859df1bb483 -updated: 2018-02-24T16:23:11.483117158-08:00 +updated: 2018-02-26T16:11:46.283304728+06:00 imports: - name: cloud.google.com/go version: fe3d41e1ecb2ce36ad3a979037c9b9a2b726226f @@ -7,7 +7,7 @@ imports: - compute/metadata - internal - name: github.com/appscode/go - version: 83e7fc1967ed737eedfc87da60c42e9fc4dc684c + version: 2156eaa22a69cb218aa1dbc0ba2fd7a91e168a66 subpackages: - analytics - context @@ -239,7 +239,7 @@ imports: subpackages: - diskcache - name: github.com/hashicorp/go-version - version: 53932f80ddea12bea96be074f9fb2dc545806aba + version: 40efc0a21ca93e3bfbddefd4b5309a3e79d0aeb7 repo: https://github.com/appscode/go-version.git vcs: git - name: github.com/hashicorp/golang-lru @@ -261,7 +261,7 @@ imports: - name: github.com/juju/ratelimit version: 5b9ff866471762aa2ab2dced63c9fb6f53921342 - name: github.com/kubedb/apimachinery - version: 8a7e329dba5f46d6162eeb9ffd525308a7caa20c + version: 37caef566d41ed47313b44975186c335dde08123 subpackages: - apis/kubedb - apis/kubedb/v1alpha1 @@ -270,27 +270,27 @@ imports: - pkg/storage - pkg/validator - name: github.com/kubedb/elasticsearch - version: db0aedbc9f6ff21dc35588183ae3536fece59158 + version: 6a5e3a0a5b35788c5bf499c5f199c3a87bb6e678 subpackages: - pkg/validator - name: github.com/kubedb/memcached - version: 4138e6bd0d30b6e20a98d20ec94b45ea2456c0e1 + version: 3f14a34ec5d89da05c90fabccab3bbfb4e3a7b1e subpackages: - pkg/validator - name: github.com/kubedb/mongodb - version: 4643f00921817edc4e33fd55f0e9a70fe4d3bc8b + version: 9ac4d6d3c43e60c99f45aba8ae5d47fa29e3a019 subpackages: - pkg/validator - name: github.com/kubedb/mysql - version: dd023c50751868540af1f551253de5f4a00a47b8 + version: 5ee1ac8cd219a6ad9de27c7fd853e3ef88e2e0f9 subpackages: - pkg/validator - name: github.com/kubedb/postgres - version: 223d5abd64f5d44983089411c1a26bfdc25e97c9 + version: 4f0d831f8bcdadec688f19ccfc52e4c1a6f79002 subpackages: - pkg/validator - name: github.com/kubedb/redis - version: 039319ad8805831a7ce8c2586b850b2ea8a61faf + version: 04c06279382ffdef80cbe56ad89fb3561c9a3bea subpackages: - pkg/validator - name: github.com/mailru/easyjson @@ -423,7 +423,7 @@ imports: - googleapi/internal/uritemplates - storage/v1 - name: google.golang.org/appengine - version: 4f7eeb5305a4ba1966344836ba4af9996b7b4e05 + version: 5bee14b453b4c71be47ec1781b0fa61c2ea182db subpackages: - internal - internal/app_identity diff --git a/pkg/cmds/create.go b/pkg/cmds/create.go index c44105b42..18f7ad129 100644 --- a/pkg/cmds/create.go +++ b/pkg/cmds/create.go @@ -6,6 +6,7 @@ import ( "io" tapi "github.com/kubedb/apimachinery/apis/kubedb/v1alpha1" + cs "github.com/kubedb/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha1" "github.com/kubedb/cli/pkg/kube" "github.com/kubedb/cli/pkg/util" "github.com/kubedb/cli/pkg/validator" @@ -81,6 +82,11 @@ func RunCreate(f cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *re return err } + extClient, err := cs.NewForConfig(config) + if err != nil { + return err + } + infoList := make([]*resource.Info, 0) err = r.Visit(func(info *resource.Info, err error) error { if err != nil { @@ -97,7 +103,7 @@ func RunCreate(f cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *re } fmt.Println(fmt.Sprintf(`validating "%v"`, info.Source)) - if err := validator.Validate(client, info); err != nil { + if err := validator.Validate(client, extClient, info); err != nil { return cmdutil.AddSourceToErr("validating", info.Source, err) } diff --git a/pkg/validator/validate.go b/pkg/validator/validate.go index 3c41dcca0..a204fe21b 100644 --- a/pkg/validator/validate.go +++ b/pkg/validator/validate.go @@ -5,6 +5,7 @@ import ( "github.com/ghodss/yaml" tapi "github.com/kubedb/apimachinery/apis/kubedb/v1alpha1" + cs "github.com/kubedb/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha1" amv "github.com/kubedb/apimachinery/pkg/validator" "github.com/kubedb/cli/pkg/encoder" esv "github.com/kubedb/elasticsearch/pkg/validator" @@ -17,7 +18,7 @@ import ( "k8s.io/kubernetes/pkg/kubectl/resource" ) -func Validate(client kubernetes.Interface, info *resource.Info) error { +func Validate(client kubernetes.Interface, extClient cs.KubedbV1alpha1Interface, info *resource.Info) error { objByte, err := encoder.Encode(info.Object) if err != nil { return err @@ -30,37 +31,37 @@ func Validate(client kubernetes.Interface, info *resource.Info) error { if err := yaml.Unmarshal(objByte, &elasticsearch); err != nil { return err } - return esv.ValidateElasticsearch(client, elasticsearch) + return esv.ValidateElasticsearch(client, extClient, elasticsearch) case tapi.ResourceKindPostgres: var postgres *tapi.Postgres if err := yaml.Unmarshal(objByte, &postgres); err != nil { return err } - return pgv.ValidatePostgres(client, postgres) + return pgv.ValidatePostgres(client, extClient, postgres) case tapi.ResourceKindMySQL: var mysql *tapi.MySQL if err := yaml.Unmarshal(objByte, &mysql); err != nil { return err } - return msv.ValidateMySQL(client, mysql) + return msv.ValidateMySQL(client, extClient, mysql) case tapi.ResourceKindMongoDB: var mongodb *tapi.MongoDB if err := yaml.Unmarshal(objByte, &mongodb); err != nil { return err } - return mgv.ValidateMongoDB(client, mongodb) + return mgv.ValidateMongoDB(client, extClient, mongodb) case tapi.ResourceKindRedis: var redis *tapi.Redis if err := yaml.Unmarshal(objByte, &redis); err != nil { return err } - return rdv.ValidateRedis(client, redis) + return rdv.ValidateRedis(client, extClient, redis) case tapi.ResourceKindMemcached: var memcached *tapi.Memcached if err := yaml.Unmarshal(objByte, &memcached); err != nil { return err } - return memv.ValidateMemcached(client, memcached) + return memv.ValidateMemcached(client, extClient, memcached) case tapi.ResourceKindSnapshot: var snapshot *tapi.Snapshot if err := yaml.Unmarshal(objByte, &snapshot); err != nil { @@ -85,7 +86,7 @@ func ValidateDeletion(info *resource.Info) error { return err } if elasticsearch.Spec.DoNotPause { - return fmt.Errorf(`Elasticsearch "%v" can't be paused. To continue delete, unset spec.doNotPause and retry.`, elasticsearch.Name) + return fmt.Errorf(`elasticsearch "%v" can't be paused. To continue delete, unset spec.doNotPause and retry`, elasticsearch.Name) } case tapi.ResourceKindPostgres: var postgres *tapi.Postgres @@ -93,7 +94,7 @@ func ValidateDeletion(info *resource.Info) error { return err } if postgres.Spec.DoNotPause { - return fmt.Errorf(`Postgres "%v" can't be paused. To continue delete, unset spec.doNotPause and retry.`, postgres.Name) + return fmt.Errorf(`postgres "%v" can't be paused. To continue delete, unset spec.doNotPause and retry`, postgres.Name) } case tapi.ResourceKindMySQL: var mysql *tapi.MySQL @@ -101,7 +102,7 @@ func ValidateDeletion(info *resource.Info) error { return err } if mysql.Spec.DoNotPause { - return fmt.Errorf(`MySQL "%v" can't be paused. To continue delete, unset spec.doNotPause and retry.`, mysql.Name) + return fmt.Errorf(`mysql "%v" can't be paused. To continue delete, unset spec.doNotPause and retry`, mysql.Name) } case tapi.ResourceKindMongoDB: var mongodb *tapi.MongoDB @@ -109,7 +110,7 @@ func ValidateDeletion(info *resource.Info) error { return err } if mongodb.Spec.DoNotPause { - return fmt.Errorf(`MongoDB "%v" can't be paused. To continue delete, unset spec.doNotPause and retry.`, mongodb.Name) + return fmt.Errorf(`mongodb "%v" can't be paused. To continue delete, unset spec.doNotPause and retry`, mongodb.Name) } case tapi.ResourceKindRedis: var redis *tapi.Redis @@ -117,7 +118,7 @@ func ValidateDeletion(info *resource.Info) error { return err } if redis.Spec.DoNotPause { - return fmt.Errorf(`Redis "%v" can't be paused. To continue delete, unset spec.doNotPause and retry.`, redis.Name) + return fmt.Errorf(`redis "%v" can't be paused. To continue delete, unset spec.doNotPause and retry`, redis.Name) } case tapi.ResourceKindMemcached: var memcached *tapi.Memcached @@ -125,7 +126,7 @@ func ValidateDeletion(info *resource.Info) error { return err } if memcached.Spec.DoNotPause { - return fmt.Errorf(`Memcached "%v" can't be paused. To continue delete, unset spec.doNotPause and retry.`, memcached.Name) + return fmt.Errorf(`memcached "%v" can't be paused. To continue delete, unset spec.doNotPause and retry`, memcached.Name) } } return nil diff --git a/vendor/github.com/kubedb/apimachinery/apis/kubedb/v1alpha1/postgres_helpers.go b/vendor/github.com/kubedb/apimachinery/apis/kubedb/v1alpha1/postgres_helpers.go index 0a6883f00..8b71d0cf4 100644 --- a/vendor/github.com/kubedb/apimachinery/apis/kubedb/v1alpha1/postgres_helpers.go +++ b/vendor/github.com/kubedb/apimachinery/apis/kubedb/v1alpha1/postgres_helpers.go @@ -97,8 +97,8 @@ func (p *Postgres) GetMonitoringVendor() string { return "" } -func (p Postgres) PrimaryName() string { - return fmt.Sprintf("%v-primary", p.Name) +func (p Postgres) ReplicasServiceName() string { + return fmt.Sprintf("%v-replicas", p.Name) } func (p Postgres) CustomResourceDefinition() *crd_api.CustomResourceDefinition { diff --git a/vendor/github.com/kubedb/elasticsearch/pkg/validator/validate.go b/vendor/github.com/kubedb/elasticsearch/pkg/validator/validate.go index 9f31e58f0..6585c1a90 100644 --- a/vendor/github.com/kubedb/elasticsearch/pkg/validator/validate.go +++ b/vendor/github.com/kubedb/elasticsearch/pkg/validator/validate.go @@ -4,9 +4,13 @@ import ( "fmt" "github.com/appscode/go/types" + meta_util "github.com/appscode/kutil/meta" api "github.com/kubedb/apimachinery/apis/kubedb/v1alpha1" + cs "github.com/kubedb/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha1" amv "github.com/kubedb/apimachinery/pkg/validator" "github.com/pkg/errors" + core "k8s.io/api/core/v1" + kerr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" @@ -16,7 +20,7 @@ var ( elasticVersions = sets.NewString("5.6", "5.6.4") ) -func ValidateElasticsearch(client kubernetes.Interface, elasticsearch *api.Elasticsearch) error { +func ValidateElasticsearch(client kubernetes.Interface, extClient cs.KubedbV1alpha1Interface, elasticsearch *api.Elasticsearch) error { if elasticsearch.Spec.Version == "" { return fmt.Errorf(`object 'Version' is missing in '%v'`, elasticsearch.Spec) } @@ -67,6 +71,10 @@ func ValidateElasticsearch(client kubernetes.Interface, elasticsearch *api.Elast } } + if err := matchWithDormantDatabase(extClient, elasticsearch); err != nil { + return err + } + if elasticsearch.Spec.Storage != nil { if err := amv.ValidateStorage(client, elasticsearch.Spec.Storage); err != nil { return err @@ -103,3 +111,44 @@ func ValidateElasticsearch(client kubernetes.Interface, elasticsearch *api.Elast } return nil } + +func matchWithDormantDatabase(extClient cs.KubedbV1alpha1Interface, elasticsearch *api.Elasticsearch) error { + // Check if DormantDatabase exists or not + dormantDb, err := extClient.DormantDatabases(elasticsearch.Namespace).Get(elasticsearch.Name, metav1.GetOptions{}) + if err != nil { + if !kerr.IsNotFound(err) { + return err + } + return nil + } + + // Check DatabaseKind + if dormantDb.Labels[api.LabelDatabaseKind] != api.ResourceKindElasticsearch { + return fmt.Errorf(`invalid Elasticsearch: "%v". Exists DormantDatabase "%v" of different Kind`, elasticsearch.Name, dormantDb.Name) + } + + // Check Origin Spec + drmnOriginSpec := dormantDb.Spec.Origin.Spec.Elasticsearch + originalSpec := elasticsearch.Spec + + if originalSpec.DatabaseSecret == nil { + originalSpec.DatabaseSecret = &core.SecretVolumeSource{ + SecretName: elasticsearch.Name + "-auth", + } + } + + if originalSpec.CertificateSecret == nil { + originalSpec.CertificateSecret = &core.SecretVolumeSource{ + SecretName: elasticsearch.Name + "-cert", + } + } + + // Skip checking doNotPause + drmnOriginSpec.DoNotPause = originalSpec.DoNotPause + + if !meta_util.Equal(drmnOriginSpec, &originalSpec) { + return errors.New("object spec in Elasticsearch mismatches with OriginSpec in DormantDatabases") + } + + return nil +} diff --git a/vendor/github.com/kubedb/memcached/pkg/validator/validate.go b/vendor/github.com/kubedb/memcached/pkg/validator/validate.go index d694cf8c7..676b76a66 100644 --- a/vendor/github.com/kubedb/memcached/pkg/validator/validate.go +++ b/vendor/github.com/kubedb/memcached/pkg/validator/validate.go @@ -3,8 +3,14 @@ package validator import ( "fmt" + "github.com/appscode/go/types" + meta_util "github.com/appscode/kutil/meta" api "github.com/kubedb/apimachinery/apis/kubedb/v1alpha1" + cs "github.com/kubedb/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha1" amv "github.com/kubedb/apimachinery/pkg/validator" + "github.com/pkg/errors" + kerr "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" ) @@ -13,7 +19,7 @@ var ( memcachedVersions = sets.NewString("1.5", "1.5.4") ) -func ValidateMemcached(client kubernetes.Interface, memcached *api.Memcached) error { +func ValidateMemcached(client kubernetes.Interface, extClient cs.KubedbV1alpha1Interface, memcached *api.Memcached) error { if memcached.Spec.Version == "" { return fmt.Errorf(`object 'Version' is missing in '%v'`, memcached.Spec) } @@ -23,6 +29,17 @@ func ValidateMemcached(client kubernetes.Interface, memcached *api.Memcached) er return fmt.Errorf(`KubeDB doesn't support Memcached version: %s`, string(memcached.Spec.Version)) } + if memcached.Spec.Replicas != nil { + replicas := types.Int32(memcached.Spec.Replicas) + if replicas < 1 { + return fmt.Errorf(`spec.replicas "%d" invalid`, replicas) + } + } + + if err := matchWithDormantDatabase(extClient, memcached); err != nil { + return err + } + monitorSpec := memcached.Spec.Monitor if monitorSpec != nil { if err := amv.ValidateMonitorSpec(monitorSpec); err != nil { @@ -31,3 +48,32 @@ func ValidateMemcached(client kubernetes.Interface, memcached *api.Memcached) er } return nil } + +func matchWithDormantDatabase(extClient cs.KubedbV1alpha1Interface, memcached *api.Memcached) error { + // Check if DormantDatabase exists or not + dormantDb, err := extClient.DormantDatabases(memcached.Namespace).Get(memcached.Name, metav1.GetOptions{}) + if err != nil { + if !kerr.IsNotFound(err) { + return err + } + return nil + } + + // Check DatabaseKind + if dormantDb.Labels[api.LabelDatabaseKind] != api.ResourceKindMemcached { + return fmt.Errorf(`invalid Memcached: "%v". Exists DormantDatabase "%v" of different Kind`, memcached.Name, dormantDb.Name) + } + + // Check Origin Spec + drmnOriginSpec := dormantDb.Spec.Origin.Spec.Memcached + originalSpec := memcached.Spec + + // Skip checking doNotPause + drmnOriginSpec.DoNotPause = originalSpec.DoNotPause + + if !meta_util.Equal(drmnOriginSpec, &originalSpec) { + return errors.New("memcached spec mismatches with OriginSpec in DormantDatabases") + } + + return nil +} diff --git a/vendor/github.com/kubedb/mongodb/pkg/validator/validate.go b/vendor/github.com/kubedb/mongodb/pkg/validator/validate.go index 093a195fb..ded675ea4 100644 --- a/vendor/github.com/kubedb/mongodb/pkg/validator/validate.go +++ b/vendor/github.com/kubedb/mongodb/pkg/validator/validate.go @@ -3,8 +3,14 @@ package validator import ( "fmt" + "github.com/appscode/go/types" + meta_util "github.com/appscode/kutil/meta" api "github.com/kubedb/apimachinery/apis/kubedb/v1alpha1" + cs "github.com/kubedb/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha1" amv "github.com/kubedb/apimachinery/pkg/validator" + "github.com/pkg/errors" + core "k8s.io/api/core/v1" + kerr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" @@ -14,7 +20,7 @@ var ( mongodbVersions = sets.NewString("3.4", "3.6") ) -func ValidateMongoDB(client kubernetes.Interface, mongodb *api.MongoDB) error { +func ValidateMongoDB(client kubernetes.Interface, extClient cs.KubedbV1alpha1Interface, mongodb *api.MongoDB) error { if mongodb.Spec.Version == "" { return fmt.Errorf(`object 'Version' is missing in '%v'`, mongodb.Spec) } @@ -24,6 +30,17 @@ func ValidateMongoDB(client kubernetes.Interface, mongodb *api.MongoDB) error { return fmt.Errorf(`KubeDB doesn't support MongoDB version: %s`, string(mongodb.Spec.Version)) } + if mongodb.Spec.Replicas != nil { + replicas := types.Int32(mongodb.Spec.Replicas) + if replicas != 1 { + return fmt.Errorf(`spec.replicas "%d" invalid. Value must be one`, replicas) + } + } + + if err := matchWithDormantDatabase(extClient, mongodb); err != nil { + return err + } + if mongodb.Spec.Storage != nil { var err error if err = amv.ValidateStorage(client, mongodb.Spec.Storage); err != nil { @@ -53,3 +70,38 @@ func ValidateMongoDB(client kubernetes.Interface, mongodb *api.MongoDB) error { } return nil } + +func matchWithDormantDatabase(extClient cs.KubedbV1alpha1Interface, mongodb *api.MongoDB) error { + // Check if DormantDatabase exists or not + dormantDb, err := extClient.DormantDatabases(mongodb.Namespace).Get(mongodb.Name, metav1.GetOptions{}) + if err != nil { + if !kerr.IsNotFound(err) { + return err + } + return nil + } + + // Check DatabaseKind + if dormantDb.Labels[api.LabelDatabaseKind] != api.ResourceKindMongoDB { + return fmt.Errorf(`invalid MongoDB: "%v". Exists DormantDatabase "%v" of different Kind`, mongodb.Name, dormantDb.Name) + } + + // Check Origin Spec + drmnOriginSpec := dormantDb.Spec.Origin.Spec.MongoDB + originalSpec := mongodb.Spec + + if originalSpec.DatabaseSecret == nil { + originalSpec.DatabaseSecret = &core.SecretVolumeSource{ + SecretName: mongodb.Name + "-auth", + } + } + + // Skip checking doNotPause + drmnOriginSpec.DoNotPause = originalSpec.DoNotPause + + if !meta_util.Equal(drmnOriginSpec, &originalSpec) { + return errors.New("mongodb spec mismatches with OriginSpec in DormantDatabases") + } + + return nil +} diff --git a/vendor/github.com/kubedb/mysql/pkg/validator/validate.go b/vendor/github.com/kubedb/mysql/pkg/validator/validate.go index 60b027078..c7b08d6b1 100644 --- a/vendor/github.com/kubedb/mysql/pkg/validator/validate.go +++ b/vendor/github.com/kubedb/mysql/pkg/validator/validate.go @@ -3,8 +3,14 @@ package validator import ( "fmt" + "github.com/appscode/go/types" + meta_util "github.com/appscode/kutil/meta" api "github.com/kubedb/apimachinery/apis/kubedb/v1alpha1" + cs "github.com/kubedb/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha1" amv "github.com/kubedb/apimachinery/pkg/validator" + "github.com/pkg/errors" + core "k8s.io/api/core/v1" + kerr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" @@ -14,7 +20,7 @@ var ( mysqlVersions = sets.NewString("8.0", "8") ) -func ValidateMySQL(client kubernetes.Interface, mysql *api.MySQL) error { +func ValidateMySQL(client kubernetes.Interface, extClient cs.KubedbV1alpha1Interface, mysql *api.MySQL) error { if mysql.Spec.Version == "" { return fmt.Errorf(`object 'Version' is missing in '%v'`, mysql.Spec) } @@ -24,6 +30,17 @@ func ValidateMySQL(client kubernetes.Interface, mysql *api.MySQL) error { return fmt.Errorf(`KubeDB doesn't support MySQL version: %s`, string(mysql.Spec.Version)) } + if mysql.Spec.Replicas != nil { + replicas := types.Int32(mysql.Spec.Replicas) + if replicas != 1 { + return fmt.Errorf(`spec.replicas "%d" invalid. Value must be one`, replicas) + } + } + + if err := matchWithDormantDatabase(extClient, mysql); err != nil { + return err + } + if mysql.Spec.Storage != nil { var err error if err = amv.ValidateStorage(client, mysql.Spec.Storage); err != nil { @@ -54,3 +71,38 @@ func ValidateMySQL(client kubernetes.Interface, mysql *api.MySQL) error { } return nil } + +func matchWithDormantDatabase(extClient cs.KubedbV1alpha1Interface, mysql *api.MySQL) error { + // Check if DormantDatabase exists or not + dormantDb, err := extClient.DormantDatabases(mysql.Namespace).Get(mysql.Name, metav1.GetOptions{}) + if err != nil { + if !kerr.IsNotFound(err) { + return err + } + return nil + } + + // Check DatabaseKind + if dormantDb.Labels[api.LabelDatabaseKind] != api.ResourceKindMySQL { + return fmt.Errorf(`invalid MySQL: "%v". Exists DormantDatabase "%v" of different Kind`, mysql.Name, dormantDb.Name) + } + + // Check Origin Spec + drmnOriginSpec := dormantDb.Spec.Origin.Spec.MySQL + originalSpec := mysql.Spec + + if originalSpec.DatabaseSecret == nil { + originalSpec.DatabaseSecret = &core.SecretVolumeSource{ + SecretName: mysql.Name + "-auth", + } + } + + // Skip checking doNotPause + drmnOriginSpec.DoNotPause = originalSpec.DoNotPause + + if !meta_util.Equal(drmnOriginSpec, &originalSpec) { + return errors.New("mysql spec mismatches with OriginSpec in DormantDatabases") + } + + return nil +} diff --git a/vendor/github.com/kubedb/postgres/pkg/validator/validate.go b/vendor/github.com/kubedb/postgres/pkg/validator/validate.go index 958c76775..063fefb91 100644 --- a/vendor/github.com/kubedb/postgres/pkg/validator/validate.go +++ b/vendor/github.com/kubedb/postgres/pkg/validator/validate.go @@ -5,19 +5,24 @@ import ( "fmt" "github.com/appscode/go/types" + meta_util "github.com/appscode/kutil/meta" api "github.com/kubedb/apimachinery/apis/kubedb/v1alpha1" + cs "github.com/kubedb/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha1" "github.com/kubedb/apimachinery/pkg/storage" amv "github.com/kubedb/apimachinery/pkg/validator" + core "k8s.io/api/core/v1" + kerr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" ) var ( - postgresVersions = sets.NewString("9.6", "9.6.6", "10.2") + postgresVersions = sets.NewString("9.6", "9.6.7", "10.2") ) -func ValidatePostgres(client kubernetes.Interface, postgres *api.Postgres) error { +func ValidatePostgres(client kubernetes.Interface, extClient cs.KubedbV1alpha1Interface, postgres *api.Postgres) error { + if postgres.Spec.Version == "" { return fmt.Errorf(`object 'Version' is missing in '%v'`, postgres.Spec) } @@ -34,6 +39,10 @@ func ValidatePostgres(client kubernetes.Interface, postgres *api.Postgres) error } } + if err := matchWithDormantDatabase(extClient, postgres); err != nil { + return err + } + if postgres.Spec.Storage != nil { var err error if err = amv.ValidateStorage(client, postgres.Spec.Storage); err != nil { @@ -115,3 +124,38 @@ func ValidatePostgres(client kubernetes.Interface, postgres *api.Postgres) error } return nil } + +func matchWithDormantDatabase(extClient cs.KubedbV1alpha1Interface, postgres *api.Postgres) error { + // Check if DormantDatabase exists or not + dormantDb, err := extClient.DormantDatabases(postgres.Namespace).Get(postgres.Name, metav1.GetOptions{}) + if err != nil { + if !kerr.IsNotFound(err) { + return err + } + return nil + } + + // Check DatabaseKind + if dormantDb.Labels[api.LabelDatabaseKind] != api.ResourceKindPostgres { + return fmt.Errorf(`invalid Postgres: "%v". Exists DormantDatabase "%v" of different Kind`, postgres.Name, dormantDb.Name) + } + + // Check Origin Spec + drmnOriginSpec := dormantDb.Spec.Origin.Spec.Postgres + originalSpec := postgres.Spec + + if originalSpec.DatabaseSecret == nil { + originalSpec.DatabaseSecret = &core.SecretVolumeSource{ + SecretName: postgres.OffshootName() + "-auth", + } + } + + // Skip checking doNotPause + drmnOriginSpec.DoNotPause = originalSpec.DoNotPause + + if !meta_util.Equal(drmnOriginSpec, &originalSpec) { + return errors.New("object spec in Postgres mismatches with OriginSpec in DormantDatabases") + } + + return nil +} diff --git a/vendor/github.com/kubedb/redis/pkg/validator/validate.go b/vendor/github.com/kubedb/redis/pkg/validator/validate.go index e5777935f..4a581d9e8 100644 --- a/vendor/github.com/kubedb/redis/pkg/validator/validate.go +++ b/vendor/github.com/kubedb/redis/pkg/validator/validate.go @@ -3,8 +3,14 @@ package validator import ( "fmt" + "github.com/appscode/go/types" + meta_util "github.com/appscode/kutil/meta" api "github.com/kubedb/apimachinery/apis/kubedb/v1alpha1" + cs "github.com/kubedb/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha1" amv "github.com/kubedb/apimachinery/pkg/validator" + "github.com/pkg/errors" + kerr "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" ) @@ -13,7 +19,7 @@ var ( redisVersions = sets.NewString("4", "4.0", "4.0.6") ) -func ValidateRedis(client kubernetes.Interface, redis *api.Redis) error { +func ValidateRedis(client kubernetes.Interface, extClient cs.KubedbV1alpha1Interface, redis *api.Redis) error { if redis.Spec.Version == "" { return fmt.Errorf(`object 'Version' is missing in '%v'`, redis.Spec) } @@ -23,6 +29,17 @@ func ValidateRedis(client kubernetes.Interface, redis *api.Redis) error { return fmt.Errorf(`KubeDB doesn't support Redis version: %s`, string(redis.Spec.Version)) } + if redis.Spec.Replicas != nil { + replicas := types.Int32(redis.Spec.Replicas) + if replicas != 1 { + return fmt.Errorf(`spec.replicas "%d" invalid. Value must be one`, replicas) + } + } + + if err := matchWithDormantDatabase(extClient, redis); err != nil { + return err + } + if redis.Spec.Storage != nil { var err error if err = amv.ValidateStorage(client, redis.Spec.Storage); err != nil { @@ -39,3 +56,32 @@ func ValidateRedis(client kubernetes.Interface, redis *api.Redis) error { } return nil } + +func matchWithDormantDatabase(extClient cs.KubedbV1alpha1Interface, redis *api.Redis) error { + // Check if DormantDatabase exists or not + dormantDb, err := extClient.DormantDatabases(redis.Namespace).Get(redis.Name, metav1.GetOptions{}) + if err != nil { + if !kerr.IsNotFound(err) { + return err + } + return nil + } + + // Check DatabaseKind + if dormantDb.Labels[api.LabelDatabaseKind] != api.ResourceKindRedis { + return fmt.Errorf(`invalid Redis: "%v". Exists DormantDatabase "%v" of different Kind`, redis.Name, dormantDb.Name) + } + + // Check Origin Spec + drmnOriginSpec := dormantDb.Spec.Origin.Spec.Redis + originalSpec := redis.Spec + + // Skip checking doNotPause + drmnOriginSpec.DoNotPause = originalSpec.DoNotPause + + if !meta_util.Equal(drmnOriginSpec, &originalSpec) { + return errors.New("redis spec mismatches with OriginSpec in DormantDatabases") + } + + return nil +} diff --git a/vendor/google.golang.org/appengine/appengine.go b/vendor/google.golang.org/appengine/appengine.go index 475cf2e32..76dedc81d 100644 --- a/vendor/google.golang.org/appengine/appengine.go +++ b/vendor/google.golang.org/appengine/appengine.go @@ -28,7 +28,8 @@ import ( // See https://cloud.google.com/appengine/docs/flexible/custom-runtimes#health_check_requests // for details on how to do your own health checking. // -// Main is not yet supported on App Engine Standard. +// On App Engine Standard it ensures the server has started and is prepared to +// receive requests. // // Main never returns. // @@ -62,7 +63,7 @@ func IsDevAppServer() bool { // NewContext returns a context for an in-flight HTTP request. // This function is cheap. func NewContext(req *http.Request) context.Context { - return WithContext(context.Background(), req) + return internal.ReqContext(req) } // WithContext returns a copy of the parent context diff --git a/vendor/google.golang.org/appengine/internal/api.go b/vendor/google.golang.org/appengine/internal/api.go index ec5aa59b3..16f87c5d3 100644 --- a/vendor/google.golang.org/appengine/internal/api.go +++ b/vendor/google.golang.org/appengine/internal/api.go @@ -3,6 +3,7 @@ // license that can be found in the LICENSE file. // +build !appengine +// +build go1.7 package internal @@ -32,7 +33,8 @@ import ( ) const ( - apiPath = "/rpc_http" + apiPath = "/rpc_http" + defaultTicketSuffix = "/default.20150612t184001.0" ) var ( @@ -60,6 +62,11 @@ var ( Dial: limitDial, }, } + + defaultTicketOnce sync.Once + defaultTicket string + backgroundContextOnce sync.Once + backgroundContext netcontext.Context ) func apiURL() *url.URL { @@ -83,16 +90,10 @@ func handleHTTP(w http.ResponseWriter, r *http.Request) { outHeader: w.Header(), apiURL: apiURL(), } - stopFlushing := make(chan int) + r = r.WithContext(withContext(r.Context(), c)) + c.req = r - ctxs.Lock() - ctxs.m[r] = c - ctxs.Unlock() - defer func() { - ctxs.Lock() - delete(ctxs.m, r) - ctxs.Unlock() - }() + stopFlushing := make(chan int) // Patch up RemoteAddr so it looks reasonable. if addr := r.Header.Get(userIPHeader); addr != "" { @@ -191,18 +192,6 @@ func renderPanic(x interface{}) string { return string(buf) } -var ctxs = struct { - sync.Mutex - m map[*http.Request]*context - bg *context // background context, lazily initialized - // dec is used by tests to decorate the netcontext.Context returned - // for a given request. This allows tests to add overrides (such as - // WithAppIDOverride) to the context. The map is nil outside tests. - dec map[*http.Request]func(netcontext.Context) netcontext.Context -}{ - m: make(map[*http.Request]*context), -} - // context represents the context of an in-flight HTTP request. // It implements the appengine.Context and http.ResponseWriter interfaces. type context struct { @@ -223,6 +212,34 @@ type context struct { var contextKey = "holds a *context" +// jointContext joins two contexts in a superficial way. +// It takes values and timeouts from a base context, and only values from another context. +type jointContext struct { + base netcontext.Context + valuesOnly netcontext.Context +} + +func (c jointContext) Deadline() (time.Time, bool) { + return c.base.Deadline() +} + +func (c jointContext) Done() <-chan struct{} { + return c.base.Done() +} + +func (c jointContext) Err() error { + return c.base.Err() +} + +func (c jointContext) Value(key interface{}) interface{} { + if val := c.base.Value(key); val != nil { + return val + } + return c.valuesOnly.Value(key) +} + +// fromContext returns the App Engine context or nil if ctx is not +// derived from an App Engine context. func fromContext(ctx netcontext.Context) *context { c, _ := ctx.Value(&contextKey).(*context) return c @@ -247,86 +264,70 @@ func IncomingHeaders(ctx netcontext.Context) http.Header { return nil } -func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context { - ctxs.Lock() - c := ctxs.m[req] - d := ctxs.dec[req] - ctxs.Unlock() +func ReqContext(req *http.Request) netcontext.Context { + return req.Context() +} - if d != nil { - parent = d(parent) +func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context { + return jointContext{ + base: parent, + valuesOnly: req.Context(), } +} - if c == nil { - // Someone passed in an http.Request that is not in-flight. - // We panic here rather than panicking at a later point - // so that stack traces will be more sensible. - log.Panic("appengine: NewContext passed an unknown http.Request") - } - return withContext(parent, c) +// DefaultTicket returns a ticket used for background context or dev_appserver. +func DefaultTicket() string { + defaultTicketOnce.Do(func() { + if IsDevAppServer() { + defaultTicket = "testapp" + defaultTicketSuffix + return + } + appID := partitionlessAppID() + escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1) + majVersion := VersionID(nil) + if i := strings.Index(majVersion, "."); i > 0 { + majVersion = majVersion[:i] + } + defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID()) + }) + return defaultTicket } func BackgroundContext() netcontext.Context { - ctxs.Lock() - defer ctxs.Unlock() - - if ctxs.bg != nil { - return toContext(ctxs.bg) - } - - // Compute background security ticket. - appID := partitionlessAppID() - escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1) - majVersion := VersionID(nil) - if i := strings.Index(majVersion, "."); i > 0 { - majVersion = majVersion[:i] - } - ticket := fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID()) - - ctxs.bg = &context{ - req: &http.Request{ - Header: http.Header{ - ticketHeader: []string{ticket}, + backgroundContextOnce.Do(func() { + // Compute background security ticket. + ticket := DefaultTicket() + + c := &context{ + req: &http.Request{ + Header: http.Header{ + ticketHeader: []string{ticket}, + }, }, - }, - apiURL: apiURL(), - } + apiURL: apiURL(), + } + backgroundContext = toContext(c) - // TODO(dsymonds): Wire up the shutdown handler to do a final flush. - go ctxs.bg.logFlusher(make(chan int)) + // TODO(dsymonds): Wire up the shutdown handler to do a final flush. + go c.logFlusher(make(chan int)) + }) - return toContext(ctxs.bg) + return backgroundContext } // RegisterTestRequest registers the HTTP request req for testing, such that // any API calls are sent to the provided URL. It returns a closure to delete // the registration. // It should only be used by aetest package. -func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) func() { +func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) { c := &context{ req: req, apiURL: apiURL, } - ctxs.Lock() - defer ctxs.Unlock() - if _, ok := ctxs.m[req]; ok { - log.Panic("req already associated with context") - } - if _, ok := ctxs.dec[req]; ok { - log.Panic("req already associated with context") - } - if ctxs.dec == nil { - ctxs.dec = make(map[*http.Request]func(netcontext.Context) netcontext.Context) - } - ctxs.m[req] = c - ctxs.dec[req] = decorate - - return func() { - ctxs.Lock() - delete(ctxs.m, req) - delete(ctxs.dec, req) - ctxs.Unlock() - } + ctx := withContext(decorate(req.Context()), c) + req = req.WithContext(ctx) + c.req = req + return req, func() {} } var errTimeout = &CallError{ @@ -452,7 +453,7 @@ func Call(ctx netcontext.Context, service, method string, in, out proto.Message) c := fromContext(ctx) if c == nil { // Give a good error message rather than a panic lower down. - return errors.New("not an App Engine context") + return errNotAppEngineContext } // Apply transaction modifications if we're in a transaction. @@ -475,6 +476,16 @@ func Call(ctx netcontext.Context, service, method string, in, out proto.Message) } ticket := c.req.Header.Get(ticketHeader) + // Use a test ticket under test environment. + if ticket == "" { + if appid := ctx.Value(&appIDOverrideKey); appid != nil { + ticket = appid.(string) + defaultTicketSuffix + } + } + // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver. + if ticket == "" { + ticket = DefaultTicket() + } req := &remotepb.Request{ ServiceName: &service, Method: &method, @@ -550,6 +561,9 @@ var logLevelName = map[int64]string{ } func logf(c *context, level int64, format string, args ...interface{}) { + if c == nil { + panic("not an App Engine context") + } s := fmt.Sprintf(format, args...) s = strings.TrimRight(s, "\n") // Remove any trailing newline characters. c.addLogLine(&logpb.UserAppLogLine{ diff --git a/vendor/google.golang.org/appengine/internal/api_classic.go b/vendor/google.golang.org/appengine/internal/api_classic.go index 597f66e6e..f0f40b2e3 100644 --- a/vendor/google.golang.org/appengine/internal/api_classic.go +++ b/vendor/google.golang.org/appengine/internal/api_classic.go @@ -22,14 +22,20 @@ import ( var contextKey = "holds an appengine.Context" +// fromContext returns the App Engine context or nil if ctx is not +// derived from an App Engine context. func fromContext(ctx netcontext.Context) appengine.Context { c, _ := ctx.Value(&contextKey).(appengine.Context) return c } // This is only for classic App Engine adapters. -func ClassicContextFromContext(ctx netcontext.Context) appengine.Context { - return fromContext(ctx) +func ClassicContextFromContext(ctx netcontext.Context) (appengine.Context, error) { + c := fromContext(ctx) + if c == nil { + return nil, errNotAppEngineContext + } + return c, nil } func withContext(parent netcontext.Context, c appengine.Context) netcontext.Context { @@ -53,6 +59,10 @@ func IncomingHeaders(ctx netcontext.Context) http.Header { return nil } +func ReqContext(req *http.Request) netcontext.Context { + return WithContext(netcontext.Background(), req) +} + func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context { c := appengine.NewContext(req) return withContext(parent, c) @@ -98,7 +108,7 @@ func Call(ctx netcontext.Context, service, method string, in, out proto.Message) c := fromContext(ctx) if c == nil { // Give a good error message rather than a panic lower down. - return errors.New("not an App Engine context") + return errNotAppEngineContext } // Apply transaction modifications if we're in a transaction. diff --git a/vendor/google.golang.org/appengine/internal/api_common.go b/vendor/google.golang.org/appengine/internal/api_common.go index 2db33a774..e0c0b214b 100644 --- a/vendor/google.golang.org/appengine/internal/api_common.go +++ b/vendor/google.golang.org/appengine/internal/api_common.go @@ -5,10 +5,15 @@ package internal import ( + "errors" + "os" + "github.com/golang/protobuf/proto" netcontext "golang.org/x/net/context" ) +var errNotAppEngineContext = errors.New("not an App Engine context") + type CallOverrideFunc func(ctx netcontext.Context, service, method string, in, out proto.Message) error var callOverrideKey = "holds []CallOverrideFunc" @@ -77,10 +82,42 @@ func Logf(ctx netcontext.Context, level int64, format string, args ...interface{ f(level, format, args...) return } - logf(fromContext(ctx), level, format, args...) + c := fromContext(ctx) + if c == nil { + panic(errNotAppEngineContext) + } + logf(c, level, format, args...) } // NamespacedContext wraps a Context to support namespaces. func NamespacedContext(ctx netcontext.Context, namespace string) netcontext.Context { return withNamespace(ctx, namespace) } + +// SetTestEnv sets the env variables for testing background ticket in Flex. +func SetTestEnv() func() { + var environ = []struct { + key, value string + }{ + {"GAE_LONG_APP_ID", "my-app-id"}, + {"GAE_MINOR_VERSION", "067924799508853122"}, + {"GAE_MODULE_INSTANCE", "0"}, + {"GAE_MODULE_NAME", "default"}, + {"GAE_MODULE_VERSION", "20150612t184001"}, + } + + for _, v := range environ { + old := os.Getenv(v.key) + os.Setenv(v.key, v.value) + v.value = old + } + return func() { // Restore old environment after the test completes. + for _, v := range environ { + if v.value == "" { + os.Unsetenv(v.key) + continue + } + os.Setenv(v.key, v.value) + } + } +} diff --git a/vendor/google.golang.org/appengine/internal/api_pre17.go b/vendor/google.golang.org/appengine/internal/api_pre17.go new file mode 100644 index 000000000..028b4f056 --- /dev/null +++ b/vendor/google.golang.org/appengine/internal/api_pre17.go @@ -0,0 +1,682 @@ +// Copyright 2011 Google Inc. All rights reserved. +// Use of this source code is governed by the Apache 2.0 +// license that can be found in the LICENSE file. + +// +build !appengine +// +build !go1.7 + +package internal + +import ( + "bytes" + "errors" + "fmt" + "io/ioutil" + "log" + "net" + "net/http" + "net/url" + "os" + "runtime" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/golang/protobuf/proto" + netcontext "golang.org/x/net/context" + + basepb "google.golang.org/appengine/internal/base" + logpb "google.golang.org/appengine/internal/log" + remotepb "google.golang.org/appengine/internal/remote_api" +) + +const ( + apiPath = "/rpc_http" + defaultTicketSuffix = "/default.20150612t184001.0" +) + +var ( + // Incoming headers. + ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket") + dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo") + traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context") + curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace") + userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP") + remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr") + + // Outgoing headers. + apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint") + apiEndpointHeaderValue = []string{"app-engine-apis"} + apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method") + apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"} + apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline") + apiContentType = http.CanonicalHeaderKey("Content-Type") + apiContentTypeValue = []string{"application/octet-stream"} + logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count") + + apiHTTPClient = &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: limitDial, + }, + } + + defaultTicketOnce sync.Once + defaultTicket string +) + +func apiURL() *url.URL { + host, port := "appengine.googleapis.internal", "10001" + if h := os.Getenv("API_HOST"); h != "" { + host = h + } + if p := os.Getenv("API_PORT"); p != "" { + port = p + } + return &url.URL{ + Scheme: "http", + Host: host + ":" + port, + Path: apiPath, + } +} + +func handleHTTP(w http.ResponseWriter, r *http.Request) { + c := &context{ + req: r, + outHeader: w.Header(), + apiURL: apiURL(), + } + stopFlushing := make(chan int) + + ctxs.Lock() + ctxs.m[r] = c + ctxs.Unlock() + defer func() { + ctxs.Lock() + delete(ctxs.m, r) + ctxs.Unlock() + }() + + // Patch up RemoteAddr so it looks reasonable. + if addr := r.Header.Get(userIPHeader); addr != "" { + r.RemoteAddr = addr + } else if addr = r.Header.Get(remoteAddrHeader); addr != "" { + r.RemoteAddr = addr + } else { + // Should not normally reach here, but pick a sensible default anyway. + r.RemoteAddr = "127.0.0.1" + } + // The address in the headers will most likely be of these forms: + // 123.123.123.123 + // 2001:db8::1 + // net/http.Request.RemoteAddr is specified to be in "IP:port" form. + if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil { + // Assume the remote address is only a host; add a default port. + r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80") + } + + // Start goroutine responsible for flushing app logs. + // This is done after adding c to ctx.m (and stopped before removing it) + // because flushing logs requires making an API call. + go c.logFlusher(stopFlushing) + + executeRequestSafely(c, r) + c.outHeader = nil // make sure header changes aren't respected any more + + stopFlushing <- 1 // any logging beyond this point will be dropped + + // Flush any pending logs asynchronously. + c.pendingLogs.Lock() + flushes := c.pendingLogs.flushes + if len(c.pendingLogs.lines) > 0 { + flushes++ + } + c.pendingLogs.Unlock() + go c.flushLog(false) + w.Header().Set(logFlushHeader, strconv.Itoa(flushes)) + + // Avoid nil Write call if c.Write is never called. + if c.outCode != 0 { + w.WriteHeader(c.outCode) + } + if c.outBody != nil { + w.Write(c.outBody) + } +} + +func executeRequestSafely(c *context, r *http.Request) { + defer func() { + if x := recover(); x != nil { + logf(c, 4, "%s", renderPanic(x)) // 4 == critical + c.outCode = 500 + } + }() + + http.DefaultServeMux.ServeHTTP(c, r) +} + +func renderPanic(x interface{}) string { + buf := make([]byte, 16<<10) // 16 KB should be plenty + buf = buf[:runtime.Stack(buf, false)] + + // Remove the first few stack frames: + // this func + // the recover closure in the caller + // That will root the stack trace at the site of the panic. + const ( + skipStart = "internal.renderPanic" + skipFrames = 2 + ) + start := bytes.Index(buf, []byte(skipStart)) + p := start + for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ { + p = bytes.IndexByte(buf[p+1:], '\n') + p + 1 + if p < 0 { + break + } + } + if p >= 0 { + // buf[start:p+1] is the block to remove. + // Copy buf[p+1:] over buf[start:] and shrink buf. + copy(buf[start:], buf[p+1:]) + buf = buf[:len(buf)-(p+1-start)] + } + + // Add panic heading. + head := fmt.Sprintf("panic: %v\n\n", x) + if len(head) > len(buf) { + // Extremely unlikely to happen. + return head + } + copy(buf[len(head):], buf) + copy(buf, head) + + return string(buf) +} + +var ctxs = struct { + sync.Mutex + m map[*http.Request]*context + bg *context // background context, lazily initialized + // dec is used by tests to decorate the netcontext.Context returned + // for a given request. This allows tests to add overrides (such as + // WithAppIDOverride) to the context. The map is nil outside tests. + dec map[*http.Request]func(netcontext.Context) netcontext.Context +}{ + m: make(map[*http.Request]*context), +} + +// context represents the context of an in-flight HTTP request. +// It implements the appengine.Context and http.ResponseWriter interfaces. +type context struct { + req *http.Request + + outCode int + outHeader http.Header + outBody []byte + + pendingLogs struct { + sync.Mutex + lines []*logpb.UserAppLogLine + flushes int + } + + apiURL *url.URL +} + +var contextKey = "holds a *context" + +// fromContext returns the App Engine context or nil if ctx is not +// derived from an App Engine context. +func fromContext(ctx netcontext.Context) *context { + c, _ := ctx.Value(&contextKey).(*context) + return c +} + +func withContext(parent netcontext.Context, c *context) netcontext.Context { + ctx := netcontext.WithValue(parent, &contextKey, c) + if ns := c.req.Header.Get(curNamespaceHeader); ns != "" { + ctx = withNamespace(ctx, ns) + } + return ctx +} + +func toContext(c *context) netcontext.Context { + return withContext(netcontext.Background(), c) +} + +func IncomingHeaders(ctx netcontext.Context) http.Header { + if c := fromContext(ctx); c != nil { + return c.req.Header + } + return nil +} + +func ReqContext(req *http.Request) netcontext.Context { + return WithContext(netcontext.Background(), req) +} + +func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context { + ctxs.Lock() + c := ctxs.m[req] + d := ctxs.dec[req] + ctxs.Unlock() + + if d != nil { + parent = d(parent) + } + + if c == nil { + // Someone passed in an http.Request that is not in-flight. + // We panic here rather than panicking at a later point + // so that stack traces will be more sensible. + log.Panic("appengine: NewContext passed an unknown http.Request") + } + return withContext(parent, c) +} + +// DefaultTicket returns a ticket used for background context or dev_appserver. +func DefaultTicket() string { + defaultTicketOnce.Do(func() { + if IsDevAppServer() { + defaultTicket = "testapp" + defaultTicketSuffix + return + } + appID := partitionlessAppID() + escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1) + majVersion := VersionID(nil) + if i := strings.Index(majVersion, "."); i > 0 { + majVersion = majVersion[:i] + } + defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID()) + }) + return defaultTicket +} + +func BackgroundContext() netcontext.Context { + ctxs.Lock() + defer ctxs.Unlock() + + if ctxs.bg != nil { + return toContext(ctxs.bg) + } + + // Compute background security ticket. + ticket := DefaultTicket() + + ctxs.bg = &context{ + req: &http.Request{ + Header: http.Header{ + ticketHeader: []string{ticket}, + }, + }, + apiURL: apiURL(), + } + + // TODO(dsymonds): Wire up the shutdown handler to do a final flush. + go ctxs.bg.logFlusher(make(chan int)) + + return toContext(ctxs.bg) +} + +// RegisterTestRequest registers the HTTP request req for testing, such that +// any API calls are sent to the provided URL. It returns a closure to delete +// the registration. +// It should only be used by aetest package. +func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) { + c := &context{ + req: req, + apiURL: apiURL, + } + ctxs.Lock() + defer ctxs.Unlock() + if _, ok := ctxs.m[req]; ok { + log.Panic("req already associated with context") + } + if _, ok := ctxs.dec[req]; ok { + log.Panic("req already associated with context") + } + if ctxs.dec == nil { + ctxs.dec = make(map[*http.Request]func(netcontext.Context) netcontext.Context) + } + ctxs.m[req] = c + ctxs.dec[req] = decorate + + return req, func() { + ctxs.Lock() + delete(ctxs.m, req) + delete(ctxs.dec, req) + ctxs.Unlock() + } +} + +var errTimeout = &CallError{ + Detail: "Deadline exceeded", + Code: int32(remotepb.RpcError_CANCELLED), + Timeout: true, +} + +func (c *context) Header() http.Header { return c.outHeader } + +// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status +// codes do not permit a response body (nor response entity headers such as +// Content-Length, Content-Type, etc). +func bodyAllowedForStatus(status int) bool { + switch { + case status >= 100 && status <= 199: + return false + case status == 204: + return false + case status == 304: + return false + } + return true +} + +func (c *context) Write(b []byte) (int, error) { + if c.outCode == 0 { + c.WriteHeader(http.StatusOK) + } + if len(b) > 0 && !bodyAllowedForStatus(c.outCode) { + return 0, http.ErrBodyNotAllowed + } + c.outBody = append(c.outBody, b...) + return len(b), nil +} + +func (c *context) WriteHeader(code int) { + if c.outCode != 0 { + logf(c, 3, "WriteHeader called multiple times on request.") // error level + return + } + c.outCode = code +} + +func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) { + hreq := &http.Request{ + Method: "POST", + URL: c.apiURL, + Header: http.Header{ + apiEndpointHeader: apiEndpointHeaderValue, + apiMethodHeader: apiMethodHeaderValue, + apiContentType: apiContentTypeValue, + apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)}, + }, + Body: ioutil.NopCloser(bytes.NewReader(body)), + ContentLength: int64(len(body)), + Host: c.apiURL.Host, + } + if info := c.req.Header.Get(dapperHeader); info != "" { + hreq.Header.Set(dapperHeader, info) + } + if info := c.req.Header.Get(traceHeader); info != "" { + hreq.Header.Set(traceHeader, info) + } + + tr := apiHTTPClient.Transport.(*http.Transport) + + var timedOut int32 // atomic; set to 1 if timed out + t := time.AfterFunc(timeout, func() { + atomic.StoreInt32(&timedOut, 1) + tr.CancelRequest(hreq) + }) + defer t.Stop() + defer func() { + // Check if timeout was exceeded. + if atomic.LoadInt32(&timedOut) != 0 { + err = errTimeout + } + }() + + hresp, err := apiHTTPClient.Do(hreq) + if err != nil { + return nil, &CallError{ + Detail: fmt.Sprintf("service bridge HTTP failed: %v", err), + Code: int32(remotepb.RpcError_UNKNOWN), + } + } + defer hresp.Body.Close() + hrespBody, err := ioutil.ReadAll(hresp.Body) + if hresp.StatusCode != 200 { + return nil, &CallError{ + Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody), + Code: int32(remotepb.RpcError_UNKNOWN), + } + } + if err != nil { + return nil, &CallError{ + Detail: fmt.Sprintf("service bridge response bad: %v", err), + Code: int32(remotepb.RpcError_UNKNOWN), + } + } + return hrespBody, nil +} + +func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error { + if ns := NamespaceFromContext(ctx); ns != "" { + if fn, ok := NamespaceMods[service]; ok { + fn(in, ns) + } + } + + if f, ctx, ok := callOverrideFromContext(ctx); ok { + return f(ctx, service, method, in, out) + } + + // Handle already-done contexts quickly. + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + c := fromContext(ctx) + if c == nil { + // Give a good error message rather than a panic lower down. + return errNotAppEngineContext + } + + // Apply transaction modifications if we're in a transaction. + if t := transactionFromContext(ctx); t != nil { + if t.finished { + return errors.New("transaction context has expired") + } + applyTransaction(in, &t.transaction) + } + + // Default RPC timeout is 60s. + timeout := 60 * time.Second + if deadline, ok := ctx.Deadline(); ok { + timeout = deadline.Sub(time.Now()) + } + + data, err := proto.Marshal(in) + if err != nil { + return err + } + + ticket := c.req.Header.Get(ticketHeader) + // Use a test ticket under test environment. + if ticket == "" { + if appid := ctx.Value(&appIDOverrideKey); appid != nil { + ticket = appid.(string) + defaultTicketSuffix + } + } + // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver. + if ticket == "" { + ticket = DefaultTicket() + } + req := &remotepb.Request{ + ServiceName: &service, + Method: &method, + Request: data, + RequestId: &ticket, + } + hreqBody, err := proto.Marshal(req) + if err != nil { + return err + } + + hrespBody, err := c.post(hreqBody, timeout) + if err != nil { + return err + } + + res := &remotepb.Response{} + if err := proto.Unmarshal(hrespBody, res); err != nil { + return err + } + if res.RpcError != nil { + ce := &CallError{ + Detail: res.RpcError.GetDetail(), + Code: *res.RpcError.Code, + } + switch remotepb.RpcError_ErrorCode(ce.Code) { + case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED: + ce.Timeout = true + } + return ce + } + if res.ApplicationError != nil { + return &APIError{ + Service: *req.ServiceName, + Detail: res.ApplicationError.GetDetail(), + Code: *res.ApplicationError.Code, + } + } + if res.Exception != nil || res.JavaException != nil { + // This shouldn't happen, but let's be defensive. + return &CallError{ + Detail: "service bridge returned exception", + Code: int32(remotepb.RpcError_UNKNOWN), + } + } + return proto.Unmarshal(res.Response, out) +} + +func (c *context) Request() *http.Request { + return c.req +} + +func (c *context) addLogLine(ll *logpb.UserAppLogLine) { + // Truncate long log lines. + // TODO(dsymonds): Check if this is still necessary. + const lim = 8 << 10 + if len(*ll.Message) > lim { + suffix := fmt.Sprintf("...(length %d)", len(*ll.Message)) + ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix) + } + + c.pendingLogs.Lock() + c.pendingLogs.lines = append(c.pendingLogs.lines, ll) + c.pendingLogs.Unlock() +} + +var logLevelName = map[int64]string{ + 0: "DEBUG", + 1: "INFO", + 2: "WARNING", + 3: "ERROR", + 4: "CRITICAL", +} + +func logf(c *context, level int64, format string, args ...interface{}) { + if c == nil { + panic("not an App Engine context") + } + s := fmt.Sprintf(format, args...) + s = strings.TrimRight(s, "\n") // Remove any trailing newline characters. + c.addLogLine(&logpb.UserAppLogLine{ + TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3), + Level: &level, + Message: &s, + }) + log.Print(logLevelName[level] + ": " + s) +} + +// flushLog attempts to flush any pending logs to the appserver. +// It should not be called concurrently. +func (c *context) flushLog(force bool) (flushed bool) { + c.pendingLogs.Lock() + // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious. + n, rem := 0, 30<<20 + for ; n < len(c.pendingLogs.lines); n++ { + ll := c.pendingLogs.lines[n] + // Each log line will require about 3 bytes of overhead. + nb := proto.Size(ll) + 3 + if nb > rem { + break + } + rem -= nb + } + lines := c.pendingLogs.lines[:n] + c.pendingLogs.lines = c.pendingLogs.lines[n:] + c.pendingLogs.Unlock() + + if len(lines) == 0 && !force { + // Nothing to flush. + return false + } + + rescueLogs := false + defer func() { + if rescueLogs { + c.pendingLogs.Lock() + c.pendingLogs.lines = append(lines, c.pendingLogs.lines...) + c.pendingLogs.Unlock() + } + }() + + buf, err := proto.Marshal(&logpb.UserAppLogGroup{ + LogLine: lines, + }) + if err != nil { + log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err) + rescueLogs = true + return false + } + + req := &logpb.FlushRequest{ + Logs: buf, + } + res := &basepb.VoidProto{} + c.pendingLogs.Lock() + c.pendingLogs.flushes++ + c.pendingLogs.Unlock() + if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil { + log.Printf("internal.flushLog: Flush RPC: %v", err) + rescueLogs = true + return false + } + return true +} + +const ( + // Log flushing parameters. + flushInterval = 1 * time.Second + forceFlushInterval = 60 * time.Second +) + +func (c *context) logFlusher(stop <-chan int) { + lastFlush := time.Now() + tick := time.NewTicker(flushInterval) + for { + select { + case <-stop: + // Request finished. + tick.Stop() + return + case <-tick.C: + force := time.Now().Sub(lastFlush) > forceFlushInterval + if c.flushLog(force) { + lastFlush = time.Now() + } + } + } +} + +func ContextForTesting(req *http.Request) netcontext.Context { + return toContext(&context{req: req}) +} diff --git a/vendor/google.golang.org/appengine/internal/identity_classic.go b/vendor/google.golang.org/appengine/internal/identity_classic.go index e6b9227c5..b59603f13 100644 --- a/vendor/google.golang.org/appengine/internal/identity_classic.go +++ b/vendor/google.golang.org/appengine/internal/identity_classic.go @@ -13,15 +13,45 @@ import ( ) func DefaultVersionHostname(ctx netcontext.Context) string { - return appengine.DefaultVersionHostname(fromContext(ctx)) + c := fromContext(ctx) + if c == nil { + panic(errNotAppEngineContext) + } + return appengine.DefaultVersionHostname(c) } -func RequestID(ctx netcontext.Context) string { return appengine.RequestID(fromContext(ctx)) } -func Datacenter(_ netcontext.Context) string { return appengine.Datacenter() } -func ServerSoftware() string { return appengine.ServerSoftware() } -func ModuleName(ctx netcontext.Context) string { return appengine.ModuleName(fromContext(ctx)) } -func VersionID(ctx netcontext.Context) string { return appengine.VersionID(fromContext(ctx)) } -func InstanceID() string { return appengine.InstanceID() } -func IsDevAppServer() bool { return appengine.IsDevAppServer() } +func Datacenter(_ netcontext.Context) string { return appengine.Datacenter() } +func ServerSoftware() string { return appengine.ServerSoftware() } +func InstanceID() string { return appengine.InstanceID() } +func IsDevAppServer() bool { return appengine.IsDevAppServer() } -func fullyQualifiedAppID(ctx netcontext.Context) string { return fromContext(ctx).FullyQualifiedAppID() } +func RequestID(ctx netcontext.Context) string { + c := fromContext(ctx) + if c == nil { + panic(errNotAppEngineContext) + } + return appengine.RequestID(c) +} + +func ModuleName(ctx netcontext.Context) string { + c := fromContext(ctx) + if c == nil { + panic(errNotAppEngineContext) + } + return appengine.ModuleName(c) +} +func VersionID(ctx netcontext.Context) string { + c := fromContext(ctx) + if c == nil { + panic(errNotAppEngineContext) + } + return appengine.VersionID(c) +} + +func fullyQualifiedAppID(ctx netcontext.Context) string { + c := fromContext(ctx) + if c == nil { + panic(errNotAppEngineContext) + } + return c.FullyQualifiedAppID() +} diff --git a/vendor/google.golang.org/appengine/internal/identity_vm.go b/vendor/google.golang.org/appengine/internal/identity_vm.go index ebe68b785..d5fa75be7 100644 --- a/vendor/google.golang.org/appengine/internal/identity_vm.go +++ b/vendor/google.golang.org/appengine/internal/identity_vm.go @@ -23,7 +23,11 @@ const ( ) func ctxHeaders(ctx netcontext.Context) http.Header { - return fromContext(ctx).Request().Header + c := fromContext(ctx) + if c == nil { + return nil + } + return c.Request().Header } func DefaultVersionHostname(ctx netcontext.Context) string { diff --git a/vendor/google.golang.org/appengine/internal/main_vm.go b/vendor/google.golang.org/appengine/internal/main_vm.go index 57331ad17..822e784a4 100644 --- a/vendor/google.golang.org/appengine/internal/main_vm.go +++ b/vendor/google.golang.org/appengine/internal/main_vm.go @@ -22,7 +22,11 @@ func Main() { port = s } - if err := http.ListenAndServe(":"+port, http.HandlerFunc(handleHTTP)); err != nil { + host := "" + if IsDevAppServer() { + host = "127.0.0.1" + } + if err := http.ListenAndServe(host+":"+port, http.HandlerFunc(handleHTTP)); err != nil { log.Fatalf("http.ListenAndServe: %v", err) } }