Skip to content

Commit

Permalink
feat(torch): add queue to process the nodes when torch detects a even…
Browse files Browse the repository at this point in the history
…t from k8s

Signed-off-by: Jose Ramon Mañes <jose@celestia.org>
  • Loading branch information
tty47 committed Oct 26, 2023
1 parent 9011bdb commit 47f6d0c
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 25 deletions.
59 changes: 59 additions & 0 deletions pkg/db/redis/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package redis

import (
"os"

log "github.com/sirupsen/logrus"
)

var (
redisHost = ""
redisPort = ""
redisPass = ""
redisFullUrl = ""
)

// InitRedisConfig checks env vars and add default values in case we need
func InitRedisConfig() *RedisClient {
// redis config
redisHost = GetRedisHost()
redisPort = GetRedisPort()
redisPass = GetRedisPass()
redisFullUrl = GetRedisFullURL()

log.Info("Redis host to connect: ", redisFullUrl)

return NewRedisClient(redisFullUrl, redisPass, 0)
}

// GetRedisHost returns the redis host to connect
func GetRedisHost() string {
redisHost = os.Getenv("REDIS_HOST")
if redisHost == "" {
redisHost = "localhost"
}
return redisHost
}

// GetRedisPort returns the redis port
func GetRedisPort() string {
redisPort = os.Getenv("REDIS_PORT")
if redisPort == "" {
redisPort = "6379"
}
return redisPort
}

// GetRedisFullURL returns the full url
func GetRedisFullURL() string {
return redisHost + ":" + redisPort
}

// GetRedisPass returns the redis pass
func GetRedisPass() string {
redisPass = os.Getenv("REDIS_PASS")
if redisPass == "" {
redisPass = ""
}
return redisPass
}
6 changes: 4 additions & 2 deletions pkg/db/redis/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
log "github.com/sirupsen/logrus"
)

// SaveNodeId stores the values in redis.
func SaveNodeId(
// SetNodeId stores the values in redis.
func SetNodeId(
podName string,
r *RedisClient,
ctx context.Context,
Expand All @@ -29,6 +29,8 @@ func SaveNodeId(
log.Error("Error adding the node to redis: ", err)
return err
}
} else {
log.Info("Node ", "["+podName+"]"+" found in Redis")
}

return nil
Expand Down
38 changes: 38 additions & 0 deletions pkg/db/redis/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package redis

import (
"github.com/adjust/rmq/v5"
log "github.com/sirupsen/logrus"
)

// Producer add data into the queue.
func Producer(data, queueName string) error {
log.Info("Adding STS [", data, "] node to the queue: [", queueName, "]")
data += "-0"
log.Info("Getting the pod from the STS [", data, "]")

connection, err := rmq.OpenConnection(
"producer",
"tcp",
GetRedisFullURL(),
2,
nil,
)
if err != nil {
log.Error("Error: ", err)
return err
}

queue, err := connection.OpenQueue(queueName)
if err != nil {
log.Error("Error: ", err)
return err
}

if err := queue.Publish(data); err != nil {
log.Error("Error, failed to publish: ", err)
return err
}

return nil
}
23 changes: 0 additions & 23 deletions pkg/db/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package redis

import (
"context"
"os"
"time"

"github.com/redis/go-redis/v9"
Expand All @@ -13,28 +12,6 @@ type RedisClient struct {
client *redis.Client
}

// InitRedisConfig checks env vars and add default values in case we need
func InitRedisConfig() *RedisClient {
// redis config
redisHost := os.Getenv("REDIS_HOST")
if redisHost == "" {
redisHost = "localhost"
}
redisPort := os.Getenv("REDIS_PORT")
if redisPort == "" {
redisPort = "6379"
}
redisHost = redisHost + ":" + redisPort
log.Info("Redis host to connect: ", redisHost)

redisPass := os.Getenv("REDIS_PASS")
if redisHost == "" {
redisPass = ""
}

return NewRedisClient(redisHost, redisPass, 0)
}

// NewRedisClient returns a Redis client connection
func NewRedisClient(addr string, password string, db int) *RedisClient {
client := redis.NewClient(&redis.Options{
Expand Down

0 comments on commit 47f6d0c

Please sign in to comment.