Skip to content

Commit

Permalink
chore(go.d.plugin): improve function parser (netdata#19143)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyam8 authored Dec 6, 2024
1 parent 3ab31a7 commit a1d0fe5
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 113 deletions.
96 changes: 0 additions & 96 deletions src/go/plugin/go.d/agent/functions/function.go

This file was deleted.

21 changes: 4 additions & 17 deletions src/go/plugin/go.d/agent/functions/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -55,6 +54,8 @@ func (m *Manager) Run(ctx context.Context, quitCh chan struct{}) {
}

func (m *Manager) run(ctx context.Context, quitCh chan struct{}) {
parser := newInputParser()

for {
select {
case <-ctx.Done():
Expand All @@ -63,29 +64,15 @@ func (m *Manager) run(ctx context.Context, quitCh chan struct{}) {
if !ok {
return
}

var fn *Function
var err error

// FIXME: if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears,
// we need to discard the current one and switch to the new one
switch {
case strings.HasPrefix(line, "FUNCTION "):
fn, err = parseFunction(line)
case strings.HasPrefix(line, "FUNCTION_PAYLOAD "):
fn, err = parseFunctionWithPayload(ctx, line, m.input)
case line == "":
continue
case line == "QUIT":
if line == "QUIT" {
if quitCh != nil {
quitCh <- struct{}{}
return
}
default:
m.Warningf("unexpected line: '%s'", line)
continue
}

fn, err := parser.parse(line)
if err != nil {
m.Warningf("parse function: %v ('%s')", err, line)
continue
Expand Down
130 changes: 130 additions & 0 deletions src/go/plugin/go.d/agent/functions/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// SPDX-License-Identifier: GPL-3.0-or-later

package functions

import (
"bytes"
"encoding/csv"
"errors"
"fmt"
"strconv"
"strings"
"time"
)

type Function struct {
key string
UID string
Timeout time.Duration
Name string
Args []string
Payload []byte
Permissions string
Source string
ContentType string
}

func (f *Function) String() string {
return fmt.Sprintf("key: '%s', uid: '%s', timeout: '%s', function: '%s', args: '%v', permissions: '%s', source: '%s', contentType: '%s', payload: '%s'",
f.key, f.UID, f.Timeout, f.Name, f.Args, f.Permissions, f.Source, f.ContentType, string(f.Payload))
}

func newInputParser() *inputParser {
return &inputParser{}
}

type inputParser struct {
currentFn *Function
readingPayload bool
payloadBuf bytes.Buffer
}

func (p *inputParser) parse(line string) (*Function, error) {
if line = strings.TrimSpace(line); line == "" {
return nil, nil
}

if p.readingPayload {
return p.handlePayloadLine(line)
}

switch {
case strings.HasPrefix(line, "FUNCTION "):
return p.parseFunction(line)
case strings.HasPrefix(line, "FUNCTION_PAYLOAD "):
fn, err := p.parseFunction(line)
if err != nil {
return nil, err
}
p.readingPayload = true
p.currentFn = fn
p.payloadBuf.Reset()
return nil, nil
default:
return nil, errors.New("unexpected line format")
}
}

func (p *inputParser) handlePayloadLine(line string) (*Function, error) {
if line == "FUNCTION_PAYLOAD_END" {
p.readingPayload = false
p.currentFn.Payload = []byte(p.payloadBuf.String())
fn := p.currentFn
p.currentFn = nil
return fn, nil
}

if strings.HasPrefix(line, "FUNCTION") {
p.readingPayload = false
p.currentFn = nil
p.payloadBuf.Reset()
return p.parse(line)
}

if p.payloadBuf.Len() > 0 {
p.payloadBuf.WriteByte('\n')
}
p.payloadBuf.WriteString(line)

return nil, nil
}

func (p *inputParser) parseFunction(line string) (*Function, error) {
r := csv.NewReader(strings.NewReader(line))
r.Comma = ' '

parts, err := r.Read()
if err != nil {
return nil, fmt.Errorf("failed to parse CSV: %w", err)
}

if n := len(parts); n != 6 && n != 7 {
return nil, fmt.Errorf("unexpected number of parts: want 6 or 7, got %d", n)
}

timeout, err := strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid timeout value: %w", err)
}

nameAndArgs := strings.Split(parts[3], " ")
if len(nameAndArgs) == 0 {
return nil, fmt.Errorf("empty function name and arguments")
}

fn := &Function{
key: parts[0],
UID: parts[1],
Timeout: time.Duration(timeout) * time.Second,
Name: nameAndArgs[0],
Args: nameAndArgs[1:],
Permissions: parts[4],
Source: parts[5],
}

if len(parts) == 7 {
fn.ContentType = parts[6]
}

return fn, nil
}

0 comments on commit a1d0fe5

Please sign in to comment.