enhance gpu node indexer #328

Merged
merged 6 commits on Oct 26, 2023
Changes from all commits

27 changes: 22 additions & 5 deletions grid-proxy/internal/explorer/db/postgres.go
@@ -101,20 +101,37 @@ const (
END;
RETURN v_dec_value;
END;
$$ LANGUAGE plpgsql;`
$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION notify_nodes_count_changed() RETURNS trigger AS $notify_nodes_count_changed$
BEGIN
PERFORM pg_notify('node_added', NEW.twin_id::text);
RETURN NULL;
END;
$notify_nodes_count_changed$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER node_added
AFTER INSERT ON node
FOR EACH ROW EXECUTE PROCEDURE notify_nodes_count_changed();
`
)

// PostgresDatabase postgres db client
type PostgresDatabase struct {
gormDB *gorm.DB
gormDB *gorm.DB
connString string
}

func (d *PostgresDatabase) GetConnectionString() string {
return d.connString
}

// NewPostgresDatabase returns a new postgres db client
func NewPostgresDatabase(host string, port int, user, password, dbname string, maxConns int) (Database, error) {
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s "+
connString := fmt.Sprintf("host=%s port=%d user=%s "+
"password=%s dbname=%s sslmode=disable",
host, port, user, password, dbname)
gormDB, err := gorm.Open(postgres.Open(psqlInfo), &gorm.Config{})
gormDB, err := gorm.Open(postgres.Open(connString), &gorm.Config{})
if err != nil {
return nil, errors.Wrap(err, "failed to create orm wrapper around db")
}
@@ -131,7 +148,7 @@ func NewPostgresDatabase(host string, port int, user, password, dbname string, m
return nil, errors.Wrap(err, "failed to auto migrate DB")
}

res := PostgresDatabase{gormDB}
res := PostgresDatabase{gormDB, connString}
if err := res.initialize(); err != nil {
return nil, errors.Wrap(err, "failed to setup tables")
}
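To sanity-check the new trigger locally, a standalone listener like the sketch below can be pointed at the same database. The connection string is a placeholder assumption; the notification can be provoked either by inserting a node row or simply by running SELECT pg_notify('node_added', '42') in another session.

```go
package main

import (
	"fmt"
	"time"

	"github.com/lib/pq"
)

func main() {
	// Placeholder connection string; adjust to the local grid-proxy database.
	conn := "host=localhost port=5432 user=postgres password=postgres dbname=tfgrid sslmode=disable"

	// Listen on the same channel the node_added trigger notifies on.
	listener := pq.NewListener(conn, 10*time.Second, time.Minute, nil)
	defer listener.Close()

	if err := listener.Listen("node_added"); err != nil {
		panic(err)
	}

	select {
	case n := <-listener.Notify:
		// n can be nil after a reconnect; a real consumer should check for that.
		if n != nil {
			fmt.Printf("channel=%s payload=%s\n", n.Channel, n.Extra)
		}
	case <-time.After(30 * time.Second):
		fmt.Println("no notification received")
	}
}
```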
1 change: 1 addition & 0 deletions grid-proxy/internal/explorer/db/types.go
@@ -18,6 +18,7 @@ type Database interface {
GetContract(ctx context.Context, contractID uint32) (DBContract, error)
GetContractBills(ctx context.Context, contractID uint32, limit types.Limit) ([]ContractBilling, uint, error)
UpsertNodesGPU(ctx context.Context, nodesGPU []types.NodeGPU) error
GetConnectionString() string
}

type ContractBilling types.ContractBilling
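Because GetConnectionString is now part of the Database interface, every implementation, including test doubles, has to provide it. A hypothetical mock (not part of this PR) would only need something like:

```go
// mockDatabase is a hypothetical test double; only the new method is shown,
// the rest of the Database interface would be stubbed out as well.
type mockDatabase struct {
	connString string
}

func (m *mockDatabase) GetConnectionString() string {
	return m.connString
}
```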
67 changes: 66 additions & 1 deletion grid-proxy/internal/gpuindexer/gpuindexer.go
@@ -5,9 +5,11 @@ import (
"encoding/json"
"fmt"
"os"
"strconv"
"time"

"github.com/google/uuid"
"github.com/lib/pq"
"github.com/rs/zerolog/log"
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/explorer/db"
@@ -17,7 +19,12 @@ import (
rmbTypes "github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/direct/types"
)

const lingerBatch = 10 * time.Second
const (
resultsBatcherCleanupInterval = 10 * time.Second
minListenerReconnectInterval = 10 * time.Second
dbNodeAddedNotificationChannel = "node_added"
lingerBatch = 10 * time.Second
)

type NodeGPUIndexer struct {
db db.Database
@@ -26,6 +33,7 @@ type NodeGPUIndexer struct {
batchSize int
nodesGPUResultsChan chan []types.NodeGPU
nodesGPUBatchesChan chan []types.NodeGPU
nodeAddedChan chan uint32
nodesGPUResultsWorkers int
nodesGPUBufferWorkers int
}
@@ -44,6 +52,7 @@ func NewNodeGPUIndexer(
db: db,
nodesGPUResultsChan: make(chan []types.NodeGPU),
nodesGPUBatchesChan: make(chan []types.NodeGPU),
nodeAddedChan: make(chan uint32),
checkInterval: time.Duration(indexerCheckIntervalMins) * time.Minute,
batchSize: batchSize,
nodesGPUResultsWorkers: nodesGPUResultsWorkers,
@@ -60,13 +69,67 @@ func NewNodeGPUIndexer(
return indexer, nil
}

// startDBListener sets up a PostgreSQL listener for database change notifications and forwards the twin ID of each newly added node to the nodeAddedChan channel.
func (n *NodeGPUIndexer) startDBListener(ctx context.Context) {
listener := pq.NewListener(n.db.GetConnectionString(), minListenerReconnectInterval, 6*minListenerReconnectInterval, func(ev pq.ListenerEventType, err error) {
if err != nil {
log.Error().Err(err).Msg("failed listening to DB changes")
}
})
defer listener.Close()

err := listener.Listen(dbNodeAddedNotificationChannel)
if err != nil {
log.Error().Err(err).Msg("failed to listen to DB changes")
return
Contributor:
If this ever happens, it will fail and never recover; startDBListener should always be running. If this routine exits silently, what do we do? Do we fall back completely to the 1hr scheduler? Is that the intended behavior?

Contributor (Author):

Yes, this is exactly what will happen.

}

for {
select {
case notification, ok := <-listener.Notify:
if !ok {
log.Error().Msg("DB listener channel closed")
return
}
if notification == nil {
log.Error().Msg("received nil notification from DB listener")
continue
}

payload := notification.Extra
twinId, err := strconv.ParseUint(payload, 10, 64)
if err != nil {
log.Error().Err(err).Msgf("failed to parse twin id %v", payload)
continue
}

log.Debug().Msgf("Received data from channel [%v]: twin_id: %v", notification.Channel, payload)

n.nodeAddedChan <- uint32(twinId)
case <-ctx.Done():
log.Error().Err(ctx.Err()).Msg("context canceled")
return
}
}
}

func (n *NodeGPUIndexer) getGPUInfo(ctx context.Context, twinId uint32) {
id := uuid.NewString()
err := n.relayClient.Call(ctx, id, twinId, "zos.gpu.list", nil)
if err != nil {
log.Error().Err(err).Msgf("failed to send get GPU info request from relay in GPU indexer for node with twin %d", twinId)
}
}

func (n *NodeGPUIndexer) queryGridNodes(ctx context.Context) {
ticker := time.NewTicker(n.checkInterval)
n.runQueryGridNodes(ctx)
for {
select {
case <-ticker.C:
n.runQueryGridNodes(ctx)
case addedNodeTwinId := <-n.nodeAddedChan:
n.getGPUInfo(ctx, addedNodeTwinId)
case <-ctx.Done():
return
}
@@ -178,6 +241,8 @@ func (n *NodeGPUIndexer) Start(ctx context.Context) {
go n.gpuBatchesDBUpserter(ctx)
}

go n.startDBListener(ctx)

go n.queryGridNodes(ctx)

log.Info().Msg("GPU indexer started")
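As discussed in the review thread above, if Listen fails the goroutine exits and the indexer intentionally falls back to the periodic queryGridNodes ticker. If that fallback ever proves too slow, one possible alternative (hypothetical, not part of this PR) is a small supervisor that restarts the listener:

```go
// superviseDBListener is a hypothetical wrapper, not part of this PR: it restarts
// startDBListener whenever it returns, instead of relying solely on the ticker fallback.
func (n *NodeGPUIndexer) superviseDBListener(ctx context.Context) {
	for {
		n.startDBListener(ctx) // returns on Listen() failure or a closed Notify channel
		select {
		case <-ctx.Done():
			return
		case <-time.After(minListenerReconnectInterval):
			log.Warn().Msg("DB listener exited, restarting")
		}
	}
}
```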