From a7899a5f079f2357df84fcc1e134d858f8569cc2 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Wed, 7 Aug 2024 14:57:59 +0100 Subject: [PATCH 01/12] fix func resolveAvroReferences() Signed-off-by: Jem Davies --- internal/impl/confluent/serde_avro.go | 54 +++++++++++++++++++++------ 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/internal/impl/confluent/serde_avro.go b/internal/impl/confluent/serde_avro.go index 337bb717c..6c4121af0 100644 --- a/internal/impl/confluent/serde_avro.go +++ b/internal/impl/confluent/serde_avro.go @@ -4,12 +4,25 @@ import ( "context" "encoding/json" "fmt" + "strings" "github.com/linkedin/goavro/v2" "github.com/warpstreamlabs/bento/public/service" ) +type field struct { + Name string `json:"name"` + Type string `json:"type"` +} + +type reference struct { + Type string `json:"type"` + Name string `json:"name"` + Namespace string `json:"namespace"` + Fields []field `json:"fields"` +} + func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, info SchemaInfo) (string, error) { if len(info.References) == 0 { return info.Schema, nil @@ -23,26 +36,43 @@ func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, in return "", nil } - schemaDry := []string{} + var schemaDry any if err := json.Unmarshal([]byte(info.Schema), &schemaDry); err != nil { return "", fmt.Errorf("failed to parse root schema as enum: %w", err) } - schemaHydrated := make([]json.RawMessage, len(schemaDry)) - for i, name := range schemaDry { - def, exists := refsMap[name] - if !exists { - return "", fmt.Errorf("referenced type '%v' was not found in references", name) - } - schemaHydrated[i] = []byte(def) - } + schemaRaw, _ := json.Marshal(schemaDry) + schemaString := string(schemaRaw) - schemaHydratedBytes, err := json.Marshal(schemaHydrated) + var schema reference + err := json.Unmarshal([]byte(schemaRaw), &schema) if err != nil { - return "", fmt.Errorf("failed to marshal hydrated schema: %w", err) + panic(err) + } + + var ref reference + + for { + initialSchemaDry := schemaString + + for k, v := range refsMap { + err := json.Unmarshal([]byte(refsMap[k]), &ref) + if err != nil { + panic(err) + } + + if schema.Namespace == ref.Namespace { + schemaString = strings.ReplaceAll(schemaString, `"type":"`+ref.Name+`"`, `"type":`+v) + } else { + schemaString = strings.ReplaceAll(schemaString, `"type":"`+ref.Namespace+`.`+ref.Name+`"`, `"type":`+v) + } + } + if schemaString == initialSchemaDry { + break + } } - return string(schemaHydratedBytes), nil + return schemaString, nil } func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaInfo) (schemaEncoder, error) { From 13e56aa4968539191f5028a7dd54a151e4de4d73 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Thu, 8 Aug 2024 18:49:10 +0100 Subject: [PATCH 02/12] fix linter Signed-off-by: Jem Davies --- internal/impl/confluent/serde_avro.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/confluent/serde_avro.go b/internal/impl/confluent/serde_avro.go index 6c4121af0..177618f3e 100644 --- a/internal/impl/confluent/serde_avro.go +++ b/internal/impl/confluent/serde_avro.go @@ -45,7 +45,7 @@ func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, in schemaString := string(schemaRaw) var schema reference - err := json.Unmarshal([]byte(schemaRaw), &schema) + err := json.Unmarshal(schemaRaw, &schema) if err != nil { panic(err) } From ae0a3cdac78474c3828b2d536176e295734a5323 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Thu, 8 Aug 2024 19:30:33 +0100 Subject: [PATCH 03/12] remove panic() Signed-off-by: Jem Davies --- internal/impl/confluent/serde_avro.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/impl/confluent/serde_avro.go b/internal/impl/confluent/serde_avro.go index 177618f3e..c9924c92d 100644 --- a/internal/impl/confluent/serde_avro.go +++ b/internal/impl/confluent/serde_avro.go @@ -47,7 +47,7 @@ func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, in var schema reference err := json.Unmarshal(schemaRaw, &schema) if err != nil { - panic(err) + return "", fmt.Errorf("failed to unmarshal root schema: %w", err) } var ref reference @@ -58,7 +58,7 @@ func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, in for k, v := range refsMap { err := json.Unmarshal([]byte(refsMap[k]), &ref) if err != nil { - panic(err) + return "", fmt.Errorf("failed to unmarshal refsMap value %s: %w", k, err) } if schema.Namespace == ref.Namespace { From 2fcad5f85bcaa7e78c12823ec707ac5f0f6bd9fc Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Sun, 11 Aug 2024 16:20:05 +0100 Subject: [PATCH 04/12] add avro nested schema functionality Signed-off-by: Jem Davies --- .../processor_schema_registry_decode.go | 26 ++++--- .../processor_schema_registry_decode_test.go | 10 +-- .../processor_schema_registry_encode.go | 14 +++- .../processor_schema_registry_encode_test.go | 16 ++--- internal/impl/confluent/serde_avro.go | 67 +++++++++++++++++-- internal/impl/confluent/serde_avro_test.go | 4 +- internal/impl/confluent/serde_json_test.go | 4 +- .../impl/confluent/serde_protobuf_test.go | 10 +-- 8 files changed, 113 insertions(+), 38 deletions(-) diff --git a/internal/impl/confluent/processor_schema_registry_decode.go b/internal/impl/confluent/processor_schema_registry_decode.go index 71ddb230b..9705a9c8e 100644 --- a/internal/impl/confluent/processor_schema_registry_decode.go +++ b/internal/impl/confluent/processor_schema_registry_decode.go @@ -48,6 +48,9 @@ This processor decodes protobuf messages to JSON documents, you can read more ab Field(service.NewBoolField("avro_raw_json"). Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be decoded as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between the standard json and avro json."). Advanced().Default(false)). + Field(service.NewBoolField("avro_nested_schemas"). + Description("Whether Avro Schemas are nested. If true bento will resolve schema references."). + Advanced().Default(false).Version("?.?.?")). // TODO: replace with the version numb Field(service.NewURLField("url").Description("The base URL of the schema registry service.")) for _, f := range service.NewHTTPRequestAuthSignerFields() { @@ -71,8 +74,9 @@ func init() { //------------------------------------------------------------------------------ type schemaRegistryDecoder struct { - avroRawJSON bool - client *schemaRegistryClient + avroRawJSON bool + avroNestedSchemas bool + client *schemaRegistryClient schemas map[int]*cachedSchemaDecoder cacheMut sync.RWMutex @@ -100,7 +104,11 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service if err != nil { return nil, err } - return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, mgr) + avroNestedSchemas, err := conf.FieldBool("avro_nested_schemas") + if err != nil { + return nil, err + } + return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, avroNestedSchemas, mgr) } func newSchemaRegistryDecoder( @@ -108,14 +116,16 @@ func newSchemaRegistryDecoder( reqSigner func(f fs.FS, req *http.Request) error, tlsConf *tls.Config, avroRawJSON bool, + avroNestedSchemas bool, mgr *service.Resources, ) (*schemaRegistryDecoder, error) { s := &schemaRegistryDecoder{ - avroRawJSON: avroRawJSON, - schemas: map[int]*cachedSchemaDecoder{}, - shutSig: shutdown.NewSignaller(), - logger: mgr.Logger(), - mgr: mgr, + avroRawJSON: avroRawJSON, + avroNestedSchemas: avroNestedSchemas, + schemas: map[int]*cachedSchemaDecoder{}, + shutSig: shutdown.NewSignaller(), + logger: mgr.Logger(), + mgr: mgr, } var err error if s.client, err = newSchemaRegistryClient(urlStr, reqSigner, tlsConf, mgr); err != nil { diff --git a/internal/impl/confluent/processor_schema_registry_decode_test.go b/internal/impl/confluent/processor_schema_registry_decode_test.go index 1b5a518bb..0a97ee867 100644 --- a/internal/impl/confluent/processor_schema_registry_decode_test.go +++ b/internal/impl/confluent/processor_schema_registry_decode_test.go @@ -227,7 +227,7 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -330,7 +330,7 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -408,7 +408,7 @@ func TestSchemaRegistryDecodeClearExpired(t *testing.T) { return nil, fmt.Errorf("nope") }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) require.NoError(t, decoder.Close(context.Background())) @@ -455,7 +455,7 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -518,7 +518,7 @@ func TestSchemaRegistryDecodeJson(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) tests := []struct { diff --git a/internal/impl/confluent/processor_schema_registry_encode.go b/internal/impl/confluent/processor_schema_registry_encode.go index dbb5b3edf..7e5ea0b93 100644 --- a/internal/impl/confluent/processor_schema_registry_encode.go +++ b/internal/impl/confluent/processor_schema_registry_encode.go @@ -70,7 +70,10 @@ We will be considering alternative approaches in future so please [get in touch] Example("1h")). Field(service.NewBoolField("avro_raw_json"). Description("Whether messages encoded in Avro format should be parsed as normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be parsed as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between standard json and avro json."). - Advanced().Default(false).Version("1.0.0")) + Advanced().Default(false).Version("1.0.0")). + Field(service.NewBoolField("avro_nested_schemas"). + Description("Whether Avro Schemas are nested. If true bento will resolve schema references."). + Advanced().Default(false).Version("?.?.?")) // TODO: replace with the version number for _, f := range service.NewHTTPRequestAuthSignerFields() { spec = spec.Field(f.Version("1.0.0")) @@ -96,6 +99,7 @@ type schemaRegistryEncoder struct { client *schemaRegistryClient subject *service.InterpolatedString avroRawJSON bool + avroNestedSchemas bool schemaRefreshAfter time.Duration schemas map[string]cachedSchemaEncoder @@ -121,6 +125,10 @@ func newSchemaRegistryEncoderFromConfig(conf *service.ParsedConfig, mgr *service if err != nil { return nil, err } + avroNestedSchemas, err := conf.FieldBool("avro_nested_schemas") + if err != nil { + return nil, err + } refreshPeriodStr, err := conf.FieldString("refresh_period") if err != nil { return nil, err @@ -141,7 +149,7 @@ func newSchemaRegistryEncoderFromConfig(conf *service.ParsedConfig, mgr *service if err != nil { return nil, err } - return newSchemaRegistryEncoder(urlStr, authSigner, tlsConf, subject, avroRawJSON, refreshPeriod, refreshTicker, mgr) + return newSchemaRegistryEncoder(urlStr, authSigner, tlsConf, subject, avroRawJSON, avroNestedSchemas, refreshPeriod, refreshTicker, mgr) } func newSchemaRegistryEncoder( @@ -150,12 +158,14 @@ func newSchemaRegistryEncoder( tlsConf *tls.Config, subject *service.InterpolatedString, avroRawJSON bool, + avroNestedSchemas bool, schemaRefreshAfter, schemaRefreshTicker time.Duration, mgr *service.Resources, ) (*schemaRegistryEncoder, error) { s := &schemaRegistryEncoder{ subject: subject, avroRawJSON: avroRawJSON, + avroNestedSchemas: avroNestedSchemas, schemaRefreshAfter: schemaRefreshAfter, schemas: map[string]cachedSchemaEncoder{}, shutSig: shutdown.NewSignaller(), diff --git a/internal/impl/confluent/processor_schema_registry_encode_test.go b/internal/impl/confluent/processor_schema_registry_encode_test.go index 7d4acee38..704957b9f 100644 --- a/internal/impl/confluent/processor_schema_registry_encode_test.go +++ b/internal/impl/confluent/processor_schema_registry_encode_test.go @@ -129,7 +129,7 @@ func TestSchemaRegistryEncodeAvro(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -211,7 +211,7 @@ func TestSchemaRegistryEncodeAvroRawJSON(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -293,7 +293,7 @@ func TestSchemaRegistryEncodeAvroLogicalTypes(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -370,7 +370,7 @@ func TestSchemaRegistryEncodeAvroRawJSONLogicalTypes(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -435,7 +435,7 @@ func TestSchemaRegistryEncodeClearExpired(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) require.NoError(t, encoder.Close(context.Background())) @@ -496,7 +496,7 @@ func TestSchemaRegistryEncodeRefresh(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) require.NoError(t, encoder.Close(context.Background())) @@ -598,7 +598,7 @@ func TestSchemaRegistryEncodeJSON(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -691,7 +691,7 @@ func TestSchemaRegistryEncodeJSONConstantRefreshes(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Millisecond, time.Millisecond*10, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Millisecond, time.Millisecond*10, service.MockResources()) require.NoError(t, err) input := `{"Address":{"City":"foo","State":"bar"},"Name":"foo","MaybeHobby":"dancing"}` diff --git a/internal/impl/confluent/serde_avro.go b/internal/impl/confluent/serde_avro.go index c9924c92d..144d4f0e9 100644 --- a/internal/impl/confluent/serde_avro.go +++ b/internal/impl/confluent/serde_avro.go @@ -36,6 +36,41 @@ func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, in return "", nil } + schemaDry := []string{} + if err := json.Unmarshal([]byte(info.Schema), &schemaDry); err != nil { + return "", fmt.Errorf("failed to parse root schema as enum: %w", err) + } + + schemaHydrated := make([]json.RawMessage, len(schemaDry)) + for i, name := range schemaDry { + def, exists := refsMap[name] + if !exists { + return "", fmt.Errorf("referenced type '%v' was not found in references", name) + } + schemaHydrated[i] = []byte(def) + } + + schemaHydratedBytes, err := json.Marshal(schemaHydrated) + if err != nil { + return "", fmt.Errorf("failed to marshal hydrated schema: %w", err) + } + + return string(schemaHydratedBytes), nil +} + +func resolveAvroReferencesNested(ctx context.Context, client *schemaRegistryClient, info SchemaInfo) (string, error) { + if len(info.References) == 0 { + return info.Schema, nil + } + + refsMap := map[string]string{} + if err := client.WalkReferences(ctx, info.References, func(ctx context.Context, name string, info SchemaInfo) error { + refsMap[name] = info.Schema + return nil + }); err != nil { + return "", nil + } + var schemaDry any if err := json.Unmarshal([]byte(info.Schema), &schemaDry); err != nil { return "", fmt.Errorf("failed to parse root schema as enum: %w", err) @@ -76,9 +111,19 @@ func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, in } func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaInfo) (schemaEncoder, error) { - schema, err := resolveAvroReferences(ctx, s.client, info) - if err != nil { - return nil, err + var schema string + var err error + + if s.avroNestedSchemas { + schema, err = resolveAvroReferencesNested(ctx, s.client, info) + if err != nil { + return nil, err + } + } else { + schema, err = resolveAvroReferences(ctx, s.client, info) + if err != nil { + return nil, err + } } var codec *goavro.Codec @@ -114,9 +159,19 @@ func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaI } func (s *schemaRegistryDecoder) getAvroDecoder(ctx context.Context, info SchemaInfo) (schemaDecoder, error) { - schema, err := resolveAvroReferences(ctx, s.client, info) - if err != nil { - return nil, err + var schema string + var err error + + if s.avroNestedSchemas { + schema, err = resolveAvroReferencesNested(ctx, s.client, info) + if err != nil { + return nil, err + } + } else { + schema, err = resolveAvroReferences(ctx, s.client, info) + if err != nil { + return nil, err + } } var codec *goavro.Codec diff --git a/internal/impl/confluent/serde_avro_test.go b/internal/impl/confluent/serde_avro_test.go index 706180114..ffddad949 100644 --- a/internal/impl/confluent/serde_avro_test.go +++ b/internal/impl/confluent/serde_avro_test.go @@ -90,10 +90,10 @@ func TestAvroReferences(t *testing.T) { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { diff --git a/internal/impl/confluent/serde_json_test.go b/internal/impl/confluent/serde_json_test.go index bfc93f420..53ddd399e 100644 --- a/internal/impl/confluent/serde_json_test.go +++ b/internal/impl/confluent/serde_json_test.go @@ -93,10 +93,10 @@ func TestResolveJsonSchema(t *testing.T) { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { diff --git a/internal/impl/confluent/serde_protobuf_test.go b/internal/impl/confluent/serde_protobuf_test.go index 6dcec5216..f5c72a9b0 100644 --- a/internal/impl/confluent/serde_protobuf_test.go +++ b/internal/impl/confluent/serde_protobuf_test.go @@ -79,10 +79,10 @@ message bar { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { @@ -209,10 +209,10 @@ message bar { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { @@ -268,7 +268,7 @@ func runEncoderAgainstInputsMultiple(t testing.TB, urlStr, subject string, input subj, err := service.NewInterpolatedString(subject) require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { _ = encoder.Close(tCtx) From 2f5de5ed960b5d4b9ce62bec574bc19e49f974d5 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Sun, 11 Aug 2024 16:23:08 +0100 Subject: [PATCH 05/12] make docs Signed-off-by: Jem Davies --- .../components/processors/schema_registry_decode.md | 10 ++++++++++ .../components/processors/schema_registry_encode.md | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/website/docs/components/processors/schema_registry_decode.md b/website/docs/components/processors/schema_registry_decode.md index 33f2b5afc..c0e8195fc 100644 --- a/website/docs/components/processors/schema_registry_decode.md +++ b/website/docs/components/processors/schema_registry_decode.md @@ -43,6 +43,7 @@ schema_registry_decode: label: "" schema_registry_decode: avro_raw_json: false + avro_nested_schemas: false url: "" # No default (required) oauth: enabled: false @@ -104,6 +105,15 @@ Whether Avro messages should be decoded into normal JSON ("json that meets the e Type: `bool` Default: `false` +### `avro_nested_schemas` + +Whether Avro Schemas are nested. If true bento will resolve schema references. + + +Type: `bool` +Default: `false` +Requires version ?.?.? or newer + ### `url` The base URL of the schema registry service. diff --git a/website/docs/components/processors/schema_registry_encode.md b/website/docs/components/processors/schema_registry_encode.md index 1f3d3ae7b..27227d4b3 100644 --- a/website/docs/components/processors/schema_registry_encode.md +++ b/website/docs/components/processors/schema_registry_encode.md @@ -50,6 +50,7 @@ schema_registry_encode: subject: foo # No default (required) refresh_period: 10m avro_raw_json: false + avro_nested_schemas: false oauth: enabled: false consumer_key: "" @@ -163,6 +164,15 @@ Type: `bool` Default: `false` Requires version 1.0.0 or newer +### `avro_nested_schemas` + +Whether Avro Schemas are nested. If true bento will resolve schema references. + + +Type: `bool` +Default: `false` +Requires version ?.?.? or newer + ### `oauth` Allows you to specify open authentication via OAuth version 1. From 10985fea9016c7a0fb22f511b1784831ce2745b0 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Tue, 13 Aug 2024 21:39:27 +0100 Subject: [PATCH 06/12] add test for nested schemas Signed-off-by: Jem Davies --- internal/impl/confluent/serde_avro_test.go | 96 ++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/internal/impl/confluent/serde_avro_test.go b/internal/impl/confluent/serde_avro_test.go index ffddad949..860193367 100644 --- a/internal/impl/confluent/serde_avro_test.go +++ b/internal/impl/confluent/serde_avro_test.go @@ -141,3 +141,99 @@ func TestAvroReferences(t *testing.T) { }) } } + +func TestAvroReferencesNested(t *testing.T) { + tCtx, done := context.WithTimeout(context.Background(), time.Second*10) + defer done() + + userSchema := `{"type":"record","name":"User","namespace":"com.example","fields":[{"name":"name","type":"string"},{"name":"email","type":"string"},{"name":"address","type":"Address"}]}` + addressSchema := `{"type":"record","name":"Address","namespace":"com.example","fields":[{"name":"street","type":"string"},{"name":"city","type":"string"},{"name":"state","type":"State"},{"name":"zip","type":"string"}]}` + stateSchema := `{"type":"record","name":"State","namespace":"com.example","fields":[{"name":"state","type":"string"}]}` + + urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { + switch path { + case "/subjects/com.example.User/versions/latest", "/schemas/ids/1": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": userSchema, + "schemaType": "AVRO", + "references": []any{ + map[string]any{"name": "com.example.Address", "subject": "com.example.Address", "version": 1}, + }, + }), nil + case "/subjects/com.example.Address/versions/1", "/schemas/ids/2": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": addressSchema, + "schemaType": "AVRO", + "references": []any{ + map[string]any{"name": "com.example.State", "subject": "com.example.State", "version": 1}, + }, + }), nil + case "/subjects/com.example.State/versions/1", "/schemas/ids/3": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": stateSchema, + "schemaType": "AVRO", + }), nil + } + return nil, nil + }) + + subj, err := service.NewInterpolatedString("com.example.User") + require.NoError(t, err) + + tests := []struct { + name string + input string + output string + errContains []string + }{ + { + name: "a foo", + input: `{"name":"JohnDoe","email":"john.doe@example.com","address":{"street":"123MainSt","city":"Anytown","state":{"state":"CA"},"zip":"12345"}}`, + output: `{}`, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, true, time.Minute*10, time.Minute, service.MockResources()) + require.NoError(t, err) + + t.Cleanup(func() { + _ = encoder.Close(tCtx) + }) + + inMsg := service.NewMessage([]byte(test.input)) + encodedMsgs, err := encoder.ProcessBatch(tCtx, service.MessageBatch{inMsg}) + require.NoError(t, err) + require.Len(t, encodedMsgs, 1) + require.Len(t, encodedMsgs[0], 1) + + encodedMsg := encodedMsgs[0][0] + + if len(test.errContains) > 0 { + require.Error(t, encodedMsg.GetError()) + for _, errStr := range test.errContains { + assert.Contains(t, encodedMsg.GetError().Error(), errStr) + } + return + } + + b, err := encodedMsg.AsBytes() + require.NoError(t, err) + + require.NoError(t, encodedMsg.GetError()) + require.NotEqual(t, test.input, string(b)) + + var n any + require.Error(t, json.Unmarshal(b, &n), "message contents should no longer be valid JSON") + + }) + } +} From d40118b5e3c79178ddc1727442395e6c6688de45 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Tue, 13 Aug 2024 21:45:31 +0100 Subject: [PATCH 07/12] alter version to 1.2.0 Signed-off-by: Jem Davies --- internal/impl/confluent/processor_schema_registry_decode.go | 2 +- internal/impl/confluent/processor_schema_registry_encode.go | 2 +- website/docs/components/processors/schema_registry_decode.md | 2 +- website/docs/components/processors/schema_registry_encode.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/impl/confluent/processor_schema_registry_decode.go b/internal/impl/confluent/processor_schema_registry_decode.go index 9705a9c8e..5530aacd3 100644 --- a/internal/impl/confluent/processor_schema_registry_decode.go +++ b/internal/impl/confluent/processor_schema_registry_decode.go @@ -50,7 +50,7 @@ This processor decodes protobuf messages to JSON documents, you can read more ab Advanced().Default(false)). Field(service.NewBoolField("avro_nested_schemas"). Description("Whether Avro Schemas are nested. If true bento will resolve schema references."). - Advanced().Default(false).Version("?.?.?")). // TODO: replace with the version numb + Advanced().Default(false).Version("1.2.0")). Field(service.NewURLField("url").Description("The base URL of the schema registry service.")) for _, f := range service.NewHTTPRequestAuthSignerFields() { diff --git a/internal/impl/confluent/processor_schema_registry_encode.go b/internal/impl/confluent/processor_schema_registry_encode.go index 7e5ea0b93..4da894121 100644 --- a/internal/impl/confluent/processor_schema_registry_encode.go +++ b/internal/impl/confluent/processor_schema_registry_encode.go @@ -73,7 +73,7 @@ We will be considering alternative approaches in future so please [get in touch] Advanced().Default(false).Version("1.0.0")). Field(service.NewBoolField("avro_nested_schemas"). Description("Whether Avro Schemas are nested. If true bento will resolve schema references."). - Advanced().Default(false).Version("?.?.?")) // TODO: replace with the version number + Advanced().Default(false).Version("1.2.0")) for _, f := range service.NewHTTPRequestAuthSignerFields() { spec = spec.Field(f.Version("1.0.0")) diff --git a/website/docs/components/processors/schema_registry_decode.md b/website/docs/components/processors/schema_registry_decode.md index c0e8195fc..205ba6bac 100644 --- a/website/docs/components/processors/schema_registry_decode.md +++ b/website/docs/components/processors/schema_registry_decode.md @@ -112,7 +112,7 @@ Whether Avro Schemas are nested. If true bento will resolve schema references. Type: `bool` Default: `false` -Requires version ?.?.? or newer +Requires version 1.2.0 or newer ### `url` diff --git a/website/docs/components/processors/schema_registry_encode.md b/website/docs/components/processors/schema_registry_encode.md index 27227d4b3..67c308370 100644 --- a/website/docs/components/processors/schema_registry_encode.md +++ b/website/docs/components/processors/schema_registry_encode.md @@ -171,7 +171,7 @@ Whether Avro Schemas are nested. If true bento will resolve schema references. Type: `bool` Default: `false` -Requires version ?.?.? or newer +Requires version 1.2.0 or newer ### `oauth` From 1f73689f53f551f6fedc220d57e12171d488f673 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Tue, 20 Aug 2024 19:30:12 +0100 Subject: [PATCH 08/12] add line breaks to test data Signed-off-by: Jem Davies --- internal/impl/confluent/serde_avro_test.go | 70 +++++++++++++++++++++- 1 file changed, 67 insertions(+), 3 deletions(-) diff --git a/internal/impl/confluent/serde_avro_test.go b/internal/impl/confluent/serde_avro_test.go index 860193367..102047781 100644 --- a/internal/impl/confluent/serde_avro_test.go +++ b/internal/impl/confluent/serde_avro_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/Jeffail/gabs/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -146,9 +147,72 @@ func TestAvroReferencesNested(t *testing.T) { tCtx, done := context.WithTimeout(context.Background(), time.Second*10) defer done() - userSchema := `{"type":"record","name":"User","namespace":"com.example","fields":[{"name":"name","type":"string"},{"name":"email","type":"string"},{"name":"address","type":"Address"}]}` - addressSchema := `{"type":"record","name":"Address","namespace":"com.example","fields":[{"name":"street","type":"string"},{"name":"city","type":"string"},{"name":"state","type":"State"},{"name":"zip","type":"string"}]}` - stateSchema := `{"type":"record","name":"State","namespace":"com.example","fields":[{"name":"state","type":"string"}]}` + userSchema := ` + { + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "email", + "type": "string" + }, + { + "name": "address", + "type": "Address" + } + ] + }` + + addressSchema := ` + { + "type": "record", + "name": "Address", + "namespace": "com.example", + "fields": [ + { + "name": "street", + "type": "string" + }, + { + "name": "city", + "type": "string" + }, + { + "name": "state", + "type": "State" + }, + { + "name": "zip", + "type": "string" + } + ] + }` + + stateSchema := ` + { + "type": "record", + "name": "State", + "namespace": "com.example", + "fields": [ + { + "name": "state", + "type": "string" + } + ] + }` + + userGabs, _ := gabs.ParseJSON([]byte(userSchema)) + addressGabs, _ := gabs.ParseJSON([]byte(addressSchema)) + stateGabs, _ := gabs.ParseJSON([]byte(stateSchema)) + + userSchema = userGabs.String() + addressSchema = addressGabs.String() + stateSchema = stateGabs.String() urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { switch path { From a0deaafa9b3dc07144e639833ecef0884ee12d01 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Tue, 20 Aug 2024 21:04:24 +0100 Subject: [PATCH 09/12] add maximum recursion limit to resolveAvroReferencesNested() + unit test, where there is a circular dep. Signed-off-by: Jem Davies --- internal/impl/confluent/serde_avro.go | 8 +- internal/impl/confluent/serde_avro_test.go | 151 +++++++++++++++++++++ 2 files changed, 157 insertions(+), 2 deletions(-) diff --git a/internal/impl/confluent/serde_avro.go b/internal/impl/confluent/serde_avro.go index 144d4f0e9..c24aaef36 100644 --- a/internal/impl/confluent/serde_avro.go +++ b/internal/impl/confluent/serde_avro.go @@ -68,7 +68,7 @@ func resolveAvroReferencesNested(ctx context.Context, client *schemaRegistryClie refsMap[name] = info.Schema return nil }); err != nil { - return "", nil + return "", err } var schemaDry any @@ -86,8 +86,12 @@ func resolveAvroReferencesNested(ctx context.Context, client *schemaRegistryClie } var ref reference + maxIterations := 100 - for { + for i := 0; i <= maxIterations; i++ { + if i == maxIterations { + return "", fmt.Errorf("maximum iteration limit reached trying to resolve Avro references: possible circular dependency detected") + } initialSchemaDry := schemaString for k, v := range refsMap { diff --git a/internal/impl/confluent/serde_avro_test.go b/internal/impl/confluent/serde_avro_test.go index 102047781..15bb54e51 100644 --- a/internal/impl/confluent/serde_avro_test.go +++ b/internal/impl/confluent/serde_avro_test.go @@ -301,3 +301,154 @@ func TestAvroReferencesNested(t *testing.T) { }) } } + +func TestAvroReferencesNestedCircularDependency(t *testing.T) { + tCtx, done := context.WithTimeout(context.Background(), time.Second*10) + defer done() + userSchema := ` + { + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "email", + "type": "string" + }, + { + "name": "address", + "type": "Address" + } + ] + }` + + addressSchema := ` + { + "type": "record", + "name": "Address", + "namespace": "com.example", + "fields": [ + { + "name": "street", + "type": "string" + }, + { + "name": "city", + "type": "string" + }, + { + "name": "state", + "type": "State" + }, + { + "name": "zip", + "type": "string" + } + ] + }` + + stateSchema := ` + { + "type": "record", + "name": "State", + "namespace": "com.example", + "fields": [ + { + "name": "state", + "type": "string" + }, + { + "name": "address", + "type": "Address" + } + ] + }` + + userGabs, _ := gabs.ParseJSON([]byte(userSchema)) + addressGabs, _ := gabs.ParseJSON([]byte(addressSchema)) + stateGabs, _ := gabs.ParseJSON([]byte(stateSchema)) + + userSchema = userGabs.String() + addressSchema = addressGabs.String() + stateSchema = stateGabs.String() + + urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { + switch path { + case "/subjects/com.example.User/versions/latest", "/schemas/ids/1": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": userSchema, + "schemaType": "AVRO", + "references": []any{ + map[string]any{"name": "com.example.Address", "subject": "com.example.Address", "version": 1}, + }, + }), nil + case "/subjects/com.example.Address/versions/1", "/schemas/ids/2": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": addressSchema, + "schemaType": "AVRO", + "references": []any{ + map[string]any{"name": "com.example.State", "subject": "com.example.State", "version": 1}, + }, + }), nil + case "/subjects/com.example.State/versions/1", "/schemas/ids/3": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": stateSchema, + "schemaType": "AVRO", + "references": []any{ + map[string]any{"name": "com.example.Address", "subject": "com.example.Address", "version": 1}, + }, + }), nil + } + return nil, nil + }) + + subj, err := service.NewInterpolatedString("com.example.User") + require.NoError(t, err) + + tests := []struct { + name string + input string + output string + errContains []string + }{ + { + name: "a foo", + input: `{"hello":"world"}`, + output: `{}`, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, true, time.Minute*10, time.Minute, service.MockResources()) + require.NoError(t, err) + + t.Cleanup(func() { + _ = encoder.Close(tCtx) + }) + + inMsg := service.NewMessage([]byte(test.input)) + encodedMsgs, err := encoder.ProcessBatch(tCtx, service.MessageBatch{inMsg}) + + require.NoError(t, err) + require.Len(t, encodedMsgs, 1) + require.Len(t, encodedMsgs[0], 1) + + encodedMsg := encodedMsgs[0][0] + require.Error(t, encodedMsg.GetError()) + require.Equal(t, "maximum iteration limit reached trying to resolve Avro references: possible circular dependency detected", encodedMsg.GetError().Error()) + + }) + } +} From e191883a49fcd15932c059c4e65841672f6de6b7 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Tue, 20 Aug 2024 21:08:12 +0100 Subject: [PATCH 10/12] fix lint Signed-off-by: Jem Davies --- internal/impl/confluent/serde_avro.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/impl/confluent/serde_avro.go b/internal/impl/confluent/serde_avro.go index c24aaef36..adabf172d 100644 --- a/internal/impl/confluent/serde_avro.go +++ b/internal/impl/confluent/serde_avro.go @@ -3,6 +3,7 @@ package confluent import ( "context" "encoding/json" + "errors" "fmt" "strings" @@ -90,7 +91,7 @@ func resolveAvroReferencesNested(ctx context.Context, client *schemaRegistryClie for i := 0; i <= maxIterations; i++ { if i == maxIterations { - return "", fmt.Errorf("maximum iteration limit reached trying to resolve Avro references: possible circular dependency detected") + return "", errors.New("maximum iteration limit reached trying to resolve Avro references: possible circular dependency detected") } initialSchemaDry := schemaString From 29aae5090be10eecb0b46e9213ceb496aff9cf72 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Tue, 20 Aug 2024 21:55:56 +0100 Subject: [PATCH 11/12] add mention of maximum depth on avro_nested_schemas field Signed-off-by: Jem Davies --- internal/impl/confluent/processor_schema_registry_decode.go | 2 +- internal/impl/confluent/processor_schema_registry_encode.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/impl/confluent/processor_schema_registry_decode.go b/internal/impl/confluent/processor_schema_registry_decode.go index 5530aacd3..abede8352 100644 --- a/internal/impl/confluent/processor_schema_registry_decode.go +++ b/internal/impl/confluent/processor_schema_registry_decode.go @@ -49,7 +49,7 @@ This processor decodes protobuf messages to JSON documents, you can read more ab Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be decoded as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between the standard json and avro json."). Advanced().Default(false)). Field(service.NewBoolField("avro_nested_schemas"). - Description("Whether Avro Schemas are nested. If true bento will resolve schema references."). + Description("Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100)"). Advanced().Default(false).Version("1.2.0")). Field(service.NewURLField("url").Description("The base URL of the schema registry service.")) diff --git a/internal/impl/confluent/processor_schema_registry_encode.go b/internal/impl/confluent/processor_schema_registry_encode.go index 4da894121..0dcb71a07 100644 --- a/internal/impl/confluent/processor_schema_registry_encode.go +++ b/internal/impl/confluent/processor_schema_registry_encode.go @@ -72,7 +72,7 @@ We will be considering alternative approaches in future so please [get in touch] Description("Whether messages encoded in Avro format should be parsed as normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be parsed as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between standard json and avro json."). Advanced().Default(false).Version("1.0.0")). Field(service.NewBoolField("avro_nested_schemas"). - Description("Whether Avro Schemas are nested. If true bento will resolve schema references."). + Description("Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100)"). Advanced().Default(false).Version("1.2.0")) for _, f := range service.NewHTTPRequestAuthSignerFields() { From c9280018cae8af85d47473145fe916c09e48776a Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Tue, 20 Aug 2024 21:57:14 +0100 Subject: [PATCH 12/12] make docs Signed-off-by: Jem Davies --- website/docs/components/processors/schema_registry_decode.md | 2 +- website/docs/components/processors/schema_registry_encode.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/website/docs/components/processors/schema_registry_decode.md b/website/docs/components/processors/schema_registry_decode.md index 205ba6bac..f5c748404 100644 --- a/website/docs/components/processors/schema_registry_decode.md +++ b/website/docs/components/processors/schema_registry_decode.md @@ -107,7 +107,7 @@ Default: `false` ### `avro_nested_schemas` -Whether Avro Schemas are nested. If true bento will resolve schema references. +Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100) Type: `bool` diff --git a/website/docs/components/processors/schema_registry_encode.md b/website/docs/components/processors/schema_registry_encode.md index 67c308370..b01da1b20 100644 --- a/website/docs/components/processors/schema_registry_encode.md +++ b/website/docs/components/processors/schema_registry_encode.md @@ -166,7 +166,7 @@ Requires version 1.0.0 or newer ### `avro_nested_schemas` -Whether Avro Schemas are nested. If true bento will resolve schema references. +Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100) Type: `bool`