Skip to content

Commit

Permalink
[opampsupervisor]: Skip executing the collector if no config is provi…
Browse files Browse the repository at this point in the history
…ded (open-telemetry#35430)

**Description:**
If an empty config map is received, the supervisor does not run the
agent.

~The current logic here works fine, but I'm considering adding an option
to only do this if the user opts into it. I'm not sure if there's a
reason why a user might want to run the collector with the noop config
though (maybe for the agent's self-telemetry?)~

I've thought about it some more, and I don't think we need a config
option here. If users want the collector to use a noop config, they can
send a basic noop config.

I think we should also implement open-telemetry#32598 (closed as stale, we'll want to
re-open this or open a new issue for it), which would allow users to
configure a backup config to use when no config is provided by the
server, if they would like.

**Link to tracking Issue:** Closes open-telemetry#33680

**Testing:**
e2e test added
Manually tested with a modified OpAMP server to send an empty config map

**Documentation:**
Update spec where it seemed applicable to call out this behavior.
  • Loading branch information
BinaryFissionGames authored and jriguera committed Oct 4, 2024
1 parent 906fe1e commit bebfd85
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 44 deletions.
13 changes: 13 additions & 0 deletions .chloggen/feat_opampsupervisor-start-stop-empty-confmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Skip executing the collector if no config is provided

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33680]
107 changes: 93 additions & 14 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,20 +349,10 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {
require.Nil(t, s.Start())
defer s.Shutdown()

// Verify the collector is running by checking the metrics endpoint
require.Eventually(t, func() bool {
resp, err := http.DefaultClient.Get("http://localhost:12345")
if err != nil {
t.Logf("Failed agent healthcheck request: %s", err)
return false
}
require.NoError(t, resp.Body.Close())
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
t.Logf("Got non-2xx status code: %d", resp.StatusCode)
return false
}
return true
}, 3*time.Second, 100*time.Millisecond)
// Verify the collector is not running after 250 ms by checking the healthcheck endpoint
time.Sleep(250 * time.Millisecond)
_, err := http.DefaultClient.Get("http://localhost:12345")
require.ErrorContains(t, err, "connection refused")

// Start the server and wait for the supervisor to connect
server.start()
Expand Down Expand Up @@ -1266,6 +1256,95 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) {
require.FileExists(t, filepath.Join(storageDir, "effective.yaml"))
}

// TestSupervisorStopsAgentProcessWithEmptyConfigMap checks that the supervisor
// stops the collector when the OpAMP server sends a remote config whose config
// map is empty.
//
// The test first sends a simple pipeline config and waits until the agent's
// health check endpoint answers with a 2xx status. It then sends an empty
// config map, waits for another effective-config report, and expects requests
// to the health check endpoint to be refused.
func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
	// Buffered by one so the server callback never blocks; extra reports are dropped.
	agentCfgChan := make(chan string, 1)
	server := newOpAMPServer(
		t,
		defaultConnectingHandler,
		server.ConnectionCallbacksStruct{
			OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
				if message.EffectiveConfig != nil {
					config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
					if config != nil {
						select {
						case agentCfgChan <- string(config.Body):
						default:
						}
					}
				}

				return &protobufs.ServerToAgent{}
			},
		})

	s := newSupervisor(t, "healthcheck_port", map[string]string{
		"url":              server.addr,
		"healthcheck_port": "12345",
	})

	require.Nil(t, s.Start())
	defer s.Shutdown()

	waitForSupervisorConnection(server.supervisorConnected, true)

	cfg, hash, _, _ := createSimplePipelineCollectorConf(t)

	server.sendToSupervisor(&protobufs.ServerToAgent{
		RemoteConfig: &protobufs.AgentRemoteConfig{
			Config: &protobufs.AgentConfigMap{
				ConfigMap: map[string]*protobufs.AgentConfigFile{
					"": {Body: cfg.Bytes()},
				},
			},
			ConfigHash: hash,
		},
	})

	select {
	case <-agentCfgChan:
	case <-time.After(1 * time.Second):
		require.FailNow(t, "timed out waiting for agent to report its initial config")
	}

	// Use health check endpoint to determine if the collector is actually running
	require.Eventually(t, func() bool {
		resp, err := http.DefaultClient.Get("http://localhost:12345")
		if err != nil {
			t.Logf("Failed agent healthcheck request: %s", err)
			return false
		}
		// The condition does not run on the test goroutine, so report problems by
		// logging and returning false instead of calling require (which uses FailNow).
		if closeErr := resp.Body.Close(); closeErr != nil {
			t.Logf("Failed to close healthcheck response body: %s", closeErr)
			return false
		}
		if resp.StatusCode >= 300 || resp.StatusCode < 200 {
			t.Logf("Got non-2xx status code: %d", resp.StatusCode)
			return false
		}
		return true
	}, 3*time.Second, 100*time.Millisecond)

	// Send empty config
	emptyHash := sha256.Sum256([]byte{})
	server.sendToSupervisor(&protobufs.ServerToAgent{
		RemoteConfig: &protobufs.AgentRemoteConfig{
			Config: &protobufs.AgentConfigMap{
				ConfigMap: map[string]*protobufs.AgentConfigFile{},
			},
			ConfigHash: emptyHash[:],
		},
	})

	select {
	case <-agentCfgChan:
	case <-time.After(1 * time.Second):
		require.FailNow(t, "timed out waiting for agent to report its noop config")
	}

	// Verify the collector is not running after 250 ms by checking the healthcheck endpoint
	time.Sleep(250 * time.Millisecond)
	resp, err := http.DefaultClient.Get("http://localhost:12345")
	if err == nil {
		// The request unexpectedly succeeded. Release the body before the assertion
		// below fails the test; a close error is irrelevant at that point.
		_ = resp.Body.Close()
	}
	require.ErrorContains(t, err, "connection refused")
}

func findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

Expand Down
20 changes: 12 additions & 8 deletions cmd/opampsupervisor/specification/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ agent:
### Operation When OpAMP Server is Unavailable
When the supervisor cannot connect to the OpAMP server, the collector will
be run with the last known configuration, or with a "noop" configuration
if no previous configuration is persisted. The supervisor will continually
attempt to reconnect to the OpAMP server with exponential backoff.
be run with the last known configuration if a previous configuration is persisted.
If no previous configuration has been persisted, the collector does not run.
The supervisor will continually attempt to reconnect to the OpAMP server with exponential backoff.
### Executing Collector
Expand Down Expand Up @@ -204,6 +204,10 @@ Configuration*](https://github.com/open-telemetry/opamp-spec/blob/main/specifica
from the OpAMP Backend, merges it with an optional local config file and
writes it to the Collector's config file, then restarts the Collector.
If the remote configuration from the OpAMP Backend contains an empty config map,
the collector will be stopped and will not be run again until a non-empty config map
is received from the OpAMP Backend.
In the future, once config file watching is implemented, the Collector can
reload the config without the need for the Supervisor to restart the
Collector process.
Expand Down Expand Up @@ -244,13 +248,13 @@ configuration.
To overcome this problem the Supervisor starts the Collector with a
"noop" configuration that collects nothing but allows the opamp
extension to be started. The "noop" configuration is a single pipeline
with an OTLP receiver that listens on a random port and a debug
exporter, and the opamp extension. The purpose of the "noop"
configuration is to make sure the Collector starts and the opamp
extension communicates with the Supervisor.
with a nop receiver, a nop exporter, and the opamp extension.
The purpose of the "noop" configuration is to make sure the Collector starts
and the opamp extension communicates with the Supervisor. The Collector is stopped
after the AgentDescription is received from the Collector.
Once the initial Collector launch is successful and the remote
configuration is received by the Supervisor the Supervisor restarts the
configuration is received by the Supervisor the Supervisor starts the
Collector with the new config. The new config is also cached by the
Supervisor in a local file, so that subsequent restarts no longer need
to start the Collector using the "noop" configuration. Caching of the
Expand Down
52 changes: 40 additions & 12 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ const (

const maxBufferedCustomMessages = 10

// configState bundles the config the Supervisor assembled for the Collector
// with a flag recording whether the server-provided config map was empty.
type configState struct {
	// mergedConfig is the Supervisor-assembled config to be given to the Collector.
	mergedConfig string
	// configMapIsEmpty is true if the server provided configmap was empty.
	configMapIsEmpty bool
}

// equal reports whether c and other carry the same merged config and the same
// empty-config-map flag.
func (c *configState) equal(other *configState) bool {
	if other.configMapIsEmpty != c.configMapIsEmpty {
		return false
	}
	return other.mergedConfig == c.mergedConfig
}

// Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient
// to work with an OpAMP Server.
type Supervisor struct {
Expand Down Expand Up @@ -107,8 +118,8 @@ type Supervisor struct {
// will listen on for health check requests from the Supervisor.
agentHealthCheckEndpoint string

// Supervisor-assembled config to be given to the Collector.
mergedConfig *atomic.Value
// Internal config state for agent use. See the configState struct for more details.
cfgState *atomic.Value

// Final effective config of the Collector.
effectiveConfig *atomic.Value
Expand Down Expand Up @@ -143,7 +154,7 @@ func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, erro
pidProvider: defaultPIDProvider{},
hasNewConfig: make(chan struct{}, 1),
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentDescription: &atomic.Value{},
doneChan: make(chan struct{}),
Expand Down Expand Up @@ -793,8 +804,8 @@ func (s *Supervisor) loadAndWriteInitialMergedConfig() error {
}

// write the initial merged config to disk
cfg := s.mergedConfig.Load().(string)
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfg), 0600); err != nil {
cfgState := s.cfgState.Load().(*configState)
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0600); err != nil {
s.logger.Error("Failed to write agent config.", zap.Error(err))
}

Expand All @@ -806,9 +817,11 @@ func (s *Supervisor) loadAndWriteInitialMergedConfig() error {
func (s *Supervisor) createEffectiveConfigMsg() *protobufs.EffectiveConfig {
cfgStr, ok := s.effectiveConfig.Load().(string)
if !ok {
cfgStr, ok = s.mergedConfig.Load().(string)
cfgState, ok := s.cfgState.Load().(*configState)
if !ok {
cfgStr = ""
} else {
cfgStr = cfgState.mergedConfig
}
}

Expand Down Expand Up @@ -870,7 +883,11 @@ func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.Tele
func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (configChanged bool, err error) {
var k = koanf.New("::")

if c := config.GetConfig(); c != nil {
configMapIsEmpty := len(config.GetConfig().GetConfigMap()) == 0

if !configMapIsEmpty {
c := config.GetConfig()

// Sort to make sure the order of merging is stable.
var names []string
for name := range c.ConfigMap {
Expand Down Expand Up @@ -939,11 +956,16 @@ func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (c
}

// Check if supervisor's merged config is changed.
newMergedConfig := string(newMergedConfigBytes)

newConfigState := &configState{
mergedConfig: string(newMergedConfigBytes),
configMapIsEmpty: configMapIsEmpty,
}

configChanged = false

oldConfig := s.mergedConfig.Swap(newMergedConfig)
if oldConfig == nil || oldConfig.(string) != newMergedConfig {
oldConfigState := s.cfgState.Swap(newConfigState)
if oldConfigState == nil || !oldConfigState.(*configState).equal(newConfigState) {
s.logger.Debug("Merged config changed.")
configChanged = true
}
Expand All @@ -963,6 +985,12 @@ func (s *Supervisor) handleRestartCommand() error {
}

func (s *Supervisor) startAgent() {
if s.cfgState.Load().(*configState).configMapIsEmpty {
// Don't start the agent if there is no config to run
s.logger.Info("No config present, not starting agent.")
return
}

err := s.commander.Start(context.Background())
if err != nil {
s.logger.Error("Cannot start the agent", zap.Error(err))
Expand Down Expand Up @@ -1104,14 +1132,14 @@ func (s *Supervisor) runAgentProcess() {

func (s *Supervisor) stopAgentApplyConfig() {
s.logger.Debug("Stopping the agent to apply new config")
cfg := s.mergedConfig.Load().(string)
cfgState := s.cfgState.Load().(*configState)
err := s.commander.Stop(context.Background())

if err != nil {
s.logger.Error("Could not stop agent process", zap.Error(err))
}

if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfg), 0600); err != nil {
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0600); err != nil {
s.logger.Error("Failed to write agent config.", zap.Error(err))
}
}
Expand Down
21 changes: 11 additions & 10 deletions cmd/opampsupervisor/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func Test_composeEffectiveConfig(t *testing.T) {
pidProvider: staticPIDProvider(1234),
hasNewConfig: make(chan struct{}, 1),
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
agentHealthCheckEndpoint: "localhost:8000",
}

Expand Down Expand Up @@ -159,7 +159,7 @@ service:
expectedConfig = bytes.ReplaceAll(expectedConfig, []byte("\r\n"), []byte("\n"))

require.True(t, configChanged)
require.Equal(t, string(expectedConfig), s.mergedConfig.Load().(string))
require.Equal(t, string(expectedConfig), s.cfgState.Load().(*configState).mergedConfig)
}

func Test_onMessage(t *testing.T) {
Expand All @@ -176,7 +176,7 @@ func Test_onMessage(t *testing.T) {
persistentState: &persistentState{InstanceID: initialID},
agentDescription: agentDesc,
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentHealthCheckEndpoint: "localhost:8000",
opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())),
Expand Down Expand Up @@ -205,7 +205,7 @@ func Test_onMessage(t *testing.T) {
persistentState: &persistentState{InstanceID: testUUID},
agentDescription: agentDesc,
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentHealthCheckEndpoint: "localhost:8000",
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func Test_onMessage(t *testing.T) {
hasNewConfig: make(chan struct{}, 1),
persistentState: &persistentState{InstanceID: testUUID},
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentConn: agentConnAtomic,
agentHealthCheckEndpoint: "localhost:8000",
Expand Down Expand Up @@ -332,7 +332,7 @@ func Test_onMessage(t *testing.T) {
persistentState: &persistentState{InstanceID: initialID},
agentDescription: agentDesc,
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentHealthCheckEndpoint: "localhost:8000",
opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())),
Expand All @@ -358,10 +358,11 @@ func Test_onMessage(t *testing.T) {
})

require.Equal(t, newID, s.persistentState.InstanceID)
t.Log(s.mergedConfig.Load())
require.Contains(t, s.mergedConfig.Load(), "prometheus/own_metrics")
require.Contains(t, s.mergedConfig.Load(), newID.String())
require.Contains(t, s.mergedConfig.Load(), "runtime.type: test")
t.Log(s.cfgState.Load())
mergedCfg := s.cfgState.Load().(*configState).mergedConfig
require.Contains(t, mergedCfg, "prometheus/own_metrics")
require.Contains(t, mergedCfg, newID.String())
require.Contains(t, mergedCfg, "runtime.type: test")
})
}

Expand Down

0 comments on commit bebfd85

Please sign in to comment.