diff --git a/CHANGELOG.old.md b/CHANGELOG.old.md index 5c00dd14b..a3cffe6c0 100644 --- a/CHANGELOG.old.md +++ b/CHANGELOG.old.md @@ -3,6 +3,19 @@ Changelog All notable changes to this project will be documented in this file. +## 4.30.0 - 2024-06-13 + +### Added + +- Go API: New APIs for capturing synchronous responses from downstream components. (@Jeffail) +- Go API: Ability to customise the overall configuration schema of a stream builder. (@Jeffail) +- New `sin`, `cos`, `tan` and `pi` bloblang methods. (@mfamador) +- Field `proxy_url` added to the `websocket` input and output. (@mihaitodor) + +### Fixed + +- The `websocket` input and output now obey the `HTTP_PROXY`, `HTTPS_PROXY` and `NO_PROXY` environment variables. (@mihaitodor) + ## 4.29.0 - 2024-06-10 ### Added diff --git a/internal/impl/aws/processor_dynamodb_partiql.go b/internal/impl/aws/processor_dynamodb_partiql.go index fd26eb00c..8128e1cb4 100644 --- a/internal/impl/aws/processor_dynamodb_partiql.go +++ b/internal/impl/aws/processor_dynamodb_partiql.go @@ -105,6 +105,8 @@ func newDynamoDBPartiQL( func (d *dynamoDBPartiQL) ProcessBatch(ctx context.Context, batch service.MessageBatch) ([]service.MessageBatch, error) { stmts := []types.BatchStatementRequest{} + executor := batch.BloblangExecutor(d.args) + for i := range batch { req := types.BatchStatementRequest{} req.Statement = &d.query @@ -116,7 +118,7 @@ func (d *dynamoDBPartiQL) ProcessBatch(ctx context.Context, batch service.Messag req.Statement = &query } - argMsg, err := batch.BloblangQuery(i, d.args) + argMsg, err := executor.Query(i) if err != nil { return nil, fmt.Errorf("error evaluating arg mapping at index %d: %v", i, err) } diff --git a/internal/impl/azure/cosmosdb/executor.go b/internal/impl/azure/cosmosdb/executor.go index 1f4a6db9c..374154711 100644 --- a/internal/impl/azure/cosmosdb/executor.go +++ b/internal/impl/azure/cosmosdb/executor.go @@ -26,7 +26,8 @@ func ExecMessageBatch(ctx context.Context, batch service.MessageBatch, client *a fmt.Errorf("current batch has %d messages, but the CosmosDB transactional batch limit is %d", len(batch), maxTransactionalBatchSize) } - pkQueryResult, err := batch.BloblangQueryValue(0, config.PartitionKeys) + executor := batch.BloblangExecutor(config.PartitionKeys) + pkQueryResult, err := executor.QueryValue(0) if err != nil { return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to evaluate partition key values: %s", err) } @@ -114,7 +115,8 @@ func ExecMessageBatch(ctx context.Context, batch service.MessageBatch, client *a var value any if po.Value != nil { - if value, err = batch.BloblangQueryValue(idx, po.Value); err != nil { + executor := batch.BloblangExecutor(po.Value) + if value, err = executor.QueryValue(idx); err != nil { return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to evaluate patch value: %s", err) } } diff --git a/internal/impl/cassandra/output.go b/internal/impl/cassandra/output.go index c23a16af6..cfa7252e1 100644 --- a/internal/impl/cassandra/output.go +++ b/internal/impl/cassandra/output.go @@ -216,7 +216,8 @@ func (c *cassandraWriter) writeBatch(session *gocql.Session, b service.MessageBa func (c *cassandraWriter) mapArgs(b service.MessageBatch, index int) ([]any, error) { if c.argsMapping != nil { // We've got an "args_mapping" field, extract values from there. - part, err := b.BloblangQuery(index, c.argsMapping) + executor := b.BloblangExecutor(c.argsMapping) + part, err := executor.Query(index) if err != nil { return nil, fmt.Errorf("executing bloblang mapping: %w", err) } diff --git a/internal/impl/couchbase/processor.go b/internal/impl/couchbase/processor.go index 9725a060e..8a60aac56 100644 --- a/internal/impl/couchbase/processor.go +++ b/internal/impl/couchbase/processor.go @@ -117,6 +117,10 @@ func NewProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*Processo func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBatch) ([]service.MessageBatch, error) { newMsg := inBatch.Copy() ops := make([]gocb.BulkOp, len(inBatch)) + var executor *service.MessageBatchBloblangExecutor + if p.content != nil { + executor = inBatch.BloblangExecutor(p.content) + } // generate query for index := range newMsg { @@ -129,7 +133,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat // generate content var content []byte if p.content != nil { - res, err := inBatch.BloblangQuery(index, p.content) + res, err := executor.Query(index) if err != nil { return nil, err } diff --git a/internal/impl/gcp/processor_bigquery_select.go b/internal/impl/gcp/processor_bigquery_select.go index 2a9724e0a..228232da8 100644 --- a/internal/impl/gcp/processor_bigquery_select.go +++ b/internal/impl/gcp/processor_bigquery_select.go @@ -168,12 +168,17 @@ func (proc *bigQuerySelectProcessor) ProcessBatch(ctx context.Context, batch ser outBatch := make(service.MessageBatch, 0, len(batch)) + var executor *service.MessageBatchBloblangExecutor + if argsMapping != nil { + executor = batch.BloblangExecutor(argsMapping) + } + for i, msg := range batch { outBatch = append(outBatch, msg) var args []any if argsMapping != nil { - resMsg, err := batch.BloblangQuery(i, argsMapping) + resMsg, err := executor.Query(i) if err != nil { msg.SetError(fmt.Errorf("failed to resolve args mapping: %w", err)) continue diff --git a/internal/impl/io/bloblang_examples_test.go b/internal/impl/io/bloblang_examples_test.go new file mode 100644 index 000000000..4e6299e12 --- /dev/null +++ b/internal/impl/io/bloblang_examples_test.go @@ -0,0 +1,165 @@ +package io_test + +import ( + "fmt" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace/noop" + + "github.com/warpstreamlabs/bento/public/bloblang" + "github.com/warpstreamlabs/bento/public/service" + + _ "github.com/warpstreamlabs/bento/internal/impl/io" +) + +func TestFunctionExamples(t *testing.T) { + tmpJSONFile, err := os.CreateTemp("", "bento_bloblang_functions_test") + require.NoError(t, err) + t.Cleanup(func() { + os.Remove(tmpJSONFile.Name()) + }) + + _, err = tmpJSONFile.WriteString(`{"foo":"bar"}`) + require.NoError(t, err) + + key := "BENTO_TEST_BLOBLANG_FILE" + t.Setenv(key, tmpJSONFile.Name()) + + env := bloblang.GlobalEnvironment() + env.WalkFunctions(func(name string, view *bloblang.FunctionView) { + t.Run(name, func(t *testing.T) { + t.Parallel() + + spec := view.TemplateData() + for i, e := range spec.Examples { + if e.SkipTesting { + continue + } + + m, err := env.Parse(e.Mapping) + require.NoError(t, err) + + for j, io := range e.Results { + msg := service.NewMessage([]byte(io[0])) + textMap := propagation.MapCarrier{ + "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + } + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{})) + + textProp := otel.GetTextMapPropagator() + otelCtx := textProp.Extract(msg.Context(), textMap) + pCtx, _ := noop.NewTracerProvider().Tracer("blobby").Start(otelCtx, "test") + msg = msg.WithContext(pCtx) + + p, err := msg.BloblangQuery(m) + exp := io[1] + if strings.HasPrefix(exp, "Error(") { + exp = exp[7 : len(exp)-2] + require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j)) + } else { + require.NoError(t, err) + + pBytes, err := p.AsBytes() + require.NoError(t, err) + + assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j)) + } + } + } + }) + }) +} + +func TestMethodExamples(t *testing.T) { + tmpJSONFile, err := os.CreateTemp("", "bento_bloblang_methods_test") + require.NoError(t, err) + t.Cleanup(func() { + os.Remove(tmpJSONFile.Name()) + }) + + _, err = tmpJSONFile.WriteString(` +{ + "type":"object", + "properties":{ + "foo":{ + "type":"string" + } + } +}`) + require.NoError(t, err) + + key := "BENTO_TEST_BLOBLANG_SCHEMA_FILE" + t.Setenv(key, tmpJSONFile.Name()) + + env := bloblang.GlobalEnvironment() + env.WalkMethods(func(name string, view *bloblang.MethodView) { + spec := view.TemplateData() + t.Run(spec.Name, func(t *testing.T) { + t.Parallel() + for i, e := range spec.Examples { + if e.SkipTesting { + continue + } + + m, err := env.Parse(e.Mapping) + require.NoError(t, err) + + for j, io := range e.Results { + msg := service.NewMessage([]byte(io[0])) + p, err := msg.BloblangQuery(m) + exp := io[1] + if strings.HasPrefix(exp, "Error(") { + exp = exp[7 : len(exp)-2] + require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j)) + } else if exp == "" { + require.NoError(t, err) + require.Nil(t, p) + } else { + require.NoError(t, err) + + pBytes, err := p.AsBytes() + require.NoError(t, err) + + assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j)) + } + } + } + for _, target := range spec.Categories { + for i, e := range target.Examples { + if e.SkipTesting { + continue + } + + m, err := env.Parse(e.Mapping) + require.NoError(t, err) + + for j, io := range e.Results { + msg := service.NewMessage([]byte(io[0])) + p, err := msg.BloblangQuery(m) + exp := io[1] + if strings.HasPrefix(exp, "Error(") { + exp = exp[7 : len(exp)-2] + require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j)) + } else if exp == "" { + require.NoError(t, err) + require.Nil(t, p) + } else { + require.NoError(t, err) + + pBytes, err := p.AsBytes() + require.NoError(t, err) + + assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j)) + } + } + } + } + }) + }) +} diff --git a/internal/impl/mongodb/common.go b/internal/impl/mongodb/common.go index f2dd26a4b..3cf6e52e5 100644 --- a/internal/impl/mongodb/common.go +++ b/internal/impl/mongodb/common.go @@ -379,7 +379,8 @@ func writeMapsFromParsed(conf *service.ParsedConfig, operation Operation) (maps } func extJSONFromMap(b service.MessageBatch, i int, m *bloblang.Executor) (any, error) { - msg, err := b.BloblangQuery(i, m) + executor := b.BloblangExecutor(m) + msg, err := executor.Query(i) if err != nil { return nil, err } diff --git a/internal/impl/pure/bloblang_examples_test.go b/internal/impl/pure/bloblang_examples_test.go new file mode 100644 index 000000000..f603cd1dd --- /dev/null +++ b/internal/impl/pure/bloblang_examples_test.go @@ -0,0 +1,165 @@ +package pure_test + +import ( + "fmt" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace/noop" + + "github.com/warpstreamlabs/bento/public/bloblang" + "github.com/warpstreamlabs/bento/public/service" + + _ "github.com/warpstreamlabs/bento/internal/impl/pure" +) + +func TestFunctionExamples(t *testing.T) { + tmpJSONFile, err := os.CreateTemp("", "bento_bloblang_functions_test") + require.NoError(t, err) + t.Cleanup(func() { + os.Remove(tmpJSONFile.Name()) + }) + + _, err = tmpJSONFile.WriteString(`{"foo":"bar"}`) + require.NoError(t, err) + + key := "BENTO_TEST_BLOBLANG_FILE" + t.Setenv(key, tmpJSONFile.Name()) + + env := bloblang.GlobalEnvironment() + env.WalkFunctions(func(name string, view *bloblang.FunctionView) { + t.Run(name, func(t *testing.T) { + t.Parallel() + + spec := view.TemplateData() + for i, e := range spec.Examples { + if e.SkipTesting { + continue + } + + m, err := env.Parse(e.Mapping) + require.NoError(t, err) + + for j, io := range e.Results { + msg := service.NewMessage([]byte(io[0])) + textMap := propagation.MapCarrier{ + "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + } + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{})) + + textProp := otel.GetTextMapPropagator() + otelCtx := textProp.Extract(msg.Context(), textMap) + pCtx, _ := noop.NewTracerProvider().Tracer("blobby").Start(otelCtx, "test") + msg = msg.WithContext(pCtx) + + p, err := msg.BloblangQuery(m) + exp := io[1] + if strings.HasPrefix(exp, "Error(") { + exp = exp[7 : len(exp)-2] + require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j)) + } else { + require.NoError(t, err) + + pBytes, err := p.AsBytes() + require.NoError(t, err) + + assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j)) + } + } + } + }) + }) +} + +func TestMethodExamples(t *testing.T) { + tmpJSONFile, err := os.CreateTemp("", "bento_bloblang_methods_test") + require.NoError(t, err) + t.Cleanup(func() { + os.Remove(tmpJSONFile.Name()) + }) + + _, err = tmpJSONFile.WriteString(` +{ + "type":"object", + "properties":{ + "foo":{ + "type":"string" + } + } +}`) + require.NoError(t, err) + + key := "BENTO_TEST_BLOBLANG_SCHEMA_FILE" + t.Setenv(key, tmpJSONFile.Name()) + + env := bloblang.GlobalEnvironment() + env.WalkMethods(func(name string, view *bloblang.MethodView) { + spec := view.TemplateData() + t.Run(spec.Name, func(t *testing.T) { + t.Parallel() + for i, e := range spec.Examples { + if e.SkipTesting { + continue + } + + m, err := env.Parse(e.Mapping) + require.NoError(t, err) + + for j, io := range e.Results { + msg := service.NewMessage([]byte(io[0])) + p, err := msg.BloblangQuery(m) + exp := io[1] + if strings.HasPrefix(exp, "Error(") { + exp = exp[7 : len(exp)-2] + require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j)) + } else if exp == "" { + require.NoError(t, err) + require.Nil(t, p) + } else { + require.NoError(t, err) + + pBytes, err := p.AsBytes() + require.NoError(t, err) + + assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j)) + } + } + } + for _, target := range spec.Categories { + for i, e := range target.Examples { + if e.SkipTesting { + continue + } + + m, err := env.Parse(e.Mapping) + require.NoError(t, err) + + for j, io := range e.Results { + msg := service.NewMessage([]byte(io[0])) + p, err := msg.BloblangQuery(m) + exp := io[1] + if strings.HasPrefix(exp, "Error(") { + exp = exp[7 : len(exp)-2] + require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j)) + } else if exp == "" { + require.NoError(t, err) + require.Nil(t, p) + } else { + require.NoError(t, err) + + pBytes, err := p.AsBytes() + require.NoError(t, err) + + assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j)) + } + } + } + } + }) + }) +} diff --git a/internal/impl/pure/buffer_system_window.go b/internal/impl/pure/buffer_system_window.go index 6f9ddc82e..ea6368cc2 100644 --- a/internal/impl/pure/buffer_system_window.go +++ b/internal/impl/pure/buffer_system_window.go @@ -265,9 +265,9 @@ func (w *systemWindowBuffer) nextSystemWindow() (prevStart, prevEnd, start, end return } -func (w *systemWindowBuffer) getTimestamp(i int, batch service.MessageBatch) (ts time.Time, err error) { +func (w *systemWindowBuffer) getTimestamp(i int, exec *service.MessageBatchBloblangExecutor) (ts time.Time, err error) { var tsValueMsg *service.Message - if tsValueMsg, err = batch.BloblangQuery(i, w.tsMapping); err != nil { + if tsValueMsg, err = exec.Query(i); err != nil { w.logger.Errorf("Timestamp mapping failed for message: %v", err) err = fmt.Errorf("timestamp mapping failed: %w", err) return @@ -321,9 +321,11 @@ func (w *systemWindowBuffer) WriteBatch(ctx context.Context, msgBatch service.Me messageAdded := false aggregatedAck := batch.NewCombinedAcker(batch.AckFunc(aFn)) + bExec := msgBatch.BloblangExecutor(w.tsMapping) + // And now add new messages. for i, msg := range msgBatch { - ts, err := w.getTimestamp(i, msgBatch) + ts, err := w.getTimestamp(i, bExec) if err != nil { return err } diff --git a/internal/impl/pure/processor_cached.go b/internal/impl/pure/processor_cached.go index 11229cf78..111d6a4f0 100644 --- a/internal/impl/pure/processor_cached.go +++ b/internal/impl/pure/processor_cached.go @@ -249,7 +249,7 @@ func shouldSkip(batch service.MessageBatch, predicate *bloblang.Executor) (bool, return false, nil } - predResult, err := batch.BloblangQuery(0, predicate) + predResult, err := batch.BloblangExecutor(predicate).Query(0) if err != nil { return false, fmt.Errorf("failed to execute skip_on mapping: %w", err) } diff --git a/internal/impl/redis/processor.go b/internal/impl/redis/processor.go index 78f871c05..cc0278f06 100644 --- a/internal/impl/redis/processor.go +++ b/internal/impl/redis/processor.go @@ -308,7 +308,8 @@ func getRedisOperator(opStr string) (redisOperator, error) { } func (r *redisProc) execRaw(ctx context.Context, index int, inBatch service.MessageBatch, msg *service.Message) error { - resMsg, err := inBatch.BloblangQuery(index, r.argsMapping) + executor := inBatch.BloblangExecutor(r.argsMapping) + resMsg, err := executor.Query(index) if err != nil { return fmt.Errorf("args mapping failed: %v", err) } diff --git a/internal/impl/redis/script_processor.go b/internal/impl/redis/script_processor.go index 65f3881bc..27025a763 100644 --- a/internal/impl/redis/script_processor.go +++ b/internal/impl/redis/script_processor.go @@ -186,7 +186,8 @@ func (r *redisScriptProc) Close(ctx context.Context) error { } func getArgsMapping(inBatch service.MessageBatch, index int, mapping *bloblang.Executor) ([]any, error) { - resMsg, err := inBatch.BloblangQuery(index, mapping) + executor := inBatch.BloblangExecutor(mapping) + resMsg, err := executor.Query(index) if err != nil { return nil, fmt.Errorf("mapping failed: %v", err) } @@ -208,7 +209,8 @@ func getArgsMapping(inBatch service.MessageBatch, index int, mapping *bloblang.E } func getKeysStrMapping(inBatch service.MessageBatch, index int, mapping *bloblang.Executor) ([]string, error) { - resMsg, err := inBatch.BloblangQuery(index, mapping) + executor := inBatch.BloblangExecutor(mapping) + resMsg, err := executor.Query(index) if err != nil { return nil, fmt.Errorf("mapping failed: %v", err) } diff --git a/internal/impl/sql/output_sql_insert.go b/internal/impl/sql/output_sql_insert.go index 1ca40d5ae..193185b8f 100644 --- a/internal/impl/sql/output_sql_insert.go +++ b/internal/impl/sql/output_sql_insert.go @@ -218,6 +218,12 @@ func (s *sqlInsertOutput) WriteBatch(ctx context.Context, batch service.MessageB var tx *sql.Tx var stmt *sql.Stmt + var executor *service.MessageBatchBloblangExecutor + + if s.argsMapping != nil { + executor = batch.BloblangExecutor(s.argsMapping) + } + if s.useTxStmt { var err error if tx, err = s.db.Begin(); err != nil { @@ -236,7 +242,7 @@ func (s *sqlInsertOutput) WriteBatch(ctx context.Context, batch service.MessageB for i := range batch { var args []any if s.argsMapping != nil { - resMsg, err := batch.BloblangQuery(i, s.argsMapping) + resMsg, err := executor.Query(i) if err != nil { return err } diff --git a/internal/impl/sql/output_sql_raw.go b/internal/impl/sql/output_sql_raw.go index 6179161c4..fe2dfeb3f 100644 --- a/internal/impl/sql/output_sql_raw.go +++ b/internal/impl/sql/output_sql_raw.go @@ -188,10 +188,15 @@ func (s *sqlRawOutput) WriteBatch(ctx context.Context, batch service.MessageBatc s.dbMut.RLock() defer s.dbMut.RUnlock() + var executor *service.MessageBatchBloblangExecutor + if s.argsMapping != nil { + executor = batch.BloblangExecutor(s.argsMapping) + } + for i := range batch { var args []any if s.argsMapping != nil { - resMsg, err := batch.BloblangQuery(i, s.argsMapping) + resMsg, err := executor.Query(i) if err != nil { return err } diff --git a/internal/impl/sql/processor_sql_insert.go b/internal/impl/sql/processor_sql_insert.go index 54fbd5046..29428457d 100644 --- a/internal/impl/sql/processor_sql_insert.go +++ b/internal/impl/sql/processor_sql_insert.go @@ -197,6 +197,11 @@ func (s *sqlInsertProcessor) ProcessBatch(ctx context.Context, batch service.Mes var tx *sql.Tx var stmt *sql.Stmt + var executor *service.MessageBatchBloblangExecutor + if s.argsMapping != nil { + executor = batch.BloblangExecutor(s.argsMapping) + } + if s.useTxStmt { var err error if tx, err = s.db.Begin(); err != nil { @@ -215,7 +220,7 @@ func (s *sqlInsertProcessor) ProcessBatch(ctx context.Context, batch service.Mes for i, msg := range batch { var args []any if s.argsMapping != nil { - resMsg, err := batch.BloblangQuery(i, s.argsMapping) + resMsg, err := executor.Query(i) if err != nil { s.logger.Debugf("Arguments mapping failed: %v", err) msg.SetError(err) diff --git a/internal/impl/sql/processor_sql_raw.go b/internal/impl/sql/processor_sql_raw.go index 776b43fd3..1190ed66d 100644 --- a/internal/impl/sql/processor_sql_raw.go +++ b/internal/impl/sql/processor_sql_raw.go @@ -188,11 +188,16 @@ func (s *sqlRawProcessor) ProcessBatch(ctx context.Context, batch service.Messag s.dbMut.RLock() defer s.dbMut.RUnlock() + var executor *service.MessageBatchBloblangExecutor + if s.argsMapping != nil { + executor = batch.BloblangExecutor(s.argsMapping) + } + batch = batch.Copy() for i, msg := range batch { var args []any if s.argsMapping != nil { - resMsg, err := batch.BloblangQuery(i, s.argsMapping) + resMsg, err := executor.Query(i) if err != nil { s.logger.Debugf("Arguments mapping failed: %v", err) msg.SetError(err) diff --git a/internal/impl/sql/processor_sql_select.go b/internal/impl/sql/processor_sql_select.go index 827209bd0..71eeb33ae 100644 --- a/internal/impl/sql/processor_sql_select.go +++ b/internal/impl/sql/processor_sql_select.go @@ -192,11 +192,16 @@ func (s *sqlSelectProcessor) ProcessBatch(ctx context.Context, batch service.Mes s.dbMut.RLock() defer s.dbMut.RUnlock() + var executor *service.MessageBatchBloblangExecutor + if s.argsMapping != nil { + executor = batch.BloblangExecutor(s.argsMapping) + } + batch = batch.Copy() for i, msg := range batch { var args []any if s.argsMapping != nil { - resMsg, err := batch.BloblangQuery(i, s.argsMapping) + resMsg, err := executor.Query(i) if err != nil { s.logger.Debugf("Arguments mapping failed: %v", err) msg.SetError(err) diff --git a/public/bloblang/view.go b/public/bloblang/view.go index d831074f8..0289c4d40 100644 --- a/public/bloblang/view.go +++ b/public/bloblang/view.go @@ -3,6 +3,8 @@ package bloblang import ( "encoding/json" + "github.com/Jeffail/gabs/v2" + "github.com/warpstreamlabs/bento/internal/bloblang/query" ) @@ -27,6 +29,20 @@ func (v *FunctionView) FormatJSON() ([]byte, error) { return json.Marshal(v.spec) } +// TemplateData returns an exported struct containing information ready to +// inject into a template for generating documentation. +func (v *FunctionView) TemplateData() TemplateFunctionData { + return TemplateFunctionData{ + Status: string(v.spec.Status), + Name: v.spec.Name, + Category: v.spec.Category, + Description: v.spec.Description, + Params: templateParams(v.spec.Params), + Examples: templateExamples(v.spec.Examples), + Version: v.spec.Version, + } +} + // MethodView describes a particular method belonging to a Bloblang environment. type MethodView struct { spec query.MethodSpec @@ -46,3 +62,151 @@ func (v *MethodView) Description() string { func (v *MethodView) FormatJSON() ([]byte, error) { return json.Marshal(v.spec) } + +// TemplateData returns an exported struct containing information ready to +// inject into a template for generating documentation. +func (v *MethodView) TemplateData() TemplateMethodData { + return TemplateMethodData{ + Status: string(v.spec.Status), + Name: v.spec.Name, + Description: v.spec.Description, + Params: templateParams(v.spec.Params), + Examples: templateExamples(v.spec.Examples), + Categories: templateCategories(v.spec.Categories), + Version: v.spec.Version, + } +} + +//------------------------------------------------------------------------------ + +func templateParams(p query.Params) TemplateParamsData { + var tDefs []TemplateParamData + for _, d := range p.Definitions { + var jDefault string + if d.DefaultValue != nil { + jDefault = gabs.Wrap(*d.DefaultValue).String() + } + tDefs = append(tDefs, TemplateParamData{ + Name: d.Name, + Description: d.Description, + ValueType: string(d.ValueType), + IsOptional: d.IsOptional, + DefaultMarshalled: jDefault, + }) + } + return TemplateParamsData{ + Variadic: p.Variadic, + Definitions: tDefs, + } +} + +// TemplateParamData describes a single parameter defined for a bloblang plugin. +type TemplateParamData struct { + Name string + Description string + ValueType string + + // IsOptional is implicit when there's a DefaultMarshalled. However, there + // are times when a parameter is used to change behaviour without having a + // default. + IsOptional bool + DefaultMarshalled string +} + +// TemplateParamsData describes the overall set of parameters for a bloblang +// plugin. +type TemplateParamsData struct { + Variadic bool + Definitions []TemplateParamData +} + +func templateExamples(esa []query.ExampleSpec) (eda []TemplateExampleData) { + for _, es := range esa { + eda = append(eda, TemplateExampleData{ + Mapping: es.Mapping, + Summary: es.Summary, + Results: es.Results, + SkipTesting: es.SkipTesting, + }) + } + return +} + +// TemplateExampleData describes a single example for a given bloblang plugin. +type TemplateExampleData struct { + Mapping string + Summary string + Results [][2]string + + // True if this example will not function as shown when tested + SkipTesting bool +} + +func templateCategories(csa []query.MethodCatSpec) (cda []TemplateMethodCategoryData) { + for _, es := range csa { + cda = append(cda, TemplateMethodCategoryData{ + Category: es.Category, + Description: es.Description, + Examples: templateExamples(es.Examples), + }) + } + return +} + +// TemplateMethodCategoryData describes a behaviour, along with examples, of a +// method plugin for a given category. Separating documentation for a method +// into categories is sometimes appropriate in cases where the method behaves +// differently based on the target value. +type TemplateMethodCategoryData struct { + Category string + Description string + Examples []TemplateExampleData +} + +// TemplateMethodData describes a bloblang method. +type TemplateMethodData struct { + // The release status of the function. + Status string + + // Name of the method (as it appears in config). + Name string + + // Description of the method purpose (in markdown). + Description string + + // Params defines the expected arguments of the method. + Params TemplateParamsData + + // Examples shows general usage for the method. + Examples []TemplateExampleData + + // Categories further describe the method for a given category. + Categories []TemplateMethodCategoryData + + // Version is the Bento version this component was introduced. + Version string +} + +// TemplateFunctionData describes a bloblang function. +type TemplateFunctionData struct { + // The release status of the function. + Status string + + // Name of the function (as it appears in config). + Name string + + // Category is a rough category for the function. + Category string + + // Description of the functions purpose (in markdown). + Description string + + // Params defines the expected arguments of the function. + Params TemplateParamsData + + // Examples shows general usage for the function. + Examples []TemplateExampleData + + // Version is the Bento version this component was introduced. + Version string +} diff --git a/public/service/message.go b/public/service/message.go index 927e059de..bcf3afd83 100644 --- a/public/service/message.go +++ b/public/service/message.go @@ -465,6 +465,8 @@ func (m *Message) BloblangMutateFrom(blobl *bloblang.Executor, from *Message) (* // // This method allows mappings to perform windowed aggregations across message // batches. +// +// Deprecated: Use the much more efficient BloblangExecutor method instead. func (b MessageBatch) BloblangQuery(index int, blobl *bloblang.Executor) (*Message, error) { uw := blobl.XUnwrapper().(interface { Unwrap() *mapping.Executor @@ -493,6 +495,8 @@ func (b MessageBatch) BloblangQuery(index int, blobl *bloblang.Executor) (*Messa // // This method allows mappings to perform windowed aggregations across message // batches. +// +// Deprecated: Use the much more efficient BloblangExecutor method instead. func (b MessageBatch) BloblangQueryValue(index int, blobl *bloblang.Executor) (any, error) { uw := blobl.XUnwrapper().(interface { Unwrap() *mapping.Executor @@ -536,6 +540,8 @@ func (b MessageBatch) BloblangQueryValue(index int, blobl *bloblang.Executor) (a // Note that using overlay means certain functions within the Bloblang mapping // will behave differently. In the root of the mapping the right-hand keywords // `root` and `this` refer to the same mutable root of the output document. +// +// Deprecated: Use the much more efficient BloblangExecutor method instead. func (b MessageBatch) BloblangMutate(index int, blobl *bloblang.Executor) (*Message, error) { uw := blobl.XUnwrapper().(interface { Unwrap() *mapping.Executor @@ -562,6 +568,10 @@ func (b MessageBatch) BloblangMutate(index int, blobl *bloblang.Executor) (*Mess // This method allows interpolation functions to perform windowed aggregations // across message batches, and is a more powerful way to interpolate strings // than the standard .String method. +// +// Note: For performance reasons, if this method is being executed for each +// member of a batch individually, you should instead use an +// InterpolationExecutor. func (b MessageBatch) TryInterpolatedString(index int, i *InterpolatedString) (string, error) { msg := make(message.Batch, len(b)) for i, m := range b { @@ -576,6 +586,10 @@ func (b MessageBatch) TryInterpolatedString(index int, i *InterpolatedString) (s // This method allows interpolation functions to perform windowed aggregations // across message batches, and is a more powerful way to interpolate strings // than the standard .String method. +// +// Note: For performance reasons, if this method is being executed for each +// member of a batch individually, you should instead use an +// InterpolationExecutor. func (b MessageBatch) TryInterpolatedBytes(index int, i *InterpolatedString) ([]byte, error) { msg := make(message.Batch, len(b)) for i, m := range b { diff --git a/public/service/message_batch_blobl.go b/public/service/message_batch_blobl.go new file mode 100644 index 000000000..9b030141f --- /dev/null +++ b/public/service/message_batch_blobl.go @@ -0,0 +1,157 @@ +package service + +import ( + "github.com/warpstreamlabs/bento/internal/bloblang/field" + "github.com/warpstreamlabs/bento/internal/bloblang/mapping" + "github.com/warpstreamlabs/bento/internal/bloblang/query" + "github.com/warpstreamlabs/bento/internal/message" + "github.com/warpstreamlabs/bento/internal/value" + "github.com/warpstreamlabs/bento/public/bloblang" +) + +// MessageBatchBloblangExecutor is a mechanism for executing a given bloblang +// executor against a message batch, with each invocation from the perspective +// of a given index of the batch. This allows mappings to perform windowed +// aggregations across message batches. +type MessageBatchBloblangExecutor struct { + oldBatch message.Batch + exe *mapping.Executor +} + +// BloblangExecutor instantiates a mechanism for executing a given bloblang +// executor against a message batch, with each invocation from the perspective +// of a given index of the batch. This allows mappings to perform windowed +// aggregations across message batches. +func (b MessageBatch) BloblangExecutor(blobl *bloblang.Executor) *MessageBatchBloblangExecutor { + uw := blobl.XUnwrapper().(interface { + Unwrap() *mapping.Executor + }).Unwrap() + + msg := make(message.Batch, len(b)) + for i, m := range b { + msg[i] = m.part + } + + return &MessageBatchBloblangExecutor{ + oldBatch: msg, + exe: uw, + } +} + +// Query executes a parsed Bloblang mapping on a message batch, from the +// perspective of a particular message index, and returns a message back or an +// error if the mapping fails. If the mapping results in the root being deleted +// the returned message will be nil, which indicates it has been filtered. +// +// This method allows mappings to perform windowed aggregations across message +// batches. +func (b MessageBatchBloblangExecutor) Query(index int) (*Message, error) { + res, err := b.exe.MapPart(index, b.oldBatch) + if err != nil { + return nil, err + } + if res != nil { + return NewInternalMessage(res), nil + } + return nil, nil +} + +// QueryValue executes a parsed Bloblang mapping on a message batch, +// from the perspective of a particular message index, and returns the raw value +// result or an error if the mapping fails. The error bloblang.ErrRootDeleted is +// returned if the root of the mapping value is deleted, this is in order to +// allow distinction between a real nil value and a deleted value. +// +// This method allows mappings to perform windowed aggregations across message +// batches. +func (b MessageBatchBloblangExecutor) QueryValue(index int) (any, error) { + res, err := b.exe.Exec(query.FunctionContext{ + Maps: b.exe.Maps(), + Vars: map[string]any{}, + Index: index, + MsgBatch: b.oldBatch, + }) + if err != nil { + return nil, err + } + + switch res.(type) { + case value.Delete: + return nil, bloblang.ErrRootDeleted + case value.Nothing: + return nil, nil + } + return res, nil +} + +// Mutate executes a parsed Bloblang mapping onto a message within the +// batch, where the contents of the message are mutated directly rather than +// creating an entirely new object. +// +// Returns the same message back in a mutated form, or an error if the mapping +// fails. If the mapping results in the root being deleted the returned message +// will be nil, which indicates it has been filtered. +// +// This method allows mappings to perform windowed aggregations across message +// batches. +// +// Note that using overlay means certain functions within the Bloblang mapping +// will behave differently. In the root of the mapping the right-hand keywords +// `root` and `this` refer to the same mutable root of the output document. +func (b MessageBatchBloblangExecutor) Mutate(index int) (*Message, error) { + res, err := b.exe.MapOnto(b.oldBatch[index], index, b.oldBatch) + if err != nil { + return nil, err + } + if res != nil { + return NewInternalMessage(res), nil + } + return nil, nil +} + +//------------------------------------------------------------------------------ + +// MessageBatchInterpolationExecutor is a mechanism for executing a given +// bloblang interpolation string against a message batch, with each invocation +// from the perspective of a given index of the batch. This allows +// interpolations to perform windowed aggregations across message batches. +type MessageBatchInterpolationExecutor struct { + oldBatch message.Batch + i *field.Expression +} + +// InterpolationExecutor instantiates a mechanism for executing a given bloblang +// interpolation string against a message batch, with each invocation from the +// perspective of a given index of the batch. This allows interpolations to +// perform windowed aggregations across message batches. +func (b MessageBatch) InterpolationExecutor(i *InterpolatedString) *MessageBatchInterpolationExecutor { + msg := make(message.Batch, len(b)) + for i, m := range b { + msg[i] = m.part + } + + return &MessageBatchInterpolationExecutor{ + oldBatch: msg, + i: i.expr, + } +} + +// TryString resolves an interpolated string expression on a message batch, from +// the perspective of a particular message index. +// +// This method allows interpolation functions to perform windowed aggregations +// across message batches, and is a more powerful way to interpolate strings +// than the standard .String method. +func (b MessageBatchInterpolationExecutor) TryString(index int) (string, error) { + return b.i.String(index, b.oldBatch) +} + +// TryBytes resolves an interpolated string expression on a message batch, from +// the perspective of a particular message index. +// +// This method allows interpolation functions to perform windowed aggregations +// across message batches, and is a more powerful way to interpolate strings +// than the standard .String method. +func (b MessageBatchInterpolationExecutor) TryBytes(index int) ([]byte, error) { + return b.i.Bytes(index, b.oldBatch) +} diff --git a/public/service/message_batch_blobl_test.go b/public/service/message_batch_blobl_test.go new file mode 100644 index 000000000..0afe329bd --- /dev/null +++ b/public/service/message_batch_blobl_test.go @@ -0,0 +1,158 @@ +package service + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/warpstreamlabs/bento/public/bloblang" +) + +func TestMessageBatchExecutorMapping(t *testing.T) { + partOne := NewMessage(nil) + partOne.SetStructured(map[string]any{ + "content": "hello world 1", + }) + + partTwo := NewMessage(nil) + partTwo.SetStructured(map[string]any{ + "content": "hello world 2", + }) + + blobl, err := bloblang.Parse(`root.new_content = json("content").from_all().join(" - ")`) + require.NoError(t, err) + + res, err := MessageBatch{partOne, partTwo}.BloblangExecutor(blobl).Query(0) + require.NoError(t, err) + + resI, err := res.AsStructured() + require.NoError(t, err) + assert.Equal(t, map[string]any{ + "new_content": "hello world 1 - hello world 2", + }, resI) +} + +func TestMessageBatchExecutorQueryValue(t *testing.T) { + partOne := NewMessage(nil) + partOne.SetStructured(map[string]any{ + "content": "hello world 1", + }) + + partTwo := NewMessage(nil) + partTwo.SetStructured(map[string]any{ + "content": "hello world 2", + }) + + tests := map[string]struct { + mapping string + batchIndex int + exp any + err string + }{ + "returns string": { + mapping: `root = json("content")`, + exp: "hello world 1", + }, + "returns integer": { + mapping: `root = json("content").length()`, + exp: int64(13), + }, + "returns float": { + mapping: `root = json("content").length() / 2`, + exp: float64(6.5), + }, + "returns bool": { + mapping: `root = json("content").length() > 0`, + exp: true, + }, + "returns bytes": { + mapping: `root = content()`, + exp: []byte(`{"content":"hello world 1"}`), + }, + "returns nil": { + mapping: `root = null`, + exp: nil, + }, + "returns null string": { + mapping: `root = "null"`, + exp: "null", + }, + "returns an array": { + mapping: `root = [ json("content") ]`, + exp: []any{"hello world 1"}, + }, + "returns an object": { + mapping: `root.new_content = json("content")`, + exp: map[string]any{"new_content": "hello world 1"}, + }, + "supports batch-wide queries": { + mapping: `root.new_content = json("content").from_all().join(" - ")`, + exp: map[string]any{"new_content": "hello world 1 - hello world 2"}, + }, + "handles the specified message index correctly": { + mapping: `root = json("content")`, + batchIndex: 1, + exp: "hello world 2", + }, + "returns an error if the mapping throws": { + mapping: `root = throw("kaboom")`, + exp: nil, + err: "failed assignment (line 1): kaboom", + }, + "returns an error if the root is deleted": { + mapping: `root = deleted()`, + exp: nil, + err: "root was deleted", + }, + "doesn't error out if a field is deleted": { + mapping: `root.foo = deleted()`, + exp: map[string]any{}, + err: "", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + blobl, err := bloblang.Parse(test.mapping) + require.NoError(t, err) + + res, err := MessageBatch{partOne, partTwo}.BloblangExecutor(blobl).QueryValue(test.batchIndex) + if test.err != "" { + require.ErrorContains(t, err, test.err) + } else { + require.NoError(t, err) + } + + assert.Equal(t, test.exp, res) + }) + } +} + +func TestInterpolationExecutor(t *testing.T) { + batch := MessageBatch{ + NewMessage([]byte("foo")), + NewMessage([]byte("bar")), + } + + interp, err := NewInterpolatedString("${! content().uppercase().from(0) + content() }") + require.NoError(t, err) + + exec := batch.InterpolationExecutor(interp) + + s, err := exec.TryString(0) + require.NoError(t, err) + assert.Equal(t, "FOOfoo", s) + + b, err := exec.TryBytes(0) + require.NoError(t, err) + assert.Equal(t, "FOOfoo", string(b)) + + s, err = exec.TryString(1) + require.NoError(t, err) + assert.Equal(t, "FOObar", s) + + b, err = exec.TryBytes(1) + require.NoError(t, err) + assert.Equal(t, "FOObar", string(b)) +}