Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upstream changes #56

Merged
merged 6 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.old.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.30.0 - 2024-06-13

### Added

- Go API: New APIs for capturing synchronous responses from downstream components. (@Jeffail)
- Go API: Ability to customise the overall configuration schema of a stream builder. (@Jeffail)
- New `sin`, `cos`, `tan` and `pi` bloblang methods. (@mfamador)
- Field `proxy_url` added to the `websocket` input and output. (@mihaitodor)

### Fixed

- The `websocket` input and output now obey the `HTTP_PROXY`, `HTTPS_PROXY` and `NO_PROXY` environment variables. (@mihaitodor)

## 4.29.0 - 2024-06-10

### Added
Expand Down
4 changes: 3 additions & 1 deletion internal/impl/aws/processor_dynamodb_partiql.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func newDynamoDBPartiQL(

func (d *dynamoDBPartiQL) ProcessBatch(ctx context.Context, batch service.MessageBatch) ([]service.MessageBatch, error) {
stmts := []types.BatchStatementRequest{}
executor := batch.BloblangExecutor(d.args)

for i := range batch {
req := types.BatchStatementRequest{}
req.Statement = &d.query
Expand All @@ -116,7 +118,7 @@ func (d *dynamoDBPartiQL) ProcessBatch(ctx context.Context, batch service.Messag
req.Statement = &query
}

argMsg, err := batch.BloblangQuery(i, d.args)
argMsg, err := executor.Query(i)
if err != nil {
return nil, fmt.Errorf("error evaluating arg mapping at index %d: %v", i, err)
}
Expand Down
6 changes: 4 additions & 2 deletions internal/impl/azure/cosmosdb/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func ExecMessageBatch(ctx context.Context, batch service.MessageBatch, client *a
fmt.Errorf("current batch has %d messages, but the CosmosDB transactional batch limit is %d", len(batch), maxTransactionalBatchSize)
}

pkQueryResult, err := batch.BloblangQueryValue(0, config.PartitionKeys)
executor := batch.BloblangExecutor(config.PartitionKeys)
pkQueryResult, err := executor.QueryValue(0)
if err != nil {
return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to evaluate partition key values: %s", err)
}
Expand Down Expand Up @@ -114,7 +115,8 @@ func ExecMessageBatch(ctx context.Context, batch service.MessageBatch, client *a

var value any
if po.Value != nil {
if value, err = batch.BloblangQueryValue(idx, po.Value); err != nil {
executor := batch.BloblangExecutor(po.Value)
if value, err = executor.QueryValue(idx); err != nil {
return azcosmos.TransactionalBatchResponse{}, fmt.Errorf("failed to evaluate patch value: %s", err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion internal/impl/cassandra/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ func (c *cassandraWriter) writeBatch(session *gocql.Session, b service.MessageBa
func (c *cassandraWriter) mapArgs(b service.MessageBatch, index int) ([]any, error) {
if c.argsMapping != nil {
// We've got an "args_mapping" field, extract values from there.
part, err := b.BloblangQuery(index, c.argsMapping)
executor := b.BloblangExecutor(c.argsMapping)
part, err := executor.Query(index)
if err != nil {
return nil, fmt.Errorf("executing bloblang mapping: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion internal/impl/couchbase/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func NewProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*Processo
func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBatch) ([]service.MessageBatch, error) {
newMsg := inBatch.Copy()
ops := make([]gocb.BulkOp, len(inBatch))
var executor *service.MessageBatchBloblangExecutor
if p.content != nil {
executor = inBatch.BloblangExecutor(p.content)
}

// generate query
for index := range newMsg {
Expand All @@ -129,7 +133,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat
// generate content
var content []byte
if p.content != nil {
res, err := inBatch.BloblangQuery(index, p.content)
res, err := executor.Query(index)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion internal/impl/gcp/processor_bigquery_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,17 @@ func (proc *bigQuerySelectProcessor) ProcessBatch(ctx context.Context, batch ser

outBatch := make(service.MessageBatch, 0, len(batch))

var executor *service.MessageBatchBloblangExecutor
if argsMapping != nil {
executor = batch.BloblangExecutor(argsMapping)
}

for i, msg := range batch {
outBatch = append(outBatch, msg)

var args []any
if argsMapping != nil {
resMsg, err := batch.BloblangQuery(i, argsMapping)
resMsg, err := executor.Query(i)
if err != nil {
msg.SetError(fmt.Errorf("failed to resolve args mapping: %w", err))
continue
Expand Down
165 changes: 165 additions & 0 deletions internal/impl/io/bloblang_examples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package io_test

import (
"fmt"
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace/noop"

"github.com/warpstreamlabs/bento/public/bloblang"
"github.com/warpstreamlabs/bento/public/service"

_ "github.com/warpstreamlabs/bento/internal/impl/io"
)

func TestFunctionExamples(t *testing.T) {
tmpJSONFile, err := os.CreateTemp("", "bento_bloblang_functions_test")
require.NoError(t, err)
t.Cleanup(func() {
os.Remove(tmpJSONFile.Name())
})

_, err = tmpJSONFile.WriteString(`{"foo":"bar"}`)
require.NoError(t, err)

key := "BENTO_TEST_BLOBLANG_FILE"
t.Setenv(key, tmpJSONFile.Name())

env := bloblang.GlobalEnvironment()
env.WalkFunctions(func(name string, view *bloblang.FunctionView) {
t.Run(name, func(t *testing.T) {
t.Parallel()

spec := view.TemplateData()
for i, e := range spec.Examples {
if e.SkipTesting {
continue
}

m, err := env.Parse(e.Mapping)
require.NoError(t, err)

for j, io := range e.Results {
msg := service.NewMessage([]byte(io[0]))
textMap := propagation.MapCarrier{
"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
}
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}))

textProp := otel.GetTextMapPropagator()
otelCtx := textProp.Extract(msg.Context(), textMap)
pCtx, _ := noop.NewTracerProvider().Tracer("blobby").Start(otelCtx, "test")
msg = msg.WithContext(pCtx)

p, err := msg.BloblangQuery(m)
exp := io[1]
if strings.HasPrefix(exp, "Error(") {
exp = exp[7 : len(exp)-2]
require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j))
} else {
require.NoError(t, err)

pBytes, err := p.AsBytes()
require.NoError(t, err)

assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j))
}
}
}
})
})
}

func TestMethodExamples(t *testing.T) {
tmpJSONFile, err := os.CreateTemp("", "bento_bloblang_methods_test")
require.NoError(t, err)
t.Cleanup(func() {
os.Remove(tmpJSONFile.Name())
})

_, err = tmpJSONFile.WriteString(`
{
"type":"object",
"properties":{
"foo":{
"type":"string"
}
}
}`)
require.NoError(t, err)

key := "BENTO_TEST_BLOBLANG_SCHEMA_FILE"
t.Setenv(key, tmpJSONFile.Name())

env := bloblang.GlobalEnvironment()
env.WalkMethods(func(name string, view *bloblang.MethodView) {
spec := view.TemplateData()
t.Run(spec.Name, func(t *testing.T) {
t.Parallel()
for i, e := range spec.Examples {
if e.SkipTesting {
continue
}

m, err := env.Parse(e.Mapping)
require.NoError(t, err)

for j, io := range e.Results {
msg := service.NewMessage([]byte(io[0]))
p, err := msg.BloblangQuery(m)
exp := io[1]
if strings.HasPrefix(exp, "Error(") {
exp = exp[7 : len(exp)-2]
require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j))
} else if exp == "<Message deleted>" {
require.NoError(t, err)
require.Nil(t, p)
} else {
require.NoError(t, err)

pBytes, err := p.AsBytes()
require.NoError(t, err)

assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j))
}
}
}
for _, target := range spec.Categories {
for i, e := range target.Examples {
if e.SkipTesting {
continue
}

m, err := env.Parse(e.Mapping)
require.NoError(t, err)

for j, io := range e.Results {
msg := service.NewMessage([]byte(io[0]))
p, err := msg.BloblangQuery(m)
exp := io[1]
if strings.HasPrefix(exp, "Error(") {
exp = exp[7 : len(exp)-2]
require.EqualError(t, err, exp, fmt.Sprintf("%v-%v", i, j))
} else if exp == "<Message deleted>" {
require.NoError(t, err)
require.Nil(t, p)
} else {
require.NoError(t, err)

pBytes, err := p.AsBytes()
require.NoError(t, err)

assert.Equal(t, exp, string(pBytes), fmt.Sprintf("%v-%v", i, j))
}
}
}
}
})
})
}
3 changes: 2 additions & 1 deletion internal/impl/mongodb/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ func writeMapsFromParsed(conf *service.ParsedConfig, operation Operation) (maps
}

func extJSONFromMap(b service.MessageBatch, i int, m *bloblang.Executor) (any, error) {
msg, err := b.BloblangQuery(i, m)
executor := b.BloblangExecutor(m)
msg, err := executor.Query(i)
if err != nil {
return nil, err
}
Expand Down
Loading