Skip to content

Commit

Permalink
Merge pull request #56 from warpstreamlabs/jem-upstream-changes
Browse files Browse the repository at this point in the history
upstream changes
  • Loading branch information
jem-davies committed Jun 27, 2024
2 parents 1d22b5d + d19cd8b commit d081d03
Show file tree
Hide file tree
Showing 22 changed files with 901 additions and 19 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.old.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.30.0 - 2024-06-13

### Added

- Go API: New APIs for capturing synchronous responses from downstream components. (@Jeffail)
- Go API: Ability to customise the overall configuration schema of a stream builder. (@Jeffail)
- New `sin`, `cos`, `tan` and `pi` bloblang methods. (@mfamador)
- Field `proxy_url` added to the `websocket` input and output. (@mihaitodor)

### Fixed

- The `websocket` input and output now obey the `HTTP_PROXY`, `HTTPS_PROXY` and `NO_PROXY` environment variables. (@mihaitodor)

## 4.29.0 - 2024-06-10

### Added
Expand Down
4 changes: 3 additions & 1 deletion internal/impl/aws/processor_dynamodb_partiql.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func newDynamoDBPartiQL(

func (d *dynamoDBPartiQL) ProcessBatch(ctx context.Context, batch service.MessageBatch) ([]service.MessageBatch, error) {
stmts := []types.BatchStatementRequest{}
executor := batch.BloblangExecutor(d.args)

for i := range batch {
req := types.BatchStatementRequest{}
req.Statement = &d.query
Expand All @@ -116,7 +118,7 @@ func (d *dynamoDBPartiQL) ProcessBatch(ctx context.Context, batch service.Messag
req.Statement = &query
}

argMsg, err := batch.BloblangQuery(i, d.args)
argMsg, err := executor.Query(i)
if err != nil {
return nil, fmt.Errorf("error evaluating arg mapping at index %d: %v", i, err)
}
Expand Down
6 changes: 4 additions & 2 deletions internal/impl/azure/cosmosdb/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func ExecMessageBatch(ctx context.Context, batch service.MessageBatch, client *a
fmt.Errorf("current batch has %d messages, but the CosmosDB transactional batch limit is %d", len(batch), maxTransactionalBatchSize)
}

pkQueryResult, err := batch.BloblangQueryValue(0, config.PartitionKeys)
executor := batch.BloblangExecutor(config.PartitionKeys)
pkQueryResult, err := executor.QueryValue(0)
if err != nil {
return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to evaluate partition key values: %s", err)
}
Expand Down Expand Up @@ -114,7 +115,8 @@ func ExecMessageBatch(ctx context.Context, batch service.MessageBatch, client *a

var value any
if po.Value != nil {
if value, err = batch.BloblangQueryValue(idx, po.Value); err != nil {
executor := batch.BloblangExecutor(po.Value)
if value, err = executor.QueryValue(idx); err != nil {
return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to evaluate patch value: %s", err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion internal/impl/cassandra/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ func (c *cassandraWriter) writeBatch(session *gocql.Session, b service.MessageBa
func (c *cassandraWriter) mapArgs(b service.MessageBatch, index int) ([]any, error) {
if c.argsMapping != nil {
// We've got an "args_mapping" field, extract values from there.
part, err := b.BloblangQuery(index, c.argsMapping)
executor := b.BloblangExecutor(c.argsMapping)
part, err := executor.Query(index)
if err != nil {
return nil, fmt.Errorf("executing bloblang mapping: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion internal/impl/couchbase/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func NewProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*Processo
func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBatch) ([]service.MessageBatch, error) {
newMsg := inBatch.Copy()
ops := make([]gocb.BulkOp, len(inBatch))
var executor *service.MessageBatchBloblangExecutor
if p.content != nil {
executor = inBatch.BloblangExecutor(p.content)
}

// generate query
for index := range newMsg {
Expand All @@ -129,7 +133,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat
// generate content
var content []byte
if p.content != nil {
res, err := inBatch.BloblangQuery(index, p.content)
res, err := executor.Query(index)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion internal/impl/gcp/processor_bigquery_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,17 @@ func (proc *bigQuerySelectProcessor) ProcessBatch(ctx context.Context, batch ser

outBatch := make(service.MessageBatch, 0, len(batch))

var executor *service.MessageBatchBloblangExecutor
if argsMapping != nil {
executor = batch.BloblangExecutor(argsMapping)
}

for i, msg := range batch {
outBatch = append(outBatch, msg)

var args []any
if argsMapping != nil {
resMsg, err := batch.BloblangQuery(i, argsMapping)
resMsg, err := executor.Query(i)
if err != nil {
msg.SetError(fmt.Errorf("failed to resolve args mapping: %w", err))
continue
Expand Down
165 changes: 165 additions & 0 deletions internal/impl/io/bloblang_examples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package io_test

import (
"fmt"
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace/noop"

"github.com/warpstreamlabs/bento/public/bloblang"
"github.com/warpstreamlabs/bento/public/service"

_ "github.com/warpstreamlabs/bento/internal/impl/io"
)

func TestFunctionExamples(t *testing.T) {
tmpJSONFile, err := os.CreateTemp("", "bento_bloblang_functions_test")
require.NoError(t, err)
t.Cleanup(func() {
os.Remove(tmpJSONFile.Name())
})

_, err = tmpJSONFile.WriteString(`{"foo":"bar"}`)
require.NoError(t, err)

key := "BENTO_TEST_BLOBLANG_FILE"
t.Setenv(key, tmpJSONFile.Name())

env := bloblang.GlobalEnvironment()
env.WalkFunctions(func(name string, view *bloblang.FunctionView) {
t.Run(name, func(t *testing.T) {
t.Parallel()

spec := view.TemplateData()
for i, e := range spec.Examples {
if e.SkipTesting {
continue
}

m, err := env.Parse(e.Mapping)
require.NoError(t, err)

for j, io := range e.Results {
msg := service.NewMessage([]byte(io[0]))
textMap := propagation.MapCarrier{
"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
}
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}))

textProp := otel.GetTextMapPropagator()
otelCtx := textProp.Extract(msg.Context(), textMap)
pCtx, _ := noop.NewTracerProvider().Tracer("blobby").Start(otelCtx, "test")
msg = msg.WithContext(pCtx)

p, err := msg.BloblangQuery(m)
exp := io[1]
if strings.HasPrefix(exp, "Error(") {
exp = exp[7 : len(exp)-2]
require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j))
} else {
require.NoError(t, err)

pBytes, err := p.AsBytes()
require.NoError(t, err)

assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j))
}
}
}
})
})
}

func TestMethodExamples(t *testing.T) {
tmpJSONFile, err := os.CreateTemp("", "bento_bloblang_methods_test")
require.NoError(t, err)
t.Cleanup(func() {
os.Remove(tmpJSONFile.Name())
})

_, err = tmpJSONFile.WriteString(`
{
"type":"object",
"properties":{
"foo":{
"type":"string"
}
}
}`)
require.NoError(t, err)

key := "BENTO_TEST_BLOBLANG_SCHEMA_FILE"
t.Setenv(key, tmpJSONFile.Name())

env := bloblang.GlobalEnvironment()
env.WalkMethods(func(name string, view *bloblang.MethodView) {
spec := view.TemplateData()
t.Run(spec.Name, func(t *testing.T) {
t.Parallel()
for i, e := range spec.Examples {
if e.SkipTesting {
continue
}

m, err := env.Parse(e.Mapping)
require.NoError(t, err)

for j, io := range e.Results {
msg := service.NewMessage([]byte(io[0]))
p, err := msg.BloblangQuery(m)
exp := io[1]
if strings.HasPrefix(exp, "Error(") {
exp = exp[7 : len(exp)-2]
require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j))
} else if exp == "<Message deleted>" {
require.NoError(t, err)
require.Nil(t, p)
} else {
require.NoError(t, err)

pBytes, err := p.AsBytes()
require.NoError(t, err)

assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j))
}
}
}
for _, target := range spec.Categories {
for i, e := range target.Examples {
if e.SkipTesting {
continue
}

m, err := env.Parse(e.Mapping)
require.NoError(t, err)

for j, io := range e.Results {
msg := service.NewMessage([]byte(io[0]))
p, err := msg.BloblangQuery(m)
exp := io[1]
if strings.HasPrefix(exp, "Error(") {
exp = exp[7 : len(exp)-2]
require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j))
} else if exp == "<Message deleted>" {
require.NoError(t, err)
require.Nil(t, p)
} else {
require.NoError(t, err)

pBytes, err := p.AsBytes()
require.NoError(t, err)

assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j))
}
}
}
}
})
})
}
3 changes: 2 additions & 1 deletion internal/impl/mongodb/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ func writeMapsFromParsed(conf *service.ParsedConfig, operation Operation) (maps
}

func extJSONFromMap(b service.MessageBatch, i int, m *bloblang.Executor) (any, error) {
msg, err := b.BloblangQuery(i, m)
executor := b.BloblangExecutor(m)
msg, err := executor.Query(i)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit d081d03

Please sign in to comment.