Skip to content

Commit

Permalink
[V2] Enable support for shippers (#1527)
Browse files Browse the repository at this point in the history
* Work on adding shipper support.

* Fix fmt.

* Fix reference to spec. Allow shipper to be null but still enabled if key exists.

* Move supported shippers into its own key in the input specification.

* Fix issue in merge.

* Implement fake shipper and add fake shipper output to the fake component.

* Add protoc to the test target.

* Don't generate fake shipper protocol in test.

* Commit fake GRPC into code.

* Add unit test for running with shipper, with sending event between running componentn and running shipper.

* Add docstring for shipper test.

* Add changelog fragement.

* Adjust paths for shipper to work on windows and better on unix.

* Update changelog/fragments/1667571017-Add-support-for-running-the-elastic-agent-shipper.yaml

Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>

* Fix fake/component to connect over npipe on windows.

Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>
  • Loading branch information
blakerouse and cmacknz authored Nov 8, 2022
1 parent bd36958 commit 73b3d2e
Show file tree
Hide file tree
Showing 53 changed files with 3,847 additions and 1,016 deletions.
9 changes: 2 additions & 7 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ fleet.enc.lock
# Files generated with the bump version automations
*.bck


# agent
build/
elastic-agent
Expand All @@ -54,9 +53,5 @@ elastic-agent.yml.*
fleet.yml
fleet.yml.lock
fleet.yml.old
internal/pkg/agent/application/fleet.yml
internal/pkg/agent/transpiler/tests/exec-1.0-darwin-x86_64/exec
pkg/component/fake/fake

# VSCode
/.vscode
pkg/component/fake/component/component
pkg/component/fake/shipper/shipper
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Add experimental support for running the elastic-agent-shipper

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
#description:

# Affected component; a word indicating the component this changeset affects.
component:

# PR number; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 1527

# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 219
2 changes: 2 additions & 0 deletions control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ message DiagnosticAgentResponse {

// DiagnosticUnitRequest specifies a specific unit to gather diagnostics from.
message DiagnosticUnitRequest {
// ID of the component.
string component_id = 1;
// Type of unit.
UnitType unit_type = 2;
// ID of the unit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker acker.A
}

state := h.coord.State(false)
unit, ok := findUnitFromInputType(state, action.InputType)
comp, unit, ok := findUnitFromInputType(state, action.InputType)
if !ok {
// If the matching action is not found ack the action with the error for action result document
action.StartedAt = time.Now().UTC().Format(time.RFC3339Nano)
Expand Down Expand Up @@ -78,7 +78,7 @@ func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker acker.A
h.log.Debugf("handlerAppAction: action '%v' started with timeout: %v", action.ActionType, timeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
res, err = h.coord.PerformAction(ctx, unit, action.InputType, params)
res, err = h.coord.PerformAction(ctx, comp, unit, action.InputType, params)
}
end := time.Now().UTC()

Expand Down Expand Up @@ -151,13 +151,13 @@ func readMapString(m map[string]interface{}, key string, def string) string {
return def
}

func findUnitFromInputType(state coordinator.State, inputType string) (component.Unit, bool) {
func findUnitFromInputType(state coordinator.State, inputType string) (component.Component, component.Unit, bool) {
for _, comp := range state.Components {
for _, unit := range comp.Component.Units {
if unit.Type == client.UnitTypeInput && unit.Config != nil && unit.Config.Type == inputType {
return unit, true
return comp.Component, unit, true
}
}
}
return component.Unit{}, false
return component.Component{}, component.Unit{}, false
}
13 changes: 7 additions & 6 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ type RuntimeManager interface {
State() []runtime.ComponentComponentState

// PerformAction executes an action on a unit.
PerformAction(ctx context.Context, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error)
PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error)

// SubscribeAll provides an interface to watch for changes in all components.
SubscribeAll(context.Context) *runtime.SubscriptionAll

// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
// it performs diagnostics for all current units.
PerformDiagnostics(context.Context, ...component.Unit) []runtime.ComponentUnitDiagnostic
PerformDiagnostics(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic
}

// ConfigChange provides an interface for receiving a new configuration.
Expand Down Expand Up @@ -285,19 +285,20 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
return nil
}

// AckUpgrade performs acknowledgement for upgrade.
func (c *Coordinator) AckUpgrade(ctx context.Context, acker acker.Acker) error {
return c.upgradeMgr.Ack(ctx, acker)
}

// PerformAction executes an action on a unit.
func (c *Coordinator) PerformAction(ctx context.Context, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error) {
return c.runtimeMgr.PerformAction(ctx, unit, name, params)
func (c *Coordinator) PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error) {
return c.runtimeMgr.PerformAction(ctx, comp, unit, name, params)
}

// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
// it performs diagnostics for all current units.
func (c *Coordinator) PerformDiagnostics(ctx context.Context, units ...component.Unit) []runtime.ComponentUnitDiagnostic {
return c.runtimeMgr.PerformDiagnostics(ctx, units...)
func (c *Coordinator) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
return c.runtimeMgr.PerformDiagnostics(ctx, req...)
}

// Run runs the coordinator.
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/fleet_server_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var injectFleetServerInput = config.MustNewConfigFrom(map[string]interface{}{
func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) coordinator.ComponentsModifier {
return func(comps []component.Component, _ map[string]interface{}) ([]component.Component, error) {
for i, comp := range comps {
if comp.Spec.InputType == fleetServer {
if comp.InputSpec != nil && comp.InputSpec.InputType == fleetServer {
for j, unit := range comp.Units {
if unit.Type == client.UnitTypeOutput && unit.Config.Type == elasticsearch {
unitCfgMap, err := toMapStr(unit.Config.Source.AsMap(), &serverCfg.Output.Elasticsearch)
Expand Down Expand Up @@ -89,7 +89,7 @@ func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) co
func EndpointComponentModifier(fleetCfg *configuration.FleetAgentConfig) coordinator.ComponentsModifier {
return func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) {
for i, comp := range comps {
if comp.Spec.InputType == endpoint {
if comp.InputSpec != nil && comp.InputSpec.InputType == endpoint {
for j, unit := range comp.Units {
if unit.Type == client.UnitTypeInput && unit.Config.Type == endpoint {
unitCfgMap, err := toMapStr(unit.Config.Source.AsMap(), map[string]interface{}{"fleet": fleetCfg})
Expand Down
10 changes: 9 additions & 1 deletion internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,19 @@ func (f *fleetGateway) convertToCheckinComponents(components []runtime.Component
component := item.Component
state := item.State

var shipperReference *fleetapi.CheckinShipperReference
if component.Shipper != nil {
shipperReference = &fleetapi.CheckinShipperReference{
ComponentID: component.Shipper.ComponentID,
UnitID: component.Shipper.UnitID,
}
}
checkinComponent := fleetapi.CheckinComponent{
ID: component.ID,
Type: component.Spec.InputType,
Type: component.Type(),
Status: stateString(state.State),
Message: state.Message,
Shipper: shipperReference,
}

if state.Units != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (m *managedConfigManager) waitForFleetServer(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case compState := <-sub.Ch():
if compState.Component.Spec.InputType == "fleet-server" {
if compState.Component.InputSpec != nil && compState.Component.InputSpec.InputType == "fleet-server" {
if fleetServerRunning(compState.State) {
m.log.With("state", compState.State).Debugf("Fleet Server is running")
return nil
Expand Down
7 changes: 5 additions & 2 deletions internal/pkg/agent/application/paths/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ const (
// InstallPath is the installation path using for install command.
InstallPath = "/opt/Elastic/Agent"

// SocketPath is the socket path used when installed.
SocketPath = "unix:///run/elastic-agent.sock"
// ControlSocketPath is the control socket path used when installed.
ControlSocketPath = "unix:///run/elastic-agent.sock"

// ShipperSocketPipePattern is the socket path used when installed for a shipper pipe.
ShipperSocketPipePattern = "unix:///run/elastic-agent-%s-pipe.sock"

// ServiceName is the service name when installed.
ServiceName = "elastic-agent"
Expand Down
7 changes: 5 additions & 2 deletions internal/pkg/agent/application/paths/paths_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ const (
// InstallPath is the installation path using for install command.
InstallPath = "/Library/Elastic/Agent"

// SocketPath is the socket path used when installed.
SocketPath = "unix:///var/run/elastic-agent.sock"
// ControlSocketPath is the control socket path used when installed.
ControlSocketPath = "unix:///var/run/elastic-agent.sock"

// ShipperSocketPipePattern is the socket path used when installed for a shipper pipe.
ShipperSocketPipePattern = "unix:///var/run/elastic-agent-%s-pipe.sock"

// ServiceName is the service name when installed.
ServiceName = "co.elastic.elastic-agent"
Expand Down
7 changes: 5 additions & 2 deletions internal/pkg/agent/application/paths/paths_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ const (
// InstallPath is the installation path using for install command.
InstallPath = `C:\Program Files\Elastic\Agent`

// SocketPath is the socket path used when installed.
SocketPath = `\\.\pipe\elastic-agent-system`
// ControlSocketPath is the control socket path used when installed.
ControlSocketPath = `\\.\pipe\elastic-agent-system`

// ShipperSocketPipePattern is the socket path used when installed for a shipper pipe.
ShipperSocketPipePattern = `\\.\pipe\elastic-agent-%s-pipe.sock`

// ServiceName is the service name when installed.
ServiceName = "Elastic Agent"
Expand Down
6 changes: 4 additions & 2 deletions internal/pkg/agent/cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponen
return fmt.Errorf("unable to find unit with ID: %s/%s", compID, unitID)
}
if !opts.showSpec {
comp.Spec = component.InputRuntimeSpec{}
comp.InputSpec = nil
comp.ShipperSpec = nil
}
if !opts.showConfig {
for key, unit := range comp.Units {
Expand All @@ -314,7 +315,8 @@ func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponen
// Hide runtime specification unless toggled on.
if !opts.showSpec {
for i, comp := range comps {
comp.Spec = component.InputRuntimeSpec{}
comp.InputSpec = nil
comp.ShipperSpec = nil
comps[i] = comp
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/control/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
func Address() string {
// when installed the control address is fixed
if info.RunningInstalled() {
return paths.SocketPath
return paths.ControlSocketPath
}

// unix socket path must be less than 104 characters
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/control/addr_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
func Address() string {
// when installed the control address is fixed
if info.RunningInstalled() {
return paths.SocketPath
return paths.ControlSocketPath
}

// not install, adjust the path based on data path
Expand Down
11 changes: 7 additions & 4 deletions internal/pkg/agent/control/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Version struct {
Snapshot bool `json:"snapshot" yaml:"snapshot"`
}

// ComponentVersionInfo is the version information for the component.
type ComponentVersionInfo struct {
// Name of the component.
Name string `json:"name" yaml:"name"`
Expand Down Expand Up @@ -115,8 +116,9 @@ type DiagnosticFileResult struct {

// DiagnosticUnitRequest allows a specific unit to be targeted for diagnostics.
type DiagnosticUnitRequest struct {
UnitID string
UnitType UnitType
ComponentID string
UnitID string
UnitType UnitType
}

// DiagnosticUnitResult is a set of results for a unit.
Expand Down Expand Up @@ -308,8 +310,9 @@ func (c *client) DiagnosticUnits(ctx context.Context, units ...DiagnosticUnitReq
reqs := make([]*cproto.DiagnosticUnitRequest, 0, len(units))
for _, u := range units {
reqs = append(reqs, &cproto.DiagnosticUnitRequest{
UnitType: u.UnitType,
UnitId: u.UnitID,
ComponentId: u.ComponentID,
UnitType: u.UnitType,
UnitId: u.UnitID,
})
}

Expand Down
Loading

0 comments on commit 73b3d2e

Please sign in to comment.