Skip to content

Commit

Permalink
Add avro_schema_nested field to schema_registry_decode / encode (#90)
Browse files Browse the repository at this point in the history
* add avro nested schema functionality

Signed-off-by: Jem Davies <jemsot@gmail.com>
  • Loading branch information
jem-davies authored Aug 20, 2024
1 parent 352a5d0 commit 63c8e61
Show file tree
Hide file tree
Showing 10 changed files with 479 additions and 38 deletions.
26 changes: 18 additions & 8 deletions internal/impl/confluent/processor_schema_registry_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ This processor decodes protobuf messages to JSON documents, you can read more ab
Field(service.NewBoolField("avro_raw_json").
Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be decoded as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between the standard json and avro json.").
Advanced().Default(false)).
Field(service.NewBoolField("avro_nested_schemas").
Description("Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100)").
Advanced().Default(false).Version("1.2.0")).
Field(service.NewURLField("url").Description("The base URL of the schema registry service."))

for _, f := range service.NewHTTPRequestAuthSignerFields() {
Expand All @@ -71,8 +74,9 @@ func init() {
//------------------------------------------------------------------------------

type schemaRegistryDecoder struct {
avroRawJSON bool
client *schemaRegistryClient
avroRawJSON bool
avroNestedSchemas bool
client *schemaRegistryClient

schemas map[int]*cachedSchemaDecoder
cacheMut sync.RWMutex
Expand Down Expand Up @@ -100,22 +104,28 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service
if err != nil {
return nil, err
}
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, mgr)
avroNestedSchemas, err := conf.FieldBool("avro_nested_schemas")
if err != nil {
return nil, err
}
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, avroNestedSchemas, mgr)
}

func newSchemaRegistryDecoder(
urlStr string,
reqSigner func(f fs.FS, req *http.Request) error,
tlsConf *tls.Config,
avroRawJSON bool,
avroNestedSchemas bool,
mgr *service.Resources,
) (*schemaRegistryDecoder, error) {
s := &schemaRegistryDecoder{
avroRawJSON: avroRawJSON,
schemas: map[int]*cachedSchemaDecoder{},
shutSig: shutdown.NewSignaller(),
logger: mgr.Logger(),
mgr: mgr,
avroRawJSON: avroRawJSON,
avroNestedSchemas: avroNestedSchemas,
schemas: map[int]*cachedSchemaDecoder{},
shutSig: shutdown.NewSignaller(),
logger: mgr.Logger(),
mgr: mgr,
}
var err error
if s.client, err = newSchemaRegistryClient(urlStr, reqSigner, tlsConf, mgr); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions internal/impl/confluent/processor_schema_registry_decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -330,7 +330,7 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -408,7 +408,7 @@ func TestSchemaRegistryDecodeClearExpired(t *testing.T) {
return nil, fmt.Errorf("nope")
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)
require.NoError(t, decoder.Close(context.Background()))

Expand Down Expand Up @@ -455,7 +455,7 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -518,7 +518,7 @@ func TestSchemaRegistryDecodeJson(t *testing.T) {
return nil, nil
})

decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down
14 changes: 12 additions & 2 deletions internal/impl/confluent/processor_schema_registry_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ We will be considering alternative approaches in future so please [get in touch]
Example("1h")).
Field(service.NewBoolField("avro_raw_json").
Description("Whether messages encoded in Avro format should be parsed as normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be parsed as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between standard json and avro json.").
Advanced().Default(false).Version("1.0.0"))
Advanced().Default(false).Version("1.0.0")).
Field(service.NewBoolField("avro_nested_schemas").
Description("Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100)").
Advanced().Default(false).Version("1.2.0"))

for _, f := range service.NewHTTPRequestAuthSignerFields() {
spec = spec.Field(f.Version("1.0.0"))
Expand All @@ -96,6 +99,7 @@ type schemaRegistryEncoder struct {
client *schemaRegistryClient
subject *service.InterpolatedString
avroRawJSON bool
avroNestedSchemas bool
schemaRefreshAfter time.Duration

schemas map[string]cachedSchemaEncoder
Expand All @@ -121,6 +125,10 @@ func newSchemaRegistryEncoderFromConfig(conf *service.ParsedConfig, mgr *service
if err != nil {
return nil, err
}
avroNestedSchemas, err := conf.FieldBool("avro_nested_schemas")
if err != nil {
return nil, err
}
refreshPeriodStr, err := conf.FieldString("refresh_period")
if err != nil {
return nil, err
Expand All @@ -141,7 +149,7 @@ func newSchemaRegistryEncoderFromConfig(conf *service.ParsedConfig, mgr *service
if err != nil {
return nil, err
}
return newSchemaRegistryEncoder(urlStr, authSigner, tlsConf, subject, avroRawJSON, refreshPeriod, refreshTicker, mgr)
return newSchemaRegistryEncoder(urlStr, authSigner, tlsConf, subject, avroRawJSON, avroNestedSchemas, refreshPeriod, refreshTicker, mgr)
}

func newSchemaRegistryEncoder(
Expand All @@ -150,12 +158,14 @@ func newSchemaRegistryEncoder(
tlsConf *tls.Config,
subject *service.InterpolatedString,
avroRawJSON bool,
avroNestedSchemas bool,
schemaRefreshAfter, schemaRefreshTicker time.Duration,
mgr *service.Resources,
) (*schemaRegistryEncoder, error) {
s := &schemaRegistryEncoder{
subject: subject,
avroRawJSON: avroRawJSON,
avroNestedSchemas: avroNestedSchemas,
schemaRefreshAfter: schemaRefreshAfter,
schemas: map[string]cachedSchemaEncoder{},
shutSig: shutdown.NewSignaller(),
Expand Down
16 changes: 8 additions & 8 deletions internal/impl/confluent/processor_schema_registry_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestSchemaRegistryEncodeAvro(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestSchemaRegistryEncodeAvroRawJSON(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestSchemaRegistryEncodeAvroLogicalTypes(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -370,7 +370,7 @@ func TestSchemaRegistryEncodeAvroRawJSONLogicalTypes(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestSchemaRegistryEncodeClearExpired(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)
require.NoError(t, encoder.Close(context.Background()))

Expand Down Expand Up @@ -496,7 +496,7 @@ func TestSchemaRegistryEncodeRefresh(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)
require.NoError(t, encoder.Close(context.Background()))

Expand Down Expand Up @@ -598,7 +598,7 @@ func TestSchemaRegistryEncodeJSON(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -691,7 +691,7 @@ func TestSchemaRegistryEncodeJSONConstantRefreshes(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Millisecond, time.Millisecond*10, service.MockResources())
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Millisecond, time.Millisecond*10, service.MockResources())
require.NoError(t, err)

input := `{"Address":{"City":"foo","State":"bar"},"Name":"foo","MaybeHobby":"dancing"}`
Expand Down
102 changes: 96 additions & 6 deletions internal/impl/confluent/serde_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,27 @@ package confluent
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/linkedin/goavro/v2"

"github.com/warpstreamlabs/bento/public/service"
)

type field struct {
Name string `json:"name"`
Type string `json:"type"`
}

type reference struct {
Type string `json:"type"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Fields []field `json:"fields"`
}

func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, info SchemaInfo) (string, error) {
if len(info.References) == 0 {
return info.Schema, nil
Expand Down Expand Up @@ -45,10 +59,76 @@ func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, in
return string(schemaHydratedBytes), nil
}

func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaInfo) (schemaEncoder, error) {
schema, err := resolveAvroReferences(ctx, s.client, info)
func resolveAvroReferencesNested(ctx context.Context, client *schemaRegistryClient, info SchemaInfo) (string, error) {
if len(info.References) == 0 {
return info.Schema, nil
}

refsMap := map[string]string{}
if err := client.WalkReferences(ctx, info.References, func(ctx context.Context, name string, info SchemaInfo) error {
refsMap[name] = info.Schema
return nil
}); err != nil {
return "", err
}

var schemaDry any
if err := json.Unmarshal([]byte(info.Schema), &schemaDry); err != nil {
return "", fmt.Errorf("failed to parse root schema as enum: %w", err)
}

schemaRaw, _ := json.Marshal(schemaDry)
schemaString := string(schemaRaw)

var schema reference
err := json.Unmarshal(schemaRaw, &schema)
if err != nil {
return nil, err
return "", fmt.Errorf("failed to unmarshal root schema: %w", err)
}

var ref reference
maxIterations := 100

for i := 0; i <= maxIterations; i++ {
if i == maxIterations {
return "", errors.New("maximum iteration limit reached trying to resolve Avro references: possible circular dependency detected")
}
initialSchemaDry := schemaString

for k, v := range refsMap {
err := json.Unmarshal([]byte(refsMap[k]), &ref)
if err != nil {
return "", fmt.Errorf("failed to unmarshal refsMap value %s: %w", k, err)
}

if schema.Namespace == ref.Namespace {
schemaString = strings.ReplaceAll(schemaString, `"type":"`+ref.Name+`"`, `"type":`+v)
} else {
schemaString = strings.ReplaceAll(schemaString, `"type":"`+ref.Namespace+`.`+ref.Name+`"`, `"type":`+v)
}
}
if schemaString == initialSchemaDry {
break
}
}

return schemaString, nil
}

func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaInfo) (schemaEncoder, error) {
var schema string
var err error

if s.avroNestedSchemas {
schema, err = resolveAvroReferencesNested(ctx, s.client, info)
if err != nil {
return nil, err
}
} else {
schema, err = resolveAvroReferences(ctx, s.client, info)
if err != nil {
return nil, err
}
}

var codec *goavro.Codec
Expand Down Expand Up @@ -84,9 +164,19 @@ func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaI
}

func (s *schemaRegistryDecoder) getAvroDecoder(ctx context.Context, info SchemaInfo) (schemaDecoder, error) {
schema, err := resolveAvroReferences(ctx, s.client, info)
if err != nil {
return nil, err
var schema string
var err error

if s.avroNestedSchemas {
schema, err = resolveAvroReferencesNested(ctx, s.client, info)
if err != nil {
return nil, err
}
} else {
schema, err = resolveAvroReferences(ctx, s.client, info)
if err != nil {
return nil, err
}
}

var codec *goavro.Codec
Expand Down
Loading

0 comments on commit 63c8e61

Please sign in to comment.