Skip to content

Commit

Permalink
fix(evpn-bridge): first review
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitrios Markou <dimitrios.markou@ericsson.com>
  • Loading branch information
mardim91 committed Jul 17, 2024
1 parent 85bde59 commit 1a5cb19
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
10 changes: 4 additions & 6 deletions pkg/infradb/subscriberframework/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,15 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool {

// Publish api notifies the subscribers with certain eventType
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error {
e.publishL.RLock()
defer e.publishL.RUnlock()
e.publishL.Lock()
defer e.publishL.Unlock()
var err error
// We need the default case here as if the subscriber is busy then we will not be able to sent to the subscriber channel
// and the Publish function will stuck. So the default case serves exctly this purpose.

select {
case subscriber.Ch <- objectData:
log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name)
default:
log.Printf("Publish(): Channel for subsriber %s is busy. Notification not sent", subscriber.Name)
err = fmt.Errorf("channel is busy")
err = fmt.Errorf("channel for subscriber %s is busy", subscriber.Name)
}
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/infradb/taskmanager/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func (t *TaskManager) processTasks() {
NotificationID: uuid.NewString(),
}
if err := eventbus.EBus.Publish(objectData, sub); err != nil {
log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. Subscriber is busy. The Task %+v will be requeued.\n", sub, objectData, task)
log.Printf("processTasks(): Failed to sent notification: %+v\n", err)
log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. The Task %+v will be requeued.\n", sub, objectData, task)
// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
// so we do start again from the subscriber that returned an error or was unavailable for any reason.
task.subIndex += i
Expand Down

0 comments on commit 1a5cb19

Please sign in to comment.