Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

ConsulTxnStore: batch KV updates by key-prefix to avoid ops limit #1311

Merged
merged 16 commits into from
Apr 4, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/kv.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,12 @@ This functionality is required in case one has more Consul datacenters than just
#### Consul Transaction support

Atomic [Consul Transaction](https://www.consul.io/api-docs/txn) support is enabled by configuring:

```json
"ConsulKVStoreProvider": "consul-txn",
```

_Note: this feature requires Consul version 0.7 or greater._

This will cause Orchestrator to use a [Consul Transaction](https://www.consul.io/api-docs/txn) when distributing one or more Consul KVs. The use of transactions reduces the number of requests to the Consul server while ensuring updates of several KVs are atomic.
This causes Orchestrator to use a [Consul Transaction](https://www.consul.io/api-docs/txn) when distributing one or more Consul KVs. KVs are read from the server in one transaction and any necessary updates are performed in a second transaction.

Orchestrator groups KV updates by key-prefix into groups of 5 to 64 operations _(default 5)_. This grouping ensures updates to a single cluster _(5 x KVs)_ happen atomically. Increasing the `ConsulMaxKVsPerTransaction` configuration setting from `5` _(default)_ to a max of `64` _(Consul Transaction API limit)_ allows more operations to be grouped into fewer transactions, but it also means more operations can fail at once.
10 changes: 10 additions & 0 deletions go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
StaleInstanceCoordinatesExpireSeconds = 60
CheckAutoPseudoGTIDGrantsIntervalSeconds = 60
SelectTrueQuery = "select 1"
ConsulKVsPerCluster = 5 // KVs: "/", "/hostname", "/ipv4", "/ipv6" and "/port"
ConsulMaxTransactionOps = 64
)

var deprecatedConfigurationVariables = []string{
Expand Down Expand Up @@ -266,6 +268,7 @@ type Configuration struct {
ConsulAclToken string // ACL token used to write to Consul KV
ConsulCrossDataCenterDistribution bool // should orchestrator automatically auto-deduce all consul DCs and write KVs in all DCs
ConsulKVStoreProvider string // Consul KV store provider (consul or consul-txn), default: "consul"
ConsulMaxKVsPerTransaction int // Maximum number of KV operations to perform in a single Consul Transaction. Requires the "consul-txn" ConsulKVStoreProvider
ZkAddress string // UNSUPPERTED YET. Address where (single or multiple) ZooKeeper servers are found, in `srv1[:port1][,srv2[:port2]...]` format. Default port is 2181. Example: srv-a,srv-b:12181,srv-c
KVClusterMasterPrefix string // Prefix to use for clusters' masters entries in KV stores (internal, consul, ZK), default: "mysql/master"
WebMessage string // If provided, will be shown on all web pages below the title bar
Expand Down Expand Up @@ -434,6 +437,7 @@ func newConfiguration() *Configuration {
ConsulAclToken: "",
ConsulCrossDataCenterDistribution: false,
ConsulKVStoreProvider: "consul",
ConsulMaxKVsPerTransaction: ConsulKVsPerCluster,
ZkAddress: "",
KVClusterMasterPrefix: "mysql/master",
WebMessage: "",
Expand Down Expand Up @@ -595,6 +599,12 @@ func (this *Configuration) postReadAdjustments() error {
this.BufferInstanceWrites = false
}
}
if this.ConsulMaxKVsPerTransaction < ConsulKVsPerCluster {
this.ConsulMaxKVsPerTransaction = ConsulKVsPerCluster
} else if this.ConsulMaxKVsPerTransaction > ConsulMaxTransactionOps {
this.ConsulMaxKVsPerTransaction = ConsulMaxTransactionOps
}

return nil
}

Expand Down
13 changes: 12 additions & 1 deletion go/kv/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ func buildConsulTestServer(t *testing.T, testOps []consulTestServerOp) *httptest
if testOp.ResponseCode == 0 {
testOp.ResponseCode = http.StatusOK
}
if strings.HasPrefix(r.URL.String(), "/v1/kv") && testOp.Response != nil {
if r.URL.String() == "/v1/catalog/datacenters" {
w.WriteHeader(testOp.ResponseCode)
json.NewEncoder(w).Encode(testOp.Response)
return
} else if strings.HasPrefix(r.URL.String(), "/v1/kv") && testOp.Response != nil {
w.WriteHeader(testOp.ResponseCode)
json.NewEncoder(w).Encode(testOp.Response)
return
Expand All @@ -54,6 +58,13 @@ func buildConsulTestServer(t *testing.T, testOps []consulTestServerOp) *httptest
t.Fatalf("Unable to unmarshal json request body: %v", err)
continue
}
// https://github.com/openark/orchestrator/issues/1302
// https://github.com/hashicorp/consul/blob/87f6617eecd23a64add1e79eb3cd8dc3da9e649e/agent/txn_endpoint.go#L121-L129
if len(txnOps) > 64 {
w.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprintf(w, "Transaction contains too many operations (%d > 64)", len(txnOps))
return
}
testOpRequest := sortTxnKVOps(testOp.Request.(consulapi.TxnOps))
if testOp.Response != nil && reflect.DeepEqual(testOpRequest, sortTxnKVOps(txnOps)) {
w.WriteHeader(testOp.ResponseCode)
Expand Down
110 changes: 96 additions & 14 deletions go/kv/consul_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"crypto/tls"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"

Expand All @@ -31,6 +32,43 @@ import (
"github.com/openark/golib/log"
)

// groupKVPairsByKeyPrefix partitions kvPairs into batches, each meant to be
// sent to Consul as a single transaction.
//
// Pairs whose key starts with config.Config.KVClusterMasterPrefix + "/" are
// grouped by the first path segment that follows that prefix; any other pair
// is grouped under its full key. Whole groups are then packed, in order of
// first appearance in kvPairs, into batches holding at most
// config.Config.ConsulMaxKVsPerTransaction pairs, so a group that fits within
// the limit is never spread over several batches.
//
// A group that is larger than the limit on its own cannot be kept together and
// is split into consecutive chunks of at most the limit. No empty batch is ever
// returned; the result is nil when kvPairs is empty.
func groupKVPairsByKeyPrefix(kvPairs consulapi.KVPairs) (groups []consulapi.KVPairs) {
	maxOpsPerTxn := config.Config.ConsulMaxKVsPerTransaction
	if maxOpsPerTxn < 1 {
		// a non-positive limit would make the chunking loop below spin forever
		maxOpsPerTxn = 1
	}
	clusterMasterPrefix := config.Config.KVClusterMasterPrefix + "/"

	// groupsMap holds the pairs per group; prefixes remembers first-seen order
	// so the resulting batches do not depend on random map iteration order
	groupsMap := make(map[string]consulapi.KVPairs)
	var prefixes []string
	for _, pair := range kvPairs {
		prefix := pair.Key
		if strings.HasPrefix(prefix, clusterMasterPrefix) {
			// keep only the first path segment after the cluster master prefix
			prefix = strings.SplitN(strings.TrimPrefix(prefix, clusterMasterPrefix), "/", 2)[0]
		}
		if _, found := groupsMap[prefix]; !found {
			prefixes = append(prefixes, prefix)
		}
		groupsMap[prefix] = append(groupsMap[prefix], pair)
	}

	var pairsBuf consulapi.KVPairs
	// flush emits the pending batch, skipping it when nothing was buffered
	flush := func() {
		if len(pairsBuf) > 0 {
			groups = append(groups, pairsBuf)
			pairsBuf = nil
		}
	}
	for _, prefix := range prefixes {
		group := groupsMap[prefix]
		if len(pairsBuf)+len(group) > maxOpsPerTxn {
			flush()
		}
		// an oversized group is emitted in full-sized chunks; the capacity is
		// capped so a later append on a chunk cannot overwrite its neighbour
		for len(group) > maxOpsPerTxn {
			groups = append(groups, group[:maxOpsPerTxn:maxOpsPerTxn])
			group = group[maxOpsPerTxn:]
		}
		pairsBuf = append(pairsBuf, group...)
	}
	flush()
	return groups
}

// A Consul store based on config's `ConsulAddress`, `ConsulScheme`, and `ConsulKVPrefix`
type consulTxnStore struct {
client *consulapi.Client
Expand Down Expand Up @@ -71,7 +109,7 @@ func NewConsulTxnStore() KVStore {
func (this *consulTxnStore) doWriteTxn(txnOps consulapi.TxnOps, queryOptions *consulapi.QueryOptions) (err error) {
ok, resp, _, err := this.client.Txn().Txn(txnOps, queryOptions)
if err != nil {
log.Errorf("consulTxnStore.doWriteTxn(): failed %v", err)
log.Errorf("consulTxnStore.doWriteTxn(): %v", err)
return err
} else if !ok {
for _, terr := range resp.Errors {
Expand All @@ -83,13 +121,28 @@ func (this *consulTxnStore) doWriteTxn(txnOps consulapi.TxnOps, queryOptions *co
return err
}

func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc string, kvPairs []*consulapi.KVPair) (skipped, existing, written, failed int, err error) {
// updateDatacenterKVPairsResponse is the outcome of one
// .updateDatacenterKVPairs() call, reported for a single batch of KV pairs.
//
// The per-pair counters are: skipped (the local cache already held the wanted
// value), existing (the server already held the wanted value), written (the
// pair was part of a successful write transaction) and failed (the pair was
// part of a failed write transaction). getTxns and setTxns count the read and
// write transactions that were issued, whether or not they succeeded.
type updateDatacenterKVPairsResponse struct {
	// err is the error of the write transaction, if one was attempted and failed
	err error

	// per-pair outcome counters
	existing, failed, skipped, written int

	// issued transaction counters
	getTxns, setTxns int
}

// updateDatacenterKVPairs handles updating a list of Consul KV pairs for a single datacenter. Current values are
// read from the server in a single transaction and any necessary updates are made in a second transaction. If a
// KVPair from a group is missing on the server all KVPairs will be updated.
func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc string, kvPairs []*consulapi.KVPair, responses chan updateDatacenterKVPairsResponse) {
defer wg.Done()

queryOptions := &consulapi.QueryOptions{Datacenter: dc}
kcCacheKeys := make([]string, 0)

// get the current key-values in a single transaction
resp := updateDatacenterKVPairsResponse{}
var terr error
var getTxnOps consulapi.TxnOps
var getTxnResp *consulapi.TxnResponse
Expand All @@ -99,7 +152,7 @@ func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc strin
kcCacheKey := getConsulKVCacheKey(dc, kvPair.Key)
kcCacheKeys = append(kcCacheKeys, kcCacheKey)
if value, found := this.kvCache.Get(kcCacheKey); found && val == value {
skipped++
resp.skipped++
continue
}
getTxnOps = append(getTxnOps, &consulapi.TxnOp{
Expand All @@ -114,8 +167,8 @@ func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc strin
_, getTxnResp, _, terr = this.client.Txn().Txn(getTxnOps, queryOptions)
if terr != nil {
log.Errorf("consulTxnStore.DistributePairs(): %v", terr)
err = terr
}
resp.getTxns++
}

// find key-value pairs that need updating, add pairs that need updating to set transaction
Expand All @@ -127,7 +180,7 @@ func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc strin
if pair.Key == result.KV.Key && string(pair.Value) == string(result.KV.Value) {
this.kvCache.SetDefault(getConsulKVCacheKey(dc, pair.Key), string(pair.Value))
kvExistsAndEqual = true
existing++
resp.existing++
break
}
}
Expand All @@ -145,18 +198,19 @@ func (this *consulTxnStore) updateDatacenterKVPairs(wg *sync.WaitGroup, dc strin

// update key-value pairs in a single Consul Transaction
if len(setTxnOps) > 0 {
if err = this.doWriteTxn(setTxnOps, queryOptions); err != nil {
log.Errorf("consulTxnStore.DistributePairs(): failed %v", kcCacheKeys)
failed = len(setTxnOps)
if resp.err = this.doWriteTxn(setTxnOps, queryOptions); resp.err != nil {
log.Errorf("consulTxnStore.DistributePairs(): failed %v, error %v", kcCacheKeys, resp.err)
resp.failed = len(setTxnOps)
} else {
for _, txnOp := range setTxnOps {
this.kvCache.SetDefault(getConsulKVCacheKey(dc, txnOp.KV.Key), string(txnOp.KV.Value))
written++
resp.written++
}
}
resp.setTxns++
}

return skipped, existing, written, failed, err
responses <- resp
}

// GetKeyValue returns the value of a Consul KV if it exists
Expand Down Expand Up @@ -226,14 +280,42 @@ func (this *consulTxnStore) DistributePairs(kvPairs [](*KVPair)) (err error) {
for _, kvPair := range kvPairs {
consulPairs = append(consulPairs, &consulapi.KVPair{Key: kvPair.Key, Value: []byte(kvPair.Value)})
}

var wg sync.WaitGroup
for _, datacenter := range datacenters {
var skipped, existing, written, failed int
datacenter := datacenter

responses := make(chan updateDatacenterKVPairsResponse)
wg.Add(1)
skipped, existing, written, failed, err = this.updateDatacenterKVPairs(&wg, datacenter, consulPairs)
log.Debugf("consulTxnStore.DistributePairs(): datacenter: %s; skipped: %d, existing: %d, written: %d, failed: %d", datacenter, skipped, existing, written, failed)
go func() {
defer wg.Done()

// receive responses from .updateDatacenterKVPairs()
// goroutines, log a summary when channel is closed
go func() {
sum := updateDatacenterKVPairsResponse{}
for resp := range responses {
sum.existing += resp.existing
sum.failed += resp.failed
sum.skipped += resp.skipped
sum.written += resp.written
sum.getTxns += resp.getTxns
sum.setTxns += resp.setTxns
}
log.Debugf("consulTxnStore.DistributePairs(): datacenter: %s; getTxns: %d, setTxns: %d, skipped: %d, existing: %d, written: %d, failed: %d",
datacenter, sum.getTxns, sum.setTxns, sum.skipped, sum.existing, sum.written, sum.failed,
)
}()

// launch an .updateDatacenterKVPairs() goroutine
// for each grouping of consul KV pairs and wait
var dcWg sync.WaitGroup
for _, kvPairGroup := range groupKVPairsByKeyPrefix(consulPairs) {
dcWg.Add(1)
go this.updateDatacenterKVPairs(&dcWg, datacenter, kvPairGroup, responses)
}
dcWg.Wait()
close(responses)
}()
}
wg.Wait()
return err
Expand Down
Loading