Skip to content

Commit

Permalink
Add UpdateTimestamp API
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango committed Aug 8, 2024
1 parent 656664e commit b81c776
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,22 @@ func (c *correct) Start(ctx context.Context) (err error) {
return err
}
counts := detail.GetCounts()
agents := make([]string, 0, detail.GetLiveAgents())
for agent, count := range counts {
log.Infof("index info: addr(%s), stored(%d), uncommitted(%d), indexing=%t, saving=%t", agent, count.GetStored(), count.GetUncommitted(), count.GetIndexing(), count.GetSaving())
agents := make([]string, 0, len(counts))
for agent:= range counts {
agents = append(agents, agent)
}
slices.SortFunc(agents, func(left, right string) int {
return cmp.Compare(counts[left].GetStored(), counts[right].GetStored())
})

for _, agent := range agents {
count, ok := counts[agent]
if ok && count != nil {
log.Infof("index info: addr(%s), stored(%d), uncommitted(%d), indexing=%t, saving=%t", agent, count.GetStored(), count.GetUncommitted(), count.GetIndexing(), count.GetSaving())
}
}
log.Infof("sorted agents: %v,\tdiscovered agents: %v", agents, c.discoverer.GetAddrs(ctx))

errs := make([]error, 0, len(agents))

emptyReq := new(payload.Object_List_Request)
Expand Down Expand Up @@ -265,12 +272,12 @@ func (c *correct) Start(ctx context.Context) (err error) {
if len(replicas) <= 0 {
diff := c.indexReplica - 1
addrs := c.discoverer.GetAddrs(egctx)
if len(addrs) == 0 {
return errors.ErrNoAvailableAgentToInsert
}
// correct index replica shortage
if diff > 0 {
log.Infof("replica shortage(diff=%d) of vector id: %s detected from last %s. inserting to other agents = %v", diff, id, debugMsg, addrs)
if len(addrs) == 0 {
return errors.ErrNoAvailableAgentToInsert
}
log.Infof("replica shortage(configured: %d, stored: 1) of vector id: %s detected for %s. inserting to other agents = %v", c.indexReplica, id, debugMsg, addrs)
req := &payload.Insert_Request{
Vector: vec,
// TODO: this should be deleted after Config.Timestamp deprecation
Expand Down Expand Up @@ -550,7 +557,7 @@ func (c *correct) Start(ctx context.Context) (err error) {
diff := c.indexReplica - currentNumberOfIndexReplica
addrs := c.discoverer.GetAddrs(egctx)
if diff > 0 { // correct index replica shortage
log.Infof("replica shortage(diff=%d) of vector id: %s detected for %s. inserting to other agents = %v", diff, id, debugMsg, addrs)
log.Infof("replica shortage(configured: %d, stored: %d) of vector id: %s detected for %s. inserting to other agents = %v", c.indexReplica, diff, id, debugMsg, addrs)
if len(addrs) == 0 {
return errors.ErrNoAvailableAgentToInsert
}
Expand Down Expand Up @@ -635,7 +642,7 @@ func (c *correct) Start(ctx context.Context) (err error) {
}
}
} else if diff < 0 { // correct index replica oversupply
log.Infof("replica oversupply of vector %s. deleting...", id)
log.Infof("replica oversupply(configured: %d, stored: %d) of vector id: %s detected for %s. deleting from agents = %v", c.indexReplica, currentNumberOfIndexReplica, id, debugMsg, found)
if len(addrs) == 0 {
return errors.ErrNoAvailableAgentToRemove
}
Expand Down

0 comments on commit b81c776

Please sign in to comment.