Skip to content

Commit

Permalink
feat(evpn-bridge): netlink watcher for configuration
Browse files Browse the repository at this point in the history
Co-authored-by: Vemula Venkatesh <venkatesh.vemula@intel.com>
Co-authored-by: Saikumar Banoth <banoth.saikumar@intel.com>
Co-authored-by: Jambekar Vishakha <vishakha.jambekar@intel.com>
Co-authored-by: Dimitrios Markou <dimitrios.markou@ericsson.com>
Signed-off-by: atulpatel261194 <Atul.Patel@intel.com>
  • Loading branch information
5 people authored and artek-koltun committed Aug 26, 2024
1 parent 5505b13 commit 46a655f
Show file tree
Hide file tree
Showing 2 changed files with 2,062 additions and 0 deletions.
71 changes: 71 additions & 0 deletions pkg/netlink/eventbus/eventbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2022-2023 Intel Corporation, or its subsidiaries.

// Package eventbus handles pub sub
package eventbus

import (
"sync"
)

// EventBus holds the event bus info
type EventBus struct {
subscribers map[string][]*Subscriber
mutex sync.Mutex
}

// Subscriber holds the info for each subscriber
type Subscriber struct {
Ch chan interface{}
Quit chan struct{}
}

// NewEventBus initializes an EventBus object
func NewEventBus() *EventBus {
return &EventBus{
subscribers: make(map[string][]*Subscriber),
}
}

// Subscribe api provides registration of a subscriber to the given eventType
func (e *EventBus) Subscribe(eventType string) *Subscriber {
e.mutex.Lock()
defer e.mutex.Unlock()

subscriber := &Subscriber{
Ch: make(chan interface{}),
Quit: make(chan struct{}),
}

e.subscribers[eventType] = append(e.subscribers[eventType], subscriber)

return subscriber
}

// Publish api notifies the subscribers with certain eventType
func (e *EventBus) Publish(eventType string, data interface{}) {
e.mutex.Lock()
defer e.mutex.Unlock()

subscribers, ok := e.subscribers[eventType]
if !ok {
return
}

for _, sub := range subscribers {
sub.Ch <- data
}
}

// Unsubscribe closes all subscriber channels and empties the subscriber map.
func (e *EventBus) Unsubscribe() {
e.mutex.Lock()
defer e.mutex.Unlock()

for eventName, subs := range e.subscribers {
for _, sub := range subs {
close(sub.Ch) // Close each channel
}
delete(e.subscribers, eventName) // Remove the entry from the map
}
}
Loading

0 comments on commit 46a655f

Please sign in to comment.