Skip to content

Commit

Permalink
Add new bloblang and interp executors for batches
Browse files Browse the repository at this point in the history
Signed-off-by: Jem Davies <jemsot@gmail.com>
  • Loading branch information
jem-davies committed Jun 23, 2024
1 parent 0cc3fb2 commit 776e827
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 4 deletions.
8 changes: 5 additions & 3 deletions internal/impl/pure/buffer_system_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ func (w *systemWindowBuffer) nextSystemWindow() (prevStart, prevEnd, start, end
return
}

func (w *systemWindowBuffer) getTimestamp(i int, batch service.MessageBatch) (ts time.Time, err error) {
func (w *systemWindowBuffer) getTimestamp(i int, exec *service.MessageBatchBloblangExecutor) (ts time.Time, err error) {
var tsValueMsg *service.Message
if tsValueMsg, err = batch.BloblangQuery(i, w.tsMapping); err != nil {
if tsValueMsg, err = exec.Query(i); err != nil {
w.logger.Errorf("Timestamp mapping failed for message: %v", err)
err = fmt.Errorf("timestamp mapping failed: %w", err)
return
Expand Down Expand Up @@ -321,9 +321,11 @@ func (w *systemWindowBuffer) WriteBatch(ctx context.Context, msgBatch service.Me
messageAdded := false
aggregatedAck := batch.NewCombinedAcker(batch.AckFunc(aFn))

bExec := msgBatch.BloblangExecutor(w.tsMapping)

// And now add new messages.
for i, msg := range msgBatch {
ts, err := w.getTimestamp(i, msgBatch)
ts, err := w.getTimestamp(i, bExec)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/pure/processor_cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func shouldSkip(batch service.MessageBatch, predicate *bloblang.Executor) (bool,
return false, nil
}

predResult, err := batch.BloblangQuery(0, predicate)
predResult, err := batch.BloblangExecutor(predicate).Query(0)
if err != nil {
return false, fmt.Errorf("failed to execute skip_on mapping: %w", err)
}
Expand Down
14 changes: 14 additions & 0 deletions public/service/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ func (m *Message) BloblangMutateFrom(blobl *bloblang.Executor, from *Message) (*
//
// This method allows mappings to perform windowed aggregations across message
// batches.
//
// Deprecated: Use the much more efficient BloblangExecutor method instead.
func (b MessageBatch) BloblangQuery(index int, blobl *bloblang.Executor) (*Message, error) {
uw := blobl.XUnwrapper().(interface {
Unwrap() *mapping.Executor
Expand Down Expand Up @@ -493,6 +495,8 @@ func (b MessageBatch) BloblangQuery(index int, blobl *bloblang.Executor) (*Messa
//
// This method allows mappings to perform windowed aggregations across message
// batches.
//
// Deprecated: Use the much more efficient BloblangExecutor method instead.
func (b MessageBatch) BloblangQueryValue(index int, blobl *bloblang.Executor) (any, error) {
uw := blobl.XUnwrapper().(interface {
Unwrap() *mapping.Executor
Expand Down Expand Up @@ -536,6 +540,8 @@ func (b MessageBatch) BloblangQueryValue(index int, blobl *bloblang.Executor) (a
// Note that using overlay means certain functions within the Bloblang mapping
// will behave differently. In the root of the mapping the right-hand keywords
// `root` and `this` refer to the same mutable root of the output document.
//
// Deprecated: Use the much more efficient BloblangExecutor method instead.
func (b MessageBatch) BloblangMutate(index int, blobl *bloblang.Executor) (*Message, error) {
uw := blobl.XUnwrapper().(interface {
Unwrap() *mapping.Executor
Expand All @@ -562,6 +568,10 @@ func (b MessageBatch) BloblangMutate(index int, blobl *bloblang.Executor) (*Mess
// This method allows interpolation functions to perform windowed aggregations
// across message batches, and is a more powerful way to interpolate strings
// than the standard .String method.
//
// Note: For performance reasons, if this method is being executed for each
// member of a batch individually, you should instead use an
// InterpolationExecutor.
func (b MessageBatch) TryInterpolatedString(index int, i *InterpolatedString) (string, error) {
msg := make(message.Batch, len(b))
for i, m := range b {
Expand All @@ -576,6 +586,10 @@ func (b MessageBatch) TryInterpolatedString(index int, i *InterpolatedString) (s
// This method allows interpolation functions to perform windowed aggregations
// across message batches, and is a more powerful way to interpolate strings
// than the standard .String method.
//
// Note: For performance reasons, if this method is being executed for each
// member of a batch individually, you should instead use an
// InterpolationExecutor.
func (b MessageBatch) TryInterpolatedBytes(index int, i *InterpolatedString) ([]byte, error) {
msg := make(message.Batch, len(b))
for i, m := range b {
Expand Down
157 changes: 157 additions & 0 deletions public/service/message_batch_blobl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package service

import (
"github.com/warpstreamlabs/bento/internal/bloblang/field"
"github.com/warpstreamlabs/bento/internal/bloblang/mapping"
"github.com/warpstreamlabs/bento/internal/bloblang/query"
"github.com/warpstreamlabs/bento/internal/message"
"github.com/warpstreamlabs/bento/internal/value"
"github.com/warpstreamlabs/bento/public/bloblang"
)

// MessageBatchBloblangExecutor is a mechanism for executing a given bloblang
// executor against a message batch, with each invocation from the perspective
// of a given index of the batch. This allows mappings to perform windowed
// aggregations across message batches.
//
// Instances are created with MessageBatch.BloblangExecutor, which converts the
// batch into its internal representation once so that repeated calls to Query,
// QueryValue and Mutate do not need to repeat that conversion.
type MessageBatchBloblangExecutor struct {
// oldBatch holds the internal parts of the batch the executor was created
// from, in the same order as the originating MessageBatch.
oldBatch message.Batch
// exe is the internal mapping executor unwrapped from the public
// *bloblang.Executor supplied at construction.
exe *mapping.Executor
}

// BloblangExecutor prepares a reusable executor that runs the provided
// Bloblang mapping against this batch, where each invocation is made from the
// perspective of a chosen index of the batch. This allows mappings to perform
// windowed aggregations across message batches.
//
// The batch is converted to its internal form once, here, so calling the
// returned executor for many indexes avoids repeating that work.
func (b MessageBatch) BloblangExecutor(blobl *bloblang.Executor) *MessageBatchBloblangExecutor {
	// The public executor only exposes its internal mapping through an
	// unwrapper, so assert the shape we need and extract it up front.
	type executorUnwrapper interface {
		Unwrap() *mapping.Executor
	}
	unwrapped := blobl.XUnwrapper().(executorUnwrapper).Unwrap()

	parts := make(message.Batch, 0, len(b))
	for _, m := range b {
		parts = append(parts, m.part)
	}

	return &MessageBatchBloblangExecutor{
		oldBatch: parts,
		exe:      unwrapped,
	}
}

// Query runs the Bloblang mapping against the batch from the perspective of
// the message at the given index and returns the resulting message, or an
// error if the mapping fails. When the mapping results in the root being
// deleted the returned message is nil, which indicates it has been filtered.
//
// This method allows mappings to perform windowed aggregations across message
// batches.
func (b MessageBatchBloblangExecutor) Query(index int) (*Message, error) {
	part, err := b.exe.MapPart(index, b.oldBatch)
	switch {
	case err != nil:
		return nil, err
	case part == nil:
		// No resulting part: report the message as filtered.
		return nil, nil
	}
	return NewInternalMessage(part), nil
}

// QueryValue runs the Bloblang mapping against the batch from the perspective
// of the message at the given index and returns the raw resulting value, or an
// error if the mapping fails.
//
// If the root of the mapping is deleted the error bloblang.ErrRootDeleted is
// returned, which lets callers distinguish a genuine nil value from a deleted
// one. A result of "nothing" is reported as a nil value with a nil error.
//
// This method allows mappings to perform windowed aggregations across message
// batches.
func (b MessageBatchBloblangExecutor) QueryValue(index int) (any, error) {
	// Each invocation gets a fresh variables map.
	fnCtx := query.FunctionContext{
		Maps:     b.exe.Maps(),
		Vars:     map[string]any{},
		Index:    index,
		MsgBatch: b.oldBatch,
	}

	out, err := b.exe.Exec(fnCtx)
	if err != nil {
		return nil, err
	}

	if _, deleted := out.(value.Delete); deleted {
		return nil, bloblang.ErrRootDeleted
	}
	if _, nothing := out.(value.Nothing); nothing {
		return nil, nil
	}
	return out, nil
}

// Mutate runs the Bloblang mapping onto the message at the given index of the
// batch, mutating the contents of that message directly rather than creating
// an entirely new object.
//
// The mutated message is returned, or an error if the mapping fails. When the
// mapping results in the root being deleted the returned message is nil, which
// indicates it has been filtered.
//
// The index is used to select the target message directly from the batch, so
// it must be a valid position within it.
//
// This method allows mappings to perform windowed aggregations across message
// batches.
//
// Note that using overlay means certain functions within the Bloblang mapping
// will behave differently. In the root of the mapping the right-hand keywords
// `root` and `this` refer to the same mutable root of the output document.
func (b MessageBatchBloblangExecutor) Mutate(index int) (*Message, error) {
	target := b.oldBatch[index]

	mutated, err := b.exe.MapOnto(target, index, b.oldBatch)
	if err != nil {
		return nil, err
	}
	if mutated == nil {
		// No resulting part: report the message as filtered.
		return nil, nil
	}
	return NewInternalMessage(mutated), nil
}

//------------------------------------------------------------------------------

// MessageBatchInterpolationExecutor is a mechanism for executing a given
// bloblang interpolation string against a message batch, with each invocation
// from the perspective of a given index of the batch. This allows
// interpolations to perform windowed aggregations across message batches.
//
// Instances are created with MessageBatch.InterpolationExecutor, which converts
// the batch into its internal representation once so that repeated calls to
// TryString and TryBytes do not need to repeat that conversion.
type MessageBatchInterpolationExecutor struct {
// oldBatch holds the internal parts of the batch the executor was created
// from, in the same order as the originating MessageBatch.
oldBatch message.Batch
// i is the internal expression taken from the InterpolatedString supplied
// at construction.
i *field.Expression
}

// InterpolationExecutor prepares a reusable executor that resolves the
// provided interpolation string against this batch, where each invocation is
// made from the perspective of a chosen index of the batch. This allows
// interpolations to perform windowed aggregations across message batches.
//
// The batch is converted to its internal form once, here, so calling the
// returned executor for many indexes avoids repeating that work.
func (b MessageBatch) InterpolationExecutor(i *InterpolatedString) *MessageBatchInterpolationExecutor {
	parts := make(message.Batch, 0, len(b))
	for _, m := range b {
		parts = append(parts, m.part)
	}

	return &MessageBatchInterpolationExecutor{
		oldBatch: parts,
		i:        i.expr,
	}
}

// TryString resolves the interpolated string expression against the batch from
// the perspective of the message at the given index, returning the result as a
// string or an error if resolution fails.
//
// This method allows interpolation functions to perform windowed aggregations
// across message batches, and is a more powerful way to interpolate strings
// than the standard .String method.
func (b MessageBatchInterpolationExecutor) TryString(index int) (string, error) {
	expr, parts := b.i, b.oldBatch
	return expr.String(index, parts)
}

// TryBytes resolves the interpolated string expression against the batch from
// the perspective of the message at the given index, returning the result as a
// byte slice or an error if resolution fails.
//
// This method allows interpolation functions to perform windowed aggregations
// across message batches, and is a more powerful way to interpolate strings
// than the standard .String method.
func (b MessageBatchInterpolationExecutor) TryBytes(index int) ([]byte, error) {
	expr, parts := b.i, b.oldBatch
	return expr.Bytes(index, parts)
}
158 changes: 158 additions & 0 deletions public/service/message_batch_blobl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package service

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/warpstreamlabs/bento/public/bloblang"
)

func TestMessageBatchExecutorMapping(t *testing.T) {
partOne := NewMessage(nil)
partOne.SetStructured(map[string]any{
"content": "hello world 1",
})

partTwo := NewMessage(nil)
partTwo.SetStructured(map[string]any{
"content": "hello world 2",
})

blobl, err := bloblang.Parse(`root.new_content = json("content").from_all().join(" - ")`)
require.NoError(t, err)

res, err := MessageBatch{partOne, partTwo}.BloblangExecutor(blobl).Query(0)
require.NoError(t, err)

resI, err := res.AsStructured()
require.NoError(t, err)
assert.Equal(t, map[string]any{
"new_content": "hello world 1 - hello world 2",
}, resI)
}

func TestMessageBatchExecutorQueryValue(t *testing.T) {
partOne := NewMessage(nil)
partOne.SetStructured(map[string]any{
"content": "hello world 1",
})

partTwo := NewMessage(nil)
partTwo.SetStructured(map[string]any{
"content": "hello world 2",
})

tests := map[string]struct {
mapping string
batchIndex int
exp any
err string
}{
"returns string": {
mapping: `root = json("content")`,
exp: "hello world 1",
},
"returns integer": {
mapping: `root = json("content").length()`,
exp: int64(13),
},
"returns float": {
mapping: `root = json("content").length() / 2`,
exp: float64(6.5),
},
"returns bool": {
mapping: `root = json("content").length() > 0`,
exp: true,
},
"returns bytes": {
mapping: `root = content()`,
exp: []byte(`{"content":"hello world 1"}`),
},
"returns nil": {
mapping: `root = null`,
exp: nil,
},
"returns null string": {
mapping: `root = "null"`,
exp: "null",
},
"returns an array": {
mapping: `root = [ json("content") ]`,
exp: []any{"hello world 1"},
},
"returns an object": {
mapping: `root.new_content = json("content")`,
exp: map[string]any{"new_content": "hello world 1"},
},
"supports batch-wide queries": {
mapping: `root.new_content = json("content").from_all().join(" - ")`,
exp: map[string]any{"new_content": "hello world 1 - hello world 2"},
},
"handles the specified message index correctly": {
mapping: `root = json("content")`,
batchIndex: 1,
exp: "hello world 2",
},
"returns an error if the mapping throws": {
mapping: `root = throw("kaboom")`,
exp: nil,
err: "failed assignment (line 1): kaboom",
},
"returns an error if the root is deleted": {
mapping: `root = deleted()`,
exp: nil,
err: "root was deleted",
},
"doesn't error out if a field is deleted": {
mapping: `root.foo = deleted()`,
exp: map[string]any{},
err: "",
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
blobl, err := bloblang.Parse(test.mapping)
require.NoError(t, err)

res, err := MessageBatch{partOne, partTwo}.BloblangExecutor(blobl).QueryValue(test.batchIndex)
if test.err != "" {
require.ErrorContains(t, err, test.err)
} else {
require.NoError(t, err)
}

assert.Equal(t, test.exp, res)
})
}
}

// TestInterpolationExecutor checks that a single interpolation executor can be
// reused for every index of a batch, and that TryString and TryBytes agree on
// the result for each index.
func TestInterpolationExecutor(t *testing.T) {
	batch := MessageBatch{
		NewMessage([]byte("foo")),
		NewMessage([]byte("bar")),
	}

	interp, err := NewInterpolatedString("${! content().uppercase().from(0) + content() }")
	require.NoError(t, err)

	exec := batch.InterpolationExecutor(interp)

	// The expression always upper-cases message 0 and appends the content of
	// the message at the index being resolved.
	for index, want := range []string{"FOOfoo", "FOObar"} {
		str, err := exec.TryString(index)
		require.NoError(t, err)
		assert.Equal(t, want, str)

		raw, err := exec.TryBytes(index)
		require.NoError(t, err)
		assert.Equal(t, want, string(raw))
	}
}

0 comments on commit 776e827

Please sign in to comment.