
Schema Registry

Andrew Robertson edited this page Jun 29, 2021 · 6 revisions

The following assumes some familiarity with Pravega (https://pravega.io).

Overview

A schema must be defined before you can query a Pravega stream. The connector uses this schema to tell Presto what the table definition is, and to deserialize the Pravega event data in the stream.

We use the convention Pravega scope.stream = Presto schema.table: a Pravega scope maps to a Presto schema, and a Pravega stream within that scope maps to a Presto table.
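The convention can be expressed as a small helper. This is a minimal sketch, not the connector's code: PravegaNames, PrestoTable, fromScopeAndStream, and parse are hypothetical names, and parse assumes that neither the scope nor the stream name contains a '.'.

```java
import java.util.Objects;

/** Hypothetical helper: Pravega scope -> Presto schema, Pravega stream -> Presto table. */
final class PravegaNames {
    private PravegaNames() {}

    /** A Presto schema/table pair (a simplified stand-in, not Presto's own class). */
    record PrestoTable(String schema, String table) {
        PrestoTable {
            Objects.requireNonNull(schema, "schema");
            Objects.requireNonNull(table, "table");
        }

        /** The "{schema}.{table}" form, e.g. "sensors.car". */
        String qualifiedName() {
            return schema + "." + table;
        }
    }

    /** Maps a Pravega scope and stream to the Presto schema and table that expose it. */
    static PrestoTable fromScopeAndStream(String scope, String stream) {
        return new PrestoTable(scope, stream);
    }

    /** Parses "{schema}.{table}". Assumption: neither part contains a '.'. */
    static PrestoTable parse(String qualifiedName) {
        int dot = qualifiedName.indexOf('.');
        boolean malformed = dot <= 0                        // no dot, or nothing before it
                || dot != qualifiedName.lastIndexOf('.')    // more than one dot
                || dot == qualifiedName.length() - 1;       // nothing after the dot
        if (malformed) {
            throw new IllegalArgumentException("expected {schema}.{table}: " + qualifiedName);
        }
        return new PrestoTable(qualifiedName.substring(0, dot), qualifiedName.substring(dot + 1));
    }
}
```

The same {schema}.{table} string serves as the PSR group name and, with a .json suffix, as the Local SR file name described on this page.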

To manage this, the connector uses the abstract idea of a "Schema Registry" (SR): a store from which it can retrieve the schemas associated with a given Pravega stream.

Options for Schema Management

Pravega Schema Registry (PSR)

https://github.com/pravega/schema-registry

To enable PSR, set pravega.schema-registry in $PRESTO_HOME/etc/catalog/pravega.properties to the URI of your Pravega Schema Registry deployment, for example:

pravega.schema-registry=http://localhost:9092

Two things are required for the connector to find the schemas for your streams in PSR:

  • You must have a group in PSR whose name has the format {schema}.{table}, for example sensors.car
  • You must then add your schema to the group (note: currently only a single schema per stream is supported)

Group

A group has a serialization format, compatibility settings, and optional properties. These are specified when the group is created:

  • The "inline" property is explained under "PSR managed" below.

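// The group name "sensors.car" maps to the Presto table sensors.car.
// Add the "inline" key only when schemas are registered by a PSR serializer (see "PSR managed").
ImmutableMap<String, String> groupProperties = inline
        ? ImmutableMap.of("inline", "")
        : ImmutableMap.of();

registryClient.addGroup(
        "sensors.car",
        new GroupProperties(
                SerializationFormat.Avro,
                Compatibility.allowAny(),
                false,   // allowMultipleTypes
                groupProperties));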

Schema

Next, a schema must be added to the group. There are two ways to do this; which one you choose depends on how you use Pravega and PSR (see https://github.com/pravega/schema-registry/wiki/Sample-Usage:-Pravega-Application).

When writing events to Pravega you must provide a serializer. PSR also provides serializers that handle serialization and deserialization and can register schemas for you.

DIY

With this option you add the schema explicitly:

registryClient.addSchema("sensors.car", AvroSchema.of(CarSensors.class).getSchemaInfo());

PSR managed

With this option you create a PSR serializer, which adds the schema to the group for you:

SerializerConfig serializerConfig = SerializerConfig.builder()
	.groupId("sensors.car").registryConfig(schemaRegistryClientConfig)
	.registerSchema(true)
	.build();

Serializer<CarSensors> serializer = 
	SerializerFactory.avroSerializer(serializerConfig, AvroSchema.of(CarSensors.class));

Back to group properties: if you use this option (PSR managed), you must include the key "inline" in your group properties. This is a temporary measure; see https://github.com/pravega/presto-connector/issues/20

Local Schema Registry (LSR)

You can manage schemas using static JSON files on local disk that describe streams, key/value tables, and their schemas.

These files are read from $PRESTO_HOME/etc/pravega/ by default; the directory can be changed with the pravega.table-description-dir property.

  • Note: A schema defined in Local SR overrides a schema with the same name in other registries
  • Note: These schema config files are only needed on the Presto coordinator

File name format: {schema}.{table}.json
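The lookup rule can be sketched as follows. LocalSrFiles and its methods are hypothetical; the sketch only encodes what this page states: the directory comes from pravega.table-description-dir when set, otherwise $PRESTO_HOME/etc/pravega/, and each file is named {schema}.{table}.json.

```java
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical helper that locates the Local SR description file for a table. */
final class LocalSrFiles {
    static final String DIR_PROPERTY = "pravega.table-description-dir";

    private LocalSrFiles() {}

    /** The configured directory, or $PRESTO_HOME/etc/pravega when the property is unset. */
    static Path tableDescriptionDir(Properties catalogConfig, String prestoHome) {
        String configured = catalogConfig.getProperty(DIR_PROPERTY);
        if (configured != null && !configured.isBlank()) {
            return Path.of(configured);
        }
        return Path.of(prestoHome, "etc", "pravega");
    }

    /** Builds {dir}/{schema}.{table}.json, e.g. .../etc/pravega/hr.employee.json. */
    static Path descriptionFile(Properties catalogConfig, String prestoHome, String schema, String table) {
        return tableDescriptionDir(catalogConfig, prestoHome).resolve(schema + "." + table + ".json");
    }
}
```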

The required fields are schemaName, tableName, and objectName. tableName and objectName are usually equal. The "event" field is where you specify the actual schema.

There are two ways to specify it:

  • Fields defined in file
  • Link to schema (file, url)

Fields defined in file

You can include the schema directly, for example:

{
    "schemaName": "hr",
    "tableName": "employee",
    "objectName": "employee",
    "event": [{
        "dataFormat": "json",
        "fields": [
            {
                "name": "id",
                "mapping": "id",
                "type": "BIGINT"
            },        
            {
                "name": "first",
                "mapping": "first",
                "type": "VARCHAR(25)"
            },
            {
                "name": "last",
                "mapping": "last",
                "type": "VARCHAR(25)"
            }
        ]
    }]
}

Link to schema (file, url)

Alternatively, you can point to a file containing the schema (in the same table description directory, $PRESTO_HOME/etc/pravega by default) or to a URL:

{
    "schemaName": "hr",
    "tableName": "employee",
    "objectName": "employee",
    "event": [
        {
            "dataFormat": "avro",
            "dataSchema": "employee.avsc"
        }
    ]
}
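One way to tell the two forms of dataSchema apart is sketched below. DataSchemaLocation and resolve are hypothetical, and treating http, https, and file URIs as URLs while resolving everything else against the table description directory is an assumption, not documented connector behavior.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.Locale;
import java.util.Set;

/** Hypothetical helper: is "dataSchema" a URL or a file next to the table description? */
final class DataSchemaLocation {
    // Assumption: these schemes mark a dataSchema value as a URL.
    private static final Set<String> URL_SCHEMES = Set.of("http", "https", "file");

    private DataSchemaLocation() {}

    static URI resolve(String dataSchema, Path tableDescriptionDir) {
        try {
            URI uri = new URI(dataSchema);
            String scheme = uri.getScheme();
            if (scheme != null && URL_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT))) {
                return uri; // already a URL, e.g. http://host/employee.avsc
            }
        } catch (URISyntaxException e) {
            // Not a valid URI: treat it as a file name below.
        }
        // A plain name such as "employee.avsc", resolved against the description directory.
        return tableDescriptionDir.resolve(dataSchema).toUri();
    }
}
```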

where employee.avsc is a standard Avro schema defined in JSON:

{
    "namespace": "io.pravega.avro",
    "type": "record",
    "name": "Employee",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "first", "type": "string"},
        {"name": "last", "type": "string"}
    ]
}

Local SR options

Local SR also supports the following options.

Multi Source Streams

Related streams that share the same schema can be presented and queried as a single table.

There are two ways to define the component streams (both examples below are the file multiexample.server.json):

Regular expressions

objectName can contain a regular expression. The connector looks for streams within the Pravega scope (schemaName) whose names match the regex.

{
  "schemaName": "multiexample",
  "tableName": "server",
  "objectName" : "server[0-9]"
}

Explicitly listed

Specify exactly which streams to include by listing them in "objectArgs":

{
  "schemaName": "multiexample",
  "tableName": "server",
  "objectName": "server",
  "objectArgs" : ["server0","server2"]
}
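The two ways of choosing component streams can be sketched together. ComponentStreams and resolve are hypothetical names; the assumptions that objectArgs takes precedence when present and that the regex must match the whole stream name (so server[0-9] matches server2 but not server10) are illustrative, not confirmed connector behavior.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical sketch of selecting the component streams of a multi-source table. */
final class ComponentStreams {
    private ComponentStreams() {}

    static List<String> resolve(String objectName, List<String> objectArgs, List<String> streamsInScope) {
        if (objectArgs != null && !objectArgs.isEmpty()) {
            return List.copyOf(objectArgs); // explicitly listed streams win (assumption)
        }
        Pattern pattern = Pattern.compile(objectName);
        return streamsInScope.stream()
                .filter(name -> pattern.matcher(name).matches()) // whole-name match, not find()
                .sorted()
                .collect(Collectors.toList());
    }
}
```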

Key Value Tables

Local SR can also be used to set up querying of Pravega Key-Value Tables (https://pravega.io/docs/v0.9.0/javadoc/clients/index.html?io/pravega/client/tables/KeyValueTable.html).

A KV table description differs from a stream description in three ways:

  • objectType must be KV_TABLE
  • objectArgs is a list of the key families to include
  • there are two schemas: one for the key and one for the value

{
  "schemaName": "kv",
  "tableName": "employee",
  "objectName": "employee",
  "objectType": "KV_TABLE",
  "objectArgs": ["kf1", "kf2"],
  "event": [
    {
      "dataFormat": "avro",
      "dataSchema": "employee-id.avsc"
    },
    {
      "dataFormat": "avro",
      "dataSchema": "employee-value.avsc"
    }
  ]
}

The key and value schemas are "flattened" into a single Presto table definition, with each field name prefixed by key/ or value/. For example, describe pravega.kv.employee might show fields like these:

key/id
value/first
value/last
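The flattening can be sketched as prefixing each key field with key/ and each value field with value/, matching the describe output above. KvColumns and flatten are hypothetical; the real connector works with typed field definitions rather than plain names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: flatten KV key and value fields into prefixed Presto column names. */
final class KvColumns {
    private KvColumns() {}

    static List<String> flatten(List<String> keyFields, List<String> valueFields) {
        List<String> columns = new ArrayList<>(keyFields.size() + valueFields.size());
        for (String field : keyFields) {
            columns.add("key/" + field);   // key columns first
        }
        for (String field : valueFields) {
            columns.add("value/" + field); // then value columns
        }
        return columns;
    }
}
```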

Code Impl.

The term 'schema' is overloaded. A Presto schema is a database schema: a collection of tables. 'Schema' also applies to the data itself, i.e. the definition of the data you are storing.

The connector needs to know two things:

  • which Presto schemas are available, and which tables each schema contains
  • what the fields of those tables are (what the data looks like)

This information may come from different places (for example, the list of schemas and tables may come from local disk while the data schemas are stored elsewhere). There are two APIs for this: SchemaSupplier and SchemaRegistry.

SchemaSupplier

  • listSchemas should return available presto schemas
  • listTables should return tables available within the given schema
public interface SchemaSupplier
{
    List<String> listSchemas();

    List<PravegaTableHandle> listTables(String schema);
}

SchemaRegistry

  • getSchema should return the schema (field definitions) for the given table.
  • getTable should return the full table definition (PravegaStreamDescription is essentially simple table metadata plus the schema).

public interface SchemaRegistry
{
    List<PravegaStreamFieldGroup> getSchema(SchemaTableName schemaTableName);

    PravegaStreamDescription getTable(SchemaTableName schemaTableName);
}
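To illustrate how the two APIs can be combined, the sketch below pairs an in-memory supplier with a registry chain in which the first registry that knows a table wins, mirroring the rule that Local SR overrides other registries. All types here (TableName, TableDescription, SimpleSchemaSupplier, SimpleSchemaRegistry, MapSchemaSupplier, ChainedSchemaRegistry) are hypothetical simplified stand-ins, not the connector's classes, and returning null for an unknown table is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Simplified stand-ins for the connector types (hypothetical). */
record TableName(String schema, String table) {}

record TableDescription(TableName name, List<String> fields) {}

/** Simplified shape of SchemaSupplier: which schemas and tables exist. */
interface SimpleSchemaSupplier {
    List<String> listSchemas();

    List<TableName> listTables(String schema);
}

/** Simplified shape of SchemaRegistry: a table's definition, or null if unknown. */
interface SimpleSchemaRegistry {
    TableDescription getTable(TableName name);
}

/** Supplier backed by an in-memory map of schema -> table names (e.g. loaded from disk). */
final class MapSchemaSupplier implements SimpleSchemaSupplier {
    private final Map<String, List<String>> tablesBySchema;

    MapSchemaSupplier(Map<String, List<String>> tablesBySchema) {
        this.tablesBySchema = Map.copyOf(tablesBySchema);
    }

    @Override
    public List<String> listSchemas() {
        return List.copyOf(tablesBySchema.keySet());
    }

    @Override
    public List<TableName> listTables(String schema) {
        List<TableName> result = new ArrayList<>();
        for (String table : tablesBySchema.getOrDefault(schema, List.of())) {
            result.add(new TableName(schema, table));
        }
        return result;
    }
}

/** Asks registries in order; the first one that knows the table wins (e.g. Local SR before PSR). */
final class ChainedSchemaRegistry implements SimpleSchemaRegistry {
    private final List<SimpleSchemaRegistry> registries;

    ChainedSchemaRegistry(List<SimpleSchemaRegistry> registries) {
        this.registries = List.copyOf(registries);
    }

    @Override
    public TableDescription getTable(TableName name) {
        for (SimpleSchemaRegistry registry : registries) {
            TableDescription description = registry.getTable(name);
            if (description != null) {
                return description;
            }
        }
        return null; // no registry knows this table
    }
}
```

Keeping "what tables exist" separate from "what their fields are" lets each answer come from a different store, which is the split the two interfaces above are designed for.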