fix(evpn-bridge): fix system behaviour for pending objects #391

Open · wants to merge 4 commits into base: main
181 changes: 181 additions & 0 deletions pkg/infradb/subscriberframework/eventbus/event_bus_test.go
@@ -0,0 +1,181 @@
package eventbus

import (
	"log"
	"sync"
	"testing"
	"time"
)

type ModulelciHandler struct {
	receivedEvents []*ObjectData
	sync.Mutex
}

func (h *ModulelciHandler) HandleEvent(eventType string, objectData *ObjectData) {
Review comment (Contributor):
Ci? Will you export it to other modules? If not, please keep it lowercase.

	h.Lock()
	defer h.Unlock()
	h.receivedEvents = append(h.receivedEvents, objectData)
	switch eventType {
	case "testEvent":
Review comment (Contributor):
Why not create constants for those values? It would be much harder to make a mistake through a typo.
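For illustration, a minimal sketch of what the reviewer's suggestion might look like (the constant names are hypothetical, not taken from the PR):

const (
	testEvent         = "testEvent"         // hypothetical constant names for the
	testEventPriority = "testEventpriority" // event-type strings used in these tests
	testEventChBusy   = "testEventChBusy"
	testEventUnsub    = "testEventUnsub"
)

The switch cases and the StartSubscriber calls would then reference these constants instead of repeating the string literals.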

case "testEventpriority":
case "testEventChBusy":
case "testEventUnsub":
log.Printf("received event type %s", eventType)
default:
log.Printf("error: Unknown event type %s", eventType)
Review comment (Contributor):
Should we fail the test here if an unsupported type is received?

	}
}

func TestSubscribeAndPublish(t *testing.T) {
	handler := &ModulelciHandler{}

	EventBus := NewEventBus()
	EventBus.StartSubscriber("testModule", "testEvent", 1, handler)

	time.Sleep(100 * time.Millisecond)
Review comment (Contributor):
Why do we need to sleep here? To make sure we can Publish? But then it looks strange: Subscribe returned success, yet we cannot use it properly right away.


	objectData := &ObjectData{
		ResourceVersion: "v1",
		Name:            "testObject",
		NotificationID:  "123",
	}

	subscribers := EventBus.GetSubscribers("testEvent")
	if len(subscribers) == 0 {
		t.Errorf("No subscribers found for event type 'testEvent'")
	}
	subscriber := subscribers[0]

	err := EventBus.Publish(objectData, subscriber)
	if err != nil {
		t.Errorf("Publish() failed with error: %v", err)
	}

	time.Sleep(100 * time.Millisecond)
Review comment (Contributor):
With these sleeps, unit-test execution time can grow quickly; we might use a more advanced approach here.
What if HandleEvent pushed into another channel and the test blocked on a select to receive all the entries (or time out)? Then the lock would probably not be needed.
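A rough sketch of that idea follows (hypothetical handler and test names; it also assumes a subscriber can receive as soon as StartSubscriber returns, which the sleeps in this PR suggest may not hold):

// Sketch only: the handler forwards events on a channel and the test waits with a timeout.
type chanHandler struct {
	events chan *ObjectData
}

func (h *chanHandler) HandleEvent(eventType string, objectData *ObjectData) {
	h.events <- objectData
}

func TestSubscribeAndPublishNoSleep(t *testing.T) {
	handler := &chanHandler{events: make(chan *ObjectData, 1)}
	bus := NewEventBus()
	bus.StartSubscriber("testModule", "testEvent", 1, handler)

	objectData := &ObjectData{ResourceVersion: "v1", Name: "testObject", NotificationID: "123"}
	subs := bus.GetSubscribers("testEvent")
	if len(subs) == 0 {
		t.Fatal("no subscribers found for event type 'testEvent'")
	}
	if err := bus.Publish(objectData, subs[0]); err != nil {
		t.Fatalf("Publish() failed: %v", err)
	}

	select {
	case got := <-handler.events:
		if got != objectData {
			t.Errorf("received unexpected event: %+v", got)
		}
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for the event")
	}
}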

	handler.Lock()
	if len(handler.receivedEvents) != 1 || handler.receivedEvents[0] != objectData {
Review comment (Contributor):
This can be split into two checks; otherwise it is hard to distinguish what went wrong.
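For example, the assertion could be split roughly like this (a sketch, not the PR's code):

	handler.Lock()
	defer handler.Unlock()
	if len(handler.receivedEvents) != 1 {
		t.Fatalf("expected exactly 1 received event, got %d", len(handler.receivedEvents))
	}
	if handler.receivedEvents[0] != objectData {
		t.Errorf("received event %+v does not match the published object %+v", handler.receivedEvents[0], objectData)
	}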

t.Errorf("Event was not received by the handler as expected")
}
handler.Unlock()
}
Review comment (Contributor):
Unsubscribe?


func TestPriorityOrderWithStartSubscriber(t *testing.T) {
	handler1 := &ModulelciHandler{}
	handler2 := &ModulelciHandler{}

	EventBus := NewEventBus()

	EventBus.StartSubscriber("testModule1", "testEventpriority", 2, handler1)
	EventBus.StartSubscriber("testModule2", "testEventpriority", 1, handler2)

	time.Sleep(100 * time.Millisecond)

	subscribers := EventBus.GetSubscribers("testEventpriority")
	if len(subscribers) != 2 {
		t.Errorf("Expected 2 subscribers, got %d", len(subscribers))
	}
	if subscribers[0].Priority > subscribers[1].Priority {
		t.Errorf("Subscribers are not sorted by priority")
Review comment (Contributor):
What if we register a handler with priority 2 after a handler with priority 1?

	}

	for _, sub := range subscribers {
		EventBus.Unsubscribe(sub)
Review comment (Contributor):
Would defer suit better here?
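One possible shape for that, using t.Cleanup from the standard testing package (a sketch under the assumption that cleanup is registered right after subscribing):

	for _, sub := range EventBus.GetSubscribers("testEventpriority") {
		sub := sub // capture the loop variable (needed before Go 1.22)
		t.Cleanup(func() { EventBus.Unsubscribe(sub) })
	}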

	}
}

func TestPublishChannelBusyWithStartSubscriber(t *testing.T) {
	handler := &ModulelciHandler{}
	EventBus := NewEventBus()
	EventBus.StartSubscriber("testModuleChBusy", "testEventChBusy", 1, handler)

	time.Sleep(100 * time.Millisecond)

	subscribers := EventBus.GetSubscribers("testEventChBusy")
	if len(subscribers) == 0 {
		t.Errorf("No subscribers found for event type 'testEventChBusy'")
	}
	subscriber := subscribers[0]

	subscriber.Ch <- &ObjectData{}

	objectData := &ObjectData{
		ResourceVersion: "v1",
		Name:            "testObject",
		NotificationID:  "123",
	}
	err := EventBus.Publish(objectData, subscriber)
	if err == nil {
		t.Errorf("Expected an error when publishing to a busy channel, but got nil")
	}

	EventBus.Unsubscribe(subscriber)
}

func TestUnsubscribeEvent(t *testing.T) {
	handler := &ModulelciHandler{}
	EventBus := NewEventBus()
	EventBus.StartSubscriber("testModuleUnsub", "testEventUnsub", 1, handler)

	time.Sleep(100 * time.Millisecond)

	subscribers := EventBus.GetSubscribers("testEventUnsub")
	if len(subscribers) == 0 {
		t.Errorf("No subscribers found for event type 'testEventUnsub'")
	}
	subscriber := subscribers[0]

	EventBus.UnsubscribeEvent(subscriber, "testEventUnsub")

	time.Sleep(200 * time.Millisecond)

	subscribers = EventBus.GetSubscribers("testEventUnsub")
	for _, sub := range subscribers {
		if sub == subscriber {
			t.Errorf("Subscriber was not successfully unsubscribed")
		}
	}
}

func TestUnsubscribe(t *testing.T) {
	handler := &ModulelciHandler{}
	EventBus := NewEventBus()
	EventBus.StartSubscriber("testModuleUnsub", "testEventUnsub", 1, handler)

	time.Sleep(100 * time.Millisecond)

	subscribers := EventBus.GetSubscribers("testEventUnsub")
	if len(subscribers) == 0 {
		t.Errorf("No subscribers found for event type 'testEventUnsub'")
	}
	subscriber := subscribers[0]

	EventBus.Unsubscribe(subscriber)
	time.Sleep(200 * time.Millisecond)

	select {
	case _, ok := <-subscriber.Ch:
		if ok {
			t.Errorf("Subscriber's channel should be closed, but it's not")
		}
	default:
	}
}

func TestSubscriberExist(t *testing.T) {
Review comment (Contributor):
AlreadyExist

	handler := &ModulelciHandler{}
	EventBus := NewEventBus()
	EventBus.StartSubscriber("testModuleSubExist", "testEventSubExist", 3, handler)

	time.Sleep(100 * time.Millisecond)

	exists := EventBus.subscriberExist("testEventSubExist", "testModuleSubExist")
	if !exists {
		t.Errorf("subscriberExist should return true for existing subscriber")
	}

	subscribers := EventBus.GetSubscribers("testEventSubExist")
	for _, sub := range subscribers {
		if sub.Name == "testModuleSubExist" {
			EventBus.Unsubscribe(sub)
		}
	}
}
18 changes: 12 additions & 6 deletions pkg/infradb/subscriberframework/eventbus/eventbus.go
@@ -6,6 +6,7 @@
package eventbus

import (
"fmt"
"log"
"sort"
"sync"
@@ -19,7 +20,6 @@ type EventBus struct {
	subscribers   map[string][]*Subscriber
	eventHandlers map[string]EventHandler
	subscriberL   sync.RWMutex
	publishL      sync.RWMutex
	mutex         sync.RWMutex
}

@@ -89,7 +89,7 @@ func (e *EventBus) Subscribe(moduleName, eventType string, priority int, eventHa

	subscriber := &Subscriber{
		Name:     moduleName,
		Ch:       make(chan interface{}, 1),
		Ch:       make(chan interface{}),
		Quit:     make(chan bool),
		Priority: priority,
	}
@@ -128,10 +128,16 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool {
}

// Publish api notifies the subscribers with certain eventType
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) {
	e.publishL.RLock()
	defer e.publishL.RUnlock()
	subscriber.Ch <- objectData
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error {
	var err error

	select {
	case subscriber.Ch <- objectData:
		log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name)
	default:
		err = fmt.Errorf("channel for subscriber %s is busy", subscriber.Name)
	}
	return err
}

// Unsubscribe the subscriber, which delete the subscriber(all resources will be washed out)
74 changes: 57 additions & 17 deletions pkg/infradb/taskmanager/taskmanager.go
@@ -31,8 +31,9 @@ type Task struct {
	objectType      string
	resourceVersion string
	subIndex        int
	retryTimer      time.Duration
	subs            []*eventbus.Subscriber
	// systemTimer is used only when we want to retry a Task due to unavailability of the Subscriber or not receiving a TaskStatus
	systemTimer time.Duration
	subs        []*eventbus.Subscriber
}

// TaskStatus holds info related to the status that has been received
@@ -60,6 +61,7 @@ func newTask(name, objectType, resourceVersion string, subs []*eventbus.Subscrib
		objectType:      objectType,
		resourceVersion: resourceVersion,
		subIndex:        0,
		systemTimer:     1 * time.Second,
		subs:            subs,
	}
}
@@ -94,13 +96,18 @@ func (t *TaskManager) CreateTask(name, objectType, resourceVersion string, subs
// StatusUpdated creates a task status and sends it for handling
func (t *TaskManager) StatusUpdated(name, objectType, resourceVersion, notificationID string, dropTask bool, component *common.Component) {
	taskStatus := newTaskStatus(name, objectType, resourceVersion, notificationID, dropTask, component)

	// Do we need to make this call happen in a goroutine in order to not make the
	// StatusUpdated function stuck in case that nobody reads what is written on the channel ?
	// Is there any case where this can happen
	// (nobody reads what is written on the channel and the StatusUpdated gets stuck) ?
	t.taskStatusChan <- taskStatus
	log.Printf("StatusUpdated(): New Task Status has been created and sent to channel: %+v\n", taskStatus)
	log.Printf("StatusUpdated(): New Task Status has been created: %+v\n", taskStatus)

	// We need to have a default case here so the call is not stuck if we send to channel but there is nobody reading from the channel.
	// e.g. a subscriber got stuck and doesn't reply. The task will be requeued after the timer gets expired. In the meanwhile
	// the subscriber replies and a taskStatus is sent to channel but the call gets stuck there as the previous task has not been requeued yet
	// as the timer has not expired and the queue is empty (We assume that there is only one task in the queue).
	select {
	case t.taskStatusChan <- taskStatus:
		log.Printf("StatusUpdated(): Task Status has been sent to channel: %+v\n", taskStatus)
	default:
		log.Printf("StatusUpdated(): Task Status has not been sent to channel. Channel not available: %+v\n", taskStatus)
	}
}

// processTasks processes the task
@@ -111,6 +118,9 @@ func (t *TaskManager) processTasks() {
		task := t.taskQueue.Dequeue()
		log.Printf("processTasks(): Task has been dequeued for processing: %+v\n", task)

		// A new sub-list of the initial subscribers list will be generated based on the value of the subIndex.
		// This sub-list can be equal to the initial list (subIndex is equal to zero) or smaller than the initial
		// list (subIndex greater than zero) in case a requeue event occurred.
		subsToIterate := task.subs[task.subIndex:]
	loopTwo:
		for i, sub := range subsToIterate {
@@ -123,7 +133,21 @@
				// (e.g. Maybe you have a timeout on the subscribers and you got the notification after the timeout have passed)
				NotificationID: uuid.NewString(),
			}
			eventbus.EBus.Publish(objectData, sub)
			if err := eventbus.EBus.Publish(objectData, sub); err != nil {
				log.Printf("processTasks(): Failed to send notification: %+v\n", err)
				log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. The Task %+v will be requeued.\n", sub, objectData, task)
				// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
				// so we do start again from the subscriber that returned an error or was unavailable for any reason. The increasing
				// of the subIndex value will be always correct as after the requeue of the task we generate and iterate on a new sub-list
				// of the remaining subscribers which is equal or smaller than the initial subscribers list.
				task.subIndex += i
Review comment (Contributor):
You are at the subscriber with index i at the moment. If you use += here and subIndex is already non-zero, won't you end up with a wrong index to start from next time?

Author reply (mardim91):
No, this will not happen, because next time we take a sublist of the subscribers based on the subIndex. We then iterate over that sublist, which means i starts from 0 again.

check here: https://github.com/opiproject/opi-evpn-bridge/blob/main/pkg/infradb/taskmanager/taskmanager.go#L114

Review comment (Contributor):
Sounds a bit error-prone. What if we, instead of doing index calculations, copied the rest of the subscribers into the task, so it could continue where it stopped? Any other means?

Author reply (mardim91):
To be honest, I like the solution as it is today. It looks cleaner to me. It is tested and it works nicely, so I would not like to change it, if that is OK with you. I can put a comment if you want so people can understand what this index calculation is all about. To be honest, I do not like keeping sublists of the remaining subscribers all the time; to me that is a bit more error-prone. Maybe we can revisit this issue in the future, as this bug fix is not related to the subIndex. The subIndex was already there from the beginning.

Review comment (artek-koltun, Jul 22, 2024):
For me it is not. I already misinterpreted what is happening here. We could create an issue and discuss it there. I don't know if we can add a Subscriber in the middle, whether we need to regard it as a new one, and other details.

> I can put a comment if you want so people can understand what this index calculation is all about.

Please do. I am also wondering if it could be a single place, like a function to "re-queue task".

> Maybe we can revisit this issue in the future, as this bug fix is not related to the subIndex. The subIndex was already there from the beginning.

Please create an issue.

Author reply (mardim91):
Done

#393
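Purely as an illustration of the index arithmetic discussed above (toy values, not code from the PR): each requeue re-slices task.subs from subIndex, so the loop index i restarts at 0 over the shorter sub-list and += i never double-counts already-processed subscribers.

package main

import "fmt"

func main() {
	subs := []string{"s0", "s1", "s2", "s3"} // hypothetical subscribers
	subIndex := 0

	// First pass over subs[0:]; assume the subscriber at i == 2 fails.
	fmt.Println(subs[subIndex:]) // [s0 s1 s2 s3]
	subIndex += 2                // task.subIndex += i, task requeued

	// Second pass iterates over subs[2:], so i restarts at 0 and s2 is retried first.
	fmt.Println(subs[subIndex:]) // [s2 s3]
}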

				task.systemTimer *= 2
Review comment (Contributor):
  • Maybe the name should be something like waitForRetry?
  • Will we stop increasing the timer if no one is listening for the published task on the other end?

Author reply (mardim91, Jul 17, 2024):
  • I would like to keep it as a system timer, as it is more explicit when you compare it with the Component.Timer that is provided by each component in case of error:
    time.AfterFunc(taskStatus.component.Timer, func() {
  • The timer will increase every time we try to publish but the channel is busy and we need to requeue the task. So it works this way:
  1. We publish.
  2. An error is returned because the channel is busy.
  3. We increase the timer.
  4. We wait for the timer to expire in a goroutine and then we requeue the task.

Review comment (artek-koltun, Jul 19, 2024):
The flow you described is clear. I want to know if we ever stop increasing it. Will it make sense to wait for hours? days? years?

Author reply (mardim91):
Currently we never stop increasing it. But this is a known issue that we have just not addressed so far. We are planning to address this in the future. I can open an issue to track it.

Review comment (Contributor):
Please create an issue.

Author reply (mardim91, Aug 22, 2024):
Done
#394
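Not part of this PR, but one possible shape for the capped backoff tracked in #394 (the cap value and helper name are hypothetical):

// maxSystemTimer is a hypothetical upper bound for the retry backoff.
const maxSystemTimer = 5 * time.Minute

// nextSystemTimer doubles the current backoff but never exceeds the cap.
func nextSystemTimer(current time.Duration) time.Duration {
	next := current * 2
	if next > maxSystemTimer {
		return maxSystemTimer
	}
	return next
}

task.systemTimer = nextSystemTimer(task.systemTimer) could then replace the bare task.systemTimer *= 2 doubling.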

log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
time.AfterFunc(task.systemTimer, func() {
t.taskQueue.Enqueue(task)
})
break loopTwo
}
log.Printf("processTasks(): Notification has been sent to subscriber %+v with data %+v\n", sub, objectData)

		loopThree:
@@ -143,11 +167,17 @@
log.Printf("processTasks(): received notification id %+v doesn't equal the sent notification id %+v\n", taskStatus.notificationID, objectData.NotificationID)

				// We need a timeout in case that the subscriber doesn't update the status at all for whatever reason.
				// If that occurs then we just take a note which subscriber need to revisit and we requeue the task without any timer
				// If that occurs then we just requeue the task with a timer
				case <-time.After(30 * time.Second):
					log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued immediately Task Status %+v\n", sub, task, taskStatus)
					log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued. Task Status %+v\n", sub, task, taskStatus)
					// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
					// so we do start again from the subscriber that returned an error or was unavailable for any reason.
					task.subIndex += i
					go t.taskQueue.Enqueue(task)
					task.systemTimer *= 2
					log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
					time.AfterFunc(task.systemTimer, func() {
						t.taskQueue.Enqueue(task)
Review comment (Contributor):
  1. If we do not receive an answer within a reasonable time, does it mean that we have a problem and probably no one is handling our requests? And that the best we can do is report an error and gracefully shut down?

  2. If we block for 30 seconds in this thread, can we handle other task responses? If we are blocked for 30 seconds, doesn't it mean that what you actually want is to wait for a response in another goroutine, which could continue handling when the response is received?

Author reply (mardim91):
  1. This is not built for that. What we want here is that when we do not receive any answer, because for whatever reason the subscriber is stuck, we requeue the task and try again, because maybe the unresponsiveness of the subscriber is temporary. Now, if the subscriber is completely dead and cannot send a response back at all, we need to put in place some resiliency mechanisms to handle this case, but this is something to investigate and implement in the future; currently we do not look into that corner case.
  2. We will not have multiple subscribers sending multiple responses at the same time. The system is designed so that we send a task to a subscriber and then just wait for that subscriber to respond before we move on to the next subscriber. You will not have a case where we send multiple tasks to multiple subscribers in parallel and the subscribers send multiple responses back in parallel at the same time.

Review comment (Contributor):
> Now, if the subscriber is completely dead and cannot send a response back at all, we need to put in place some resiliency mechanisms to handle this case, but this is something to investigate and implement in the future; currently we do not look into that corner case.

An issue is needed.

> The system is designed so that we send a task to a subscriber and then just wait for that subscriber to respond before we move on to the next subscriber.

Sounds like a sequential flow. Do we need channels then? Please consider; it would make everything much simpler.

Author reply (mardim91):
I think channels and notifications make the whole system scale better. The implementation we have so far just uses one task queue and one process that drains the queue. Maybe in the future, if we hit any limitation, we could use more processes to drain the task queue, and I think this will scale more easily when we are using channels and notifications. So I would prefer keeping it this way.

Review comment (Contributor):
> Maybe in the future, if we hit any limitation, we could use more processes to drain the task queue, and I think this will scale more easily when we are using channels and notifications.

But we haven't hit any limitation yet, and we don't know if we will really need it in the future, yet we are already complicating the design.
If we need to scale, there are likely better approaches than this sequential one.
If we don't need to scale, then we are complicating the system for nothing at the moment.

Author reply (mardim91):
Maybe I didn't get your question right at the beginning; I apologize for that.

The channels are not so much related to the sequential flow as to keeping the core system as agnostic as possible to the plugins that we need to send notifications to and receive notifications from. The plugins are responsible for configuring the underlying system of their responsibility (e.g. FRRModule for FRR, LinuxModule for Linux, etc.). The core system doesn't need to know what those plugins are or what exactly they do; the only thing it needs to know is towards which subscribers it needs to send notifications. The channels help with this agnostic notion, as well as with the communication between the different goroutines, as each subscriber runs as a different goroutine.

Also, the sequential flow is there because the different plugins have some sort of dependency on each other. That dependency is expressed by the sequential flow, and that is why we want the first plugin to succeed before we move to the second one. For instance, if the Linux Vendor Module wants to attach an interface to a bridge that was created earlier by the General Linux Module, and the sequential flow is not there, the Linux Vendor Module call will fail, as it depends on the General Linux Module operation creating the bridge first.

This is a significant design choice which was presented to the TSC before the implementation, and we decided it was reasonable to move forward with it. Also, we think it is a good design and works well so far, and I do not really agree that the channels make the system complicated, as they serve the architecture well.

Review comment (artek-koltun, Jul 26, 2024):
> The channels are not so much related to the sequential flow as to keeping the core system as agnostic as possible to the plugins that we need to send notifications to and receive notifications from.

The same level of agnosticism can be achieved with function calls. The channels do not add more agnosticism here.
By definition: channels are the pipes that connect concurrent goroutines. You can send values into channels from one goroutine and receive those values into another goroutine.

> as well as with the communication between the different goroutines, as each subscriber runs as a different goroutine.

They can run in different goroutines even if we call them via a function (you can use a mutex to sync, send to a channel, or execute right in the same thread context).

> Also, the sequential flow is there because the different plugins have some sort of dependency on each other. That dependency is expressed by the sequential flow, and that is why we want the first plugin to succeed before we move to the second one.

We send a message and go to sleep waiting for a plugin to complete its job, turning our flow into a sequential one.

> For instance, if the Linux Vendor Module wants to attach an interface to a bridge that was created earlier by the General Linux Module, and the sequential flow is not there, the Linux Vendor Module call will fail, as it depends on the General Linux Module operation creating the bridge first.

What if we do not register the General Linux Module at all, so it won't receive notifications at all?

> Also, we think it is a good design and works well so far, and I do not really agree that the channels make the system complicated, as they serve the architecture well.

It might be. But from the chunk I see, it looks like you need a sequential flow, and channels add complexity.

Apparently this PR is not the place to make such decisions, but please consider it.

					})
					break loopThree
				}
			}
@@ -159,19 +189,29 @@
				break loopTwo
			}

			// We re-initialize the systemTimer every time that we get a taskStatus. That means that the subscriber is available and has responded
			task.systemTimer = 1 * time.Second

			switch taskStatus.component.CompStatus {
			case common.ComponentStatusSuccess:
				log.Printf("processTasks(): Subscriber %+v has processed the task %+v successfully\n", sub, task)
				continue loopTwo
			default:
			case common.ComponentStatusError:
				log.Printf("processTasks(): Subscriber %+v has not processed the task %+v successfully\n", sub, task)
				log.Printf("processTasks(): The Task will be requeued after %+v\n", taskStatus.component.Timer)
				// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task
				// so we do start again from the subscriber that returned an error or was unavailable for any reason. The increasing
				// of the subIndex value will be always correct as after the requeue of the task we generate and iterate on a new sub-list
				// of the remaining subscribers which is equal or smaller than the initial subscribers list.
				task.subIndex += i
				task.retryTimer = taskStatus.component.Timer
				log.Printf("processTasks(): The Task will be requeued after %+v\n", task.retryTimer)
				time.AfterFunc(task.retryTimer, func() {
				time.AfterFunc(taskStatus.component.Timer, func() {
					t.taskQueue.Enqueue(task)
				})
				break loopTwo
			default:
log.Printf("processTasks(): Subscriber %+v has not provided designated status for the task %+v\n", sub, task)
log.Printf("processTasks(): The task %+v will be dropped\n", task)
break loopTwo
}
		}
	}