diff --git a/pkg/nodes/nodes_consumer.go b/pkg/nodes/nodes_consumer.go index f9682e9..24af937 100644 --- a/pkg/nodes/nodes_consumer.go +++ b/pkg/nodes/nodes_consumer.go @@ -15,9 +15,10 @@ import ( ) const ( - prefetchLimit = 10 // prefetchLimit - pollDuration = 10 * time.Second // pollDuration how often is Torch going to pull data from the queue. - consumerName = "torch-consumer" // consumerName name used in the tag to identify the consumer. + consumerName = "torch-consumer" // consumerName name used in the tag to identify the consumer. + prefetchLimit = 10 // prefetchLimit + pollDuration = 10 * time.Second // pollDuration how often is Torch going to pull data from the queue. + timeoutDurationConsumer = 60 * time.Second // timeoutDurationConsumer timeout for the consumer. ) // ConsumerInit initialize the process to check the queues in Redis. @@ -27,7 +28,7 @@ func ConsumerInit(queueName string) { red := redis.InitRedisConfig() // Create a new context with a timeout - ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + ctx, cancel := context.WithTimeout(context.Background(), timeoutDurationConsumer) // Make sure to call the cancel function to release resources when you're done defer cancel() @@ -55,8 +56,9 @@ func ConsumerInit(queueName string) { _, err = queue.AddConsumerFunc(consumerName, func(delivery rmq.Delivery) { log.Info("Performing task: ", delivery.Payload()) peer := config.Peer{ - NodeName: delivery.Payload(), - NodeType: "da", + NodeName: delivery.Payload(), + NodeType: "da", + ContainerName: "da", } // here we wil send the node to generate the id diff --git a/pkg/nodes/queue.go b/pkg/nodes/queue.go index 7d4fee5..8327eeb 100644 --- a/pkg/nodes/queue.go +++ b/pkg/nodes/queue.go @@ -11,91 +11,76 @@ import ( "github.com/celestiaorg/torch/pkg/metrics" ) -const ( - MaxRetryCount = 5 // MaxRetryCount number of retries per node. - TickerTime = 5 * time.Second // TickerTime time specified to make a signal. -) - var ( - taskQueue = make(chan config.Peer) // taskQueue channel for pending tasks (peers to process later). + taskQueue = make(chan config.Peer) // taskQueue channel for pending tasks (peers to process later). + MaxRetryCount = 5 // MaxRetryCount number of retries per node. + TickerTime = 5 * time.Second // TickerTime time specified to make a signal. + timeoutDurationProcessQueue = 10 * time.Second // timeoutDurationProcessQueue time specified to make a signal. ) // ProcessTaskQueue processes the pending tasks in the queue the time specified in the const TickerTime. -func ProcessTaskQueue(ctx context.Context) { +func ProcessTaskQueue() { ticker := time.NewTicker(TickerTime) for { select { - case <-ctx.Done(): - // The context has been canceled, exit the loop. - return case <-ticker.C: - processQueue(ctx) + processQueue() } } } // processQueue process the nodes in the queue and tries to generate the Multi Address -func processQueue(ctx context.Context) { +func processQueue() { red := redis.InitRedisConfig() + // Create a new context with a timeout + ctx, cancel := context.WithTimeout(context.Background(), timeoutDurationProcessQueue) + + // Make sure to call the cancel function to release resources when you're done + defer cancel() for { select { case <-ctx.Done(): - // The context has been canceled, exit the loop. + log.Error("processQueue - The context has been canceled, exit the loop.") return case peer := <-taskQueue: - // Perform the operation with the node + // TODO: + // errors should be returned back and go routines needs to be in errGroup instead of pure go err := CheckNodesInDBOrCreateThem(peer, red, ctx) if err != nil { log.Error("Error checking the nodes: CheckNodesInDBOrCreateThem - ", err) } + + default: + return } } } -// CheckNodesInDBOrCreateThem attempts to find the node in the DB; if the node is not in the DB, it attempts to create it. +// CheckNodesInDBOrCreateThem try to find the node in the DB, if the node is not in the DB, it tries to create it. func CheckNodesInDBOrCreateThem(peer config.Peer, red *redis.RedisClient, ctx context.Context) error { log.Info("Processing Node in the queue: ", "[", peer.NodeName, "]") - // Check if the node is in the DB + // check if the node is in the DB ma, err := redis.CheckIfNodeExistsInDB(red, ctx, peer.NodeName) if err != nil { - log.Error("Error CheckIfNodeExistsInDB for node: [", peer.NodeName, "]", err) + log.Error("Error CheckIfNodeExistsInDB for node: [", peer.NodeName, "]: ", err) return err } - // If the node doesn't exist in the DB, attempt to generate it in a goroutine + + // if the node doesn't exist in the DB, let's try to create it if ma == "" { log.Info("Node ", "["+peer.NodeName+"]"+" NOT found in DB, let's try to generate it") - - // Create a channel for errors - errCh := make(chan error) - - // Start a goroutine for GenerateNodeIdAndSaveIt - go func() { - defer close(errCh) - var generateErr error - ma, generateErr = GenerateNodeIdAndSaveIt(peer, peer.NodeName, red, ctx) - if generateErr != nil { - errCh <- generateErr - } - }() - - // Wait for the goroutine to finish and check for errors - select { - case generateErr := <-errCh: - if generateErr != nil { - log.Error("Error GenerateNodeIdAndSaveIt for full-node: [", peer.NodeName, "]", generateErr) - return generateErr - } - case <-ctx.Done(): - // Context canceled, return an error or handle it as needed - return ctx.Err() + ma, err = GenerateNodeIdAndSaveIt(peer, peer.NodeName, red, ctx) + if err != nil { + log.Error("Error GenerateNodeIdAndSaveIt for full-node: [", peer.NodeName, "]", err) } + return err } - // Check if the multi-address is empty after attempting to generate it + // check if the multi address is empty after trying to generate it if ma == "" { - // Check if the node is still within the maximum number of retries + // check if the node is still under the maximum number of retries if peer.RetryCount < MaxRetryCount { log.Info("Node ", "["+peer.NodeName+"]"+" NOT found in DB, adding it to the queue, attempt: ", "[", peer.RetryCount, "]") peer.RetryCount++ // increment the counter