Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport upstream changes to watch cache enablement #16398

Merged
merged 3 commits into from
Sep 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions contrib/completions/bash/openshift
Original file line number Diff line number Diff line change
Expand Up @@ -32111,6 +32111,8 @@ _openshift_start_kubernetes_apiserver()
local_nonpersistent_flags+=("--contention-profiling")
flags+=("--cors-allowed-origins=")
local_nonpersistent_flags+=("--cors-allowed-origins=")
flags+=("--default-watch-cache-size=")
local_nonpersistent_flags+=("--default-watch-cache-size=")
flags+=("--delete-collection-workers=")
local_nonpersistent_flags+=("--delete-collection-workers=")
flags+=("--deserialization-cache-size=")
Expand Down
2 changes: 2 additions & 0 deletions contrib/completions/zsh/openshift
Original file line number Diff line number Diff line change
Expand Up @@ -32260,6 +32260,8 @@ _openshift_start_kubernetes_apiserver()
local_nonpersistent_flags+=("--contention-profiling")
flags+=("--cors-allowed-origins=")
local_nonpersistent_flags+=("--cors-allowed-origins=")
flags+=("--default-watch-cache-size=")
local_nonpersistent_flags+=("--default-watch-cache-size=")
flags+=("--delete-collection-workers=")
local_nonpersistent_flags+=("--delete-collection-workers=")
flags+=("--deserialization-cache-size=")
Expand Down
24 changes: 15 additions & 9 deletions pkg/cmd/server/kubernetes/master/master_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ import (
"github.com/openshift/origin/pkg/version"
)

const DefaultWatchCacheSize = 1000

// request paths that match this regular expression will be treated as long running
// and not subjected to the default server timeout.
const originLongRunningEndpointsRE = "(/|^)(buildconfigs/.*/instantiatebinary|imagestreamimports)$"
Expand Down Expand Up @@ -150,7 +148,7 @@ func BuildKubeAPIserverOptions(masterConfig configapi.MasterConfig) (*kapiserver
server.Etcd.StorageConfig.KeyFile = masterConfig.EtcdClientInfo.ClientCert.KeyFile
server.Etcd.StorageConfig.CertFile = masterConfig.EtcdClientInfo.ClientCert.CertFile
server.Etcd.StorageConfig.CAFile = masterConfig.EtcdClientInfo.CA
server.Etcd.DefaultWatchCacheSize = DefaultWatchCacheSize
server.Etcd.DefaultWatchCacheSize = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what is setting us to "off by default", right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct


server.GenericServerRunOptions.CorsAllowedOriginList = masterConfig.CORSAllowedOrigins
server.GenericServerRunOptions.MaxRequestsInFlight = masterConfig.ServingInfo.MaxRequestsInFlight
Expand Down Expand Up @@ -507,6 +505,20 @@ func buildKubeApiserverConfig(
return originLongRunningRequestRE.MatchString(r.URL.Path) || kubeLongRunningFunc(r, requestInfo)
}

if apiserverOptions.Etcd.EnableWatchCache {
glog.V(2).Infof("Initializing cache sizes based on %dMB limit", apiserverOptions.GenericServerRunOptions.TargetRAMMB)
sizes := cachesize.NewHeuristicWatchCacheSizes(apiserverOptions.GenericServerRunOptions.TargetRAMMB)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we set this target RAMMB to anything by default?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we set this target RAMMB to anything by default?

I'm not seeing where we write a default here, which would set all the heuristic ones to zero by default, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a "min" function on the heuristic so we always get something even at 0

if userSpecified, err := genericoptions.ParseWatchCacheSizes(apiserverOptions.Etcd.WatchCacheSizes); err == nil {
for resource, size := range userSpecified {
sizes[resource] = size
}
}
apiserverOptions.Etcd.WatchCacheSizes, err = genericoptions.WriteWatchCacheSizes(sizes)
if err != nil {
return nil, err
}
}

if err := apiserverOptions.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
return nil, err
}
Expand Down Expand Up @@ -566,12 +578,6 @@ func buildKubeApiserverConfig(
EnableCoreControllers: true,
}

if apiserverOptions.Etcd.EnableWatchCache {
// TODO(rebase): upstream also does the following:
// cachesize.InitializeWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
cachesize.SetWatchCacheSizes(apiserverOptions.GenericServerRunOptions.WatchCacheSizes)
}

if kubeApiserverConfig.EnableCoreControllers {
ttl := masterConfig.KubernetesMasterConfig.MasterEndpointReconcileTTL
interval := ttl * 2 / 3
Expand Down
3 changes: 1 addition & 2 deletions pkg/cmd/server/kubernetes/master/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ func TestNewMasterLeasesHasCorrectTTL(t *testing.T) {
}

restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
watchCacheDisabled := 0
storageInterface, _ := restOptions.Decorator(kapi.Scheme, restOptions.StorageConfig, &watchCacheDisabled, nil, "masterleases", nil, nil, nil, nil)
storageInterface, _ := restOptions.Decorator(kapi.Scheme, restOptions.StorageConfig, nil, "masterleases", nil, nil, nil, nil)
defer server.Terminate(t)

masterLeases := newMasterLeases(storageInterface, 15)
Expand Down
2 changes: 0 additions & 2 deletions pkg/security/registry/securitycontextconstraints/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/registry/cachesize"

securityapi "github.com/openshift/origin/pkg/security/apis/security"
"github.com/openshift/origin/pkg/security/registry/securitycontextconstraints"
Expand All @@ -30,7 +29,6 @@ func NewREST(optsGetter generic.RESTOptionsGetter) *REST {
},
PredicateFunc: securitycontextconstraints.Matcher,
DefaultQualifiedResource: securityapi.Resource("securitycontextconstraints"),
WatchCacheSize: cachesize.GetWatchCacheSizeByResource("securitycontextconstraints"),

CreateStrategy: securitycontextconstraints.Strategy,
UpdateStrategy: securitycontextconstraints.Strategy,
Expand Down
101 changes: 43 additions & 58 deletions pkg/util/restoptions/configgetter.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
package restoptions

import (
"fmt"
"strconv"
"strings"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"github.com/golang/glog"

"k8s.io/apimachinery/pkg/runtime/schema"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"

"github.com/golang/glog"
configapi "github.com/openshift/origin/pkg/cmd/server/api"
kubernetes "github.com/openshift/origin/pkg/cmd/server/kubernetes/master"
)
Expand All @@ -41,7 +35,6 @@ type configRESTOptionsGetter struct {
}

// NewConfigGetter returns a restoptions.Getter implemented using information from the provided master config.
// By default, the etcd watch cache is enabled with a size of 1000 per resource type.
// TODO: this class should either not need to know about configapi.MasterConfig, or not be in pkg/util
func NewConfigGetter(masterOptions configapi.MasterConfig, defaultResourceConfig *serverstorage.ResourceConfig, resourcePrefixOverrides map[schema.GroupResource]string, enforcedStorageVersions map[schema.GroupResource]schema.GroupVersion, quorumResources map[schema.GroupResource]struct{}) (Getter, error) {
apiserverOptions, err := kubernetes.BuildKubeAPIserverOptions(masterOptions)
Expand All @@ -55,27 +48,24 @@ func NewConfigGetter(masterOptions configapi.MasterConfig, defaultResourceConfig
storageFactory.DefaultResourcePrefixes = resourcePrefixOverrides
storageFactory.StorageConfig.Prefix = masterOptions.EtcdStorageConfig.OpenShiftStoragePrefix

// TODO: refactor vendor/k8s.io/kubernetes/pkg/registry/cachesize to remove our custom cache size code
errs := []error{}
cacheSizes := map[schema.GroupResource]int{}
for _, c := range apiserverOptions.GenericServerRunOptions.WatchCacheSizes {
tokens := strings.Split(c, "#")
if len(tokens) != 2 {
errs = append(errs, fmt.Errorf("invalid watch cache size value '%s', expecting <resource>#<size> format (e.g. builds#100)", c))
continue
// perform watch cache heuristic like upstream
if apiserverOptions.Etcd.EnableWatchCache {
glog.V(2).Infof("Initializing cache sizes based on %dMB limit", apiserverOptions.GenericServerRunOptions.TargetRAMMB)
sizes := newHeuristicWatchCacheSizes(apiserverOptions.GenericServerRunOptions.TargetRAMMB)
if userSpecified, err := options.ParseWatchCacheSizes(apiserverOptions.Etcd.WatchCacheSizes); err == nil {
for resource, size := range userSpecified {
sizes[resource] = size
}
}

resource := schema.ParseGroupResource(tokens[0])

size, err := strconv.Atoi(tokens[1])
apiserverOptions.Etcd.WatchCacheSizes, err = options.WriteWatchCacheSizes(sizes)
if err != nil {
errs = append(errs, fmt.Errorf("invalid watch cache size value '%s': %v", c, err))
continue
return nil, err
}
cacheSizes[resource] = size
}
if len(errs) > 0 {
return nil, kerrors.NewAggregate(errs)

cacheSizes, err := options.ParseWatchCacheSizes(apiserverOptions.Etcd.WatchCacheSizes)
if err != nil {
return nil, err
}

return &configRESTOptionsGetter{
Expand Down Expand Up @@ -108,41 +98,14 @@ func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource)
config.Quorum = true
}

configuredCacheSize, specified := g.cacheSizes[resource]
if !specified || configuredCacheSize < 0 {
configuredCacheSize = g.defaultCacheSize
}
storageWithCacher := registry.StorageWithCacher(configuredCacheSize)

decorator := func(
copier runtime.ObjectCopier,
storageConfig *storagebackend.Config,
requestedSize *int,
objectType runtime.Object,
resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error),
newListFn func() runtime.Object,
getAttrsFunc storage.AttrFunc,
triggerFn storage.TriggerPublisherFunc,
) (storage.Interface, factory.DestroyFunc) {
// use the origin default cache size, not the one in registry.StorageWithCacher
capacity := &configuredCacheSize
if requestedSize != nil {
capacity = requestedSize
}

if *capacity == 0 || !g.cacheEnabled {
glog.V(5).Infof("using uncached watch storage for %s (quorum=%t)", resource.String(), storageConfig.Quorum)
return generic.UndecoratedStorage(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
}

glog.V(5).Infof("using watch cache storage (capacity=%v, quorum=%t) for %s %#v", *capacity, storageConfig.Quorum, resource.String(), storageConfig)
return storageWithCacher(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
cacheSize, ok := g.cacheSizes[resource]
if !ok {
cacheSize = g.defaultCacheSize
}

resourceOptions := generic.RESTOptions{
StorageConfig: config,
Decorator: decorator,
Decorator: registry.StorageWithCacher(cacheSize),
DeleteCollectionWorkers: g.deleteCollectionWorkers,
EnableGarbageCollection: g.enableGarbageCollection,
ResourcePrefix: g.storageFactory.ResourcePrefix(resource),
Expand All @@ -151,3 +114,25 @@ func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource)

return resourceOptions, nil
}

// newHeuristicWatchCacheSizes returns a map of suggested watch cache sizes based on total
// memory. It reuses the upstream heuristic and adds OpenShift specific resources.
func newHeuristicWatchCacheSizes(expectedRAMCapacityMB int) map[schema.GroupResource]int {
// TODO: Revisit this heuristic, copied from upstream
clusterSize := expectedRAMCapacityMB / 60

// default enable watch caches for resources that will have a high number of clients accessing it
// and where the write rate may be significant
watchCacheSizes := make(map[schema.GroupResource]int)
watchCacheSizes[schema.GroupResource{Group: "network.openshift.io", Resource: "hostsubnets"}] = maxInt(5*clusterSize, 100)
watchCacheSizes[schema.GroupResource{Group: "network.openshift.io", Resource: "netnamespaces"}] = maxInt(5*clusterSize, 100)
watchCacheSizes[schema.GroupResource{Group: "network.openshift.io", Resource: "egressnetworkpolicies"}] = maxInt(10*clusterSize, 100)
return watchCacheSizes
}

func maxInt(a, b int) int {
if a > b {
return a
}
return b
}
11 changes: 7 additions & 4 deletions test/integration/watch_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"

configapi "github.com/openshift/origin/pkg/cmd/server/api"
serverkube "github.com/openshift/origin/pkg/cmd/server/kubernetes/master"
testutil "github.com/openshift/origin/test/util"
testserver "github.com/openshift/origin/test/util/server"
)
Expand Down Expand Up @@ -124,9 +123,13 @@ func TestDefaultWatchCacheSize(t *testing.T) {
etcdOptions := apiserveroptions.NewEtcdOptions(&storagebackend.Config{})
kubeDefaultCacheSize := etcdOptions.DefaultWatchCacheSize
if kubeDefaultCacheSize != 100 {
t.Fatalf("upstream DefaultWatchCacheSize changed from 100 to %q", kubeDefaultCacheSize)
t.Fatalf("upstream DefaultWatchCacheSize changed to %d", kubeDefaultCacheSize)
}
testWatchCacheWithConfig(t, master, serverkube.DefaultWatchCacheSize, kubeDefaultCacheSize)
if master.KubernetesMasterConfig.APIServerArguments == nil {
master.KubernetesMasterConfig.APIServerArguments = configapi.ExtendedArguments{}
}
master.KubernetesMasterConfig.APIServerArguments["watch-cache-sizes"] = []string{"namespaces#100"}
testWatchCacheWithConfig(t, master, 100, 0)
}

func TestWatchCacheSizeWithFlag(t *testing.T) {
Expand All @@ -140,5 +143,5 @@ func TestWatchCacheSizeWithFlag(t *testing.T) {
}
master.KubernetesMasterConfig.APIServerArguments["watch-cache-sizes"] = []string{"namespaces#2000"}

testWatchCacheWithConfig(t, master, 2000, serverkube.DefaultWatchCacheSize)
testWatchCacheWithConfig(t, master, 2000, 0)
}
13 changes: 11 additions & 2 deletions vendor/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading