Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Add .PutKVPairs() method to KVStore interface #1274

Merged
merged 4 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions go/kv/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ func (this *consulStore) GetKeyValue(key string) (value string, found bool, err
return string(pair.Value), (pair != nil), nil
}

func (this *consulStore) PutKVPairs(kvPairs []*KVPair) (err error) {
if this.client == nil {
return nil
}
for _, pair := range kvPairs {
if err := this.PutKeyValue(pair.Key, pair.Value); err != nil {
return err
}
}
return nil
}

func (this *consulStore) DistributePairs(kvPairs [](*KVPair)) (err error) {
// This function is non re-entrant (it can only be running once at any point in time)
if atomic.CompareAndSwapInt64(&this.distributionReentry, 0, 1) {
Expand Down
9 changes: 9 additions & 0 deletions go/kv/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ func (this *internalKVStore) GetKeyValue(key string) (value string, found bool,
return value, found, log.Errore(err)
}

func (this *internalKVStore) PutKVPairs(kvPairs []*KVPair) (err error) {
for _, pair := range kvPairs {
if err := this.PutKeyValue(pair.Key, pair.Value); err != nil {
return err
}
}
return nil
}

func (this *internalKVStore) DistributePairs(kvPairs [](*KVPair)) (err error) {
return nil
}
12 changes: 9 additions & 3 deletions go/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (this *KVPair) String() string {

type KVStore interface {
PutKeyValue(key string, value string) (err error)
PutKVPairs(kvPairs []*KVPair) (err error)
GetKeyValue(key string) (value string, found bool, err error)
DistributePairs(kvPairs [](*KVPair)) (err error)
}
Expand Down Expand Up @@ -84,11 +85,16 @@ func PutValue(key string, value string) (err error) {
return nil
}

func PutKVPair(kvPair *KVPair) (err error) {
if kvPair == nil {
func PutKVPairs(kvPairs []*KVPair) (err error) {
if len(kvPairs) < 1 {
return nil
}
return PutValue(kvPair.Key, kvPair.Value)
for _, store := range getKVStores() {
if err := store.PutKVPairs(kvPairs); err != nil {
return err
}
}
return nil
}

func DistributePairs(kvPairs [](*KVPair)) (err error) {
Expand Down
12 changes: 12 additions & 0 deletions go/kv/zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ func (this *zkStore) GetKeyValue(key string) (value string, found bool, err erro
return string(result), true, nil
}

func (this *zkStore) PutKVPairs(kvPairs []*KVPair) (err error) {
if this.zook == nil {
return nil
}
for _, pair := range kvPairs {
if err := this.PutKeyValue(pair.Key, pair.Value); err != nil {
return err
}
}
return nil
}

func (this *zkStore) DistributePairs(kvPairs [](*KVPair)) (err error) {
return nil
}
6 changes: 3 additions & 3 deletions go/logic/command_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,11 @@ func (applier *CommandApplier) enableGlobalRecoveries(value []byte) interface{}
}

func (applier *CommandApplier) putKeyValue(value []byte) interface{} {
kvPair := kv.KVPair{}
if err := json.Unmarshal(value, &kvPair); err != nil {
kvPair := &kv.KVPair{}
if err := json.Unmarshal(value, kvPair); err != nil {
return log.Errore(err)
}
err := kv.PutKVPair(&kvPair)
err := kv.PutKVPairs([]*kv.KVPair{kvPair})
return err
}

Expand Down
13 changes: 11 additions & 2 deletions go/logic/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,14 +442,23 @@ func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVP
submitKvPairs = append(submitKvPairs, kvPair)
}
log.Debugf("kv.SubmitMastersToKvStores: submitKvPairs: %+v", len(submitKvPairs))
var kvPutKVPairs [](*kv.KVPair)
for _, kvPair := range submitKvPairs {
if orcraft.IsRaftEnabled() {
_, err = orcraft.PublishCommand("put-key-value", kvPair)
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
submittedCount++
} else {
selectedError = err
}
} else {
err = kv.PutKVPair(kvPair)
kvPutKVPairs = append(kvPutKVPairs, kvPair)
}
}
if len(kvPutKVPairs) > 0 {
err := kv.PutKVPairs(kvPutKVPairs)
if err == nil {
submittedCount++
submittedCount = submittedCount + len(kvPutKVPairs)
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
} else {
selectedError = err
}
Expand Down
6 changes: 2 additions & 4 deletions go/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,10 +922,8 @@ func checkAndRecoverDeadMaster(analysisEntry inst.ReplicationAnalysis, candidate
// of the put-key-value event upon startup. We _recommend_ a snapshot in the near future.
go orcraft.PublishCommand("async-snapshot", "")
} else {
for _, kvPair := range kvPairs {
err := kv.PutKVPair(kvPair)
log.Errore(err)
}
err := kv.PutKVPairs(kvPairs)
log.Errore(err)
}
{
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Distributing KV %+v", kvPairs))
Expand Down