From 94cd5d9940c9b611e4c9dfc7b3d6d7aa65ab8167 Mon Sep 17 00:00:00 2001 From: "linbo.hlb" Date: Fri, 21 May 2021 16:37:19 +0800 Subject: [PATCH] add data filtering framework 1. support endpointslice filter for keeping service traffic in-bound of nodePool 2. support master service mutation for pod use InClusterConfig access kube-apiserver --- cmd/yurthub/app/config/config.go | 69 ++++++ cmd/yurthub/app/options/options.go | 10 +- cmd/yurthub/app/start.go | 31 ++- go.mod | 16 +- go.sum | 6 + pkg/yurthub/cachemanager/cache_agent.go | 1 + pkg/yurthub/cachemanager/cache_manager.go | 1 - pkg/yurthub/filter/approver.go | 41 ++++ pkg/yurthub/filter/chain.go | 61 ++++++ pkg/yurthub/filter/filter.go | 202 ++++++++++++++++++ pkg/yurthub/filter/initializer/initializer.go | 123 +++++++++++ pkg/yurthub/filter/interfaces.go | 42 ++++ pkg/yurthub/filter/masterservice/filter.go | 94 ++++++++ pkg/yurthub/filter/masterservice/handler.go | 136 ++++++++++++ pkg/yurthub/filter/servicetopology/filter.go | 152 +++++++++++++ pkg/yurthub/filter/servicetopology/handler.go | 191 +++++++++++++++++ pkg/yurthub/filter/util/utils.go | 38 ++++ .../kubernetes/serializer/serializer.go | 19 ++ pkg/yurthub/proxy/proxy.go | 3 + pkg/yurthub/proxy/remote/loadbalancer.go | 4 +- pkg/yurthub/proxy/remote/remote.go | 20 +- 21 files changed, 1242 insertions(+), 18 deletions(-) create mode 100644 pkg/yurthub/filter/approver.go create mode 100644 pkg/yurthub/filter/chain.go create mode 100644 pkg/yurthub/filter/filter.go create mode 100644 pkg/yurthub/filter/initializer/initializer.go create mode 100644 pkg/yurthub/filter/interfaces.go create mode 100644 pkg/yurthub/filter/masterservice/filter.go create mode 100644 pkg/yurthub/filter/masterservice/handler.go create mode 100644 pkg/yurthub/filter/servicetopology/filter.go create mode 100644 pkg/yurthub/filter/servicetopology/handler.go create mode 100644 pkg/yurthub/filter/util/utils.go diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index c7a1a0d5463..0d01084e2cd 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -21,13 +21,23 @@ import ( "net" "net/url" "strings" + "time" "github.com/openyurtio/openyurt/cmd/yurthub/app/options" "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/storage/factory" + yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned" + yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" ) @@ -55,6 +65,10 @@ type YurtHubConfiguration struct { HubAgentDummyIfName string StorageWrapper cachemanager.StorageWrapper SerializerManager *serializer.SerializerManager + MutatedMasterServiceAddr string + Filters *filter.Filters + SharedFactory informers.SharedInformerFactory + YurtSharedFactory yurtinformers.SharedInformerFactory } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -75,6 +89,28 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { hubServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubPort) proxyServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubProxyPort) proxyServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxyPort) + + sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s", proxyServerAddr)) + if err != nil { + return nil, err + } + + var filters *filter.Filters + var mutatedMasterServiceAddr string + if options.EnableResourceFilter { + filters = filter.NewFilters(options.DisabledResourceFilters) + registerAllFilters(filters) + + mutatedMasterServiceAddr = us[0].Host + if options.AccessServerThroughHub { + if options.EnableDummyIf { + mutatedMasterServiceAddr = proxyServerDummyAddr + } else { + mutatedMasterServiceAddr = proxyServerAddr + } + } + } + cfg := &YurtHubConfiguration{ LBMode: options.LBMode, RemoteServers: us, @@ -98,6 +134,10 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { HubAgentDummyIfName: options.HubAgentDummyIfName, StorageWrapper: storageWrapper, SerializerManager: serializerManager, + MutatedMasterServiceAddr: mutatedMasterServiceAddr, + Filters: filters, + SharedFactory: sharedFactory, + YurtSharedFactory: yurtSharedFactory, } return cfg, nil @@ -129,3 +169,32 @@ func parseRemoteServers(serverAddr string) ([]*url.URL, error) { return us, nil } + +// createSharedInformers create sharedInformers from the given proxyAddr. +func createSharedInformers(proxyAddr string) (informers.SharedInformerFactory, yurtinformers.SharedInformerFactory, error) { + var kubeConfig *rest.Config + var err error + kubeConfig, err = clientcmd.BuildConfigFromFlags(proxyAddr, "") + if err != nil { + return nil, nil, err + } + + client, err := kubernetes.NewForConfig(kubeConfig) + if err != nil { + return nil, nil, err + } + + yurtClient, err := yurtclientset.NewForConfig(kubeConfig) + if err != nil { + return nil, nil, err + } + + return informers.NewSharedInformerFactory(client, 24*time.Hour), yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil +} + +// registerAllFilters by order, the front registered filter will be +// called before the later registered ones. +func registerAllFilters(filters *filter.Filters) { + servicetopology.Register(filters) + masterservice.Register(filters) +} diff --git a/cmd/yurthub/app/options/options.go b/cmd/yurthub/app/options/options.go index 21b9bab0c6e..b880ab95177 100644 --- a/cmd/yurthub/app/options/options.go +++ b/cmd/yurthub/app/options/options.go @@ -57,6 +57,9 @@ type YurtHubOptions struct { HubAgentDummyIfIP string HubAgentDummyIfName string DiskCachePath string + AccessServerThroughHub bool + EnableResourceFilter bool + DisabledResourceFilters []string } // NewYurtHubOptions creates a new YurtHubOptions with a default config. @@ -81,8 +84,10 @@ func NewYurtHubOptions() *YurtHubOptions { HubAgentDummyIfIP: "169.254.2.1", HubAgentDummyIfName: fmt.Sprintf("%s-dummy0", projectinfo.GetHubName()), DiskCachePath: disk.CacheBaseDir, + AccessServerThroughHub: false, + EnableResourceFilter: true, + DisabledResourceFilters: make([]string, 0), } - return o } @@ -136,6 +141,9 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.HubAgentDummyIfIP, "dummy-if-ip", o.HubAgentDummyIfIP, "the ip address of dummy interface that used for container connect hub agent(exclusive ips: 169.254.31.0/24, 169.254.1.1/32)") fs.StringVar(&o.HubAgentDummyIfName, "dummy-if-name", o.HubAgentDummyIfName, "the name of dummy interface that is used for hub agent") fs.StringVar(&o.DiskCachePath, "disk-cache-path", o.DiskCachePath, "the path for kubernetes to storage metadata") + fs.BoolVar(&o.AccessServerThroughHub, "access-server-through-hub", o.AccessServerThroughHub, "enable pods access kube-apiserver through yurthub or not") + fs.BoolVar(&o.EnableResourceFilter, "enable-resource-filter", o.EnableResourceFilter, "enable to filter response that comes back from reverse proxy") + fs.StringSliceVar(&o.DisabledResourceFilters, "disabled-resource-filters", o.DisabledResourceFilters, "disable resource filters to handle response") } // verifyDummyIP verify the specified ip is valid or not diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index aa2901808a2..557afec18d7 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -26,6 +26,9 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/certificate" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/hubself" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/kubelet" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology" "github.com/openyurtio/openyurt/pkg/yurthub/gc" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" @@ -136,8 +139,16 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { gcMgr.Run() trace++ + klog.Infof("%d. new filter chain for mutating response body", trace) + filterChain, err := createFilterChain(cfg) + if err != nil { + klog.Errorf("could not new filter chain, %v", err) + return err + } + trace++ + klog.Infof("%d. new reverse proxy handler for remote servers", trace) - yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, stopCh) + yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, filterChain, stopCh) if err != nil { klog.Errorf("could not create reverse proxy handler, %v", err) return err @@ -156,6 +167,12 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { klog.Infof("%d. new %s server and begin to serve, dummy proxy server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerDummyAddr) } + // start shared informers here + if filterChain != nil && cfg.Filters.Enabled(servicetopology.FilterName) { + cfg.SharedFactory.Start(stopCh) + cfg.YurtSharedFactory.Start(stopCh) + } + klog.Infof("%d. new %s server and begin to serve, proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubServerAddr) s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler) if err != nil { @@ -163,8 +180,18 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { return err } s.Run() - klog.Infof("hub agent exited") <-stopCh return nil } + +func createFilterChain(cfg *config.YurtHubConfiguration) (filter.Interface, error) { + if cfg.Filters == nil { + return nil, nil + } + + genericInitializer := initializer.New(cfg.SharedFactory, cfg.YurtSharedFactory, cfg.SerializerManager, cfg.StorageWrapper, cfg.NodeName, cfg.MutatedMasterServiceAddr) + initializerChain := filter.FilterInitializers{} + initializerChain = append(initializerChain, genericInitializer) + return cfg.Filters.NewFromFilters(initializerChain) +} diff --git a/go.mod b/go.mod index a0e3ec6ef9e..2aee1f37af2 100644 --- a/go.mod +++ b/go.mod @@ -5,36 +5,28 @@ go 1.13 require ( github.com/NYTimes/gziphandler v1.1.1 // indirect github.com/aliyun/alibaba-cloud-sdk-go v1.61.355 - github.com/beorn7/perks v1.0.1 // indirect github.com/daviddengcn/go-colortext v0.0.0-20160507010035-511bcaf42ccd github.com/docker/docker v17.12.0-ce-rc1.0.20200531234253-77e06fda0c94+incompatible // indirect github.com/emicklei/go-restful v2.12.0+incompatible // indirect - github.com/evanphx/json-patch v4.5.0+incompatible // indirect github.com/go-openapi/spec v0.19.8 // indirect github.com/google/uuid v1.1.1 - github.com/googleapis/gnostic v0.3.1 // indirect github.com/gorilla/mux v1.7.4 - github.com/hashicorp/golang-lru v0.5.4 // indirect - github.com/imdario/mergo v0.3.9 // indirect - github.com/json-iterator/go v1.1.10 // indirect github.com/onsi/ginkgo v1.13.0 github.com/onsi/gomega v1.10.1 github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/openyurtio/yurt-app-manager-api v0.18.8 github.com/prometheus/client_golang v1.7.1 - github.com/prometheus/procfs v0.0.11 // indirect github.com/spf13/cobra v1.0.0 github.com/spf13/pflag v1.0.5 github.com/vishvananda/netlink v1.0.0 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect - golang.org/x/text v0.3.3 // indirect golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect google.golang.org/grpc v1.27.0 - gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/square/go-jose.v2 v2.5.1 // indirect - k8s.io/api v0.18.8 - k8s.io/apimachinery v0.18.8 + k8s.io/api v0.19.7 + k8s.io/apimachinery v0.19.7 k8s.io/apiserver v0.18.8 - k8s.io/client-go v0.18.8 + k8s.io/client-go v0.19.2 k8s.io/cluster-bootstrap v0.0.0 k8s.io/component-base v0.18.8 k8s.io/klog v1.0.0 diff --git a/go.sum b/go.sum index 3f9158190d1..f0f0c1fb49f 100644 --- a/go.sum +++ b/go.sum @@ -170,6 +170,7 @@ github.com/go-lintpack/lintpack v0.5.2/go.mod h1:NwZuYi2nUHho8XEIZ6SIxihrnPoqBTD github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logr/logr v0.1.0 h1:M1Tv3VzNlEHg6uyACnRdtrploV2P7wZqH8BoQMtz0cg= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= +github.com/go-logr/zapr v0.1.0/go.mod h1:tabnROwaDl0UNxkVeFRbY8bwB37GwRv0P8lg6aAiEnk= github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8= github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI= github.com/go-openapi/analysis v0.17.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik= @@ -491,6 +492,8 @@ github.com/opencontainers/runtime-spec v1.0.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/ github.com/opencontainers/selinux v1.3.1-0.20190929122143-5215b1806f52/go.mod h1:+BLncwf63G4dgOzykXAxcmnFlUaOlkDdmw/CqsW6pjs= github.com/openyurtio/apiserver-network-proxy v1.18.8 h1:xXqaP8DAOvCHD7DNIqtBOhuWxCnwULLc1PqOMoJ7UeI= github.com/openyurtio/apiserver-network-proxy v1.18.8/go.mod h1:X5Au3jBNIgYL2uK0IHeNGnZqlUlVSCFQhi/npPgkKRg= +github.com/openyurtio/yurt-app-manager-api v0.18.8 h1:VyA1hZMu61aRttqWNKGXp5ct6pNELGct4NCm6xkqEmc= +github.com/openyurtio/yurt-app-manager-api v0.18.8/go.mod h1:brjCZScD7rNyjD+FVpuqLrXPZFLKGdFP801KMARUtbc= github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.1.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= @@ -785,6 +788,7 @@ golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gomodules.xyz/jsonpatch/v2 v2.0.1/go.mod h1:IhYNNY4jnS53ZnfE4PAmpKtDpTCj1JFXc+3mwe7XcUU= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0= gonum.org/v1/gonum v0.6.2/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU= @@ -918,6 +922,8 @@ mvdan.cc/unparam v0.0.0-20190209190245-fbb59629db34/go.mod h1:H6SUd1XjIs+qQCyskX rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.7 h1:uuHDyjllyzRyCIvvn0OBjiRB0SgBZGqHNYAmjR7fO50= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.7/go.mod h1:PHgbrJT7lCHcxMU+mDHEm+nx46H4zuuHZkDP6icnhu0= +sigs.k8s.io/controller-runtime v0.5.7 h1:QcB8YQTyMshLLspHiqAkKHO74PgmUAUmTDhol4VccOw= +sigs.k8s.io/controller-runtime v0.5.7/go.mod h1:KjjGQrdWFaSTHwB5A5VDmX9sMLlvkXjVazxVbfOI3a8= sigs.k8s.io/kustomize v2.0.3+incompatible/go.mod h1:MkjgH3RdOWrievjo6c9T245dYlB5QeXV4WCbnt/PEpU= sigs.k8s.io/structured-merge-diff/v3 v3.0.0-20200116222232-67a7b8c61874/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw= sigs.k8s.io/structured-merge-diff/v3 v3.0.0 h1:dOmIZBMfhcHS09XZkMyUgkq5trg3/jRyJYFZUiaOp8E= diff --git a/pkg/yurthub/cachemanager/cache_agent.go b/pkg/yurthub/cachemanager/cache_agent.go index 751d6574c60..69a6fefb950 100644 --- a/pkg/yurthub/cachemanager/cache_agent.go +++ b/pkg/yurthub/cachemanager/cache_agent.go @@ -31,6 +31,7 @@ var ( "flanneld", "coredns", projectinfo.GetAgentName(), + projectinfo.GetHubName(), } cacheAgentsKey = "_internal/cache-manager/cache-agent.conf" sepForAgent = "," diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index c2b8c4b195b..4399822808b 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -243,7 +243,6 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re klog.Errorf("failed to create serializer in saveWatchObject, %s", util.ReqInfoString(info)) return fmt.Errorf("failed to create serializer in saveWatchObject, %s", util.ReqInfoString(info)) } - accessor := meta.NewAccessor() d, err := s.WatchDecoder(r) diff --git a/pkg/yurthub/filter/approver.go b/pkg/yurthub/filter/approver.go new file mode 100644 index 00000000000..88addb306d7 --- /dev/null +++ b/pkg/yurthub/filter/approver.go @@ -0,0 +1,41 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import "k8s.io/apimachinery/pkg/util/sets" + +type Approver struct { + comp string + resource string + operations sets.String +} + +func NewApprover(comp, resource string, verbs ...string) *Approver { + return &Approver{ + comp: comp, + resource: resource, + operations: sets.NewString(verbs...), + } +} + +func (a *Approver) Approve(comp, resource, verb string) bool { + if a.comp != comp || a.resource != resource { + return false + } + + return a.operations.Has(verb) +} diff --git a/pkg/yurthub/filter/chain.go b/pkg/yurthub/filter/chain.go new file mode 100644 index 00000000000..21b620d5f16 --- /dev/null +++ b/pkg/yurthub/filter/chain.go @@ -0,0 +1,61 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "io" + "net/http" + + apirequest "k8s.io/apiserver/pkg/endpoints/request" + + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +type filterChain []Interface + +func (fc filterChain) Approve(comp, resource, verb string) bool { + for _, f := range fc { + if f.Approve(comp, resource, verb) { + return true + } + } + + return false +} + +func (fc filterChain) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { + ctx := req.Context() + comp, ok := util.ClientComponentFrom(ctx) + if !ok { + return 0, rc, nil + } + + info, ok := apirequest.RequestInfoFrom(ctx) + if !ok { + return 0, rc, nil + } + + for _, f := range fc { + if !f.Approve(comp, info.Resource, info.Verb) { + continue + } + + return f.Filter(req, rc, stopCh) + } + + return 0, rc, nil +} diff --git a/pkg/yurthub/filter/filter.go b/pkg/yurthub/filter/filter.go new file mode 100644 index 00000000000..22402fc6af8 --- /dev/null +++ b/pkg/yurthub/filter/filter.go @@ -0,0 +1,202 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "sync" + + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/klog" +) + +type Factory func() (Interface, error) + +type Filters struct { + sync.Mutex + names []string + registry map[string]Factory + disabledFilters sets.String +} + +func NewFilters(disabledFilters []string) *Filters { + return &Filters{ + names: make([]string, 0), + registry: make(map[string]Factory), + disabledFilters: sets.NewString(disabledFilters...), + } +} + +func (fs *Filters) NewFromFilters(initializer FilterInitializer) (Interface, error) { + var filters []Interface + for _, name := range fs.names { + if fs.Enabled(name) { + factory, found := fs.registry[name] + if !found { + return nil, fmt.Errorf("Filter %s has not registered", name) + } + + ins, err := factory() + if err != nil { + klog.Errorf("new filter %s failed, %v", name, err) + return nil, err + } + + if err = initializer.Initialize(ins); err != nil { + return nil, err + } + klog.V(2).Infof("Filter %s initialize successfully", name) + + filters = append(filters, ins) + } + } + + if len(filters) == 0 { + return nil, nil + } + + return filterChain(filters), nil +} + +func (fs *Filters) Register(name string, fn Factory) { + fs.Lock() + defer fs.Unlock() + + _, found := fs.registry[name] + if found { + klog.Warningf("Filter %q has already registered", name) + return + } + + klog.V(2).Infof("Filter %s registered successfully", name) + fs.registry[name] = fn + fs.names = append(fs.names, name) +} + +func (fs *Filters) Enabled(name string) bool { + if fs.disabledFilters.Len() == 1 && fs.disabledFilters.Has("*") { + return false + } + + return !fs.disabledFilters.Has(name) +} + +type FilterInitializers []FilterInitializer + +func (fis FilterInitializers) Initialize(ins Interface) error { + for _, fi := range fis { + if err := fi.Initialize(ins); err != nil { + return err + } + } + + return nil +} + +type filterReadCloser struct { + req *http.Request + rc io.ReadCloser + data *bytes.Buffer + ch chan watch.Event + handler Handler + isWatch bool + serializer *serializer.Serializer + stopCh <-chan struct{} +} + +// NewFilterReadCloser create an filterReadCloser object +func NewFilterReadCloser( + req *http.Request, + rc io.ReadCloser, + handler Handler, + serializer *serializer.Serializer, + stopCh <-chan struct{}) (int, io.ReadCloser, error) { + + ctx := req.Context() + info, _ := apirequest.RequestInfoFrom(ctx) + dr := &filterReadCloser{ + req: req, + rc: rc, + ch: make(chan watch.Event), + data: new(bytes.Buffer), + handler: handler, + isWatch: info.Verb == "watch", + serializer: serializer, + stopCh: stopCh, + } + + if dr.isWatch { + go func(req *http.Request, rc io.ReadCloser, ch chan watch.Event) { + err := handler.StreamResponseFilter(rc, ch) + if err != nil && err != io.EOF && err != context.Canceled { + klog.Errorf("filter watch response ended with error, %v", err) + } + }(req, rc, dr.ch) + return 0, dr, nil + } else { + var newData []byte + n, err := dr.data.ReadFrom(rc) + if err != nil { + return int(n), dr, err + } + + newData, err = handler.ObjectResponseFilter(dr.data.Bytes()) + dr.data = bytes.NewBuffer(newData) + return len(newData), dr, err + } +} + +// Read read data into p and write into pipe +func (dr *filterReadCloser) Read(p []byte) (int, error) { + if dr.isWatch { + select { + case watchEvent, ok := <-dr.ch: + if !ok { + return 0, io.EOF + } + + buf := &bytes.Buffer{} + n, err := dr.serializer.WatchEncode(buf, &watchEvent) + if err != nil { + klog.Errorf("failed to encode resource in Reader %v", err) + return 0, err + } + copied := copy(p, buf.Bytes()) + if copied != n { + return 0, fmt.Errorf("expect copy %d bytes, but only %d bytes copyied", n, copied) + } + + return n, nil + } + } else { + n := copy(p, dr.data.Bytes()) + return n, io.EOF + } +} + +// Close close readers +func (dr *filterReadCloser) Close() error { + return dr.rc.Close() +} diff --git a/pkg/yurthub/filter/initializer/initializer.go b/pkg/yurthub/filter/initializer/initializer.go new file mode 100644 index 00000000000..99a855b2733 --- /dev/null +++ b/pkg/yurthub/filter/initializer/initializer.go @@ -0,0 +1,123 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package initializer + +import ( + "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" + "k8s.io/client-go/informers" +) + +// WantsSharedInformerFactory is an interface for setting SharedInformerFactory +type WantsSharedInformerFactory interface { + SetSharedInformerFactory(factory informers.SharedInformerFactory) error +} + +// WantsYurtSharedInformerFactory is an interface for setting Yurt-App-Manager SharedInformerFactory +type WantsYurtSharedInformerFactory interface { + SetYurtSharedInformerFactory(yurtFactory yurtinformers.SharedInformerFactory) error +} + +// WantsNodeName is an interface for setting node name +type WantsNodeName interface { + SetNodeName(nodeName string) error +} + +// WantsNodeName is an interface for setting node name +type WantsSerializerManager interface { + SetSerializerManager(s *serializer.SerializerManager) error +} + +// WantsStorageWrapper is an interface for setting StorageWrapper +type WantsStorageWrapper interface { + SetStorageWrapper(s cachemanager.StorageWrapper) error +} + +// WantsMasterServiceAddr is an interface for setting mutated master service address +type WantsMasterServiceAddr interface { + SetMasterServiceAddr(addr string) error +} + +// genericFilterInitializer is responsible for initializing generic filter +type genericFilterInitializer struct { + factory informers.SharedInformerFactory + yurtFactory yurtinformers.SharedInformerFactory + serializerManager *serializer.SerializerManager + storageWrapper cachemanager.StorageWrapper + nodeName string + masterServiceAddr string +} + +// New creates an filterInitializer object +func New(factory informers.SharedInformerFactory, + yurtFactory yurtinformers.SharedInformerFactory, + sm *serializer.SerializerManager, + sw cachemanager.StorageWrapper, + nodeName string, + masterServiceAddr string) *genericFilterInitializer { + return &genericFilterInitializer{ + factory: factory, + yurtFactory: yurtFactory, + serializerManager: sm, + storageWrapper: sw, + nodeName: nodeName, + masterServiceAddr: masterServiceAddr, + } +} + +// Initialize used for executing filter initialization +func (fi *genericFilterInitializer) Initialize(ins filter.Interface) error { + if wants, ok := ins.(WantsNodeName); ok { + if err := wants.SetNodeName(fi.nodeName); err != nil { + return err + } + } + + if wants, ok := ins.(WantsMasterServiceAddr); ok { + if err := wants.SetMasterServiceAddr(fi.masterServiceAddr); err != nil { + return err + } + } + + if wants, ok := ins.(WantsSharedInformerFactory); ok { + if err := wants.SetSharedInformerFactory(fi.factory); err != nil { + return err + } + } + + if wants, ok := ins.(WantsYurtSharedInformerFactory); ok { + if err := wants.SetYurtSharedInformerFactory(fi.yurtFactory); err != nil { + return err + } + } + + if wants, ok := ins.(WantsSerializerManager); ok { + if err := wants.SetSerializerManager(fi.serializerManager); err != nil { + return err + } + } + + if wants, ok := ins.(WantsStorageWrapper); ok { + if err := wants.SetStorageWrapper(fi.storageWrapper); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/yurthub/filter/interfaces.go b/pkg/yurthub/filter/interfaces.go new file mode 100644 index 00000000000..949d6e7acfd --- /dev/null +++ b/pkg/yurthub/filter/interfaces.go @@ -0,0 +1,42 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "io" + "net/http" + + "k8s.io/apimachinery/pkg/watch" + + v1 "k8s.io/api/core/v1" +) + +type Interface interface { + Approve(comp, resource, verb string) bool + Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) +} + +type FilterInitializer interface { + Initialize(filter Interface) error +} + +type Handler interface { + StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error + ObjectResponseFilter(b []byte) ([]byte, error) +} + +type NodeGetter func() (*v1.Node, error) diff --git a/pkg/yurthub/filter/masterservice/filter.go b/pkg/yurthub/filter/masterservice/filter.go new file mode 100644 index 00000000000..99f9fd8ff3c --- /dev/null +++ b/pkg/yurthub/filter/masterservice/filter.go @@ -0,0 +1,94 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package masterservice + +import ( + "io" + "net" + "net/http" + "strconv" + + "k8s.io/klog" + + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" +) + +// masterservice filter is used to mutate the ClusterIP and https port of default/kubernetes service +// in order to pods on edge nodes can access kube-apiserver directly by inClusterConfig. +const FilterName = "masterservice" + +// Register registers a filter +func Register(filters *filter.Filters) { + filters.Register(FilterName, func() (filter.Interface, error) { + return NewFilter(), nil + }) +} + +func NewFilter() *masterServiceFilter { + return &masterServiceFilter{ + Approver: filter.NewApprover("kubelet", "services", []string{"list", "watch"}...), + stopCh: make(chan struct{}), + } +} + +type masterServiceFilter struct { + *filter.Approver + serializerManager *serializer.SerializerManager + host string + port int32 + stopCh chan struct{} +} + +func (msf *masterServiceFilter) SetSerializerManager(s *serializer.SerializerManager) error { + msf.serializerManager = s + return nil +} + +func (msf *masterServiceFilter) SetMasterServiceAddr(addr string) error { + host, portStr, err := net.SplitHostPort(addr) + if err != nil { + return err + } + msf.host = host + port, err := strconv.ParseInt(portStr, 10, 32) + if err != nil { + return err + } + msf.port = int32(port) + return nil +} + +func (msf *masterServiceFilter) Approve(comp, resource, verb string) bool { + if !msf.Approver.Approve(comp, resource, verb) { + return false + } + + return true +} + +func (msf *masterServiceFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { + s := filterutil.CreateSerializer(req, msf.serializerManager) + if s == nil { + klog.Errorf("skip filter, failed to create serializer in masterServiceFilter") + return 0, rc, nil + } + + handler := NewMasterServiceFilterHandler(req, s, msf.host, msf.port) + return filter.NewFilterReadCloser(req, rc, handler, s, stopCh) +} diff --git a/pkg/yurthub/filter/masterservice/handler.go b/pkg/yurthub/filter/masterservice/handler.go new file mode 100644 index 00000000000..d04df4cc605 --- /dev/null +++ b/pkg/yurthub/filter/masterservice/handler.go @@ -0,0 +1,136 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package masterservice + +import ( + "io" + "net/http" + + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + "github.com/openyurtio/openyurt/pkg/yurthub/util" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog" +) + +const ( + MasterServiceNamespace = "default" + MasterServiceName = "kubernetes" + MasterServicePortName = "https" +) + +type masterServiceFilterHandler struct { + req *http.Request + serializer *serializer.Serializer + host string + port int32 +} + +func NewMasterServiceFilterHandler( + req *http.Request, + serializer *serializer.Serializer, + host string, + port int32) filter.Handler { + return &masterServiceFilterHandler{ + req: req, + serializer: serializer, + host: host, + port: port, + } +} + +// ObjectResponseFilter mutate master service(default/kubernetes) in the ServiceList object +func (fh *masterServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) { + list, err := fh.serializer.Decode(b) + if err != nil || list == nil { + klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of masterServiceFilterHandler, %v", err) + return b, nil + } + + // return data un-mutated if not ServiceList + serviceList, ok := list.(*v1.ServiceList) + if !ok { + return b, nil + } + + // mutate master service + for i := range serviceList.Items { + if serviceList.Items[i].Namespace == MasterServiceNamespace && serviceList.Items[i].Name == MasterServiceName { + serviceList.Items[i].Spec.ClusterIP = fh.host + for j := range serviceList.Items[i].Spec.Ports { + if serviceList.Items[i].Spec.Ports[j].Name == MasterServicePortName { + serviceList.Items[i].Spec.Ports[j].Port = fh.port + break + } + } + klog.V(2).Infof("mutate master service into ClusterIP:Port=%s:%d for request %s", fh.host, fh.port, util.ReqString(fh.req)) + break + } + } + + // return the mutated serviceList + return fh.serializer.Encode(serviceList) +} + +//StreamResponseFilter mutate master service(default/kubernetes) in Watch Stream +func (fh *masterServiceFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error { + defer func() { + close(ch) + }() + + d, err := fh.serializer.WatchDecoder(rc) + if err != nil { + klog.Errorf("StreamResponseFilter for master service ended with error, %v", err) + return err + } + + for { + watchType, obj, err := d.Decode() + if err != nil { + //klog.V(2).Infof("%s %s watch decode ended with: %v", comp, info.Path, err) + return err + } + + var wEvent watch.Event + wEvent.Type = watchType + // return data un-mutated if not Service + service, ok := obj.(*v1.Service) + if ok && service.Namespace == MasterServiceNamespace && service.Name == MasterServiceName { + service.Spec.ClusterIP = fh.host + for j := range service.Spec.Ports { + if service.Spec.Ports[j].Name == MasterServicePortName { + service.Spec.Ports[j].Port = fh.port + break + } + } + klog.V(2).Infof("mutate master service into ClusterIP:Port=%s:%d for request %s", fh.host, fh.port, util.ReqString(fh.req)) + wEvent.Object = service + } else { + accessor := meta.NewAccessor() + ns, _ := accessor.Namespace(obj) + name, _ := accessor.Name(obj) + kind, _ := accessor.Kind(obj) + klog.V(2).Infof("skip filter, not master service(%s: %s/%s) for request %s", kind, ns, name, util.ReqString(fh.req)) + wEvent.Object = obj + } + + ch <- wEvent + } +} diff --git a/pkg/yurthub/filter/servicetopology/filter.go b/pkg/yurthub/filter/servicetopology/filter.go new file mode 100644 index 00000000000..5cf93508d6e --- /dev/null +++ b/pkg/yurthub/filter/servicetopology/filter.go @@ -0,0 +1,152 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package servicetopology + +import ( + "fmt" + "io" + "net/http" + + "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" + appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1" + + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" +) + +// servicetopology filter is used to reassemble endpointslice in order to make the service traffic +// under the topology that defined by service.Annotation["openyurt.io/topologyKeys"] +const FilterName = "servicetopology" + +// Register registers a filter +func Register(filters *filter.Filters) { + filters.Register(FilterName, func() (filter.Interface, error) { + return NewFilter(), nil + }) +} + +func NewFilter() *serviceTopologyFilter { + return &serviceTopologyFilter{ + Approver: filter.NewApprover("kube-proxy", "endpointslices", []string{"list", "watch"}...), + stopCh: make(chan struct{}), + } +} + +type serviceTopologyFilter struct { + *filter.Approver + serviceLister listers.ServiceLister + serviceSynced cache.InformerSynced + nodepoolLister appslisters.NodePoolLister + nodePoolSynced cache.InformerSynced + nodeGetter filter.NodeGetter + nodeSynced cache.InformerSynced + nodeName string + serializerManager *serializer.SerializerManager + stopCh chan struct{} +} + +func (ssf *serviceTopologyFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error { + ssf.serviceLister = factory.Core().V1().Services().Lister() + ssf.serviceSynced = factory.Core().V1().Services().Informer().HasSynced + + return nil +} + +func (ssf *serviceTopologyFilter) SetYurtSharedInformerFactory(yurtFactory yurtinformers.SharedInformerFactory) error { + ssf.nodepoolLister = yurtFactory.Apps().V1alpha1().NodePools().Lister() + ssf.nodePoolSynced = yurtFactory.Apps().V1alpha1().NodePools().Informer().HasSynced + + return nil +} + +func (ssf *serviceTopologyFilter) SetNodeName(nodeName string) error { + ssf.nodeName = nodeName + + return nil +} + +func (ssf *serviceTopologyFilter) SetStorageWrapper(s cachemanager.StorageWrapper) error { + if len(ssf.nodeName) == 0 { + return fmt.Errorf("node name for serviceTopologyFilter is not ready") + } + + nodeKey := fmt.Sprintf("kubelet/nodes/%s", ssf.nodeName) + ssf.nodeSynced = func() bool { + obj, err := s.Get(nodeKey) + if err != nil || obj == nil { + return false + } + + if _, ok := obj.(*v1.Node); !ok { + return false + } + + return true + } + + ssf.nodeGetter = func() (*v1.Node, error) { + obj, err := s.Get(nodeKey) + if err != nil { + return nil, err + } else if obj == nil { + return nil, fmt.Errorf("node(%s) is not ready", ssf.nodeName) + } + + if node, ok := obj.(*v1.Node); ok { + return node, nil + } + + return nil, fmt.Errorf("node(%s) is not found", ssf.nodeName) + } + + return nil +} + +func (ssf *serviceTopologyFilter) SetSerializerManager(s *serializer.SerializerManager) error { + ssf.serializerManager = s + return nil +} + +func (ssf *serviceTopologyFilter) Approve(comp, resource, verb string) bool { + if !ssf.Approver.Approve(comp, resource, verb) { + return false + } + + if ok := cache.WaitForCacheSync(ssf.stopCh, ssf.nodeSynced, ssf.serviceSynced, ssf.nodePoolSynced); !ok { + return false + } + + return true +} + +func (ssf *serviceTopologyFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { + s := filterutil.CreateSerializer(req, ssf.serializerManager) + if s == nil { + klog.Errorf("skip filter, failed to create serializer in serviceTopologyFilter") + return 0, rc, nil + } + + handler := NewServiceTopologyFilterHandler(ssf.nodeName, s, ssf.serviceLister, ssf.nodepoolLister, ssf.nodeGetter) + return filter.NewFilterReadCloser(req, rc, handler, s, stopCh) +} diff --git a/pkg/yurthub/filter/servicetopology/handler.go b/pkg/yurthub/filter/servicetopology/handler.go new file mode 100644 index 00000000000..69f9c9dcae5 --- /dev/null +++ b/pkg/yurthub/filter/servicetopology/handler.go @@ -0,0 +1,191 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package servicetopology + +import ( + "io" + + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + nodepoolv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1" + appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1" + + v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" + "k8s.io/apimachinery/pkg/watch" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/klog" +) + +const ( + AnnotationServiceTopologyKey = "openyurt.io/topologyKeys" + AnnotationServiceTopologyValueNode = "kubernetes.io/hostname" + AnnotationServiceTopologyValueZone = "kubernetes.io/zone" + AnnotationServiceTopologyValueNodePool = "openyurt.io/nodepool" +) + +type serviceTopologyFilterHandler struct { + nodeName string + serializer *serializer.Serializer + serviceLister listers.ServiceLister + nodePoolLister appslisters.NodePoolLister + nodeGetter filter.NodeGetter +} + +func NewServiceTopologyFilterHandler( + nodeName string, + serializer *serializer.Serializer, + serviceLister listers.ServiceLister, + nodePoolLister appslisters.NodePoolLister, + nodeGetter filter.NodeGetter) filter.Handler { + return &serviceTopologyFilterHandler{ + nodeName: nodeName, + serializer: serializer, + serviceLister: serviceLister, + nodePoolLister: nodePoolLister, + nodeGetter: nodeGetter, + } + +} + +//ObjectResponseFilter filter the endpointslice from get response object and return the bytes +func (fh *serviceTopologyFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) { + eps, err := fh.serializer.Decode(b) + if err != nil || eps == nil { + klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of serviceTopologyFilterHandler, %v", err) + return b, nil + } + + if endpointSliceList, ok := eps.(*discovery.EndpointSliceList); ok { + //filter endpointSlice + var items []discovery.EndpointSlice + for i := range endpointSliceList.Items { + item := fh.reassembleEndpointSlice(&endpointSliceList.Items[i]) + if item != nil { + items = append(items, *item) + } + } + endpointSliceList.Items = items + + return fh.serializer.Encode(endpointSliceList) + } else { + return b, nil + } +} + +//FilterWatchObject filter the endpointslice from watch response object and return the bytes +func (fh *serviceTopologyFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error { + defer func() { + close(ch) + }() + + d, err := fh.serializer.WatchDecoder(rc) + if err != nil { + klog.Errorf("StreamResponseFilter of serviceTopologyFilterHandler ended with error, %v", err) + return err + } + + for { + watchType, obj, err := d.Decode() + if err != nil { + return err + } + + var wEvent watch.Event + wEvent.Type = watchType + endpointSlice, ok := obj.(*discovery.EndpointSlice) + if ok { + item := fh.reassembleEndpointSlice(endpointSlice) + if item == nil { + continue + } + wEvent.Object = item + } else { + wEvent.Object = obj + } + + klog.V(5).Infof("filter watch decode endpointSlice: type: %s, obj=%#+v", watchType, endpointSlice) + ch <- wEvent + } +} + +// reassembleEndpointSlice will discard endpointslice for LB service and filter the endpoints out of endpointslice in terms of service Topology. +func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice) *discovery.EndpointSlice { + var serviceTopologyType string + // get the service Topology type + if svcName, ok := endpointSlice.Labels[discovery.LabelServiceName]; ok { + svc, err := fh.serviceLister.Services(endpointSlice.Namespace).Get(svcName) + if err != nil { + klog.Infof("skip reassemble endpointSlice, failed to get service %s/%s, err: %v", endpointSlice.Namespace, svcName, err) + return endpointSlice + } + + // discard endpointSlice if service type is LoadBalancer + if svc.Spec.Type == v1.ServiceTypeLoadBalancer { + klog.Infof("endpointSlice(%s/%s) of load balancer service is discarded", endpointSlice.Namespace, endpointSlice.Name) + return nil + } + + if serviceTopologyType, ok = svc.Annotations[AnnotationServiceTopologyKey]; !ok { + klog.Infof("skip reassemble endpointSlice, service %s/%s has no annotation %s", endpointSlice.Namespace, svcName, AnnotationServiceTopologyKey) + return endpointSlice + } + } + + var newEps []discovery.Endpoint + // if type of service Topology is 'kubernetes.io/hostname' + // filter the endpoint just on the local host + if serviceTopologyType == AnnotationServiceTopologyValueNode { + for i := range endpointSlice.Endpoints { + if endpointSlice.Endpoints[i].Topology[v1.LabelHostname] == fh.nodeName { + newEps = append(newEps, endpointSlice.Endpoints[i]) + } + } + endpointSlice.Endpoints = newEps + } else if serviceTopologyType == AnnotationServiceTopologyValueNodePool || serviceTopologyType == AnnotationServiceTopologyValueZone { + // if type of service Topology is openyurt.io/nodepool + // filter the endpoint just on the node which is in the same nodepool with current node + currentNode, err := fh.nodeGetter() + if err != nil { + klog.Infof("skip reassemble endpointSlice, failed to get current node %s, err: %v", fh.nodeName, err) + return endpointSlice + } + if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok { + nodePool, err := fh.nodePoolLister.Get(nodePoolName) + if err != nil { + klog.Infof("skip reassemble endpointSlice, failed to get nodepool %s, err: %v", nodePoolName, err) + return endpointSlice + } + for i := range endpointSlice.Endpoints { + if inSameNodePool(endpointSlice.Endpoints[i].Topology[v1.LabelHostname], nodePool.Status.Nodes) { + newEps = append(newEps, endpointSlice.Endpoints[i]) + } + } + endpointSlice.Endpoints = newEps + } + } + return endpointSlice +} + +func inSameNodePool(nodeName string, nodeList []string) bool { + for _, n := range nodeList { + if nodeName == n { + return true + } + } + return false +} diff --git a/pkg/yurthub/filter/util/utils.go b/pkg/yurthub/filter/util/utils.go new file mode 100644 index 00000000000..e2765059dec --- /dev/null +++ b/pkg/yurthub/filter/util/utils.go @@ -0,0 +1,38 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "net/http" + + "k8s.io/klog" + + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + "github.com/openyurtio/openyurt/pkg/yurthub/util" + apirequest "k8s.io/apiserver/pkg/endpoints/request" +) + +func CreateSerializer(req *http.Request, sm *serializer.SerializerManager) *serializer.Serializer { + ctx := req.Context() + respContentType, _ := util.RespContentTypeFrom(ctx) + info, _ := apirequest.RequestInfoFrom(ctx) + if respContentType == "" || info == nil || info.APIVersion == "" || info.Resource == "" { + klog.Infof("CreateSerializer failed , info is :%+v", info) + return nil + } + return sm.CreateSerializer(respContentType, info.APIGroup, info.APIVersion, info.Resource) +} diff --git a/pkg/yurthub/kubernetes/serializer/serializer.go b/pkg/yurthub/kubernetes/serializer/serializer.go index d64bcad9d4c..3ad07c044c6 100644 --- a/pkg/yurthub/kubernetes/serializer/serializer.go +++ b/pkg/yurthub/kubernetes/serializer/serializer.go @@ -291,6 +291,25 @@ func (s *Serializer) Decode(b []byte) (runtime.Object, error) { return out, nil } +// Encode encode object and return bytes of it. +func (s *Serializer) Encode(obj runtime.Object) ([]byte, error) { + if obj == nil { + return []byte{}, fmt.Errorf("obj is nil, content type: %s", s.contentType) + } + + mediaType, params, err := mime.ParseMediaType(s.contentType) + if err != nil { + return nil, err + } + + encoder, err := s.Encoder(mediaType, params) + if err != nil { + return nil, err + } + + return runtime.Encode(encoder, obj) +} + // WatchDecoder generates a Decoder for decoding response of watch request. func (s *Serializer) WatchDecoder(body io.ReadCloser) (*restclientwatch.Decoder, error) { var err error diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index 10bd9a8bd00..c63777a2d2e 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -22,6 +22,7 @@ import ( "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/local" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote" @@ -52,6 +53,7 @@ func NewYurtReverseProxyHandler( transportMgr transport.Interface, healthChecker healthchecker.HealthChecker, certManager interfaces.YurtCertificateManager, + filterChain filter.Interface, stopCh <-chan struct{}) (http.Handler, error) { cfg := &server.Config{ LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix), @@ -65,6 +67,7 @@ func NewYurtReverseProxyHandler( transportMgr, healthChecker, certManager, + filterChain, stopCh) if err != nil { return nil, err diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index 05e7a0471d8..904115db5ce 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -24,6 +24,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/transport" "github.com/openyurtio/openyurt/pkg/yurthub/util" @@ -127,10 +128,11 @@ func NewLoadBalancer( transportMgr transport.Interface, healthChecker healthchecker.HealthChecker, certManager interfaces.YurtCertificateManager, + filterChain filter.Interface, stopCh <-chan struct{}) (LoadBalancer, error) { backends := make([]*RemoteProxy, 0, len(remoteServers)) for i := range remoteServers { - b, err := NewRemoteProxy(remoteServers[i], cacheMgr, transportMgr, healthChecker, stopCh) + b, err := NewRemoteProxy(remoteServers[i], cacheMgr, transportMgr, healthChecker, filterChain, stopCh) if err != nil { klog.Errorf("could not new proxy backend(%s), %v", remoteServers[i].String(), err) continue diff --git a/pkg/yurthub/proxy/remote/remote.go b/pkg/yurthub/proxy/remote/remote.go index 91eb857bbea..a9028dab592 100644 --- a/pkg/yurthub/proxy/remote/remote.go +++ b/pkg/yurthub/proxy/remote/remote.go @@ -25,6 +25,7 @@ import ( "net/url" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/transport" "github.com/openyurtio/openyurt/pkg/yurthub/util" @@ -39,6 +40,7 @@ type RemoteProxy struct { reverseProxy *httputil.ReverseProxy cacheMgr cachemanager.CacheManager remoteServer *url.URL + filterChain filter.Interface stopCh <-chan struct{} } @@ -47,6 +49,7 @@ func NewRemoteProxy(remoteServer *url.URL, cacheMgr cachemanager.CacheManager, transportMgr transport.Interface, healthChecker healthchecker.HealthChecker, + filterChain filter.Interface, stopCh <-chan struct{}) (*RemoteProxy, error) { currentTransport := transportMgr.CurrentTransport() if currentTransport == nil { @@ -58,6 +61,7 @@ func NewRemoteProxy(remoteServer *url.URL, reverseProxy: httputil.NewSingleHostReverseProxy(remoteServer), cacheMgr: cacheMgr, remoteServer: remoteServer, + filterChain: filterChain, stopCh: stopCh, } @@ -93,7 +97,8 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error { ctx := req.Context() // re-added transfer-encoding=chunked response header for watch request - if info, exists := apirequest.RequestInfoFrom(ctx); exists { + info, exists := apirequest.RequestInfoFrom(ctx) + if exists { if info.Verb == "watch" { klog.V(5).Infof("add transfer-encoding=chunked header into response for req %s", util.ReqString(req)) h := resp.Header @@ -114,6 +119,19 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error { ctx = util.WithRespContentType(ctx, respContentType) req = req.WithContext(ctx) + if rp.filterChain != nil { + size, filterRc, err := rp.filterChain.Filter(req, resp.Body, rp.stopCh) + if err != nil { + klog.Errorf("failed to filter response for %s, %v", util.ReqString(req), err) + return err + } + resp.Body = filterRc + if size > 0 { + resp.ContentLength = int64(size) + resp.Header.Set("Content-Length", fmt.Sprint(size)) + } + } + rc, prc := util.NewDualReadCloser(resp.Body, true) go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) { err := rp.cacheMgr.CacheResponse(req, prc, stopCh)