Schema Registry Support #1010
Replies: 6 comments 6 replies
-
This honestly doesn't feel like much of a con to me. I'd say having this handling individually configurable and usable adds a degree of flexibility. I agree with the recommendation of doing Option 1 as the first pass.
-
IIUC, the idea is to have a single processor, which would be configured with one of the supported schema registries (versus one processor per supported schema registry)?
-
There are a couple of decisions we need to make regarding the implementation:

- the schema registry client,
- the serialization/deserialization (serde) layer,
- the Avro library.

The first two are kind of related since most libraries provide both, although in some cases they are decoupled, so we could choose to take only one piece out of a library. I'll break down each part into the available options and at the end explain my recommendation.

Schema Registry Client

In this part we just need a client that makes it easy to interact with a schema registry. What we need from the client:
There are a couple of options:
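As a rough, purely illustrative sketch of what such a client needs to be able to do (the interface name and signatures below are assumptions, not the API of any particular library):

```go
package registry // hypothetical package name, for illustration only

import "context"

// SchemaRegistryClient is a sketch of the operations we need from a schema
// registry client; it is not the API of any specific library.
type SchemaRegistryClient interface {
	// RegisterSchema uploads a schema under a subject and returns the ID
	// assigned by the registry.
	RegisterSchema(ctx context.Context, subject, schema string) (int, error)
	// SchemaByID fetches a previously registered schema by its ID.
	SchemaByID(ctx context.Context, id int) (string, error)
}
```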
Serialization/deserialization (serde) layer

Here we need a layer that allows us to serialize/deserialize payloads. Requirements for this layer:
Here we have similar options to before:
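Again purely as an illustration, and assuming hypothetical signatures rather than those of a concrete library, the layer roughly needs to offer something like:

```go
package serde // hypothetical package name, for illustration only

// Serde is a sketch of the serialization/deserialization layer: it turns
// structured data into bytes using a schema and back again.
type Serde interface {
	// Marshal encodes structured data into its binary representation using
	// the given schema.
	Marshal(schema string, value any) ([]byte, error)
	// Unmarshal decodes raw bytes into structured data using the given schema.
	Unmarshal(schema string, raw []byte) (any, error)
}
```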
Avro library

Given that we are always working with dynamic data and have no specific Go struct, we will need to write the logic to extract a schema from a payload (raw or structured). In other words, we can't use a function that derives the schema from a predefined Go type. The requirements for the Avro library:
Options:
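To make the schema extraction requirement concrete, here is a minimal sketch that derives field definitions from a structured payload; the package, type and function names are made up, and it deliberately ignores nested values:

```go
package avroutil // hypothetical package name, for illustration only

import "fmt"

// schemaField mirrors the shape of a field entry in an Avro record schema.
type schemaField struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

// extractFields derives Avro field definitions from a dynamic (structured)
// payload. It only handles a few primitive types, which is enough to show why
// custom extraction logic is needed when there is no predefined Go struct.
func extractFields(payload map[string]any) ([]schemaField, error) {
	var fields []schemaField
	for name, value := range payload {
		var typ string
		switch value.(type) {
		case string:
			typ = "string"
		case int, int32, int64:
			typ = "long"
		case float32, float64:
			typ = "double"
		case bool:
			typ = "boolean"
		default:
			return nil, fmt.Errorf("unsupported type %T for field %q", value, name)
		}
		fields = append(fields, schemaField{Name: name, Type: typ})
	}
	return fields, nil
}
```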
Recommendation

I recommend choosing the following libraries:
-
Regarding the processors themselves, I propose to have 4 processors: one pair for decoding and encoding the payload, another pair for the key. Keep in mind that I'm choosing the names of the processors based on the naming we are currently using for processors (all lower case letters without separators); this will possibly be reworked in #997.
-
Adding a reference to this other design document as it's being finalized: #1532
-
Done in #984.
-
Introduction
This document describes the design for adding Schema Registry support to Conduit.
Background
A Schema Registry is a tool that helps Conduit understand the format of the data it receives. Often, data is sent in binary formats that can make it difficult to interpret without knowing the schema beforehand. By adding support for a Schema Registry, Conduit can understand how to parse the binary data and turn it into structured data that can be manipulated by processors.
At the same time, uploading the schema of a structured payload to a registry can also be helpful. This allows Conduit to take structured data and encode it into a compact binary format for transmission. By doing so, Conduit can reduce the amount of data being sent, making transmission faster and more efficient.
Goals
This design should allow Conduit to do the following:
Note: outputting a binary format in connectors is not in the scope of this design. Here we only target decoding and encoding the key and payload fields of an OpenCDC record, but not the OpenCDC record itself.
Implementation options
Option 1 - 2 processors
Implement 2 separate processors.
The first processor knows how to fetch a schema from a schema registry and decode raw data using the schema. It needs to be able to process the fields Record.Key, Record.Payload.Before and/or Record.Payload.After. If the schema is not found, the processor fails.

The second processor is able to encode structured data into a binary format (e.g. Avro, Protobuf) and produce a schema that can be used to decode it back into structured data. It can also upload the schema to a schema registry. If the schema can't be uploaded to the schema registry, the processor fails.
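As an illustration of the decoding flow described above, here is a minimal sketch; the helper types and function names are assumptions, not Conduit's actual processor API:

```go
package example // illustrative sketch only, not Conduit's actual processor API

import (
	"context"
	"fmt"
)

// fetchSchemaFunc fetches a schema from the schema registry by ID (assumed helper).
type fetchSchemaFunc func(ctx context.Context, id int) (string, error)

// decodeFunc decodes raw bytes into structured data using a schema (assumed helper).
type decodeFunc func(schema string, raw []byte) (any, error)

// decodeField shows the intended flow of the first processor for a single field:
// look up the schema, decode the raw data with it, and fail if the schema
// cannot be found.
func decodeField(ctx context.Context, fetchSchema fetchSchemaFunc, decode decodeFunc, schemaID int, raw []byte) (any, error) {
	schema, err := fetchSchema(ctx, schemaID)
	if err != nil {
		// Schema not found (or registry unreachable), so the processor fails.
		return nil, fmt.Errorf("fetch schema %d: %w", schemaID, err)
	}
	structured, err := decode(schema, raw)
	if err != nil {
		return nil, fmt.Errorf("decode with schema %d: %w", schemaID, err)
	}
	return structured, nil
}
```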
By default, these processors use a predefined metadata field as the schema ID (e.g. conduit.key.schemaId, conduit.payload.before.schemaId or conduit.payload.after.schemaId). The user is able to override this behavior and configure a custom schema ID using static data and/or data taken from the record (e.g. metadata).

When other processors manipulate the record, we do not need to track those changes and update the schema, because the schema is extracted on demand by the processor.
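A minimal sketch of the default schema ID lookup, assuming the metadata keys listed above (everything else, including the names, is illustrative):

```go
package example // illustrative sketch only

import (
	"fmt"
	"strconv"
)

// Default metadata keys holding the schema ID, as described above.
const (
	metaKeySchemaID           = "conduit.key.schemaId"
	metaPayloadBeforeSchemaID = "conduit.payload.before.schemaId"
	metaPayloadAfterSchemaID  = "conduit.payload.after.schemaId"
)

// schemaIDFromMetadata reads the schema ID for a field from the record metadata.
// A user-configured override (a static value or another metadata key) would take
// precedence over this default lookup.
func schemaIDFromMetadata(metadata map[string]string, key string) (int, error) {
	raw, ok := metadata[key]
	if !ok {
		return 0, fmt.Errorf("metadata field %q not found", key)
	}
	return strconv.Atoi(raw)
}
```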
Each processor needs a schema registry URL in its configuration. To make this simpler we can later add the option to configure a pipeline schema registry URL and/or a global schema registry URL. The processor would then pick the first URL it can find in the hierarchy (processor > pipeline > global).
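The hierarchy lookup itself could be as simple as this sketch (the configuration fields are hypothetical):

```go
package example // illustrative sketch only

// registryURL resolves the schema registry URL using the hierarchy
// processor > pipeline > global: the first non-empty URL wins.
func registryURL(processorURL, pipelineURL, globalURL string) string {
	for _, url := range []string{processorURL, pipelineURL, globalURL} {
		if url != "" {
			return url
		}
	}
	return ""
}
```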
Pros

- The Record type does not need to change.

Cons

- User needs to understand and configure 2 processors.
- A schema can't be extracted from structured data in a lossless way.
For example, consider an Avro schema where a field can contain multiple types (a union) and has a default value.
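A schema along these lines illustrates the problem (the record and field names are made up):

```json
{
  "type": "record",
  "name": "Example",
  "fields": [
    {
      "name": "myField",
      "type": ["null", "string", "long"],
      "default": null
    }
  ]
}
```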
It's impossible to extract this schema from a concrete value without knowing the original schema.
Option 2 - Attach Schema to Record
This option builds on top of option 1 and tries to address the last con, lossless schema handling.
Conduit provides processors to decode and encode data using a schema, but additionally attaches the schema to the record. This means Conduit needs to somehow track changes done to the key and payload of a record and update the schema accordingly. Every time a processor changes a field (i.e. creates, updates, or deletes it), Conduit needs to detect it and update the schema attached to the record.
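A minimal sketch of the idea, assuming made-up types rather than Conduit's actual Record type:

```go
package example // illustrative sketch only, not Conduit's actual Record type

// recordWithSchemas shows the idea of carrying schemas alongside the record data.
// Any processor that creates, updates or deletes a field in the key or payload
// would also need the corresponding schema to be updated, either by the processor
// itself or by Conduit detecting the change.
type recordWithSchemas struct {
	Key           map[string]any
	PayloadBefore map[string]any
	PayloadAfter  map[string]any

	KeySchema           string
	PayloadBeforeSchema string
	PayloadAfterSchema  string
}
```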
Pros
Cons
Questions
Recommendation
We propose to start with option 1 and add option 2 if/when we see that there's a real need to produce the same schema as we get from the schema registry.