Skip to content

Commit

Permalink
Signing the other fields of the requests
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Protasio <alanprot@gmail.com>
  • Loading branch information
alanprot committed Jun 29, 2023
1 parent 0aeaf46 commit 9c48d99
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 40 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2193,7 +2193,7 @@ ha_tracker:
# EXPERIMENTAL: If enabled, sign the write request between distributors and
# ingesters.
# CLI flag: -distributor.sign_write_requests
# CLI flag: -distributor.sign-write-requests
[sign_write_requests: <boolean> | default = false]
ring:
Expand Down
4 changes: 2 additions & 2 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,8 @@ func (t *Cortex) setupGRPCHeaderForwarding() {
}

func (t *Cortex) setupRequestSigning() {
if t.Cfg.Distributor.EnableSignWriteRequests {
util_log.WarnExperimentalUse("Distributor EnableSignWriteRequests")
if t.Cfg.Distributor.SignWriteRequestsEnabled {
util_log.WarnExperimentalUse("Distributor SignWriteRequestsEnabled")
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcclient.UnarySigningServerInterceptor)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (t *Cortex) initOverridesExporter() (services.Service, error) {
func (t *Cortex) initDistributorService() (serv services.Service, err error) {
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod
t.Cfg.IngesterClient.GRPCClientConfig.EnableSignRequests = t.Cfg.Distributor.EnableSignWriteRequests
t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsEnabled = t.Cfg.Distributor.SignWriteRequestsEnabled

// Check whether the distributor can join the distributors ring, which is
// whenever it's not running as an internal dependency (ie. querier or
Expand Down
96 changes: 72 additions & 24 deletions pkg/cortexpb/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cortexpb
import (
"context"
"fmt"
"strconv"
"sync"

"github.com/cespare/xxhash/v2"
Expand All @@ -13,13 +14,56 @@ import (
const maxBufferSize = 1024
const signVersion = "v1"

var byteSlicePool = sync.Pool{
var signerPool = sync.Pool{
New: func() interface{} {
b := make([]byte, 0, maxBufferSize)
return &b
return newSigner()
},
}

type signer struct {
h *xxhash.Digest
b []byte
optimized bool
}

func newSigner() *signer {
s := &signer{
h: xxhash.New(),
b: make([]byte, 0, maxBufferSize),
}
s.Reset()
return s
}

func (s *signer) Reset() {
s.h.Reset()
s.b = s.b[:0]
s.optimized = true
}

func (s *signer) WriteString(val string) {
switch {
case !s.optimized:
_, _ = s.h.WriteString(val)
case len(s.b)+len(val) > cap(s.b):
// If labels val does not fit in the []byte we fall back to not allocate the whole entry.
_, _ = s.h.Write(s.b)
_, _ = s.h.WriteString(val)
s.optimized = false
default:
// Use xxhash.Sum64(b) for fast path as it's faster.
s.b = append(s.b, val...)
}
}

func (s *signer) Sum64() uint64 {
if s.optimized {
return xxhash.Sum64(s.b)
}

return s.h.Sum64()
}

func (w *WriteRequest) VerifySign(ctx context.Context, signature string) (bool, error) {
s, err := w.Sign(ctx)
return s == signature, err
Expand All @@ -31,29 +75,33 @@ func (w *WriteRequest) Sign(ctx context.Context) (string, error) {
return "", err
}

// Use xxhash.Sum64(b) for fast path as it's faster.
bp := byteSlicePool.Get().(*[]byte)
b := (*bp)[:0]
defer byteSlicePool.Put(bp)
b = append(b, u...)

for _, s := range w.Timeseries {
for i, v := range s.Labels {
if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) {
// If labels entry is {maxBufferSize}+ do not allocate whole entry.
h := xxhash.New()
_, _ = h.Write(b)
for _, v := range s.Labels[i:] {
_, _ = h.WriteString(v.Name)
_, _ = h.WriteString(v.Value)
}
return fmt.Sprintf("%v/%v", signVersion, h.Sum64()), nil
}
s := signerPool.Get().(*signer)
defer func() {
s.Reset()
signerPool.Put(s)
}()
s.WriteString(u)

b = append(b, v.Name...)
b = append(b, v.Value...)
for _, md := range w.Metadata {
s.WriteString(strconv.Itoa(int(md.Type)))
s.WriteString(md.MetricFamilyName)
s.WriteString(md.Help)
s.WriteString(md.Unit)
}

for _, ts := range w.Timeseries {
for _, lbl := range ts.Labels {
s.WriteString(lbl.Name)
s.WriteString(lbl.Value)
}

for _, ex := range ts.Exemplars {
for _, lbl := range ex.Labels {
s.WriteString(lbl.Name)
s.WriteString(lbl.Value)
}
}
}

return fmt.Sprintf("%v/%v", signVersion, xxhash.Sum64(b)), nil
return fmt.Sprintf("%v/%v", signVersion, s.Sum64()), nil
}
100 changes: 100 additions & 0 deletions pkg/cortexpb/extensions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package cortexpb

import (
"context"
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
)

func TestWriteRequest_Sign(t *testing.T) {
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")

tests := map[string]struct {
w *WriteRequest
expectedSign string
}{
"small write with exemplar": {
w: createWriteRequest(10, true, "family1", "help1", "unit"),
expectedSign: "v1/9125893422459502203",
},
"small write with exemplar and changed md": {
w: createWriteRequest(10, true, "family2", "help1", "unit"),
expectedSign: "v1/18044786562323437562",
},
"small write without exemplar": {
w: createWriteRequest(10, false, "family1", "help1", "unit"),
expectedSign: "v1/7697478040597284323",
},
"big write with exemplar": {
w: createWriteRequest(10000, true, "family1", "help1", "unit"),
expectedSign: "v1/18402783317092766507",
},
"big write without exemplar": {
w: createWriteRequest(10000, false, "family1", "help1", "unit"),
expectedSign: "v1/14973071954515615892",
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// running multiple times in parallel to make sure no race
itNumber := 1000
wg := sync.WaitGroup{}
wg.Add(itNumber)
for i := 0; i < itNumber; i++ {
go func() {
defer wg.Done()
s, err := tc.w.Sign(ctx)
require.NoError(t, err)
// Make sure this sign doesn't change
require.Equal(t, tc.expectedSign, s)
}()
}
wg.Wait()
})
}
}

func createWriteRequest(numTs int, exemplar bool, family string, help string, unit string) *WriteRequest {
w := &WriteRequest{}
w.Metadata = []*MetricMetadata{
{
MetricFamilyName: family,
Help: help,
Unit: unit,
},
}

for i := 0; i < numTs; i++ {
w.Timeseries = append(w.Timeseries, PreallocTimeseries{
TimeSeries: &TimeSeries{
Labels: []LabelAdapter{
{
Name: fmt.Sprintf("Name-%v", i),
Value: fmt.Sprintf("Value-%v", i),
},
},
},
})

if exemplar {
w.Timeseries[i].Exemplars = []Exemplar{
{
Labels: []LabelAdapter{
{
Name: fmt.Sprintf("Ex-Name-%v", i),
Value: fmt.Sprintf("Ex-Value-%v", i),
},
},
},
}
}
}

return w
}
10 changes: 5 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ type Config struct {
RemoteTimeout time.Duration `yaml:"remote_timeout"`
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`

ShardingStrategy string `yaml:"sharding_strategy"`
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
ExtendWrites bool `yaml:"extend_writes"`
EnableSignWriteRequests bool `yaml:"sign_write_requests"`
ShardingStrategy string `yaml:"sharding_strategy"`
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
ExtendWrites bool `yaml:"extend_writes"`
SignWriteRequestsEnabled bool `yaml:"sign_write_requests"`

// Distributors ring
DistributorRing RingConfig `yaml:"ring"`
Expand Down Expand Up @@ -164,7 +164,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
f.BoolVar(&cfg.EnableSignWriteRequests, "distributor.sign_write_requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")

Expand Down
8 changes: 4 additions & 4 deletions pkg/util/grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type Config struct {
BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"`
BackoffConfig backoff.Config `yaml:"backoff_config"`

TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
EnableSignRequests bool `yaml:"-"`
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
SignWriteRequestsEnabled bool `yaml:"-"`
}

// RegisterFlags registers flags.
Expand Down Expand Up @@ -92,7 +92,7 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep
unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewRateLimiter(cfg)}, unaryClientInterceptors...)
}

if cfg.EnableSignRequests {
if cfg.SignWriteRequestsEnabled {
unaryClientInterceptors = append(unaryClientInterceptors, UnarySigningClientInterceptor)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/util/grpcclient/signing_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (

const (
ErrDifferentSignaturePresent = errors.Error("different signature already present")
ErrMultipleSignaturePresent = errors.Error("multiples signature present")
ErrSignatureNotPresent = errors.Error("signature not present")
ErrSignatureMismatch = errors.Error("signature mismatch")
)
Expand Down Expand Up @@ -83,7 +84,7 @@ func UnarySigningClientInterceptor(ctx context.Context, method string, req, repl
return ErrDifferentSignaturePresent
}
} else {
return ErrDifferentSignaturePresent
return ErrMultipleSignaturePresent
}
} else {
md = md.Copy()
Expand Down
13 changes: 11 additions & 2 deletions pkg/util/grpcclient/signing_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestUnarySigningHandler(t *testing.T) {
return nil, nil
})

require.ErrorIs(t, ErrSignatureMismatch, err)
require.ErrorIs(t, err, ErrSignatureMismatch)

// Return error when signature is not present
ctx = user.InjectOrgID(context.Background(), "user-")
Expand All @@ -54,5 +54,14 @@ func TestUnarySigningHandler(t *testing.T) {
return nil, nil
})

require.ErrorIs(t, ErrSignatureNotPresent, err)
require.ErrorIs(t, err, ErrSignatureNotPresent)

// Return error when multiples signatures are present
md[reqSignHeaderName] = append(md[reqSignHeaderName], "sig1", "sig2")
ctx = metadata.NewOutgoingContext(ctx, md)
err = UnarySigningClientInterceptor(ctx, "", w, w, nil, func(c context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
ctx = c
return nil
})
require.ErrorIs(t, err, ErrMultipleSignaturePresent)
}

0 comments on commit 9c48d99

Please sign in to comment.