From ca461f72f8c93a2c4837a19a9c7777860ef1d106 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 8 Feb 2021 18:00:02 +0100 Subject: [PATCH 01/12] WIP: add logic to batch ops --- go/config/config.go | 2 ++ go/kv/consul_txn.go | 54 +++++++++++++++++++++++++++++++++++++--- go/kv/consul_txn_test.go | 38 ++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 4 deletions(-) diff --git a/go/config/config.go b/go/config/config.go index cb455f415..c6ec705af 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -266,6 +266,7 @@ type Configuration struct { ConsulAclToken string // ACL token used to write to Consul KV ConsulCrossDataCenterDistribution bool // should orchestrator automatically auto-deduce all consul DCs and write KVs in all DCs ConsulKVStoreProvider string // Consul KV store provider (consul or consul-txn), default: "consul" + ConsulMaxOpsPerTransaction int // Maximum number of operations to perform in a single Consul Transaction. Requires the "consul-txn" ConsulKVStoreProvider ZkAddress string // UNSUPPERTED YET. Address where (single or multiple) ZooKeeper servers are found, in `srv1[:port1][,srv2[:port2]...]` format. Default port is 2181. Example: srv-a,srv-b:12181,srv-c KVClusterMasterPrefix string // Prefix to use for clusters' masters entries in KV stores (internal, consul, ZK), default: "mysql/master" WebMessage string // If provided, will be shown on all web pages below the title bar @@ -434,6 +435,7 @@ func newConfiguration() *Configuration { ConsulAclToken: "", ConsulCrossDataCenterDistribution: false, ConsulKVStoreProvider: "consul", + ConsulMaxOpsPerTransaction: 12, ZkAddress: "", KVClusterMasterPrefix: "mysql/master", WebMessage: "", diff --git a/go/kv/consul_txn.go b/go/kv/consul_txn.go index 0f1530f0f..1fffdfbe4 100644 --- a/go/kv/consul_txn.go +++ b/go/kv/consul_txn.go @@ -20,6 +20,7 @@ import ( "crypto/tls" "fmt" "net/http" + "strings" "sync" "sync/atomic" @@ -31,6 +32,37 @@ import ( "github.com/openark/golib/log" ) +// groupKVPairsByPrefix groups Consul Transaction operations by KV key prefix. This ensures KVs +// for a single cluster are grouped into a single transaction as they have a common key prefix +func groupKVPairsByPrefix(kvPairs consulapi.KVPairs, maxPairsPerGroup int) (groups []consulapi.KVPairs) { + groupsMap := map[string]consulapi.KVPairs{} + for _, pair := range kvPairs { + s := strings.Split(pair.Key, "/") + prefix := strings.Join(s[:len(s)-1], "/") + if _, found := groupsMap[prefix]; found { + groupsMap[prefix] = append(groupsMap[prefix], pair) + } else { + groupsMap[prefix] = consulapi.KVPairs{pair} + } + } + + pairs := consulapi.KVPairs{} + for _, group := range groupsMap { + groupLen := len(group) + pairsLen := len(pairs) + if (pairsLen + groupLen) > maxPairsPerGroup { + groups = append(groups, pairs) + pairs = consulapi.KVPairs{} + } + pairs = append(pairs, group...) + } + if len(pairs) > 0 { + groups = append(groups, pairs) + } + + return groups +} + // A Consul store based on config's `ConsulAddress`, `ConsulScheme`, and `ConsulKVPrefix` type consulTxnStore struct { client *consulapi.Client @@ -211,6 +243,13 @@ func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) { return nil } + // Consul Transaction API is limited to 64 ops per transaction + // and we need to perform at least 3 KV operations + maxOpsPerTxn := config.Config.ConsulMaxOpsPerTransaction + if maxOpsPerTxn > 64 || maxOpsPerTxn < 3 { + return fmt.Errorf("ConsulMaxOpsPerTransaction must be > 3 and < 64") + } + datacenters, err := this.client.Catalog().Datacenters() if err != nil { return err @@ -220,14 +259,21 @@ func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) { for _, kvPair := range kvPairs { consulPairs = append(consulPairs, &consulapi.KVPair{Key: kvPair.Key, Value: []byte(kvPair.Value)}) } + var wg sync.WaitGroup for _, datacenter := range datacenters { - var skipped, existing, written, failed int datacenter := datacenter - wg.Add(1) - skipped, existing, written, failed, err = this.updateDatacenterKVPairs(&wg, datacenter, consulPairs) - log.Debugf("consulTxnStore.DistributePairs(): datacenter: %s; skipped: %d, existing: %d, written: %d, failed: %d", datacenter, skipped, existing, written, failed) + go func() { + defer wg.Done() + for groupNum, consulPairGroup := range groupKVPairsByPrefix(consulPairs, maxOpsPerTxn) { + wg.Add(1) + skipped, existing, written, failed, _ := this.updateDatacenterKVPairs(&wg, datacenter, consulPairGroup) + log.Debugf("consulTxnStore.DistributePairs(): datacenter: %s; txnGroup: %d, skipped: %d, existing: %d, written: %d, failed: %d", + datacenter, groupNum, skipped, existing, written, failed, + ) + } + }() } wg.Wait() return err diff --git a/go/kv/consul_txn_test.go b/go/kv/consul_txn_test.go index 32ead5cb9..f338959e7 100644 --- a/go/kv/consul_txn_test.go +++ b/go/kv/consul_txn_test.go @@ -2,6 +2,7 @@ package kv import ( "net/http" + "strings" "sync" "testing" @@ -9,6 +10,43 @@ import ( "github.com/openark/orchestrator/go/config" ) +func TestGroupKVPairsByPrefix(t *testing.T) { + grouped := groupKVPairsByPrefix(consulapi.KVPairs{ + {Key: "mysql/master/cluster1/hostname", Value: []byte("test")}, + {Key: "mysql/master/cluster1/ipv4", Value: []byte("test")}, + {Key: "mysql/master/cluster1/port", Value: []byte("test")}, + {Key: "mysql/master/cluster2/hostname", Value: []byte("test")}, + {Key: "mysql/master/cluster2/ipv4", Value: []byte("test")}, + {Key: "mysql/master/cluster2/port", Value: []byte("test")}, + {Key: "mysql/master/cluster3/hostname", Value: []byte("test")}, + {Key: "mysql/master/cluster3/ipv4", Value: []byte("test")}, + {Key: "mysql/master/cluster3/port", Value: []byte("test")}, + }, 6) + if len(grouped) != 2 { + t.Fatalf("expected 2 groups, got %d: %v", len(grouped), grouped) + } + if len(grouped[0]) != 6 { + t.Fatalf("expected 6 KVPairs in first group, got %d: %v", len(grouped[0]), grouped[0]) + } + if len(grouped[1]) != 3 { + t.Fatalf("expected 3 KVPairs in second group, got %d: %v", len(grouped[1]), grouped[1]) + } + + // check KVs for a cluster are in a single group + for _, group := range grouped { + clusterCount := map[string]int{} + for _, kvPair := range group { + s := strings.Split(kvPair.Key, "/") + clusterCount[s[2]]++ + } + for cluster, count := range clusterCount { + if count != 3 { + t.Fatalf("expected 3 KVPairs for %s to be in a single group, got %d", cluster, count) + } + } + } +} + func TestConsulTxnStorePutKVPairs(t *testing.T) { server := buildConsulTestServer(t, []consulTestServerOp{ { From 5c3c30ba2a0a1cd901e9497aff3d08bd4cd155e1 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 9 Feb 2021 15:07:51 +0100 Subject: [PATCH 02/12] Group KV pairs --- docs/kv.md | 5 +++-- go/config/config.go | 12 +++++++++-- go/kv/consul_txn.go | 46 ++++++++++++++++++++++++++++------------ go/kv/consul_txn_test.go | 24 +++++++++++++++------ 4 files changed, 63 insertions(+), 24 deletions(-) diff --git a/docs/kv.md b/docs/kv.md index e3fc8fce5..3501a7712 100644 --- a/docs/kv.md +++ b/docs/kv.md @@ -82,11 +82,12 @@ This functionality is required in case one has more Consul datacenters than just #### Consul Transaction support Atomic [Consul Transaction](https://www.consul.io/api-docs/txn) support is enabled by configuring: - ```json "ConsulKVStoreProvider": "consul-txn", ``` _Note: this feature requires Consul version 0.7 or greater._ -This will cause Orchestrator to use a [Consul Transaction](https://www.consul.io/api-docs/txn) when distributing one or more Consul KVs. The use of transactions reduces the number of requests to the Consul server while ensuring updates of several KVs are atomic. +This will cause Orchestrator to use a [Consul Transaction](https://www.consul.io/api-docs/txn) when distributing one or more Consul KVs. The use of transactions reduces the number of requests to the Consul server while ensuring updates are atomic. KVs are read from the server in a transaction and any necessary updates are performed in a second transaction. + +Orchestrator will group KV updates with the same key-prefix into a single [Consul Transaction](https://www.consul.io/api-docs/txn). Increasing the `ConsulMaxKVsPerTransaction` configuration setting from `3` _(default)_ to a max of `64` _(Consul Transaction API limit)_ allows more operations to be grouped into fewer transactions. diff --git a/go/config/config.go b/go/config/config.go index c6ec705af..a048b2219 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -60,6 +60,8 @@ const ( StaleInstanceCoordinatesExpireSeconds = 60 CheckAutoPseudoGTIDGrantsIntervalSeconds = 60 SelectTrueQuery = "select 1" + ConsulKVsPerCluster = 4 // kvs: "/hostname", "/ipv4", "/ipv6", "/port" + ConsulMaxTransactionOps = 64 ) var deprecatedConfigurationVariables = []string{ @@ -266,7 +268,7 @@ type Configuration struct { ConsulAclToken string // ACL token used to write to Consul KV ConsulCrossDataCenterDistribution bool // should orchestrator automatically auto-deduce all consul DCs and write KVs in all DCs ConsulKVStoreProvider string // Consul KV store provider (consul or consul-txn), default: "consul" - ConsulMaxOpsPerTransaction int // Maximum number of operations to perform in a single Consul Transaction. Requires the "consul-txn" ConsulKVStoreProvider + ConsulMaxKVsPerTransaction int // Maximum number of KV operations to perform in a single Consul Transaction. Requires the "consul-txn" ConsulKVStoreProvider ZkAddress string // UNSUPPERTED YET. Address where (single or multiple) ZooKeeper servers are found, in `srv1[:port1][,srv2[:port2]...]` format. Default port is 2181. Example: srv-a,srv-b:12181,srv-c KVClusterMasterPrefix string // Prefix to use for clusters' masters entries in KV stores (internal, consul, ZK), default: "mysql/master" WebMessage string // If provided, will be shown on all web pages below the title bar @@ -435,7 +437,7 @@ func newConfiguration() *Configuration { ConsulAclToken: "", ConsulCrossDataCenterDistribution: false, ConsulKVStoreProvider: "consul", - ConsulMaxOpsPerTransaction: 12, + ConsulMaxKVsPerTransaction: ConsulKVsPerCluster, ZkAddress: "", KVClusterMasterPrefix: "mysql/master", WebMessage: "", @@ -597,6 +599,12 @@ func (this *Configuration) postReadAdjustments() error { this.BufferInstanceWrites = false } } + if this.ConsulMaxKVsPerTransaction < ConsulKVsPerCluster { + this.ConsulMaxKVsPerTransaction = ConsulKVsPerCluster + } else if this.ConsulMaxKVsPerTransaction > ConsulMaxTransactionOps { + this.ConsulMaxKVsPerTransaction = ConsulMaxTransactionOps + } + return nil } diff --git a/go/kv/consul_txn.go b/go/kv/consul_txn.go index 1fffdfbe4..537e45398 100644 --- a/go/kv/consul_txn.go +++ b/go/kv/consul_txn.go @@ -34,7 +34,8 @@ import ( // groupKVPairsByPrefix groups Consul Transaction operations by KV key prefix. This ensures KVs // for a single cluster are grouped into a single transaction as they have a common key prefix -func groupKVPairsByPrefix(kvPairs consulapi.KVPairs, maxPairsPerGroup int) (groups []consulapi.KVPairs) { +func groupKVPairsByPrefix(kvPairs consulapi.KVPairs) (groups []consulapi.KVPairs) { + maxOpsPerTxn := config.Config.ConsulMaxKVsPerTransaction groupsMap := map[string]consulapi.KVPairs{} for _, pair := range kvPairs { s := strings.Split(pair.Key, "/") @@ -50,7 +51,7 @@ func groupKVPairsByPrefix(kvPairs consulapi.KVPairs, maxPairsPerGroup int) (grou for _, group := range groupsMap { groupLen := len(group) pairsLen := len(pairs) - if (pairsLen + groupLen) > maxPairsPerGroup { + if (pairsLen + groupLen) > maxOpsPerTxn { groups = append(groups, pairs) pairs = consulapi.KVPairs{} } @@ -115,7 +116,16 @@ func (this *consulTxnStore) doWriteTxn(txnOps consulapi.TxnOps, queryOptions *co return err } -func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc string, kvPairs []*consulapi.KVPair) (skipped, existing, written, failed int, err error) { +type updateDatacenterKVPairsResp struct { + datacenter string + skipped int + existing int + written int + failed int + err error +} + +func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc string, kvPairs []*consulapi.KVPair) *updateDatacenterKVPairsResp { defer wg.Done() queryOptions := &consulapi.QueryOptions{Datacenter: dc} @@ -182,7 +192,14 @@ func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc strin } } - return skipped, existing, written, failed, err + respChan <- updateDatacenterKVPairsResp{ + datacenter: dc, + skipped: skipped, + existing: existing, + written: written, + failed: failed, + err: err, + } } // GetKeyValue returns the value of a Consul KV if it exists @@ -230,6 +247,14 @@ func (this *consulTxnStore) PutKVPairs(kvPairs []*KVPair) (err error) { return this.doWriteTxn(txnOps, nil) } +type updateDatacenterKVPairsResp struct { + datacenter string + skipped int + existing int + written int + failed int +} + // DistributePairs updates all known Consul Datacenters with one or more KV pairs func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) { // This function is non re-entrant (it can only be running once at any point in time) @@ -243,13 +268,6 @@ func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) { return nil } - // Consul Transaction API is limited to 64 ops per transaction - // and we need to perform at least 3 KV operations - maxOpsPerTxn := config.Config.ConsulMaxOpsPerTransaction - if maxOpsPerTxn > 64 || maxOpsPerTxn < 3 { - return fmt.Errorf("ConsulMaxOpsPerTransaction must be > 3 and < 64") - } - datacenters, err := this.client.Catalog().Datacenters() if err != nil { return err @@ -266,11 +284,11 @@ func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) { wg.Add(1) go func() { defer wg.Done() - for groupNum, consulPairGroup := range groupKVPairsByPrefix(consulPairs, maxOpsPerTxn) { + for groupNum, consulPairGroup := range groupKVPairsByPrefix(consulPairs) { wg.Add(1) - skipped, existing, written, failed, _ := this.updateDatacenterKVPairs(&wg, datacenter, consulPairGroup) + resp := this.updateDatacenterKVPairs(&wg, datacenter, consulPairGroup) log.Debugf("consulTxnStore.DistributePairs(): datacenter: %s; txnGroup: %d, skipped: %d, existing: %d, written: %d, failed: %d", - datacenter, groupNum, skipped, existing, written, failed, + resp.datacenter, groupNum, resp.skipped, resp.existing, resp.written, resp.failed, ) } }() diff --git a/go/kv/consul_txn_test.go b/go/kv/consul_txn_test.go index f338959e7..528665e57 100644 --- a/go/kv/consul_txn_test.go +++ b/go/kv/consul_txn_test.go @@ -11,6 +11,8 @@ import ( ) func TestGroupKVPairsByPrefix(t *testing.T) { + // batch 9 x KVPairs into 2 x transactions + config.Config.ConsulMaxKVsPerTransaction = 6 grouped := groupKVPairsByPrefix(consulapi.KVPairs{ {Key: "mysql/master/cluster1/hostname", Value: []byte("test")}, {Key: "mysql/master/cluster1/ipv4", Value: []byte("test")}, @@ -21,7 +23,7 @@ func TestGroupKVPairsByPrefix(t *testing.T) { {Key: "mysql/master/cluster3/hostname", Value: []byte("test")}, {Key: "mysql/master/cluster3/ipv4", Value: []byte("test")}, {Key: "mysql/master/cluster3/port", Value: []byte("test")}, - }, 6) + }) if len(grouped) != 2 { t.Fatalf("expected 2 groups, got %d: %v", len(grouped), grouped) } @@ -33,15 +35,25 @@ func TestGroupKVPairsByPrefix(t *testing.T) { } // check KVs for a cluster are in a single group - for _, group := range grouped { - clusterCount := map[string]int{} + clusterCounts := map[string]map[int]int{} + for i, group := range grouped { for _, kvPair := range group { s := strings.Split(kvPair.Key, "/") - clusterCount[s[2]]++ + clusterName := s[2] + if _, ok := clusterCounts[clusterName]; ok { + clusterCounts[clusterName][i]++ + } else { + clusterCounts[clusterName] = map[int]int{i: 1} + } + } + } + for cluster, groups := range clusterCounts { + if len(groups) != 1 { + t.Fatalf("expected %s to be in a single group, found it in %d group(s): %v", cluster, len(groups), groups) } - for cluster, count := range clusterCount { + for _, count := range groups { if count != 3 { - t.Fatalf("expected 3 KVPairs for %s to be in a single group, got %d", cluster, count) + t.Fatalf("expected group to contain 3 x %s keys, found: %d", cluster, count) } } } From d1d341180f786a8abb88f3f6d435d99b73d20efe Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 15 Feb 2021 14:59:59 +0100 Subject: [PATCH 03/12] Fix log summary --- go/kv/consul_test.go | 6 ++- go/kv/consul_txn.go | 82 ++++++++++++++++++++++------------------ go/kv/consul_txn_test.go | 48 ++++++++++++++++++----- 3 files changed, 88 insertions(+), 48 deletions(-) diff --git a/go/kv/consul_test.go b/go/kv/consul_test.go index 029e8fe72..c47833909 100644 --- a/go/kv/consul_test.go +++ b/go/kv/consul_test.go @@ -44,7 +44,11 @@ func buildConsulTestServer(t *testing.T, testOps []consulTestServerOp) *httptest if testOp.ResponseCode == 0 { testOp.ResponseCode = http.StatusOK } - if strings.HasPrefix(r.URL.String(), "/v1/kv") && testOp.Response != nil { + if strings.HasPrefix(r.URL.String(), "/v1/catalog/datacenters") { + w.WriteHeader(testOp.ResponseCode) + json.NewEncoder(w).Encode(testOp.Response) + return + } else if strings.HasPrefix(r.URL.String(), "/v1/kv") && testOp.Response != nil { w.WriteHeader(testOp.ResponseCode) json.NewEncoder(w).Encode(testOp.Response) return diff --git a/go/kv/consul_txn.go b/go/kv/consul_txn.go index c77184f0f..1d425431e 100644 --- a/go/kv/consul_txn.go +++ b/go/kv/consul_txn.go @@ -39,7 +39,10 @@ func groupKVPairsByPrefix(kvPairs consulapi.KVPairs) (groups []consulapi.KVPairs groupsMap := map[string]consulapi.KVPairs{} for _, pair := range kvPairs { s := strings.Split(pair.Key, "/") - prefix := strings.Join(s[:len(s)-1], "/") + var prefix string + if len(s) > 1 { + prefix = strings.Join(s[:len(s)-1], "/") + } if _, found := groupsMap[prefix]; found { groupsMap[prefix] = append(groupsMap[prefix], pair) } else { @@ -116,22 +119,22 @@ func (this *consulTxnStore) doWriteTxn(txnOps consulapi.TxnOps, queryOptions *co return err } -type updateDatacenterKVPairsResp struct { - datacenter string - skipped int - existing int - written int - failed int - err error +type updateDatacenterKVPairsResponse struct { + err error + existing int + failed int + skipped int + written int } -func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc string, kvPairs []*consulapi.KVPair) *updateDatacenterKVPairsResp { +func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc string, kvPairs []*consulapi.KVPair, responses chan updateDatacenterKVPairsResponse) { defer wg.Done() queryOptions := &consulapi.QueryOptions{Datacenter: dc} kcCacheKeys := make([]string, 0) // get the current key-values in a single transaction + resp := updateDatacenterKVPairsResponse{} var terr error var getTxnOps consulapi.TxnOps var getTxnResp *consulapi.TxnResponse @@ -141,7 +144,7 @@ func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc strin kcCacheKey := getConsulKVCacheKey(dc, kvPair.Key) kcCacheKeys = append(kcCacheKeys, kcCacheKey) if value, found := this.kvCache.Get(kcCacheKey); found && val == value { - skipped++ + resp.skipped++ continue } getTxnOps = append(getTxnOps, &consulapi.TxnOp{ @@ -156,7 +159,6 @@ func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc strin _, getTxnResp, _, terr = this.client.Txn().Txn(getTxnOps, queryOptions) if terr != nil { log.Errorf("consulTxnStore.DistributePairs(): %v", terr) - err = terr } } @@ -169,7 +171,7 @@ func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc strin if pair.Key == result.KV.Key && string(pair.Value) == string(result.KV.Value) { this.kvCache.SetDefault(getConsulKVCacheKey(dc, pair.Key), string(pair.Value)) kvExistsAndEqual = true - existing++ + resp.existing++ break } } @@ -187,25 +189,18 @@ func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc strin // update key-value pairs in a single Consul Transaction if len(setTxnOps) > 0 { - if err = this.doWriteTxn(setTxnOps, queryOptions); err != nil { - log.Errorf("consulTxnStore.DistributePairs(): failed %v", kcCacheKeys) - failed = len(setTxnOps) + if resp.err = this.doWriteTxn(setTxnOps, queryOptions); resp.err != nil { + log.Errorf("consulTxnStore.DistributePairs(): failed %v, error %v", kcCacheKeys, resp.err) + resp.failed = len(setTxnOps) } else { for _, txnOp := range setTxnOps { this.kvCache.SetDefault(getConsulKVCacheKey(dc, txnOp.KV.Key), string(txnOp.KV.Value)) - written++ + resp.written++ } } } - respChan <- updateDatacenterKVPairsResp{ - datacenter: dc, - skipped: skipped, - existing: existing, - written: written, - failed: failed, - err: err, - } + responses <- resp } // GetKeyValue returns the value of a Consul KV if it exists @@ -253,14 +248,6 @@ func (this *consulTxnStore) PutKVPairs(kvPairs []*KVPair) (err error) { return this.doWriteTxn(txnOps, nil) } -type updateDatacenterKVPairsResp struct { - datacenter string - skipped int - existing int - written int - failed int -} - // DistributePairs updates all known Consul Datacenters with one or more KV pairs func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) { // This function is non re-entrant (it can only be running once at any point in time) @@ -287,16 +274,37 @@ func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) { var wg sync.WaitGroup for _, datacenter := range datacenters { datacenter := datacenter + responses := make(chan updateDatacenterKVPairsResponse) wg.Add(1) go func() { defer wg.Done() - for groupNum, consulPairGroup := range groupKVPairsByPrefix(consulPairs) { - wg.Add(1) - resp := this.updateDatacenterKVPairs(&wg, datacenter, consulPairGroup) - log.Debugf("consulTxnStore.DistributePairs(): datacenter: %s; txnGroup: %d, skipped: %d, existing: %d, written: %d, failed: %d", - resp.datacenter, groupNum, resp.skipped, resp.existing, resp.written, resp.failed, + + // receive responses from .updateDatacenterKVPairs() + // goroutines, log a summary when channel is closed + go func() { + var setTxns int + summary := updateDatacenterKVPairsResponse{} + for resp := range responses { + summary.existing += resp.existing + summary.failed += resp.failed + summary.skipped += resp.skipped + summary.written += resp.written + setTxns++ + } + log.Debugf("consulTxnStore.DistributePairs(): datacenter: %s; setTxns: %d, skipped: %d, existing: %d, written: %d, failed: %d", + datacenter, setTxns, summary.skipped, summary.existing, summary.written, summary.failed, ) + }() + + // launch an .updateDatacenterKVPairs() goroutine + // for each grouping of consul KV pairs and wait + var dcWg sync.WaitGroup + for _, consulPairGroup := range groupKVPairsByPrefix(consulPairs) { + dcWg.Add(1) + go this.updateDatacenterKVPairs(&dcWg, datacenter, consulPairGroup, responses) } + dcWg.Wait() + close(responses) }() } wg.Wait() diff --git a/go/kv/consul_txn_test.go b/go/kv/consul_txn_test.go index c855c0864..7d7745b70 100644 --- a/go/kv/consul_txn_test.go +++ b/go/kv/consul_txn_test.go @@ -230,6 +230,7 @@ func TestConsulTxnStoreUpdateDatacenterKVPairs(t *testing.T) { var wg sync.WaitGroup config.Config.ConsulAddress = server.URL store := NewConsulTxnStore().(*consulTxnStore) + respChan := make(chan updateDatacenterKVPairsResponse) t.Run("success-cached", func(t *testing.T) { wg.Add(1) @@ -242,13 +243,15 @@ func TestConsulTxnStoreUpdateDatacenterKVPairs(t *testing.T) { {Key: "test", Value: []byte("test")}, // already correct on consul server {Key: "test2", Value: []byte("test")}, // not equal on consul server } - skipped, existing, written, failed, err := store.updateDatacenterKVPairs(&wg, consulTestDefaultDatacenter, kvPairs) - if err != nil { - t.Fatalf(".updateDatacenterKVPairs() should not return an error, got: %v", err) + + go store.updateDatacenterKVPairs(&wg, consulTestDefaultDatacenter, kvPairs, respChan) + resp := <-respChan + if resp.err != nil { + t.Fatalf(".updateDatacenterKVPairs() should not return an error, got: %v", resp.err) } - if skipped != 1 || existing != 1 || written != 1 || failed != 0 { + if resp.skipped != 1 || resp.existing != 1 || resp.written != 1 || resp.failed != 0 { t.Fatalf("expected: existing/skipped/written=1 and failed=0, got: skipped=%d, existing=%d, written=%d, failed=%d", - skipped, existing, written, failed, + resp.skipped, resp.existing, resp.written, resp.failed, ) } @@ -266,14 +269,39 @@ func TestConsulTxnStoreUpdateDatacenterKVPairs(t *testing.T) { {Key: "test", Value: []byte("test")}, // already correct on consul server {Key: "doesnt-exist", Value: []byte("test")}, // does not exist on consul server } - skipped, existing, written, failed, err := store.updateDatacenterKVPairs(&wg, consulTestDefaultDatacenter, kvPairs) - if err != nil { - t.Fatalf(".updateDatacenterKVPairs() should not return an error, got: %v", err) + go store.updateDatacenterKVPairs(&wg, consulTestDefaultDatacenter, kvPairs, respChan) + resp := <-respChan + + if resp.err != nil { + t.Fatalf(".updateDatacenterKVPairs() should not return an error, got: %v", resp.err) } - if skipped != 0 || existing != 0 || written != 2 || failed != 0 { // confirm all KVs are updated if one does not exist + if resp.skipped != 0 || resp.existing != 0 || resp.written != 2 || resp.failed != 0 { // confirm all KVs are updated if one does not exist t.Fatalf("expected: existing/skipped/failed=0 and written=2, got: skipped=%d, existing=%d, written=%d, failed=%d", - skipped, existing, written, failed, + resp.skipped, resp.existing, resp.written, resp.failed, ) } }) } + +func TestConsulTxnStoreDistributePairs(t *testing.T) { + server := buildConsulTestServer(t, []consulTestServerOp{ + { + Method: "GET", + URL: "/v1/catalog/datacenters", + Response: []string{"dc1"}, + }, + }) + defer server.Close() + config.Config.ConsulAddress = server.URL + config.Config.ConsulCrossDataCenterDistribution = true + + store := NewConsulTxnStore() + if err := store.DistributePairs([]*KVPair{ + {Key: "test/hostname", Value: "test"}, + {Key: "test/ipv4", Value: "test"}, + {Key: "test/ipv6", Value: "test"}, + {Key: "test/port", Value: "test"}, + }); err != nil { + t.Fatal(err) + } +} From af32d30df9be260ba6f555aabc1cb3e6ab566d34 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 15 Feb 2021 15:57:35 +0100 Subject: [PATCH 04/12] Fix unit test --- go/kv/consul_txn_test.go | 50 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/go/kv/consul_txn_test.go b/go/kv/consul_txn_test.go index 7d7745b70..14a25b95d 100644 --- a/go/kv/consul_txn_test.go +++ b/go/kv/consul_txn_test.go @@ -290,6 +290,56 @@ func TestConsulTxnStoreDistributePairs(t *testing.T) { URL: "/v1/catalog/datacenters", Response: []string{"dc1"}, }, + { + Method: "PUT", + URL: "/v1/txn?dc=dc1", + Request: consulapi.TxnOps{ + { + KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/hostname"}, + }, + { + KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/ipv4"}, + }, + { + KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/ipv6"}, + }, + { + KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/port"}, + }, + }, + Response: &consulapi.TxnResponse{ + Results: consulapi.TxnResults{ + { + KV: &consulapi.KVPair{Key: "test/hostname", Value: []byte("not-equal")}, + }, + { + KV: &consulapi.KVPair{Key: "test/ipv4", Value: []byte("test")}, + }, + { + KV: &consulapi.KVPair{Key: "test/ipv6", Value: []byte("test")}, + }, + { + KV: &consulapi.KVPair{Key: "test/port", Value: []byte("test")}, + }, + }, + }, + }, + { + Method: "PUT", + URL: "/v1/txn?dc=dc1", + Request: consulapi.TxnOps{ + { + KV: &consulapi.KVTxnOp{Verb: consulapi.KVSet, Key: "test/hostname", Value: []byte("test")}, + }, + }, + Response: &consulapi.TxnResponse{ + Results: consulapi.TxnResults{ + { + KV: &consulapi.KVPair{Key: "test/hostname", Value: []byte("test")}, + }, + }, + }, + }, }) defer server.Close() config.Config.ConsulAddress = server.URL From 46363def9844d7f6a07abd9a79234242dfa2cb33 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 15 Feb 2021 16:01:13 +0100 Subject: [PATCH 05/12] Fix doc --- docs/kv.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/kv.md b/docs/kv.md index 3501a7712..7931bd611 100644 --- a/docs/kv.md +++ b/docs/kv.md @@ -90,4 +90,4 @@ _Note: this feature requires Consul version 0.7 or greater._ This will cause Orchestrator to use a [Consul Transaction](https://www.consul.io/api-docs/txn) when distributing one or more Consul KVs. The use of transactions reduces the number of requests to the Consul server while ensuring updates are atomic. KVs are read from the server in a transaction and any necessary updates are performed in a second transaction. -Orchestrator will group KV updates with the same key-prefix into a single [Consul Transaction](https://www.consul.io/api-docs/txn). Increasing the `ConsulMaxKVsPerTransaction` configuration setting from `3` _(default)_ to a max of `64` _(Consul Transaction API limit)_ allows more operations to be grouped into fewer transactions. +Orchestrator will group KV updates with the same key-prefix into a single [Consul Transaction](https://www.consul.io/api-docs/txn). Increasing the `ConsulMaxKVsPerTransaction` configuration setting from `4` _(default)_ to a max of `64` _(Consul Transaction API limit)_ allows more operations to be grouped into fewer transactions. From f0be032666fc1ec4fe77a7d63386b1ce1bfcda7b Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 15 Feb 2021 23:32:06 +0100 Subject: [PATCH 06/12] Split prefix by removing KVClusterMasterPrefix and splitting remainder --- go/config/config.go | 2 +- go/kv/consul_txn.go | 14 +++--- go/kv/consul_txn_test.go | 96 +++++++++++++++++++++++++--------------- 3 files changed, 71 insertions(+), 41 deletions(-) diff --git a/go/config/config.go b/go/config/config.go index a048b2219..0a7e3619f 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -60,7 +60,7 @@ const ( StaleInstanceCoordinatesExpireSeconds = 60 CheckAutoPseudoGTIDGrantsIntervalSeconds = 60 SelectTrueQuery = "select 1" - ConsulKVsPerCluster = 4 // kvs: "/hostname", "/ipv4", "/ipv6", "/port" + ConsulKVsPerCluster = 5 // KVs: "/", "/hostname", "/ipv4", "/ipv6" and "/port" ConsulMaxTransactionOps = 64 ) diff --git a/go/kv/consul_txn.go b/go/kv/consul_txn.go index 1d425431e..f99efee3c 100644 --- a/go/kv/consul_txn.go +++ b/go/kv/consul_txn.go @@ -35,13 +35,17 @@ import ( // groupKVPairsByPrefix groups Consul Transaction operations by KV key prefix. This ensures KVs // for a single cluster are grouped into a single transaction as they have a common key prefix func groupKVPairsByPrefix(kvPairs consulapi.KVPairs) (groups []consulapi.KVPairs) { + clusterMasterPrefix := config.Config.KVClusterMasterPrefix maxOpsPerTxn := config.Config.ConsulMaxKVsPerTransaction groupsMap := map[string]consulapi.KVPairs{} for _, pair := range kvPairs { - s := strings.Split(pair.Key, "/") - var prefix string - if len(s) > 1 { - prefix = strings.Join(s[:len(s)-1], "/") + prefix := pair.Key + if strings.HasPrefix(prefix, clusterMasterPrefix) { + prefix = strings.Replace(prefix, clusterMasterPrefix, "", 1) + path := strings.Split(prefix, "/") + if len(path) > 1 { + prefix = path[1] + } } if _, found := groupsMap[prefix]; found { groupsMap[prefix] = append(groupsMap[prefix], pair) @@ -55,7 +59,7 @@ func groupKVPairsByPrefix(kvPairs consulapi.KVPairs) (groups []consulapi.KVPairs groupLen := len(group) pairsLen := len(pairs) if (pairsLen + groupLen) > maxOpsPerTxn { - groups = append(groups, pairs) + groups = append(groups, groupKVPairsByPrefix(pairs)...) pairs = consulapi.KVPairs{} } pairs = append(pairs, group...) diff --git a/go/kv/consul_txn_test.go b/go/kv/consul_txn_test.go index 14a25b95d..54897f2ba 100644 --- a/go/kv/consul_txn_test.go +++ b/go/kv/consul_txn_test.go @@ -1,6 +1,7 @@ package kv import ( + "fmt" "net/http" "strings" "sync" @@ -11,27 +12,44 @@ import ( ) func TestGroupKVPairsByPrefix(t *testing.T) { - // batch 9 x KVPairs into 2 x transactions - config.Config.ConsulMaxKVsPerTransaction = 6 - grouped := groupKVPairsByPrefix(consulapi.KVPairs{ - {Key: "mysql/master/cluster1/hostname", Value: []byte("test")}, - {Key: "mysql/master/cluster1/ipv4", Value: []byte("test")}, - {Key: "mysql/master/cluster1/port", Value: []byte("test")}, - {Key: "mysql/master/cluster2/hostname", Value: []byte("test")}, - {Key: "mysql/master/cluster2/ipv4", Value: []byte("test")}, - {Key: "mysql/master/cluster2/port", Value: []byte("test")}, - {Key: "mysql/master/cluster3/hostname", Value: []byte("test")}, - {Key: "mysql/master/cluster3/ipv4", Value: []byte("test")}, - {Key: "mysql/master/cluster3/port", Value: []byte("test")}, - }) - if len(grouped) != 2 { - t.Fatalf("expected 2 groups, got %d: %v", len(grouped), grouped) + config.Config.ConsulMaxKVsPerTransaction = 10 + config.Config.KVClusterMasterPrefix = "mysql/master" + + // make 100 KVs for 20 clusters + kvPairs := consulapi.KVPairs{} + var kvs int + for kvs < 100 { + kvPairs = append(kvPairs, + &consulapi.KVPair{ + Key: fmt.Sprintf("%s/cluster%d", config.Config.KVClusterMasterPrefix, kvs), + Value: []byte("mysql.example.com:3306"), + }, + &consulapi.KVPair{ + Key: fmt.Sprintf("%s/cluster%d/hostname", config.Config.KVClusterMasterPrefix, kvs), + Value: []byte("mysql.example.com"), + }, + &consulapi.KVPair{ + Key: fmt.Sprintf("%s/cluster%d/ipv4", config.Config.KVClusterMasterPrefix, kvs), + Value: []byte("10.20.30.40"), + }, + &consulapi.KVPair{ + Key: fmt.Sprintf("%s/cluster%d/ipv6", config.Config.KVClusterMasterPrefix, kvs), + Value: []byte("fdf0:7a53:0b88:d147:xxxx:xxxx:xxxx:xxxx"), + }, + &consulapi.KVPair{ + Key: fmt.Sprintf("%s/cluster%d/port", config.Config.KVClusterMasterPrefix, kvs), + Value: []byte("3306"), + }, + ) + kvs += 5 } - if len(grouped[0]) != 6 { - t.Fatalf("expected 6 KVPairs in first group, got %d: %v", len(grouped[0]), grouped[0]) + + grouped := groupKVPairsByPrefix(kvPairs) + if len(grouped) != 10 { + t.Fatalf("expected 10 groups, got %d: %v", len(grouped), grouped) } - if len(grouped[1]) != 3 { - t.Fatalf("expected 3 KVPairs in second group, got %d: %v", len(grouped[1]), grouped[1]) + if len(grouped[0]) != config.Config.ConsulMaxKVsPerTransaction { + t.Fatalf("expected %d KVPairs in first group, got %d: %v", config.Config.ConsulMaxKVsPerTransaction, len(grouped[0]), grouped[0]) } // check KVs for a cluster are in a single group @@ -52,8 +70,8 @@ func TestGroupKVPairsByPrefix(t *testing.T) { t.Fatalf("expected %s to be in a single group, found it in %d group(s): %v", cluster, len(groups), groups) } for _, count := range groups { - if count != 3 { - t.Fatalf("expected group to contain 3 x %s keys, found: %d", cluster, count) + if count != config.ConsulKVsPerCluster { + t.Fatalf("expected group to contain %d x %s keys, found: %d", config.ConsulKVsPerCluster, cluster, count) } } } @@ -295,31 +313,37 @@ func TestConsulTxnStoreDistributePairs(t *testing.T) { URL: "/v1/txn?dc=dc1", Request: consulapi.TxnOps{ { - KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/hostname"}, + KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/cluster1"}, + }, + { + KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/cluster1/hostname"}, }, { - KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/ipv4"}, + KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/cluster1/ipv4"}, }, { - KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/ipv6"}, + KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/cluster1/ipv6"}, }, { - KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/port"}, + KV: &consulapi.KVTxnOp{Verb: consulapi.KVGet, Key: "test/cluster1/port"}, }, }, Response: &consulapi.TxnResponse{ Results: consulapi.TxnResults{ { - KV: &consulapi.KVPair{Key: "test/hostname", Value: []byte("not-equal")}, + KV: &consulapi.KVPair{Key: "test/cluster1", Value: []byte("not-equal")}, + }, + { + KV: &consulapi.KVPair{Key: "test/cluster1/hostname", Value: []byte("mysql.example.com")}, }, { - KV: &consulapi.KVPair{Key: "test/ipv4", Value: []byte("test")}, + KV: &consulapi.KVPair{Key: "test/cluster1/ipv4", Value: []byte("10.20.30.40")}, }, { - KV: &consulapi.KVPair{Key: "test/ipv6", Value: []byte("test")}, + KV: &consulapi.KVPair{Key: "test/cluster1/ipv6", Value: []byte("fdf0:7a53:0b88:d147:xxxx:xxxx:xxxx:xxxx")}, }, { - KV: &consulapi.KVPair{Key: "test/port", Value: []byte("test")}, + KV: &consulapi.KVPair{Key: "test/cluster1/port", Value: []byte("3306")}, }, }, }, @@ -329,13 +353,13 @@ func TestConsulTxnStoreDistributePairs(t *testing.T) { URL: "/v1/txn?dc=dc1", Request: consulapi.TxnOps{ { - KV: &consulapi.KVTxnOp{Verb: consulapi.KVSet, Key: "test/hostname", Value: []byte("test")}, + KV: &consulapi.KVTxnOp{Verb: consulapi.KVSet, Key: "test/cluster1", Value: []byte("mysql.example.com:3306")}, }, }, Response: &consulapi.TxnResponse{ Results: consulapi.TxnResults{ { - KV: &consulapi.KVPair{Key: "test/hostname", Value: []byte("test")}, + KV: &consulapi.KVPair{Key: "test/cluster1", Value: []byte("mysql.example.com:3306")}, }, }, }, @@ -344,13 +368,15 @@ func TestConsulTxnStoreDistributePairs(t *testing.T) { defer server.Close() config.Config.ConsulAddress = server.URL config.Config.ConsulCrossDataCenterDistribution = true + config.Config.KVClusterMasterPrefix = "test" store := NewConsulTxnStore() if err := store.DistributePairs([]*KVPair{ - {Key: "test/hostname", Value: "test"}, - {Key: "test/ipv4", Value: "test"}, - {Key: "test/ipv6", Value: "test"}, - {Key: "test/port", Value: "test"}, + {Key: "test/cluster1", Value: "mysql.example.com:3306"}, + {Key: "test/cluster1/hostname", Value: "mysql.example.com"}, + {Key: "test/cluster1/ipv4", Value: "10.20.30.40"}, + {Key: "test/cluster1/ipv6", Value: "fdf0:7a53:0b88:d147:xxxx:xxxx:xxxx:xxxx"}, + {Key: "test/cluster1/port", Value: "3306"}, }); err != nil { t.Fatal(err) } From 51595bdd12dbf3fad3dcf7c1e567fb687c40dec7 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 15 Feb 2021 23:34:50 +0100 Subject: [PATCH 07/12] No need to recurse --- go/kv/consul_txn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/kv/consul_txn.go b/go/kv/consul_txn.go index f99efee3c..afe67803d 100644 --- a/go/kv/consul_txn.go +++ b/go/kv/consul_txn.go @@ -59,7 +59,7 @@ func groupKVPairsByPrefix(kvPairs consulapi.KVPairs) (groups []consulapi.KVPairs groupLen := len(group) pairsLen := len(pairs) if (pairsLen + groupLen) > maxOpsPerTxn { - groups = append(groups, groupKVPairsByPrefix(pairs)...) + groups = append(groups, pairs) pairs = consulapi.KVPairs{} } pairs = append(pairs, group...) From 09ad6405dcb4b40bd55b21e05a3e330ba2b26036 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 16 Feb 2021 18:24:37 +0100 Subject: [PATCH 08/12] test cleanup --- go/kv/consul_test.go | 9 ++++++++- go/kv/consul_txn.go | 36 +++++++++++++++++------------------- go/kv/consul_txn_test.go | 26 +++++++++++++------------- 3 files changed, 38 insertions(+), 33 deletions(-) diff --git a/go/kv/consul_test.go b/go/kv/consul_test.go index c47833909..8dfba8020 100644 --- a/go/kv/consul_test.go +++ b/go/kv/consul_test.go @@ -44,7 +44,7 @@ func buildConsulTestServer(t *testing.T, testOps []consulTestServerOp) *httptest if testOp.ResponseCode == 0 { testOp.ResponseCode = http.StatusOK } - if strings.HasPrefix(r.URL.String(), "/v1/catalog/datacenters") { + if r.URL.String() == "/v1/catalog/datacenters" { w.WriteHeader(testOp.ResponseCode) json.NewEncoder(w).Encode(testOp.Response) return @@ -58,6 +58,13 @@ func buildConsulTestServer(t *testing.T, testOps []consulTestServerOp) *httptest t.Fatalf("Unable to unmarshal json request body: %v", err) continue } + // https://github.com/openark/orchestrator/issues/1302 + // https://github.com/hashicorp/consul/blob/87f6617eecd23a64add1e79eb3cd8dc3da9e649e/agent/txn_endpoint.go#L121-L129 + if len(txnOps) > 64 { + w.WriteHeader(http.StatusRequestEntityTooLarge) + fmt.Fprintf(w, "Transaction contains too many operations (%d > 64)", len(txnOps)) + return + } testOpRequest := sortTxnKVOps(testOp.Request.(consulapi.TxnOps)) if testOp.Response != nil && reflect.DeepEqual(testOpRequest, sortTxnKVOps(txnOps)) { w.WriteHeader(testOp.ResponseCode) diff --git a/go/kv/consul_txn.go b/go/kv/consul_txn.go index afe67803d..e9d9d4460 100644 --- a/go/kv/consul_txn.go +++ b/go/kv/consul_txn.go @@ -32,19 +32,18 @@ import ( "github.com/openark/golib/log" ) -// groupKVPairsByPrefix groups Consul Transaction operations by KV key prefix. This ensures KVs -// for a single cluster are grouped into a single transaction as they have a common key prefix -func groupKVPairsByPrefix(kvPairs consulapi.KVPairs) (groups []consulapi.KVPairs) { - clusterMasterPrefix := config.Config.KVClusterMasterPrefix +// groupKVPairsByKeyPrefix groups Consul Transaction operations by KV key prefix. This +// ensures KVs for a single cluster are grouped into a single transaction +func groupKVPairsByKeyPrefix(kvPairs consulapi.KVPairs) (groups []consulapi.KVPairs) { maxOpsPerTxn := config.Config.ConsulMaxKVsPerTransaction + clusterMasterPrefix := config.Config.KVClusterMasterPrefix + "/" groupsMap := map[string]consulapi.KVPairs{} for _, pair := range kvPairs { prefix := pair.Key if strings.HasPrefix(prefix, clusterMasterPrefix) { prefix = strings.Replace(prefix, clusterMasterPrefix, "", 1) - path := strings.Split(prefix, "/") - if len(path) > 1 { - prefix = path[1] + if path := strings.Split(prefix, "/"); len(path) > 0 { + prefix = path[0] } } if _, found := groupsMap[prefix]; found { @@ -54,20 +53,19 @@ func groupKVPairsByPrefix(kvPairs consulapi.KVPairs) (groups []consulapi.KVPairs } } - pairs := consulapi.KVPairs{} + pairsBuf := consulapi.KVPairs{} for _, group := range groupsMap { groupLen := len(group) - pairsLen := len(pairs) - if (pairsLen + groupLen) > maxOpsPerTxn { - groups = append(groups, pairs) - pairs = consulapi.KVPairs{} + pairsBufLen := len(pairsBuf) + if (pairsBufLen + groupLen) > maxOpsPerTxn { + groups = append(groups, pairsBuf) + pairsBuf = consulapi.KVPairs{} } - pairs = append(pairs, group...) + pairsBuf = append(pairsBuf, group...) } - if len(pairs) > 0 { - groups = append(groups, pairs) + if len(pairsBuf) > 0 { + groups = append(groups, pairsBuf) } - return groups } @@ -111,7 +109,7 @@ func NewConsulTxnStore() KVStore { func (this *consulTxnStore) doWriteTxn(txnOps consulapi.TxnOps, queryOptions *consulapi.QueryOptions) (err error) { ok, resp, _, err := this.client.Txn().Txn(txnOps, queryOptions) if err != nil { - log.Errorf("consulTxnStore.doWriteTxn(): failed %v", err) + log.Errorf("consulTxnStore.doWriteTxn(): %v", err) return err } else if !ok { for _, terr := range resp.Errors { @@ -303,9 +301,9 @@ func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) { // launch an .updateDatacenterKVPairs() goroutine // for each grouping of consul KV pairs and wait var dcWg sync.WaitGroup - for _, consulPairGroup := range groupKVPairsByPrefix(consulPairs) { + for _, kvPairGroup := range groupKVPairsByKeyPrefix(consulPairs) { dcWg.Add(1) - go this.updateDatacenterKVPairs(&dcWg, datacenter, consulPairGroup, responses) + go this.updateDatacenterKVPairs(&dcWg, datacenter, kvPairGroup, responses) } dcWg.Wait() close(responses) diff --git a/go/kv/consul_txn_test.go b/go/kv/consul_txn_test.go index 54897f2ba..225b4a9f2 100644 --- a/go/kv/consul_txn_test.go +++ b/go/kv/consul_txn_test.go @@ -11,45 +11,45 @@ import ( "github.com/openark/orchestrator/go/config" ) -func TestGroupKVPairsByPrefix(t *testing.T) { - config.Config.ConsulMaxKVsPerTransaction = 10 +func TestGroupKVPairsByKeyPrefix(t *testing.T) { + config.Config.ConsulMaxKVsPerTransaction = 12 // only 10 (5 x 2) KVs should fit into a max of 12 config.Config.KVClusterMasterPrefix = "mysql/master" // make 100 KVs for 20 clusters kvPairs := consulapi.KVPairs{} - var kvs int - for kvs < 100 { + var cluster int + for cluster < 20 { kvPairs = append(kvPairs, &consulapi.KVPair{ - Key: fmt.Sprintf("%s/cluster%d", config.Config.KVClusterMasterPrefix, kvs), + Key: fmt.Sprintf("%s/cluster%d", config.Config.KVClusterMasterPrefix, cluster), Value: []byte("mysql.example.com:3306"), }, &consulapi.KVPair{ - Key: fmt.Sprintf("%s/cluster%d/hostname", config.Config.KVClusterMasterPrefix, kvs), + Key: fmt.Sprintf("%s/cluster%d/hostname", config.Config.KVClusterMasterPrefix, cluster), Value: []byte("mysql.example.com"), }, &consulapi.KVPair{ - Key: fmt.Sprintf("%s/cluster%d/ipv4", config.Config.KVClusterMasterPrefix, kvs), + Key: fmt.Sprintf("%s/cluster%d/ipv4", config.Config.KVClusterMasterPrefix, cluster), Value: []byte("10.20.30.40"), }, &consulapi.KVPair{ - Key: fmt.Sprintf("%s/cluster%d/ipv6", config.Config.KVClusterMasterPrefix, kvs), + Key: fmt.Sprintf("%s/cluster%d/ipv6", config.Config.KVClusterMasterPrefix, cluster), Value: []byte("fdf0:7a53:0b88:d147:xxxx:xxxx:xxxx:xxxx"), }, &consulapi.KVPair{ - Key: fmt.Sprintf("%s/cluster%d/port", config.Config.KVClusterMasterPrefix, kvs), + Key: fmt.Sprintf("%s/cluster%d/port", config.Config.KVClusterMasterPrefix, cluster), Value: []byte("3306"), }, ) - kvs += 5 + cluster++ } - grouped := groupKVPairsByPrefix(kvPairs) + grouped := groupKVPairsByKeyPrefix(kvPairs) if len(grouped) != 10 { t.Fatalf("expected 10 groups, got %d: %v", len(grouped), grouped) } - if len(grouped[0]) != config.Config.ConsulMaxKVsPerTransaction { - t.Fatalf("expected %d KVPairs in first group, got %d: %v", config.Config.ConsulMaxKVsPerTransaction, len(grouped[0]), grouped[0]) + if len(grouped[0]) != 10 { + t.Fatalf("expected 10 KVPairs in first group, got %d: %v", len(grouped[0]), grouped[0]) } // check KVs for a cluster are in a single group From 150860702cd5f65f7f34a274c826df7a2f6dbf27 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 16 Feb 2021 20:50:33 +0100 Subject: [PATCH 09/12] Improve .DistributePairs() logging metrics --- go/kv/consul_txn.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/go/kv/consul_txn.go b/go/kv/consul_txn.go index e9d9d4460..8e1de544e 100644 --- a/go/kv/consul_txn.go +++ b/go/kv/consul_txn.go @@ -121,6 +121,7 @@ func (this *consulTxnStore) doWriteTxn(txnOps consulapi.TxnOps, queryOptions *co return err } +// updateDatacenterKVPairsResponse contains a response from .updateDatacenterKVPairs() type updateDatacenterKVPairsResponse struct { err error existing int @@ -129,6 +130,9 @@ type updateDatacenterKVPairsResponse struct { written int } +// updateDatacenterKVPairs handles updating a list of Consul KV pairs for a single datacenter. Current values are +// read from the server in a single transaction and any necessary updates are made in a second transaction. If a +// KVPair from a group is missing on the server all KVPairs will be updated. func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc string, kvPairs []*consulapi.KVPair, responses chan updateDatacenterKVPairsResponse) { defer wg.Done() @@ -284,17 +288,20 @@ func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) { // receive responses from .updateDatacenterKVPairs() // goroutines, log a summary when channel is closed go func() { - var setTxns int summary := updateDatacenterKVPairsResponse{} + var getTxns, setTxns int for resp := range responses { summary.existing += resp.existing summary.failed += resp.failed summary.skipped += resp.skipped summary.written += resp.written - setTxns++ + if summary.written > 0 { + setTxns++ + } + getTxns++ } - log.Debugf("consulTxnStore.DistributePairs(): datacenter: %s; setTxns: %d, skipped: %d, existing: %d, written: %d, failed: %d", - datacenter, setTxns, summary.skipped, summary.existing, summary.written, summary.failed, + log.Debugf("consulTxnStore.DistributePairs(): datacenter: %s; getTxns: %d, setTxns: %d, skipped: %d, existing: %d, written: %d, failed: %d", + datacenter, getTxns, setTxns, summary.skipped, summary.existing, summary.written, summary.failed, ) }() From c7404ff01a941a3d36c826fcce1a77b68d345512 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 17 Feb 2021 17:08:11 +0100 Subject: [PATCH 10/12] Clean up summary --- go/kv/consul_txn.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/go/kv/consul_txn.go b/go/kv/consul_txn.go index 8e1de544e..06e064a85 100644 --- a/go/kv/consul_txn.go +++ b/go/kv/consul_txn.go @@ -128,6 +128,8 @@ type updateDatacenterKVPairsResponse struct { failed int skipped int written int + getTxns int + setTxns int } // updateDatacenterKVPairs handles updating a list of Consul KV pairs for a single datacenter. Current values are @@ -166,6 +168,7 @@ func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc strin if terr != nil { log.Errorf("consulTxnStore.DistributePairs(): %v", terr) } + resp.getTxns++ } // find key-value pairs that need updating, add pairs that need updating to set transaction @@ -204,6 +207,7 @@ func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc strin resp.written++ } } + resp.setTxns++ } responses <- resp @@ -288,20 +292,17 @@ func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) { // receive responses from .updateDatacenterKVPairs() // goroutines, log a summary when channel is closed go func() { - summary := updateDatacenterKVPairsResponse{} - var getTxns, setTxns int + sum := updateDatacenterKVPairsResponse{} for resp := range responses { - summary.existing += resp.existing - summary.failed += resp.failed - summary.skipped += resp.skipped - summary.written += resp.written - if summary.written > 0 { - setTxns++ - } - getTxns++ + sum.existing += resp.existing + sum.failed += resp.failed + sum.skipped += resp.skipped + sum.written += resp.written + sum.getTxns += resp.getTxns + sum.setTxns += resp.setTxns } log.Debugf("consulTxnStore.DistributePairs(): datacenter: %s; getTxns: %d, setTxns: %d, skipped: %d, existing: %d, written: %d, failed: %d", - datacenter, getTxns, setTxns, summary.skipped, summary.existing, summary.written, summary.failed, + datacenter, sum.getTxns, sum.setTxns, sum.skipped, sum.existing, sum.written, sum.failed, ) }() From d0ba3eedb69ce56599c11860c7879d8beb662bfc Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 18 Feb 2021 01:55:12 +0100 Subject: [PATCH 11/12] Update kv.md --- docs/kv.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/kv.md b/docs/kv.md index 7931bd611..12af18546 100644 --- a/docs/kv.md +++ b/docs/kv.md @@ -88,6 +88,6 @@ Atomic [Consul Transaction](https://www.consul.io/api-docs/txn) support is enabl _Note: this feature requires Consul version 0.7 or greater._ -This will cause Orchestrator to use a [Consul Transaction](https://www.consul.io/api-docs/txn) when distributing one or more Consul KVs. The use of transactions reduces the number of requests to the Consul server while ensuring updates are atomic. KVs are read from the server in a transaction and any necessary updates are performed in a second transaction. +This causes Orchestrator to use a [Consul Transaction](https://www.consul.io/api-docs/txn) when distributing one or more Consul KVs. KVs are read from the server in one transaction and any necessary updates are performed in a second transaction. -Orchestrator will group KV updates with the same key-prefix into a single [Consul Transaction](https://www.consul.io/api-docs/txn). Increasing the `ConsulMaxKVsPerTransaction` configuration setting from `4` _(default)_ to a max of `64` _(Consul Transaction API limit)_ allows more operations to be grouped into fewer transactions. +Orchestrator groups KV updates by key-prefix into groups of less of 5 to 64 operations _(default 5)_. This is to ensure updates to a single "cluster" _(5 x KVs)_ happens atomically. Increasing the `ConsulMaxKVsPerTransaction` configuration setting from `4` _(default)_ to a max of `64` _(Consul Transaction API limit)_ allows more operations to be grouped into fewer transactions. From db72592ea7cfdb892c039c469ada50a70dfabef4 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 18 Feb 2021 01:57:53 +0100 Subject: [PATCH 12/12] Update kv.md --- docs/kv.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/kv.md b/docs/kv.md index 12af18546..d540c00f3 100644 --- a/docs/kv.md +++ b/docs/kv.md @@ -90,4 +90,4 @@ _Note: this feature requires Consul version 0.7 or greater._ This causes Orchestrator to use a [Consul Transaction](https://www.consul.io/api-docs/txn) when distributing one or more Consul KVs. KVs are read from the server in one transaction and any necessary updates are performed in a second transaction. -Orchestrator groups KV updates by key-prefix into groups of less of 5 to 64 operations _(default 5)_. This is to ensure updates to a single "cluster" _(5 x KVs)_ happens atomically. Increasing the `ConsulMaxKVsPerTransaction` configuration setting from `4` _(default)_ to a max of `64` _(Consul Transaction API limit)_ allows more operations to be grouped into fewer transactions. +Orchestrator groups KV updates by key-prefix into groups of of 5 to 64 operations _(default 5)_. This grouping ensures updates to a single cluster _(5 x KVs)_ happen atomically. Increasing the `ConsulMaxKVsPerTransaction` configuration setting from `5` _(default)_ to a max of `64` _(Consul Transaction API limit)_ allows more operations to be grouped into fewer transactions but more can fail at once.