Skip to content

Commit

Permalink
Rename algorithm from consistent to ketama
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Jun 16, 2022
1 parent 0ccb5b0 commit 4f5c65f
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 15 deletions.
6 changes: 5 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"path"
"strings"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
Expand Down Expand Up @@ -780,7 +781,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").StringVar(&rc.hashringsFileContent)

cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings.").Default(string(receive.AlgorithmHashmod)).EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmConsistent))
hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ")
cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext).
Default(string(receive.AlgorithmHashmod)).
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama))

rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))
Expand Down
2 changes: 1 addition & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Flags:
the hashring configuration.
--receive.hashrings-algorithm=hashmod
The algorithm used when distributing series in
the hashrings.
the hashrings. Must be one of hashmod, ketama
--receive.hashrings-file=<path>
Path to file that contains the hashring
configuration. A watcher is initialized to
Expand Down
3 changes: 3 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type: GCS
config:
bucket: ""
service_account: ""
prefix: ""
```

Bucket can be extended to add more subcommands that will be helpful when working with object storage buckets by adding a new command within [`/cmd/thanos/tools_bucket.go`](../../cmd/thanos/tools_bucket.go) .
Expand Down Expand Up @@ -601,6 +602,7 @@ type: GCS
config:
bucket: ""
service_account: ""
prefix: ""
```

```$ mdox-exec="thanos tools bucket downsample --help"
Expand Down Expand Up @@ -675,6 +677,7 @@ type: GCS
config:
bucket: ""
service_account: ""
prefix: ""
```

```$ mdox-exec="thanos tools bucket mark --help"
Expand Down
35 changes: 35 additions & 0 deletions hack/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
global:
scrape_interval: 15s
external_labels:
replica: "1"

remote_write:
- url: http://receiver-2:19291/api/v1/receive
headers:
THANOS-TENANT: prometheus
write_relabel_configs:
- source_labels:
- job
regex: prometheus
action: keep

- url: http://receiver-2:19291/api/v1/receive
headers:
THANOS-TENANT: receiver
write_relabel_configs:
- source_labels:
- job
regex: receivers
action: keep

scrape_configs:
- job_name: 'prometheus'
scrape_interval: 10s
static_configs:
- targets: [ 'localhost:9090' ]

- job_name: 'receivers'
scrape_interval: 10s
static_configs:
- targets: [ 'receiver-1:10902', 'receiver-2:10902', 'receiver-3:10902' ]

110 changes: 110 additions & 0 deletions hack/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
version: "2"

services:

# minio:
# image: minio/minio:latest
# command:
# - server
# - /data
# - --console-address=:9001
# ports:
# - "9000:9000"
# - "9001:9001"
# volumes:
# - bucket:/data

# compactor:
# image: thanos:latest
# depends_on:
# minio:
# condition: service_started
# command:
# - compact
# - --objstore.config-file=/var/receive/s3.yml
# - --wait
# - --delete-delay=10m
# - --consistency-delay=0s
# - --log.level=debug
# volumes:
# - ./s3.yml:/var/receive/s3.yml
# ports:
# - "10903:10902"

prometheus:
image: prom/prometheus:latest
command:
- --config.file=/var/prometheus/config.yml
- --web.enable-admin-api
volumes:
- ./config.yml:/var/prometheus/config.yml
ports:
- "9090:9090"

receiver-1:
image: thanos:latest
command:
- receive
- --label=receive_replica="shard-1"
- --grpc-address=0.0.0.0:10901
- --receive.local-endpoint=receiver-1:10901
- --tsdb.min-block-duration=2m
- --tsdb.max-block-duration=2m
- --tsdb.retention=5m
# - --receive.hashrings-algorithm=consistent
- --receive.hashrings-file=/var/receive/hashrings.json
- --receive.replication-factor=2
# - --objstore.config-file=/var/receive/s3.yml
volumes:
- ./hashrings.json:/var/receive/hashrings.json
- ./s3.yml:/var/receive/s3.yml

receiver-2:
image: thanos:latest
command:
- receive
- --label=receive_replica="shard-2"
- --grpc-address=0.0.0.0:10901
- --tsdb.min-block-duration=2m
- --tsdb.max-block-duration=2m
- --tsdb.retention=5m
# - --receive.hashrings-algorithm=consistent
- --receive.local-endpoint=receiver-2:10901
- --receive.hashrings-file=/var/receive/hashrings.json
- --receive.replication-factor=2
# - --objstore.config-file=/var/receive/s3.yml
volumes:
- ./hashrings.json:/var/receive/hashrings.json
- ./s3.yml:/var/receive/s3.yml

receiver-3:
image: thanos:latest
command:
- receive
- --label=receive_replica="shard-3"
- --grpc-address=0.0.0.0:10901
- --tsdb.min-block-duration=2m
- --tsdb.max-block-duration=2m
- --tsdb.retention=5m
# - --receive.hashrings-algorithm=consistent
- --receive.local-endpoint=receiver-3:10901
- --receive.hashrings-file=/var/receive/hashrings.json
- --receive.replication-factor=2
# - --objstore.config-file=/var/receive/s3.yml
volumes:
- ./hashrings.json:/var/receive/hashrings.json
- ./s3.yml:/var/receive/s3.yml

query:
image: thanos:latest
command:
- query
- --endpoint=receiver-1:10901
- --endpoint=receiver-2:10901
- --endpoint=receiver-3:10901
- --query.replica-label=receive_replica
ports:
- "10902:10902"

volumes:
bucket: {}
11 changes: 11 additions & 0 deletions hack/hashrings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[
{
"hashring": "remote-write",
"endpoints": ["receiver-1:10901", "receiver-3:10901"],
"tenants": ["prometheus"]
},
{
"hashring": "remote-write",
"endpoints": ["receiver-1:10901", "receiver-2:10901"],
"tenants": ["receiver"]
}]
7 changes: 7 additions & 0 deletions hack/s3.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
type: S3
config:
bucket: "receiver"
endpoint: "minio:9000"
access_key: minioadmin
insecure: true
secret_key: minioadmin
22 changes: 11 additions & 11 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
type HashringAlgorithm string

const (
AlgorithmHashmod HashringAlgorithm = "hashmod"
AlgorithmConsistent HashringAlgorithm = "consistent"
AlgorithmHashmod HashringAlgorithm = "hashmod"
AlgorithmKetama HashringAlgorithm = "ketama"

// SectionsPerNode is the number of sections in the ring assigned to each node
// when using consistent hashing. A higher number yields a better series distribution,
// in the ketama hashring. A higher number yields a better series distribution,
// but also comes with a higher memory cost.
SectionsPerNode = 1000
)
Expand Down Expand Up @@ -98,21 +98,21 @@ func (p sections) Less(i, j int) bool { return p[i].hash < p[j].hash }
func (p sections) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p sections) Sort() { sort.Sort(p) }

// consistentHashring represents a group of nodes handling write requests with consistent hashing.
type consistentHashring struct {
// ketamaHashring represents a group of nodes handling write requests with consistent hashing.
type ketamaHashring struct {
endpoints []string
sections sections
numEndpoints uint64
}

func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentHashring {
func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring {
// Replication works by choosing subsequent nodes in the ring.
// In order to improve consistency, we avoid relying on the ordering of the endpoints
// and sort them lexicographically.
sort.Strings(endpoints)

numSections := len(endpoints) * sectionsPerNode
ring := consistentHashring{
ring := ketamaHashring{
endpoints: endpoints,
sections: make(sections, 0, numSections),
numEndpoints: uint64(len(endpoints)),
Expand All @@ -135,11 +135,11 @@ func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentH
return &ring
}

func (c consistentHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) {
func (c ketamaHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) {
return c.GetN(tenant, ts, 0)
}

func (c consistentHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) {
func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) {
if n >= c.numEndpoints {
return "", &insufficientNodesError{have: c.numEndpoints, want: n + 1}
}
Expand Down Expand Up @@ -222,8 +222,8 @@ func newMultiHashring(algorithm HashringAlgorithm, cfg []HashringConfig) Hashrin
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints)
case AlgorithmConsistent:
return newConsistentHashring(endpoints, SectionsPerNode)
case AlgorithmKetama:
return newKetamaHashring(endpoints, SectionsPerNode)
default:
return simpleHashring(endpoints)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/receive/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestConsistentHashringGet(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
hashRing := newConsistentHashring(test.nodes, 10)
hashRing := newKetamaHashring(test.nodes, 10)
result, err := hashRing.GetN("tenant", test.ts, test.n)
require.NoError(t, err)
require.Equal(t, test.expectedNode, result)
Expand Down Expand Up @@ -322,7 +322,7 @@ func findSeries(initialAssignments map[string][]*prompb.TimeSeries, node string,
}

func assignSeries(series []*prompb.TimeSeries, nodes []string) (map[string][]*prompb.TimeSeries, error) {
hashRing := newConsistentHashring(nodes, SectionsPerNode)
hashRing := newKetamaHashring(nodes, SectionsPerNode)
assignments := make(map[string][]*prompb.TimeSeries)
for _, ts := range series {
result, err := hashRing.Get("tenant", ts)
Expand Down

0 comments on commit 4f5c65f

Please sign in to comment.