diff --git a/src/go/plugin/go.d/agent/functions/function.go b/src/go/plugin/go.d/agent/functions/function.go deleted file mode 100644 index b65d3d713590d6..00000000000000 --- a/src/go/plugin/go.d/agent/functions/function.go +++ /dev/null @@ -1,96 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -package functions - -import ( - "bytes" - "context" - "encoding/csv" - "fmt" - "strconv" - "strings" - "time" -) - -type Function struct { - key string - UID string - Timeout time.Duration - Name string - Args []string - Payload []byte - Permissions string - Source string - ContentType string -} - -func (f *Function) String() string { - return fmt.Sprintf("key: '%s', uid: '%s', timeout: '%s', function: '%s', args: '%v', permissions: '%s', source: '%s', contentType: '%s', payload: '%s'", - f.key, f.UID, f.Timeout, f.Name, f.Args, f.Permissions, f.Source, f.ContentType, string(f.Payload)) -} - -func parseFunction(s string) (*Function, error) { - r := csv.NewReader(strings.NewReader(s)) - r.Comma = ' ' - - parts, err := r.Read() - if err != nil { - return nil, err - } - - // FUNCTION UID Timeout "Name ...Parameters" 0xPermissions "SourceType" [ContentType] - if n := len(parts); n != 6 && n != 7 { - return nil, fmt.Errorf("unexpected number of words: want 6 or 7, got %d (%v)", n, parts) - } - - timeout, err := strconv.ParseInt(parts[2], 10, 64) - if err != nil { - return nil, err - } - - cmd := strings.Split(parts[3], " ") - - fn := &Function{ - key: parts[0], - UID: parts[1], - Timeout: time.Duration(timeout) * time.Second, - Name: cmd[0], - Args: cmd[1:], - Permissions: parts[4], - Source: parts[5], - } - - if len(parts) == 7 { - fn.ContentType = parts[6] - } - - return fn, nil -} - -func parseFunctionWithPayload(ctx context.Context, s string, in input) (*Function, error) { - fn, err := parseFunction(s) - if err != nil { - return nil, err - } - - var buf bytes.Buffer - - for { - select { - case <-ctx.Done(): - return nil, nil - case line, ok := <-in.lines(): - if !ok { - return nil, nil - } - if line == "FUNCTION_PAYLOAD_END" { - fn.Payload = append(fn.Payload, buf.Bytes()...) - return fn, nil - } - if buf.Len() > 0 { - buf.WriteString("\n") - } - buf.WriteString(line) - } - } -} diff --git a/src/go/plugin/go.d/agent/functions/manager.go b/src/go/plugin/go.d/agent/functions/manager.go index f81963cccea750..569e67913eeb54 100644 --- a/src/go/plugin/go.d/agent/functions/manager.go +++ b/src/go/plugin/go.d/agent/functions/manager.go @@ -8,7 +8,6 @@ import ( "fmt" "log/slog" "strconv" - "strings" "sync" "time" @@ -55,6 +54,8 @@ func (m *Manager) Run(ctx context.Context, quitCh chan struct{}) { } func (m *Manager) run(ctx context.Context, quitCh chan struct{}) { + parser := newInputParser() + for { select { case <-ctx.Done(): @@ -63,29 +64,15 @@ func (m *Manager) run(ctx context.Context, quitCh chan struct{}) { if !ok { return } - - var fn *Function - var err error - - // FIXME: if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears, - // we need to discard the current one and switch to the new one - switch { - case strings.HasPrefix(line, "FUNCTION "): - fn, err = parseFunction(line) - case strings.HasPrefix(line, "FUNCTION_PAYLOAD "): - fn, err = parseFunctionWithPayload(ctx, line, m.input) - case line == "": - continue - case line == "QUIT": + if line == "QUIT" { if quitCh != nil { quitCh <- struct{}{} return } - default: - m.Warningf("unexpected line: '%s'", line) continue } + fn, err := parser.parse(line) if err != nil { m.Warningf("parse function: %v ('%s')", err, line) continue diff --git a/src/go/plugin/go.d/agent/functions/parser.go b/src/go/plugin/go.d/agent/functions/parser.go new file mode 100644 index 00000000000000..0cac3c775e01ba --- /dev/null +++ b/src/go/plugin/go.d/agent/functions/parser.go @@ -0,0 +1,130 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +import ( + "bytes" + "encoding/csv" + "errors" + "fmt" + "strconv" + "strings" + "time" +) + +type Function struct { + key string + UID string + Timeout time.Duration + Name string + Args []string + Payload []byte + Permissions string + Source string + ContentType string +} + +func (f *Function) String() string { + return fmt.Sprintf("key: '%s', uid: '%s', timeout: '%s', function: '%s', args: '%v', permissions: '%s', source: '%s', contentType: '%s', payload: '%s'", + f.key, f.UID, f.Timeout, f.Name, f.Args, f.Permissions, f.Source, f.ContentType, string(f.Payload)) +} + +func newInputParser() *inputParser { + return &inputParser{} +} + +type inputParser struct { + currentFn *Function + readingPayload bool + payloadBuf bytes.Buffer +} + +func (p *inputParser) parse(line string) (*Function, error) { + if line = strings.TrimSpace(line); line == "" { + return nil, nil + } + + if p.readingPayload { + return p.handlePayloadLine(line) + } + + switch { + case strings.HasPrefix(line, "FUNCTION "): + return p.parseFunction(line) + case strings.HasPrefix(line, "FUNCTION_PAYLOAD "): + fn, err := p.parseFunction(line) + if err != nil { + return nil, err + } + p.readingPayload = true + p.currentFn = fn + p.payloadBuf.Reset() + return nil, nil + default: + return nil, errors.New("unexpected line format") + } +} + +func (p *inputParser) handlePayloadLine(line string) (*Function, error) { + if line == "FUNCTION_PAYLOAD_END" { + p.readingPayload = false + p.currentFn.Payload = []byte(p.payloadBuf.String()) + fn := p.currentFn + p.currentFn = nil + return fn, nil + } + + if strings.HasPrefix(line, "FUNCTION") { + p.readingPayload = false + p.currentFn = nil + p.payloadBuf.Reset() + return p.parse(line) + } + + if p.payloadBuf.Len() > 0 { + p.payloadBuf.WriteByte('\n') + } + p.payloadBuf.WriteString(line) + + return nil, nil +} + +func (p *inputParser) parseFunction(line string) (*Function, error) { + r := csv.NewReader(strings.NewReader(line)) + r.Comma = ' ' + + parts, err := r.Read() + if err != nil { + return nil, fmt.Errorf("failed to parse CSV: %w", err) + } + + if n := len(parts); n != 6 && n != 7 { + return nil, fmt.Errorf("unexpected number of parts: want 6 or 7, got %d", n) + } + + timeout, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid timeout value: %w", err) + } + + nameAndArgs := strings.Split(parts[3], " ") + if len(nameAndArgs) == 0 { + return nil, fmt.Errorf("empty function name and arguments") + } + + fn := &Function{ + key: parts[0], + UID: parts[1], + Timeout: time.Duration(timeout) * time.Second, + Name: nameAndArgs[0], + Args: nameAndArgs[1:], + Permissions: parts[4], + Source: parts[5], + } + + if len(parts) == 7 { + fn.ContentType = parts[6] + } + + return fn, nil +}