diff --git a/internal/impl/confluent/processor_schema_registry_decode.go b/internal/impl/confluent/processor_schema_registry_decode.go index 71ddb230b..abede8352 100644 --- a/internal/impl/confluent/processor_schema_registry_decode.go +++ b/internal/impl/confluent/processor_schema_registry_decode.go @@ -48,6 +48,9 @@ This processor decodes protobuf messages to JSON documents, you can read more ab Field(service.NewBoolField("avro_raw_json"). Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be decoded as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between the standard json and avro json."). Advanced().Default(false)). + Field(service.NewBoolField("avro_nested_schemas"). + Description("Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100)"). + Advanced().Default(false).Version("1.2.0")). Field(service.NewURLField("url").Description("The base URL of the schema registry service.")) for _, f := range service.NewHTTPRequestAuthSignerFields() { @@ -71,8 +74,9 @@ func init() { //------------------------------------------------------------------------------ type schemaRegistryDecoder struct { - avroRawJSON bool - client *schemaRegistryClient + avroRawJSON bool + avroNestedSchemas bool + client *schemaRegistryClient schemas map[int]*cachedSchemaDecoder cacheMut sync.RWMutex @@ -100,7 +104,11 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service if err != nil { return nil, err } - return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, mgr) + avroNestedSchemas, err := conf.FieldBool("avro_nested_schemas") + if err != nil { + return nil, err + } + return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, avroNestedSchemas, mgr) } func newSchemaRegistryDecoder( @@ -108,14 +116,16 @@ func newSchemaRegistryDecoder( reqSigner func(f fs.FS, req *http.Request) error, tlsConf *tls.Config, avroRawJSON bool, + avroNestedSchemas bool, mgr *service.Resources, ) (*schemaRegistryDecoder, error) { s := &schemaRegistryDecoder{ - avroRawJSON: avroRawJSON, - schemas: map[int]*cachedSchemaDecoder{}, - shutSig: shutdown.NewSignaller(), - logger: mgr.Logger(), - mgr: mgr, + avroRawJSON: avroRawJSON, + avroNestedSchemas: avroNestedSchemas, + schemas: map[int]*cachedSchemaDecoder{}, + shutSig: shutdown.NewSignaller(), + logger: mgr.Logger(), + mgr: mgr, } var err error if s.client, err = newSchemaRegistryClient(urlStr, reqSigner, tlsConf, mgr); err != nil { diff --git a/internal/impl/confluent/processor_schema_registry_decode_test.go b/internal/impl/confluent/processor_schema_registry_decode_test.go index 1b5a518bb..0a97ee867 100644 --- a/internal/impl/confluent/processor_schema_registry_decode_test.go +++ b/internal/impl/confluent/processor_schema_registry_decode_test.go @@ -227,7 +227,7 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -330,7 +330,7 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -408,7 +408,7 @@ func TestSchemaRegistryDecodeClearExpired(t *testing.T) { return nil, fmt.Errorf("nope") }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) require.NoError(t, decoder.Close(context.Background())) @@ -455,7 +455,7 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -518,7 +518,7 @@ func TestSchemaRegistryDecodeJson(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources()) require.NoError(t, err) tests := []struct { diff --git a/internal/impl/confluent/processor_schema_registry_encode.go b/internal/impl/confluent/processor_schema_registry_encode.go index dbb5b3edf..0dcb71a07 100644 --- a/internal/impl/confluent/processor_schema_registry_encode.go +++ b/internal/impl/confluent/processor_schema_registry_encode.go @@ -70,7 +70,10 @@ We will be considering alternative approaches in future so please [get in touch] Example("1h")). Field(service.NewBoolField("avro_raw_json"). Description("Whether messages encoded in Avro format should be parsed as normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be parsed as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between standard json and avro json."). - Advanced().Default(false).Version("1.0.0")) + Advanced().Default(false).Version("1.0.0")). + Field(service.NewBoolField("avro_nested_schemas"). + Description("Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100)"). + Advanced().Default(false).Version("1.2.0")) for _, f := range service.NewHTTPRequestAuthSignerFields() { spec = spec.Field(f.Version("1.0.0")) @@ -96,6 +99,7 @@ type schemaRegistryEncoder struct { client *schemaRegistryClient subject *service.InterpolatedString avroRawJSON bool + avroNestedSchemas bool schemaRefreshAfter time.Duration schemas map[string]cachedSchemaEncoder @@ -121,6 +125,10 @@ func newSchemaRegistryEncoderFromConfig(conf *service.ParsedConfig, mgr *service if err != nil { return nil, err } + avroNestedSchemas, err := conf.FieldBool("avro_nested_schemas") + if err != nil { + return nil, err + } refreshPeriodStr, err := conf.FieldString("refresh_period") if err != nil { return nil, err @@ -141,7 +149,7 @@ func newSchemaRegistryEncoderFromConfig(conf *service.ParsedConfig, mgr *service if err != nil { return nil, err } - return newSchemaRegistryEncoder(urlStr, authSigner, tlsConf, subject, avroRawJSON, refreshPeriod, refreshTicker, mgr) + return newSchemaRegistryEncoder(urlStr, authSigner, tlsConf, subject, avroRawJSON, avroNestedSchemas, refreshPeriod, refreshTicker, mgr) } func newSchemaRegistryEncoder( @@ -150,12 +158,14 @@ func newSchemaRegistryEncoder( tlsConf *tls.Config, subject *service.InterpolatedString, avroRawJSON bool, + avroNestedSchemas bool, schemaRefreshAfter, schemaRefreshTicker time.Duration, mgr *service.Resources, ) (*schemaRegistryEncoder, error) { s := &schemaRegistryEncoder{ subject: subject, avroRawJSON: avroRawJSON, + avroNestedSchemas: avroNestedSchemas, schemaRefreshAfter: schemaRefreshAfter, schemas: map[string]cachedSchemaEncoder{}, shutSig: shutdown.NewSignaller(), diff --git a/internal/impl/confluent/processor_schema_registry_encode_test.go b/internal/impl/confluent/processor_schema_registry_encode_test.go index 7d4acee38..704957b9f 100644 --- a/internal/impl/confluent/processor_schema_registry_encode_test.go +++ b/internal/impl/confluent/processor_schema_registry_encode_test.go @@ -129,7 +129,7 @@ func TestSchemaRegistryEncodeAvro(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -211,7 +211,7 @@ func TestSchemaRegistryEncodeAvroRawJSON(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -293,7 +293,7 @@ func TestSchemaRegistryEncodeAvroLogicalTypes(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -370,7 +370,7 @@ func TestSchemaRegistryEncodeAvroRawJSONLogicalTypes(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -435,7 +435,7 @@ func TestSchemaRegistryEncodeClearExpired(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) require.NoError(t, encoder.Close(context.Background())) @@ -496,7 +496,7 @@ func TestSchemaRegistryEncodeRefresh(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) require.NoError(t, encoder.Close(context.Background())) @@ -598,7 +598,7 @@ func TestSchemaRegistryEncodeJSON(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -691,7 +691,7 @@ func TestSchemaRegistryEncodeJSONConstantRefreshes(t *testing.T) { subj, err := service.NewInterpolatedString("foo") require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Millisecond, time.Millisecond*10, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Millisecond, time.Millisecond*10, service.MockResources()) require.NoError(t, err) input := `{"Address":{"City":"foo","State":"bar"},"Name":"foo","MaybeHobby":"dancing"}` diff --git a/internal/impl/confluent/serde_avro.go b/internal/impl/confluent/serde_avro.go index 337bb717c..adabf172d 100644 --- a/internal/impl/confluent/serde_avro.go +++ b/internal/impl/confluent/serde_avro.go @@ -3,13 +3,27 @@ package confluent import ( "context" "encoding/json" + "errors" "fmt" + "strings" "github.com/linkedin/goavro/v2" "github.com/warpstreamlabs/bento/public/service" ) +type field struct { + Name string `json:"name"` + Type string `json:"type"` +} + +type reference struct { + Type string `json:"type"` + Name string `json:"name"` + Namespace string `json:"namespace"` + Fields []field `json:"fields"` +} + func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, info SchemaInfo) (string, error) { if len(info.References) == 0 { return info.Schema, nil @@ -45,10 +59,76 @@ func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, in return string(schemaHydratedBytes), nil } -func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaInfo) (schemaEncoder, error) { - schema, err := resolveAvroReferences(ctx, s.client, info) +func resolveAvroReferencesNested(ctx context.Context, client *schemaRegistryClient, info SchemaInfo) (string, error) { + if len(info.References) == 0 { + return info.Schema, nil + } + + refsMap := map[string]string{} + if err := client.WalkReferences(ctx, info.References, func(ctx context.Context, name string, info SchemaInfo) error { + refsMap[name] = info.Schema + return nil + }); err != nil { + return "", err + } + + var schemaDry any + if err := json.Unmarshal([]byte(info.Schema), &schemaDry); err != nil { + return "", fmt.Errorf("failed to parse root schema as enum: %w", err) + } + + schemaRaw, _ := json.Marshal(schemaDry) + schemaString := string(schemaRaw) + + var schema reference + err := json.Unmarshal(schemaRaw, &schema) if err != nil { - return nil, err + return "", fmt.Errorf("failed to unmarshal root schema: %w", err) + } + + var ref reference + maxIterations := 100 + + for i := 0; i <= maxIterations; i++ { + if i == maxIterations { + return "", errors.New("maximum iteration limit reached trying to resolve Avro references: possible circular dependency detected") + } + initialSchemaDry := schemaString + + for k, v := range refsMap { + err := json.Unmarshal([]byte(refsMap[k]), &ref) + if err != nil { + return "", fmt.Errorf("failed to unmarshal refsMap value %s: %w", k, err) + } + + if schema.Namespace == ref.Namespace { + schemaString = strings.ReplaceAll(schemaString, `"type":"`+ref.Name+`"`, `"type":`+v) + } else { + schemaString = strings.ReplaceAll(schemaString, `"type":"`+ref.Namespace+`.`+ref.Name+`"`, `"type":`+v) + } + } + if schemaString == initialSchemaDry { + break + } + } + + return schemaString, nil +} + +func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaInfo) (schemaEncoder, error) { + var schema string + var err error + + if s.avroNestedSchemas { + schema, err = resolveAvroReferencesNested(ctx, s.client, info) + if err != nil { + return nil, err + } + } else { + schema, err = resolveAvroReferences(ctx, s.client, info) + if err != nil { + return nil, err + } } var codec *goavro.Codec @@ -84,9 +164,19 @@ func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaI } func (s *schemaRegistryDecoder) getAvroDecoder(ctx context.Context, info SchemaInfo) (schemaDecoder, error) { - schema, err := resolveAvroReferences(ctx, s.client, info) - if err != nil { - return nil, err + var schema string + var err error + + if s.avroNestedSchemas { + schema, err = resolveAvroReferencesNested(ctx, s.client, info) + if err != nil { + return nil, err + } + } else { + schema, err = resolveAvroReferences(ctx, s.client, info) + if err != nil { + return nil, err + } } var codec *goavro.Codec diff --git a/internal/impl/confluent/serde_avro_test.go b/internal/impl/confluent/serde_avro_test.go index 706180114..15bb54e51 100644 --- a/internal/impl/confluent/serde_avro_test.go +++ b/internal/impl/confluent/serde_avro_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/Jeffail/gabs/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -90,10 +91,10 @@ func TestAvroReferences(t *testing.T) { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { @@ -141,3 +142,313 @@ func TestAvroReferences(t *testing.T) { }) } } + +func TestAvroReferencesNested(t *testing.T) { + tCtx, done := context.WithTimeout(context.Background(), time.Second*10) + defer done() + + userSchema := ` + { + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "email", + "type": "string" + }, + { + "name": "address", + "type": "Address" + } + ] + }` + + addressSchema := ` + { + "type": "record", + "name": "Address", + "namespace": "com.example", + "fields": [ + { + "name": "street", + "type": "string" + }, + { + "name": "city", + "type": "string" + }, + { + "name": "state", + "type": "State" + }, + { + "name": "zip", + "type": "string" + } + ] + }` + + stateSchema := ` + { + "type": "record", + "name": "State", + "namespace": "com.example", + "fields": [ + { + "name": "state", + "type": "string" + } + ] + }` + + userGabs, _ := gabs.ParseJSON([]byte(userSchema)) + addressGabs, _ := gabs.ParseJSON([]byte(addressSchema)) + stateGabs, _ := gabs.ParseJSON([]byte(stateSchema)) + + userSchema = userGabs.String() + addressSchema = addressGabs.String() + stateSchema = stateGabs.String() + + urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { + switch path { + case "/subjects/com.example.User/versions/latest", "/schemas/ids/1": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": userSchema, + "schemaType": "AVRO", + "references": []any{ + map[string]any{"name": "com.example.Address", "subject": "com.example.Address", "version": 1}, + }, + }), nil + case "/subjects/com.example.Address/versions/1", "/schemas/ids/2": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": addressSchema, + "schemaType": "AVRO", + "references": []any{ + map[string]any{"name": "com.example.State", "subject": "com.example.State", "version": 1}, + }, + }), nil + case "/subjects/com.example.State/versions/1", "/schemas/ids/3": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": stateSchema, + "schemaType": "AVRO", + }), nil + } + return nil, nil + }) + + subj, err := service.NewInterpolatedString("com.example.User") + require.NoError(t, err) + + tests := []struct { + name string + input string + output string + errContains []string + }{ + { + name: "a foo", + input: `{"name":"JohnDoe","email":"john.doe@example.com","address":{"street":"123MainSt","city":"Anytown","state":{"state":"CA"},"zip":"12345"}}`, + output: `{}`, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, true, time.Minute*10, time.Minute, service.MockResources()) + require.NoError(t, err) + + t.Cleanup(func() { + _ = encoder.Close(tCtx) + }) + + inMsg := service.NewMessage([]byte(test.input)) + encodedMsgs, err := encoder.ProcessBatch(tCtx, service.MessageBatch{inMsg}) + require.NoError(t, err) + require.Len(t, encodedMsgs, 1) + require.Len(t, encodedMsgs[0], 1) + + encodedMsg := encodedMsgs[0][0] + + if len(test.errContains) > 0 { + require.Error(t, encodedMsg.GetError()) + for _, errStr := range test.errContains { + assert.Contains(t, encodedMsg.GetError().Error(), errStr) + } + return + } + + b, err := encodedMsg.AsBytes() + require.NoError(t, err) + + require.NoError(t, encodedMsg.GetError()) + require.NotEqual(t, test.input, string(b)) + + var n any + require.Error(t, json.Unmarshal(b, &n), "message contents should no longer be valid JSON") + + }) + } +} + +func TestAvroReferencesNestedCircularDependency(t *testing.T) { + tCtx, done := context.WithTimeout(context.Background(), time.Second*10) + defer done() + userSchema := ` + { + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "email", + "type": "string" + }, + { + "name": "address", + "type": "Address" + } + ] + }` + + addressSchema := ` + { + "type": "record", + "name": "Address", + "namespace": "com.example", + "fields": [ + { + "name": "street", + "type": "string" + }, + { + "name": "city", + "type": "string" + }, + { + "name": "state", + "type": "State" + }, + { + "name": "zip", + "type": "string" + } + ] + }` + + stateSchema := ` + { + "type": "record", + "name": "State", + "namespace": "com.example", + "fields": [ + { + "name": "state", + "type": "string" + }, + { + "name": "address", + "type": "Address" + } + ] + }` + + userGabs, _ := gabs.ParseJSON([]byte(userSchema)) + addressGabs, _ := gabs.ParseJSON([]byte(addressSchema)) + stateGabs, _ := gabs.ParseJSON([]byte(stateSchema)) + + userSchema = userGabs.String() + addressSchema = addressGabs.String() + stateSchema = stateGabs.String() + + urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { + switch path { + case "/subjects/com.example.User/versions/latest", "/schemas/ids/1": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": userSchema, + "schemaType": "AVRO", + "references": []any{ + map[string]any{"name": "com.example.Address", "subject": "com.example.Address", "version": 1}, + }, + }), nil + case "/subjects/com.example.Address/versions/1", "/schemas/ids/2": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": addressSchema, + "schemaType": "AVRO", + "references": []any{ + map[string]any{"name": "com.example.State", "subject": "com.example.State", "version": 1}, + }, + }), nil + case "/subjects/com.example.State/versions/1", "/schemas/ids/3": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 1, + "schema": stateSchema, + "schemaType": "AVRO", + "references": []any{ + map[string]any{"name": "com.example.Address", "subject": "com.example.Address", "version": 1}, + }, + }), nil + } + return nil, nil + }) + + subj, err := service.NewInterpolatedString("com.example.User") + require.NoError(t, err) + + tests := []struct { + name string + input string + output string + errContains []string + }{ + { + name: "a foo", + input: `{"hello":"world"}`, + output: `{}`, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, true, time.Minute*10, time.Minute, service.MockResources()) + require.NoError(t, err) + + t.Cleanup(func() { + _ = encoder.Close(tCtx) + }) + + inMsg := service.NewMessage([]byte(test.input)) + encodedMsgs, err := encoder.ProcessBatch(tCtx, service.MessageBatch{inMsg}) + + require.NoError(t, err) + require.Len(t, encodedMsgs, 1) + require.Len(t, encodedMsgs[0], 1) + + encodedMsg := encodedMsgs[0][0] + require.Error(t, encodedMsg.GetError()) + require.Equal(t, "maximum iteration limit reached trying to resolve Avro references: possible circular dependency detected", encodedMsg.GetError().Error()) + + }) + } +} diff --git a/internal/impl/confluent/serde_json_test.go b/internal/impl/confluent/serde_json_test.go index bfc93f420..53ddd399e 100644 --- a/internal/impl/confluent/serde_json_test.go +++ b/internal/impl/confluent/serde_json_test.go @@ -93,10 +93,10 @@ func TestResolveJsonSchema(t *testing.T) { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { diff --git a/internal/impl/confluent/serde_protobuf_test.go b/internal/impl/confluent/serde_protobuf_test.go index 6dcec5216..f5c72a9b0 100644 --- a/internal/impl/confluent/serde_protobuf_test.go +++ b/internal/impl/confluent/serde_protobuf_test.go @@ -79,10 +79,10 @@ message bar { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { @@ -209,10 +209,10 @@ message bar { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { @@ -268,7 +268,7 @@ func runEncoderAgainstInputsMultiple(t testing.TB, urlStr, subject string, input subj, err := service.NewInterpolatedString(subject) require.NoError(t, err) - encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { _ = encoder.Close(tCtx) diff --git a/website/docs/components/processors/schema_registry_decode.md b/website/docs/components/processors/schema_registry_decode.md index 33f2b5afc..f5c748404 100644 --- a/website/docs/components/processors/schema_registry_decode.md +++ b/website/docs/components/processors/schema_registry_decode.md @@ -43,6 +43,7 @@ schema_registry_decode: label: "" schema_registry_decode: avro_raw_json: false + avro_nested_schemas: false url: "" # No default (required) oauth: enabled: false @@ -104,6 +105,15 @@ Whether Avro messages should be decoded into normal JSON ("json that meets the e Type: `bool` Default: `false` +### `avro_nested_schemas` + +Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100) + + +Type: `bool` +Default: `false` +Requires version 1.2.0 or newer + ### `url` The base URL of the schema registry service. diff --git a/website/docs/components/processors/schema_registry_encode.md b/website/docs/components/processors/schema_registry_encode.md index 1f3d3ae7b..b01da1b20 100644 --- a/website/docs/components/processors/schema_registry_encode.md +++ b/website/docs/components/processors/schema_registry_encode.md @@ -50,6 +50,7 @@ schema_registry_encode: subject: foo # No default (required) refresh_period: 10m avro_raw_json: false + avro_nested_schemas: false oauth: enabled: false consumer_key: "" @@ -163,6 +164,15 @@ Type: `bool` Default: `false` Requires version 1.0.0 or newer +### `avro_nested_schemas` + +Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100) + + +Type: `bool` +Default: `false` +Requires version 1.2.0 or newer + ### `oauth` Allows you to specify open authentication via OAuth version 1.