diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index eed1981feac..e6228c26fd5 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "os" "path" + "strings" "time" extflag "github.com/efficientgo/tools/extkingpin" @@ -780,7 +781,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("").StringVar(&rc.hashringsFileContent) - cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings.").Default(string(receive.AlgorithmHashmod)).EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmConsistent)) + hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ") + cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext). + Default(string(receive.AlgorithmHashmod)). + EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)) rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)"). Default("5m")) diff --git a/docs/components/receive.md b/docs/components/receive.md index 792616ad21d..58dce7eb395 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -128,7 +128,7 @@ Flags: the hashring configuration. --receive.hashrings-algorithm=hashmod The algorithm used when distributing series in - the hashrings. + the hashrings. Must be one of hashmod, ketama --receive.hashrings-file= Path to file that contains the hashring configuration. A watcher is initialized to diff --git a/docs/components/tools.md b/docs/components/tools.md index 80ceed95859..776164aeb00 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -101,6 +101,7 @@ type: GCS config: bucket: "" service_account: "" +prefix: "" ``` Bucket can be extended to add more subcommands that will be helpful when working with object storage buckets by adding a new command within [`/cmd/thanos/tools_bucket.go`](../../cmd/thanos/tools_bucket.go) . @@ -601,6 +602,7 @@ type: GCS config: bucket: "" service_account: "" +prefix: "" ``` ```$ mdox-exec="thanos tools bucket downsample --help" @@ -675,6 +677,7 @@ type: GCS config: bucket: "" service_account: "" +prefix: "" ``` ```$ mdox-exec="thanos tools bucket mark --help" diff --git a/hack/config.yml b/hack/config.yml new file mode 100644 index 00000000000..5079f12c834 --- /dev/null +++ b/hack/config.yml @@ -0,0 +1,35 @@ +global: + scrape_interval: 15s + external_labels: + replica: "1" + +remote_write: +- url: http://receiver-2:19291/api/v1/receive + headers: + THANOS-TENANT: prometheus + write_relabel_configs: + - source_labels: + - job + regex: prometheus + action: keep + +- url: http://receiver-2:19291/api/v1/receive + headers: + THANOS-TENANT: receiver + write_relabel_configs: + - source_labels: + - job + regex: receivers + action: keep + +scrape_configs: +- job_name: 'prometheus' + scrape_interval: 10s + static_configs: + - targets: [ 'localhost:9090' ] + +- job_name: 'receivers' + scrape_interval: 10s + static_configs: + - targets: [ 'receiver-1:10902', 'receiver-2:10902', 'receiver-3:10902' ] + diff --git a/hack/docker-compose.yml b/hack/docker-compose.yml new file mode 100644 index 00000000000..f59cc8d0837 --- /dev/null +++ b/hack/docker-compose.yml @@ -0,0 +1,110 @@ +version: "2" + +services: + +# minio: +# image: minio/minio:latest +# command: +# - server +# - /data +# - --console-address=:9001 +# ports: +# - "9000:9000" +# - "9001:9001" +# volumes: +# - bucket:/data + +# compactor: +# image: thanos:latest +# depends_on: +# minio: +# condition: service_started +# command: +# - compact +# - --objstore.config-file=/var/receive/s3.yml +# - --wait +# - --delete-delay=10m +# - --consistency-delay=0s +# - --log.level=debug +# volumes: +# - ./s3.yml:/var/receive/s3.yml +# ports: +# - "10903:10902" + + prometheus: + image: prom/prometheus:latest + command: + - --config.file=/var/prometheus/config.yml + - --web.enable-admin-api + volumes: + - ./config.yml:/var/prometheus/config.yml + ports: + - "9090:9090" + + receiver-1: + image: thanos:latest + command: + - receive + - --label=receive_replica="shard-1" + - --grpc-address=0.0.0.0:10901 + - --receive.local-endpoint=receiver-1:10901 + - --tsdb.min-block-duration=2m + - --tsdb.max-block-duration=2m + - --tsdb.retention=5m +# - --receive.hashrings-algorithm=consistent + - --receive.hashrings-file=/var/receive/hashrings.json + - --receive.replication-factor=2 +# - --objstore.config-file=/var/receive/s3.yml + volumes: + - ./hashrings.json:/var/receive/hashrings.json + - ./s3.yml:/var/receive/s3.yml + + receiver-2: + image: thanos:latest + command: + - receive + - --label=receive_replica="shard-2" + - --grpc-address=0.0.0.0:10901 + - --tsdb.min-block-duration=2m + - --tsdb.max-block-duration=2m + - --tsdb.retention=5m +# - --receive.hashrings-algorithm=consistent + - --receive.local-endpoint=receiver-2:10901 + - --receive.hashrings-file=/var/receive/hashrings.json + - --receive.replication-factor=2 +# - --objstore.config-file=/var/receive/s3.yml + volumes: + - ./hashrings.json:/var/receive/hashrings.json + - ./s3.yml:/var/receive/s3.yml + + receiver-3: + image: thanos:latest + command: + - receive + - --label=receive_replica="shard-3" + - --grpc-address=0.0.0.0:10901 + - --tsdb.min-block-duration=2m + - --tsdb.max-block-duration=2m + - --tsdb.retention=5m +# - --receive.hashrings-algorithm=consistent + - --receive.local-endpoint=receiver-3:10901 + - --receive.hashrings-file=/var/receive/hashrings.json + - --receive.replication-factor=2 +# - --objstore.config-file=/var/receive/s3.yml + volumes: + - ./hashrings.json:/var/receive/hashrings.json + - ./s3.yml:/var/receive/s3.yml + + query: + image: thanos:latest + command: + - query + - --endpoint=receiver-1:10901 + - --endpoint=receiver-2:10901 + - --endpoint=receiver-3:10901 + - --query.replica-label=receive_replica + ports: + - "10902:10902" + +volumes: + bucket: {} \ No newline at end of file diff --git a/hack/hashrings.json b/hack/hashrings.json new file mode 100644 index 00000000000..5ba2937886c --- /dev/null +++ b/hack/hashrings.json @@ -0,0 +1,11 @@ +[ + { + "hashring": "remote-write", + "endpoints": ["receiver-1:10901", "receiver-3:10901"], + "tenants": ["prometheus"] + }, + { + "hashring": "remote-write", + "endpoints": ["receiver-1:10901", "receiver-2:10901"], + "tenants": ["receiver"] +}] \ No newline at end of file diff --git a/hack/s3.yml b/hack/s3.yml new file mode 100644 index 00000000000..88df90e4d87 --- /dev/null +++ b/hack/s3.yml @@ -0,0 +1,7 @@ +type: S3 +config: + bucket: "receiver" + endpoint: "minio:9000" + access_key: minioadmin + insecure: true + secret_key: minioadmin diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index f7eb719418e..cb4c3e78e2a 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -22,11 +22,11 @@ import ( type HashringAlgorithm string const ( - AlgorithmHashmod HashringAlgorithm = "hashmod" - AlgorithmConsistent HashringAlgorithm = "consistent" + AlgorithmHashmod HashringAlgorithm = "hashmod" + AlgorithmKetama HashringAlgorithm = "ketama" // SectionsPerNode is the number of sections in the ring assigned to each node - // when using consistent hashing. A higher number yields a better series distribution, + // in the ketama hashring. A higher number yields a better series distribution, // but also comes with a higher memory cost. SectionsPerNode = 1000 ) @@ -98,21 +98,21 @@ func (p sections) Less(i, j int) bool { return p[i].hash < p[j].hash } func (p sections) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p sections) Sort() { sort.Sort(p) } -// consistentHashring represents a group of nodes handling write requests with consistent hashing. -type consistentHashring struct { +// ketamaHashring represents a group of nodes handling write requests with consistent hashing. +type ketamaHashring struct { endpoints []string sections sections numEndpoints uint64 } -func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentHashring { +func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring { // Replication works by choosing subsequent nodes in the ring. // In order to improve consistency, we avoid relying on the ordering of the endpoints // and sort them lexicographically. sort.Strings(endpoints) numSections := len(endpoints) * sectionsPerNode - ring := consistentHashring{ + ring := ketamaHashring{ endpoints: endpoints, sections: make(sections, 0, numSections), numEndpoints: uint64(len(endpoints)), @@ -135,11 +135,11 @@ func newConsistentHashring(endpoints []string, sectionsPerNode int) *consistentH return &ring } -func (c consistentHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { +func (c ketamaHashring) Get(tenant string, ts *prompb.TimeSeries) (string, error) { return c.GetN(tenant, ts, 0) } -func (c consistentHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) { +func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (string, error) { if n >= c.numEndpoints { return "", &insufficientNodesError{have: c.numEndpoints, want: n + 1} } @@ -222,8 +222,8 @@ func newMultiHashring(algorithm HashringAlgorithm, cfg []HashringConfig) Hashrin switch algorithm { case AlgorithmHashmod: return simpleHashring(endpoints) - case AlgorithmConsistent: - return newConsistentHashring(endpoints, SectionsPerNode) + case AlgorithmKetama: + return newKetamaHashring(endpoints, SectionsPerNode) default: return simpleHashring(endpoints) } diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index f15ca5ce593..676aaf8c078 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -226,7 +226,7 @@ func TestConsistentHashringGet(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - hashRing := newConsistentHashring(test.nodes, 10) + hashRing := newKetamaHashring(test.nodes, 10) result, err := hashRing.GetN("tenant", test.ts, test.n) require.NoError(t, err) require.Equal(t, test.expectedNode, result) @@ -322,7 +322,7 @@ func findSeries(initialAssignments map[string][]*prompb.TimeSeries, node string, } func assignSeries(series []*prompb.TimeSeries, nodes []string) (map[string][]*prompb.TimeSeries, error) { - hashRing := newConsistentHashring(nodes, SectionsPerNode) + hashRing := newKetamaHashring(nodes, SectionsPerNode) assignments := make(map[string][]*prompb.TimeSeries) for _, ts := range series { result, err := hashRing.Get("tenant", ts)