Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add avro_schema_nested field to schema_registry_decode / encode #90

26 changes: 18 additions & 8 deletions internal/impl/confluent/processor_schema_registry_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ This processor decodes protobuf messages to JSON documents, you can read more ab
Field(service.NewBoolField("avro_raw_json").
Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be decoded as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between the standard json and avro json.").
Advanced().Default(false)).
Field(service.NewBoolField("avro_nested_schemas").
Description("Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100)").
Advanced().Default(false).Version("1.2.0")).
Field(service.NewURLField("url").Description("The base URL of the schema registry service."))

for _, f := range service.NewHTTPRequestAuthSignerFields() {
Expand All @@ -71,8 +74,9 @@ func init() {
//------------------------------------------------------------------------------

type schemaRegistryDecoder struct {
avroRawJSON bool
client *schemaRegistryClient
avroRawJSON bool
avroNestedSchemas bool
client *schemaRegistryClient

schemas map[int]*cachedSchemaDecoder
cacheMut sync.RWMutex
Expand Down Expand Up @@ -100,22 +104,28 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service
if err != nil {
return nil, err
}
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, mgr)
avroNestedSchemas, err := conf.FieldBool("avro_nested_schemas")
if err != nil {
return nil, err
}
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, avroNestedSchemas, mgr)
}

func newSchemaRegistryDecoder(
urlStr string,
reqSigner func(f fs.FS, req *http.Request) error,
tlsConf *tls.Config,
avroRawJSON bool,
avroNestedSchemas bool,
mgr *service.Resources,
) (*schemaRegistryDecoder, error) {
s := &schemaRegistryDecoder{
avroRawJSON: avroRawJSON,
schemas: map[int]*cachedSchemaDecoder{},
shutSig: shutdown.NewSignaller(),
logger: mgr.Logger(),
mgr: mgr,
avroRawJSON: avroRawJSON,
avroNestedSchemas: avroNestedSchemas,
schemas: map[int]*cachedSchemaDecoder{},
shutSig: shutdown.NewSignaller(),
logger: mgr.Logger(),
mgr: mgr,
}
var err error
if s.client, err = newSchemaRegistryClient(urlStr, reqSigner, tlsConf, mgr); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions internal/impl/confluent/processor_schema_registry_decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -330,7 +330,7 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -408,7 +408,7 @@ func TestSchemaRegistryDecodeClearExpired(t *testing.T) {
return nil, fmt.Errorf("nope")
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)
require.NoError(t, decoder.Close(context.Background()))

Expand Down Expand Up @@ -455,7 +455,7 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -518,7 +518,7 @@ func TestSchemaRegistryDecodeJson(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down
14 changes: 12 additions & 2 deletions internal/impl/confluent/processor_schema_registry_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ We will be considering alternative approaches in future so please [get in touch]
Example("1h")).
Field(service.NewBoolField("avro_raw_json").
Description("Whether messages encoded in Avro format should be parsed as normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be parsed as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between standard json and avro json.").
Advanced().Default(false).Version("1.0.0"))
Advanced().Default(false).Version("1.0.0")).
Field(service.NewBoolField("avro_nested_schemas").
Description("Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100)").
Advanced().Default(false).Version("1.2.0"))

for _, f := range service.NewHTTPRequestAuthSignerFields() {
spec = spec.Field(f.Version("1.0.0"))
Expand All @@ -96,6 +99,7 @@ type schemaRegistryEncoder struct {
client *schemaRegistryClient
subject *service.InterpolatedString
avroRawJSON bool
avroNestedSchemas bool
schemaRefreshAfter time.Duration

schemas map[string]cachedSchemaEncoder
Expand All @@ -121,6 +125,10 @@ func newSchemaRegistryEncoderFromConfig(conf *service.ParsedConfig, mgr *service
if err != nil {
return nil, err
}
avroNestedSchemas, err := conf.FieldBool("avro_nested_schemas")
if err != nil {
return nil, err
}
refreshPeriodStr, err := conf.FieldString("refresh_period")
if err != nil {
return nil, err
Expand All @@ -141,7 +149,7 @@ func newSchemaRegistryEncoderFromConfig(conf *service.ParsedConfig, mgr *service
if err != nil {
return nil, err
}
return newSchemaRegistryEncoder(urlStr, authSigner, tlsConf, subject, avroRawJSON, refreshPeriod, refreshTicker, mgr)
return newSchemaRegistryEncoder(urlStr, authSigner, tlsConf, subject, avroRawJSON, avroNestedSchemas, refreshPeriod, refreshTicker, mgr)
}

func newSchemaRegistryEncoder(
Expand All @@ -150,12 +158,14 @@ func newSchemaRegistryEncoder(
tlsConf *tls.Config,
subject *service.InterpolatedString,
avroRawJSON bool,
avroNestedSchemas bool,
schemaRefreshAfter, schemaRefreshTicker time.Duration,
mgr *service.Resources,
) (*schemaRegistryEncoder, error) {
s := &schemaRegistryEncoder{
subject: subject,
avroRawJSON: avroRawJSON,
avroNestedSchemas: avroNestedSchemas,
schemaRefreshAfter: schemaRefreshAfter,
schemas: map[string]cachedSchemaEncoder{},
shutSig: shutdown.NewSignaller(),
Expand Down
16 changes: 8 additions & 8 deletions internal/impl/confluent/processor_schema_registry_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestSchemaRegistryEncodeAvro(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestSchemaRegistryEncodeAvroRawJSON(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestSchemaRegistryEncodeAvroLogicalTypes(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -370,7 +370,7 @@ func TestSchemaRegistryEncodeAvroRawJSONLogicalTypes(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestSchemaRegistryEncodeClearExpired(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)
require.NoError(t, encoder.Close(context.Background()))

Expand Down Expand Up @@ -496,7 +496,7 @@ func TestSchemaRegistryEncodeRefresh(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)
require.NoError(t, encoder.Close(context.Background()))

Expand Down Expand Up @@ -598,7 +598,7 @@ func TestSchemaRegistryEncodeJSON(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -691,7 +691,7 @@ func TestSchemaRegistryEncodeJSONConstantRefreshes(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Millisecond, time.Millisecond*10, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Millisecond, time.Millisecond*10, service.MockResources())
require.NoError(t, err)

input := `{"Address":{"City":"foo","State":"bar"},"Name":"foo","MaybeHobby":"dancing"}`
Expand Down
102 changes: 96 additions & 6 deletions internal/impl/confluent/serde_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,27 @@ package confluent
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/linkedin/goavro/v2"

"github.com/warpstreamlabs/bento/public/service"
)

// Minimal JSON views of an Avro schema document. Only the attributes listed
// here are decoded; every other key in the document is ignored by
// encoding/json.
type (
	// field is a single entry of a record schema's "fields" array.
	//
	// NOTE(review): Type is a plain string, so decoding only succeeds when the
	// JSON "type" attribute is a string (a primitive or a named type). A union,
	// array or inline definition makes json.Unmarshal report a type error.
	field struct {
		Name string `json:"name"`
		Type string `json:"type"`
	}

	// reference holds the top-level attributes of a named Avro schema: its
	// kind ("type"), its name and namespace, and its record fields.
	reference struct {
		Type      string  `json:"type"`
		Name      string  `json:"name"`
		Namespace string  `json:"namespace"`
		Fields    []field `json:"fields"`
	}
)

func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, info SchemaInfo) (string, error) {
if len(info.References) == 0 {
return info.Schema, nil
Expand Down Expand Up @@ -45,10 +59,76 @@ func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, in
return string(schemaHydratedBytes), nil
}

func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaInfo) (schemaEncoder, error) {
schema, err := resolveAvroReferences(ctx, s.client, info)
func resolveAvroReferencesNested(ctx context.Context, client *schemaRegistryClient, info SchemaInfo) (string, error) {
gregfurman marked this conversation as resolved.
Show resolved Hide resolved
if len(info.References) == 0 {
return info.Schema, nil
}

refsMap := map[string]string{}
if err := client.WalkReferences(ctx, info.References, func(ctx context.Context, name string, info SchemaInfo) error {
refsMap[name] = info.Schema
return nil
}); err != nil {
return "", err
}

var schemaDry any
if err := json.Unmarshal([]byte(info.Schema), &schemaDry); err != nil {
return "", fmt.Errorf("failed to parse root schema as enum: %w", err)
}

schemaRaw, _ := json.Marshal(schemaDry)
schemaString := string(schemaRaw)

var schema reference
err := json.Unmarshal(schemaRaw, &schema)
if err != nil {
return nil, err
return "", fmt.Errorf("failed to unmarshal root schema: %w", err)
}

var ref reference
maxIterations := 100

for i := 0; i <= maxIterations; i++ {
if i == maxIterations {
return "", errors.New("maximum iteration limit reached trying to resolve Avro references: possible circular dependency detected")
}
initialSchemaDry := schemaString

for k, v := range refsMap {
err := json.Unmarshal([]byte(refsMap[k]), &ref)
if err != nil {
return "", fmt.Errorf("failed to unmarshal refsMap value %s: %w", k, err)
}

if schema.Namespace == ref.Namespace {
schemaString = strings.ReplaceAll(schemaString, `"type":"`+ref.Name+`"`, `"type":`+v)
} else {
schemaString = strings.ReplaceAll(schemaString, `"type":"`+ref.Namespace+`.`+ref.Name+`"`, `"type":`+v)
}
}
if schemaString == initialSchemaDry {
break
}
}

return schemaString, nil
}

func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaInfo) (schemaEncoder, error) {
var schema string
var err error

if s.avroNestedSchemas {
schema, err = resolveAvroReferencesNested(ctx, s.client, info)
if err != nil {
return nil, err
}
} else {
schema, err = resolveAvroReferences(ctx, s.client, info)
if err != nil {
return nil, err
}
}

var codec *goavro.Codec
Expand Down Expand Up @@ -84,9 +164,19 @@ func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaI
}

func (s *schemaRegistryDecoder) getAvroDecoder(ctx context.Context, info SchemaInfo) (schemaDecoder, error) {
schema, err := resolveAvroReferences(ctx, s.client, info)
if err != nil {
return nil, err
var schema string
var err error

if s.avroNestedSchemas {
schema, err = resolveAvroReferencesNested(ctx, s.client, info)
if err != nil {
return nil, err
}
} else {
schema, err = resolveAvroReferences(ctx, s.client, info)
if err != nil {
return nil, err
}
}

var codec *goavro.Codec
Expand Down
Loading