From 236a12b153b9e9209b36ac534b324ac3eb55df6f Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 23 Nov 2020 00:59:37 +0100 Subject: [PATCH 1/4] Add .PutKVPairs() to KVStore interface --- go/kv/consul.go | 12 ++++++++++++ go/kv/internal.go | 9 +++++++++ go/kv/kv.go | 13 +++++++++++++ go/kv/zk.go | 12 ++++++++++++ go/logic/orchestrator.go | 13 +++++++++++-- go/logic/topology_recovery.go | 6 ++---- 6 files changed, 59 insertions(+), 6 deletions(-) diff --git a/go/kv/consul.go b/go/kv/consul.go index 91ab4952f..6bfc9dd89 100644 --- a/go/kv/consul.go +++ b/go/kv/consul.go @@ -86,6 +86,18 @@ func (this *consulStore) GetKeyValue(key string) (value string, found bool, err return string(pair.Value), (pair != nil), nil } +func (this *consulStore) PutKVPairs(kvPairs []*KVPair) (err error) { + if this.client == nil { + return nil + } + for _, pair := range kvPairs { + if err := this.PutKeyValue(pair.Key, pair.Value); err != nil { + return err + } + } + return nil +} + func (this *consulStore) DistributePairs(kvPairs [](*KVPair)) (err error) { // This function is non re-entrant (it can only be running once at any point in time) if atomic.CompareAndSwapInt64(&this.distributionReentry, 0, 1) { diff --git a/go/kv/internal.go b/go/kv/internal.go index 1c1d6803e..e9083cadd 100644 --- a/go/kv/internal.go +++ b/go/kv/internal.go @@ -62,6 +62,15 @@ func (this *internalKVStore) GetKeyValue(key string) (value string, found bool, return value, found, log.Errore(err) } +func (this *internalKVStore) PutKVPairs(kvPairs []*KVPair) (err error) { + for _, pair := range kvPairs { + if err := this.PutKeyValue(pair.Key, pair.Value); err != nil { + return err + } + } + return nil +} + func (this *internalKVStore) DistributePairs(kvPairs [](*KVPair)) (err error) { return nil } diff --git a/go/kv/kv.go b/go/kv/kv.go index 3eebcbc4d..60ad02c5a 100644 --- a/go/kv/kv.go +++ b/go/kv/kv.go @@ -36,6 +36,7 @@ func (this *KVPair) String() string { type KVStore interface { PutKeyValue(key string, value string) (err error) + PutKVPairs(kvPairs []*KVPair) (err error) GetKeyValue(key string) (value string, found bool, err error) DistributePairs(kvPairs [](*KVPair)) (err error) } @@ -91,6 +92,18 @@ func PutKVPair(kvPair *KVPair) (err error) { return PutValue(kvPair.Key, kvPair.Value) } +func PutKVPairs(kvPairs []*KVPair) (err error) { + if len(kvPairs) < 1 { + return nil + } + for _, store := range getKVStores() { + if err := store.PutKVPairs(kvPairs); err != nil { + return err + } + } + return nil +} + func DistributePairs(kvPairs [](*KVPair)) (err error) { for _, store := range getKVStores() { if err := store.DistributePairs(kvPairs); err != nil { diff --git a/go/kv/zk.go b/go/kv/zk.go index 37f838ff5..8dc47e0fe 100644 --- a/go/kv/zk.go +++ b/go/kv/zk.go @@ -75,6 +75,18 @@ func (this *zkStore) GetKeyValue(key string) (value string, found bool, err erro return string(result), true, nil } +func (this *zkStore) PutKVPairs(kvPairs []*KVPair) (err error) { + if this.zook == nil { + return nil + } + for _, pair := range kvPairs { + if err := this.PutKeyValue(pair.Key, pair.Value); err != nil { + return err + } + } + return nil +} + func (this *zkStore) DistributePairs(kvPairs [](*KVPair)) (err error) { return nil } diff --git a/go/logic/orchestrator.go b/go/logic/orchestrator.go index e1af1fe2c..da39fc2b0 100644 --- a/go/logic/orchestrator.go +++ b/go/logic/orchestrator.go @@ -442,14 +442,23 @@ func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVP submitKvPairs = append(submitKvPairs, kvPair) } log.Debugf("kv.SubmitMastersToKvStores: submitKvPairs: %+v", len(submitKvPairs)) + var kvPutKVPairs [](*kv.KVPair) for _, kvPair := range submitKvPairs { if orcraft.IsRaftEnabled() { _, err = orcraft.PublishCommand("put-key-value", kvPair) + if err == nil { + submittedCount++ + } else { + selectedError = err + } } else { - err = kv.PutKVPair(kvPair) + kvPutKVPairs = append(kvPutKVPairs, kvPair) } + } + if len(kvPutKVPairs) > 0 { + err := kv.PutKVPairs(kvPutKVPairs) if err == nil { - submittedCount++ + submittedCount = submittedCount + len(kvPutKVPairs) } else { selectedError = err } diff --git a/go/logic/topology_recovery.go b/go/logic/topology_recovery.go index c0a642640..0489d8200 100644 --- a/go/logic/topology_recovery.go +++ b/go/logic/topology_recovery.go @@ -922,10 +922,8 @@ func checkAndRecoverDeadMaster(analysisEntry inst.ReplicationAnalysis, candidate // of the put-key-value event upon startup. We _recommend_ a snapshot in the near future. go orcraft.PublishCommand("async-snapshot", "") } else { - for _, kvPair := range kvPairs { - err := kv.PutKVPair(kvPair) - log.Errore(err) - } + err := kv.PutKVPairs(kvPairs) + log.Errore(err) } { AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Distributing KV %+v", kvPairs)) From cef2eb988188be3fed7770f2946a7df58855f77a Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 23 Nov 2020 01:04:39 +0100 Subject: [PATCH 2/4] Remove kv.PutKVPair method --- go/kv/kv.go | 7 ------- go/logic/command_applier.go | 6 +++--- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/go/kv/kv.go b/go/kv/kv.go index 60ad02c5a..6b5f59b53 100644 --- a/go/kv/kv.go +++ b/go/kv/kv.go @@ -85,13 +85,6 @@ func PutValue(key string, value string) (err error) { return nil } -func PutKVPair(kvPair *KVPair) (err error) { - if kvPair == nil { - return nil - } - return PutValue(kvPair.Key, kvPair.Value) -} - func PutKVPairs(kvPairs []*KVPair) (err error) { if len(kvPairs) < 1 { return nil diff --git a/go/logic/command_applier.go b/go/logic/command_applier.go index cb65f03d9..73f67fbc9 100644 --- a/go/logic/command_applier.go +++ b/go/logic/command_applier.go @@ -256,11 +256,11 @@ func (applier *CommandApplier) enableGlobalRecoveries(value []byte) interface{} } func (applier *CommandApplier) putKeyValue(value []byte) interface{} { - kvPair := kv.KVPair{} - if err := json.Unmarshal(value, &kvPair); err != nil { + kvPair := &kv.KVPair{} + if err := json.Unmarshal(value, kvPair); err != nil { return log.Errore(err) } - err := kv.PutKVPair(&kvPair) + err := kv.PutKVPairs([]*kv.KVPair{kvPair}) return err } From 290bb42e7b19cf5e578f8145585feaa4dd22fb47 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 23 Nov 2020 20:17:51 +0100 Subject: [PATCH 3/4] Simplify raft/KV conditional block --- go/logic/orchestrator.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/go/logic/orchestrator.go b/go/logic/orchestrator.go index da39fc2b0..585ab305b 100644 --- a/go/logic/orchestrator.go +++ b/go/logic/orchestrator.go @@ -442,23 +442,19 @@ func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVP submitKvPairs = append(submitKvPairs, kvPair) } log.Debugf("kv.SubmitMastersToKvStores: submitKvPairs: %+v", len(submitKvPairs)) - var kvPutKVPairs [](*kv.KVPair) - for _, kvPair := range submitKvPairs { - if orcraft.IsRaftEnabled() { + if orcraft.IsRaftEnabled() { + for _, kvPair := range submitKvPairs { _, err = orcraft.PublishCommand("put-key-value", kvPair) if err == nil { submittedCount++ } else { selectedError = err } - } else { - kvPutKVPairs = append(kvPutKVPairs, kvPair) } - } - if len(kvPutKVPairs) > 0 { - err := kv.PutKVPairs(kvPutKVPairs) + } else { + err := kv.PutKVPairs(submitKvPairs) if err == nil { - submittedCount = submittedCount + len(kvPutKVPairs) + submittedCount += len(submitKvPairs) } else { selectedError = err } From f1436905c02c37aab9040e2e5b6168902b1b127f Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 24 Nov 2020 14:08:44 +0100 Subject: [PATCH 4/4] Update go/logic/orchestrator.go Co-authored-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/logic/orchestrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/orchestrator.go b/go/logic/orchestrator.go index 585ab305b..1eeebcd11 100644 --- a/go/logic/orchestrator.go +++ b/go/logic/orchestrator.go @@ -444,7 +444,7 @@ func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVP log.Debugf("kv.SubmitMastersToKvStores: submitKvPairs: %+v", len(submitKvPairs)) if orcraft.IsRaftEnabled() { for _, kvPair := range submitKvPairs { - _, err = orcraft.PublishCommand("put-key-value", kvPair) + _, err := orcraft.PublishCommand("put-key-value", kvPair) if err == nil { submittedCount++ } else {