Skip to content

Commit

Permalink
feat(torch): node consumer, it will check if the node is in the db, o…
Browse files Browse the repository at this point in the history
…therwise it will generate it

Signed-off-by: Jose Ramon Mañes <jose@celestia.org>
  • Loading branch information
tty47 committed Oct 26, 2023
1 parent 47f6d0c commit 5b5cdb3
Showing 1 changed file with 106 additions and 0 deletions.
106 changes: 106 additions & 0 deletions pkg/nodes/nodes_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package nodes

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/adjust/rmq/v5"
log "github.com/sirupsen/logrus"

"github.com/celestiaorg/torch/config"
"github.com/celestiaorg/torch/pkg/db/redis"
)

const (
prefetchLimit = 10 // prefetchLimit
pollDuration = 10 * time.Second // pollDuration how often is Torch going to pull data from the queue.
consumerName = "torch-consumer" // consumerName name used in the tag to identify the consumer.
)

// ConsumerInit initialize the process to check the queues in Redis.
func ConsumerInit(queueName string) {
errChan := make(chan error, 10)
go logErrors(errChan)

red := redis.InitRedisConfig()
// Create a new context with a timeout
ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)

// Make sure to call the cancel function to release resources when you're done
defer cancel()

connection, err := rmq.OpenConnection(
"consumer",
"tcp",
redis.GetRedisFullURL(),
2,
errChan,
)
if err != nil {
log.Error("Error: ", err)
}

queue, err := connection.OpenQueue(queueName)
if err != nil {
log.Error("Error: ", err)
}

if err := queue.StartConsuming(prefetchLimit, pollDuration); err != nil {
log.Error("Error: ", err)
}

_, err = queue.AddConsumerFunc(consumerName, func(delivery rmq.Delivery) {
log.Info("Performing task: ", delivery.Payload())
peer := config.Peer{
NodeName: delivery.Payload(),
NodeType: "da",
}

// here we wil send the node to generate the id
err := CheckNodesInDBOrCreateThem(peer, red, ctx)
if err != nil {
log.Error("Error checking the nodes: CheckNodesInDBOrCreateThem - ", err)
}

if err := delivery.Ack(); err != nil {
log.Error("Error: ", err)
}
})
if err != nil {
log.Error("Error: ", err)
}

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT)
defer signal.Stop(signals)

<-signals // wait for signal
go func() {
<-signals // hard exit on second signal (in case shutdown gets stuck)
os.Exit(1)
}()

<-connection.StopAllConsuming() // wait for all Consume() calls to finish
}

func logErrors(errChan <-chan error) {
for err := range errChan {
switch err := err.(type) {
case *rmq.HeartbeatError:
if err.Count == rmq.HeartbeatErrorLimit {
log.Print("heartbeat error (limit): ", err)
} else {
log.Print("heartbeat error: ", err)
}
case *rmq.ConsumeError:
log.Print("consume error: ", err)
case *rmq.DeliveryError:
log.Print("delivery error: ", err.Delivery, err)
default:
log.Print("other error: ", err)
}
}
}

0 comments on commit 5b5cdb3

Please sign in to comment.