Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add --working-mode flag for yurthub #483

Merged
merged 1 commit into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/factory"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned"
yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions"

Expand Down Expand Up @@ -78,6 +79,7 @@ type YurtHubConfiguration struct {
Filters *filter.Filters
SharedFactory informers.SharedInformerFactory
YurtSharedFactory yurtinformers.SharedInformerFactory
WorkingMode util.WorkingMode
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -109,6 +111,9 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
var filters *filter.Filters
var mutatedMasterServiceAddr string
if options.EnableResourceFilter {
if options.WorkingMode == string(util.WorkingModeCloud) {
options.DisabledResourceFilters = append(options.DisabledResourceFilters, filter.DisabledInCloudMode...)
}
filters = filter.NewFilters(options.DisabledResourceFilters)
registerAllFilters(filters)

Expand Down Expand Up @@ -145,6 +150,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
EnableDummyIf: options.EnableDummyIf,
EnableIptables: options.EnableIptables,
HubAgentDummyIfName: options.HubAgentDummyIfName,
WorkingMode: util.WorkingMode(options.WorkingMode),
StorageWrapper: storageWrapper,
SerializerManager: serializerManager,
RESTMapperManager: restMapperManager,
Expand Down
9 changes: 8 additions & 1 deletion cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type YurtHubOptions struct {
AccessServerThroughHub bool
EnableResourceFilter bool
DisabledResourceFilters []string
WorkingMode string
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
Expand Down Expand Up @@ -90,6 +91,7 @@ func NewYurtHubOptions() *YurtHubOptions {
AccessServerThroughHub: true,
EnableResourceFilter: true,
DisabledResourceFilters: make([]string, 0),
WorkingMode: string(util.WorkingModeEdge),
}
return o
}
Expand All @@ -112,6 +114,10 @@ func ValidateOptions(options *YurtHubOptions) error {
return fmt.Errorf("cert manage mode %s is not supported", options.CertMgrMode)
}

if !util.IsSupportedWorkingMode(util.WorkingMode(options.WorkingMode)) {
return fmt.Errorf("working mode %s is not supported", options.WorkingMode)
}

if err := verifyDummyIP(options.HubAgentDummyIfIP); err != nil {
return fmt.Errorf("dummy ip %s is not invalid, %v", options.HubAgentDummyIfIP, err)
}
Expand Down Expand Up @@ -139,7 +145,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.JoinToken, "join-token", o.JoinToken, "the Join token for bootstrapping hub agent when --cert-mgr-mode=hubself.")
fs.StringVar(&o.RootDir, "root-dir", o.RootDir, "directory path for managing hub agent files(pki, cache etc).")
fs.BoolVar(&o.Version, "version", o.Version, "print the version information.")
fs.BoolVar(&o.EnableProfiling, "profiling", o.EnableProfiling, "Enable profiling via web interface host:port/debug/pprof/")
fs.BoolVar(&o.EnableProfiling, "profiling", o.EnableProfiling, "enable profiling via web interface host:port/debug/pprof/")
fs.BoolVar(&o.EnableDummyIf, "enable-dummy-if", o.EnableDummyIf, "enable dummy interface or not")
fs.BoolVar(&o.EnableIptables, "enable-iptables", o.EnableIptables, "enable iptables manager to setup rules for accessing hub agent")
fs.StringVar(&o.HubAgentDummyIfIP, "dummy-if-ip", o.HubAgentDummyIfIP, "the ip address of dummy interface that used for container connect hub agent(exclusive ips: 169.254.31.0/24, 169.254.1.1/32)")
Expand All @@ -149,6 +155,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.EnableResourceFilter, "enable-resource-filter", o.EnableResourceFilter, "enable to filter response that comes back from reverse proxy")
fs.StringSliceVar(&o.DisabledResourceFilters, "disabled-resource-filters", o.DisabledResourceFilters, "disable resource filters to handle response")
fs.StringVar(&o.NodePoolName, "nodepool-name", o.NodePoolName, "the name of node pool that runs hub agent")
fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud).")
}

// verifyDummyIP verify the specified ip is valid or not
Expand Down
28 changes: 19 additions & 9 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
"github.com/openyurtio/openyurt/pkg/yurthub/server"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
"github.com/openyurtio/openyurt/pkg/yurthub/util"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -125,19 +126,28 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
}
trace++

klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
if err != nil {
return fmt.Errorf("could not new cache manager, %v", err)
var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr, err = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
if err != nil {
return fmt.Errorf("could not new cache manager, %v", err)
}
} else {
klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
trace++

klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, stopCh)
if err != nil {
return fmt.Errorf("could not new gc manager, %v", err)
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, stopCh)
if err != nil {
return fmt.Errorf("could not new gc manager, %v", err)
}
gcMgr.Run()
} else {
klog.Infof("%d. disable gc manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
gcMgr.Run()
trace++

klog.Infof("%d. new filter chain for mutating response body", trace)
Expand Down
3 changes: 3 additions & 0 deletions pkg/yurthub/filter/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ const (
// on kube-proxy list/watch service request from edge nodes.
DiscardCloudServiceFilterName = "discardcloudservice"
)

// DisabledInCloudMode contains the filters that should be disabled when yurthub is working in cloud mode.
var DisabledInCloudMode = []string{DiscardCloudServiceFilterName}
19 changes: 15 additions & 4 deletions pkg/yurthub/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,17 @@ func NewYurtReverseProxyHandler(
return nil, err
}

var localProxy *local.LocalProxy
// When yurthub is working in cloud mode, cacheMgr will be set to nil which means the local cache is disabled,
// so we don't need to create a LocalProxy.
if cacheMgr != nil {
localProxy = local.NewLocalProxy(cacheMgr, lb.IsHealthy)
}

yurtProxy := &yurtReverseProxy{
resolver: resolver,
loadBalancer: lb,
localProxy: local.NewLocalProxy(cacheMgr, lb.IsHealthy),
localProxy: localProxy,
cacheMgr: cacheMgr,
maxRequestsInFlight: yurtHubCfg.MaxRequestInFlight,
stopCh: stopCh,
Expand All @@ -88,17 +95,21 @@ func NewYurtReverseProxyHandler(
func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler {
handler = util.WithRequestTrace(handler)
handler = util.WithRequestContentType(handler)
handler = util.WithCacheHeaderCheck(handler)
if p.cacheMgr != nil {
handler = util.WithCacheHeaderCheck(handler)
}
handler = util.WithRequestTimeout(handler)
handler = util.WithListRequestSelector(handler)
if p.cacheMgr != nil {
handler = util.WithListRequestSelector(handler)
}
handler = util.WithMaxInFlightLimit(handler, p.maxRequestsInFlight)
handler = util.WithRequestClientComponent(handler)
handler = filters.WithRequestInfo(handler, p.resolver)
return handler
}

func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if !hubutil.IsKubeletLeaseReq(req) && p.loadBalancer.IsHealthy() {
if !hubutil.IsKubeletLeaseReq(req) && p.loadBalancer.IsHealthy() || p.localProxy == nil {
p.loadBalancer.ServeHTTP(rw, req)
} else {
p.localProxy.ServeHTTP(rw, req)
Expand Down
46 changes: 24 additions & 22 deletions pkg/yurthub/proxy/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,30 +130,32 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {
}
}

// cache resp with storage interface
if resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusPartialContent {
rambohe-ch marked this conversation as resolved.
Show resolved Hide resolved
if rp.cacheMgr.CanCacheFor(req) {
reqContentType, _ := util.ReqContentTypeFrom(ctx)
respContentType := resp.Header.Get("Content-Type")
if len(respContentType) == 0 {
respContentType = reqContentType
// prepare response content type
reqContentType, _ := util.ReqContentTypeFrom(ctx)
respContentType := resp.Header.Get("Content-Type")
if len(respContentType) == 0 {
respContentType = reqContentType
}
ctx = util.WithRespContentType(ctx, respContentType)
req = req.WithContext(ctx)

// filter response data
if rp.filterChain != nil {
size, filterRc, err := rp.filterChain.Filter(req, resp.Body, rp.stopCh)
if err != nil {
klog.Errorf("failed to filter response for %s, %v", util.ReqString(req), err)
return err
}
ctx = util.WithRespContentType(ctx, respContentType)
req = req.WithContext(ctx)

if rp.filterChain != nil {
size, filterRc, err := rp.filterChain.Filter(req, resp.Body, rp.stopCh)
if err != nil {
klog.Errorf("failed to filter response for %s, %v", util.ReqString(req), err)
return err
}
resp.Body = filterRc
if size > 0 {
resp.ContentLength = int64(size)
resp.Header.Set("Content-Length", fmt.Sprint(size))
}
resp.Body = filterRc
if size > 0 {
resp.ContentLength = int64(size)
resp.Header.Set("Content-Length", fmt.Sprint(size))
}
}

// cache resp with storage interface
if rp.cacheMgr != nil && rp.cacheMgr.CanCacheFor(req) {
rc, prc := util.NewDualReadCloser(req, resp.Body, true)
go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) {
err := rp.cacheMgr.CacheResponse(req, prc, stopCh)
Expand All @@ -164,7 +166,7 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {

resp.Body = rc
}
} else if resp.StatusCode == http.StatusNotFound && info.Verb == "list" {
} else if resp.StatusCode == http.StatusNotFound && info.Verb == "list" && rp.cacheMgr != nil {
// 404 Not Found: The CRD may have been unregistered and should be updated locally as well.
// Other types of requests may return a 404 response for other reasons (for example, getting a pod that doesn't exist).
// And the main purpose is to return 404 when list an unregistered resource locally, so here only consider the list request.
Expand All @@ -184,7 +186,7 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {

func (rp *RemoteProxy) errorHandler(rw http.ResponseWriter, req *http.Request, err error) {
klog.V(2).Infof("remote proxy error handler: %s, %v", util.ReqString(req), err)
if !rp.cacheMgr.CanCacheFor(req) {
if rp.cacheMgr == nil || !rp.cacheMgr.CanCacheFor(req) {
rw.WriteHeader(http.StatusBadGateway)
return
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/yurthub/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ import (
// ProxyKeyType represents the key in proxy request context
type ProxyKeyType int

// WorkingMode represents the working mode of yurthub.
type WorkingMode string

const (
// WorkingModeCloud represents yurthub is working in cloud mode, which means yurthub is deployed on the cloud side.
WorkingModeCloud WorkingMode = "cloud"
// WorkingModeEdge represents yurthub is working in edge mode, which means yurthub is deployed on the edge side.
WorkingModeEdge WorkingMode = "edge"
)

const (
// YurtHubCertificateManagerName represents the certificateManager name in yurthub mode
YurtHubCertificateManagerName = "hubself"
Expand Down Expand Up @@ -321,6 +331,16 @@ func IsSupportedCertMode(certMode string) bool {
return false
}

// IsSupportedWorkingMode check working mode is supported or not
func IsSupportedWorkingMode(workingMode WorkingMode) bool {
switch workingMode {
case WorkingModeCloud, WorkingModeEdge:
return true
}

return false
}

// FileExists checks if specified file exists.
func FileExists(filename string) (bool, error) {
if _, err := os.Stat(filename); os.IsNotExist(err) {
Expand Down