diff --git a/pkg/client/chunk/expected.go b/pkg/client/chunk/expected.go new file mode 100644 index 0000000..8f35471 --- /dev/null +++ b/pkg/client/chunk/expected.go @@ -0,0 +1,152 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package chunk + +import ( + "time" + + "golang.org/x/exp/slices" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + protobuf "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +// Expected chunks `proto.CheckinExpected` message into multiple chunks to be sent across the protocol. +func Expected(msg *proto.CheckinExpected, maxSize int, opts ...Option) ([]*proto.CheckinExpected, error) { + var options options + options.timestamp = time.Now() // timestamp used for chunk set + for _, opt := range opts { + opt(&options) + } + + s := protobuf.Size(msg) + if s <= maxSize || len(msg.Units) <= 1 { + // fits so no chunking needed or has 0 or 1 units which cannot be chunked + return []*proto.CheckinExpected{msg}, nil + } + + msgs := make([]*proto.CheckinExpected, 0, 3) // start at 3 minimum + + // a single unit is the smallest a chunk can be + // pre-calculate the size and ensure that a single unit is less than the maxSize + bySize := make([]expectedBySize, len(msg.Units)) + for i, u := range msg.Units { + bySize[i].unit = u + bySize[i].size = protobuf.Size(u) + // >= is used because even if it's at the maxSize, with overhead + // it will still be too big even if it's at the exact maxSize + if bySize[i].size >= maxSize { + return nil, status.Errorf( + codes.ResourceExhausted, + "unable to chunk proto.CheckinExpected the unit %s is larger than max (%d vs. %d)", + u.Id, bySize[i].size, maxSize) + } + } + + // sort the smallest units first, this ensures that the first chunk that includes extra + // fields uses the smallest unit to ensure that it all fits + slices.SortStableFunc(bySize, func(a, b expectedBySize) int { + return a.size - b.size + }) + + // first message all fields are set; except units is made smaller + m := shallowCopyCheckinExpected(msg) + m.Units = make([]*proto.UnitExpected, 0, 1) + m.Units = append(m.Units, bySize[0].unit) + m.UnitsTimestamp = timestamppb.New(options.timestamp) + s = protobuf.Size(m) + if s >= maxSize { + // not possible even for the first chunk to fit + return nil, status.Errorf( + codes.ResourceExhausted, + "unable to chunk proto.CheckinExpected the first chunk with unit %s is larger than max (%d vs. %d)", + m.Units[0].Id, s, maxSize) + } + + // keep adding units until it doesn't fit + for nextUnit := 1; s < maxSize && nextUnit < len(bySize); nextUnit++ { + us := bySize[nextUnit] + if s+us.size < maxSize { + // unit fits add it + m.Units = append(m.Units, us.unit) + s += us.size + } else { + // doesn't fit, create a new chunk + msgs = append(msgs, m) + m = &proto.CheckinExpected{} + m.UnitsTimestamp = timestamppb.New(options.timestamp) + m.Units = make([]*proto.UnitExpected, 0, 1) + m.Units = append(m.Units, us.unit) + s = protobuf.Size(m) + } + } + msgs = append(msgs, m) + + // all chunks created, create the empty chunk + m = &proto.CheckinExpected{} + m.UnitsTimestamp = timestamppb.New(options.timestamp) + m.Units = make([]*proto.UnitExpected, 0) + msgs = append(msgs, m) + return msgs, nil +} + +// CheckinExpectedReceiver provides a Recv interface to receive proto.CheckinExpected messages. +type CheckinExpectedReceiver interface { + Recv() (*proto.CheckinExpected, error) +} + +// RecvExpected handles the receiving of chunked proto.CheckinObjected. +func RecvExpected(recv CheckinExpectedReceiver) (*proto.CheckinExpected, error) { + var first *proto.CheckinExpected + for { + msg, err := recv.Recv() + if err != nil { + return nil, err + } + if msg.UnitsTimestamp == nil { + // all included in a single message + return msg, nil + } + if first == nil { + // first message in batch + first = msg + } else if first.UnitsTimestamp.AsTime() != msg.UnitsTimestamp.AsTime() { + // only used if the new timestamp is newer + if first.UnitsTimestamp.AsTime().After(msg.UnitsTimestamp.AsTime()) { + // not newer so we ignore the message + continue + } + // different batch; restart + first = msg + } + if len(msg.Units) == 0 { + // ending match message + return first, nil + } + if first != msg { + first.Units = append(first.Units, msg.Units...) + } + } +} + +func shallowCopyCheckinExpected(msg *proto.CheckinExpected) *proto.CheckinExpected { + return &proto.CheckinExpected{ + AgentInfo: msg.AgentInfo, + Features: msg.Features, + FeaturesIdx: msg.FeaturesIdx, + Component: msg.Component, + ComponentIdx: msg.ComponentIdx, + Units: msg.Units, + UnitsTimestamp: msg.UnitsTimestamp, + } +} + +type expectedBySize struct { + unit *proto.UnitExpected + size int +} diff --git a/pkg/client/chunk/expected_test.go b/pkg/client/chunk/expected_test.go new file mode 100644 index 0000000..29573c5 --- /dev/null +++ b/pkg/client/chunk/expected_test.go @@ -0,0 +1,348 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package chunk + +import ( + "golang.org/x/exp/slices" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +func TestExpected(t *testing.T) { + timestamp := time.Now() + + scenarios := []struct { + Name string + MaxSize int + Original *proto.CheckinExpected + Expected []*proto.CheckinExpected + Error string + }{ + { + Name: "unit too large to fit", + MaxSize: 30, + Error: "unable to chunk proto.CheckinExpected the unit id-one is larger than max", + Original: &proto.CheckinExpected{ + Units: []*proto.UnitExpected{ + { + Id: "id-one", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "testing", + Type: "testing", + Name: "testing", + }, + }, + { + Id: "id-two", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + }, + }, + }, + }, + { + Name: "first chunk too large", + MaxSize: 50, + Error: "unable to chunk proto.CheckinExpected the first chunk with", + Original: &proto.CheckinExpected{ + Units: []*proto.UnitExpected{ + { + Id: "id-one", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "testing1", + Type: "testing", + Name: "testing1", + }, + }, + { + Id: "id-two", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "testing2", + Type: "testing", + Name: "testing2", + }, + }, + }, + }, + }, + { + Name: "chunk checkin message", + MaxSize: 70, + Original: &proto.CheckinExpected{ + FeaturesIdx: 2, + ComponentIdx: 3, + Units: []*proto.UnitExpected{ + { + Id: "id-one", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + Id: "testing", + Type: "testing", + Name: "testing", + }, + }, + { + Id: "id-two", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + }, + { + Id: "id-three", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + Id: "little-larger", + }, + }, + }, + }, + Expected: []*proto.CheckinExpected{ + { + FeaturesIdx: 2, + ComponentIdx: 3, + Units: []*proto.UnitExpected{ + { + Id: "id-two", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + }, + { + Id: "id-three", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + Id: "little-larger", + }, + }, + }, + UnitsTimestamp: timestamppb.New(timestamp), + }, + { + Units: []*proto.UnitExpected{ + { + Id: "id-one", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + Id: "testing", + Type: "testing", + Name: "testing", + }, + }, + }, + UnitsTimestamp: timestamppb.New(timestamp), + }, + { + Units: []*proto.UnitExpected{}, + UnitsTimestamp: timestamppb.New(timestamp), + }, + }, + }, + { + Name: "fits in single message", + MaxSize: 200, + Original: &proto.CheckinExpected{ + FeaturesIdx: 2, + ComponentIdx: 3, + Units: []*proto.UnitExpected{ + { + Id: "id-one", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + Id: "testing", + Type: "testing", + Name: "testing", + }, + }, + { + Id: "id-two", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + }, + }, + }, + Expected: []*proto.CheckinExpected{ + { + FeaturesIdx: 2, + ComponentIdx: 3, + Units: []*proto.UnitExpected{ + { + Id: "id-one", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + Id: "testing", + Type: "testing", + Name: "testing", + }, + }, + { + Id: "id-two", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + }, + }, + }, + }, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.Name, func(t *testing.T) { + observed, err := Expected(scenario.Original, scenario.MaxSize, WithTimestamp(timestamp)) + if scenario.Error != "" { + require.Error(t, err) + assert.True(t, strings.Contains(err.Error(), scenario.Error)) + } else { + require.NoError(t, err) + diff := cmp.Diff(scenario.Expected, observed, protocmp.Transform()) + require.Empty(t, diff) + + // re-assemble and it should now match the original + assembled, err := RecvExpected(&fakeCheckinExpectedReceiver{msgs: observed}) + require.NoError(t, err) + + // to compare we need to remove the units timestamp and ensure the units are in the same order + // completely acceptable that they get re-ordered in the chunking process + assembled.UnitsTimestamp = nil + slices.SortStableFunc(assembled.Units, sortExpectedUnits) + slices.SortStableFunc(scenario.Original.Units, sortExpectedUnits) + + diff = cmp.Diff(scenario.Original, assembled, protocmp.Transform()) + assert.Empty(t, diff) + } + }) + } +} + +func TestRecvExpected_Timestamp_Restart(t *testing.T) { + firstTimestamp := time.Now() + first := &proto.CheckinExpected{ + FeaturesIdx: 2, + ComponentIdx: 3, + Units: []*proto.UnitExpected{ + { + Id: "first-one", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + Id: "testing", + Type: "testing", + Name: "testing", + }, + }, + { + Id: "first-two", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + }, + }, + } + firstMsgs, err := Expected(first, 50, WithTimestamp(firstTimestamp)) + require.NoError(t, err) + + secondTimestamp := time.Now() + second := &proto.CheckinExpected{ + FeaturesIdx: 2, + ComponentIdx: 3, + Units: []*proto.UnitExpected{ + { + Id: "second-one", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + Id: "testing", + Type: "testing", + Name: "testing", + }, + }, + { + Id: "second-two", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + }, + }, + } + secondsMsgs, err := Expected(second, 50, WithTimestamp(secondTimestamp)) + require.NoError(t, err) + + // ensure chunking results in exact length as the order in the test relies on it + require.Len(t, firstMsgs, 3) + require.Len(t, secondsMsgs, 3) + + // re-order the messages + reorderedMsgs := make([]*proto.CheckinExpected, 6) + reorderedMsgs[0] = firstMsgs[0] + reorderedMsgs[1] = secondsMsgs[0] // becomes new set + reorderedMsgs[2] = firstMsgs[1] // ignored + reorderedMsgs[3] = firstMsgs[2] // ignored + reorderedMsgs[4] = secondsMsgs[1] + reorderedMsgs[5] = secondsMsgs[2] + + // re-assemble and it should now match the second + assembled, err := RecvExpected(&fakeCheckinExpectedReceiver{msgs: reorderedMsgs}) + require.NoError(t, err) + + // to compare we need to remove the units timestamp and ensure the units are in the same order + // completely acceptable that they get re-ordered in the chunking process + assembled.UnitsTimestamp = nil + slices.SortStableFunc(assembled.Units, sortExpectedUnits) + slices.SortStableFunc(second.Units, sortExpectedUnits) + + diff := cmp.Diff(second, assembled, protocmp.Transform()) + assert.Empty(t, diff) +} + +func sortExpectedUnits(a *proto.UnitExpected, b *proto.UnitExpected) int { + return strings.Compare(a.Id, b.Id) +} + +type fakeCheckinExpectedReceiver struct { + msgs []*proto.CheckinExpected +} + +func (f *fakeCheckinExpectedReceiver) Recv() (*proto.CheckinExpected, error) { + var msg *proto.CheckinExpected + msg, f.msgs = f.msgs[0], f.msgs[1:] + return msg, nil +} diff --git a/pkg/client/chunk/observed.go b/pkg/client/chunk/observed.go new file mode 100644 index 0000000..4da63e6 --- /dev/null +++ b/pkg/client/chunk/observed.go @@ -0,0 +1,154 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package chunk + +import ( + "time" + + "golang.org/x/exp/slices" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + protobuf "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +// Observed chunks `proto.CheckinObserved` message into multiple chunks to be sent across the protocol. +func Observed(msg *proto.CheckinObserved, maxSize int, opts ...Option) ([]*proto.CheckinObserved, error) { + var options options + options.timestamp = time.Now() // timestamp used for chunk set + for _, opt := range opts { + opt(&options) + } + + s := protobuf.Size(msg) + if s <= maxSize || len(msg.Units) <= 1 { + // fits so no chunking needed or has 0 or 1 units which cannot be chunked + return []*proto.CheckinObserved{msg}, nil + } + + msgs := make([]*proto.CheckinObserved, 0, 3) // start at 3 minimum + + // a single unit is the smallest a chunk can be + // pre-calculate the size and ensure that a single unit is less than the maxSize + bySize := make([]observedBySize, len(msg.Units)) + for i, u := range msg.Units { + bySize[i].unit = u + bySize[i].size = protobuf.Size(u) + // >= is used because even if it's at the maxSize, with overhead + // it will still be too big even if it's at the exact maxSize + if bySize[i].size >= maxSize { + return nil, status.Errorf( + codes.ResourceExhausted, + "unable to chunk proto.CheckinObserved the unit %s is larger than max (%d vs. %d)", + u.Id, bySize[i].size, maxSize) + } + } + + // sort the smallest units first, this ensures that the first chunk that includes extra + // fields uses the smallest unit to ensure that it all fits + slices.SortStableFunc(bySize, func(a, b observedBySize) int { + return a.size - b.size + }) + + // first message all fields are set; except units is made smaller + m := shallowCopyCheckinObserved(msg) + m.Units = make([]*proto.UnitObserved, 0, 1) + m.Units = append(m.Units, bySize[0].unit) + m.UnitsTimestamp = timestamppb.New(options.timestamp) + s = protobuf.Size(m) + if s >= maxSize { + // not possible even for the first chunk to fit + return nil, status.Errorf( + codes.ResourceExhausted, + "unable to chunk proto.CheckinObserved the first chunk with unit %s is larger than max (%d vs. %d)", + m.Units[0].Id, s, maxSize) + } + + // keep adding units until it doesn't fit + for nextUnit := 1; s < maxSize && nextUnit < len(bySize); nextUnit++ { + us := bySize[nextUnit] + if s+us.size < maxSize { + // unit fits add it + m.Units = append(m.Units, us.unit) + s += us.size + } else { + // doesn't fit, create a new chunk + msgs = append(msgs, m) + m = &proto.CheckinObserved{} + m.Token = msg.Token + m.UnitsTimestamp = timestamppb.New(options.timestamp) + m.Units = make([]*proto.UnitObserved, 0, 1) + m.Units = append(m.Units, us.unit) + s = protobuf.Size(m) + } + } + msgs = append(msgs, m) + + // all chunks created, create the empty chunk + m = &proto.CheckinObserved{} + m.Token = msg.Token + m.UnitsTimestamp = timestamppb.New(options.timestamp) + m.Units = make([]*proto.UnitObserved, 0) + msgs = append(msgs, m) + return msgs, nil +} + +// CheckinObservedReceiver provides a Recv interface to receive proto.CheckinObserved messages. +type CheckinObservedReceiver interface { + Recv() (*proto.CheckinObserved, error) +} + +// RecvObserved handles the receiving of chunked proto.CheckinObserved. +func RecvObserved(recv CheckinObservedReceiver) (*proto.CheckinObserved, error) { + var first *proto.CheckinObserved + for { + msg, err := recv.Recv() + if err != nil { + return nil, err + } + if msg.UnitsTimestamp == nil { + // all included in a single message + return msg, nil + } + if first == nil { + // first message in batch + first = msg + } else if first.UnitsTimestamp.AsTime() != msg.UnitsTimestamp.AsTime() { + // only used if the new timestamp is newer + if first.UnitsTimestamp.AsTime().After(msg.UnitsTimestamp.AsTime()) { + // not newer so we ignore the message + continue + } + // different batch; restart + first = msg + } + if len(msg.Units) == 0 { + // ending match message + return first, nil + } + if first != msg { + first.Units = append(first.Units, msg.Units...) + } + } +} + +func shallowCopyCheckinObserved(msg *proto.CheckinObserved) *proto.CheckinObserved { + return &proto.CheckinObserved{ + Token: msg.Token, + Units: msg.Units, + VersionInfo: msg.VersionInfo, + FeaturesIdx: msg.FeaturesIdx, + ComponentIdx: msg.ComponentIdx, + UnitsTimestamp: msg.UnitsTimestamp, + Supports: msg.Supports, + } +} + +type observedBySize struct { + unit *proto.UnitObserved + size int +} diff --git a/pkg/utils/utils_test.go b/pkg/client/chunk/observed_test.go similarity index 55% rename from pkg/utils/utils_test.go rename to pkg/client/chunk/observed_test.go index 56f8a14..7b75f31 100644 --- a/pkg/utils/utils_test.go +++ b/pkg/client/chunk/observed_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package utils +package chunk import ( "golang.org/x/exp/slices" @@ -20,7 +20,7 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/proto" ) -func TestChunkedObserved(t *testing.T) { +func TestObserved(t *testing.T) { timestamp := time.Now() scenarios := []struct { @@ -88,8 +88,8 @@ func TestChunkedObserved(t *testing.T) { }, }, { - Name: "chunk", - MaxSize: 100, + Name: "chunk checkin message", + MaxSize: 120, Original: &proto.CheckinObserved{ Token: "token", FeaturesIdx: 2, @@ -112,6 +112,16 @@ func TestChunkedObserved(t *testing.T) { State: proto.State_HEALTHY, Message: "Healthy", }, + { + Id: "id-three", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Message: "Healthy", + Payload: mustStruct(map[string]interface{}{ + "large": "larger than id-two", + }), + }, }, }, Expected: []*proto.CheckinObserved{ @@ -127,6 +137,16 @@ func TestChunkedObserved(t *testing.T) { State: proto.State_HEALTHY, Message: "Healthy", }, + { + Id: "id-three", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Message: "Healthy", + Payload: mustStruct(map[string]interface{}{ + "large": "larger than id-two", + }), + }, }, UnitsTimestamp: timestamppb.New(timestamp), }, @@ -211,7 +231,7 @@ func TestChunkedObserved(t *testing.T) { for _, scenario := range scenarios { t.Run(scenario.Name, func(t *testing.T) { - observed, err := ChunkedObserved(scenario.Original, scenario.MaxSize, WithTimestamp(timestamp)) + observed, err := Observed(scenario.Original, scenario.MaxSize, WithTimestamp(timestamp)) if scenario.Error != "" { require.Error(t, err) assert.True(t, strings.Contains(err.Error(), scenario.Error)) @@ -221,7 +241,7 @@ func TestChunkedObserved(t *testing.T) { require.Empty(t, diff) // re-assemble and it should now match the original - assembled, err := RecvChunkedObserved(&fakeCheckinObservedReceiver{msgs: observed}) + assembled, err := RecvObserved(&fakeCheckinObservedReceiver{msgs: observed}) require.NoError(t, err) // to compare we need to remove the units timestamp and ensure the units are in the same order @@ -237,218 +257,88 @@ func TestChunkedObserved(t *testing.T) { } } -func TestChunkedExpected(t *testing.T) { - timestamp := time.Now() - - scenarios := []struct { - Name string - MaxSize int - Original *proto.CheckinExpected - Expected []*proto.CheckinExpected - Error string - }{ - { - Name: "unit too large to fit", - MaxSize: 30, - Error: "unable to chunk proto.CheckinExpected the unit id-one is larger than max", - Original: &proto.CheckinExpected{ - Units: []*proto.UnitExpected{ - { - Id: "id-one", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_INFO, - Config: &proto.UnitExpectedConfig{ - Id: "testing", - Type: "testing", - Name: "testing", - }, - }, - { - Id: "id-two", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - }, - }, +func TestRecvObserved_Timestamp_Restart(t *testing.T) { + firstTimestamp := time.Now() + first := &proto.CheckinObserved{ + Token: "token", + FeaturesIdx: 2, + ComponentIdx: 3, + Units: []*proto.UnitObserved{ + { + Id: "first-one", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Message: "Healthy", + Payload: mustStruct(map[string]interface{}{ + "large": "this structure places this unit over the maximum size", + }), }, - }, - { - Name: "first chunk too large", - MaxSize: 50, - Error: "unable to chunk proto.CheckinExpected the first chunk with", - Original: &proto.CheckinExpected{ - Units: []*proto.UnitExpected{ - { - Id: "id-one", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_INFO, - Config: &proto.UnitExpectedConfig{ - Id: "testing1", - Type: "testing", - Name: "testing1", - }, - }, - { - Id: "id-two", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_INFO, - Config: &proto.UnitExpectedConfig{ - Id: "testing2", - Type: "testing", - Name: "testing2", - }, - }, - }, + { + Id: "first-two", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Message: "Healthy", }, }, - { - Name: "chunk", - MaxSize: 50, - Original: &proto.CheckinExpected{ - FeaturesIdx: 2, - ComponentIdx: 3, - Units: []*proto.UnitExpected{ - { - Id: "id-one", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - Config: &proto.UnitExpectedConfig{ - Id: "testing", - Type: "testing", - Name: "testing", - }, - }, - { - Id: "id-two", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - }, - }, - }, - Expected: []*proto.CheckinExpected{ - { - FeaturesIdx: 2, - ComponentIdx: 3, - Units: []*proto.UnitExpected{ - { - Id: "id-two", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - }, - }, - UnitsTimestamp: timestamppb.New(timestamp), - }, - { - Units: []*proto.UnitExpected{ - { - Id: "id-one", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - Config: &proto.UnitExpectedConfig{ - Id: "testing", - Type: "testing", - Name: "testing", - }, - }, - }, - UnitsTimestamp: timestamppb.New(timestamp), - }, - { - Units: []*proto.UnitExpected{}, - UnitsTimestamp: timestamppb.New(timestamp), - }, + } + firstMsgs, err := Observed(first, 100, WithTimestamp(firstTimestamp)) + require.NoError(t, err) + + secondTimestamp := time.Now() + second := &proto.CheckinObserved{ + Token: "token", + FeaturesIdx: 2, + ComponentIdx: 3, + Units: []*proto.UnitObserved{ + { + Id: "second-one", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Message: "Healthy", + Payload: mustStruct(map[string]interface{}{ + "large": "this structure places this unit over the maximum size", + }), }, - }, - { - Name: "fits in single message", - MaxSize: 200, - Original: &proto.CheckinExpected{ - FeaturesIdx: 2, - ComponentIdx: 3, - Units: []*proto.UnitExpected{ - { - Id: "id-one", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - Config: &proto.UnitExpectedConfig{ - Id: "testing", - Type: "testing", - Name: "testing", - }, - }, - { - Id: "id-two", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - }, - }, - }, - Expected: []*proto.CheckinExpected{ - { - FeaturesIdx: 2, - ComponentIdx: 3, - Units: []*proto.UnitExpected{ - { - Id: "id-one", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - Config: &proto.UnitExpectedConfig{ - Id: "testing", - Type: "testing", - Name: "testing", - }, - }, - { - Id: "id-two", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - }, - }, - }, + { + Id: "second-two", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Message: "Healthy", }, }, } + secondsMsgs, err := Observed(second, 100, WithTimestamp(secondTimestamp)) + require.NoError(t, err) - for _, scenario := range scenarios { - t.Run(scenario.Name, func(t *testing.T) { - observed, err := ChunkedExpected(scenario.Original, scenario.MaxSize, WithTimestamp(timestamp)) - if scenario.Error != "" { - require.Error(t, err) - assert.True(t, strings.Contains(err.Error(), scenario.Error)) - } else { - require.NoError(t, err) - diff := cmp.Diff(scenario.Expected, observed, protocmp.Transform()) - require.Empty(t, diff) + // ensure chunking results in exact length as the order in the test relies on it + require.Len(t, firstMsgs, 3) + require.Len(t, secondsMsgs, 3) - // re-assemble and it should now match the original - assembled, err := RecvChunkedExpected(&fakeCheckinExpectedReceiver{msgs: observed}) - require.NoError(t, err) + // re-order the messages + reorderedMsgs := make([]*proto.CheckinObserved, 6) + reorderedMsgs[0] = firstMsgs[0] + reorderedMsgs[1] = secondsMsgs[0] // becomes new set + reorderedMsgs[2] = firstMsgs[1] // ignored + reorderedMsgs[3] = firstMsgs[2] // ignored + reorderedMsgs[4] = secondsMsgs[1] + reorderedMsgs[5] = secondsMsgs[2] - // to compare we need to remove the units timestamp and ensure the units are in the same order - // completely acceptable that they get re-ordered in the chunking process - assembled.UnitsTimestamp = nil - slices.SortStableFunc(assembled.Units, sortExpectedUnits) - slices.SortStableFunc(scenario.Original.Units, sortExpectedUnits) + // re-assemble and it should now match the second + assembled, err := RecvObserved(&fakeCheckinObservedReceiver{msgs: reorderedMsgs}) + require.NoError(t, err) - diff = cmp.Diff(scenario.Original, assembled, protocmp.Transform()) - assert.Empty(t, diff) - } - }) - } + // to compare we need to remove the units timestamp and ensure the units are in the same order + // completely acceptable that they get re-ordered in the chunking process + assembled.UnitsTimestamp = nil + slices.SortStableFunc(assembled.Units, sortObservedUnits) + slices.SortStableFunc(second.Units, sortObservedUnits) + + diff := cmp.Diff(second, assembled, protocmp.Transform()) + assert.Empty(t, diff) } func mustStruct(v map[string]interface{}) *structpb.Struct { @@ -463,10 +353,6 @@ func sortObservedUnits(a *proto.UnitObserved, b *proto.UnitObserved) int { return strings.Compare(a.Id, b.Id) } -func sortExpectedUnits(a *proto.UnitExpected, b *proto.UnitExpected) int { - return strings.Compare(a.Id, b.Id) -} - type fakeCheckinObservedReceiver struct { msgs []*proto.CheckinObserved } @@ -476,13 +362,3 @@ func (f *fakeCheckinObservedReceiver) Recv() (*proto.CheckinObserved, error) { msg, f.msgs = f.msgs[0], f.msgs[1:] return msg, nil } - -type fakeCheckinExpectedReceiver struct { - msgs []*proto.CheckinExpected -} - -func (f *fakeCheckinExpectedReceiver) Recv() (*proto.CheckinExpected, error) { - var msg *proto.CheckinExpected - msg, f.msgs = f.msgs[0], f.msgs[1:] - return msg, nil -} diff --git a/pkg/client/chunk/option.go b/pkg/client/chunk/option.go new file mode 100644 index 0000000..288ad89 --- /dev/null +++ b/pkg/client/chunk/option.go @@ -0,0 +1,23 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package chunk + +import "time" + +type options struct { + timestamp time.Time +} + +// Option is an option for adjusting chunking. +type Option func(opts *options) + +// WithTimestamp adjusts the timestamp used for the chunking. +// +// Note: Mainly used for testing to ensure a specific timestamp is used. +func WithTimestamp(t time.Time) Option { + return func(opts *options) { + opts.timestamp = t + } +} diff --git a/pkg/client/client_v2.go b/pkg/client/client_v2.go index 06ae489..74e6b8b 100644 --- a/pkg/client/client_v2.go +++ b/pkg/client/client_v2.go @@ -10,6 +10,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/elastic/elastic-agent-client/v7/pkg/client/chunk" "io" "runtime" "runtime/pprof" @@ -453,8 +454,8 @@ func (c *clientV2) checkinRoundTrip() { go func() { defer wg.Done() defer close(readerDone) - expected, err := utils.RecvChunkedExpected(checkinClient) - for ; err == nil; expected, err = utils.RecvChunkedExpected(checkinClient) { + expected, err := chunk.RecvExpected(checkinClient) + for ; err == nil; expected, err = chunk.RecvExpected(checkinClient) { c.applyExpected(expected) } if !errors.Is(err, io.EOF) { @@ -1084,12 +1085,12 @@ func inExpected(unit *Unit, expected []*proto.UnitExpected) bool { return false } -func sendObservedChunked(client proto.ElasticAgent_CheckinV2Client, msg *proto.CheckinObserved, chunk bool, maxSize int) error { - if !chunk { +func sendObservedChunked(client proto.ElasticAgent_CheckinV2Client, msg *proto.CheckinObserved, chunkingAllowed bool, maxSize int) error { + if !chunkingAllowed { // chunking is disabled return client.Send(msg) } - msgs, err := utils.ChunkedObserved(msg, maxSize) + msgs, err := chunk.Observed(msg, maxSize) if err != nil { return err } diff --git a/pkg/client/mock/stub_serverV2.go b/pkg/client/mock/stub_serverV2.go index f2b9da2..48ac00d 100644 --- a/pkg/client/mock/stub_serverV2.go +++ b/pkg/client/mock/stub_serverV2.go @@ -8,13 +8,14 @@ import ( "context" "encoding/json" "fmt" - "github.com/elastic/elastic-agent-client/v7/pkg/utils" "net" "sync" - "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/gofrs/uuid" "google.golang.org/grpc" + + "github.com/elastic/elastic-agent-client/v7/pkg/client/chunk" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" ) // StubServerCheckinV2 is the checkin function for the V2 controller @@ -105,7 +106,7 @@ func (s *StubServerV2) Checkin(server proto.ElasticAgent_CheckinServer) error { // CheckinV2 is the V2 checkin implementation for the mock server func (s *StubServerV2) CheckinV2(server proto.ElasticAgent_CheckinV2Server) error { for { - checkin, err := utils.RecvChunkedObserved(server) + checkin, err := chunk.RecvObserved(server) if err != nil { return err } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 0d4d821..d3ee652 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -6,15 +6,6 @@ package utils import ( "encoding/json" - "time" - - "golang.org/x/exp/slices" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - protobuf "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/timestamppb" - - "github.com/elastic/elastic-agent-client/v7/pkg/proto" ) // JSONMustMarshal marshals the input to JSON []byte and panics if it fails. @@ -25,291 +16,3 @@ func JSONMustMarshal(input interface{}) []byte { } return res } - -type chunkedOptions struct { - timestamp time.Time -} - -// ChunkOption is an option for adjusting chunking. -type ChunkOption func(opts *chunkedOptions) - -// WithTimestamp adjusts the timestamp used for the chunking. -// -// Note: Mainly used for testing to ensure a specific timestamp is used. -func WithTimestamp(t time.Time) ChunkOption { - return func(opts *chunkedOptions) { - opts.timestamp = t - } -} - -// ChunkedObserved chunks `proto.CheckinObserved` message into multiple chunks to be sent across the protocol. -func ChunkedObserved(msg *proto.CheckinObserved, maxSize int, opts ...ChunkOption) ([]*proto.CheckinObserved, error) { - var options chunkedOptions - options.timestamp = time.Now() // timestamp used for chunk set - for _, opt := range opts { - opt(&options) - } - - s := protobuf.Size(msg) - if s <= maxSize || len(msg.Units) <= 1 { - // fits so no chunking needed or has 0 or 1 units which cannot be chunked - return []*proto.CheckinObserved{msg}, nil - } - - msgs := make([]*proto.CheckinObserved, 0, 3) // start at 3 minimum - - // a single unit is the smallest a chunk can be - // pre-calculate the size and ensure that a single unit is less than the maxSize - bySize := make([]observedBySize, len(msg.Units)) - for i, u := range msg.Units { - bySize[i].unit = u - bySize[i].size = protobuf.Size(u) - // >= is used because even if it's at the maxSize, with overhead - // it will still be too big even if it's at the exact maxSize - if bySize[i].size >= maxSize { - return nil, status.Errorf( - codes.ResourceExhausted, - "unable to chunk proto.CheckinObserved the unit %s is larger than max (%d vs. %d)", - u.Id, bySize[i].size, maxSize) - } - } - - // sort the smallest units first, this ensures that the first chunk that includes extra - // fields uses the smallest unit to ensure that it all fits - slices.SortStableFunc(bySize, func(a, b observedBySize) int { - return a.size - b.size - }) - - // first message all fields are set; except units is made smaller - m := shallowCopyCheckinObserved(msg) - m.Units = make([]*proto.UnitObserved, 0, 1) - m.Units = append(m.Units, bySize[0].unit) - m.UnitsTimestamp = timestamppb.New(options.timestamp) - s = protobuf.Size(m) - if s >= maxSize { - // not possible even for the first chunk to fit - return nil, status.Errorf( - codes.ResourceExhausted, - "unable to chunk proto.CheckinObserved the first chunk with unit %s is larger than max (%d vs. %d)", - m.Units[0].Id, s, maxSize) - } - - // keep adding units until it doesn't fit - for nextUnit := 1; s < maxSize && nextUnit < len(bySize); nextUnit++ { - us := bySize[nextUnit] - if s+us.size < maxSize { - // unit fits add it - m.Units = append(m.Units, us.unit) - s += us.size - } else { - // doesn't fit, create a new chunk - msgs = append(msgs, m) - m = &proto.CheckinObserved{} - m.Token = msg.Token - m.UnitsTimestamp = timestamppb.New(options.timestamp) - m.Units = make([]*proto.UnitObserved, 0, 1) - m.Units = append(m.Units, us.unit) - s = protobuf.Size(m) - } - } - - // all chunks created, create the empty chunk - msgs = append(msgs, m) - m = &proto.CheckinObserved{} - m.Token = msg.Token - m.UnitsTimestamp = timestamppb.New(options.timestamp) - m.Units = make([]*proto.UnitObserved, 0) - msgs = append(msgs, m) - return msgs, nil -} - -// CheckinObservedReceiver provides a Recv interface to receive proto.CheckinObserved messages. -type CheckinObservedReceiver interface { - Recv() (*proto.CheckinObserved, error) -} - -// RecvChunkedObserved handles the receiving of chunked proto.CheckinObserved. -func RecvChunkedObserved(recv CheckinObservedReceiver) (*proto.CheckinObserved, error) { - var first *proto.CheckinObserved - for { - msg, err := recv.Recv() - if err != nil { - return nil, err - } - if msg.UnitsTimestamp == nil { - // all included in a single message - return msg, nil - } - if first == nil { - // first message in batch - first = msg - } else if first.UnitsTimestamp.AsTime() != msg.UnitsTimestamp.AsTime() { - // only used if the new timestamp is newer - if first.UnitsTimestamp.AsTime().After(msg.UnitsTimestamp.AsTime()) { - // not newer so we ignore the message - continue - } - // different batch; restart - first = msg - } - if len(msg.Units) == 0 { - // ending match message - return first, nil - } - if first != msg { - first.Units = append(first.Units, msg.Units...) - } - } -} - -func shallowCopyCheckinObserved(msg *proto.CheckinObserved) *proto.CheckinObserved { - return &proto.CheckinObserved{ - Token: msg.Token, - Units: msg.Units, - VersionInfo: msg.VersionInfo, - FeaturesIdx: msg.FeaturesIdx, - ComponentIdx: msg.ComponentIdx, - UnitsTimestamp: msg.UnitsTimestamp, - Supports: msg.Supports, - } -} - -type observedBySize struct { - unit *proto.UnitObserved - size int -} - -// ChunkedExpected chunks `proto.CheckinExpected` message into multiple chunks to be sent across the protocol. -func ChunkedExpected(msg *proto.CheckinExpected, maxSize int, opts ...ChunkOption) ([]*proto.CheckinExpected, error) { - var options chunkedOptions - options.timestamp = time.Now() // timestamp used for chunk set - for _, opt := range opts { - opt(&options) - } - - s := protobuf.Size(msg) - if s <= maxSize || len(msg.Units) <= 1 { - // fits so no chunking needed or has 0 or 1 units which cannot be chunked - return []*proto.CheckinExpected{msg}, nil - } - - msgs := make([]*proto.CheckinExpected, 0, 3) // start at 3 minimum - - // a single unit is the smallest a chunk can be - // pre-calculate the size and ensure that a single unit is less than the maxSize - bySize := make([]expectedBySize, len(msg.Units)) - for i, u := range msg.Units { - bySize[i].unit = u - bySize[i].size = protobuf.Size(u) - // >= is used because even if it's at the maxSize, with overhead - // it will still be too big even if it's at the exact maxSize - if bySize[i].size >= maxSize { - return nil, status.Errorf( - codes.ResourceExhausted, - "unable to chunk proto.CheckinExpected the unit %s is larger than max (%d vs. %d)", - u.Id, bySize[i].size, maxSize) - } - } - - // sort the smallest units first, this ensures that the first chunk that includes extra - // fields uses the smallest unit to ensure that it all fits - slices.SortStableFunc(bySize, func(a, b expectedBySize) int { - return a.size - b.size - }) - - // first message all fields are set; except units is made smaller - m := shallowCopyCheckinExpected(msg) - m.Units = make([]*proto.UnitExpected, 0, 1) - m.Units = append(m.Units, bySize[0].unit) - m.UnitsTimestamp = timestamppb.New(options.timestamp) - s = protobuf.Size(m) - if s >= maxSize { - // not possible even for the first chunk to fit - return nil, status.Errorf( - codes.ResourceExhausted, - "unable to chunk proto.CheckinExpected the first chunk with unit %s is larger than max (%d vs. %d)", - m.Units[0].Id, s, maxSize) - } - - // keep adding units until it doesn't fit - for nextUnit := 1; s < maxSize && nextUnit < len(bySize); nextUnit++ { - us := bySize[nextUnit] - if s+us.size < maxSize { - // unit fits add it - m.Units = append(m.Units, us.unit) - s += us.size - } else { - // doesn't fit, create a new chunk - msgs = append(msgs, m) - m = &proto.CheckinExpected{} - m.UnitsTimestamp = timestamppb.New(options.timestamp) - m.Units = make([]*proto.UnitExpected, 0, 1) - m.Units = append(m.Units, us.unit) - s = protobuf.Size(m) - } - } - - // all chunks created, create the empty chunk - msgs = append(msgs, m) - m = &proto.CheckinExpected{} - m.UnitsTimestamp = timestamppb.New(options.timestamp) - m.Units = make([]*proto.UnitExpected, 0) - msgs = append(msgs, m) - return msgs, nil -} - -// CheckinExpectedReceiver provides a Recv interface to receive proto.CheckinExpected messages. -type CheckinExpectedReceiver interface { - Recv() (*proto.CheckinExpected, error) -} - -// RecvChunkedExpected handles the receiving of chunked proto.CheckinObjected. -func RecvChunkedExpected(recv CheckinExpectedReceiver) (*proto.CheckinExpected, error) { - var first *proto.CheckinExpected - for { - msg, err := recv.Recv() - if err != nil { - return nil, err - } - if msg.UnitsTimestamp == nil { - // all included in a single message - return msg, nil - } - if first == nil { - // first message in batch - first = msg - } else if first.UnitsTimestamp.AsTime() != msg.UnitsTimestamp.AsTime() { - // only used if the new timestamp is newer - if first.UnitsTimestamp.AsTime().After(msg.UnitsTimestamp.AsTime()) { - // not newer so we ignore the message - continue - } - // different batch; restart - first = msg - } - if len(msg.Units) == 0 { - // ending match message - return first, nil - } - if first != msg { - first.Units = append(first.Units, msg.Units...) - } - } -} - -func shallowCopyCheckinExpected(msg *proto.CheckinExpected) *proto.CheckinExpected { - return &proto.CheckinExpected{ - AgentInfo: msg.AgentInfo, - Features: msg.Features, - FeaturesIdx: msg.FeaturesIdx, - Component: msg.Component, - ComponentIdx: msg.ComponentIdx, - Units: msg.Units, - UnitsTimestamp: msg.UnitsTimestamp, - } -} - -type expectedBySize struct { - unit *proto.UnitExpected - size int -}