From f83c294947d9bc48fcd24bb0e9daf39e72284618 Mon Sep 17 00:00:00 2001 From: DrmagicE <379342542@qq.com> Date: Tue, 21 Sep 2021 15:20:29 +0800 Subject: [PATCH] feature: add --working-mode flag for yurthub --- cmd/yurthub/app/config/config.go | 6 ++++ cmd/yurthub/app/options/options.go | 9 +++++- cmd/yurthub/app/start.go | 28 ++++++++++++------ pkg/yurthub/filter/constant.go | 3 ++ pkg/yurthub/proxy/proxy.go | 19 +++++++++--- pkg/yurthub/proxy/remote/remote.go | 46 ++++++++++++++++-------------- pkg/yurthub/util/util.go | 20 +++++++++++++ 7 files changed, 95 insertions(+), 36 deletions(-) diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index 37a6c7c0b3a..b7f16e8e73e 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -34,6 +34,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/storage/factory" + "github.com/openyurtio/openyurt/pkg/yurthub/util" yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned" yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions" @@ -78,6 +79,7 @@ type YurtHubConfiguration struct { Filters *filter.Filters SharedFactory informers.SharedInformerFactory YurtSharedFactory yurtinformers.SharedInformerFactory + WorkingMode util.WorkingMode } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -109,6 +111,9 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { var filters *filter.Filters var mutatedMasterServiceAddr string if options.EnableResourceFilter { + if options.WorkingMode == string(util.WorkingModeCloud) { + options.DisabledResourceFilters = append(options.DisabledResourceFilters, filter.DisabledInCloudMode...) + } filters = filter.NewFilters(options.DisabledResourceFilters) registerAllFilters(filters) @@ -145,6 +150,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { EnableDummyIf: options.EnableDummyIf, EnableIptables: options.EnableIptables, HubAgentDummyIfName: options.HubAgentDummyIfName, + WorkingMode: util.WorkingMode(options.WorkingMode), StorageWrapper: storageWrapper, SerializerManager: serializerManager, RESTMapperManager: restMapperManager, diff --git a/cmd/yurthub/app/options/options.go b/cmd/yurthub/app/options/options.go index feb6eb5a932..5b97ab1c348 100644 --- a/cmd/yurthub/app/options/options.go +++ b/cmd/yurthub/app/options/options.go @@ -62,6 +62,7 @@ type YurtHubOptions struct { AccessServerThroughHub bool EnableResourceFilter bool DisabledResourceFilters []string + WorkingMode string } // NewYurtHubOptions creates a new YurtHubOptions with a default config. @@ -90,6 +91,7 @@ func NewYurtHubOptions() *YurtHubOptions { AccessServerThroughHub: true, EnableResourceFilter: true, DisabledResourceFilters: make([]string, 0), + WorkingMode: string(util.WorkingModeEdge), } return o } @@ -112,6 +114,10 @@ func ValidateOptions(options *YurtHubOptions) error { return fmt.Errorf("cert manage mode %s is not supported", options.CertMgrMode) } + if !util.IsSupportedWorkingMode(util.WorkingMode(options.WorkingMode)) { + return fmt.Errorf("working mode %s is not supported", options.WorkingMode) + } + if err := verifyDummyIP(options.HubAgentDummyIfIP); err != nil { return fmt.Errorf("dummy ip %s is not invalid, %v", options.HubAgentDummyIfIP, err) } @@ -139,7 +145,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.JoinToken, "join-token", o.JoinToken, "the Join token for bootstrapping hub agent when --cert-mgr-mode=hubself.") fs.StringVar(&o.RootDir, "root-dir", o.RootDir, "directory path for managing hub agent files(pki, cache etc).") fs.BoolVar(&o.Version, "version", o.Version, "print the version information.") - fs.BoolVar(&o.EnableProfiling, "profiling", o.EnableProfiling, "Enable profiling via web interface host:port/debug/pprof/") + fs.BoolVar(&o.EnableProfiling, "profiling", o.EnableProfiling, "enable profiling via web interface host:port/debug/pprof/") fs.BoolVar(&o.EnableDummyIf, "enable-dummy-if", o.EnableDummyIf, "enable dummy interface or not") fs.BoolVar(&o.EnableIptables, "enable-iptables", o.EnableIptables, "enable iptables manager to setup rules for accessing hub agent") fs.StringVar(&o.HubAgentDummyIfIP, "dummy-if-ip", o.HubAgentDummyIfIP, "the ip address of dummy interface that used for container connect hub agent(exclusive ips: 169.254.31.0/24, 169.254.1.1/32)") @@ -149,6 +155,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&o.EnableResourceFilter, "enable-resource-filter", o.EnableResourceFilter, "enable to filter response that comes back from reverse proxy") fs.StringSliceVar(&o.DisabledResourceFilters, "disabled-resource-filters", o.DisabledResourceFilters, "disable resource filters to handle response") fs.StringVar(&o.NodePoolName, "nodepool-name", o.NodePoolName, "the name of node pool that runs hub agent") + fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud).") } // verifyDummyIP verify the specified ip is valid or not diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 3711268b85e..372c5399300 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -36,6 +36,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/proxy" "github.com/openyurtio/openyurt/pkg/yurthub/server" "github.com/openyurtio/openyurt/pkg/yurthub/transport" + "github.com/openyurtio/openyurt/pkg/yurthub/util" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -125,19 +126,28 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { } trace++ - klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace) - cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory) - if err != nil { - return fmt.Errorf("could not new cache manager, %v", err) + var cacheMgr cachemanager.CacheManager + if cfg.WorkingMode == util.WorkingModeEdge { + klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace) + cacheMgr, err = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory) + if err != nil { + return fmt.Errorf("could not new cache manager, %v", err) + } + } else { + klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName) } trace++ - klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency) - gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, stopCh) - if err != nil { - return fmt.Errorf("could not new gc manager, %v", err) + if cfg.WorkingMode == util.WorkingModeEdge { + klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency) + gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, stopCh) + if err != nil { + return fmt.Errorf("could not new gc manager, %v", err) + } + gcMgr.Run() + } else { + klog.Infof("%d. disable gc manager for node %s because it is a cloud node", trace, cfg.NodeName) } - gcMgr.Run() trace++ klog.Infof("%d. new filter chain for mutating response body", trace) diff --git a/pkg/yurthub/filter/constant.go b/pkg/yurthub/filter/constant.go index 0e955ad84c7..74daf6c0fce 100644 --- a/pkg/yurthub/filter/constant.go +++ b/pkg/yurthub/filter/constant.go @@ -29,3 +29,6 @@ const ( // on kube-proxy list/watch service request from edge nodes. DiscardCloudServiceFilterName = "discardcloudservice" ) + +// DisabledInCloudMode contains the filters that should be disabled when yurthub is working in cloud mode. +var DisabledInCloudMode = []string{DiscardCloudServiceFilterName} diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index 28da9d3108b..494f6dc957c 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -73,10 +73,17 @@ func NewYurtReverseProxyHandler( return nil, err } + var localProxy *local.LocalProxy + // When yurthub is working in cloud mode, cacheMgr will be set to nil which means the local cache is disabled, + // so we don't need to create a LocalProxy. + if cacheMgr != nil { + localProxy = local.NewLocalProxy(cacheMgr, lb.IsHealthy) + } + yurtProxy := &yurtReverseProxy{ resolver: resolver, loadBalancer: lb, - localProxy: local.NewLocalProxy(cacheMgr, lb.IsHealthy), + localProxy: localProxy, cacheMgr: cacheMgr, maxRequestsInFlight: yurtHubCfg.MaxRequestInFlight, stopCh: stopCh, @@ -88,9 +95,13 @@ func NewYurtReverseProxyHandler( func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler { handler = util.WithRequestTrace(handler) handler = util.WithRequestContentType(handler) - handler = util.WithCacheHeaderCheck(handler) + if p.cacheMgr != nil { + handler = util.WithCacheHeaderCheck(handler) + } handler = util.WithRequestTimeout(handler) - handler = util.WithListRequestSelector(handler) + if p.cacheMgr != nil { + handler = util.WithListRequestSelector(handler) + } handler = util.WithMaxInFlightLimit(handler, p.maxRequestsInFlight) handler = util.WithRequestClientComponent(handler) handler = filters.WithRequestInfo(handler, p.resolver) @@ -98,7 +109,7 @@ func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler } func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - if !hubutil.IsKubeletLeaseReq(req) && p.loadBalancer.IsHealthy() { + if !hubutil.IsKubeletLeaseReq(req) && p.loadBalancer.IsHealthy() || p.localProxy == nil { p.loadBalancer.ServeHTTP(rw, req) } else { p.localProxy.ServeHTTP(rw, req) diff --git a/pkg/yurthub/proxy/remote/remote.go b/pkg/yurthub/proxy/remote/remote.go index 1302ad49876..7ea42ea69e4 100644 --- a/pkg/yurthub/proxy/remote/remote.go +++ b/pkg/yurthub/proxy/remote/remote.go @@ -130,30 +130,32 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error { } } - // cache resp with storage interface if resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusPartialContent { - if rp.cacheMgr.CanCacheFor(req) { - reqContentType, _ := util.ReqContentTypeFrom(ctx) - respContentType := resp.Header.Get("Content-Type") - if len(respContentType) == 0 { - respContentType = reqContentType + // prepare response content type + reqContentType, _ := util.ReqContentTypeFrom(ctx) + respContentType := resp.Header.Get("Content-Type") + if len(respContentType) == 0 { + respContentType = reqContentType + } + ctx = util.WithRespContentType(ctx, respContentType) + req = req.WithContext(ctx) + + // filter response data + if rp.filterChain != nil { + size, filterRc, err := rp.filterChain.Filter(req, resp.Body, rp.stopCh) + if err != nil { + klog.Errorf("failed to filter response for %s, %v", util.ReqString(req), err) + return err } - ctx = util.WithRespContentType(ctx, respContentType) - req = req.WithContext(ctx) - - if rp.filterChain != nil { - size, filterRc, err := rp.filterChain.Filter(req, resp.Body, rp.stopCh) - if err != nil { - klog.Errorf("failed to filter response for %s, %v", util.ReqString(req), err) - return err - } - resp.Body = filterRc - if size > 0 { - resp.ContentLength = int64(size) - resp.Header.Set("Content-Length", fmt.Sprint(size)) - } + resp.Body = filterRc + if size > 0 { + resp.ContentLength = int64(size) + resp.Header.Set("Content-Length", fmt.Sprint(size)) } + } + // cache resp with storage interface + if rp.cacheMgr != nil && rp.cacheMgr.CanCacheFor(req) { rc, prc := util.NewDualReadCloser(req, resp.Body, true) go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) { err := rp.cacheMgr.CacheResponse(req, prc, stopCh) @@ -164,7 +166,7 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error { resp.Body = rc } - } else if resp.StatusCode == http.StatusNotFound && info.Verb == "list" { + } else if resp.StatusCode == http.StatusNotFound && info.Verb == "list" && rp.cacheMgr != nil { // 404 Not Found: The CRD may have been unregistered and should be updated locally as well. // Other types of requests may return a 404 response for other reasons (for example, getting a pod that doesn't exist). // And the main purpose is to return 404 when list an unregistered resource locally, so here only consider the list request. @@ -184,7 +186,7 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error { func (rp *RemoteProxy) errorHandler(rw http.ResponseWriter, req *http.Request, err error) { klog.V(2).Infof("remote proxy error handler: %s, %v", util.ReqString(req), err) - if !rp.cacheMgr.CanCacheFor(req) { + if rp.cacheMgr == nil || !rp.cacheMgr.CanCacheFor(req) { rw.WriteHeader(http.StatusBadGateway) return } diff --git a/pkg/yurthub/util/util.go b/pkg/yurthub/util/util.go index 20215571ca5..ed3bf3873ce 100644 --- a/pkg/yurthub/util/util.go +++ b/pkg/yurthub/util/util.go @@ -45,6 +45,16 @@ import ( // ProxyKeyType represents the key in proxy request context type ProxyKeyType int +// WorkingMode represents the working mode of yurthub. +type WorkingMode string + +const ( + // WorkingModeCloud represents yurthub is working in cloud mode, which means yurthub is deployed on the cloud side. + WorkingModeCloud WorkingMode = "cloud" + // WorkingModeEdge represents yurthub is working in edge mode, which means yurthub is deployed on the edge side. + WorkingModeEdge WorkingMode = "edge" +) + const ( // YurtHubCertificateManagerName represents the certificateManager name in yurthub mode YurtHubCertificateManagerName = "hubself" @@ -321,6 +331,16 @@ func IsSupportedCertMode(certMode string) bool { return false } +// IsSupportedWorkingMode check working mode is supported or not +func IsSupportedWorkingMode(workingMode WorkingMode) bool { + switch workingMode { + case WorkingModeCloud, WorkingModeEdge: + return true + } + + return false +} + // FileExists checks if specified file exists. func FileExists(filename string) (bool, error) { if _, err := os.Stat(filename); os.IsNotExist(err) {