-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enhance gpu node indexer #328
Conversation
- use postgres NOTIFY/LISTEN and TRIGGER to requery the nodes each time node table count changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great, the SQL part I am not sure about it but other changes are OK.
payload := notification.Extra | ||
twinId, err := strconv.ParseInt(payload, 10, 64) | ||
if err != nil { | ||
log.Error().Err(err).Msgf("failed to parse twin id %v", twinId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
twinId
should be payload
work is done here. converted to draft until it is needed |
$$ LANGUAGE plpgsql;` | ||
$$ LANGUAGE plpgsql; | ||
|
||
CREATE OR REPLACE FUNCTION notify_node_change() RETURNS trigger AS $notify_node_change$ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change the function to notify_nodes_count_changed
// NewPostgresDatabase returns a new postgres db client | ||
func NewPostgresDatabase(host string, port int, user, password, dbname string) (Database, error) { | ||
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s "+ | ||
func GetConnectionString(host string, port int, user, password, dbname string) string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
buildConnectionString
} | ||
|
||
// NewPostgresDatabase returns a new postgres db client | ||
func NewPostgresDatabase(psqlInfo string) (Database, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
connString, psqlInfo doesn't provide any information on the parameter
func NewPostgresDatabase(host string, port int, user, password, dbname string) (Database, error) { | ||
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s "+ | ||
func GetConnectionString(host string, port int, user, password, dbname string) string { | ||
return fmt.Sprintf("host=%s port=%d user=%s "+ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why sslmode is disabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it was always like this, should we enable it?
grid-proxy/cmds/proxy_server/main.go
Outdated
@@ -129,7 +131,7 @@ func main() { | |||
log.Fatal().Err(err).Msg("failed to create GPU indexer") | |||
} | |||
|
|||
indexer.Start(ctx) | |||
indexer.Start(ctx, dbConnStr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the indexer, can already insert to the database, why is this passed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const ( | ||
resultsBatcherCleanupInterval = 10 * time.Second | ||
minListenerReconnectInterval = 10 * time.Second | ||
dbNotificationChannel = "node_added" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dbNodeAddedNotificationChannel
@@ -26,6 +32,7 @@ type NodeGPUIndexer struct { | |||
batchSize int | |||
nodesGPUResultsChan chan []types.NodeGPU | |||
nodesGPUBatchesChan chan []types.NodeGPU | |||
nodesChangeChan chan int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nodeAddedChan
@@ -150,7 +208,7 @@ func (n *NodeGPUIndexer) gpuNodeResultsBatcher(ctx context.Context) { | |||
} | |||
} | |||
|
|||
func (n *NodeGPUIndexer) Start(ctx context.Context) { | |||
func (n *NodeGPUIndexer) Start(ctx context.Context, connStr string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand why you did it. however this isn't correct, this should be either inferred from n.db or defined as a method on the db struct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
went with defining method on the db that will return the connStr
} | ||
} | ||
|
||
func (n *NodeGPUIndexer) getGPUInfo(ctx context.Context, twinId int64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
twinId should uint32, and do the type conversion before passing to the function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
found it uint64
for the most of the types, so edited it to be uint64
instead
err := listener.Listen(dbNotificationChannel) | ||
if err != nil { | ||
log.Error().Err(err).Msg("failed to listen to DB changes") | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this ever happens, this will fail and will never recover, that startDBListener should always be running, if you exit this routine silently, what do we do? we fall back completely to the 1hr scheduler? is that the intended behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes this exactly what will happen
@@ -44,6 +51,7 @@ func NewNodeGPUIndexer( | |||
db: db, | |||
nodesGPUResultsChan: make(chan []types.NodeGPU), | |||
nodesGPUBatchesChan: make(chan []types.NodeGPU), | |||
nodesChangeChan: make(chan int64), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder about the type, why not uint32, given that's the twinID type?
- rename channel/trigger - add connStr to the db and its getter -
…-sdk-go into development_proxy_enhanceIndexer
Description
we kept the periodically running ticker to requery the nodes and upsert the GPUs info each hour.
also for newly added nodes a Postgres trigger is added on the node table which will notify the client with the added node twin id and then the client will ask the node with an RMB call if it has a GPU or not.
Changes
Related Issues
Checklist