Skip to content

Commit

Permalink
fix(forwarder): improve CSV handling (#324)
Browse files Browse the repository at this point in the history
A few simple performance fixes for converting CSV to JSON:
- avoid unmarshalling if target is json.RawMessage, since that calls
  `json.Valid`.
- avoid calling `strconv.Quote` where possible. Most of the inputs we
  handle in practice do not require special handling.
- keep track of max record length and use it to initialize our buffer,
  which avoids resizing.

In profiling, these changes reduced processing time for a VPC flow logs
file by ~33%.
  • Loading branch information
jta authored Jul 29, 2024
1 parent e2db56e commit 36b455a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 19 deletions.
22 changes: 19 additions & 3 deletions pkg/handler/forwarder/s3http/internal/decoders/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type CSVDecoder struct {
*csv.Reader
buffered *bufio.Reader
header []string
maxSize int
sync.Once
}

Expand All @@ -74,20 +75,35 @@ func (dec *CSVDecoder) Decode(v any) error {
return fmt.Errorf("failed to decode record: %w", err)
}

var buf bytes.Buffer
buf := bytes.NewBuffer(make([]byte, 0, dec.maxSize))

buf.WriteString(`{`)
for i, colName := range dec.header {
if i < len(record) && record[i] != "" {
if buf.Len() != 1 {
buf.WriteString(`,`)
}
buf.WriteString(colName + `:` + strconv.Quote(record[i]))

buf.WriteString(colName + `:`)

// it is cheaper to verify if naive quoting is enough
if value := []byte(`"` + record[i] + `"`); json.Valid(value) {
buf.Write(value)
} else {
buf.WriteString(strconv.Quote(record[i]))
}
}
}
buf.WriteString(`}`)

if err := json.Unmarshal(buf.Bytes(), v); err != nil {
if buf.Len() > dec.maxSize {
dec.maxSize = buf.Len()
}

// avoid unmarshalling if possible
if r, ok := v.(*json.RawMessage); ok {
*r = buf.Bytes()
} else if err := json.Unmarshal(buf.Bytes(), v); err != nil {
return fmt.Errorf("failed to decode CSV: %w", err)
}
return nil
Expand Down
24 changes: 18 additions & 6 deletions pkg/handler/forwarder/s3http/internal/decoders/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func TestDecoders(t *testing.T) {
ContentType string
ContentEncoding string
InputFile string
DisableRawJSON bool
}{
{
ContentType: "application/json",
Expand Down Expand Up @@ -53,8 +54,9 @@ func TestDecoders(t *testing.T) {
InputFile: "testdata/example.txt",
},
{
ContentType: "application/x-aws-cloudwatchlogs",
InputFile: "testdata/cloudwatchlogs.json",
ContentType: "application/x-aws-cloudwatchlogs",
InputFile: "testdata/cloudwatchlogs.json",
DisableRawJSON: true,
},
}

Expand All @@ -70,12 +72,22 @@ func TestDecoders(t *testing.T) {
var buf bytes.Buffer

enc := json.NewEncoder(&buf)

process := func(v any) error {
if err := dec.Decode(v); err != nil {
return err
}
return enc.Encode(v)
}

for dec.More() {
var v json.RawMessage
if err := dec.Decode(&v); err != nil {
t.Fatal(err)
if tt.DisableRawJSON {
err = process(new(any))
} else {
err = process(new(json.RawMessage))
}
if err := enc.Encode(v); err != nil {

if err != nil {
t.Fatal(err)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{"id":"1","timestamp":1620678200,"message":"Error: Unable to connect to database server","owner":"123456789012","logGroup":"example-log-group","logStream":"example-log-stream","subscriptionFilters":["example-filter"],"messageType":"DATA_MESSAGE"}
{"id":"2","timestamp":1620678300,"message":"Warning: High CPU usage detected","owner":"123456789012","logGroup":"example-log-group","logStream":"example-log-stream","subscriptionFilters":["example-filter"],"messageType":"DATA_MESSAGE"}
{"id":"3","timestamp":1620678400,"message":"Info: Application started successfully","owner":"123456789012","logGroup":"example-log-group","logStream":"example-log-stream","subscriptionFilters":["example-filter"],"messageType":"DATA_MESSAGE"}
{"id":"4","timestamp":1620678500,"message":"Error: Disk space running low","owner":"123456789012","logGroup":"example-log-group","logStream":"example-log-stream","subscriptionFilters":["example-filter"],"messageType":"DATA_MESSAGE"}
{"id":"5","timestamp":1620678600,"message":"Info: User 'admin' logged in","owner":"123456789012","logGroup":"example-log-group","logStream":"example-log-stream","subscriptionFilters":["example-filter"],"messageType":"DATA_MESSAGE"}
{"id":"1","timestamp":1620679200,"message":"Info: Server restarted","owner":"987654321098","logGroup":"another-log-group","logStream":"another-log-stream","subscriptionFilters":["filter-1","filter-2"],"messageType":"DATA_MESSAGE"}
{"id":"2","timestamp":1620679300,"message":"Error: Service crashed unexpectedly","owner":"987654321098","logGroup":"another-log-group","logStream":"another-log-stream","subscriptionFilters":["filter-1","filter-2"],"messageType":"DATA_MESSAGE"}
{"id":"3","timestamp":1620679400,"message":"Warning: Memory usage exceeding 90%","owner":"987654321098","logGroup":"another-log-group","logStream":"another-log-stream","subscriptionFilters":["filter-1","filter-2"],"messageType":"DATA_MESSAGE"}
{"id":"4","timestamp":1620679500,"message":"Info: New user registered","owner":"987654321098","logGroup":"another-log-group","logStream":"another-log-stream","subscriptionFilters":["filter-1","filter-2"],"messageType":"DATA_MESSAGE"}
{"id":"5","timestamp":1620679600,"message":"Error: Network connection lost","owner":"987654321098","logGroup":"another-log-group","logStream":"another-log-stream","subscriptionFilters":["filter-1","filter-2"],"messageType":"DATA_MESSAGE"}
{"id":"1","logGroup":"example-log-group","logStream":"example-log-stream","message":"Error: Unable to connect to database server","messageType":"DATA_MESSAGE","owner":"123456789012","subscriptionFilters":["example-filter"],"timestamp":1620678200}
{"id":"2","logGroup":"example-log-group","logStream":"example-log-stream","message":"Warning: High CPU usage detected","messageType":"DATA_MESSAGE","owner":"123456789012","subscriptionFilters":["example-filter"],"timestamp":1620678300}
{"id":"3","logGroup":"example-log-group","logStream":"example-log-stream","message":"Info: Application started successfully","messageType":"DATA_MESSAGE","owner":"123456789012","subscriptionFilters":["example-filter"],"timestamp":1620678400}
{"id":"4","logGroup":"example-log-group","logStream":"example-log-stream","message":"Error: Disk space running low","messageType":"DATA_MESSAGE","owner":"123456789012","subscriptionFilters":["example-filter"],"timestamp":1620678500}
{"id":"5","logGroup":"example-log-group","logStream":"example-log-stream","message":"Info: User 'admin' logged in","messageType":"DATA_MESSAGE","owner":"123456789012","subscriptionFilters":["example-filter"],"timestamp":1620678600}
{"id":"1","logGroup":"another-log-group","logStream":"another-log-stream","message":"Info: Server restarted","messageType":"DATA_MESSAGE","owner":"987654321098","subscriptionFilters":["filter-1","filter-2"],"timestamp":1620679200}
{"id":"2","logGroup":"another-log-group","logStream":"another-log-stream","message":"Error: Service crashed unexpectedly","messageType":"DATA_MESSAGE","owner":"987654321098","subscriptionFilters":["filter-1","filter-2"],"timestamp":1620679300}
{"id":"3","logGroup":"another-log-group","logStream":"another-log-stream","message":"Warning: Memory usage exceeding 90%","messageType":"DATA_MESSAGE","owner":"987654321098","subscriptionFilters":["filter-1","filter-2"],"timestamp":1620679400}
{"id":"4","logGroup":"another-log-group","logStream":"another-log-stream","message":"Info: New user registered","messageType":"DATA_MESSAGE","owner":"987654321098","subscriptionFilters":["filter-1","filter-2"],"timestamp":1620679500}
{"id":"5","logGroup":"another-log-group","logStream":"another-log-stream","message":"Error: Network connection lost","messageType":"DATA_MESSAGE","owner":"987654321098","subscriptionFilters":["filter-1","filter-2"],"timestamp":1620679600}

0 comments on commit 36b455a

Please sign in to comment.