diff --git a/Makefile b/Makefile
index 1d283a0..d663468 100644
--- a/Makefile
+++ b/Makefile
@@ -28,19 +28,23 @@ fmt: $(GOBIN)/goimports
 
 # Can be used to change the number of tests run, defaults to 1 to prevent caching
 TESTCOUNT = 1
-# Can be used to change the verobosity of tests: make test TESTVERBOSE=-v
-TESTVERBOSE =
-# Can be used to generate coverage reports for a specific package
-COVERPKG = $(PACKAGE)
+# Can be used to add flags to the go test invocation: make test TESTFLAGS=-v
+TESTFLAGS =
 # Can be used to change which package gets tested, defaults to all packages.
 TESTPKG = ./...
 
-test: $(COVERAGE)
-$(COVERAGE):
-	@mkdir -p $(@D)
-	go test -race -coverprofile=$(COVERAGE) -coverpkg=$(COVERPKG)/... -count=$(TESTCOUNT) $(TESTVERBOSE) $(TESTPKG)
+test:
+	go test -race -count=$(TESTCOUNT) $(TESTFLAGS) $(TESTPKG)
+
+# Can be used to generate coverage reports for a specific package
+COVERPKG = $(PACKAGE)
+
+.PHONY: $(COVERAGE)
 
 coverage: $(COVERAGE)
+$(COVERAGE):
+	@mkdir -p $(@D)
+	$(MAKE) test TESTFLAGS='-coverprofile=$(COVERAGE) -coverpkg=$(COVERPKG)/...'
 	go tool cover -html=$(COVERAGE)
 
 profile_cache:
diff --git a/ads/ads.go b/ads/ads.go
index 97a9511..9edf16b 100644
--- a/ads/ads.go
+++ b/ads/ads.go
@@ -5,6 +5,9 @@ protocol (ADS), such as convenient type aliases, constants and core definitions.
 package ads
 
 import (
+	"encoding/binary"
+	"encoding/hex"
+	"errors"
 	"log/slog"
 	"sync"
 	"time"
@@ -255,3 +258,30 @@ func LookupStreamTypeByRPCMethod(rpcMethod string) (StreamType, bool) {
 		return UnknownStreamType, false
 	}
 }
+
+var (
+	errInvalidNonceEncoding = errors.New("nonce isn't in hex encoding")
+	errInvalidNonceLength   = errors.New("decoded nonce did not have expected length")
+)
+
+// ParseRemainingChunksFromNonce checks whether the Diderot server implementation chunked the delta
+// responses because not all resources could fit in the same response without going over the default
+// max gRPC message size of 4MB. A nonce from Diderot always starts with the 64-bit nanosecond
+// timestamp of when the response was generated on the server, followed by the number of remaining
+// chunks as a 32-bit integer. The sequence of integers is binary encoded with [binary.BigEndian],
+// then hex encoded. If the given nonce does not match this format, the function simply returns 0
+// along with an error describing why it does not match. A non-nil error means the nonce was not
+// created by a Diderot server implementation, and therefore does not contain the expected
+// information.
+func ParseRemainingChunksFromNonce(nonce string) (remainingChunks int, err error) {
+	decoded, err := hex.DecodeString(nonce)
+	if err != nil {
+		return 0, errInvalidNonceEncoding
+	}
+
+	if len(decoded) != 12 {
+		return 0, errInvalidNonceLength
+	}
+
+	return int(binary.BigEndian.Uint32(decoded[8:12])), nil
+}
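To make the nonce layout above concrete, here is a minimal standalone sketch (not part of the patch) that hand-builds a nonce in the documented format and parses it back with the new ParseRemainingChunksFromNonce; the package layout, variable names, and the remaining-chunk value 3 are illustrative only.

package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"time"

	"github.com/linkedin/diderot/ads"
)

func main() {
	// A Diderot nonce is 12 bytes before hex encoding: an 8-byte big-endian
	// UNIX-nano timestamp followed by a 4-byte big-endian remaining-chunk count.
	raw := make([]byte, 12)
	binary.BigEndian.PutUint64(raw[:8], uint64(time.Now().UnixNano()))
	binary.BigEndian.PutUint32(raw[8:], 3)
	nonce := hex.EncodeToString(raw)

	remaining, err := ads.ParseRemainingChunksFromNonce(nonce)
	fmt.Println(remaining, err) // 3 <nil>

	// Anything that is not exactly 12 hex-encoded bytes is rejected.
	_, err = ads.ParseRemainingChunksFromNonce("not-a-nonce")
	fmt.Println(err != nil) // true
}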
diff --git a/ads/ads_example_test.go b/ads/ads_example_test.go
new file mode 100644
index 0000000..010d790
--- /dev/null
+++ b/ads/ads_example_test.go
@@ -0,0 +1,27 @@
+package ads_test
+
+import (
+	"log"
+
+	"github.com/linkedin/diderot/ads"
+)
+
+func ExampleParseRemainingChunksFromNonce() {
+	// Acquire a delta ADS client
+	var client ads.DeltaClient
+
+	var responses []*ads.DeltaDiscoveryResponse
+	for {
+		res, err := client.Recv()
+		if err != nil {
+			log.Panicf("Error receiving delta response: %v", err)
+		}
+		responses = append(responses, res)
+
+		if remaining, _ := ads.ParseRemainingChunksFromNonce(res.Nonce); remaining == 0 {
+			break
+		}
+	}
+
+	log.Printf("All responses received: %+v", responses)
+}
diff --git a/internal/server/handlers.go b/internal/server/handlers.go
index 1b4aa34..a9eb5e2 100644
--- a/internal/server/handlers.go
+++ b/internal/server/handlers.go
@@ -324,7 +324,7 @@ func newSotWHandler(
 
 			res := &ads.SotWDiscoveryResponse{
 				TypeUrl: typeUrl,
-				Nonce:   utils.NewNonce(),
+				Nonce:   utils.NewNonce(0),
 			}
 			for _, e := range entries {
 				res.Resources = append(res.Resources, e.Resource.Resource)
@@ -349,7 +349,7 @@ func newSotWHandler(
 
 		res := &ads.SotWDiscoveryResponse{
 			TypeUrl: typeUrl,
-			Nonce:   utils.NewNonce(),
+			Nonce:   utils.NewNonce(0),
 		}
 		for _, r := range allResources {
 			res.Resources = append(res.Resources, r.Resource.Resource)
diff --git a/internal/server/handlers_delta.go b/internal/server/handlers_delta.go
index 21d708e..3e25613 100644
--- a/internal/server/handlers_delta.go
+++ b/internal/server/handlers_delta.go
@@ -174,6 +174,11 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De
 			"typeURL", ds.typeURL,
 			"updates", len(ds.queuedUpdates),
 		)
+		for i, c := range chunks {
+			c.Nonce = utils.NewNonce(len(chunks) - i - 1)
+		}
+	} else {
+		chunks[0].Nonce = utils.NewNonce(0)
 	}
 
 	return chunks
@@ -182,7 +187,6 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De
 
 func (ds *deltaSender) newChunk() *ads.DeltaDiscoveryResponse {
 	return &ads.DeltaDiscoveryResponse{
 		TypeUrl: ds.typeURL,
-		Nonce:   utils.NewNonce(),
 	}
 }
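As a side note on the numbering introduced in deltaSender.chunk above: chunk i of N carries the count of chunks still to come, so the final chunk always reports 0 remaining. The sketch below is not part of the patch and uses a stand-in for the module-internal utils.NewNonce; it only illustrates that invariant.

package main

import "fmt"

// stampNonces mirrors the loop added to deltaSender.chunk: chunk i of n is
// stamped with n-i-1, the number of chunks that still follow it. newNonce is
// a stand-in for utils.NewNonce, which is internal to the diderot module.
func stampNonces(n int, newNonce func(remaining int) string) []string {
	nonces := make([]string, n)
	for i := range nonces {
		nonces[i] = newNonce(n - i - 1)
	}
	return nonces
}

func main() {
	fake := func(remaining int) string { return fmt.Sprintf("nonce(remaining=%d)", remaining) }
	// Three chunks: a receiver can stop waiting as soon as it sees remaining == 0.
	fmt.Println(stampNonces(3, fake)) // [nonce(remaining=2) nonce(remaining=1) nonce(remaining=0)]
}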
diff --git a/internal/server/handlers_delta_test.go b/internal/server/handlers_delta_test.go
index c19e482..f6c2fdf 100644
--- a/internal/server/handlers_delta_test.go
+++ b/internal/server/handlers_delta_test.go
@@ -2,6 +2,7 @@ package internal
 
 import (
 	"context"
+	"slices"
 	"strings"
 	"sync/atomic"
 	"testing"
@@ -102,7 +103,7 @@ func TestInitialChunkSize(t *testing.T) {
 	typeURL := utils.GetTypeURL[*wrapperspb.StringValue]()
 	require.Equal(t, proto.Size(&ads.DeltaDiscoveryResponse{
 		TypeUrl: typeURL,
-		Nonce:   utils.NewNonce(),
+		Nonce:   utils.NewNonce(0),
 	}), initialChunkSize(typeURL))
 }
 
@@ -121,11 +122,23 @@ func TestDeltaHandlerChunking(t *testing.T) {
 		minChunkSize: initialChunkSize(typeURL),
 	}
 
-	sentResponses := ds.chunk(map[string]entry{
+	getSentResponses := func(resources map[string]entry, expectedChunks int) []*ads.DeltaDiscoveryResponse {
+		responses := ds.chunk(resources)
+		require.Len(t, responses, expectedChunks)
+		expectedRemainingChunks := 0
+		for _, res := range slices.Backward(responses) {
+			remaining, err := ads.ParseRemainingChunksFromNonce(res.Nonce)
+			require.NoError(t, err)
+			require.Equal(t, expectedRemainingChunks, remaining)
+			expectedRemainingChunks++
+		}
+		return responses
+	}
+
+	sentResponses := getSentResponses(map[string]entry{
 		foo.Name: {Resource: foo},
 		bar.Name: {Resource: bar},
-	})
-
+	}, 2)
 	require.Equal(t, len(sentResponses[0].Resources), 1)
 	require.Equal(t, len(sentResponses[1].Resources), 1)
 	response0 := sentResponses[0].Resources[0]
@@ -142,10 +155,10 @@ func TestDeltaHandlerChunking(t *testing.T) {
 	// Delete resources whose names are the same size as the resources to trip the chunker with the same conditions
 	name1 := strings.Repeat("1", resourceSize)
 	name2 := strings.Repeat("2", resourceSize)
-	sentResponses = ds.chunk(map[string]entry{
+	sentResponses = getSentResponses(map[string]entry{
 		name1: {Resource: nil},
 		name2: {Resource: nil},
-	})
+	}, 2)
 	require.Equal(t, len(sentResponses[0].RemovedResources), 1)
 	require.Equal(t, len(sentResponses[1].RemovedResources), 1)
 	require.ElementsMatch(t,
@@ -156,12 +169,12 @@ func TestDeltaHandlerChunking(t *testing.T) {
 
 	small1, small2, small3 := "a", "b", "c"
 	wayTooBig := strings.Repeat("3", 10*resourceSize)
-	sentResponses = ds.chunk(map[string]entry{
+	sentResponses = getSentResponses(map[string]entry{
 		small1:    {Resource: nil},
 		small2:    {Resource: nil},
 		small3:    {Resource: nil},
 		wayTooBig: {Resource: nil},
-	})
+	}, 1)
 	require.Equal(t, len(sentResponses[0].RemovedResources), 3)
 	require.ElementsMatch(t, []string{small1, small2, small3}, sentResponses[0].RemovedResources)
 	require.Equal(t, int64(1), statsHandler.DeltaResourcesOverMaxSize.Load())
diff --git a/internal/utils/utils.go b/internal/utils/utils.go
index 42d3029..f9107ec 100644
--- a/internal/utils/utils.go
+++ b/internal/utils/utils.go
@@ -2,8 +2,9 @@ package utils
 
 import (
 	"encoding/base64"
+	"encoding/binary"
+	"encoding/hex"
 	"slices"
-	"strconv"
 	"strings"
 	"time"
 
@@ -12,24 +13,33 @@ import (
 	"google.golang.org/protobuf/proto"
 )
 
-// NonceLength is the length of the string returned by NewNonce. NewNonce encodes the current UNIX
-// time in nanos in hex encoding, so the nonce will be 16 characters if the current UNIX nano time is
-// greater than 2^60-1. This is because it takes 16 hex characters to encode 64 bits, but only 15 to
-// encode 60 bits (the output of strconv.FormatInt is not padded by 0s). 2^60-1 nanos from epoch time
-// (January 1st 1970) is 2006-07-14 23:58:24.606, which as of this writing is over 17 years ago. This
-// is why it's guaranteed that NonceLength will be 16 characters (before that date, encoding the
-// nanos only required 15 characters). For the curious, the UNIX nano timestamp will overflow int64
-// some time in 2262, making this constant valid for the next few centuries.
-const NonceLength = 16
-
-// NewNonce creates a new unique nonce based on the current UNIX time in nanos. It always returns a
-// string of length NonceLength.
-func NewNonce() string {
-	// The second parameter to FormatInt is the base, e.g. 2 will return binary, 8 will return octal
-	// encoding, etc. 16 means FormatInt returns the integer in hex encoding, e.g. 30 => "1e" or
-	// 1704239351400 => "18ccc94c668".
-	const hexBase = 16
-	return strconv.FormatInt(time.Now().UnixNano(), hexBase)
+const (
+	// NonceLength is the length of the string returned by NewNonce. NewNonce encodes the current UNIX
+	// time in nanos and the number of remaining chunks as 64-bit and 32-bit integers respectively, then
+	// hex encodes them. This means a nonce is always 8 + 4 bytes, doubled by the hex encoding.
+	NonceLength = (8 + 4) * 2
+)
+
+// NewNonce creates a new unique nonce based on the current UNIX time in nanos, always returning a
+// string of length [NonceLength].
+func NewNonce(remainingChunks int) string {
+	return newNonce(time.Now(), remainingChunks)
+}
+
+func newNonce(now time.Time, remainingChunks int) string {
+	// Preallocating these buffers with constants (instead of doing `out = make([]byte, len(buf) * 2)`)
+	// means the compiler will allocate them on the stack instead of the heap. This significantly reduces
+	// the amount of garbage created by this function, as the only heap allocation will be the final
+	// string(out), rather than all of these buffers.
+	buf := make([]byte, NonceLength/2)
+	out := make([]byte, NonceLength)
+
+	binary.BigEndian.PutUint64(buf[:8], uint64(now.UnixNano()))
+	binary.BigEndian.PutUint32(buf[8:], uint32(remainingChunks))
+
+	hex.Encode(out, buf)
+
+	return string(out)
 }
 
 func GetTypeURL[T proto.Message]() string {
diff --git a/internal/utils/utils_test.go b/internal/utils/utils_test.go
index d959668..af57501 100644
--- a/internal/utils/utils_test.go
+++ b/internal/utils/utils_test.go
@@ -1,7 +1,9 @@
 package utils
 
 import (
+	"fmt"
 	"testing"
+	"time"
 
 	"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
 	"github.com/linkedin/diderot/ads"
@@ -46,6 +48,25 @@ func TestProtoMap(t *testing.T) {
 	})
 }
 
-func TestNonceLength(t *testing.T) {
-	require.Len(t, NewNonce(), NonceLength)
+func TestNewNonce(t *testing.T) {
+	now := time.Now()
+	t.Run("remainingChunks", func(t *testing.T) {
+		for _, expected := range []int{0, 42} {
+			nonce := newNonce(now, expected)
+			require.Equal(t, fmt.Sprintf("%x%08x", now.UnixNano(), expected), nonce)
+			actualRemainingChunks, err := ads.ParseRemainingChunksFromNonce(nonce)
+			require.NoError(t, err)
+			require.Equal(t, expected, actualRemainingChunks)
+		}
+	})
+	t.Run("badNonce", func(t *testing.T) {
+		remaining, err := ads.ParseRemainingChunksFromNonce("foo")
+		require.Error(t, err)
+		require.Zero(t, remaining)
+	})
+	t.Run("oldNonce", func(t *testing.T) {
+		remaining, err := ads.ParseRemainingChunksFromNonce(fmt.Sprintf("%x", now.UnixNano()))
+		require.Error(t, err)
+		require.Zero(t, remaining)
+	})
 }
diff --git a/server.go b/server.go
index b8980a7..92dec8d 100644
--- a/server.go
+++ b/server.go
@@ -210,7 +210,7 @@ func (s *ADSServer) StreamAggregatedResources(stream ads.SotWStream) (err error)
 			return &ads.SotWDiscoveryResponse{
 				Resources: nil,
 				TypeUrl:   req.TypeUrl,
-				Nonce:     utils.NewNonce(),
+				Nonce:     utils.NewNonce(0),
 			}
 		},
 		setControlPlane: func(res *ads.SotWDiscoveryResponse, controlPlane *corev3.ControlPlane) {
@@ -251,7 +251,7 @@ func (s *ADSServer) DeltaAggregatedResources(stream ads.DeltaStream) (err error)
 			return &ads.DeltaDiscoveryResponse{
 				TypeUrl:          req.GetTypeUrl(),
 				RemovedResources: req.GetResourceNamesSubscribe(),
-				Nonce:            utils.NewNonce(),
+				Nonce:            utils.NewNonce(0),
 				ControlPlane:     s.controlPlane,
 			}
 		},