The goal of this tutorial is to discuss approaches for building resilient data microservices (or data streams) that can cope with the evolution of the data that flows through them.
The schema evolution topic has been discussed by Martin Kleppmann in schema-evolution-in-avro-protocol-buffers-thrift, and his book Designing Data-Intensive Applications dedicates an entire chapter to the subject.
Our intention is to demonstrate how we can apply those lessons using Spring Cloud Stream to create robust, evolution-ready data microservices.
Our proposed solution is to use a combination of a schema repository and serialization frameworks that can support different levels of schema evolution.
In our sample application we create an imaginary sensor data event (based on the Android sensor framework) whose schema has evolved between two versions. You can see the differences between the two models below:
public class Sensor {
private String id;
private float temperature;
private float velocity;
private float acceleration;
private float[] accelerometer;
private float[] magneticField;
private float[] orientation;
}
public class Sensor {
private String id;
private float internalTemperature; #(1)
private float externalTemperature; #(2)
private float velocity;
private float acceleration;
private float[] accelerometer;
private float[] magneticField; #(3)
}
1. Field name change: temperature became internalTemperature
2. An extra field, externalTemperature, has been added
3. The field orientation has been deprecated and is now missing
"Schema evolution refers to the problem of evolving a database schema to adapt it to a change in the modeled reality" Wikipedia
There are three main patterns for schema evolution: backward compatibility, forward compatibility, and full compatibility.
Backward compatibility: a reader using a newer schema version should be capable of reading data written with an older version (for example, a consumer on Sensor v2 reading events produced with v1).
Forward compatibility: a data service written against an older schema should be able to read data from producers that are already using a newer version.
Full compatibility: given a set of schemas { v1, v2, …, vn }, any given version can read data written by any other version in the set.
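Avro, which we will use below, even ships with a helper for checking these directions between two schema versions. Here is a minimal sketch, assuming the two versions of our sensor schema live in the hypothetical files sensor_v1.avsc and sensor_v2.avsc:
Schema v1 = new Schema.Parser().parse(new File("sensor_v1.avsc"));
Schema v2 = new Schema.Parser().parse(new File("sensor_v2.avsc"));
// Backward compatibility: can a reader using v2 read data written with v1?
SchemaCompatibility.SchemaPairCompatibility backward =
        SchemaCompatibility.checkReaderWriterCompatibility(v2, v1);
// Forward compatibility: can a reader still on v1 read data written with v2?
SchemaCompatibility.SchemaPairCompatibility forward =
        SchemaCompatibility.checkReaderWriterCompatibility(v1, v2);
System.out.println(backward.getType()); // COMPATIBLE or INCOMPATIBLE
System.out.println(forward.getType());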
To support the compatibility modes described above, schema evolution should be treated as a design-time decision. Today most developers treat it as an afterthought, only to realize later that version maintenance can become a real pain.
You also have to make sure you pick the right serialization framework to support schema evolution; most Java frameworks will allow at least one of the modes described above.
The essential component of a distributed, data-oriented microservices deployment is the schema registry. Not only will it store and version all the schemas used in your deployment, it can also act as a design validation tool, rejecting new schemas that clash with versions already in use. This can be a great asset when planning to migrate versions of your services.
At its core, Avro relies on a schema definition file in order to read and write data. The schema definition for our Sensor data looks like this:
{
"namespace" : "io.igx.android",
"type" : "record",
"name" : "Sensor",
"fields" : [
{"name":"id","type":"string"},
{"name":"temperature", "type":"float", "default":0.0},
{"name":"acceleration", "type":"float","default":0.0},
{"name":"velocity","type":"float","default":0.0},
{"name":"accelerometer","type":[
"null",{
"type":"array",
"items":"float"
}
]},
{"name":"magneticField","type":[
"null",{
"type":"array",
"items":"float"
}
]},
{"name":"orientation","type":[
"null",{
"type":"array",
"items":"float"
}
]}
]
}
When it comes to serializing and deserializing data, we have a few options with Avro.
The first option is to work with generated classes: you generate a source file that maps to the schema. There is very good support through Maven plugins, which, along with a good IDE, makes it easy to generate the sources without polluting your codebase.
The following is a snippet of the generated Sensor.java class.
public class Sensor extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Sensor\",\"namespace\":\"io.igx.android\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"temperature\",\"type\":\"float\",\"default\":0.0},{\"name\":\"acceleration\",\"type\":\"float\",\"default\":0.0},{\"name\":\"velocity\",\"type\":\"float\",\"default\":0.0},{\"name\":\"accelerometer\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"float\"}]},{\"name\":\"magneticField\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"float\"}]},{\"name\":\"orientation\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"float\"}]}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public java.lang.CharSequence id;
@Deprecated public float temperature;
@Deprecated public float acceleration;
@Deprecated public float velocity;
@Deprecated public java.util.List<java.lang.Float> accelerometer;
@Deprecated public java.util.List<java.lang.Float> magneticField;
@Deprecated public java.util.List<java.lang.Float> orientation;
/**
* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
*/
public Sensor() {}
/**
* All-args constructor.
*/
public Sensor(java.lang.CharSequence id, java.lang.Float temperature, java.lang.Float acceleration, java.lang.Float velocity, java.util.List<java.lang.Float> accelerometer, java.util.List<java.lang.Float> magneticField, java.util.List<java.lang.Float> orientation) {
this.id = id;
this.temperature = temperature;
this.acceleration = acceleration;
this.velocity = velocity;
this.accelerometer = accelerometer;
this.magneticField = magneticField;
this.orientation = orientation;
}
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public java.lang.Object get(int field$) {
switch (field$) {
case 0: return id;
case 1: return temperature;
case 2: return acceleration;
case 3: return velocity;
case 4: return accelerometer;
case 5: return magneticField;
case 6: return orientation;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
The target class has a getSchema() method that returns the original schema; this comes in handy when dealing with SpecificDatumReaders. You can then use the SpecificDatumWriter to serialize this class:
// id has no default in the schema, so we populate the record explicitly via the all-args constructor
Sensor sensor = new Sensor("sensor-1", 21.5f, 0.0f, 0.0f, null, null, null);
SpecificDatumWriter<Sensor> writer = new SpecificDatumWriter<>(Sensor.class);
DataFileWriter<Sensor> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.create(sensor.getSchema(), new File("sensors.dat"));
dataFileWriter.append(sensor);
dataFileWriter.close();
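Reading the data back with a SpecificDatumReader is just as simple. A minimal sketch, reusing the file written above:
SpecificDatumReader<Sensor> reader = new SpecificDatumReader<>(Sensor.class);
DataFileReader<Sensor> dataFileReader = new DataFileReader<>(new File("sensors.dat"), reader);
while (dataFileReader.hasNext()) {
    Sensor sensor = dataFileReader.next();
    System.out.println(sensor.getId() + " " + sensor.getTemperature());
}
dataFileReader.close();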
Another approach, one that offers a great deal of flexibility while still respecting the schema for type validation, is to use a GenericRecord. It works as a container: you put entries in it, and it validates them against the schema. With this approach you don't need to generate any classes.
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(new File("sensor.avsc")); // parse(String) expects the schema text itself, so pass a File
GenericRecord sensor = new GenericData.Record(schema);
sensor.put("id", "sensor-1"); // non-nullable fields must be populated before writing
sensor.put("temperature", 21.5f); // the schema declares floats, so use float values
sensor.put("acceleration", 3.7f);
sensor.put("velocity", 0.0f);
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.create(schema, new File("sensors.dat"));
dataFileWriter.append(sensor);
dataFileWriter.close();
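GenericRecord is also a convenient way to watch schema evolution in action: when reading, you hand Avro the schema you want to read with (the reader schema) and it resolves data that was written with an older version. A sketch, assuming the hypothetical file sensor_v2.avsc contains the newer schema with the appropriate aliases and defaults:
Schema readerSchema = new Schema.Parser().parse(new File("sensor_v2.avsc"));
// the writer schema is taken from the data file header and resolved against the reader schema
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(null, readerSchema);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File("sensors.dat"), datumReader);
while (dataFileReader.hasNext()) {
    GenericRecord record = dataFileReader.next();
    System.out.println(record.get("internalTemperature")); // renamed field, resolved via its alias
}
dataFileReader.close();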
Another approach is to have a plain POJO mapped to your schema and use a ReflectDatumWriter.
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(new File("tweet.avsc"));
Tweet tweet = new Tweet();
ReflectDatumWriter<Tweet> writer = new ReflectDatumWriter<>(schema);
DataFileWriter<Tweet> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.create(schema, new File("tweets.dat")); // the file must be created before appending
dataFileWriter.append(tweet);
dataFileWriter.close();
This approach is good when you can't generate classes, for example when you need to integrate with a third-party framework. Imagine using a Twitter client framework to receive tweets and serializing them directly, without having to deal with any mapping between the framework's type and your own.
Spring Cloud Stream uses a codec abstraction to serialize and deserialize the data that is written to and read from the channels. The interface is listed below:
public interface Codec {
void encode(Object object, OutputStream outputStream) throws IOException;
byte[] encode(Object object) throws IOException;
<T> T decode(InputStream inputStream, Class<T> type) throws IOException;
<T> T decode(byte[] bytes, Class<T> type) throws IOException;
}
Let's start with the wire format. Since we own both the encoding and decoding sides, we can add extra information on the wire to help us figure out how to read and write the data.
A common pattern in binary protocols is to write a few bytes before the payload that identify the data about to be read. The Kafka message protocol, for example, places ApiKey and ApiVersion bytes at the beginning of the message.
This is where a schema repository comes in handy, as discussed by Martin Kleppmann in schema-evolution-in-avro-protocol-buffers-thrift and also proposed in AVRO-1124.
The basic idea is that your component automatically registers its schema during startup (much like Spring Cloud Eureka registers services). By doing this you obtain a unique number that identifies your schema, which you can then prepend to the message payload.
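The snippets below rely on a registry client with roughly the following contract. This is a sketch whose method names are taken from the calls used in the codec, not the definitive client API:
public interface SchemaRegistryClient {

    // Registers a schema and returns its id; an equivalent,
    // already registered schema returns the existing id.
    Integer register(Schema schema);

    // Retrieves a previously registered schema by its id.
    Schema fetch(Integer id);
}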
With this in mind, the encoding piece would look like this:
@Override
public void encode(Object object, OutputStream outputStream) throws IOException {
Schema schema = getSchema(object); #(1)
Integer id = schemaRegistryClient.register(schema); #(2)
DatumWriter writer = getDatumWriter(object.getClass(),schema); #(3)
Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
outputStream.write(ByteBuffer.allocate(4).putInt(id).array());
writer.write(object,encoder);
encoder.flush();
}
@Override
public byte[] encode(Object o) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
encode(o,baos);
return baos.toByteArray();
}
1. If we are using a GenericRecord or a generated class, obtaining the schema is easy, since we just need to call the getSchema method of the object. If we are using reflection, then a local schema cache needs to exist. We can leverage Spring Boot auto-configuration to register all schema files and map them to classes with the same namespace.
2. Registering a schema returns a new id for a new schema, or the existing id of a previously registered one.
3. To obtain the right DatumWriter we use the same logic as in (1): if it is a GenericRecord or SpecificRecord we use a GenericDatumWriter or SpecificDatumWriter respectively, otherwise we fall back to a ReflectDatumWriter (see the sketch after this list).
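A minimal sketch of that selection logic, assuming the getDatumWriter signature used in the encode method above:
private DatumWriter getDatumWriter(Class<?> type, Schema schema) {
    if (SpecificRecord.class.isAssignableFrom(type)) {
        return new SpecificDatumWriter<>(schema); // generated classes
    }
    if (GenericRecord.class.isAssignableFrom(type)) {
        return new GenericDatumWriter<>(schema); // generic records
    }
    return new ReflectDatumWriter<>(schema); // plain POJOs via reflection
}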
The decoding process is very similar, just in reverse order:
@Override
public <T> T decode(InputStream inputStream, Class<T> type) throws IOException {
return decode(IOUtils.toByteArray(inputStream),type);
}
@Override
public <T> T decode(byte[] bytes, Class<T> type) throws IOException {
Assert.notNull(bytes, "'bytes' cannot be null");
Assert.notNull(type, "Class can not be null");
ByteBuffer buf = ByteBuffer.wrap(bytes);
byte[] payload = new byte[bytes.length-4];
Integer schemaId = buf.getInt(); #(1)
buf.get(payload); #(2)
Schema schema = schemaRegistryClient.fetch(schemaId); #(3)
DatumReader reader = getDatumReader(type,schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(payload,null);
return (T) reader.read(null,decoder);
}
1. First we read the schema id from the encoded data
2. Copy the remaining (payload) bytes
3. Retrieve the schema from the registry
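The getDatumReader helper mirrors getDatumWriter; a sketch of what it might look like is below. The important detail is that the schema fetched from the registry acts as the writer schema, while the target type supplies the reader schema, which is where Avro's resolution rules absorb the evolution:
private DatumReader getDatumReader(Class<?> type, Schema writerSchema) {
    if (SpecificRecord.class.isAssignableFrom(type)) {
        // reader schema comes from the generated class, writer schema from the registry
        return new SpecificDatumReader<>(writerSchema, SpecificData.get().getSchema(type));
    }
    if (GenericRecord.class.isAssignableFrom(type)) {
        return new GenericDatumReader<>(writerSchema);
    }
    return new ReflectDatumReader<>(writerSchema, ReflectData.get().getSchema(type));
}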
As you can see, our codec is very simple: we only add four extra bytes to carry the schema identifier, and as long as we can resolve that id against an external registry, the rest is a plain Avro serialization procedure.
But there is a small catch when it comes to resolving the writer and reader schemas.
public void encode(Object object, OutputStream outputStream) throws IOException {
Schema schema = getSchema(object); #(1)
...
private Schema getSchema(Object payload){
Schema schema = null;
logger.debug("Obtaining schema for class {}", payload.getClass());
if(GenericContainer.class.isAssignableFrom(payload.getClass())) { #(2)
schema = ((GenericContainer) payload).getSchema();
logger.debug("Avro type detected, using schema from object");
}else{ #(3)
Integer id = localSchemaMap.get(payload.getClass().getName()); #(4)
if(id == null){
if(!properties.isDynamicSchemaGenerationEnabled()) {
throw new SchemaNotFoundException(String.format("No schema found on local cache for %s", payload.getClass()));
}
else{
Schema localSchema = ReflectData.get().getSchema(payload.getClass()); #(5)
id = schemaRegistryClient.register(localSchema);
}
}
schema = schemaRegistryClient.fetch(id);
}
return schema;
}
1. The first thing to do before serializing is finding the schema for the type
2. When the object to be serialized is an Avro type this is very simple, since those types have the schema built into them
3. But what if we are using reflection and have neither a generated source nor a GenericRecord?
4. If any schema file (*.avsc) is detected on the classpath, we register those schemas with the registry and store them locally using the fully qualified name of the class as the key (see the sketch after this list)
5. When locating a schema, if no local schema is found and dynamic schema generation is enabled, we generate the schema at runtime using reflection and register it with the registry
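A minimal sketch of the startup registration described in (4), assuming Spring's PathMatchingResourcePatternResolver and the same schemaRegistryClient and localSchemaMap used above; the actual auto-configuration may differ:
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
for (Resource resource : resolver.getResources("classpath*:**/*.avsc")) {
    Schema schema = new Schema.Parser().parse(resource.getInputStream());
    Integer id = schemaRegistryClient.register(schema);
    // key the cache by the record's fully qualified name (namespace + name)
    localSchemaMap.put(schema.getFullName(), id);
}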
I don't believe dynamic schema generation is a good idea: it tends to create schemas that are brittle and not ready for evolution (no default values, aliases, or unions). That is why it is disabled by default; enable it with caution.
The modules producer_v1, producer_v2, consumer_v1, and consumer_v2 contain sample applications that can be run to demonstrate the schema evolution capabilities when using the registry and the codec.
You will need a Confluent Schema Registry up and running; please look here for how to set up the Docker images that bootstrap it.
What's the fun of writing a blog post if you can't include your own biased benchmarks?
Just for fun I ran a few tests comparing Avro and JSON (using Jackson); you can see the results below.
Domain | Avro size (bytes) | JSON size (bytes)
Sensor | 94 | 237
Twitter Status | 847 | 3517
Domain | Avro read (ns, avg) | Avro write (ns, avg) | JSON read (ns, avg) | JSON write (ns, avg)
Sensor | 1333 | 1300 | 3433 | 3483