diff --git a/go/kv/consul.go b/go/kv/consul.go index 91ab4952f..6bfc9dd89 100644 --- a/go/kv/consul.go +++ b/go/kv/consul.go @@ -86,6 +86,18 @@ func (this *consulStore) GetKeyValue(key string) (value string, found bool, err return string(pair.Value), (pair != nil), nil } +func (this *consulStore) PutKVPairs(kvPairs []*KVPair) (err error) { + if this.client == nil { + return nil + } + for _, pair := range kvPairs { + if err := this.PutKeyValue(pair.Key, pair.Value); err != nil { + return err + } + } + return nil +} + func (this *consulStore) DistributePairs(kvPairs [](*KVPair)) (err error) { // This function is non re-entrant (it can only be running once at any point in time) if atomic.CompareAndSwapInt64(&this.distributionReentry, 0, 1) { diff --git a/go/kv/internal.go b/go/kv/internal.go index 1c1d6803e..e9083cadd 100644 --- a/go/kv/internal.go +++ b/go/kv/internal.go @@ -62,6 +62,15 @@ func (this *internalKVStore) GetKeyValue(key string) (value string, found bool, return value, found, log.Errore(err) } +func (this *internalKVStore) PutKVPairs(kvPairs []*KVPair) (err error) { + for _, pair := range kvPairs { + if err := this.PutKeyValue(pair.Key, pair.Value); err != nil { + return err + } + } + return nil +} + func (this *internalKVStore) DistributePairs(kvPairs [](*KVPair)) (err error) { return nil } diff --git a/go/kv/kv.go b/go/kv/kv.go index 3eebcbc4d..6b5f59b53 100644 --- a/go/kv/kv.go +++ b/go/kv/kv.go @@ -36,6 +36,7 @@ func (this *KVPair) String() string { type KVStore interface { PutKeyValue(key string, value string) (err error) + PutKVPairs(kvPairs []*KVPair) (err error) GetKeyValue(key string) (value string, found bool, err error) DistributePairs(kvPairs [](*KVPair)) (err error) } @@ -84,11 +85,16 @@ func PutValue(key string, value string) (err error) { return nil } -func PutKVPair(kvPair *KVPair) (err error) { - if kvPair == nil { +func PutKVPairs(kvPairs []*KVPair) (err error) { + if len(kvPairs) < 1 { return nil } - return PutValue(kvPair.Key, kvPair.Value) + for _, store := range getKVStores() { + if err := store.PutKVPairs(kvPairs); err != nil { + return err + } + } + return nil } func DistributePairs(kvPairs [](*KVPair)) (err error) { diff --git a/go/kv/zk.go b/go/kv/zk.go index 37f838ff5..8dc47e0fe 100644 --- a/go/kv/zk.go +++ b/go/kv/zk.go @@ -75,6 +75,18 @@ func (this *zkStore) GetKeyValue(key string) (value string, found bool, err erro return string(result), true, nil } +func (this *zkStore) PutKVPairs(kvPairs []*KVPair) (err error) { + if this.zook == nil { + return nil + } + for _, pair := range kvPairs { + if err := this.PutKeyValue(pair.Key, pair.Value); err != nil { + return err + } + } + return nil +} + func (this *zkStore) DistributePairs(kvPairs [](*KVPair)) (err error) { return nil } diff --git a/go/logic/command_applier.go b/go/logic/command_applier.go index cb65f03d9..73f67fbc9 100644 --- a/go/logic/command_applier.go +++ b/go/logic/command_applier.go @@ -256,11 +256,11 @@ func (applier *CommandApplier) enableGlobalRecoveries(value []byte) interface{} } func (applier *CommandApplier) putKeyValue(value []byte) interface{} { - kvPair := kv.KVPair{} - if err := json.Unmarshal(value, &kvPair); err != nil { + kvPair := &kv.KVPair{} + if err := json.Unmarshal(value, kvPair); err != nil { return log.Errore(err) } - err := kv.PutKVPair(&kvPair) + err := kv.PutKVPairs([]*kv.KVPair{kvPair}) return err } diff --git a/go/logic/orchestrator.go b/go/logic/orchestrator.go index e1af1fe2c..1eeebcd11 100644 --- a/go/logic/orchestrator.go +++ b/go/logic/orchestrator.go @@ -442,14 +442,19 @@ func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVP submitKvPairs = append(submitKvPairs, kvPair) } log.Debugf("kv.SubmitMastersToKvStores: submitKvPairs: %+v", len(submitKvPairs)) - for _, kvPair := range submitKvPairs { - if orcraft.IsRaftEnabled() { - _, err = orcraft.PublishCommand("put-key-value", kvPair) - } else { - err = kv.PutKVPair(kvPair) + if orcraft.IsRaftEnabled() { + for _, kvPair := range submitKvPairs { + _, err := orcraft.PublishCommand("put-key-value", kvPair) + if err == nil { + submittedCount++ + } else { + selectedError = err + } } + } else { + err := kv.PutKVPairs(submitKvPairs) if err == nil { - submittedCount++ + submittedCount += len(submitKvPairs) } else { selectedError = err } diff --git a/go/logic/topology_recovery.go b/go/logic/topology_recovery.go index c0a642640..0489d8200 100644 --- a/go/logic/topology_recovery.go +++ b/go/logic/topology_recovery.go @@ -922,10 +922,8 @@ func checkAndRecoverDeadMaster(analysisEntry inst.ReplicationAnalysis, candidate // of the put-key-value event upon startup. We _recommend_ a snapshot in the near future. go orcraft.PublishCommand("async-snapshot", "") } else { - for _, kvPair := range kvPairs { - err := kv.PutKVPair(kvPair) - log.Errore(err) - } + err := kv.PutKVPairs(kvPairs) + log.Errore(err) } { AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Distributing KV %+v", kvPairs))