Skip to content

Commit

Permalink
Add support send/recv chunk for checkin.
Browse files Browse the repository at this point in the history
  • Loading branch information
Blake Rouse committed Dec 5, 2023
1 parent 21e4fd8 commit 71b5b0e
Show file tree
Hide file tree
Showing 8 changed files with 591 additions and 261 deletions.
30 changes: 29 additions & 1 deletion elastic-agent-client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ service ElasticAgent {
//
// Use of the source field allows the input configurations to evolve without needing to modify
// the control protocol itself. In some cases commonly used or important fields are extracted as
// a dedicated message type, but these definitions do not comletely define the contents of the
// a dedicated message type, but these definitions do not completely define the contents of the
// source field which is free to contain additional fields.
rpc CheckinV2(stream CheckinObserved) returns (stream CheckinExpected);

Expand All @@ -62,6 +62,12 @@ service ElasticAgent {
rpc Checkin(stream StateObserved) returns (stream StateExpected);
}

// Features that the connection between the client and the server supports.
//
// Values are advertised in the repeated `supports` fields so that each side only uses an
// optional feature when the other side has announced it.
enum ConnectionSupports {
// Checkin chunking support: a checkin whose units do not fit inside a single message may be
// split across multiple messages that share the same units_timestamp.
CheckinChunking = 0;
}

// State codes for the current state.
enum State {
// STARTING is an optional observed state indicating the unit is doing work to start before
Expand Down Expand Up @@ -282,6 +288,13 @@ message CheckinExpected {
// Index or revision of the expected component configuration. When the expected configuration
// changes the agent will increment this number and the Component field will be populated.
uint64 component_idx = 6;

// When a units timestamp is provided then the set of units could not all fit inside this single message
// and it was split across multiple messages. Each message chunk must have the same units timestamp. In
// the case that the client gets a new message with a different timestamp and it's newer than the other
// timestamp, then it should take that new message chunk as the start of a new message set. To finish a
// set of messages with the same timestamp, the last chunk should be an empty set of units.
google.protobuf.Timestamp units_timestamp = 7;
}

// Observed status for a unit.
Expand Down Expand Up @@ -338,6 +351,17 @@ message CheckinObserved {

// Index or revision of the currently component configuration.
uint64 component_idx = 6;

// When a units timestamp is provided then the set of units could not all fit inside this single message
// and it was split across multiple messages. Each message chunk must have the same units timestamp. In
// the case that the receiver gets a new message with a different timestamp and it's newer than the other
// timestamp, then it should take that new message chunk as the start of a new message set. To finish a
// set of messages with the same timestamp, the last chunk should be an empty set of units.
google.protobuf.Timestamp units_timestamp = 7;

// Supports provides information to the agent about extra features this client supports. Should always be included
// on first checkin, and not again unless upon reconnect.
repeated ConnectionSupports supports = 8;
}

// A action request is streamed from the Elastic Agent to the application so an action can be performed
Expand Down Expand Up @@ -449,4 +473,8 @@ message ConnInfo {
bytes peer_key = 6;
// Allowed services that spawned process can use. (only used in V2)
repeated ConnInfoServices services = 7;
// Supports provides information to the client about extra features this server supports.
repeated ConnectionSupports supports = 8;
// Maximum message size that the client can use.
uint32 max_message_size = 9;
}
182 changes: 175 additions & 7 deletions pkg/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/json"
"errors"
"fmt"
protobuf "google.golang.org/protobuf/proto"
"io"
"runtime"
"runtime/pprof"
Expand All @@ -24,6 +25,9 @@ import (
"github.com/elastic/elastic-agent-client/v7/pkg/utils"
)

// DefaultMaxMessageSize is the maximum message size, in bytes, that is allowed to be sent
// (4 MiB).
const DefaultMaxMessageSize = 1024 * 1024 * 4 // copied from the gRPC default

type (
// UnitChangedType defines types for when units are adjusted.
UnitChangedType int
Expand Down Expand Up @@ -181,10 +185,49 @@ type V2 interface {
RegisterOptionalDiagnosticHook(paramTag string, name string, description string, filename string, contentType string, hook DiagnosticHook)
}

// v2options holds the settings collected from the V2ClientOption values passed to the client.
type v2options struct {
	// maxMessageSize is applied as both the send and receive limit of the gRPC connection.
	maxMessageSize int
	// chunkingAllowed reports whether checkin messages may be split into chunks.
	chunkingAllowed bool
	// dialOptions are the extra gRPC dial options supplied by the caller.
	dialOptions []grpc.DialOption
}

// DialOptions returns the dial options for the GRPC connection: the caller supplied options
// followed by the call options that enforce the maximum message size in both directions.
func (o *v2options) DialOptions() []grpc.DialOption {
	sizeLimits := grpc.WithDefaultCallOptions(
		grpc.MaxCallRecvMsgSize(o.maxMessageSize),
		grpc.MaxCallSendMsgSize(o.maxMessageSize),
	)
	// copy into a fresh slice so the stored options are never aliased by the result
	all := make([]grpc.DialOption, len(o.dialOptions), len(o.dialOptions)+1)
	copy(all, o.dialOptions)
	return append(all, sizeLimits)
}

// V2ClientOption is an option that can be used when creating the client. Each option is a
// function that is applied to the client's option set.
type V2ClientOption func(*v2options)

// WithMaxMessageSize returns an option that sets the maximum message size to size.
func WithMaxMessageSize(size int) V2ClientOption {
	apply := func(cfg *v2options) {
		cfg.maxMessageSize = size
	}
	return apply
}

// WithChunking returns an option that records whether the client can use chunking with the server.
func WithChunking(enabled bool) V2ClientOption {
	apply := func(cfg *v2options) {
		cfg.chunkingAllowed = enabled
	}
	return apply
}

// WithGRPCDialOptions returns an option that adds the given GRPC dial options to the ones
// already collected; repeated use accumulates options rather than replacing them.
func WithGRPCDialOptions(opts ...grpc.DialOption) V2ClientOption {
	apply := func(cfg *v2options) {
		cfg.dialOptions = append(cfg.dialOptions, opts...)
	}
	return apply
}

// clientV2 manages the state and communication to the Elastic Agent over the V2 control protocol.
type clientV2 struct {
target string
opts []grpc.DialOption
opts v2options
token string

agentInfoMu sync.RWMutex
Expand Down Expand Up @@ -228,10 +271,16 @@ type clientV2 struct {
}

// NewV2 creates a client connection to Elastic Agent over the V2 control protocol.
func NewV2(target string, token string, versionInfo VersionInfo, opts ...grpc.DialOption) V2 {
func NewV2(target string, token string, versionInfo VersionInfo, opts ...V2ClientOption) V2 {
var options v2options
options.maxMessageSize = DefaultMaxMessageSize
for _, o := range opts {
o(&options)
}

c := &clientV2{
target: target,
opts: opts,
opts: options,
token: token,
versionInfo: versionInfo,
stateChangeObservedCh: make(chan struct{}, 1),
Expand All @@ -247,7 +296,7 @@ func NewV2(target string, token string, versionInfo VersionInfo, opts ...grpc.Di
// Start starts the connection to Elastic Agent.
func (c *clientV2) Start(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
conn, err := grpc.DialContext(ctx, c.target, c.opts...)
conn, err := grpc.DialContext(ctx, c.target, c.opts.DialOptions()...)
if err != nil {
return err
}
Expand Down Expand Up @@ -405,8 +454,8 @@ func (c *clientV2) checkinRoundTrip() {
go func() {
defer wg.Done()
defer close(readerDone)
expected, err := checkinClient.Recv()
for ; err == nil; expected, err = checkinClient.Recv() {
expected, err := recvExpectedChunked(checkinClient, c.opts.chunkingAllowed)
for ; err == nil; expected, err = recvExpectedChunked(checkinClient, c.opts.chunkingAllowed) {
c.applyExpected(expected)
}
if !errors.Is(err, io.EOF) {
Expand All @@ -433,6 +482,9 @@ func (c *clientV2) checkinWriter(
t := time.NewTicker(c.minCheckTimeout)
defer t.Stop()

// Always resend the version information on restart of the loop.
c.versionInfoSent = false

// Keep sending until the call returns an error
for c.sendObserved(checkinClient) == nil {

Expand Down Expand Up @@ -479,8 +531,13 @@ func (c *clientV2) sendObserved(client proto.ElasticAgent_CheckinV2Client) error
Version: c.versionInfo.Version,
Meta: c.versionInfo.Meta,
}
// supports information is sent when version information is set,
// this ensures that its always sent once per connected loop
if c.opts.chunkingAllowed {
msg.Supports = []proto.ConnectionSupports{proto.ConnectionSupports_CheckinChunking}
}
}
err := client.Send(msg)
err := sendObservedChunked(client, msg, c.opts.chunkingAllowed, c.opts.maxMessageSize)
if err != nil && !errors.Is(err, io.EOF) {
c.errCh <- err
} else {
Expand Down Expand Up @@ -1027,3 +1084,114 @@ func inExpected(unit *Unit, expected []*proto.UnitExpected) bool {
}
return false
}

// recvExpectedChunked receives the next complete CheckinExpected from the stream.
//
// When chunk is false a single message is read and returned as-is. When chunk is true, messages
// that carry a UnitsTimestamp are treated as pieces of one logical message: the first piece
// supplies every field, later pieces with the same timestamp contribute only their units, and a
// piece with no units marks the end of the set. A message without a UnitsTimestamp is complete
// on its own and is returned immediately. A piece with a different timestamp discards whatever
// was collected so far and starts a new set.
//
// Any error from the stream is returned unchanged with a nil message.
func recvExpectedChunked(client proto.ElasticAgent_CheckinV2Client, chunk bool) (*proto.CheckinExpected, error) {
	if !chunk {
		return client.Recv()
	}
	var initialMsg *proto.CheckinExpected
	for {
		msg, err := client.Recv()
		if err != nil {
			return nil, err
		}
		if msg.UnitsTimestamp == nil {
			// all included in a single message
			return msg, nil
		}
		// first message in a batch, or a message that belongs to a different batch (restart)
		startsBatch := initialMsg == nil ||
			!initialMsg.UnitsTimestamp.AsTime().Equal(msg.UnitsTimestamp.AsTime())
		if startsBatch {
			initialMsg = msg
		}
		if len(msg.Units) == 0 {
			// ending batch message
			return initialMsg, nil
		}
		if !startsBatch {
			// only later chunks are merged; the starting chunk already holds its own units,
			// appending them again would duplicate every unit of the first chunk
			initialMsg.Units = append(initialMsg.Units, msg.Units...)
		}
	}
}

// sendObservedChunked sends msg over the checkin stream, splitting it into several messages when
// chunking is enabled and the serialized message is larger than maxSize.
//
// With chunk set to false, or when the message already fits, exactly one Send is performed.
// Otherwise the chunks are sent in order and the first failing Send aborts the sequence and its
// error is returned.
func sendObservedChunked(client proto.ElasticAgent_CheckinV2Client, msg *proto.CheckinObserved, chunk bool, maxSize int) error {
	// chunking is disabled, or the message fits so no chunking is needed
	if !chunk || protobuf.Size(msg) <= maxSize {
		return client.Send(msg)
	}
	// doesn't fit; start by dividing the units into two and let observedChunked keep dividing
	// until every chunk fits
	chunks, err := observedChunked(msg, maxSize, 2)
	if err != nil {
		return err
	}
	for i := range chunks {
		if sendErr := client.Send(chunks[i]); sendErr != nil {
			return sendErr
		}
	}
	return nil
}

// observedChunked splits msg into a sequence of messages that each serialize to at most maxSize
// bytes, starting with divider chunks and doubling the number of chunks until every chunk fits.
//
// The first returned message carries every field of msg with a reduced set of units; the
// following messages carry only the token and their share of the units. All messages share one
// units timestamp and the sequence ends with a message holding no units, which marks the end of
// the set. Every non-final message holds at least one unit, so it can never be mistaken for that
// end marker. msg itself is not modified.
//
// An error is returned when msg has no units to split, or when the messages still do not fit
// with one unit per message.
func observedChunked(msg *proto.CheckinObserved, maxSize int, divider int) ([]*proto.CheckinObserved, error) {
	total := len(msg.Units)
	if total == 0 {
		// nothing to divide; splitting can never make the message smaller
		return nil, fmt.Errorf("unable to chunk proto.CheckinObserved with no units")
	}
	if divider < 1 {
		divider = 1
	}
	timestamp := time.Now()
	for {
		if divider > total {
			// never create more chunks than units, an empty chunk would end the set early
			divider = total
		}
		msgs, ok := buildObservedChunks(msg, maxSize, divider, timestamp)
		if ok {
			return msgs, nil
		}
		if divider == total {
			// already down to a single unit per message
			return nil, fmt.Errorf("unable to chunk proto.CheckinObserved a single unit is greater than the max %d size", maxSize)
		}
		divider *= 2
	}
}

// buildObservedChunks divides the units of msg over divider messages (1 <= divider <= number of
// units) and appends the empty end marker; it reports false when any message exceeds maxSize.
func buildObservedChunks(msg *proto.CheckinObserved, maxSize int, divider int, timestamp time.Time) ([]*proto.CheckinObserved, bool) {
	total := len(msg.Units)
	// the first `extra` chunks take one additional unit so the remainder is spread evenly
	base, extra := total/divider, total%divider
	msgs := make([]*proto.CheckinObserved, 0, divider+1)
	start := 0
	for i := 0; i < divider; i++ {
		end := start + base
		if i < extra {
			end++
		}
		var m *proto.CheckinObserved
		if i == 0 {
			// first message all fields are set; except units is made smaller
			m = shallowCopyCheckinObserved(msg)
		} else {
			m = &proto.CheckinObserved{Token: msg.Token}
		}
		// capacity is capped so an append on a chunk can never overwrite the next chunk's units
		m.Units = msg.Units[start:end:end]
		m.UnitsTimestamp = timestamppb.New(timestamp)
		if protobuf.Size(m) > maxSize {
			// too large; the caller increases the divider
			return nil, false
		}
		msgs = append(msgs, m)
		start = end
	}
	// an empty set of units marks the end of the chunked message
	msgs = append(msgs, &proto.CheckinObserved{
		Token:          msg.Token,
		Units:          []*proto.UnitObserved{},
		UnitsTimestamp: timestamppb.New(timestamp),
	})
	return msgs, true
}

// shallowCopyCheckinObserved returns a new CheckinObserved whose fields reference the same
// values as msg. Nested messages and slices are shared with msg, not duplicated, so only
// reassigning a field on the copy is safe with respect to the original.
func shallowCopyCheckinObserved(msg *proto.CheckinObserved) *proto.CheckinObserved {
	dup := new(proto.CheckinObserved)
	dup.Token = msg.Token
	dup.Units = msg.Units
	dup.VersionInfo = msg.VersionInfo
	dup.FeaturesIdx = msg.FeaturesIdx
	dup.ComponentIdx = msg.ComponentIdx
	dup.UnitsTimestamp = msg.UnitsTimestamp
	dup.Supports = msg.Supports
	return dup
}
13 changes: 11 additions & 2 deletions pkg/client/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewFromReader(reader io.Reader, impl StateInterface, actions ...Action) (Cl
}

// NewV2FromReader creates a new V2 client reading the connection information from the io.Reader.
func NewV2FromReader(reader io.Reader, ver VersionInfo, opts ...grpc.DialOption) (V2, []Service, error) {
func NewV2FromReader(reader io.Reader, ver VersionInfo, opts ...V2ClientOption) (V2, []Service, error) {
connInfo := &proto.ConnInfo{}
data, err := ioutil.ReadAll(reader)
if err != nil {
Expand All @@ -87,11 +87,20 @@ func NewV2FromReader(reader io.Reader, ver VersionInfo, opts ...grpc.DialOption)
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
})
for _, s := range connInfo.Supports {
if s == proto.ConnectionSupports_CheckinChunking {
opts = append(opts, WithChunking(true))
}
}
if connInfo.MaxMessageSize > 0 {
opts = append(opts, WithMaxMessageSize(int(connInfo.MaxMessageSize)))
}
opts = append(opts, WithGRPCDialOptions(grpc.WithTransportCredentials(trans)))
client := NewV2(
connInfo.Addr,
connInfo.Token,
ver,
append(opts, grpc.WithTransportCredentials(trans))...,
opts...,
)
services := make([]Service, 0, len(connInfo.Services))
for _, srv := range connInfo.Services {
Expand Down
2 changes: 1 addition & 1 deletion pkg/proto/elastic-agent-client-deprecated.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/proto/elastic-agent-client-future.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/proto/elastic-agent-client-future_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 71b5b0e

Please sign in to comment.