diff --git a/agent/agent.go b/agent/agent.go index a60b813d44931..7228760192765 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -106,11 +106,6 @@ func (a *Agent) Run(ctx context.Context) error { time.Duration(a.Config.Agent.Interval), a.Config.Agent.Quiet, a.Config.Agent.Hostname, time.Duration(a.Config.Agent.FlushInterval)) - log.Printf("D! [agent] Initializing plugins") - if err := a.initPlugins(); err != nil { - return err - } - if a.Config.Persister != nil { log.Printf("D! [agent] Initializing plugin states") if err := a.initPersister(); err != nil { @@ -124,6 +119,11 @@ func (a *Agent) Run(ctx context.Context) error { } } + log.Printf("D! [agent] Initializing plugins") + if err := a.initPlugins(); err != nil { + return err + } + startTime := time.Now() log.Printf("D! [agent] Connecting outputs") diff --git a/plugins/common/starlark/starlark.go b/plugins/common/starlark/starlark.go index abd34c24a96c5..5b43e19bfa378 100644 --- a/plugins/common/starlark/starlark.go +++ b/plugins/common/starlark/starlark.go @@ -1,6 +1,8 @@ package starlark import ( + "bytes" + "encoding/gob" "errors" "fmt" "strings" @@ -26,6 +28,77 @@ type Common struct { globals starlark.StringDict functions map[string]*starlark.Function parameters map[string]starlark.Tuple + state *starlark.Dict +} + +func (s *Common) GetState() interface{} { + // Return the actual byte-type instead of nil allowing the persister + // to guess instantiate variable of the appropriate type + if s.state == nil { + return []byte{} + } + + // Convert the starlark dict into a golang dictionary for serialization + state := make(map[string]interface{}, s.state.Len()) + items := s.state.Items() + for _, item := range items { + if len(item) != 2 { + // We do expect key-value pairs in the state so there should be + // two items. + s.Log.Errorf("state item %+v does not contain a key-value pair", item) + continue + } + k, ok := item.Index(0).(starlark.String) + if !ok { + s.Log.Errorf("state item %+v has invalid key type %T", item, item.Index(0)) + continue + } + v, err := asGoValue(item.Index(1)) + if err != nil { + s.Log.Errorf("state item %+v value cannot be converted: %v", item, err) + continue + } + state[k.GoString()] = v + } + + // Do a binary GOB encoding to preserve types + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(state); err != nil { + s.Log.Errorf("encoding state failed: %v", err) + return []byte{} + } + + return buf.Bytes() +} + +func (s *Common) SetState(state interface{}) error { + data, ok := state.([]byte) + if !ok { + return fmt.Errorf("unexpected type %T for state", state) + } + if len(data) == 0 { + return nil + } + + // Decode the binary GOB encoding + var dict map[string]interface{} + if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(&dict); err != nil { + return fmt.Errorf("decoding state failed: %w", err) + } + + // Convert the golang dict back to starlark types + s.state = starlark.NewDict(len(dict)) + for k, v := range dict { + sv, err := asStarlarkValue(v) + if err != nil { + return fmt.Errorf("value %v of state item %q cannot be set: %w", v, k, err) + } + if err := s.state.SetKey(starlark.String(k), sv); err != nil { + return fmt.Errorf("state item %q cannot be set: %w", k, err) + } + } + + return nil } func (s *Common) Init() error { @@ -47,14 +120,29 @@ func (s *Common) Init() error { builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric) builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy) builtins["catch"] = starlark.NewBuiltin("catch", catch) - err := s.addConstants(&builtins) - if err != nil { + + if err := s.addConstants(&builtins); err != nil { return err } + // Insert the persisted state if any + if s.state != nil { + builtins["state"] = s.state + } + + // Load the program. In case of an error we can try to insert the state + // which can be used implicitly e.g. when persisting states program, err := s.sourceProgram(builtins) if err != nil { - return err + // Try again with a declared state. This might be necessary for + // state persistence. + s.state = starlark.NewDict(0) + builtins["state"] = s.state + p, serr := s.sourceProgram(builtins) + if serr != nil { + return err + } + program = p } // Execute source @@ -62,12 +150,16 @@ func (s *Common) Init() error { if err != nil { return err } - // Make available a shared state to the apply function - globals["state"] = starlark.NewDict(0) - // Freeze the global state. This prevents modifications to the processor + // In case the program declares a global "state" we should insert it to + // avoid warnings about inserting into a frozen variable + if _, found := globals["state"]; found { + globals["state"] = starlark.NewDict(0) + } + + // Freeze the global state. This prevents modifications to the processor // state and prevents scripts from containing errors storing tracking - // metrics. Tasks that require global state will not be possible due to + // metrics. Tasks that require global state will not be possible due to // this, so maybe we should relax this in the future. globals.Freeze() @@ -107,6 +199,9 @@ func (s *Common) AddFunction(name string, params ...starlark.Value) error { // Add all the constants defined in the plugin as constants of the script func (s *Common) addConstants(builtins *starlark.StringDict) error { for key, val := range s.Constants { + if key == "state" { + return errors.New("'state' constant uses reserved name") + } sVal, err := asStarlarkValue(val) if err != nil { return fmt.Errorf("converting type %T failed: %w", val, err) diff --git a/plugins/processors/starlark/starlark.go b/plugins/processors/starlark/starlark.go index b8a197340a35b..61785e98b4483 100644 --- a/plugins/processors/starlark/starlark.go +++ b/plugins/processors/starlark/starlark.go @@ -27,14 +27,12 @@ func (*Starlark) SampleConfig() string { } func (s *Starlark) Init() error { - err := s.Common.Init() - if err != nil { + if err := s.Common.Init(); err != nil { return err } // The source should define an apply function. - err = s.AddFunction("apply", &common.Metric{}) - if err != nil { + if err := s.AddFunction("apply", &common.Metric{}); err != nil { return err } @@ -118,6 +116,7 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err default: return fmt.Errorf("invalid type returned: %T", rv) } + return nil } diff --git a/plugins/processors/starlark/starlark_test.go b/plugins/processors/starlark/starlark_test.go index 14793792d4105..fd2dc2241ea6f 100644 --- a/plugins/processors/starlark/starlark_test.go +++ b/plugins/processors/starlark/starlark_test.go @@ -1,6 +1,8 @@ package starlark import ( + "bytes" + "encoding/gob" "errors" "fmt" "os" @@ -3563,6 +3565,191 @@ def apply(metric): } } +func TestGlobalState(t *testing.T) { + source := ` +def apply(metric): + count = state.get("count", 0) + count += 1 + state["count"] = count + + metric.fields["count"] = count + + return metric +` + // Define the metrics + input := []telegraf.Metric{ + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": 42}, + time.Unix(1713188113, 10), + ), + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": 42}, + time.Unix(1713188113, 20), + ), + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": 42}, + time.Unix(1713188113, 30), + )} + expected := []telegraf.Metric{ + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": 42, "count": 1}, + time.Unix(1713188113, 10), + ), + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": 42, "count": 2}, + time.Unix(1713188113, 20), + ), + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": 42, "count": 3}, + time.Unix(1713188113, 30), + ), + } + + // Configure the plugin + plugin := &Starlark{ + Common: common.Common{ + StarlarkLoadFunc: testLoadFunc, + Source: source, + Log: testutil.Logger{}, + }, + } + require.NoError(t, plugin.Init()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + + // Do the processing + for _, m := range input { + require.NoError(t, plugin.Add(m, &acc)) + } + plugin.Stop() + + // Check + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual) +} + +func TestStatePersistence(t *testing.T) { + source := ` +def apply(metric): + count = state.get("count", 0) + count += 1 + state["count"] = count + + metric.fields["count"] = count + metric.tags["instance"] = state.get("instance", "unknown") + + return metric +` + // Define the metrics + input := []telegraf.Metric{ + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": 42}, + time.Unix(1713188113, 10), + ), + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": 42}, + time.Unix(1713188113, 20), + ), + metric.New( + "test", + map[string]string{}, + map[string]interface{}{"value": 42}, + time.Unix(1713188113, 30), + )} + expected := []telegraf.Metric{ + metric.New( + "test", + map[string]string{"instance": "myhost"}, + map[string]interface{}{"value": 42, "count": 1}, + time.Unix(1713188113, 10), + ), + metric.New( + "test", + map[string]string{"instance": "myhost"}, + map[string]interface{}{"value": 42, "count": 2}, + time.Unix(1713188113, 20), + ), + metric.New( + "test", + map[string]string{"instance": "myhost"}, + map[string]interface{}{"value": 42, "count": 3}, + time.Unix(1713188113, 30), + ), + } + + // Configure the plugin + plugin := &Starlark{ + Common: common.Common{ + StarlarkLoadFunc: testLoadFunc, + Source: source, + Log: testutil.Logger{}, + }, + } + + // Setup the "persisted" state + var pi telegraf.StatefulPlugin = plugin + var buf bytes.Buffer + require.NoError(t, gob.NewEncoder(&buf).Encode(map[string]interface{}{"instance": "myhost"})) + require.NoError(t, pi.SetState(buf.Bytes())) + require.NoError(t, plugin.Init()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + + // Do the processing + for _, m := range input { + require.NoError(t, plugin.Add(m, &acc)) + } + plugin.Stop() + + // Check + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual) + + // Check getting the persisted state + expectedState := map[string]interface{}{"instance": "myhost", "count": int64(3)} + + var actualState map[string]interface{} + stateData, ok := pi.GetState().([]byte) + require.True(t, ok, "state is not a bytes array") + require.NoError(t, gob.NewDecoder(bytes.NewBuffer(stateData)).Decode(&actualState)) + require.EqualValues(t, expectedState, actualState, "mismatch in state") +} + +func TestUsePredefinedStateName(t *testing.T) { + source := ` +def apply(metric): + return metric +` + // Configure the plugin + plugin := &Starlark{ + Common: common.Common{ + StarlarkLoadFunc: testLoadFunc, + Source: source, + Constants: map[string]interface{}{"state": "invalid"}, + Log: testutil.Logger{}, + }, + } + require.ErrorContains(t, plugin.Init(), "'state' constant uses reserved name") +} + // parses metric lines out of line protocol following a header, with a trailing blank line func parseMetricsFrom(t *testing.T, lines []string, header string) (metrics []telegraf.Metric) { parser := &influx.Parser{}