Skip to content

Commit

Permalink
feature: add --working-mode flag for yurthub (#483)
Browse files Browse the repository at this point in the history
  • Loading branch information
DrmagicE authored Sep 22, 2021
1 parent 64347fa commit da69115
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 36 deletions.
6 changes: 6 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/factory"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned"
yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions"

Expand Down Expand Up @@ -78,6 +79,7 @@ type YurtHubConfiguration struct {
Filters *filter.Filters
SharedFactory informers.SharedInformerFactory
YurtSharedFactory yurtinformers.SharedInformerFactory
WorkingMode util.WorkingMode
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -109,6 +111,9 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
var filters *filter.Filters
var mutatedMasterServiceAddr string
if options.EnableResourceFilter {
if options.WorkingMode == string(util.WorkingModeCloud) {
options.DisabledResourceFilters = append(options.DisabledResourceFilters, filter.DisabledInCloudMode...)
}
filters = filter.NewFilters(options.DisabledResourceFilters)
registerAllFilters(filters)

Expand Down Expand Up @@ -145,6 +150,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
EnableDummyIf: options.EnableDummyIf,
EnableIptables: options.EnableIptables,
HubAgentDummyIfName: options.HubAgentDummyIfName,
WorkingMode: util.WorkingMode(options.WorkingMode),
StorageWrapper: storageWrapper,
SerializerManager: serializerManager,
RESTMapperManager: restMapperManager,
Expand Down
9 changes: 8 additions & 1 deletion cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type YurtHubOptions struct {
AccessServerThroughHub bool
EnableResourceFilter bool
DisabledResourceFilters []string
WorkingMode string
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
Expand Down Expand Up @@ -90,6 +91,7 @@ func NewYurtHubOptions() *YurtHubOptions {
AccessServerThroughHub: true,
EnableResourceFilter: true,
DisabledResourceFilters: make([]string, 0),
WorkingMode: string(util.WorkingModeEdge),
}
return o
}
Expand All @@ -112,6 +114,10 @@ func ValidateOptions(options *YurtHubOptions) error {
return fmt.Errorf("cert manage mode %s is not supported", options.CertMgrMode)
}

if !util.IsSupportedWorkingMode(util.WorkingMode(options.WorkingMode)) {
return fmt.Errorf("working mode %s is not supported", options.WorkingMode)
}

if err := verifyDummyIP(options.HubAgentDummyIfIP); err != nil {
return fmt.Errorf("dummy ip %s is not invalid, %v", options.HubAgentDummyIfIP, err)
}
Expand Down Expand Up @@ -139,7 +145,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.JoinToken, "join-token", o.JoinToken, "the Join token for bootstrapping hub agent when --cert-mgr-mode=hubself.")
fs.StringVar(&o.RootDir, "root-dir", o.RootDir, "directory path for managing hub agent files(pki, cache etc).")
fs.BoolVar(&o.Version, "version", o.Version, "print the version information.")
fs.BoolVar(&o.EnableProfiling, "profiling", o.EnableProfiling, "Enable profiling via web interface host:port/debug/pprof/")
fs.BoolVar(&o.EnableProfiling, "profiling", o.EnableProfiling, "enable profiling via web interface host:port/debug/pprof/")
fs.BoolVar(&o.EnableDummyIf, "enable-dummy-if", o.EnableDummyIf, "enable dummy interface or not")
fs.BoolVar(&o.EnableIptables, "enable-iptables", o.EnableIptables, "enable iptables manager to setup rules for accessing hub agent")
fs.StringVar(&o.HubAgentDummyIfIP, "dummy-if-ip", o.HubAgentDummyIfIP, "the ip address of dummy interface that used for container connect hub agent(exclusive ips: 169.254.31.0/24, 169.254.1.1/32)")
Expand All @@ -149,6 +155,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.EnableResourceFilter, "enable-resource-filter", o.EnableResourceFilter, "enable to filter response that comes back from reverse proxy")
fs.StringSliceVar(&o.DisabledResourceFilters, "disabled-resource-filters", o.DisabledResourceFilters, "disable resource filters to handle response")
fs.StringVar(&o.NodePoolName, "nodepool-name", o.NodePoolName, "the name of node pool that runs hub agent")
fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud).")
}

// verifyDummyIP verify the specified ip is valid or not
Expand Down
28 changes: 19 additions & 9 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
"github.com/openyurtio/openyurt/pkg/yurthub/server"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
"github.com/openyurtio/openyurt/pkg/yurthub/util"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -125,19 +126,28 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
}
trace++

klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
if err != nil {
return fmt.Errorf("could not new cache manager, %v", err)
var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr, err = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
if err != nil {
return fmt.Errorf("could not new cache manager, %v", err)
}
} else {
klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
trace++

klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, stopCh)
if err != nil {
return fmt.Errorf("could not new gc manager, %v", err)
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, stopCh)
if err != nil {
return fmt.Errorf("could not new gc manager, %v", err)
}
gcMgr.Run()
} else {
klog.Infof("%d. disable gc manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
gcMgr.Run()
trace++

klog.Infof("%d. new filter chain for mutating response body", trace)
Expand Down
3 changes: 3 additions & 0 deletions pkg/yurthub/filter/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ const (
// on kube-proxy list/watch service request from edge nodes.
DiscardCloudServiceFilterName = "discardcloudservice"
)

// DisabledInCloudMode contains the filters that should be disabled when yurthub is working in cloud mode.
var DisabledInCloudMode = []string{DiscardCloudServiceFilterName}
19 changes: 15 additions & 4 deletions pkg/yurthub/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,17 @@ func NewYurtReverseProxyHandler(
return nil, err
}

var localProxy *local.LocalProxy
// When yurthub is working in cloud mode, cacheMgr will be set to nil which means the local cache is disabled,
// so we don't need to create a LocalProxy.
if cacheMgr != nil {
localProxy = local.NewLocalProxy(cacheMgr, lb.IsHealthy)
}

yurtProxy := &yurtReverseProxy{
resolver: resolver,
loadBalancer: lb,
localProxy: local.NewLocalProxy(cacheMgr, lb.IsHealthy),
localProxy: localProxy,
cacheMgr: cacheMgr,
maxRequestsInFlight: yurtHubCfg.MaxRequestInFlight,
stopCh: stopCh,
Expand All @@ -88,17 +95,21 @@ func NewYurtReverseProxyHandler(
func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler {
handler = util.WithRequestTrace(handler)
handler = util.WithRequestContentType(handler)
handler = util.WithCacheHeaderCheck(handler)
if p.cacheMgr != nil {
handler = util.WithCacheHeaderCheck(handler)
}
handler = util.WithRequestTimeout(handler)
handler = util.WithListRequestSelector(handler)
if p.cacheMgr != nil {
handler = util.WithListRequestSelector(handler)
}
handler = util.WithMaxInFlightLimit(handler, p.maxRequestsInFlight)
handler = util.WithRequestClientComponent(handler)
handler = filters.WithRequestInfo(handler, p.resolver)
return handler
}

func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if !hubutil.IsKubeletLeaseReq(req) && p.loadBalancer.IsHealthy() {
if !hubutil.IsKubeletLeaseReq(req) && p.loadBalancer.IsHealthy() || p.localProxy == nil {
p.loadBalancer.ServeHTTP(rw, req)
} else {
p.localProxy.ServeHTTP(rw, req)
Expand Down
46 changes: 24 additions & 22 deletions pkg/yurthub/proxy/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,30 +130,32 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {
}
}

// cache resp with storage interface
if resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusPartialContent {
if rp.cacheMgr.CanCacheFor(req) {
reqContentType, _ := util.ReqContentTypeFrom(ctx)
respContentType := resp.Header.Get("Content-Type")
if len(respContentType) == 0 {
respContentType = reqContentType
// prepare response content type
reqContentType, _ := util.ReqContentTypeFrom(ctx)
respContentType := resp.Header.Get("Content-Type")
if len(respContentType) == 0 {
respContentType = reqContentType
}
ctx = util.WithRespContentType(ctx, respContentType)
req = req.WithContext(ctx)

// filter response data
if rp.filterChain != nil {
size, filterRc, err := rp.filterChain.Filter(req, resp.Body, rp.stopCh)
if err != nil {
klog.Errorf("failed to filter response for %s, %v", util.ReqString(req), err)
return err
}
ctx = util.WithRespContentType(ctx, respContentType)
req = req.WithContext(ctx)

if rp.filterChain != nil {
size, filterRc, err := rp.filterChain.Filter(req, resp.Body, rp.stopCh)
if err != nil {
klog.Errorf("failed to filter response for %s, %v", util.ReqString(req), err)
return err
}
resp.Body = filterRc
if size > 0 {
resp.ContentLength = int64(size)
resp.Header.Set("Content-Length", fmt.Sprint(size))
}
resp.Body = filterRc
if size > 0 {
resp.ContentLength = int64(size)
resp.Header.Set("Content-Length", fmt.Sprint(size))
}
}

// cache resp with storage interface
if rp.cacheMgr != nil && rp.cacheMgr.CanCacheFor(req) {
rc, prc := util.NewDualReadCloser(req, resp.Body, true)
go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) {
err := rp.cacheMgr.CacheResponse(req, prc, stopCh)
Expand All @@ -164,7 +166,7 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {

resp.Body = rc
}
} else if resp.StatusCode == http.StatusNotFound && info.Verb == "list" {
} else if resp.StatusCode == http.StatusNotFound && info.Verb == "list" && rp.cacheMgr != nil {
// 404 Not Found: The CRD may have been unregistered and should be updated locally as well.
// Other types of requests may return a 404 response for other reasons (for example, getting a pod that doesn't exist).
// And the main purpose is to return 404 when list an unregistered resource locally, so here only consider the list request.
Expand All @@ -184,7 +186,7 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {

func (rp *RemoteProxy) errorHandler(rw http.ResponseWriter, req *http.Request, err error) {
klog.V(2).Infof("remote proxy error handler: %s, %v", util.ReqString(req), err)
if !rp.cacheMgr.CanCacheFor(req) {
if rp.cacheMgr == nil || !rp.cacheMgr.CanCacheFor(req) {
rw.WriteHeader(http.StatusBadGateway)
return
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/yurthub/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ import (
// ProxyKeyType represents the key in proxy request context
type ProxyKeyType int

// WorkingMode represents the working mode of yurthub.
type WorkingMode string

const (
// WorkingModeCloud represents yurthub is working in cloud mode, which means yurthub is deployed on the cloud side.
WorkingModeCloud WorkingMode = "cloud"
// WorkingModeEdge represents yurthub is working in edge mode, which means yurthub is deployed on the edge side.
WorkingModeEdge WorkingMode = "edge"
)

const (
// YurtHubCertificateManagerName represents the certificateManager name in yurthub mode
YurtHubCertificateManagerName = "hubself"
Expand Down Expand Up @@ -321,6 +331,16 @@ func IsSupportedCertMode(certMode string) bool {
return false
}

// IsSupportedWorkingMode check working mode is supported or not
func IsSupportedWorkingMode(workingMode WorkingMode) bool {
switch workingMode {
case WorkingModeCloud, WorkingModeEdge:
return true
}

return false
}

// FileExists checks if specified file exists.
func FileExists(filename string) (bool, error) {
if _, err := os.Stat(filename); os.IsNotExist(err) {
Expand Down

0 comments on commit da69115

Please sign in to comment.