Skip to content

Commit

Permalink
Use a ring channel to avoid blocking write of events (#2082)
Browse files Browse the repository at this point in the history
* Use a ring channel to avoid blocking write of events

* Add eapache/channels dependency
  • Loading branch information
aledbf authored Feb 14, 2018
1 parent 33475b7 commit 9bcb5b0
Show file tree
Hide file tree
Showing 35 changed files with 2,833 additions and 78 deletions.
558 changes: 525 additions & 33 deletions Gopkg.lock

Large diffs are not rendered by default.

6 changes: 1 addition & 5 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@
# name = "github.com/x/y"
# version = "2.4.0"

[[override]]

This comment has been minimized.

Copy link
@antoineco

antoineco Feb 21, 2018

Contributor

@aledbf this breaks when the project is cloned without the vendor/ directory (dep ensure -no-vendor model).

# k8s.io/ingress-nginx/vendor/k8s.io/kubernetes/pkg/util/parsers
vendor/k8s.io/kubernetes/pkg/util/parsers/parsers.go:36:16: undefined: reference.ParseNormalizedNamed

This comment has been minimized.

Copy link
@aledbf

aledbf Feb 21, 2018

Author Member

@antoineco thank you for the report. Today I will fix this issue.

name = "github.com/docker/distribution"
revision = "edc3ab29cdff8694dd6feb85cfeb4b5f1b38ed9c"

[[constraint]]
name = "github.com/opencontainers/go-digest"
name = "github.com/eapache/channels"
branch = "master"

[[constraint]]
Expand Down
21 changes: 13 additions & 8 deletions internal/ingress/controller/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/golang/glog"

proxyproto "github.com/armon/go-proxyproto"
"github.com/eapache/channels"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -106,7 +107,7 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
}),

stopCh: make(chan struct{}),
updateCh: make(chan store.Event, 1024),
updateCh: channels.NewRingChannel(1024),

stopLock: &sync.Mutex{},

Expand Down Expand Up @@ -209,7 +210,7 @@ type NGINXController struct {
stopLock *sync.Mutex

stopCh chan struct{}
updateCh chan store.Event
updateCh *channels.RingChannel

// ngxErrCh channel used to detect errors with the nginx processes
ngxErrCh chan error
Expand Down Expand Up @@ -290,16 +291,20 @@ func (n *NGINXController) Start() {
// start a new nginx master process if the controller is not being stopped
n.start(cmd)
}
case evt := <-n.updateCh:
case event := <-n.updateCh.Out():
if n.isShuttingDown {
break
}
glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
if evt.Type == store.ConfigurationEvent {
n.SetForceReload(true)
}
if evt, ok := event.(store.Event); ok {
glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
if evt.Type == store.ConfigurationEvent {
n.SetForceReload(true)
}

n.syncQueue.Enqueue(evt.Obj)
n.syncQueue.Enqueue(evt.Obj)
} else {
glog.Warningf("unexpected event type received %T", event)
}
case <-n.stopCh:
break
}
Expand Down
2 changes: 1 addition & 1 deletion internal/ingress/controller/store/backend_ssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (s k8sStore) ReadSecrets(ing *extensions.Ingress) {
// sendDummyEvent sends a dummy event to trigger an update
// This is used in when a secret change
func (s *k8sStore) sendDummyEvent() {
s.updateCh <- Event{
s.updateCh.In() <- Event{
Type: UpdateEvent,
Obj: &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Expand Down
31 changes: 16 additions & 15 deletions internal/ingress/controller/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"github.com/eapache/channels"
"github.com/golang/glog"

apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -194,7 +195,7 @@ type k8sStore struct {
filesystem file.Filesystem

// updateCh
updateCh chan Event
updateCh *channels.RingChannel

// mu mutex used to avoid simultaneous incovations to syncSecret
mu *sync.Mutex
Expand All @@ -208,7 +209,7 @@ func New(checkOCSP bool,
resyncPeriod time.Duration,
client clientset.Interface,
fs file.Filesystem,
updateCh chan Event) Storer {
updateCh *channels.RingChannel) Storer {

store := &k8sStore{
isOCSPCheckEnabled: checkOCSP,
Expand Down Expand Up @@ -246,7 +247,7 @@ func New(checkOCSP bool,

store.extractAnnotations(addIng)
recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
updateCh <- Event{
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
Expand All @@ -272,7 +273,7 @@ func New(checkOCSP bool,
}
recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
store.listers.IngressAnnotation.Delete(delIng)
updateCh <- Event{
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
Expand All @@ -293,7 +294,7 @@ func New(checkOCSP bool,
}

store.extractAnnotations(curIng)
updateCh <- Event{
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
Expand All @@ -312,7 +313,7 @@ func New(checkOCSP bool,
_, err := store.GetLocalSecret(k8s.MetaNamespaceKey(sec))
if err == nil {
store.syncSecret(key)
updateCh <- Event{
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
Expand All @@ -323,7 +324,7 @@ func New(checkOCSP bool,
store.extractAnnotations(ing)
}

updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
Expand All @@ -346,7 +347,7 @@ func New(checkOCSP bool,
}
}
store.sslStore.Delete(k8s.MetaNamespaceKey(sec))
updateCh <- Event{
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
Expand All @@ -362,7 +363,7 @@ func New(checkOCSP bool,
}
}

updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: sec,
}
Expand All @@ -372,13 +373,13 @@ func New(checkOCSP bool,

eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh <- Event{
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
updateCh <- Event{
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
Expand All @@ -387,7 +388,7 @@ func New(checkOCSP bool,
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
updateCh <- Event{
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
Expand All @@ -402,7 +403,7 @@ func New(checkOCSP bool,
if mapKey == configmap {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
store.setConfig(m)
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: obj,
}
Expand All @@ -415,15 +416,15 @@ func New(checkOCSP bool,
if mapKey == configmap {
recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
store.setConfig(m)
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
}
// updates to configuration configmaps can trigger an update
if mapKey == tcp || mapKey == udp {
recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
Expand Down
36 changes: 20 additions & 16 deletions internal/ingress/controller/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/eapache/channels"
apiv1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
extensions "k8s.io/api/extensions/v1beta1"
Expand Down Expand Up @@ -56,11 +57,11 @@ func TestStore(t *testing.T) {
defer deleteNamespace(ns, clientSet, t)

stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
updateCh := channels.NewRingChannel(1024)

go func(ch chan Event) {
go func(ch *channels.RingChannel) {
for {
<-ch
<-ch.Out()
}
}(updateCh)

Expand Down Expand Up @@ -111,7 +112,7 @@ func TestStore(t *testing.T) {
t.Errorf("expected an Ingres but none returned")
}

close(updateCh)
updateCh.Close()
close(stopCh)
})

Expand All @@ -120,19 +121,20 @@ func TestStore(t *testing.T) {
defer deleteNamespace(ns, clientSet, t)

stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
updateCh := channels.NewRingChannel(1024)

var add uint64
var upd uint64
var del uint64

go func(ch chan Event) {
go func(ch *channels.RingChannel) {
for {
e, ok := <-ch
evt, ok := <-ch.Out()
if !ok {
return
}

e := evt.(Event)
if e.Obj == nil {
continue
}
Expand Down Expand Up @@ -254,7 +256,7 @@ func TestStore(t *testing.T) {
t.Errorf("expected 1 event of type Delete but %v occurred", del)
}

close(updateCh)
updateCh.Close()
close(stopCh)
})

Expand All @@ -263,19 +265,20 @@ func TestStore(t *testing.T) {
defer deleteNamespace(ns, clientSet, t)

stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
updateCh := channels.NewRingChannel(1024)

var add uint64
var upd uint64
var del uint64

go func(ch chan Event) {
go func(ch *channels.RingChannel) {
for {
e, ok := <-ch
evt, ok := <-ch.Out()
if !ok {
return
}

e := evt.(Event)
if e.Obj == nil {
continue
}
Expand Down Expand Up @@ -339,7 +342,7 @@ func TestStore(t *testing.T) {
t.Errorf("expected 1 events of type Delete but %v occurred", del)
}

close(updateCh)
updateCh.Close()
close(stopCh)
})

Expand All @@ -348,19 +351,20 @@ func TestStore(t *testing.T) {
defer deleteNamespace(ns, clientSet, t)

stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
updateCh := channels.NewRingChannel(1024)

var add uint64
var upd uint64
var del uint64

go func(ch <-chan Event) {
go func(ch *channels.RingChannel) {
for {
e, ok := <-ch
evt, ok := <-ch.Out()
if !ok {
return
}

e := evt.(Event)
if e.Obj == nil {
continue
}
Expand Down Expand Up @@ -478,7 +482,7 @@ func TestStore(t *testing.T) {
}
})

close(updateCh)
updateCh.Close()
close(stopCh)
})

Expand Down
22 changes: 22 additions & 0 deletions vendor/github.com/eapache/channels/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions vendor/github.com/eapache/channels/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9bcb5b0

Please sign in to comment.