From c7462c0ffb729392b5a8d657165da4c15a159272 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Wed, 3 Jul 2024 21:15:20 +0100 Subject: [PATCH 01/12] Tighten up stream integration tests Reject messages which contain errors when running the integration test suite. This comes in handy when one wants to attach some processors to the input(s) in integration tests for performing additional validations such as throwing an error if some expected metadata field isn't present or has the wrong value. Signed-off-by: Mihai Todor Signed-off-by: Jem Davies --- public/service/integration/stream_test_helpers.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/public/service/integration/stream_test_helpers.go b/public/service/integration/stream_test_helpers.go index 06570aa95..07b6ed43f 100644 --- a/public/service/integration/stream_test_helpers.go +++ b/public/service/integration/stream_test_helpers.go @@ -511,7 +511,10 @@ func receiveMessage( require.NoError(t, ackFn(ctx, err)) require.Len(t, b, 1) - return b.Get(0) + msg := b.Get(0) + require.NoError(t, msg.ErrorGet()) + + return msg } func receiveBatch( From c72fa98f59f3e75138b0868ffa3bc405e033ad64 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Wed, 3 Jul 2024 21:25:54 +0100 Subject: [PATCH 02/12] Add `escape_html` parameter to the `format_json` bloblang method Signed-off-by: Mihai Todor Signed-off-by: Jem Davies --- internal/bloblang/query/methods_strings.go | 46 ++++++++++++++++++++-- internal/bloblang/query/methods_test.go | 20 +++++++--- 2 files changed, 57 insertions(+), 9 deletions(-) diff --git a/internal/bloblang/query/methods_strings.go b/internal/bloblang/query/methods_strings.go index 3140729b4..6a5abf1d8 100644 --- a/internal/bloblang/query/methods_strings.go +++ b/internal/bloblang/query/methods_strings.go @@ -1246,6 +1246,22 @@ var _ = registerSimpleMethod( `{"doc":{"foo":"bar"}}`, `{"foo":"bar"}`, ), + NewExampleSpec("Escapes problematic HTML characters.", + `root = this.doc.format_json()`, + `{"doc":{"email":"foo&bar@bento.dev","name":"foo>bar"}}`, + `{ + "email": "foo\u0026bar@bento.dev", + "name": "foo\u003ebar" +}`, + ), + NewExampleSpec("Set the `escape_html` parameter to false to disable escaping of problematic HTML characters.", + `root = this.doc.format_json(escape_html: false)`, + `{"doc":{"email":"foo&bar@bento.dev","name":"foo>bar"}}`, + `{ + "email": "foo&bar@bento.dev", + "name": "foo>bar" +}`, + ), ). Beta(). Param(ParamString( @@ -1255,7 +1271,11 @@ var _ = registerSimpleMethod( Param(ParamBool( "no_indent", "Disable indentation.", - ).Default(false)), + ).Default(false)). + Param(ParamBool( + "escape_html", + "Escape problematic HTML characters.", + ).Default(true)), func(args *ParsedParams) (simpleMethod, error) { indentOpt, err := args.FieldOptionalString("indent") if err != nil { @@ -1269,11 +1289,29 @@ var _ = registerSimpleMethod( if err != nil { return nil, err } + escapeHTMLOpt, err := args.FieldOptionalBool("escape_html") + if err != nil { + return nil, err + } return func(v any, ctx FunctionContext) (any, error) { - if *noIndentOpt { - return json.Marshal(v) + buffer := &bytes.Buffer{} + + encoder := json.NewEncoder(buffer) + if !*noIndentOpt { + encoder.SetIndent("", indent) + } + if !*escapeHTMLOpt { + encoder.SetEscapeHTML(false) } - return json.MarshalIndent(v, "", indent) + + if err := encoder.Encode(v); err != nil { + return nil, err + } + + // This hack is here because `format_json()` initially relied on `json.Marshal()` or `json.MarshalIndent()` + // which don't add a trailing newline to the output and, also, other `format_*` methods in bloblang don't + // append a trailing newline. + return bytes.TrimRight(buffer.Bytes(), "\n"), nil }, nil }, ) diff --git a/internal/bloblang/query/methods_test.go b/internal/bloblang/query/methods_test.go index 224e6d4c7..2a169c1ea 100644 --- a/internal/bloblang/query/methods_test.go +++ b/internal/bloblang/query/methods_test.go @@ -115,11 +115,21 @@ func TestMethods(t *testing.T) { jsonFn(`{"doc":{"foo":"bar"}}`), method("format_json", ""), ), - output: []byte(`{ -"doc": { -"foo": "bar" -} -}`), + output: []byte(`{"doc":{"foo":"bar"}}`), + }, + "check format_json with escaping problematic HTML characters": { + input: methods( + jsonFn(`{"doc":{"email":"foo&bar@bento.dev","name":"foo>bar"}}`), + method("format_json", ""), + ), + output: []byte(`{"doc":{"email":"foo\u0026bar@bento.dev","name":"foo\u003ebar"}}`), + }, + "check format_json without escaping problematic HTML characters": { + input: methods( + jsonFn(`{"doc":{"email":"foo&bar@bento.dev","name":"foo>bar"}}`), + method("format_json", "", true, false), + ), + output: []byte(`{"doc":{"email":"foo&bar@bento.dev","name":"foo>bar"}}`), }, "check format_yaml": { input: methods( From 8a8afb4dfbe71001af5b3a192f53222483ba9cf1 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Wed, 3 Jul 2024 21:52:15 +0100 Subject: [PATCH 03/12] Add max_retries field to the retry processor Signed-off-by: Jem Davies --- internal/impl/pure/processor_retry.go | 54 ++++++++++++++++--- internal/impl/pure/processor_retry_test.go | 60 +++++++++++++++++++++ website/docs/components/processors/retry.md | 18 +++++++ website/docs/guides/bloblang/methods.md | 25 +++++++++ 4 files changed, 149 insertions(+), 8 deletions(-) diff --git a/internal/impl/pure/processor_retry.go b/internal/impl/pure/processor_retry.go index d64c77d67..45eadc552 100644 --- a/internal/impl/pure/processor_retry.go +++ b/internal/impl/pure/processor_retry.go @@ -19,6 +19,7 @@ const ( rpFieldProcessors = "processors" rpFieldBackoff = "backoff" rpFieldParallel = "parallel" + rpFieldMaxRetries = "max_retries" ) func retryProcSpec() *service.ConfigSpec { @@ -36,6 +37,16 @@ By default the retry backoff has a specified `+"[`max_elapsed_time`](#backoffmax In order to avoid permanent loops any error associated with messages as they first enter a retry processor will be cleared. + +## Metadata + +This processor adds the following metadata fields to each message: + +`+"```text"+` +- retry_count - The number of retry attempts. +- backoff_duration - The total time elapsed while performing retries. +`+"```"+` + :::caution Batching If you wish to wrap a batch-aware series of processors then take a look at the [batching section](#batching) below. ::: @@ -97,6 +108,9 @@ output: service.NewBoolField(rpFieldParallel). Description("When processing batches of messages these batches are ignored and the processors apply to each message sequentially. However, when this field is set to `true` each message will be processed in parallel. Caution should be made to ensure that batch sizes do not surpass a point where this would cause resource (CPU, memory, API limits) contention."). Default(false), + service.NewIntField(rpFieldMaxRetries). + Description("The maximum number of retry attempts before the request is aborted. Setting this value to `0` will result in unbounded number of retries."). + Default(0), ) } @@ -128,6 +142,10 @@ func init() { return nil, err } + if p.maxRetries, err = conf.FieldInt(rpFieldMaxRetries); err != nil { + return nil, err + } + return interop.NewUnwrapInternalBatchProcessor(processor.NewAutoObservedBatchedProcessor("retry", p, mgr)), nil }) if err != nil { @@ -136,10 +154,11 @@ func init() { } type retryProc struct { - children []processor.V1 - boff *backoff.ExponentialBackOff - parallel bool - log log.Modular + children []processor.V1 + boff *backoff.ExponentialBackOff + parallel bool + maxRetries int + log log.Modular } func (r *retryProc) ProcessBatch(ctx *processor.BatchProcContext, msgs message.Batch) ([]message.Batch, error) { @@ -184,16 +203,28 @@ func (r *retryProc) ProcessBatch(ctx *processor.BatchProcContext, msgs message.B return []message.Batch{resMsg}, nil } -func (r *retryProc) dispatchMessage(ctx context.Context, p *message.Part) ([]message.Batch, error) { +func (r *retryProc) dispatchMessage(ctx context.Context, p *message.Part) (resBatches []message.Batch, err error) { // NOTE: We always ensure we start off with a copy of the reference backoff. boff := *r.boff boff.Reset() + retries := 0 + var backoffDuration time.Duration + + defer func() { + for _, b := range resBatches { + for _, m := range b { + m.MetaSetMut("retry_count", retries) + m.MetaSetMut("backoff_duration", backoffDuration) + } + } + }() + // Ensure we do not start off with an error. p.ErrorSet(nil) for { - resBatches, err := processor.ExecuteAll(ctx, r.children, message.Batch{p.ShallowCopy()}) + resBatches, err = processor.ExecuteAll(ctx, r.children, message.Batch{p.ShallowCopy()}) if err != nil { return nil, err } @@ -214,13 +245,20 @@ func (r *retryProc) dispatchMessage(ctx context.Context, p *message.Part) ([]mes return resBatches, nil } + retries++ + if retries == r.maxRetries { + r.log.With("error", err).Debug("Error occurred and maximum number of retries was reached.") + return resBatches, nil + } + nextSleep := boff.NextBackOff() + backoffDuration += nextSleep if nextSleep == backoff.Stop { - r.log.With("error", err).Debug("Error occured and maximum wait period was reached.") + r.log.With("error", err).Debug("Error occurred and maximum wait period was reached.") return resBatches, nil } - r.log.With("error", err, "backoff", nextSleep).Debug("Error occured, sleeping for next backoff period.") + r.log.With("error", err, "backoff", nextSleep).Debug("Error occurred, sleeping for next backoff period.") select { case <-time.After(nextSleep): case <-ctx.Done(): diff --git a/internal/impl/pure/processor_retry_test.go b/internal/impl/pure/processor_retry_test.go index 56a78f2ae..234463cda 100644 --- a/internal/impl/pure/processor_retry_test.go +++ b/internal/impl/pure/processor_retry_test.go @@ -45,6 +45,14 @@ retry: var resMsgs []string for _, m := range resBatches[0] { + retryCount, ok := m.MetaGetMut("retry_count") + require.True(t, ok) + assert.Equal(t, 0, retryCount) + + backoffDuration, ok := m.MetaGetMut("backoff_duration") + require.True(t, ok) + assert.Equal(t, backoffDuration, time.Duration(0)) + resMsgs = append(resMsgs, string(m.AsBytes())) } assert.Equal(t, []string{ @@ -154,6 +162,58 @@ retry: require.NoError(t, p.Close(context.Background())) } +func TestRetryMaxRetriesFailure(t *testing.T) { + conf, err := testutil.ProcessorFromYAML(` +retry: + max_retries: 2 + processors: + - resource: foo +`) + require.NoError(t, err) + + mockMgr := mock.NewManager() + + var fooCalls uint32 + mockMgr.Processors["foo"] = func(b message.Batch) ([]message.Batch, error) { + b[0].SetBytes([]byte(string(b[0].AsBytes()) + " updated")) + atomic.AddUint32(&fooCalls, 1) + b[0].ErrorSet(errors.New("nope")) + return []message.Batch{ + {b[0]}, + }, nil + } + + p, err := mockMgr.NewProcessor(conf) + require.NoError(t, err) + + resBatches, err := p.ProcessBatch(context.Background(), message.Batch{ + message.NewPart([]byte("hello world a")), + }) + require.NoError(t, err) + require.Len(t, resBatches, 1) + require.Len(t, resBatches[0], 1) + + var resMsgs []string + for _, m := range resBatches[0] { + retryCount, ok := m.MetaGetMut("retry_count") + require.True(t, ok) + assert.Equal(t, 2, retryCount) + + backoffDuration, ok := m.MetaGetMut("backoff_duration") + require.True(t, ok) + assert.Greater(t, backoffDuration, time.Duration(0)) + + resMsgs = append(resMsgs, string(m.AsBytes())) + } + assert.Equal(t, []string{ + "hello world a updated", + }, resMsgs) + + assert.Equal(t, uint32(2), fooCalls) + + require.NoError(t, p.Close(context.Background())) +} + func TestRetryParallelErrors(t *testing.T) { conf, err := testutil.ProcessorFromYAML(` retry: diff --git a/website/docs/components/processors/retry.md b/website/docs/components/processors/retry.md index 70cda97fe..f4201786b 100644 --- a/website/docs/components/processors/retry.md +++ b/website/docs/components/processors/retry.md @@ -32,6 +32,7 @@ retry: max_elapsed_time: 1m processors: [] # No default (required) parallel: false + max_retries: 0 ``` Executes child processors and if a resulting message is errored then, after a specified backoff period, the same original message will be attempted again through those same processors. If the child processors result in more than one message then the retry mechanism will kick in if _any_ of the resulting messages are errored. @@ -42,6 +43,15 @@ By default the retry backoff has a specified [`max_elapsed_time`](#backoffmax_el In order to avoid permanent loops any error associated with messages as they first enter a retry processor will be cleared. + +## Metadata + +This processor adds the following metadata fields to each message: +```text +- retry_count - The number of retry attempts. +- backoff_duration - The total time elapsed while performing retries. +``` + :::caution Batching If you wish to wrap a batch-aware series of processors then take a look at the [batching section](#batching) below. ::: @@ -158,6 +168,14 @@ When processing batches of messages these batches are ignored and the processors Type: `bool` Default: `false` +### `max_retries` + +The maximum number of retry attempts before the request is aborted. Setting this value to `0` will result in unbounded number of retries. + + +Type: `int` +Default: `0` + ## Batching When messages are batched the child processors of a retry are executed for each individual message in isolation, performed serially by default but in parallel when the field [`parallel`](#parallel) is set to `true`. This is an intentional limitation of the retry processor and is done in order to ensure that errors are correctly associated with a given input message. Otherwise, the archiving, expansion, grouping, filtering and so on of the child processors could obfuscate this relationship. diff --git a/website/docs/guides/bloblang/methods.md b/website/docs/guides/bloblang/methods.md index 6ceeac5bc..40c324cf5 100644 --- a/website/docs/guides/bloblang/methods.md +++ b/website/docs/guides/bloblang/methods.md @@ -2803,6 +2803,7 @@ Serializes a target value into a pretty-printed JSON byte array (with 4 space in **`indent`** <string, default `" "`> Indentation string. Each element in a JSON object or array will begin on a new, indented line followed by one or more copies of indent according to the indentation nesting. **`no_indent`** <bool, default `false`> Disable indentation. +**`escape_html`** <bool, default `true`> Escape problematic HTML characters. #### Examples @@ -2847,6 +2848,30 @@ root = this.doc.format_json(no_indent: true) # Out: {"foo":"bar"} ``` +Escapes problematic HTML characters. + +```coffee +root = this.doc.format_json() + +# In: {"doc":{"email":"foo&bar@bento.dev","name":"foo>bar"}} +# Out: { +# "email": "foo\u0026bar@bento.dev", +# "name": "foo\u003ebar" +# } +``` + +Set the `escape_html` parameter to false to disable escaping of problematic HTML characters. + +```coffee +root = this.doc.format_json(escape_html: false) + +# In: {"doc":{"email":"foo&bar@bento.dev","name":"foo>bar"}} +# Out: { +# "email": "foo&bar@bento.dev", +# "name": "foo>bar" +# } +``` + ### `format_msgpack` Formats data as a [MessagePack](https://msgpack.org/) message in bytes format. From fd72f9a659d6d654840ebf6350d2ddb2a21b43ee Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Wed, 3 Jul 2024 22:16:38 +0100 Subject: [PATCH 04/12] Add "array" Method Signed-off-by: Jem Davies --- internal/bloblang/query/methods.go | 27 ++++++++++++++++++++ internal/bloblang/query/methods_test.go | 28 +++++++++++++++++++++ website/docs/components/processors/retry.md | 1 + website/docs/guides/bloblang/methods.md | 14 +++++++++++ 4 files changed, 70 insertions(+) diff --git a/internal/bloblang/query/methods.go b/internal/bloblang/query/methods.go index 51b86a242..5fc8e994f 100644 --- a/internal/bloblang/query/methods.go +++ b/internal/bloblang/query/methods.go @@ -78,6 +78,33 @@ func applyMethod(target Function, args *ParsedParams) (Function, error) { //------------------------------------------------------------------------------ +var _ = registerSimpleMethod( + NewMethodSpec( + "array", "", + ).InCategory( + MethodCategoryCoercion, + "Return an array containing the target value. If the value is already an array it is unchanged.", + NewExampleSpec("", + `root.my_array = this.name.array()`, + `{"name":"foobar bazson"}`, + `{"my_array":["foobar bazson"]}`, + ), + ), + func(*ParsedParams) (simpleMethod, error) { + return func(v any, ctx FunctionContext) (any, error) { + switch v.(type) { + case []any: + return v, nil + } + arr := make([]any, 1) + arr[0] = v + return arr, nil + }, nil + }, +) + +//------------------------------------------------------------------------------ + var _ = registerMethod( NewMethodSpec("bool", "").InCategory( MethodCategoryCoercion, diff --git a/internal/bloblang/query/methods_test.go b/internal/bloblang/query/methods_test.go index 2a169c1ea..11dc10740 100644 --- a/internal/bloblang/query/methods_test.go +++ b/internal/bloblang/query/methods_test.go @@ -502,6 +502,34 @@ func TestMethods(t *testing.T) { ), output: false, }, + "check array": { + input: methods( + literalFn([]any{1}), + method("array"), + ), + output: []any{1}, + }, + "check array 2": { + input: methods( + literalFn(1), + method("array"), + ), + output: []any{1}, + }, + "check array 3": { + input: methods( + literalFn(nil), + method("array"), + ), + output: []any{nil}, + }, + "check array 4": { + input: methods( + literalFn([]any{}), + method("array"), + ), + output: []any{}, + }, "check bool": { input: methods( literalFn("true"), diff --git a/website/docs/components/processors/retry.md b/website/docs/components/processors/retry.md index f4201786b..cb4982e55 100644 --- a/website/docs/components/processors/retry.md +++ b/website/docs/components/processors/retry.md @@ -47,6 +47,7 @@ In order to avoid permanent loops any error associated with messages as they fir ## Metadata This processor adds the following metadata fields to each message: + ```text - retry_count - The number of retry attempts. - backoff_duration - The total time elapsed while performing retries. diff --git a/website/docs/guides/bloblang/methods.md b/website/docs/guides/bloblang/methods.md index 40c324cf5..f6f2b80c3 100644 --- a/website/docs/guides/bloblang/methods.md +++ b/website/docs/guides/bloblang/methods.md @@ -1760,6 +1760,20 @@ root.created_at_unix = this.created_at.ts_unix_nano() ## Type Coercion +### `array` + +Return an array containing the target value. If the value is already an array it is unchanged. + +#### Examples + + +```coffee +root.my_array = this.name.array() + +# In: {"name":"foobar bazson"} +# Out: {"my_array":["foobar bazson"]} +``` + ### `bool` Attempt to parse a value into a boolean. An optional argument can be provided, in which case if the value cannot be parsed the argument will be returned instead. If the value is a number then any non-zero value will resolve to `true`, if the value is a string then any of the following values are considered valid: `1, t, T, TRUE, true, True, 0, f, F, FALSE`. From 0d40c3d1144ed4a20dcfea6b959f8c15695dabd0 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Fri, 12 Jul 2024 21:18:21 +0100 Subject: [PATCH 05/12] Add generic map to manager Signed-off-by: Jem Davies --- internal/bundle/package.go | 3 +++ internal/manager/mock/manager.go | 35 ++++++++++++++++++++++---------- internal/manager/type.go | 15 ++++++++++++++ internal/manager/type_test.go | 25 +++++++++++++++++++++++ 4 files changed, 67 insertions(+), 11 deletions(-) diff --git a/internal/bundle/package.go b/internal/bundle/package.go index 312e93eb7..5fc2125e4 100644 --- a/internal/bundle/package.go +++ b/internal/bundle/package.go @@ -93,6 +93,9 @@ type NewManagement interface { GetPipe(name string) (<-chan message.Transaction, error) SetPipe(name string, t <-chan message.Transaction) UnsetPipe(name string, t <-chan message.Transaction) + + GetGeneric(key any) (any, bool) + SetGeneric(key, value any) } type componentErr struct { diff --git a/internal/manager/mock/manager.go b/internal/manager/mock/manager.go index 4ef0947fc..b0c0863da 100644 --- a/internal/manager/mock/manager.go +++ b/internal/manager/mock/manager.go @@ -37,6 +37,8 @@ type Manager struct { Pipes map[string]<-chan message.Transaction lock sync.Mutex + genericValues *sync.Map + // OnRegisterEndpoint can be set in order to intercept endpoints registered // by components. OnRegisterEndpoint func(path string, h http.HandlerFunc) @@ -50,17 +52,18 @@ type Manager struct { // NewManager provides a new mock manager. func NewManager() *Manager { return &Manager{ - Version: "mock", - Inputs: map[string]*Input{}, - Caches: map[string]map[string]CacheItem{}, - RateLimits: map[string]RateLimit{}, - Outputs: map[string]OutputWriter{}, - Processors: map[string]Processor{}, - Pipes: map[string]<-chan message.Transaction{}, - CustomFS: ifs.OS(), - M: metrics.Noop(), - L: log.Noop(), - T: noop.NewTracerProvider(), + Version: "mock", + Inputs: map[string]*Input{}, + Caches: map[string]map[string]CacheItem{}, + RateLimits: map[string]RateLimit{}, + Outputs: map[string]OutputWriter{}, + Processors: map[string]Processor{}, + Pipes: map[string]<-chan message.Transaction{}, + CustomFS: ifs.OS(), + M: metrics.Noop(), + L: log.Noop(), + T: noop.NewTracerProvider(), + genericValues: &sync.Map{}, } } @@ -369,3 +372,13 @@ func (m *Manager) SetPipe(name string, t <-chan message.Transaction) { func (m *Manager) UnsetPipe(name string, t <-chan message.Transaction) { delete(m.Pipes, name) } + +// GetGeneric attempts to obtain and return a generic resource value by key. +func (m *Manager) GetGeneric(key any) (any, bool) { + return m.genericValues.Load(key) +} + +// SetGeneric attempts to set a generic resource to a given value by key. +func (m *Manager) SetGeneric(key, value any) { + m.genericValues.Store(key, value) +} diff --git a/internal/manager/type.go b/internal/manager/type.go index d388bd5d5..e3f93b7be 100644 --- a/internal/manager/type.go +++ b/internal/manager/type.go @@ -92,6 +92,9 @@ type Type struct { pipes map[string]<-chan message.Transaction pipeLock *sync.RWMutex + + // Generic key/value store for plugin implementations. + genericResources *sync.Map } // OptFunc is an opt setting for a manager type. @@ -204,6 +207,8 @@ func New(conf ResourceConfig, opts ...OptFunc) (*Type, error) { pipes: map[string]<-chan message.Transaction{}, pipeLock: &sync.RWMutex{}, + + genericResources: &sync.Map{}, } for _, opt := range opts { @@ -414,6 +419,16 @@ func (t *Type) UnsetPipe(name string, tran <-chan message.Transaction) { t.pipeLock.Unlock() } +// GetGeneric attempts to obtain and return a generic resource value by key. +func (t *Type) GetGeneric(key any) (any, bool) { + return t.genericResources.Load(key) +} + +// SetGeneric attempts to set a generic resource to a given value by key. +func (t *Type) SetGeneric(key, value any) { + t.genericResources.Store(key, value) +} + //------------------------------------------------------------------------------ // WithMetricsMapping returns a manager with the stored metrics exporter wrapped diff --git a/internal/manager/type_test.go b/internal/manager/type_test.go index f2504c5b5..9fddd88a8 100644 --- a/internal/manager/type_test.go +++ b/internal/manager/type_test.go @@ -539,3 +539,28 @@ func TestManagerPipeGetSet(t *testing.T) { t.Error("Wrong transaction chan returned") } } + +type testKeyTypeA int +type testKeyTypeB int + +const testKeyA testKeyTypeA = iota +const testKeyB testKeyTypeB = iota + +func TestManagerGenericResources(t *testing.T) { + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + mgr.SetGeneric(testKeyA, "foo") + mgr.SetGeneric(testKeyB, "bar") + + _, exists := mgr.GetGeneric("not a key") + assert.False(t, exists) + + v, exists := mgr.GetGeneric(testKeyA) + assert.True(t, exists) + assert.Equal(t, "foo", v) + + v, exists = mgr.GetGeneric(testKeyB) + assert.True(t, exists) + assert.Equal(t, "bar", v) +} From 4385faf90e278904a212d55c52bf389a40674e80 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Fri, 12 Jul 2024 21:21:53 +0100 Subject: [PATCH 06/12] Add generic resources for plugins Signed-off-by: Jem Davies --- internal/bundle/package.go | 1 + internal/manager/mock/manager.go | 7 +++++++ internal/manager/type.go | 15 +++++++++---- internal/manager/type_test.go | 13 ++++++++++++ public/service/resources.go | 25 ++++++++++++++++++++++ public/service/resources_test.go | 36 ++++++++++++++++++++++++++++++++ 6 files changed, 93 insertions(+), 4 deletions(-) diff --git a/internal/bundle/package.go b/internal/bundle/package.go index 5fc2125e4..ca82b9b53 100644 --- a/internal/bundle/package.go +++ b/internal/bundle/package.go @@ -95,6 +95,7 @@ type NewManagement interface { UnsetPipe(name string, t <-chan message.Transaction) GetGeneric(key any) (any, bool) + GetOrSetGeneric(key, value any) (actual any, loaded bool) SetGeneric(key, value any) } diff --git a/internal/manager/mock/manager.go b/internal/manager/mock/manager.go index b0c0863da..4227eb4e1 100644 --- a/internal/manager/mock/manager.go +++ b/internal/manager/mock/manager.go @@ -378,6 +378,13 @@ func (m *Manager) GetGeneric(key any) (any, bool) { return m.genericValues.Load(key) } +// GetOrSetGeneric attempts to obtain an existing value for a given key if +// present. Otherwise, it stores and returns the provided value. The loaded +// result is true if the value was loaded, false if stored. +func (m *Manager) GetOrSetGeneric(key, value any) (actual any, loaded bool) { + return m.genericValues.LoadOrStore(key, value) +} + // SetGeneric attempts to set a generic resource to a given value by key. func (m *Manager) SetGeneric(key, value any) { m.genericValues.Store(key, value) diff --git a/internal/manager/type.go b/internal/manager/type.go index e3f93b7be..b61acc750 100644 --- a/internal/manager/type.go +++ b/internal/manager/type.go @@ -94,7 +94,7 @@ type Type struct { pipeLock *sync.RWMutex // Generic key/value store for plugin implementations. - genericResources *sync.Map + genericValues *sync.Map } // OptFunc is an opt setting for a manager type. @@ -208,7 +208,7 @@ func New(conf ResourceConfig, opts ...OptFunc) (*Type, error) { pipes: map[string]<-chan message.Transaction{}, pipeLock: &sync.RWMutex{}, - genericResources: &sync.Map{}, + genericValues: &sync.Map{}, } for _, opt := range opts { @@ -421,12 +421,19 @@ func (t *Type) UnsetPipe(name string, tran <-chan message.Transaction) { // GetGeneric attempts to obtain and return a generic resource value by key. func (t *Type) GetGeneric(key any) (any, bool) { - return t.genericResources.Load(key) + return t.genericValues.Load(key) +} + +// GetOrSetGeneric attempts to obtain an existing value for a given key if +// present. Otherwise, it stores and returns the provided value. The loaded +// result is true if the value was loaded, false if stored. +func (t *Type) GetOrSetGeneric(key, value any) (actual any, loaded bool) { + return t.genericValues.LoadOrStore(key, value) } // SetGeneric attempts to set a generic resource to a given value by key. func (t *Type) SetGeneric(key, value any) { - t.genericResources.Store(key, value) + t.genericValues.Store(key, value) } //------------------------------------------------------------------------------ diff --git a/internal/manager/type_test.go b/internal/manager/type_test.go index 9fddd88a8..73f5864e4 100644 --- a/internal/manager/type_test.go +++ b/internal/manager/type_test.go @@ -564,3 +564,16 @@ func TestManagerGenericResources(t *testing.T) { assert.True(t, exists) assert.Equal(t, "bar", v) } + +func TestManagerGenericGetOrSet(t *testing.T) { + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + v, loaded := mgr.GetOrSetGeneric(testKeyA, "foo") + assert.False(t, loaded) + assert.Equal(t, "foo", v) + + v, loaded = mgr.GetOrSetGeneric(testKeyA, "bar") + assert.True(t, loaded) + assert.Equal(t, "foo", v) +} diff --git a/public/service/resources.go b/public/service/resources.go index c35e71b6a..a5c603f58 100644 --- a/public/service/resources.go +++ b/public/service/resources.go @@ -250,6 +250,31 @@ func (r *Resources) HasRateLimit(name string) bool { return r.mgr.ProbeRateLimit(name) } +// GetGeneric queries the resources for a generic key value, potentially set by +// another plugin or instantiation of this plugin. +func (r *Resources) GetGeneric(key any) (any, bool) { + return r.mgr.GetGeneric(key) +} + +// GetOrSetGeneric attempts to obtain an existing generic value for a given key +// if present. Otherwise, it stores and returns the provided value. The loaded +// result is true if the value was loaded, false if stored. +func (r *Resources) GetOrSetGeneric(key, value any) (actual any, loaded bool) { + return r.mgr.GetOrSetGeneric(key, value) +} + +// SetGeneric sets a generic key/value pair, which can be accessed by other +// plugin implementations with access to the same resources. +// +// The provided key must be comparable and should not be of type string or any +// other built-in type to avoid collisions between packages using resources. +// Users of SetGeneric should define their own types for keys. To avoid +// allocating when assigning to an any type, keys often have concrete type +// struct{}. +func (r *Resources) SetGeneric(key, value any) { + r.mgr.SetGeneric(key, value) +} + //------------------------------------------------------------------------------ type resourcesUnwrapper struct { diff --git a/public/service/resources_test.go b/public/service/resources_test.go index d6d533f99..c84b6ed2b 100644 --- a/public/service/resources_test.go +++ b/public/service/resources_test.go @@ -150,3 +150,39 @@ output: {"id":3,"purpose":"test resource outputs"} `, string(outBytes)) } + +type testKeyTypeA int +type testKeyTypeB int + +const testKeyA testKeyTypeA = iota +const testKeyB testKeyTypeB = iota + +func TestResourcesGenericValues(t *testing.T) { + res := service.MockResources() + + res.SetGeneric(testKeyA, "foo") + res.SetGeneric(testKeyB, "bar") + + _, exists := res.GetGeneric("not a key") + assert.False(t, exists) + + v, exists := res.GetGeneric(testKeyA) + assert.True(t, exists) + assert.Equal(t, "foo", v) + + v, exists = res.GetGeneric(testKeyB) + assert.True(t, exists) + assert.Equal(t, "bar", v) +} + +func TestResourcesGenericGetOrSet(t *testing.T) { + res := service.MockResources() + + v, loaded := res.GetOrSetGeneric(testKeyA, "foo") + assert.False(t, loaded) + assert.Equal(t, "foo", v) + + v, loaded = res.GetOrSetGeneric(testKeyA, "bar") + assert.True(t, loaded) + assert.Equal(t, "foo", v) +} From 990300bb2a0fecf282beaa2468ad48db0dcd2689 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Fri, 12 Jul 2024 21:25:36 +0100 Subject: [PATCH 07/12] Add fnv32 hash option Signed-off-by: Jem Davies --- internal/bloblang/query/methods_strings.go | 9 ++++++++- internal/bloblang/query/methods_test.go | 8 ++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/internal/bloblang/query/methods_strings.go b/internal/bloblang/query/methods_strings.go index 6a5abf1d8..a4e62e003 100644 --- a/internal/bloblang/query/methods_strings.go +++ b/internal/bloblang/query/methods_strings.go @@ -18,6 +18,7 @@ import ( "fmt" "hash" "hash/crc32" + "hash/fnv" "html" "io" "net/url" @@ -759,7 +760,7 @@ var _ = registerSimpleMethod( ` Hashes a string or byte array according to a chosen algorithm and returns the result as a byte array. When mapping the result to a JSON field the value should be cast to a string using the method `+"[`string`][methods.string], or encoded using the method [`encode`][methods.encode]"+`, otherwise it will be base64 encoded by default. -Available algorithms are: `+"`hmac_sha1`, `hmac_sha256`, `hmac_sha512`, `md5`, `sha1`, `sha256`, `sha512`, `xxhash64`, `crc32`"+`. +Available algorithms are: `+"`hmac_sha1`, `hmac_sha256`, `hmac_sha512`, `md5`, `sha1`, `sha256`, `sha512`, `xxhash64`, `crc32`, `fnv32`"+`. The following algorithms require a key, which is specified as a second argument: `+"`hmac_sha1`, `hmac_sha256`, `hmac_sha512`"+`.`, NewExampleSpec("", @@ -870,6 +871,12 @@ root.h2 = this.value.hash(algorithm: "crc32", polynomial: "Koopman").encode("hex _, _ = hasher.Write(b) return hasher.Sum(nil), nil } + case "fnv32": + hashFn = func(b []byte) ([]byte, error) { + h := fnv.New32() + _, _ = h.Write(b) + return []byte(strconv.FormatUint(uint64(h.Sum32()), 10)), nil + } default: return nil, fmt.Errorf("unrecognized hash type: %v", algorithmStr) } diff --git a/internal/bloblang/query/methods_test.go b/internal/bloblang/query/methods_test.go index 11dc10740..11522411d 100644 --- a/internal/bloblang/query/methods_test.go +++ b/internal/bloblang/query/methods_test.go @@ -873,6 +873,14 @@ func TestMethods(t *testing.T) { ), err: `string literal: unsupported crc32 hash key "not-supported"`, }, + "check fnv32 hash": { + input: methods( + literalFn("hello world"), + method("hash", "fnv32"), + method("string"), + ), + output: "1418570095", + }, "check hex encode": { input: methods( literalFn("hello world"), From 02fa7c31b6be8e628027cc1edfdf5aa57694e21f Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Fri, 12 Jul 2024 21:30:57 +0100 Subject: [PATCH 08/12] Fix docs typo in `Processor.Process()` Also in `BatchProcessor.ProcessBatch()`. According to Ash: > You need to ensure that if you end up yielding N messages then > you need to copy the original message N times rather than create > N new instantiations. That's because we use the context from the > original message to carry things like distributed tracing info > and synchronous response mechanisms. Co-authored-by: Ashley Jeffs Signed-off-by: Mihai Todor Signed-off-by: Jem Davies --- public/service/processor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/public/service/processor.go b/public/service/processor.go index d0ef58545..4fdeada19 100644 --- a/public/service/processor.go +++ b/public/service/processor.go @@ -21,7 +21,7 @@ type Processor interface { // with the patterns outlined in https://warpstreamlabs.github.io/bento/docs/configuration/error_handling. // // The Message types returned MUST be derived from the provided message, and - // CANNOT be custom implementations of Message. In order to copy the + // CANNOT be custom instantiations of Message. In order to copy the // provided message use the Copy method. Process(context.Context, *Message) (MessageBatch, error) @@ -53,7 +53,7 @@ type BatchProcessor interface { // with a nil error. // // The Message types returned MUST be derived from the provided messages, - // and CANNOT be custom implementations of Message. In order to copy the + // and CANNOT be custom instantiations of Message. In order to copy the // provided messages use the Copy method. ProcessBatch(context.Context, MessageBatch) ([]MessageBatch, error) From 1498721c6eda4bb2dcf8e62f3e7aca93a7a6c941 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Fri, 12 Jul 2024 21:53:14 +0100 Subject: [PATCH 09/12] Use new `rickb777/period` library for `parse_duration_iso8601()` This library supersedes `rickb777/date` for durations as mentioned here: https://github.com/rickb777/period/issues/1. The maintainer advised to switch to this new library in several other issues: - https://github.com/rickb777/date/issues/15 - https://github.com/rickb777/date/issues/16 - https://github.com/rickb777/date/issues/12 Signed-off-by: Mihai Todor Signed-off-by: Jem Davies --- go.mod | 21 +++++------ go.sum | 45 +++++++++++++----------- internal/impl/pure/bloblang_time.go | 5 ++- internal/impl/pure/bloblang_time_test.go | 6 ++-- website/docs/guides/bloblang/methods.md | 2 +- 5 files changed, 41 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index a0299e081..44853877f 100644 --- a/go.mod +++ b/go.mod @@ -108,7 +108,7 @@ require ( github.com/rabbitmq/amqp091-go v1.9.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/redis/go-redis/v9 v9.4.0 - github.com/rickb777/date v1.20.5 + github.com/rickb777/period v1.0.5 github.com/robfig/cron/v3 v3.0.1 github.com/segmentio/ksuid v1.0.4 github.com/sijms/go-ora/v2 v2.8.19 @@ -139,12 +139,12 @@ require ( go.opentelemetry.io/otel/sdk v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/multierr v1.11.0 - golang.org/x/crypto v0.21.0 + golang.org/x/crypto v0.25.0 golang.org/x/exp v0.0.0-20231006140011-7918f672742d - golang.org/x/net v0.23.0 + golang.org/x/net v0.27.0 golang.org/x/oauth2 v0.17.0 - golang.org/x/sync v0.6.0 - golang.org/x/text v0.14.0 + golang.org/x/sync v0.7.0 + golang.org/x/text v0.16.0 google.golang.org/api v0.162.0 google.golang.org/protobuf v1.33.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 @@ -251,6 +251,7 @@ require ( github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/css v1.0.0 // indirect github.com/gosimple/unidecode v1.0.1 // indirect + github.com/govalues/decimal v0.1.29 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect @@ -303,7 +304,7 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/rickb777/plural v1.4.1 // indirect + github.com/rickb777/plural v1.4.2 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/segmentio/asm v1.2.0 // indirect @@ -325,11 +326,11 @@ require ( go.opentelemetry.io/proto/otlp v1.1.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/mod v0.14.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/term v0.18.0 // indirect + golang.org/x/mod v0.17.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/term v0.22.0 // indirect golang.org/x/time v0.5.0 // indirect - golang.org/x/tools v0.16.1 // indirect + golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect diff --git a/go.sum b/go.sum index 20a11bf1c..694122df4 100644 --- a/go.sum +++ b/go.sum @@ -585,6 +585,8 @@ github.com/gosimple/slug v1.13.1 h1:bQ+kpX9Qa6tHRaK+fZR0A0M2Kd7Pa5eHPPsb1JpHD+Q= github.com/gosimple/slug v1.13.1/go.mod h1:UiRaFH+GEilHstLUmcBgWcI42viBN7mAb818JrYOeFQ= github.com/gosimple/unidecode v1.0.1 h1:hZzFTMMqSswvf0LBJZCZgThIZrpDHFXux9KeGmn6T/o= github.com/gosimple/unidecode v1.0.1/go.mod h1:CP0Cr1Y1kogOtx0bJblKzsVWrqYaqfNOnHzpgWw4Awc= +github.com/govalues/decimal v0.1.29 h1:GKC5g9y9oWxKIy51czdHTShOABwHm/shVuOVPwG415M= +github.com/govalues/decimal v0.1.29/go.mod h1:LUlHHucpCmA4rJfNrDvMgrWibDpYnDNWqJuNU1/gxW8= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= @@ -850,8 +852,8 @@ github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5Bn github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= -github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= -github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= +github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI= @@ -937,10 +939,10 @@ github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwy github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/rickb777/date v1.20.5 h1:Ybjz7J7ga9ui4VJizQpil0l330r6wkn6CicaoattIxQ= -github.com/rickb777/date v1.20.5/go.mod h1:6BPrm3/aQI0I8jvlD1fAlm/86k5eSeTQ2mR5FEmTnSw= -github.com/rickb777/plural v1.4.1 h1:5MMLcbIaapLFmvDGRT5iPk8877hpTPt8Y9cdSKRw9sU= -github.com/rickb777/plural v1.4.1/go.mod h1:kdmXUpmKBJTS0FtG/TFumd//VBWsNTD7zOw7x4umxNw= +github.com/rickb777/period v1.0.5 h1:jAzlI2knYam5VMy0X8eYgqJBl0ew57N+J1djJSBOulM= +github.com/rickb777/period v1.0.5/go.mod h1:AmEwpgIShi3EEw34qbafoPJxVeRbv9VVtjLyOeRwK6c= +github.com/rickb777/plural v1.4.2 h1:Kl/syFGLFZ5EbuV8c9SVud8s5HI2HpCCtOMw2U1kS+A= +github.com/rickb777/plural v1.4.2/go.mod h1:kdmXUpmKBJTS0FtG/TFumd//VBWsNTD7zOw7x4umxNw= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= @@ -1157,8 +1159,8 @@ golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1205,8 +1207,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= -golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1244,8 +1246,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= -golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= -golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1264,8 +1266,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1325,8 +1327,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1335,8 +1337,8 @@ golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= -golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= +golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= +golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1351,8 +1353,9 @@ golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1403,8 +1406,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= -golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/impl/pure/bloblang_time.go b/internal/impl/pure/bloblang_time.go index 6a830b1ee..4fb8238f8 100644 --- a/internal/impl/pure/bloblang_time.go +++ b/internal/impl/pure/bloblang_time.go @@ -5,7 +5,7 @@ import ( "time" "github.com/itchyny/timefmt-go" - "github.com/rickb777/date/period" + "github.com/rickb777/period" "github.com/warpstreamlabs/bento/internal/bloblang/query" "github.com/warpstreamlabs/bento/public/bloblang" @@ -193,8 +193,7 @@ func init() { parseDurISOCtor := func(args *bloblang.ParsedParams) (bloblang.Method, error) { return bloblang.StringMethod(func(s string) (any, error) { - // No need to normalise the output since we need it expressed as nanoseconds. - d, err := period.Parse(s, false) + d, err := period.Parse(s) if err != nil { return nil, err } diff --git a/internal/impl/pure/bloblang_time_test.go b/internal/impl/pure/bloblang_time_test.go index 6cc7ae3f9..63bf55809 100644 --- a/internal/impl/pure/bloblang_time_test.go +++ b/internal/impl/pure/bloblang_time_test.go @@ -148,9 +148,9 @@ func TestTimestampMethods(t *testing.T) { output: int64(110839937300000000), }, { - name: "check parse duration ISO-8601 ignore more than one decimal place", - mapping: `root = "P3Y6M4DT12H30M5.33S".parse_duration_iso8601()`, - output: int64(110839937300000000), + name: "check parse duration ISO-8601 preserves more than one decimal place", + mapping: `root = "P3Y6M4DT12H30M5.123456789S".parse_duration_iso8601()`, + output: int64(110839937123456789), }, { name: "check parse duration ISO-8601 only allow fractions in the last field", diff --git a/website/docs/guides/bloblang/methods.md b/website/docs/guides/bloblang/methods.md index f6f2b80c3..e7ac824b3 100644 --- a/website/docs/guides/bloblang/methods.md +++ b/website/docs/guides/bloblang/methods.md @@ -3348,7 +3348,7 @@ root.encrypted = this.value.encrypt_aes("ctr", $key, $vector).encode("hex") Hashes a string or byte array according to a chosen algorithm and returns the result as a byte array. When mapping the result to a JSON field the value should be cast to a string using the method [`string`][methods.string], or encoded using the method [`encode`][methods.encode], otherwise it will be base64 encoded by default. -Available algorithms are: `hmac_sha1`, `hmac_sha256`, `hmac_sha512`, `md5`, `sha1`, `sha256`, `sha512`, `xxhash64`, `crc32`. +Available algorithms are: `hmac_sha1`, `hmac_sha256`, `hmac_sha512`, `md5`, `sha1`, `sha256`, `sha512`, `xxhash64`, `crc32`, `fnv32`. The following algorithms require a key, which is specified as a second argument: `hmac_sha1`, `hmac_sha256`, `hmac_sha512`. From f9faa392cedc440b30afa472fa2ce37cf7048eba Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Fri, 12 Jul 2024 22:04:05 +0100 Subject: [PATCH 10/12] Expand connection status for inputs Signed-off-by: Jem Davies --- internal/bundle/tracing/input.go | 5 +- internal/component/connection.go | 66 +++++++++++++++++++ internal/component/errors.go | 24 +++++++ internal/component/input/async_reader.go | 20 +++--- internal/component/input/batcher/batcher.go | 5 +- internal/component/input/interface.go | 8 ++- .../component/input/wrap_with_pipeline.go | 9 +-- .../input/wrap_with_pipeline_test.go | 7 +- internal/component/observability.go | 10 +++ internal/impl/io/input_dynamic_fan_in.go | 6 +- internal/impl/io/input_http_server.go | 9 +-- internal/impl/pure/input_broker_fan_in.go | 12 ++-- .../impl/pure/input_broker_fan_in_test.go | 6 +- internal/impl/pure/input_inproc.go | 7 +- internal/impl/pure/input_read_until.go | 9 ++- internal/impl/pure/input_resource.go | 8 ++- internal/impl/pure/input_sequence.go | 7 +- internal/manager/input_wrapper.go | 10 ++- internal/manager/mock/input.go | 17 ++++- internal/stream/type.go | 5 +- 20 files changed, 192 insertions(+), 58 deletions(-) create mode 100644 internal/component/connection.go diff --git a/internal/bundle/tracing/input.go b/internal/bundle/tracing/input.go index ea202df3f..3d3a3db7f 100644 --- a/internal/bundle/tracing/input.go +++ b/internal/bundle/tracing/input.go @@ -6,6 +6,7 @@ import ( "github.com/Jeffail/shutdown" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/component/input" "github.com/warpstreamlabs/bento/internal/message" ) @@ -68,8 +69,8 @@ func (t *tracedInput) TransactionChan() <-chan message.Transaction { return t.tChan } -func (t *tracedInput) Connected() bool { - return t.wrapped.Connected() +func (t *tracedInput) ConnectionStatus() component.ConnectionStatuses { + return t.wrapped.ConnectionStatus() } func (t *tracedInput) TriggerStopConsuming() { diff --git a/internal/component/connection.go b/internal/component/connection.go new file mode 100644 index 000000000..a2d83c2de --- /dev/null +++ b/internal/component/connection.go @@ -0,0 +1,66 @@ +package component + +// ConnectionStatus represents the current connection status of a given +// component. +type ConnectionStatus struct { + Label string + Path []string + Connected bool + Err error +} + +type ConnectionStatuses []*ConnectionStatus + +func (s ConnectionStatuses) AllActive() bool { + if len(s) == 0 { + return false + } + for _, c := range s { + if !c.Connected { + return false + } + } + return true +} + +// ConnectionFailing returns a ConnectionStatus representing a component +// connection where we are attempting to connect to the service but are +// currently unable due to the provided error. +func ConnectionFailing(o Observability, err error) *ConnectionStatus { + return &ConnectionStatus{ + Label: o.Label(), + Path: o.Path(), + Connected: false, + Err: err, + } +} + +// ConnectionActive returns a ConnectionStatus representing a component +// connection where we have an active connection. +func ConnectionActive(o Observability) *ConnectionStatus { + return &ConnectionStatus{ + Label: o.Label(), + Path: o.Path(), + Connected: true, + } +} + +// ConnectionPending returns a ConnectionStatus representing a component that +// has not yet attempted to establish its connection. +func ConnectionPending(o Observability) *ConnectionStatus { + return &ConnectionStatus{ + Label: o.Label(), + Path: o.Path(), + Connected: false, + } +} + +// ConnectionClosed returns a ConnectionStatus representing a component that has +// intentionally closed its connection. +func ConnectionClosed(o Observability) *ConnectionStatus { + return &ConnectionStatus{ + Label: o.Label(), + Path: o.Path(), + Connected: false, + } +} diff --git a/internal/component/errors.go b/internal/component/errors.go index 26f6b9210..4bf4409d5 100644 --- a/internal/component/errors.go +++ b/internal/component/errors.go @@ -32,6 +32,30 @@ func ErrInvalidType(typeStr, tried string) error { } } +//------------------------------------------------------------------------------ + +// LabelledError is an error that could be returned by components annotated by +// their label (or path) in order to provide extra context to which specific +// component within a config is yielding it. This is particularly useful in +// situations such as ConnectionStatus aggregates where a broker yields multiple +// errors from a range of child components. +type LabelledError struct { + Label string + Err error +} + +// Error returns a formatted error string. +func (e *LabelledError) Error() string { + return fmt.Sprintf("%v: %v", e.Label, e.Err) +} + +// Unwrap returns the underlying error value. +func (e *LabelledError) Unwrap() error { + return e.Err +} + +//------------------------------------------------------------------------------ + // Errors used throughout the codebase. var ( ErrTimeout = errors.New("action timed out") diff --git a/internal/component/input/async_reader.go b/internal/component/input/async_reader.go index 174b15be4..712d1b648 100644 --- a/internal/component/input/async_reader.go +++ b/internal/component/input/async_reader.go @@ -19,7 +19,7 @@ import ( // AsyncReader is an input implementation that reads messages from an // input.Async component. type AsyncReader struct { - connected int32 + connection atomic.Pointer[component.ConnectionStatus] connBackoff backoff.BackOff readBackoff backoff.BackOff @@ -61,6 +61,7 @@ func NewAsyncReader( for _, opt := range opts { opt(rdr) } + rdr.connection.Store(component.ConnectionPending(rdr.mgr)) go rdr.loop() return rdr, nil @@ -98,7 +99,7 @@ func (r *AsyncReader) loop() { defer func() { _ = r.reader.Close(context.Background()) - atomic.StoreInt32(&r.connected, 0) + r.connection.Store(component.ConnectionClosed(r.mgr)) close(r.transactions) r.shutSig.TriggerHasStopped() @@ -120,7 +121,8 @@ func (r *AsyncReader) loop() { if r.shutSig.IsSoftStopSignalled() || errors.Is(err, component.ErrTypeClosed) { return false } - r.mgr.Logger().Error("Failed to connect to %v: %v\n", r.typeStr, err) + r.connection.Store(component.ConnectionFailing(r.mgr, err)) + r.mgr.Logger().Error("Failed to connect to %v: %v", r.typeStr, err) mFailedConn.Incr(1) var nextBoff time.Duration @@ -151,7 +153,7 @@ func (r *AsyncReader) loop() { r.mgr.Logger().Info("Input type %v is now active", r.typeStr) mConn.Incr(1) - atomic.StoreInt32(&r.connected, 1) + r.connection.Store(component.ConnectionActive(r.mgr)) for { msg, ackFn, err := r.reader.ReadBatch(closeAtLeisureCtx) @@ -159,14 +161,14 @@ func (r *AsyncReader) loop() { // If our reader says it is not connected. if errors.Is(err, component.ErrNotConnected) { mLostConn.Incr(1) - atomic.StoreInt32(&r.connected, 0) + r.connection.Store(component.ConnectionFailing(r.mgr, component.ErrNotConnected)) // Continue to try to reconnect while still active. if !initConnection() { return } mConn.Incr(1) - atomic.StoreInt32(&r.connected, 1) + r.connection.Store(component.ConnectionActive(r.mgr)) continue } @@ -242,8 +244,10 @@ func (r *AsyncReader) TransactionChan() <-chan message.Transaction { // Connected returns a boolean indicating whether this input is currently // connected to its target. -func (r *AsyncReader) Connected() bool { - return atomic.LoadInt32(&r.connected) == 1 +func (r *AsyncReader) ConnectionStatus() component.ConnectionStatuses { + return []*component.ConnectionStatus{ + r.connection.Load(), + } } // TriggerStopConsuming instructs the input to start shutting down resources diff --git a/internal/component/input/batcher/batcher.go b/internal/component/input/batcher/batcher.go index 69c1d9035..3c4ae57da 100644 --- a/internal/component/input/batcher/batcher.go +++ b/internal/component/input/batcher/batcher.go @@ -8,6 +8,7 @@ import ( "github.com/Jeffail/shutdown" "github.com/warpstreamlabs/bento/internal/batch/policy" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/component/input" "github.com/warpstreamlabs/bento/internal/log" "github.com/warpstreamlabs/bento/internal/message" @@ -152,8 +153,8 @@ func (m *Impl) loop() { } // Connected returns true if the underlying input is connected. -func (m *Impl) Connected() bool { - return m.child.Connected() +func (m *Impl) ConnectionStatus() component.ConnectionStatuses { + return m.child.ConnectionStatus() } // TransactionChan returns the channel used for consuming messages from this diff --git a/internal/component/input/interface.go b/internal/component/input/interface.go index ec26ce2f0..09c436fe3 100644 --- a/internal/component/input/interface.go +++ b/internal/component/input/interface.go @@ -3,6 +3,7 @@ package input import ( "context" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/message" ) @@ -14,9 +15,10 @@ type Streamed interface { // transaction will be sent. TransactionChan() <-chan message.Transaction - // Connected returns a boolean indicating whether this input is currently - // connected to its target. - Connected() bool + // ConnectionStatus returns the current status of the given component + // connection. The result is a slice in order to accommodate higher order + // components that wrap several others. + ConnectionStatus() component.ConnectionStatuses // TriggerStopConsuming instructs the input to start shutting down resources // once all pending messages are delivered and acknowledged. This call does diff --git a/internal/component/input/wrap_with_pipeline.go b/internal/component/input/wrap_with_pipeline.go index 2200c65f4..a9764ce56 100644 --- a/internal/component/input/wrap_with_pipeline.go +++ b/internal/component/input/wrap_with_pipeline.go @@ -3,6 +3,7 @@ package input import ( "context" + "github.com/warpstreamlabs/bento/internal/component" iprocessor "github.com/warpstreamlabs/bento/internal/component/processor" "github.com/warpstreamlabs/bento/internal/message" ) @@ -51,10 +52,10 @@ func (i *WithPipeline) TransactionChan() <-chan message.Transaction { return i.pipe.TransactionChan() } -// Connected returns a boolean indicating whether this input is currently -// connected to its target. -func (i *WithPipeline) Connected() bool { - return i.in.Connected() +// ConnectionStatus returns the current status of the connection of the wrapped +// component. +func (i *WithPipeline) ConnectionStatus() component.ConnectionStatuses { + return i.in.ConnectionStatus() } //------------------------------------------------------------------------------ diff --git a/internal/component/input/wrap_with_pipeline_test.go b/internal/component/input/wrap_with_pipeline_test.go index 17867ccc4..672008ec1 100644 --- a/internal/component/input/wrap_with_pipeline_test.go +++ b/internal/component/input/wrap_with_pipeline_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/component/input" iprocessor "github.com/warpstreamlabs/bento/internal/component/processor" "github.com/warpstreamlabs/bento/internal/message" @@ -26,8 +27,10 @@ func (m *mockInput) TransactionChan() <-chan message.Transaction { return m.ts } -func (m *mockInput) Connected() bool { - return true +func (m *mockInput) ConnectionStatus() component.ConnectionStatuses { + return component.ConnectionStatuses{ + component.ConnectionActive(component.NoopObservability()), + } } func (m *mockInput) TriggerStopConsuming() { diff --git a/internal/component/observability.go b/internal/component/observability.go index 56201f468..16a41335d 100644 --- a/internal/component/observability.go +++ b/internal/component/observability.go @@ -15,6 +15,8 @@ type Observability interface { Metrics() metrics.Type Logger() log.Modular Tracer() trace.TracerProvider + Path() []string + Label() string } type mockObs struct{} @@ -31,6 +33,14 @@ func (m mockObs) Tracer() trace.TracerProvider { return noop.NewTracerProvider() } +func (m mockObs) Path() []string { + return nil +} + +func (m mockObs) Label() string { + return "" +} + // NoopObservability returns an implementation of Observability that does // nothing. func NoopObservability() Observability { diff --git a/internal/impl/io/input_dynamic_fan_in.go b/internal/impl/io/input_dynamic_fan_in.go index 90d46d797..5a0e06e5a 100644 --- a/internal/impl/io/input_dynamic_fan_in.go +++ b/internal/impl/io/input_dynamic_fan_in.go @@ -97,9 +97,9 @@ func (d *dynamicFanInInput) TransactionChan() <-chan message.Transaction { return d.transactionChan } -func (d *dynamicFanInInput) Connected() bool { - // Always return true as this is fuzzy right now. - return true +func (d *dynamicFanInInput) ConnectionStatus() component.ConnectionStatuses { + // Always return nil as this is fuzzy right now. + return nil } func (d *dynamicFanInInput) addInput(ident string, in input.Streamed) error { diff --git a/internal/impl/io/input_http_server.go b/internal/impl/io/input_http_server.go index 3e23527c4..76458c974 100644 --- a/internal/impl/io/input_http_server.go +++ b/internal/impl/io/input_http_server.go @@ -25,6 +25,7 @@ import ( "github.com/warpstreamlabs/bento/internal/api" "github.com/warpstreamlabs/bento/internal/bundle" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/component/input" "github.com/warpstreamlabs/bento/internal/component/interop" "github.com/warpstreamlabs/bento/internal/component/metrics" @@ -907,10 +908,10 @@ func (h *httpServerInput) TransactionChan() <-chan message.Transaction { return h.transactions } -// Connected returns a boolean indicating whether this input is currently -// connected to its target. -func (h *httpServerInput) Connected() bool { - return true +func (h *httpServerInput) ConnectionStatus() component.ConnectionStatuses { + return component.ConnectionStatuses{ + component.ConnectionActive(h.mgr), + } } func (h *httpServerInput) TriggerStopConsuming() { diff --git a/internal/impl/pure/input_broker_fan_in.go b/internal/impl/pure/input_broker_fan_in.go index 5f8ca5178..ca0444a3b 100644 --- a/internal/impl/pure/input_broker_fan_in.go +++ b/internal/impl/pure/input_broker_fan_in.go @@ -7,6 +7,7 @@ import ( "github.com/Jeffail/shutdown" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/component/input" "github.com/warpstreamlabs/bento/internal/message" ) @@ -77,20 +78,19 @@ func (i *fanInInputBroker) TransactionChan() <-chan message.Transaction { return i.transactions } -func (i *fanInInputBroker) Connected() bool { +func (i *fanInInputBroker) ConnectionStatus() component.ConnectionStatuses { i.remainingMapMut.Lock() defer i.remainingMapMut.Unlock() if len(i.remainingMap) == 0 { - return false + return nil } + var statuses component.ConnectionStatuses for index := range i.remainingMap { - if !i.closables[index].Connected() { - return false - } + statuses = append(statuses, i.closables[index].ConnectionStatus()...) } - return true + return statuses } func (i *fanInInputBroker) loop() { diff --git a/internal/impl/pure/input_broker_fan_in_test.go b/internal/impl/pure/input_broker_fan_in_test.go index bfb16995d..db6ec187a 100644 --- a/internal/impl/pure/input_broker_fan_in_test.go +++ b/internal/impl/pure/input_broker_fan_in_test.go @@ -90,15 +90,15 @@ func TestFanInConnected(t *testing.T) { fanIn, err := newFanInInputBroker(Inputs) require.NoError(t, err) - assert.True(t, fanIn.Connected()) + assert.True(t, fanIn.ConnectionStatus().AllActive()) close(tInOne) time.Sleep(time.Millisecond * 100) - assert.True(t, fanIn.Connected()) + assert.True(t, fanIn.ConnectionStatus().AllActive()) close(tInTwo) assert.Eventually(t, func() bool { - return !fanIn.Connected() + return !fanIn.ConnectionStatus().AllActive() }, time.Second, time.Millisecond*10) } diff --git a/internal/impl/pure/input_inproc.go b/internal/impl/pure/input_inproc.go index 3b21e819d..540aeb08d 100644 --- a/internal/impl/pure/input_inproc.go +++ b/internal/impl/pure/input_inproc.go @@ -7,6 +7,7 @@ import ( "github.com/Jeffail/shutdown" "github.com/warpstreamlabs/bento/internal/bundle" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/component/interop" "github.com/warpstreamlabs/bento/internal/component/metrics" "github.com/warpstreamlabs/bento/internal/log" @@ -108,8 +109,10 @@ func (i *inprocInput) TransactionChan() <-chan message.Transaction { return i.transactions } -func (i *inprocInput) Connected() bool { - return true +func (i *inprocInput) ConnectionStatus() component.ConnectionStatuses { + return component.ConnectionStatuses{ + component.ConnectionActive(i.mgr), + } } func (i *inprocInput) TriggerStopConsuming() { diff --git a/internal/impl/pure/input_read_until.go b/internal/impl/pure/input_read_until.go index 755491831..9f7d1c3da 100644 --- a/internal/impl/pure/input_read_until.go +++ b/internal/impl/pure/input_read_until.go @@ -12,6 +12,7 @@ import ( "github.com/Jeffail/shutdown" "github.com/warpstreamlabs/bento/internal/bloblang/mapping" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/component/input" "github.com/warpstreamlabs/bento/internal/component/interop" "github.com/warpstreamlabs/bento/internal/log" @@ -302,15 +303,13 @@ func (r *readUntilInput) TransactionChan() <-chan message.Transaction { return r.transactions } -// Connected returns a boolean indicating whether this input is currently -// connected to its target. -func (r *readUntilInput) Connected() bool { +func (r *readUntilInput) ConnectionStatus() component.ConnectionStatuses { wrappedP := r.wrappedInputLocked.Load() if wrappedP != nil { i := *wrappedP - return i.Connected() + return i.ConnectionStatus() } - return false + return nil } func (r *readUntilInput) TriggerStopConsuming() { diff --git a/internal/impl/pure/input_resource.go b/internal/impl/pure/input_resource.go index 6b64d1b1c..2075d4796 100644 --- a/internal/impl/pure/input_resource.go +++ b/internal/impl/pure/input_resource.go @@ -144,11 +144,13 @@ func (r *resourceInput) TransactionChan() (tChan <-chan message.Transaction) { return r.tChan } -func (r *resourceInput) Connected() (isConnected bool) { +func (r *resourceInput) ConnectionStatus() (s component.ConnectionStatuses) { if err := r.mgr.AccessInput(context.Background(), r.name, func(i input.Streamed) { - isConnected = i.Connected() + s = i.ConnectionStatus() }); err != nil { - r.log.Error("Failed to obtain input resource '%v': %v", r.name, err) + return component.ConnectionStatuses{ + component.ConnectionFailing(r.mgr, err), + } } return } diff --git a/internal/impl/pure/input_sequence.go b/internal/impl/pure/input_sequence.go index e6f09242d..a77b5d9e5 100644 --- a/internal/impl/pure/input_sequence.go +++ b/internal/impl/pure/input_sequence.go @@ -12,6 +12,7 @@ import ( "github.com/Jeffail/shutdown" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/component/input" "github.com/warpstreamlabs/bento/internal/component/interop" "github.com/warpstreamlabs/bento/internal/message" @@ -602,11 +603,11 @@ func (r *sequenceInput) TransactionChan() <-chan message.Transaction { return r.transactions } -func (r *sequenceInput) Connected() bool { +func (r *sequenceInput) ConnectionStatus() component.ConnectionStatuses { if t, _ := r.getTarget(); t != nil { - return t.Connected() + return t.ConnectionStatus() } - return false + return nil } func (r *sequenceInput) TriggerStopConsuming() { diff --git a/internal/manager/input_wrapper.go b/internal/manager/input_wrapper.go index f9509c49b..dd1da6b86 100644 --- a/internal/manager/input_wrapper.go +++ b/internal/manager/input_wrapper.go @@ -74,11 +74,15 @@ func (w *InputWrapper) TransactionChan() <-chan message.Transaction { return w.tranChan } -func (w *InputWrapper) Connected() bool { +// Connected returns a boolean indicating whether the wrapped input is currently +// connected to its target. +func (w *InputWrapper) ConnectionStatus() (s component.ConnectionStatuses) { w.inputLock.Lock() - con := w.ctrl.input != nil && w.ctrl.input.Connected() + if w.ctrl.input != nil { + s = w.ctrl.input.ConnectionStatus() + } w.inputLock.Unlock() - return con + return } func (w *InputWrapper) loop() { diff --git a/internal/manager/mock/input.go b/internal/manager/mock/input.go index d6d23af75..232bdedbc 100644 --- a/internal/manager/mock/input.go +++ b/internal/manager/mock/input.go @@ -4,12 +4,14 @@ import ( "context" "sync" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/message" ) // Input provides a mocked input implementation. type Input struct { TChan chan message.Transaction + closed bool closeOnce sync.Once } @@ -27,9 +29,16 @@ func NewInput(batches []message.Batch) *Input { return &Input{TChan: ts} } -// Connected always returns true. -func (f *Input) Connected() bool { - return true +// ConnectionStatus returns the current connection activity. +func (f *Input) ConnectionStatus() component.ConnectionStatuses { + if f.closed { + return component.ConnectionStatuses{ + component.ConnectionClosed(component.NoopObservability()), + } + } + return component.ConnectionStatuses{ + component.ConnectionActive(component.NoopObservability()), + } } // TransactionChan returns a transaction channel. @@ -41,6 +50,7 @@ func (f *Input) TransactionChan() <-chan message.Transaction { func (f *Input) TriggerStopConsuming() { f.closeOnce.Do(func() { close(f.TChan) + f.closed = true }) } @@ -48,6 +58,7 @@ func (f *Input) TriggerStopConsuming() { func (f *Input) TriggerCloseNow() { f.closeOnce.Do(func() { close(f.TChan) + f.closed = true }) } diff --git a/internal/stream/type.go b/internal/stream/type.go index 5212220ac..0096c57d0 100644 --- a/internal/stream/type.go +++ b/internal/stream/type.go @@ -49,7 +49,8 @@ func New(conf Config, mgr bundle.NewManagement, opts ...func(*Type)) (*Type, err } healthCheck := func(w http.ResponseWriter, r *http.Request) { - inputConnected := t.inputLayer.Connected() + inputStatuses := t.inputLayer.ConnectionStatus() + inputConnected := inputStatuses.AllActive() outputConnected := t.outputLayer.Connected() if atomic.LoadUint32(&t.closed) == 1 { @@ -92,7 +93,7 @@ func OptOnClose(onClose func()) func(*Type) { // IsReady returns a boolean indicating whether both the input and output layers // of the stream are connected. func (t *Type) IsReady() bool { - return t.inputLayer.Connected() && t.outputLayer.Connected() + return t.inputLayer.ConnectionStatus().AllActive() && t.outputLayer.Connected() } func (t *Type) start() (err error) { From 8a4751e08b84eccd82b590903494b65430a346ce Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Fri, 12 Jul 2024 22:07:53 +0100 Subject: [PATCH 11/12] Add output connection status Signed-off-by: Jem Davies --- internal/bundle/tracing/output.go | 5 ++-- internal/component/connection.go | 3 +++ internal/component/input/async_reader.go | 5 ++-- internal/component/input/batcher/batcher.go | 4 ++- internal/component/output/async_writer.go | 25 +++++++++++-------- internal/component/output/batcher/batcher.go | 9 ++++--- internal/component/output/interface.go | 15 ++++++----- internal/component/output/not_batched.go | 4 +-- .../component/output/wrap_with_pipeline.go | 10 +++++--- .../output/wrap_with_pipeline_test.go | 7 ++++-- internal/config/test/case.go | 1 + internal/impl/io/input_dynamic_fan_in.go | 3 ++- internal/impl/io/output_dynamic_fan_out.go | 8 +++--- internal/impl/io/output_http_server.go | 9 ++++--- internal/impl/pure/output_broker_fan_out.go | 8 +++--- .../pure/output_broker_fan_out_sequential.go | 8 +++--- .../output_broker_fan_out_sequential_test.go | 2 +- .../impl/pure/output_broker_fan_out_test.go | 4 +-- internal/impl/pure/output_broker_greedy.go | 9 +++---- .../impl/pure/output_broker_round_robin.go | 8 +++--- internal/impl/pure/output_drop_on.go | 4 +-- internal/impl/pure/output_fallback.go | 10 +++----- internal/impl/pure/output_inproc.go | 6 +++-- internal/impl/pure/output_reject_errored.go | 6 ++--- internal/impl/pure/output_resource.go | 11 ++++---- internal/impl/pure/output_resource_test.go | 2 +- internal/impl/pure/output_retry.go | 6 ++--- internal/impl/pure/output_switch.go | 8 +++--- internal/manager/input_wrapper.go | 7 ++++-- internal/manager/mock/manager.go | 2 ++ internal/manager/mock/output.go | 21 +++++++++++----- internal/manager/output_wrapper.go | 6 ++--- internal/stream/type.go | 6 +++-- 33 files changed, 133 insertions(+), 109 deletions(-) diff --git a/internal/bundle/tracing/output.go b/internal/bundle/tracing/output.go index aa6c14077..47400ece8 100644 --- a/internal/bundle/tracing/output.go +++ b/internal/bundle/tracing/output.go @@ -6,6 +6,7 @@ import ( "github.com/Jeffail/shutdown" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/component/output" "github.com/warpstreamlabs/bento/internal/message" ) @@ -67,8 +68,8 @@ func (t *tracedOutput) Consume(inChan <-chan message.Transaction) error { return t.wrapped.Consume(t.tChan) } -func (t *tracedOutput) Connected() bool { - return t.wrapped.Connected() +func (t *tracedOutput) ConnectionStatus() component.ConnectionStatuses { + return t.wrapped.ConnectionStatus() } func (t *tracedOutput) TriggerCloseNow() { diff --git a/internal/component/connection.go b/internal/component/connection.go index a2d83c2de..9e5137150 100644 --- a/internal/component/connection.go +++ b/internal/component/connection.go @@ -9,8 +9,11 @@ type ConnectionStatus struct { Err error } +// ConnectionStatuses represents an aggregate of connection statuses. type ConnectionStatuses []*ConnectionStatus +// AllActive returns true if there is one or more connections and they are all +// active. func (s ConnectionStatuses) AllActive() bool { if len(s) == 0 { return false diff --git a/internal/component/input/async_reader.go b/internal/component/input/async_reader.go index 712d1b648..4f4d0305f 100644 --- a/internal/component/input/async_reader.go +++ b/internal/component/input/async_reader.go @@ -242,8 +242,9 @@ func (r *AsyncReader) TransactionChan() <-chan message.Transaction { return r.transactions } -// Connected returns a boolean indicating whether this input is currently -// connected to its target. +// ConnectionStatus returns the current status of the given component +// connection. The result is a slice in order to accommodate higher order +// components that wrap several others. func (r *AsyncReader) ConnectionStatus() component.ConnectionStatuses { return []*component.ConnectionStatus{ r.connection.Load(), diff --git a/internal/component/input/batcher/batcher.go b/internal/component/input/batcher/batcher.go index 3c4ae57da..279074e19 100644 --- a/internal/component/input/batcher/batcher.go +++ b/internal/component/input/batcher/batcher.go @@ -152,7 +152,9 @@ func (m *Impl) loop() { } } -// Connected returns true if the underlying input is connected. +// ConnectionStatus returns the current status of the given component +// connection. The result is a slice in order to accommodate higher order +// components that wrap several others. func (m *Impl) ConnectionStatus() component.ConnectionStatuses { return m.child.ConnectionStatus() } diff --git a/internal/component/output/async_writer.go b/internal/component/output/async_writer.go index 1156c9131..3b1fab8dc 100644 --- a/internal/component/output/async_writer.go +++ b/internal/component/output/async_writer.go @@ -41,12 +41,13 @@ type AsyncSink interface { // AsyncWriter is an output type that writes messages to a writer.Type. type AsyncWriter struct { - isConnected int32 + connection atomic.Pointer[component.ConnectionStatus] typeStr string maxInflight int writer AsyncSink + mgr component.Observability log log.Modular stats metrics.Type tracer trace.TracerProvider @@ -62,12 +63,14 @@ func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, mgr component. typeStr: typeStr, maxInflight: maxInflight, writer: w, + mgr: mgr, log: mgr.Logger(), stats: mgr.Metrics(), tracer: mgr.Tracer(), transactions: nil, shutSig: shutdown.NewSignaller(), } + aWriter.connection.Store(component.ConnectionPending(mgr)) return aWriter, nil } @@ -100,7 +103,7 @@ func (w *AsyncWriter) loop() { defer func() { _ = w.writer.Close(context.Background()) - atomic.StoreInt32(&w.isConnected, 0) + w.connection.Store(component.ConnectionClosed(w.mgr)) w.shutSig.TriggerHasStopped() }() @@ -118,6 +121,7 @@ func (w *AsyncWriter) loop() { if w.shutSig.IsSoftStopSignalled() || errors.Is(err, component.ErrTypeClosed) { return false } + w.connection.Store(component.ConnectionFailing(w.mgr, err)) w.log.Error("Failed to connect to %v: %v\n", w.typeStr, err) mFailedConn.Incr(1) @@ -145,21 +149,21 @@ func (w *AsyncWriter) loop() { w.log.Info("Output type %v is now active", w.typeStr) mConn.Incr(1) - atomic.StoreInt32(&w.isConnected, 1) + w.connection.Store(component.ConnectionActive(w.mgr)) wg := sync.WaitGroup{} wg.Add(w.maxInflight) connectMut := sync.Mutex{} connectLoop := func(msg message.Batch) (latency int64, err error) { - atomic.StoreInt32(&w.isConnected, 0) + w.connection.Store(component.ConnectionFailing(w.mgr, component.ErrNotConnected)) connectMut.Lock() defer connectMut.Unlock() // If another goroutine got here first and we're able to send over the // connection, then we gracefully accept defeat. - if atomic.LoadInt32(&w.isConnected) == 1 { + if w.connection.Load().Connected { if latency, err = w.latencyMeasuringWrite(closeLeisureCtx, msg); err != component.ErrNotConnected { return } else if err != nil { @@ -175,7 +179,7 @@ func (w *AsyncWriter) loop() { return } if latency, err = w.latencyMeasuringWrite(closeLeisureCtx, msg); err != component.ErrNotConnected { - atomic.StoreInt32(&w.isConnected, 1) + w.connection.Store(component.ConnectionActive(w.mgr)) mConn.Incr(1) return } else if err != nil { @@ -255,10 +259,11 @@ func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error { return nil } -// Connected returns a boolean indicating whether this output is currently -// connected to its target. -func (w *AsyncWriter) Connected() bool { - return atomic.LoadInt32(&w.isConnected) == 1 +// ConnectionStatus returns the status of the given output connection. +func (w *AsyncWriter) ConnectionStatus() component.ConnectionStatuses { + return component.ConnectionStatuses{ + w.connection.Load(), + } } // TriggerCloseNow shuts down the output and stops processing messages. diff --git a/internal/component/output/batcher/batcher.go b/internal/component/output/batcher/batcher.go index 4cff7cd02..5c307af85 100644 --- a/internal/component/output/batcher/batcher.go +++ b/internal/component/output/batcher/batcher.go @@ -159,10 +159,11 @@ func (m *Impl) loop() { } } -// Connected returns a boolean indicating whether this output is currently -// connected to its target. -func (m *Impl) Connected() bool { - return m.child.Connected() +// ConnectionStatus returns the current status of the given component +// connection. The result is a slice in order to accommodate higher order +// components that wrap several others. +func (m *Impl) ConnectionStatus() component.ConnectionStatuses { + return m.child.ConnectionStatus() } // Consume assigns a messages channel for the output to read. diff --git a/internal/component/output/interface.go b/internal/component/output/interface.go index a18028664..93f0c9e2d 100644 --- a/internal/component/output/interface.go +++ b/internal/component/output/interface.go @@ -3,6 +3,7 @@ package output import ( "context" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/message" ) @@ -12,9 +13,10 @@ type Sync interface { // WriteTransaction attempts to write a transaction to an output. WriteTransaction(context.Context, message.Transaction) error - // Connected returns a boolean indicating whether this output is currently - // connected to its target. - Connected() bool + // ConnectionStatus returns the current status of the given component + // connection. The result is a slice in order to accommodate higher order + // components that wrap several others. + ConnectionStatus() component.ConnectionStatuses // TriggerStopConsuming instructs the output to start shutting down // resources once all pending messages are delivered and acknowledged. @@ -35,9 +37,10 @@ type Streamed interface { // Consume starts the type receiving transactions from a Transactor. Consume(<-chan message.Transaction) error - // Connected returns a boolean indicating whether this output is currently - // connected to its target. - Connected() bool + // ConnectionStatus returns the current status of the given component + // connection. The result is a slice in order to accommodate higher order + // components that wrap several others. + ConnectionStatus() component.ConnectionStatuses // TriggerCloseNow triggers the shut down of this component but should not // block the calling goroutine. diff --git a/internal/component/output/not_batched.go b/internal/component/output/not_batched.go index 1c9192d38..32abe839c 100644 --- a/internal/component/output/not_batched.go +++ b/internal/component/output/not_batched.go @@ -148,8 +148,8 @@ func (n *notBatchedOutput) Consume(ts <-chan message.Transaction) error { return nil } -func (n *notBatchedOutput) Connected() bool { - return n.out.Connected() +func (n *notBatchedOutput) ConnectionStatus() component.ConnectionStatuses { + return n.out.ConnectionStatus() } func (n *notBatchedOutput) TriggerCloseNow() { diff --git a/internal/component/output/wrap_with_pipeline.go b/internal/component/output/wrap_with_pipeline.go index 2ce6ab34a..2fafeafb7 100644 --- a/internal/component/output/wrap_with_pipeline.go +++ b/internal/component/output/wrap_with_pipeline.go @@ -3,6 +3,7 @@ package output import ( "context" + "github.com/warpstreamlabs/bento/internal/component" iprocessor "github.com/warpstreamlabs/bento/internal/component/processor" "github.com/warpstreamlabs/bento/internal/message" ) @@ -51,10 +52,11 @@ func (i *WithPipeline) Consume(tsChan <-chan message.Transaction) error { return i.pipe.Consume(tsChan) } -// Connected returns a boolean indicating whether this output is currently -// connected to its target. -func (i *WithPipeline) Connected() bool { - return i.out.Connected() +// ConnectionStatus returns the current status of the given component +// connection. The result is a slice in order to accommodate higher order +// components that wrap several others. +func (i *WithPipeline) ConnectionStatus() component.ConnectionStatuses { + return i.out.ConnectionStatus() } //------------------------------------------------------------------------------ diff --git a/internal/component/output/wrap_with_pipeline_test.go b/internal/component/output/wrap_with_pipeline_test.go index d052e64f9..4e5d5fdbb 100644 --- a/internal/component/output/wrap_with_pipeline_test.go +++ b/internal/component/output/wrap_with_pipeline_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/component/output" "github.com/warpstreamlabs/bento/internal/component/processor" "github.com/warpstreamlabs/bento/internal/component/testutil" @@ -29,8 +30,10 @@ func (m *mockOutput) Consume(ts <-chan message.Transaction) error { return nil } -func (m *mockOutput) Connected() bool { - return true +func (m *mockOutput) ConnectionStatus() component.ConnectionStatuses { + return component.ConnectionStatuses{ + component.ConnectionActive(component.NoopObservability()), + } } func (m *mockOutput) TriggerCloseNow() { diff --git a/internal/config/test/case.go b/internal/config/test/case.go index 318cdf0b1..b569b65e1 100644 --- a/internal/config/test/case.go +++ b/internal/config/test/case.go @@ -15,6 +15,7 @@ const ( fieldCaseOutputBatches = "output_batches" ) +// Case contains a definition of a single Bento config test case. type Case struct { Name string Environment map[string]string diff --git a/internal/impl/io/input_dynamic_fan_in.go b/internal/impl/io/input_dynamic_fan_in.go index 5a0e06e5a..79765834b 100644 --- a/internal/impl/io/input_dynamic_fan_in.go +++ b/internal/impl/io/input_dynamic_fan_in.go @@ -98,7 +98,8 @@ func (d *dynamicFanInInput) TransactionChan() <-chan message.Transaction { } func (d *dynamicFanInInput) ConnectionStatus() component.ConnectionStatuses { - // Always return nil as this is fuzzy right now. + // TODO: We need to refactor the mechanisms for serving new inputs in order + // to allow access from here. return nil } diff --git a/internal/impl/io/output_dynamic_fan_out.go b/internal/impl/io/output_dynamic_fan_out.go index abb91723e..aec3f8f44 100644 --- a/internal/impl/io/output_dynamic_fan_out.go +++ b/internal/impl/io/output_dynamic_fan_out.go @@ -282,15 +282,13 @@ func (d *dynamicFanOutOutputBroker) loop() { } } -func (d *dynamicFanOutOutputBroker) Connected() bool { +func (d *dynamicFanOutOutputBroker) ConnectionStatus() (s component.ConnectionStatuses) { d.outputsMut.RLock() defer d.outputsMut.RUnlock() for _, out := range d.outputs { - if !out.output.Connected() { - return false - } + s = append(s, out.output.ConnectionStatus()...) } - return true + return } func (d *dynamicFanOutOutputBroker) TriggerCloseNow() { diff --git a/internal/impl/io/output_http_server.go b/internal/impl/io/output_http_server.go index 52ea44ccf..8639bea51 100644 --- a/internal/impl/io/output_http_server.go +++ b/internal/impl/io/output_http_server.go @@ -180,6 +180,7 @@ func init() { type httpServerOutput struct { conf hsoConfig log log.Modular + mgr bundle.NewManagement mux *mux.Router server *http.Server @@ -223,6 +224,7 @@ func newHTTPServerOutput(conf hsoConfig, mgr bundle.NewManagement) (output.Strea shutSig: shutdown.NewSignaller(), conf: conf, log: mgr.Logger(), + mgr: mgr, mux: gMux, server: server, @@ -473,9 +475,10 @@ func (h *httpServerOutput) Consume(ts <-chan message.Transaction) error { return nil } -func (h *httpServerOutput) Connected() bool { - // Always return true as this is fuzzy right now. - return true +func (h *httpServerOutput) ConnectionStatus() component.ConnectionStatuses { + return component.ConnectionStatuses{ + component.ConnectionActive(h.mgr), + } } func (h *httpServerOutput) TriggerCloseNow() { diff --git a/internal/impl/pure/output_broker_fan_out.go b/internal/impl/pure/output_broker_fan_out.go index 62a27c4b1..7281b6f71 100644 --- a/internal/impl/pure/output_broker_fan_out.go +++ b/internal/impl/pure/output_broker_fan_out.go @@ -48,13 +48,11 @@ func (o *fanOutOutputBroker) Consume(transactions <-chan message.Transaction) er return nil } -func (o *fanOutOutputBroker) Connected() bool { +func (o *fanOutOutputBroker) ConnectionStatus() (s component.ConnectionStatuses) { for _, out := range o.outputs { - if !out.Connected() { - return false - } + s = append(s, out.ConnectionStatus()...) } - return true + return } func (o *fanOutOutputBroker) loop() { diff --git a/internal/impl/pure/output_broker_fan_out_sequential.go b/internal/impl/pure/output_broker_fan_out_sequential.go index 602081156..793788f42 100644 --- a/internal/impl/pure/output_broker_fan_out_sequential.go +++ b/internal/impl/pure/output_broker_fan_out_sequential.go @@ -49,13 +49,11 @@ func (o *fanOutSequentialOutputBroker) Consume(transactions <-chan message.Trans return nil } -func (o *fanOutSequentialOutputBroker) Connected() bool { +func (o *fanOutSequentialOutputBroker) ConnectionStatus() (s component.ConnectionStatuses) { for _, out := range o.outputs { - if !out.Connected() { - return false - } + s = append(s, out.ConnectionStatus()...) } - return true + return } func (o *fanOutSequentialOutputBroker) loop() { diff --git a/internal/impl/pure/output_broker_fan_out_sequential_test.go b/internal/impl/pure/output_broker_fan_out_sequential_test.go index 4a0e56fab..44883c34d 100644 --- a/internal/impl/pure/output_broker_fan_out_sequential_test.go +++ b/internal/impl/pure/output_broker_fan_out_sequential_test.go @@ -33,7 +33,7 @@ func TestBasicFanOutSequential(t *testing.T) { require.NoError(t, err) require.NoError(t, oTM.Consume(readChan)) - assert.True(t, oTM.Connected()) + assert.True(t, oTM.ConnectionStatus().AllActive()) tCtx, done := context.WithTimeout(context.Background(), time.Second*5) defer done() diff --git a/internal/impl/pure/output_broker_fan_out_test.go b/internal/impl/pure/output_broker_fan_out_test.go index 8a8c762ad..9bbc8d5ec 100644 --- a/internal/impl/pure/output_broker_fan_out_test.go +++ b/internal/impl/pure/output_broker_fan_out_test.go @@ -37,7 +37,7 @@ func TestBasicFanOut(t *testing.T) { require.NoError(t, err) require.NoError(t, oTM.Consume(readChan)) - assert.True(t, oTM.Connected()) + assert.True(t, oTM.ConnectionStatus().AllActive()) tCtx, done := context.WithTimeout(context.Background(), time.Second*10) defer done() @@ -95,7 +95,7 @@ func TestBasicFanOutMutations(t *testing.T) { require.NoError(t, err) require.NoError(t, oTM.Consume(readChan)) - assert.True(t, oTM.Connected()) + assert.True(t, oTM.ConnectionStatus().AllActive()) tCtx, done := context.WithTimeout(context.Background(), time.Second*10) defer done() diff --git a/internal/impl/pure/output_broker_greedy.go b/internal/impl/pure/output_broker_greedy.go index 1234571ea..09f9edfa5 100644 --- a/internal/impl/pure/output_broker_greedy.go +++ b/internal/impl/pure/output_broker_greedy.go @@ -3,6 +3,7 @@ package pure import ( "context" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/component/output" "github.com/warpstreamlabs/bento/internal/message" ) @@ -26,13 +27,11 @@ func (g *greedyOutputBroker) Consume(ts <-chan message.Transaction) error { return nil } -func (g *greedyOutputBroker) Connected() bool { +func (g *greedyOutputBroker) ConnectionStatus() (s component.ConnectionStatuses) { for _, out := range g.outputs { - if !out.Connected() { - return false - } + s = append(s, out.ConnectionStatus()...) } - return true + return } func (g *greedyOutputBroker) TriggerCloseNow() { diff --git a/internal/impl/pure/output_broker_round_robin.go b/internal/impl/pure/output_broker_round_robin.go index 5f4e5b3aa..858a07fa9 100644 --- a/internal/impl/pure/output_broker_round_robin.go +++ b/internal/impl/pure/output_broker_round_robin.go @@ -45,13 +45,11 @@ func (o *roundRobinOutputBroker) Consume(ts <-chan message.Transaction) error { return nil } -func (o *roundRobinOutputBroker) Connected() bool { +func (o *roundRobinOutputBroker) ConnectionStatus() (s component.ConnectionStatuses) { for _, out := range o.outputs { - if !out.Connected() { - return false - } + s = append(s, out.ConnectionStatus()...) } - return true + return } func (o *roundRobinOutputBroker) loop() { diff --git a/internal/impl/pure/output_drop_on.go b/internal/impl/pure/output_drop_on.go index 335bdc244..d87095c1f 100644 --- a/internal/impl/pure/output_drop_on.go +++ b/internal/impl/pure/output_drop_on.go @@ -285,8 +285,8 @@ func (d *dropOnWriter) Consume(ts <-chan message.Transaction) error { return nil } -func (d *dropOnWriter) Connected() bool { - return d.wrapped.Connected() +func (d *dropOnWriter) ConnectionStatus() component.ConnectionStatuses { + return d.wrapped.ConnectionStatus() } func (d *dropOnWriter) TriggerCloseNow() { diff --git a/internal/impl/pure/output_fallback.go b/internal/impl/pure/output_fallback.go index 0b26738ea..932745c62 100644 --- a/internal/impl/pure/output_fallback.go +++ b/internal/impl/pure/output_fallback.go @@ -146,15 +146,11 @@ func (t *fallbackBroker) Consume(ts <-chan message.Transaction) error { return nil } -// Connected returns a boolean indicating whether this output is currently -// connected to its target. -func (t *fallbackBroker) Connected() bool { +func (t *fallbackBroker) ConnectionStatus() (s component.ConnectionStatuses) { for _, out := range t.outputs { - if !out.Connected() { - return false - } + s = append(s, out.ConnectionStatus()...) } - return true + return } //------------------------------------------------------------------------------ diff --git a/internal/impl/pure/output_inproc.go b/internal/impl/pure/output_inproc.go index 627b3b62c..0bbc3bb05 100644 --- a/internal/impl/pure/output_inproc.go +++ b/internal/impl/pure/output_inproc.go @@ -105,8 +105,10 @@ func (i *inprocOutput) Consume(ts <-chan message.Transaction) error { return nil } -func (i *inprocOutput) Connected() bool { - return true +func (i *inprocOutput) ConnectionStatus() component.ConnectionStatuses { + return component.ConnectionStatuses{ + component.ConnectionActive(i.mgr), + } } func (i *inprocOutput) TriggerCloseNow() { diff --git a/internal/impl/pure/output_reject_errored.go b/internal/impl/pure/output_reject_errored.go index 2a8d0bcf9..b3e468174 100644 --- a/internal/impl/pure/output_reject_errored.go +++ b/internal/impl/pure/output_reject_errored.go @@ -136,10 +136,8 @@ func (t *rejectErroredBroker) Consume(ts <-chan message.Transaction) error { return nil } -// Connected returns a boolean indicating whether this output is currently -// connected to its target. -func (t *rejectErroredBroker) Connected() bool { - return t.output.Connected() +func (t *rejectErroredBroker) ConnectionStatus() component.ConnectionStatuses { + return t.output.ConnectionStatus() } //------------------------------------------------------------------------------ diff --git a/internal/impl/pure/output_resource.go b/internal/impl/pure/output_resource.go index f81c9dc4d..8660c50df 100644 --- a/internal/impl/pure/output_resource.go +++ b/internal/impl/pure/output_resource.go @@ -145,12 +145,13 @@ func (r *resourceOutput) Consume(ts <-chan message.Transaction) error { return nil } -func (r *resourceOutput) Connected() (isConnected bool) { - var err error - if err = r.mgr.AccessOutput(context.Background(), r.name, func(o output.Sync) { - isConnected = o.Connected() +func (r *resourceOutput) ConnectionStatus() (s component.ConnectionStatuses) { + if err := r.mgr.AccessOutput(context.Background(), r.name, func(o output.Sync) { + s = o.ConnectionStatus() }); err != nil { - r.log.Error("Failed to obtain output resource '%v': %v", r.name, err) + return component.ConnectionStatuses{ + component.ConnectionFailing(r.mgr, err), + } } return } diff --git a/internal/impl/pure/output_resource_test.go b/internal/impl/pure/output_resource_test.go index bcc4ef89a..6cdfa9cfd 100644 --- a/internal/impl/pure/output_resource_test.go +++ b/internal/impl/pure/output_resource_test.go @@ -37,7 +37,7 @@ func TestResourceOutput(t *testing.T) { p, err := mgr.NewOutput(nConf) require.NoError(t, err) - assert.True(t, p.Connected()) + assert.True(t, p.ConnectionStatus().AllActive()) tChan := make(chan message.Transaction) assert.NoError(t, p.Consume(tChan)) diff --git a/internal/impl/pure/output_retry.go b/internal/impl/pure/output_retry.go index 29b6b768b..8be314354 100644 --- a/internal/impl/pure/output_retry.go +++ b/internal/impl/pure/output_retry.go @@ -248,10 +248,8 @@ func (r *indefiniteRetry) Consume(ts <-chan message.Transaction) error { return nil } -// Connected returns a boolean indicating whether this output is currently -// connected to its target. -func (r *indefiniteRetry) Connected() bool { - return r.wrapped.Connected() +func (r *indefiniteRetry) ConnectionStatus() component.ConnectionStatuses { + return r.wrapped.ConnectionStatus() } // CloseAsync shuts down the Retry input and stops processing requests. diff --git a/internal/impl/pure/output_switch.go b/internal/impl/pure/output_switch.go index d8b931ca0..2854098c5 100644 --- a/internal/impl/pure/output_switch.go +++ b/internal/impl/pure/output_switch.go @@ -271,13 +271,11 @@ func (o *switchOutput) Consume(transactions <-chan message.Transaction) error { return nil } -func (o *switchOutput) Connected() bool { +func (o *switchOutput) ConnectionStatus() (s component.ConnectionStatuses) { for _, out := range o.outputs { - if !out.Connected() { - return false - } + s = append(s, out.ConnectionStatus()...) } - return true + return } func (o *switchOutput) dispatchToTargets( diff --git a/internal/manager/input_wrapper.go b/internal/manager/input_wrapper.go index dd1da6b86..679e528d4 100644 --- a/internal/manager/input_wrapper.go +++ b/internal/manager/input_wrapper.go @@ -70,12 +70,15 @@ func (w *InputWrapper) SwapInput(i input.Streamed) { w.inputLock.Unlock() } +// TransactionChan returns a transactions channel for consuming messages from +// the wrapped input. func (w *InputWrapper) TransactionChan() <-chan message.Transaction { return w.tranChan } -// Connected returns a boolean indicating whether the wrapped input is currently -// connected to its target. +// ConnectionStatus returns the current status of the given component +// connection. The result is a slice in order to accommodate higher order +// components that wrap several others. func (w *InputWrapper) ConnectionStatus() (s component.ConnectionStatuses) { w.inputLock.Lock() if w.ctrl.input != nil { diff --git a/internal/manager/mock/manager.go b/internal/manager/mock/manager.go index 4227eb4e1..c8fe40802 100644 --- a/internal/manager/mock/manager.go +++ b/internal/manager/mock/manager.go @@ -67,6 +67,8 @@ func NewManager() *Manager { } } +// EngineVersion returns the version stamp associated with the underlying +// bento engine. func (m *Manager) EngineVersion() string { return m.Version } diff --git a/internal/manager/mock/output.go b/internal/manager/mock/output.go index d58c6bc1c..a450ca5bc 100644 --- a/internal/manager/mock/output.go +++ b/internal/manager/mock/output.go @@ -3,6 +3,7 @@ package mock import ( "context" + "github.com/warpstreamlabs/bento/internal/component" "github.com/warpstreamlabs/bento/internal/message" ) @@ -14,9 +15,13 @@ func (o OutputWriter) WriteTransaction(ctx context.Context, t message.Transactio return o(ctx, t) } -// Connected always returns true. -func (o OutputWriter) Connected() bool { - return true +// ConnectionStatus returns the current status of the given component +// connection. The result is a slice in order to accommodate higher order +// components that wrap several others. +func (o OutputWriter) ConnectionStatus() component.ConnectionStatuses { + return component.ConnectionStatuses{ + component.ConnectionActive(component.NoopObservability()), + } } // TriggerStopConsuming does nothing. @@ -38,9 +43,13 @@ type OutputChanneled struct { TChan <-chan message.Transaction } -// Connected returns true. -func (m *OutputChanneled) Connected() bool { - return true +// ConnectionStatus returns the current status of the given component +// connection. The result is a slice in order to accommodate higher order +// components that wrap several others. +func (m *OutputChanneled) ConnectionStatus() component.ConnectionStatuses { + return component.ConnectionStatuses{ + component.ConnectionActive(component.NoopObservability()), + } } // Consume sets the read channel. This implementation is NOT thread safe. diff --git a/internal/manager/output_wrapper.go b/internal/manager/output_wrapper.go index f18b4ff4a..d9879a3c1 100644 --- a/internal/manager/output_wrapper.go +++ b/internal/manager/output_wrapper.go @@ -45,10 +45,8 @@ func (w *outputWrapper) WriteTransaction(ctx context.Context, t message.Transact return nil } -// Connected returns a boolean indicating whether this output is currently -// connected to its target. -func (w *outputWrapper) Connected() bool { - return w.output.Connected() +func (w *outputWrapper) ConnectionStatus() component.ConnectionStatuses { + return w.output.ConnectionStatus() } func (w *outputWrapper) TriggerStopConsuming() { diff --git a/internal/stream/type.go b/internal/stream/type.go index 0096c57d0..9a621ff4f 100644 --- a/internal/stream/type.go +++ b/internal/stream/type.go @@ -51,7 +51,9 @@ func New(conf Config, mgr bundle.NewManagement, opts ...func(*Type)) (*Type, err healthCheck := func(w http.ResponseWriter, r *http.Request) { inputStatuses := t.inputLayer.ConnectionStatus() inputConnected := inputStatuses.AllActive() - outputConnected := t.outputLayer.Connected() + + outputStatuses := t.outputLayer.ConnectionStatus() + outputConnected := outputStatuses.AllActive() if atomic.LoadUint32(&t.closed) == 1 { http.Error(w, "Stream terminated", http.StatusNotFound) @@ -93,7 +95,7 @@ func OptOnClose(onClose func()) func(*Type) { // IsReady returns a boolean indicating whether both the input and output layers // of the stream are connected. func (t *Type) IsReady() bool { - return t.inputLayer.ConnectionStatus().AllActive() && t.outputLayer.Connected() + return t.inputLayer.ConnectionStatus().AllActive() && t.outputLayer.ConnectionStatus().AllActive() } func (t *Type) start() (err error) { From 3d270c5a48ebe30569ec80714125e292c58e3cbd Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Thu, 18 Jul 2024 22:23:19 +0100 Subject: [PATCH 12/12] update changelogs Signed-off-by: Jem Davies --- CHANGELOG.md | 6 ++++++ CHANGELOG.old.md | 11 +++++++++++ 2 files changed, 17 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 27fe30d4d..2c2bc828a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,12 @@ Changelog All notable changes to this project will be documented in this file. +## 1.2.0 - TBD + +### Upstream Changes + +- [v4.31.0 - 2024-07-18](./CHANGELOG.old.md#4.31.0-2024-07-18) + ## 1.1.0 - 2024-07-12 ### Added diff --git a/CHANGELOG.old.md b/CHANGELOG.old.md index a3cffe6c0..985f1749e 100644 --- a/CHANGELOG.old.md +++ b/CHANGELOG.old.md @@ -3,6 +3,17 @@ Changelog All notable changes to this project will be documented in this file. +## 4.31.0 - 2024-07-18 + +### Added + +- Field max_retries added to the retry processor. +- Algorithm fnv32 added to the hash bloblang method +- Parameter escape_html added to the format_json() Bloblang method. +- New array bloblang method +- Go API: New generic key/value store methods added to the *Resources type. +- Use new rickb777/period library for parse_duration_iso8601 + ## 4.30.0 - 2024-06-13 ### Added