Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds event manager to tribe
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed Jul 15, 2016
1 parent 0987d52 commit 6288d66
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 8 deletions.
41 changes: 41 additions & 0 deletions core/tribe_event/tribe_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2015 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tribe_event

import "github.com/intelsdi-x/snap/core"

const (
PluginAdded = "Tribe.PluginAdded"
)

type AddPluginEvent struct {
Agreement struct {
Name string
}
Plugin struct {
Name string
Type core.PluginType
Version int
}
}

func (e AddPluginEvent) Namespace() string {
return PluginAdded
}
40 changes: 32 additions & 8 deletions mgmt/rest/tribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import (
log "github.com/Sirupsen/logrus"
. "github.com/smartystreets/goconvey/convey"

"github.com/intelsdi-x/gomit"
"github.com/intelsdi-x/snap/control"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/tribe_event"
"github.com/intelsdi-x/snap/mgmt/rest/rbody"
"github.com/intelsdi-x/snap/mgmt/tribe"
"github.com/intelsdi-x/snap/scheduler"
Expand Down Expand Up @@ -151,7 +153,7 @@ func TestTribeTaskAgreements(t *testing.T) {
log.SetLevel(log.WarnLevel)
numOfNodes := 5
aName := "agreement99"
mgtPorts, tribePort := startTribes(numOfNodes, "")
mgtPorts, tribePort, lpe := startTribes(numOfNodes, "")
Convey("A cluster is started", t, func() {
Convey("Members are retrieved", func() {
for _, i := range mgtPorts {
Expand Down Expand Up @@ -206,6 +208,7 @@ func TestTribeTaskAgreements(t *testing.T) {
So(resp.Meta.Code, ShouldEqual, 200)
So(len(resp.Body.(*rbody.PluginList).LoadedPlugins), ShouldEqual, 1)
pluginToUnload := resp.Body.(*rbody.PluginList).LoadedPlugins[0]
<-lpe.pluginAddEvent
resp = getAgreement(mgtPorts[0], aName)
So(resp.Meta.Code, ShouldEqual, 200)
So(len(resp.Body.(*rbody.TribeGetAgreement).Agreement.PluginAgreement.Plugins), ShouldEqual, 1)
Expand Down Expand Up @@ -278,7 +281,7 @@ func TestTribeTaskAgreements(t *testing.T) {
wg.Wait()
So(timedOut, ShouldEqual, false)
Convey("A new node joins the agreement", func() {
mgtPort, _ := startTribes(1, fmt.Sprintf("127.0.0.1:%d", tribePort))
mgtPort, _, _ := startTribes(1, fmt.Sprintf("127.0.0.1:%d", tribePort))
j := joinAgreement(mgtPort[0], fmt.Sprintf("member-%d", mgtPort[0]), aName)
mgtPorts = append(mgtPorts, mgtPort[0])
So(j.Meta.Code, ShouldEqual, 200)
Expand Down Expand Up @@ -429,7 +432,7 @@ func TestTribePluginAgreements(t *testing.T) {
)
numOfNodes := 5
aName := "agreement1"
mgtPorts, _ := startTribes(numOfNodes, "")
mgtPorts, _, _ := startTribes(numOfNodes, "")
Convey("A cluster is started", t, func() {
Convey("Members are retrieved", func() {
for _, i := range mgtPorts {
Expand Down Expand Up @@ -663,11 +666,29 @@ func TestTribePluginAgreements(t *testing.T) {
})
}

type listenToSeedEvents struct {
pluginAddEvent chan struct{}
}

func newListenToSeedEvents() *listenToSeedEvents {
return &listenToSeedEvents{
pluginAddEvent: make(chan struct{}),
}
}

func (l *listenToSeedEvents) HandleGomitEvent(e gomit.Event) {
switch e.Body.(type) {
case *tribe_event.AddPluginEvent:
l.pluginAddEvent <- struct{}{}
}
}

// returns an array of the mgtports and the tribe port for the last node
func startTribes(count int, seed string) ([]int, int) {
func startTribes(count int, seed string) ([]int, int, *listenToSeedEvents) {
var wg sync.WaitGroup
var tribePort int
var mgtPorts []int
lpe := newListenToSeedEvents()
for i := 0; i < count; i++ {
mgtPort := getAvailablePort()
mgtPorts = append(mgtPorts, mgtPort)
Expand All @@ -680,14 +701,17 @@ func startTribes(count int, seed string) ([]int, int) {
conf.RestAPIPort = mgtPort
//conf.MemberlistConfig.PushPullInterval = 5 * time.Second
conf.MemberlistConfig.RetransmitMult = conf.MemberlistConfig.RetransmitMult * 2
if seed == "" {
seed = fmt.Sprintf("%s:%d", "127.0.0.1", tribePort)
}

t, err := tribe.New(conf)
if err != nil {
panic(err)
}

if seed == "" {
seed = fmt.Sprintf("%s:%d", "127.0.0.1", tribePort)
t.EventManager.RegisterHandler("tribe.tests", lpe)
}

c := control.New(control.GetDefaultConfig())
c.RegisterEventHandler("tribe", t)
c.Start()
Expand Down Expand Up @@ -724,7 +748,7 @@ func startTribes(count int, seed string) ([]int, int) {
}(mgtPort)
}
wg.Wait()
return mgtPorts, tribePort
return mgtPorts, tribePort, lpe
}

var nextPort uint64 = 55234
Expand Down
11 changes: 11 additions & 0 deletions mgmt/tribe/tribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/intelsdi-x/snap/core/control_event"
"github.com/intelsdi-x/snap/core/scheduler_event"
"github.com/intelsdi-x/snap/core/serror"
"github.com/intelsdi-x/snap/core/tribe_event"
"github.com/intelsdi-x/snap/mgmt/tribe/agreement"
"github.com/intelsdi-x/snap/mgmt/tribe/worker"
"github.com/pborman/uuid"
Expand Down Expand Up @@ -78,6 +79,7 @@ type tribe struct {
taskStateResponses map[string]*taskStateQueryResponse
members map[string]*agreement.Member
tags map[string]string
EventManager *gomit.EventController
config *Config

pluginCatalog worker.ManagesPlugins
Expand Down Expand Up @@ -117,6 +119,7 @@ func New(cfg *Config) (*tribe, error) {
workerQuitChan: make(chan struct{}),
workerWaitGroup: &sync.WaitGroup{},
config: cfg,
EventManager: gomit.NewEventController(),
}

tribe.broadcasts = &memberlist.TransmitLimitedQueue{
Expand Down Expand Up @@ -504,6 +507,14 @@ func (t *tribe) AddPlugin(agreementName string, p agreement.Plugin) error {
UUID: uuid.New(),
Type: addPluginMsgType,
}
defer t.EventManager.Emit(&tribe_event.AddPluginEvent{
Agreement: struct{ Name string }{agreementName},
Plugin: struct {
Name string
Type core.PluginType
Version int
}{Name: p.Name(), Type: p.Type_, Version: p.Version_},
})
if t.handleAddPlugin(msg) {
t.broadcast(addPluginMsgType, msg, nil)
}
Expand Down

0 comments on commit 6288d66

Please sign in to comment.