Skip to content

Commit

Permalink
Merge pull request #328 from threefoldtech/development_proxy_enhanceI…
Browse files Browse the repository at this point in the history
…ndexer

enhance gpu node indexer
  • Loading branch information
Omarabdul3ziz authored Oct 26, 2023
2 parents 2aee70e + 8606786 commit 2efb640
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 6 deletions.
27 changes: 22 additions & 5 deletions grid-proxy/internal/explorer/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,37 @@ const (
END;
RETURN v_dec_value;
END;
$$ LANGUAGE plpgsql;`
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION notify_nodes_count_changed() RETURNS trigger AS $notify_nodes_count_changed$
BEGIN
PERFORM pg_notify('node_added', NEW.twin_id);
RETURN NULL;
END;
$notify_nodes_count_changed$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER node_added
AFTER INSERT ON node
FOR EACH ROW EXECUTE PROCEDURE notify_nodes_count_changed();
`
)

// PostgresDatabase postgres db client
type PostgresDatabase struct {
gormDB *gorm.DB
gormDB *gorm.DB
connString string
}

func (d *PostgresDatabase) GetConnectionString() string {
return d.connString
}

// NewPostgresDatabase returns a new postgres db client
func NewPostgresDatabase(host string, port int, user, password, dbname string, maxConns int) (Database, error) {
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s "+
connString := fmt.Sprintf("host=%s port=%d user=%s "+
"password=%s dbname=%s sslmode=disable",
host, port, user, password, dbname)
gormDB, err := gorm.Open(postgres.Open(psqlInfo), &gorm.Config{})
gormDB, err := gorm.Open(postgres.Open(connString), &gorm.Config{})
if err != nil {
return nil, errors.Wrap(err, "failed to create orm wrapper around db")
}
Expand All @@ -131,7 +148,7 @@ func NewPostgresDatabase(host string, port int, user, password, dbname string, m
return nil, errors.Wrap(err, "failed to auto migrate DB")
}

res := PostgresDatabase{gormDB}
res := PostgresDatabase{gormDB, connString}
if err := res.initialize(); err != nil {
return nil, errors.Wrap(err, "failed to setup tables")
}
Expand Down
1 change: 1 addition & 0 deletions grid-proxy/internal/explorer/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Database interface {
GetContract(ctx context.Context, contractID uint32) (DBContract, error)
GetContractBills(ctx context.Context, contractID uint32, limit types.Limit) ([]ContractBilling, uint, error)
UpsertNodesGPU(ctx context.Context, nodesGPU []types.NodeGPU) error
GetConnectionString() string
}

type ContractBilling types.ContractBilling
Expand Down
67 changes: 66 additions & 1 deletion grid-proxy/internal/gpuindexer/gpuindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"encoding/json"
"fmt"
"os"
"strconv"
"time"

"github.com/google/uuid"
"github.com/lib/pq"
"github.com/rs/zerolog/log"
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/explorer/db"
Expand All @@ -17,7 +19,12 @@ import (
rmbTypes "github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/direct/types"
)

const lingerBatch = 10 * time.Second
const (
resultsBatcherCleanupInterval = 10 * time.Second
minListenerReconnectInterval = 10 * time.Second
dbNodeAddedNotificationChannel = "node_added"
lingerBatch = 10 * time.Second
)

type NodeGPUIndexer struct {
db db.Database
Expand All @@ -26,6 +33,7 @@ type NodeGPUIndexer struct {
batchSize int
nodesGPUResultsChan chan []types.NodeGPU
nodesGPUBatchesChan chan []types.NodeGPU
nodeAddedChan chan uint32
nodesGPUResultsWorkers int
nodesGPUBufferWorkers int
}
Expand All @@ -44,6 +52,7 @@ func NewNodeGPUIndexer(
db: db,
nodesGPUResultsChan: make(chan []types.NodeGPU),
nodesGPUBatchesChan: make(chan []types.NodeGPU),
nodeAddedChan: make(chan uint32),
checkInterval: time.Duration(indexerCheckIntervalMins) * time.Minute,
batchSize: batchSize,
nodesGPUResultsWorkers: nodesGPUResultsWorkers,
Expand All @@ -60,13 +69,67 @@ func NewNodeGPUIndexer(
return indexer, nil
}

// startDBListener sets up a PostgreSQL listener to listen for changes in the database and triggers the nodesChangeChan channel.
func (n *NodeGPUIndexer) startDBListener(ctx context.Context) {
listener := pq.NewListener(n.db.GetConnectionString(), minListenerReconnectInterval, 6*minListenerReconnectInterval, func(ev pq.ListenerEventType, err error) {
if err != nil {
log.Error().Err(err).Msg("failed listening to DB changes")
}
})
defer listener.Close()

err := listener.Listen(dbNodeAddedNotificationChannel)
if err != nil {
log.Error().Err(err).Msg("failed to listen to DB changes")
return
}

for {
select {
case notification, ok := <-listener.Notify:
if !ok {
log.Error().Msg("DB listener channel closed")
return
}
if notification == nil {
log.Error().Msg("received nil notification from DB listener")
continue
}

payload := notification.Extra
twinId, err := strconv.ParseUint(payload, 10, 64)
if err != nil {
log.Error().Err(err).Msgf("failed to parse twin id %v", payload)
continue
}

log.Debug().Msgf("Received data from channel [%v]: twin_id: %v", notification.Channel, payload)

n.nodeAddedChan <- uint32(twinId)
case <-ctx.Done():
log.Error().Err(ctx.Err()).Msg("context canceled")
return
}
}
}

func (n *NodeGPUIndexer) getGPUInfo(ctx context.Context, twinId uint32) {
id := uuid.NewString()
err := n.relayClient.Call(ctx, id, twinId, "zos.gpu.list", nil)
if err != nil {
log.Error().Err(err).Msgf("failed to send get GPU info request from relay in GPU indexer for node with twin %d", twinId)
}
}

func (n *NodeGPUIndexer) queryGridNodes(ctx context.Context) {
ticker := time.NewTicker(n.checkInterval)
n.runQueryGridNodes(ctx)
for {
select {
case <-ticker.C:
n.runQueryGridNodes(ctx)
case addedNodeTwinId := <-n.nodeAddedChan:
n.getGPUInfo(ctx, addedNodeTwinId)
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -178,6 +241,8 @@ func (n *NodeGPUIndexer) Start(ctx context.Context) {
go n.gpuBatchesDBUpserter(ctx)
}

go n.startDBListener(ctx)

go n.queryGridNodes(ctx)

log.Info().Msg("GPU indexer started")
Expand Down

0 comments on commit 2efb640

Please sign in to comment.