Skip to content

Commit

Permalink
feat: parallelize nodes update
Browse files Browse the repository at this point in the history
This PR aims to optimize the process of updating nodes with
corresponding features. Previously, we were updating nodes
sequentially even though they are independent of each other.
Therefore, we integrated new components: LabelersNodePool, which is
responsible for spinning up a goroutine whenever there's a request for
updating nodes, and a Workqueue, which is responsible for holding the names
of the nodes that should be updated.

Signed-off-by: AhmedGrati <ahmedgrati1999@gmail.com>
  • Loading branch information
TessaIO committed Jun 2, 2023
1 parent d64398f commit 1a2fefb
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 43 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,6 @@ site-build:
site-serve:
@mkdir -p docs/vendor/bundle
$(SITE_BUILD_CMD) sh -c "bundle install && jekyll serve $(JEKYLL_OPTS) -H 127.0.0.1"

# Run the benchmarks of the nfd-master package.
# -bench takes a regular expression over benchmark names (not a package path),
# so "." selects every benchmark in the package given as the last argument.
# -run='^$$' ($$ escapes $ for make) matches no test name, so unit tests are
# skipped and only benchmarks execute.
benchmark:
	go test -bench=. -run='^$$' ./pkg/nfd-master
5 changes: 5 additions & 0 deletions cmd/nfd-master/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func main() {
args.Overrides.NoPublish = overrides.NoPublish
case "resync-period":
args.Overrides.ResyncPeriod = overrides.ResyncPeriod
case "nfd-api-parallelism":
args.Overrides.NfdApiParallelism = overrides.NfdApiParallelism
}
})

Expand Down Expand Up @@ -156,5 +158,8 @@ func initFlags(flagset *flag.FlagSet) (*master.Args, *master.ConfigOverrideArgs)
flagset.Var(overrides.ResyncPeriod, "resync-period",
"Specify the NFD API controller resync period."+
"It has an effect when the NodeFeature API has been enabled (with -enable-nodefeature-api).")
overrides.NfdApiParallelism = flagset.Int("nfd-api-parallelism", 10, "Defines the maximum number of goroutines responsible of updating nodes. "+
"Can be used for the throttling mechanism. It has effect only when -enable-nodefeature-api has been set.")

return args, overrides
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
# renewDeadline: 10s
# # this value has to be greater than 0
# retryPeriod: 2s
# nfdApiParallelism: 10
3 changes: 3 additions & 0 deletions deployment/helm/node-feature-discovery/templates/master.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ spec:
{{- if .Values.master.resyncPeriod }}
- "-resync-period={{ .Values.master.resyncPeriod }}"
{{- end }}
{{- if .Values.master.nfdApiParallelism | empty | not }}
- "-nfd-api-parallelism={{ .Values.master.nfdApiParallelism }}"
{{- end }}
{{- if .Values.tls.enable }}
- "-ca-file=/etc/kubernetes/node-feature-discovery/certs/ca.crt"
- "-key-file=/etc/kubernetes/node-feature-discovery/certs/tls.key"
Expand Down
2 changes: 2 additions & 0 deletions deployment/helm/node-feature-discovery/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ master:
# renewDeadline: 10s
# # this value has to be greater than 0
# retryPeriod: 2s
# nfdApiParallelism: 10
### <NFD-MASTER-CONF-END-DO-NOT-REMOVE>
# The TCP port that nfd-master listens for incoming requests. Default: 8080
port: 8080
Expand All @@ -39,6 +40,7 @@ master:
enableTaints: false
crdController: null
featureRulesController: null
nfdApiParallelism: null
deploymentAnnotations: {}
replicaCount: 1

Expand Down
1 change: 1 addition & 0 deletions docs/deployment/helm.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ We have introduced the following Chart parameters.
| `master.annotations` | dict | {} | NFD master pod [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) |
| `master.affinity` | dict | | NFD master pod required [node affinity](https://kubernetes.io/docs/tasks/configure-pod-container/assign-pods-nodes-using-node-affinity/) |
| `master.deploymentAnnotations` | dict | {} | NFD master deployment [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) |
| `master.nfdApiParallelism` | integer | 10 | Specifies the maximum number of concurrent node updates. |
| `master.config` | dict | | NFD master [configuration](../reference/master-configuration-reference) |

### Worker pod parameters
Expand Down
15 changes: 15 additions & 0 deletions docs/reference/master-commandline-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,21 @@ Example:
nfd-master -options='{"noPublish": true}'
```

### -nfd-api-parallelism

The `-nfd-api-parallelism` flag can be used to specify the maximum
number of concurrent node updates.

It takes effect only when `-enable-nodefeature-api` has been set.

Default: 10

Example:

```bash
nfd-master -nfd-api-parallelism=1
```

### Logging

The following logging-related flags are inherited from the
Expand Down
15 changes: 15 additions & 0 deletions docs/reference/master-configuration-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,18 @@ Example:
leaderElection:
retryPeriod: 2s
```

### nfdApiParallelism

The `nfdApiParallelism` option can be used to specify the maximum
number of concurrent node updates.

It takes effect only when `-enable-nodefeature-api` has been set.

Default: 10

Example:

```yaml
nfdApiParallelism: 1
```
105 changes: 105 additions & 0 deletions pkg/nfd-master/nfd-master-internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/pkg/apis/nfd/v1alpha1"
"sigs.k8s.io/node-feature-discovery/pkg/generated/clientset/versioned/fake"
nfdscheme "sigs.k8s.io/node-feature-discovery/pkg/generated/clientset/versioned/scheme"
nfdinformers "sigs.k8s.io/node-feature-discovery/pkg/generated/informers/externalversions"
"sigs.k8s.io/node-feature-discovery/pkg/labeler"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
Expand All @@ -56,6 +61,60 @@ func newMockNode() *corev1.Node {
return &n
}

// mockNodeList returns a NodeList holding 1000 mock nodes named "node 0"
// through "node 999". Every node carries non-nil, empty label and annotation
// maps and an empty capacity resource list.
func mockNodeList() *corev1.NodeList {
	const numNodes = 1000

	// The number of nodes is known up front, so allocate the backing array
	// once instead of letting append grow it repeatedly.
	l := corev1.NodeList{Items: make([]corev1.Node, 0, numNodes)}

	for i := 0; i < numNodes; i++ {
		n := corev1.Node{}
		n.Name = fmt.Sprintf("node %v", i)
		n.Labels = map[string]string{}
		n.Annotations = map[string]string{}
		n.Status.Capacity = corev1.ResourceList{}

		l.Items = append(l.Items, n)
	}
	return &l
}

// newMockNfdAPIController builds an nfdController for tests on top of the
// given fake clientset. It wires NodeFeature and NodeFeatureRule informers
// (with no-op event handlers) from a shared informer factory using a one hour
// resync period, stores their listers in the controller, starts the informers
// and registers the nfd v1alpha1 types in the nfd client scheme.
//
// It returns nil if registering an event handler on either informer fails.
func newMockNfdAPIController(client *fake.Clientset) *nfdController {
	ctrl := &nfdController{
		stopChan:           make(chan struct{}, 1),
		updateAllNodesChan: make(chan struct{}, 1),
		updateOneNodeChan:  make(chan string),
	}

	factory := nfdinformers.NewSharedInformerFactory(client, 1*time.Hour)
	nfdAPI := factory.Nfd().V1alpha1()

	// The mock controller does not react to events, so both informers share
	// the same set of do-nothing callbacks.
	noopHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(interface{}) {},
		UpdateFunc: func(_, _ interface{}) {},
		DeleteFunc: func(interface{}) {},
	}

	// NodeFeature objects
	features := nfdAPI.NodeFeatures()
	_, err := features.Informer().AddEventHandler(noopHandlers)
	if err != nil {
		return nil
	}
	ctrl.featureLister = features.Lister()

	// NodeFeatureRule objects
	rules := nfdAPI.NodeFeatureRules()
	_, err = rules.Informer().AddEventHandler(noopHandlers)
	if err != nil {
		return nil
	}
	ctrl.ruleLister = rules.Lister()

	// Informers run until the controller's stop channel is signalled.
	factory.Start(ctrl.stopChan)

	utilruntime.Must(nfdv1alpha1.AddToScheme(nfdscheme.Scheme))

	return ctrl
}

func newMockMaster(apihelper apihelper.APIHelpers) *nfdMaster {
return &nfdMaster{
nodeName: mockNodeName,
Expand Down Expand Up @@ -647,11 +706,14 @@ extraLabelNs: ["added.ns.io"]
writeConfig(`
extraLabelNs: ["override.ns.io"]
resyncPeriod: '2h'
nfdApiParallelism: 300
`)
So(func() interface{} { return master.config.ExtraLabelNs },
withTimeout, 2*time.Second, ShouldResemble, utils.StringSetVal{"override.ns.io": struct{}{}})
So(func() interface{} { return master.config.ResyncPeriod.Duration },
withTimeout, 2*time.Second, ShouldResemble, time.Duration(2)*time.Hour)
So(func() interface{} { return master.config.NfdApiParallelism },
withTimeout, 2*time.Second, ShouldResemble, 300)

// Removing config file should get back our defaults
err = os.RemoveAll(tmpDir)
Expand All @@ -660,22 +722,65 @@ resyncPeriod: '2h'
withTimeout, 2*time.Second, ShouldResemble, utils.StringSetVal{})
So(func() interface{} { return master.config.ResyncPeriod.Duration },
withTimeout, 2*time.Second, ShouldResemble, time.Duration(1)*time.Hour)
So(func() interface{} { return master.config.NfdApiParallelism },
withTimeout, 2*time.Second, ShouldResemble, 10)

// Re-creating config dir and file should change the config
err = os.MkdirAll(configDir, 0755)
So(err, ShouldBeNil)
writeConfig(`
extraLabelNs: ["another.override.ns"]
resyncPeriod: '3m'
nfdApiParallelism: 100
`)
So(func() interface{} { return master.config.ExtraLabelNs },
withTimeout, 2*time.Second, ShouldResemble, utils.StringSetVal{"another.override.ns": struct{}{}})
So(func() interface{} { return master.config.ResyncPeriod.Duration },
withTimeout, 2*time.Second, ShouldResemble, time.Duration(3)*time.Minute)
So(func() interface{} { return master.config.NfdApiParallelism },
withTimeout, 2*time.Second, ShouldResemble, 100)
})
})
}

// BenchmarkNfdAPIUpdateAllNodes times nfdAPIUpdateAllNodes on a mock master
// whose API helper serves the node list produced by mockNodeList. Publishing
// is disabled (NoPublish) and every per-node API call is answered by the
// mock, so no real cluster is contacted.
//
// NOTE(review): node updates look like they are carried out by the node
// updater pool started below; confirm whether the timed call waits for that
// work or only hands it over to the pool.
func BenchmarkNfdAPIUpdateAllNodes(b *testing.B) {
	mockAPIHelper := new(apihelper.MockAPIHelpers)

	mockMaster := newMockMaster(mockAPIHelper)
	mockMaster.nfdController = newMockNfdAPIController(fake.NewSimpleClientset())
	mockMaster.config.NoPublish = true

	mockNodeUpdaterPool := newNodeUpdaterPool(mockMaster)
	mockMaster.nodeUpdaterPool = mockNodeUpdaterPool

	mockClient := &k8sclient.Clientset{}

	statusPatches := []apihelper.JsonPatch{}
	metadataPatches := []apihelper.JsonPatch{
		{Op: "add", Path: "/metadata/annotations/nfd.node.kubernetes.io~1feature-labels", Value: ""},
		{Op: "add", Path: "/metadata/annotations/nfd.node.kubernetes.io~1extended-resources", Value: ""},
	}

	nodes := mockNodeList()

	mockAPIHelper.On("GetClient").Return(mockClient, nil)
	mockAPIHelper.On("GetNodes", mockClient).Return(nodes, nil)

	// presumably the argument is the number of updater goroutines — TODO confirm
	mockNodeUpdaterPool.start(10)

	// Register the per-node expectations for exactly the nodes that GetNodes
	// hands out, so the names cannot drift from mockNodeList.
	for i := range nodes.Items {
		nodeName := nodes.Items[i].Name
		node := corev1.Node{}
		node.Name = nodeName
		mockAPIHelper.On("GetNode", mockClient, nodeName).Return(&node, nil)
		mockAPIHelper.On("PatchNodeStatus", mockClient, nodeName, mock.MatchedBy(jsonPatchMatcher(statusPatches))).Return(nil)
		mockAPIHelper.On("PatchNode", mockClient, nodeName, mock.MatchedBy(jsonPatchMatcher(metadataPatches))).Return(nil)
	}

	// Exclude all of the mock setup above from the measurement.
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		// A failing call would make the reported numbers meaningless, so
		// abort the benchmark instead of silently timing the error path.
		if err := mockMaster.nfdAPIUpdateAllNodes(); err != nil {
			b.Fatalf("nfdAPIUpdateAllNodes failed: %v", err)
		}
	}
}

// withTimeout is a custom assertion for polling a value asynchronously
// actual is a function for getting the actual value
// expected[0] is a time.Duration value specifying the timeout
Expand Down
Loading

0 comments on commit 1a2fefb

Please sign in to comment.