From 694704608d8a0155a69cce8353c7768ffc2bb16b Mon Sep 17 00:00:00 2001 From: Alexander Zielenski Date: Tue, 8 Nov 2022 12:12:34 -0800 Subject: [PATCH 1/2] negotiated refactor Co-authored-by: Jeffrey Ying Kubernetes-commit: 76f056867a2be388f7780c1ff5c794edcdfd9aa4 --- pkg/apiserver/handler_apis.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/apiserver/handler_apis.go b/pkg/apiserver/handler_apis.go index c014e044a..ba9376401 100644 --- a/pkg/apiserver/handler_apis.go +++ b/pkg/apiserver/handler_apis.go @@ -91,7 +91,7 @@ func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } } - responsewriters.WriteObjectNegotiated(r.codecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList) + responsewriters.WriteObjectNegotiated(r.codecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList, false) } // convertToDiscoveryAPIGroup takes apiservices in a single group and returns a discovery compatible object. @@ -162,5 +162,5 @@ func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { http.Error(w, "", http.StatusNotFound) return } - responsewriters.WriteObjectNegotiated(r.codecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroup) + responsewriters.WriteObjectNegotiated(r.codecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroup, false) } From 175acc3efa2c32e821c6255c0636da1056935295 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski Date: Tue, 8 Nov 2022 12:24:09 -0800 Subject: [PATCH 2/2] add aggregated-apiservices to aggregated discovery Co-authored-by: Jeffrey Ying Kubernetes-commit: b64df605b48ce1ae5d85a2656505d4f651033b1b --- go.mod | 40 +- go.sum | 30 +- pkg/apiserver/apiserver.go | 47 +- pkg/apiserver/handler_discovery.go | 572 ++++++++++++++++++++++++ pkg/apiserver/handler_discovery_test.go | 359 +++++++++++++++ pkg/apiserver/handler_proxy.go | 8 + 6 files changed, 1007 insertions(+), 49 deletions(-) create mode 100644 pkg/apiserver/handler_discovery.go create mode 100644 pkg/apiserver/handler_discovery_test.go diff --git a/go.mod b/go.mod index 79dc5452a..e326b55dc 100644 --- a/go.mod +++ b/go.mod @@ -8,25 +8,25 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/emicklei/go-restful/v3 v3.9.0 github.com/gogo/protobuf v1.3.2 + github.com/google/gofuzz v1.1.0 github.com/spf13/cobra v1.6.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.0 golang.org/x/net v0.1.1-0.20221027164007-c63010009c80 - k8s.io/api v0.0.0-20221108053748-98c1aa6b3d0a - k8s.io/apimachinery v0.0.0-20221108055230-fd8a60496be5 - k8s.io/apiserver v0.0.0-20221108213719-5643daa2db6a - k8s.io/client-go v0.0.0-20221108173010-769443557e04 - k8s.io/code-generator v0.0.0-20221108000200-7429fbb99432 - k8s.io/component-base v0.0.0-20221108213136-021afb59bb71 + k8s.io/api v0.0.0 + k8s.io/apimachinery v0.0.0 + k8s.io/apiserver v0.0.0 + k8s.io/client-go v0.0.0 + k8s.io/code-generator v0.0.0 + k8s.io/component-base v0.0.0 k8s.io/klog/v2 v2.80.1 k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 - k8s.io/utils v0.0.0-20221107191617-1a15be271d1d + k8s.io/utils v0.0.0-20220922133306-665eaaec4324 sigs.k8s.io/structured-merge-diff/v4 v4.2.3 ) require ( github.com/NYTimes/gziphandler v1.1.1 // indirect - github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect @@ -43,10 +43,8 @@ require ( github.com/go-openapi/swag v0.19.14 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/google/cel-go v0.12.5 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/go-cmp v0.5.9 // indirect - github.com/google/gofuzz v1.1.0 // indirect github.com/google/uuid v1.1.2 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect @@ -62,11 +60,10 @@ require ( github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.14.0 // indirect - github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/client_golang v1.13.0 // indirect + github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - github.com/stoewer/go-strcase v1.2.0 // indirect go.etcd.io/etcd/api/v3 v3.5.5 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.etcd.io/etcd/client/v3 v3.5.5 // indirect @@ -101,18 +98,19 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect - k8s.io/kms v0.0.0-20221028080743-a9ba1c11c0c6 // indirect + k8s.io/kms v0.0.0 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) replace ( - k8s.io/api => k8s.io/api v0.0.0-20221108053748-98c1aa6b3d0a - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20221108055230-fd8a60496be5 - k8s.io/apiserver => k8s.io/apiserver v0.0.0-20221108213719-5643daa2db6a - k8s.io/client-go => k8s.io/client-go v0.0.0-20221108173010-769443557e04 - k8s.io/code-generator => k8s.io/code-generator v0.0.0-20221108000200-7429fbb99432 - k8s.io/component-base => k8s.io/component-base v0.0.0-20221108213136-021afb59bb71 - k8s.io/kms => k8s.io/kms v0.0.0-20221028080743-a9ba1c11c0c6 + k8s.io/api => ../api + k8s.io/apimachinery => ../apimachinery + k8s.io/apiserver => ../apiserver + k8s.io/client-go => ../client-go + k8s.io/code-generator => ../code-generator + k8s.io/component-base => ../component-base + k8s.io/kms => ../kms + k8s.io/kube-aggregator => ../kube-aggregator ) diff --git a/go.sum b/go.sum index e72aec4d4..6fc1eb373 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,6 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 h1:yL7+Jz0jTC6yykIK/Wh74gnTJnrGr5AyrNMXuA0gves= -github.com/antlr/antlr4/runtime/Go/antlr v1.4.10/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -170,8 +168,6 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= -github.com/google/cel-go v0.12.5 h1:DmzaiSgoaqGCjtpPQWl26/gND+yRpim56H1jCVev6d8= -github.com/google/cel-go v0.12.5/go.mod h1:Jk7ljRzLBhkmiAwBoUxB1sZSCVBAzkqPF25olK/iRDw= github.com/google/gnostic v0.5.7-v3refs h1:FhTMOKj2VhjpouxvWJAV1TL304uMlb9zcDqkl6cEI54= github.com/google/gnostic v0.5.7-v3refs/go.mod h1:73MKFl6jIHelAJNaBGFzt3SPtZULs9dYrGFt8OiIsHQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -278,14 +274,13 @@ github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= -github.com/prometheus/client_golang v1.14.0 h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw= -github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y= +github.com/prometheus/client_golang v1.13.0 h1:b71QUfeo5M8gq2+evJdTPfZhYMAU0uKPkyPJ7TPsloU= +github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= -github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= @@ -312,7 +307,6 @@ github.com/spf13/cobra v1.6.0 h1:42a0n6jwCot1pUmomAp4T7DeMD+20LFv4Q54pxLf2LI= github.com/spf13/cobra v1.6.0/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -722,29 +716,15 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -k8s.io/api v0.0.0-20221108053748-98c1aa6b3d0a h1:GaCla9HtNyi63kysI/cyeA4bv6wRkIyuiUeXpaTF+dw= -k8s.io/api v0.0.0-20221108053748-98c1aa6b3d0a/go.mod h1:PSXY9/fSNyKgKHUU+O9scnZiW8m+V1znqk49oI6hAEY= -k8s.io/apimachinery v0.0.0-20221108055230-fd8a60496be5 h1:iFAMJ1evvrO6X7dS7EKujS6An+bp3u/dD6opu8rn0QA= -k8s.io/apimachinery v0.0.0-20221108055230-fd8a60496be5/go.mod h1:VXMmlsE7YRJ5vyAyWpkKIfFkEbDNpVs0ObpkuQf1WfM= -k8s.io/apiserver v0.0.0-20221108213719-5643daa2db6a h1:Pgwr1+mXasRCcSKqXNgXPj497HPJjZ5KT8u4GVvNoY0= -k8s.io/apiserver v0.0.0-20221108213719-5643daa2db6a/go.mod h1:HEuVcGugsLz1DrS8BkNLT7wjhrBVrZh2wFZfTAuy2NE= -k8s.io/client-go v0.0.0-20221108173010-769443557e04 h1:ad7JkOkiLiyMKWHRkmbJgjCzySdkXxRxWeNosATW0mo= -k8s.io/client-go v0.0.0-20221108173010-769443557e04/go.mod h1:O6sEWJ2BPd8Dag831LA1lzC3WnE29nuwUJZZ4H2vlyo= -k8s.io/code-generator v0.0.0-20221108000200-7429fbb99432 h1:lWxXcC+YG3ZY5sCsk337h9RgzUK0sWoNWf8fs6KZm0I= -k8s.io/code-generator v0.0.0-20221108000200-7429fbb99432/go.mod h1:EzIGoP4u7NcVT4orMRQyvQcWqFCUVApa+V+28R7t/+M= -k8s.io/component-base v0.0.0-20221108213136-021afb59bb71 h1:Qr7dcMdpWjUZUEkZcbvGGQbtTlVR9b9VqQci/G0jzfY= -k8s.io/component-base v0.0.0-20221108213136-021afb59bb71/go.mod h1:5bp64lK0p+wJD2BFMin7Akfxiwvt58T4iDz2Q+6woBE= k8s.io/gengo v0.0.0-20220902162205-c0856e24416d h1:U9tB195lKdzwqicbJvyJeOXV7Klv+wNAWENRnXEGi08= k8s.io/gengo v0.0.0-20220902162205-c0856e24416d/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= -k8s.io/kms v0.0.0-20221028080743-a9ba1c11c0c6 h1:d/x+J+EPT4UkD2pH39Ms5xKo1IVDfYlzoxowFd99tFg= -k8s.io/kms v0.0.0-20221028080743-a9ba1c11c0c6/go.mod h1:cvW8uNcFX5HPpTk6XURAIylLjIzrE7ACcRGt0HRL7eI= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= -k8s.io/utils v0.0.0-20221107191617-1a15be271d1d h1:0Smp/HP1OH4Rvhe+4B8nWGERtlqAGSftbSbbmm45oFs= -k8s.io/utils v0.0.0-20221107191617-1a15be271d1d/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20220922133306-665eaaec4324 h1:i+xdFemcSNuJvIfBlaYuXgRondKxK4z4prVPKzEaelI= +k8s.io/utils v0.0.0-20220922133306-665eaaec4324/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index d60f8df9f..4fd206cc8 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector" @@ -154,6 +155,11 @@ type APIAggregator struct { // openAPIV3AggregationController downloads and caches OpenAPI v3 specs. openAPIV3AggregationController *openapiv3controller.AggregationController + // discoveryAggregationController downloads and caches discovery documents + // from all aggregated apiservices so they are available from /apis endpoint + // when discovery with resources are requested + discoveryAggregationController DiscoveryAggregationController + // egressSelector selects the proper egress dialer to communicate with the custom apiserver // overwrites proxyTransport dialer if not nil egressSelector *egressselector.EgressSelector @@ -244,7 +250,13 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg lister: s.lister, discoveryGroup: discoveryGroup(enabledVersions), } - s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler) + + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { + apisHandlerWithAggregationSupport := aggregated.WrapAggregatedDiscoveryToHandler(apisHandler, s.GenericAPIServer.AggregatedDiscoveryGroupManager) + s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandlerWithAggregationSupport) + } else { + s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler) + } s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler) apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s) @@ -365,8 +377,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return s, nil } -// PrepareRun prepares the aggregator to run, by setting up the OpenAPI spec and calling -// the generic PrepareRun. +// PrepareRun prepares the aggregator to run, by setting up the OpenAPI spec & +// aggregated discovery document and calling the generic PrepareRun. func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { // add post start hook before generic PrepareRun in order to be before /healthz installation if s.openAPIConfig != nil { @@ -383,6 +395,20 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { }) } + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { + s.discoveryAggregationController = NewDiscoveryManager( + s.GenericAPIServer.AggregatedDiscoveryGroupManager, + ) + + // Setup discovery endpoint + s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error { + // Run discovery manager's worker to watch for new/removed/updated + // APIServices to the discovery document can be updated at runtime + go s.discoveryAggregationController.Run(context.StopCh) + return nil + }) + } + prepared := s.GenericAPIServer.PrepareRun() // delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers @@ -432,6 +458,12 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { if s.openAPIV3AggregationController != nil { s.openAPIV3AggregationController.UpdateAPIService(proxyHandler, apiService) } + // Forward calls to discovery manager to update discovery document + if s.discoveryAggregationController != nil { + handlerCopy := *proxyHandler + handlerCopy.setServiceAvailable(true) + s.discoveryAggregationController.AddAPIService(apiService, &handlerCopy) + } return nil } @@ -457,6 +489,10 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { if s.openAPIV3AggregationController != nil { s.openAPIV3AggregationController.AddAPIService(proxyHandler, apiService) } + if s.discoveryAggregationController != nil { + s.discoveryAggregationController.AddAPIService(apiService, proxyHandler) + } + s.proxyHandlers[apiService.Name] = proxyHandler s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler) @@ -489,6 +525,11 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { // RemoveAPIService removes the APIService from being handled. It is not thread-safe, so only call it on one thread at a time please. // It's a slow moving API, so it's ok to run the controller on a single thread. func (s *APIAggregator) RemoveAPIService(apiServiceName string) { + // Forward calls to discovery manager to update discovery document + if s.discoveryAggregationController != nil { + s.discoveryAggregationController.RemoveAPIService(apiServiceName) + } + version := v1helper.APIServiceNameToGroupVersion(apiServiceName) proxyPath := "/apis/" + version.Group + "/" + version.Version diff --git a/pkg/apiserver/handler_discovery.go b/pkg/apiserver/handler_discovery.go new file mode 100644 index 000000000..41d6e0be9 --- /dev/null +++ b/pkg/apiserver/handler_discovery.go @@ -0,0 +1,572 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "errors" + "fmt" + "net/http" + "sync" + "time" + + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints" + discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" + "k8s.io/apiserver/pkg/endpoints/request" + scheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" +) + +var APIRegistrationGroup string = "apiregistration.k8s.io" +var APIRegistrationGroupPriority int = 18000 + +// Given a list of APIServices and proxyHandlers for contacting them, +// DiscoveryManager caches a list of discovery documents for each server + +type DiscoveryAggregationController interface { + // Adds or Updates an APIService from the Aggregated Discovery Controller's + // knowledge base + // Thread-safe + AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler) + + // Removes an APIService from the Aggregated Discovery Controller's Knowledge + // bank + // Thread-safe + RemoveAPIService(apiServiceName string) + + // Spwans a worker which waits for added/updated apiservices and updates + // the unified discovery document by contacting the aggregated api services + Run(stopCh <-chan struct{}) + + // Returns true if all non-local APIServices that have been added + // are synced at least once to the discovery document + ExternalServicesSynced() bool +} + +type discoveryManager struct { + // Locks `services` + servicesLock sync.RWMutex + + // Map from APIService's name (or a unique string for local servers) + // to information about contacting that API Service + apiServices map[string]groupVersionInfo + + // Locks cachedResults + resultsLock sync.RWMutex + + // Map from APIService.Spec.Service to the previously fetched value + // (Note that many APIServices might use the same APIService.Spec.Service) + cachedResults map[serviceKey]cachedResult + + // Queue of dirty apiServiceKey which need to be refreshed + // It is important that the reconciler for this queue does not excessively + // contact the apiserver if a key was enqueued before the server was last + // contacted. + dirtyAPIServiceQueue workqueue.RateLimitingInterface + + // Merged handler which stores all known groupversions + mergedDiscoveryHandler discoveryendpoint.ResourceManager +} + +// Version of Service/Spec with relevant fields for use as a cache key +type serviceKey struct { + Namespace string + Name string + Port int32 +} + +// Human-readable String representation used for logs +func (s serviceKey) String() string { + return fmt.Sprintf("%v/%v:%v", s.Namespace, s.Name, s.Port) +} + +func newServiceKey(service apiregistrationv1.ServiceReference) serviceKey { + // Docs say. Defaults to 443 for compatibility reasons. + // BETA: Should this be a shared constant to avoid drifting with the + // implementation? + port := int32(443) + if service.Port != nil { + port = *service.Port + } + + return serviceKey{ + Name: service.Name, + Namespace: service.Namespace, + Port: port, + } +} + +type cachedResult struct { + // Currently cached discovery document for this service + // Map from group name to version name to + discovery map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery + + // ETag hash of the cached discoveryDocument + etag string + + // Guaranteed to be a time less than the time the server responded with the + // discovery data. + lastUpdated time.Time +} + +// Information about a specific APIService/GroupVersion +type groupVersionInfo struct { + // Date this APIService was marked dirty. + // Guaranteed to be a time greater than the most recent time the APIService + // was known to be modified. + // + // Used for request deduplication to ensure the data used to reconcile each + // apiservice was retrieved after the time of the APIService change: + // real_apiservice_change_time < groupVersionInfo.lastMarkedDirty < cachedResult.lastUpdated < real_document_fresh_time + // + // This ensures that if the apiservice was changed after the last cached entry + // was stored, the discovery document will always be re-fetched. + lastMarkedDirty time.Time + + // Last time sync function was run for this GV. + lastReconciled time.Time + + // ServiceReference of this GroupVersion. This identifies the Service which + // describes how to contact the server responsible for this GroupVersion. + service serviceKey + + // groupPriority describes the priority of the APIService for sorting + groupPriority int + + // Method for contacting the service + handler http.Handler +} + +var _ DiscoveryAggregationController = &discoveryManager{} + +func NewDiscoveryManager( + target discoveryendpoint.ResourceManager, +) DiscoveryAggregationController { + return &discoveryManager{ + mergedDiscoveryHandler: target, + apiServices: make(map[string]groupVersionInfo), + cachedResults: make(map[serviceKey]cachedResult), + dirtyAPIServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "discovery-manager"), + } +} + +// Returns discovery data for the given apiservice. +// Caches the result. +// Returns the cached result if it is retrieved after the apiservice was last +// marked dirty +// If there was an error in fetching, returns the stale cached result if it exists, +// and a non-nil error +// If the result is current, returns nil error and non-nil result +func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion, info groupVersionInfo) (*cachedResult, error) { + // Lookup last cached result for this apiservice's service. + cached, exists := dm.getCacheEntryForService(info.service) + + // If entry exists and was updated after the given time, just stop now + if exists && cached.lastUpdated.After(info.lastMarkedDirty) { + return &cached, nil + } + + // If we have a handler to contact the server for this APIService, and + // the cache entry is too old to use, refresh the cache entry now. + handler := http.TimeoutHandler(info.handler, 5*time.Second, "request timed out") + req, err := http.NewRequest("GET", "/apis", nil) + if err != nil { + // NewRequest should not fail, but if it does for some reason, + // log it and continue + return &cached, fmt.Errorf("failed to create http.Request: %v", err) + } + + // Apply aggregator user to request + req = req.WithContext( + request.WithUser( + req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator", Groups: []string{"system:masters"}})) + req = req.WithContext(request.WithRequestInfo(req.Context(), &request.RequestInfo{ + Path: req.URL.Path, + IsResourceRequest: false, + })) + req.Header.Add("Accept", runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList") + + if exists && len(cached.etag) > 0 { + req.Header.Add("If-None-Match", cached.etag) + } + + // Important that the time recorded in the data's "lastUpdated" is conservatively + // from BEFORE the request is dispatched so that lastUpdated can be used to + // de-duplicate requests. + now := time.Now() + writer := newInMemoryResponseWriter() + handler.ServeHTTP(writer, req) + + switch writer.respCode { + case http.StatusNotModified: + dm.resultsLock.Lock() + defer dm.resultsLock.Unlock() + + // Keep old entry, update timestamp + cached = cachedResult{ + discovery: cached.discovery, + etag: cached.etag, + lastUpdated: now, + } + + dm.setCacheEntryForService(info.service, cached) + return &cached, nil + case http.StatusNotFound: + // Discovery Document is not being served at all. + // Fall back to legacy discovery information + if len(gv.Version) == 0 { + return nil, errors.New("not found") + } + + var path string + if len(gv.Group) == 0 { + path = "/api/" + gv.Version + } else { + path = "/apis/" + gv.Group + "/" + gv.Version + } + + req, err := http.NewRequest("GET", path, nil) + if err != nil { + // NewRequest should not fail, but if it does for some reason, + // log it and continue + return nil, fmt.Errorf("failed to create http.Request: %v", err) + } + + // Apply aggregator user to request + req = req.WithContext( + request.WithUser( + req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator"})) + + // req.Header.Add("Accept", runtime.ContentTypeProtobuf) + req.Header.Add("Accept", runtime.ContentTypeJSON) + + if exists && len(cached.etag) > 0 { + req.Header.Add("If-None-Match", cached.etag) + } + + writer := newInMemoryResponseWriter() + handler.ServeHTTP(writer, req) + + if writer.respCode != http.StatusOK { + return nil, fmt.Errorf("failed to download discovery for %s: %v", path, writer.String()) + } + + parsed := &metav1.APIResourceList{} + if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil { + return nil, err + } + + // Create a discomap with single group-version + resources, err := endpoints.ConvertGroupVersionIntoToDiscovery(parsed.APIResources) + if err != nil { + return nil, err + } + + discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{ + // Convert old-style APIGroupList to new information + gv: { + Version: gv.Version, + Resources: resources, + }, + } + + cached = cachedResult{ + discovery: discoMap, + lastUpdated: now, + } + + // Save the resolve, because it is still useful in case other services + // are already marked dirty. THey can use it without making http request + dm.setCacheEntryForService(info.service, cached) + return &cached, nil + + case http.StatusOK: + parsed := &apidiscoveryv2beta1.APIGroupDiscoveryList{} + if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil { + return nil, err + } + klog.V(3).Infof("DiscoveryManager: Successfully downloaded discovery for %s", info.service.String()) + + // Convert discovery info into a map for convenient lookup later + discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{} + for _, g := range parsed.Items { + for _, v := range g.Versions { + discoMap[metav1.GroupVersion{Group: g.Name, Version: v.Version}] = v + } + } + + // Save cached result + cached = cachedResult{ + discovery: discoMap, + etag: writer.Header().Get("Etag"), + lastUpdated: now, + } + dm.setCacheEntryForService(info.service, cached) + return &cached, nil + + default: + klog.Infof("DiscoveryManager: Failed to download discovery for %v: %v %s", + info.service.String(), writer.respCode, writer.data) + return nil, fmt.Errorf("service %s returned non-success response code: %v", + info.service.String(), writer.respCode) + } +} + +// Try to sync a single APIService. +func (dm *discoveryManager) syncAPIService(apiServiceName string) error { + info, exists := dm.getInfoForAPIService(apiServiceName) + + gv := helper.APIServiceNameToGroupVersion(apiServiceName) + mgv := metav1.GroupVersion{Group: gv.Group, Version: gv.Version} + + if !exists { + // apiservice was removed. remove it from merged discovery + dm.mergedDiscoveryHandler.RemoveGroupVersion(mgv) + return nil + } + + // Lookup last cached result for this apiservice's service. + now := time.Now() + cached, err := dm.fetchFreshDiscoveryForService(mgv, info) + + info.lastReconciled = now + dm.setInfoForAPIService(apiServiceName, &info) + + var entry apidiscoveryv2beta1.APIVersionDiscovery + + // Extract the APIService's specific resource information from the + // groupversion + if cached == nil { + // There was an error fetching discovery for this APIService, and + // there is nothing in the cache for this GV. + // + // Just use empty GV to mark that GV exists, but no resources. + // Also mark that it is stale to indicate the fetch failed + // TODO: Maybe also stick in a status for the version the error? + entry = apidiscoveryv2beta1.APIVersionDiscovery{ + Version: gv.Version, + } + } else { + // Find our specific groupversion within the discovery document + entry, exists = cached.discovery[mgv] + if exists { + // The stale/fresh entry has our GV, so we can include it in the doc + } else { + // Successfully fetched discovery information from the server, but + // the server did not include this groupversion? + entry = apidiscoveryv2beta1.APIVersionDiscovery{ + Version: gv.Version, + } + } + } + + // The entry's staleness depends upon if `fetchFreshDiscoveryForService` + // returned an error or not. + if err == nil { + entry.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessCurrent + } else { + entry.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessStale + } + + dm.mergedDiscoveryHandler.AddGroupVersion(gv.Group, entry) + return nil +} + +// Spwans a goroutune which waits for added/updated apiservices and updates +// the discovery document accordingly +func (dm *discoveryManager) Run(stopCh <-chan struct{}) { + klog.Info("Starting ResourceDiscoveryManager") + + // Shutdown the queue since stopCh was signalled + defer dm.dirtyAPIServiceQueue.ShutDown() + + // Spawn workers + // These workers wait for APIServices to be marked dirty. + // Worker ensures the cached discovery document hosted by the ServiceReference of + // the APIService is at least as fresh as the APIService, then includes the + // APIService's groupversion into the merged document + for i := 0; i < 2; i++ { + go func() { + for { + next, shutdown := dm.dirtyAPIServiceQueue.Get() + if shutdown { + return + } + + func() { + defer dm.dirtyAPIServiceQueue.Done(next) + + if err := dm.syncAPIService(next.(string)); err != nil { + dm.dirtyAPIServiceQueue.AddRateLimited(next) + } else { + dm.dirtyAPIServiceQueue.Forget(next) + } + }() + } + }() + } + + // Ensure that apiregistration.k8s.io is the first group in the discovery group. + dm.mergedDiscoveryHandler.SetGroupPriority(APIRegistrationGroup, APIRegistrationGroupPriority) + + wait.PollUntil(1*time.Minute, func() (done bool, err error) { + dm.servicesLock.Lock() + defer dm.servicesLock.Unlock() + + now := time.Now() + + // Mark all non-local APIServices as dirty + for key, info := range dm.apiServices { + info.lastMarkedDirty = now + dm.apiServices[key] = info + dm.dirtyAPIServiceQueue.Add(key) + } + return false, nil + }, stopCh) +} + +// Adds an APIService to be tracked by the discovery manager. If the APIService +// is already known +func (dm *discoveryManager) AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler) { + // If service is nil then its information is contained by a local APIService + // which is has already been added to the manager. + if apiService.Spec.Service == nil { + return + } + + // Add or update APIService record and mark it as dirty + dm.setInfoForAPIService(apiService.Name, &groupVersionInfo{ + groupPriority: int(apiService.Spec.GroupPriorityMinimum), + handler: handler, + lastMarkedDirty: time.Now(), + service: newServiceKey(*apiService.Spec.Service), + }) + dm.dirtyAPIServiceQueue.Add(apiService.Name) +} + +func (dm *discoveryManager) RemoveAPIService(apiServiceName string) { + if dm.setInfoForAPIService(apiServiceName, nil) != nil { + // mark dirty if there was actually something deleted + dm.dirtyAPIServiceQueue.Add(apiServiceName) + } +} + +func (dm *discoveryManager) ExternalServicesSynced() bool { + dm.servicesLock.RLock() + defer dm.servicesLock.RUnlock() + for _, info := range dm.apiServices { + if info.lastReconciled.IsZero() { + return false + } + } + + return true +} + +// +// Lock-protected accessors +// + +func (dm *discoveryManager) getCacheEntryForService(key serviceKey) (cachedResult, bool) { + dm.resultsLock.RLock() + defer dm.resultsLock.RUnlock() + + result, ok := dm.cachedResults[key] + return result, ok +} + +func (dm *discoveryManager) setCacheEntryForService(key serviceKey, result cachedResult) { + dm.resultsLock.Lock() + defer dm.resultsLock.Unlock() + + dm.cachedResults[key] = result +} + +func (dm *discoveryManager) getInfoForAPIService(name string) (groupVersionInfo, bool) { + dm.servicesLock.RLock() + defer dm.servicesLock.RUnlock() + + result, ok := dm.apiServices[name] + return result, ok +} + +func (dm *discoveryManager) setInfoForAPIService(name string, result *groupVersionInfo) (oldValueIfExisted *groupVersionInfo) { + dm.servicesLock.Lock() + defer dm.servicesLock.Unlock() + + if oldValue, exists := dm.apiServices[name]; exists { + oldValueIfExisted = &oldValue + } + + if result != nil { + dm.apiServices[name] = *result + } else { + delete(dm.apiServices, name) + } + + return oldValueIfExisted +} + +// !TODO: This was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go +// which was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go +// so we should find a home for this +// inMemoryResponseWriter is a http.Writer that keep the response in memory. +type inMemoryResponseWriter struct { + writeHeaderCalled bool + header http.Header + respCode int + data []byte +} + +func newInMemoryResponseWriter() *inMemoryResponseWriter { + return &inMemoryResponseWriter{header: http.Header{}} +} + +func (r *inMemoryResponseWriter) Header() http.Header { + return r.header +} + +func (r *inMemoryResponseWriter) WriteHeader(code int) { + r.writeHeaderCalled = true + r.respCode = code +} + +func (r *inMemoryResponseWriter) Write(in []byte) (int, error) { + if !r.writeHeaderCalled { + r.WriteHeader(http.StatusOK) + } + r.data = append(r.data, in...) + return len(in), nil +} + +func (r *inMemoryResponseWriter) String() string { + s := fmt.Sprintf("ResponseCode: %d", r.respCode) + if r.data != nil { + s += fmt.Sprintf(", Body: %s", string(r.data)) + } + if r.header != nil { + s += fmt.Sprintf(", Header: %s", r.header) + } + return s +} diff --git a/pkg/apiserver/handler_discovery_test.go b/pkg/apiserver/handler_discovery_test.go new file mode 100644 index 000000000..2b7a94f5c --- /dev/null +++ b/pkg/apiserver/handler_discovery_test.go @@ -0,0 +1,359 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver_test + +import ( + "context" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "testing" + + fuzz "github.com/google/gofuzz" + "github.com/stretchr/testify/require" + apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints" + "k8s.io/apiserver/pkg/endpoints/discovery" + discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" + scheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/kube-aggregator/pkg/apiserver" +) + +// Test that the discovery manager starts and aggregates from two local API services +func TestBasic(t *testing.T) { + service1 := discoveryendpoint.NewResourceManager() + service2 := discoveryendpoint.NewResourceManager() + apiGroup1 := fuzzAPIGroups(2, 5, 25) + apiGroup2 := fuzzAPIGroups(2, 5, 50) + service1.SetGroups(apiGroup1.Items) + service2.SetGroups(apiGroup2.Items) + aggregatedResourceManager := discoveryendpoint.NewResourceManager() + aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) + + for _, g := range apiGroup1.Items { + for _, v := range g.Versions { + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: v.Version + "." + g.Name, + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: g.Name, + Version: v.Version, + Service: &apiregistrationv1.ServiceReference{ + Name: "service1", + }, + }, + }, service1) + } + } + + for _, g := range apiGroup2.Items { + for _, v := range g.Versions { + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: v.Version + "." + g.Name, + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: g.Name, + Version: v.Version, + Service: &apiregistrationv1.ServiceReference{ + Name: "service2", + }, + }, + }, service2) + } + } + + testCtx, _ := context.WithCancel(context.Background()) + go aggregatedManager.Run(testCtx.Done()) + + cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) + + response, _, parsed := fetchPath(aggregatedResourceManager, "") + if response.StatusCode != 200 { + t.Fatalf("unexpected status code %d", response.StatusCode) + } + checkAPIGroups(t, apiGroup1, parsed) + checkAPIGroups(t, apiGroup2, parsed) +} + +func checkAPIGroups(t *testing.T, api apidiscoveryv2beta1.APIGroupDiscoveryList, response *apidiscoveryv2beta1.APIGroupDiscoveryList) { + if len(response.Items) < len(api.Items) { + t.Errorf("expected to check for at least %d groups, only have %d groups in response", len(api.Items), len(response.Items)) + } + for _, knownGroup := range api.Items { + found := false + for _, possibleGroup := range response.Items { + if knownGroup.Name == possibleGroup.Name { + t.Logf("found %s", knownGroup.Name) + found = true + } + } + if found == false { + t.Errorf("could not find %s", knownGroup.Name) + } + } +} + +// Test that a handler associated with an APIService gets pinged after the +// APIService has been marked as dirty +func TestDirty(t *testing.T) { + pinged := false + service := discoveryendpoint.NewResourceManager() + aggregatedResourceManager := discoveryendpoint.NewResourceManager() + + aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "v1.stable.example.com", + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: "stable.example.com", + Version: "v1", + Service: &apiregistrationv1.ServiceReference{ + Name: "test-service", + }, + }, + }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + pinged = true + service.ServeHTTP(w, r) + })) + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go aggregatedManager.Run(testCtx.Done()) + cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) + + // immediately check for ping, since Run() should block for local services + if !pinged { + t.Errorf("service handler never pinged") + } +} + +// Show that an APIService can be removed and that its group no longer remains +// if there are no versions +func TestRemoveAPIService(t *testing.T) { + aggyService := discoveryendpoint.NewResourceManager() + service := discoveryendpoint.NewResourceManager() + apiGroup := fuzzAPIGroups(2, 3, 10) + service.SetGroups(apiGroup.Items) + + var apiServices []*apiregistrationv1.APIService + for _, g := range apiGroup.Items { + for _, v := range g.Versions { + apiservice := &apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: v.Version + "." + g.Name, + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: g.Name, + Version: v.Version, + Service: &apiregistrationv1.ServiceReference{ + Namespace: "serviceNamespace", + Name: "serviceName", + }, + }, + } + + apiServices = append(apiServices, apiservice) + } + } + + aggregatedManager := apiserver.NewDiscoveryManager(aggyService) + + for _, s := range apiServices { + aggregatedManager.AddAPIService(s, service) + } + + testCtx, _ := context.WithCancel(context.Background()) + go aggregatedManager.Run(testCtx.Done()) + + for _, s := range apiServices { + aggregatedManager.RemoveAPIService(s.Name) + } + + cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) + + response, _, parsed := fetchPath(aggyService, "") + if response.StatusCode != 200 { + t.Fatalf("unexpected status code %d", response.StatusCode) + } + if len(parsed.Items) > 0 { + t.Errorf("expected to find no groups after service deletion (got %d groups)", len(parsed.Items)) + } +} + +func TestLegacyFallback(t *testing.T) { + aggregatedResourceManager := discoveryendpoint.NewResourceManager() + + legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{ + Name: "stable.example.com", + PreferredVersion: metav1.GroupVersionForDiscovery{ + GroupVersion: "stable.example.com/v1", + Version: "v1", + }, + Versions: []metav1.GroupVersionForDiscovery{ + { + GroupVersion: "stable.example.com/v1", + Version: "v1", + }, + { + GroupVersion: "stable.example.com/v1beta1", + Version: "v1beta1", + }, + }, + }) + + resource := metav1.APIResource{ + Name: "foos", + SingularName: "foo", + Group: "stable.example.com", + Version: "v1", + Namespaced: false, + Kind: "Foo", + Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"}, + Categories: []string{"all"}, + } + + legacyResourceHandler := discovery.NewAPIVersionHandler(scheme.Codecs, schema.GroupVersion{ + Group: "stable.example.com", + Version: "v1", + }, discovery.APIResourceListerFunc(func() []metav1.APIResource { + return []metav1.APIResource{ + resource, + } + })) + + aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) + aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "v1.stable.example.com", + }, + Spec: apiregistrationv1.APIServiceSpec{ + Group: "stable.example.com", + Version: "v1", + Service: &apiregistrationv1.ServiceReference{ + Name: "test-service", + }, + }, + }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/apis/stable.example.com" { + legacyGroupHandler.ServeHTTP(w, r) + } else if r.URL.Path == "/apis/stable.example.com/v1" { + // defer to legacy discovery + legacyResourceHandler.ServeHTTP(w, r) + } else { + // Unknown url + w.WriteHeader(http.StatusNotFound) + } + })) + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go aggregatedManager.Run(testCtx.Done()) + require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)) + + // At this point external services have synced. Check if discovery document + // includes the legacy resources + _, _, doc := fetchPath(aggregatedResourceManager, "") + + converted, err := endpoints.ConvertGroupVersionIntoToDiscovery([]metav1.APIResource{resource}) + require.NoError(t, err) + require.Equal(t, []apidiscoveryv2beta1.APIGroupDiscovery{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: resource.Group, + }, + Versions: []apidiscoveryv2beta1.APIVersionDiscovery{ + { + Version: resource.Version, + Resources: converted, + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, + }, + }, + }, + }, doc.Items) +} + +// copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go +func fuzzAPIGroups(atLeastNumGroups, maxNumGroups int, seed int64) apidiscoveryv2beta1.APIGroupDiscoveryList { + fuzzer := fuzz.NewWithSeed(seed) + fuzzer.NumElements(atLeastNumGroups, maxNumGroups) + fuzzer.NilChance(0) + fuzzer.Funcs(func(o *apidiscoveryv2beta1.APIGroupDiscovery, c fuzz.Continue) { + c.FuzzNoCustom(o) + + // The ResourceManager will just not serve the grouop if its versions + // list is empty + atLeastOne := apidiscoveryv2beta1.APIVersionDiscovery{} + c.Fuzz(&atLeastOne) + o.Versions = append(o.Versions, atLeastOne) + + o.TypeMeta = metav1.TypeMeta{ + Kind: "APIGroupDiscovery", + APIVersion: "v1", + } + }) + + var apis []apidiscoveryv2beta1.APIGroupDiscovery + fuzzer.Fuzz(&apis) + + return apidiscoveryv2beta1.APIGroupDiscoveryList{ + TypeMeta: metav1.TypeMeta{ + Kind: "APIGroupDiscoveryList", + APIVersion: "v1", + }, + Items: apis, + } + +} + +// copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go +func fetchPath(handler http.Handler, etag string) (*http.Response, []byte, *apidiscoveryv2beta1.APIGroupDiscoveryList) { + // Expect json-formatted apis group list + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/apis", nil) + + // Ask for JSON response + req.Header.Set("Accept", runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList") + + if etag != "" { + // Quote provided etag if unquoted + quoted := etag + if !strings.HasPrefix(etag, "\"") { + quoted = strconv.Quote(etag) + } + req.Header.Set("If-None-Match", quoted) + } + + handler.ServeHTTP(w, req) + + bytes := w.Body.Bytes() + var decoded *apidiscoveryv2beta1.APIGroupDiscoveryList + if len(bytes) > 0 { + decoded = &apidiscoveryv2beta1.APIGroupDiscoveryList{} + runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), bytes, decoded) + } + + return w.Result(), bytes, decoded +} diff --git a/pkg/apiserver/handler_proxy.go b/pkg/apiserver/handler_proxy.go index 72feef9eb..d1c6597c5 100644 --- a/pkg/apiserver/handler_proxy.go +++ b/pkg/apiserver/handler_proxy.go @@ -231,6 +231,14 @@ func (r *responder) Error(_ http.ResponseWriter, _ *http.Request, err error) { // these methods provide locked access to fields +// Sets serviceAvailable value on proxyHandler +// not thread safe +func (r *proxyHandler) setServiceAvailable(value bool) { + info := r.handlingInfo.Load().(proxyHandlingInfo) + info.serviceAvailable = true + r.handlingInfo.Store(info) +} + func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIService) { if apiService.Spec.Service == nil { r.handlingInfo.Store(proxyHandlingInfo{local: true})