Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support send/recv chunk for checkin protocol #89

Merged
merged 5 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion elastic-agent-client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ service ElasticAgent {
//
// Use of the source field allows the input configurations to evolve without needing to modify
// the control protocol itself. In some cases commonly used or important fields are extracted as
// a dedicated message type, but these definitions do not comletely define the contents of the
// a dedicated message type, but these definitions do not completely define the contents of the
// source field which is free to contain additional fields.
rpc CheckinV2(stream CheckinObserved) returns (stream CheckinExpected);

Expand All @@ -62,6 +62,12 @@ service ElasticAgent {
rpc Checkin(stream StateObserved) returns (stream StateExpected);
}

// Features that the connection between the client and the server supports.
enum ConnectionSupports {
// Checkin chunking support.
CheckinChunking = 0;
}

// State codes for the current state.
enum State {
// STARTING is an optional observed state indicating the unit is doing work to start before
Expand Down Expand Up @@ -282,6 +288,13 @@ message CheckinExpected {
// Index or revision of the expected component configuration. When the expected configuration
// changes the agent will increment this number and the Component field will be populated.
uint64 component_idx = 6;

// When a units timestamp is provided then the set of units could not all fit inside this single message
// and it was split across multiple messages. Each message chunk must have the same units timestamp, in
// the case that the client gets a new message with a different timestamp and its newer than the other
// timestamp then it should take that new message chunk as a start of a new message set. To finish the a
// set of messages with the same timestamp, the last chunk should be an empty set of units.
google.protobuf.Timestamp units_timestamp = 7;
}

// Observed status for a unit.
Expand Down Expand Up @@ -338,6 +351,17 @@ message CheckinObserved {

// Index or revision of the currently component configuration.
uint64 component_idx = 6;

// When a units timestamp is provided then the set of units could not all fit inside this single message
// and it was split across multiple messages. Each message chunk must have the same units timestamp, in
// the case that the client gets a new message with a different timestamp and its newer than the other
// timestamp then it should take that new message chunk as a start of a new message set. To finish the a
// set of messages with the same timestamp, the last chunk should be an empty set of units.
google.protobuf.Timestamp units_timestamp = 7;

// Supports provides information to the agent about extra features this client supports. Should always be included
// on first checkin, and not again unless upon reconnect.
repeated ConnectionSupports supports = 8;
}

// A action request is streamed from the Elastic Agent to the application so an action can be performed
Expand Down Expand Up @@ -449,4 +473,8 @@ message ConnInfo {
bytes peer_key = 6;
// Allowed services that spawned process can use. (only used in V2)
repeated ConnInfoServices services = 7;
// Supports provides information to the client about extra features this server supports.
repeated ConnectionSupports supports = 8;
// Maximum message size that the client can use.
uint32 max_message_size = 9;
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
module github.com/elastic/elastic-agent-client/v7

go 1.18
go 1.20

require (
github.com/gofrs/uuid v4.2.0+incompatible
github.com/golang/protobuf v1.5.3
github.com/google/go-cmp v0.5.9
github.com/google/pprof v0.0.0-20230426061923-93006964c1fc
github.com/hashicorp/go-multierror v1.1.1
github.com/magefile/mage v1.13.0
github.com/stretchr/testify v1.7.0
golang.org/x/exp v0.0.0-20231127185646-65229373498e
google.golang.org/grpc v1.56.3
google.golang.org/protobuf v1.30.0
)
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20230426061923-93006964c1fc h1:AGDHt781oIcL4EFk7cPnvBUYTwU8BEU6GDTO3ZMn1sE=
github.com/google/pprof v0.0.0-20230426061923-93006964c1fc/go.mod h1:79YE0hCXdHag9sBkw2o+N/YnZtTkXi0UT9Nnixa5eYk=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand All @@ -29,6 +30,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/artifact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestArtifact(t *testing.T) {
var errs []error
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := NewV2(fmt.Sprintf(":%d", srv.Port), token, VersionInfo{}, grpc.WithTransportCredentials(insecure.NewCredentials()))
client := NewV2(fmt.Sprintf(":%d", srv.Port), token, VersionInfo{}, WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))
storeErrors(ctx, client, &errs, &errsMu)

var unitsMu sync.Mutex
Expand Down
152 changes: 152 additions & 0 deletions pkg/client/chunk/expected.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package chunk

import (
"time"

"golang.org/x/exp/slices"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
protobuf "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
)

// Expected chunks `proto.CheckinExpected` message into multiple chunks to be sent across the protocol.
func Expected(msg *proto.CheckinExpected, maxSize int, opts ...Option) ([]*proto.CheckinExpected, error) {
var options options
options.timestamp = time.Now() // timestamp used for chunk set
for _, opt := range opts {
opt(&options)
}

s := protobuf.Size(msg)
if s <= maxSize || len(msg.Units) <= 1 {
// fits so no chunking needed or has 0 or 1 units which cannot be chunked
return []*proto.CheckinExpected{msg}, nil
}

msgs := make([]*proto.CheckinExpected, 0, 3) // start at 3 minimum

// a single unit is the smallest a chunk can be
// pre-calculate the size and ensure that a single unit is less than the maxSize
bySize := make([]expectedBySize, len(msg.Units))
for i, u := range msg.Units {
bySize[i].unit = u
bySize[i].size = protobuf.Size(u)
// >= is used because even if it's at the maxSize, with overhead
// it will still be too big even if it's at the exact maxSize
if bySize[i].size >= maxSize {
return nil, status.Errorf(
codes.ResourceExhausted,
"unable to chunk proto.CheckinExpected the unit %s is larger than max (%d vs. %d)",
u.Id, bySize[i].size, maxSize)
}
}

// sort the smallest units first, this ensures that the first chunk that includes extra
// fields uses the smallest unit to ensure that it all fits
slices.SortStableFunc(bySize, func(a, b expectedBySize) int {
return a.size - b.size
})

// first message all fields are set; except units is made smaller
m := shallowCopyCheckinExpected(msg)
m.Units = make([]*proto.UnitExpected, 0, 1)
m.Units = append(m.Units, bySize[0].unit)
m.UnitsTimestamp = timestamppb.New(options.timestamp)
s = protobuf.Size(m)
if s >= maxSize {
// not possible even for the first chunk to fit
return nil, status.Errorf(
codes.ResourceExhausted,
"unable to chunk proto.CheckinExpected the first chunk with unit %s is larger than max (%d vs. %d)",
m.Units[0].Id, s, maxSize)
}

// keep adding units until it doesn't fit
for nextUnit := 1; s < maxSize && nextUnit < len(bySize); nextUnit++ {
us := bySize[nextUnit]
if s+us.size < maxSize {
// unit fits add it
m.Units = append(m.Units, us.unit)
s += us.size
} else {
// doesn't fit, create a new chunk
msgs = append(msgs, m)
m = &proto.CheckinExpected{}
m.UnitsTimestamp = timestamppb.New(options.timestamp)
m.Units = make([]*proto.UnitExpected, 0, 1)
m.Units = append(m.Units, us.unit)
s = protobuf.Size(m)
}
}
msgs = append(msgs, m)

// all chunks created, create the empty chunk
m = &proto.CheckinExpected{}
m.UnitsTimestamp = timestamppb.New(options.timestamp)
m.Units = make([]*proto.UnitExpected, 0)
msgs = append(msgs, m)
return msgs, nil
}

// CheckinExpectedReceiver provides a Recv interface to receive proto.CheckinExpected messages.
type CheckinExpectedReceiver interface {
Recv() (*proto.CheckinExpected, error)
}

// RecvExpected handles the receiving of chunked proto.CheckinObjected.
func RecvExpected(recv CheckinExpectedReceiver) (*proto.CheckinExpected, error) {
var first *proto.CheckinExpected
for {
msg, err := recv.Recv()
if err != nil {
return nil, err
}
if msg.UnitsTimestamp == nil {
// all included in a single message
return msg, nil
}
if first == nil {
// first message in batch
first = msg
} else if first.UnitsTimestamp.AsTime() != msg.UnitsTimestamp.AsTime() {
// only used if the new timestamp is newer
if first.UnitsTimestamp.AsTime().After(msg.UnitsTimestamp.AsTime()) {
// not newer so we ignore the message
continue
Comment on lines +120 to +122
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing this makes sense but we probably I think we want a way to tell that it happened. If this begins to cause missed checkins we won't be able to know why to debug it.

}
// different batch; restart
first = msg
}
if len(msg.Units) == 0 {
// ending match message
return first, nil
}
if first != msg {
first.Units = append(first.Units, msg.Units...)
}
}
}

func shallowCopyCheckinExpected(msg *proto.CheckinExpected) *proto.CheckinExpected {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

proto.Clone is a deep copy but has the advantage that we don't have to keep this function up to date if we add a top level message field.

Is there a test we can write that would fail if you forgot to update this function (and the observed version of it)?

return &proto.CheckinExpected{
AgentInfo: msg.AgentInfo,
Features: msg.Features,
FeaturesIdx: msg.FeaturesIdx,
Component: msg.Component,
ComponentIdx: msg.ComponentIdx,
Units: msg.Units,
UnitsTimestamp: msg.UnitsTimestamp,
}
}

type expectedBySize struct {
unit *proto.UnitExpected
size int
}
Loading