Issue 20: auto detect whether stream includes SchemaRegistry encoding info #42

Open
wants to merge 13 commits into base: main
prestodb/build.gradle (10 changes: 6 additions & 4 deletions)
@@ -6,17 +6,18 @@ plugins {
     id 'java'
     id 'distribution'
     id 'maven'
+    id 'com.github.davidmc24.gradle.plugin.avro-base' version "1.0.0"
 }

 apply plugin: 'java'
 apply from: "$rootDir/gradle/checkstyle.gradle"

+apply plugin: "com.github.davidmc24.gradle.plugin.avro"
+sourceSets.main.java.srcDirs += 'build/generated-test-avro-java/'

 repositories {
     mavenLocal()

     maven {
         url = uri('https://oss.jfrog.org/jfrog-dependencies')
     }

     maven {
         url = uri('https://jitpack.io')
     }
@@ -67,6 +68,7 @@ dependencies {
     testImplementation "com.facebook.airlift:testing:${airliftTestingVersion}"

     testCompile (group: 'io.pravega', name: 'pravega-standalone', version: pravegaVersion)
+    testImplementation "io.pravega:schemaregistry-server:${pravegaSchemaRegistryVersion}"

     compileOnly "io.airlift:slice:${airliftSliceVersion}"
     compileOnly "io.airlift:units:${airliftUnitsVersion}"
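The build changes support the new tests: the davidmc24 avro plugin generates Java classes from Avro schema files at build time, with `build/generated-test-avro-java/` added to the source path so the generated test types compile, and the `schemaregistry-server` test dependency is presumably there so the integration tests can stand up a schema-registry service next to `pravega-standalone`.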
io/pravega/connectors/presto/PravegaRecordSetProvider.java
@@ -47,12 +47,9 @@

 import static io.pravega.connectors.presto.PravegaHandleResolver.convertSplit;
 import static io.pravega.connectors.presto.util.PravegaSchemaUtils.AVRO;
-import static io.pravega.connectors.presto.util.PravegaSchemaUtils.AVRO_INLINE;
 import static io.pravega.connectors.presto.util.PravegaSchemaUtils.CSV;
 import static io.pravega.connectors.presto.util.PravegaSchemaUtils.JSON;
-import static io.pravega.connectors.presto.util.PravegaSchemaUtils.JSON_INLINE;
 import static io.pravega.connectors.presto.util.PravegaSchemaUtils.PROTOBUF;
-import static io.pravega.connectors.presto.util.PravegaSchemaUtils.PROTOBUF_INLINE;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static java.util.Objects.requireNonNull;
@@ -166,18 +163,12 @@ private KVSerializer<?> serializer(PravegaObjectSchema schema, SerializerConfig
 {
     switch (schema.getFormat()) {
         case AVRO:
-            return new AvroSerializer(schema.getSchemaLocation().get());
-        case AVRO_INLINE:
-            return new AvroSerializer(serializerConfig);
+            return new AvroSerializer(serializerConfig, schema.getSchemaLocation().get());

         case PROTOBUF:
-            return new ProtobufSerializer(schema.getSchemaLocation().get());
-        case PROTOBUF_INLINE:
-            return new ProtobufSerializer(serializerConfig);
+            return new ProtobufSerializer(serializerConfig, schema.getSchemaLocation().get());

         case JSON:
-            return new JsonSerializer();
-        case JSON_INLINE:
             return new JsonSerializer(serializerConfig);

         case CSV:
@@ -192,15 +183,12 @@ private EventDecoder eventDecoder(PravegaObjectSchema schema, Set<DecoderColumnH
 {
     switch (schema.getFormat()) {
         case AVRO:
-        case AVRO_INLINE:
             return new AvroRowDecoder(decoderColumnHandles);

         case PROTOBUF:
-        case PROTOBUF_INLINE:
             return new ProtobufRowDecoder(decoderColumnHandles);

         case JSON:
-        case JSON_INLINE:
             return jsonRowDecoderFactory.create(decoderColumnHandles);

         case CSV: {
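With auto-detection in place, the `*_INLINE` formats disappear: the catalog no longer has to record whether a stream was written through the schema registry. A sketch of the net effect on serializer construction (names as in the diff above):

```java
// before: the table's format string chose between two constructors
//   case AVRO:        -> new AvroSerializer(schemaLocation)    (plain events)
//   case AVRO_INLINE: -> new AvroSerializer(serializerConfig)  (registry-encoded events)
// after: one constructor receives both inputs, and the serializer itself
// decides which to use by probing the first event (see KVSerializer below)
KVSerializer<?> serializer =
        new AvroSerializer(serializerConfig, schema.getSchemaLocation().get());
```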
io/pravega/connectors/presto/PravegaTableDescriptionSupplier.java
@@ -22,7 +22,6 @@
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Streams;
 import io.pravega.client.ClientConfig;
 import io.pravega.client.admin.StreamManager;
@@ -57,10 +56,6 @@
 import static io.pravega.connectors.presto.util.PravegaNameUtils.multiSourceStream;
 import static io.pravega.connectors.presto.util.PravegaNameUtils.temp_streamNameToTableName;
 import static io.pravega.connectors.presto.util.PravegaNameUtils.temp_tableNameToStreamName;
-import static io.pravega.connectors.presto.util.PravegaSchemaUtils.GROUP_PROPERTIES_INLINE_KEY;
-import static io.pravega.connectors.presto.util.PravegaSchemaUtils.GROUP_PROPERTIES_INLINE_KV_KEY;
-import static io.pravega.connectors.presto.util.PravegaSchemaUtils.GROUP_PROPERTIES_INLINE_KV_VALUE;
-import static io.pravega.connectors.presto.util.PravegaSchemaUtils.INLINE_SUFFIX;
 import static io.pravega.connectors.presto.util.PravegaSchemaUtils.readSchema;
 import static io.pravega.connectors.presto.util.PravegaStreamDescUtils.mapFieldsFromSchema;
 import static java.nio.file.Files.readAllBytes;
@@ -91,8 +86,6 @@ public class PravegaTableDescriptionSupplier

     private JsonCodec<PravegaStreamDescription> streamDescriptionCodec;

-    // "inline" means that event was written using schema registry wrapped client and schema encoding id
-    // is within the raw event data in pravega
     @Inject
     PravegaTableDescriptionSupplier(PravegaConnectorConfig pravegaConnectorConfig,
                                     JsonCodec<PravegaStreamDescription> streamDescriptionCodec)
@@ -370,7 +363,7 @@ private Optional<List<PravegaStreamFieldGroup>> fieldGroupsFromSchemaRegistry(fi

     SerializationFormat format = schemas.get(i).getSchemaInfo().getSerializationFormat();
     fieldGroups.add(new PravegaStreamFieldGroup(
-            dataFormat(properties.getProperties(), format, kv, i),
+            normalizeDataFormat(format),
             Optional.of(colPrefix),
             dataSchema(format, schemas.get(i)),
             Optional.of(mapFieldsFromSchema(colPrefix, format, schemas.get(i)))));
@@ -442,37 +435,12 @@ private static List<File> listFiles(File dir)
     return ImmutableList.of();
 }

-private static String dataFormat(ImmutableMap<String, String> groupProperties,
-                                 SerializationFormat format,
-                                 boolean kvTable,
-                                 int kvIdx)
+private static String normalizeDataFormat(SerializationFormat format)
 {
-    /*
-    TODO: auto-detect https://github.com/pravega/pravega-sql/issues/58
-
-    (1) no schema registry.
-    (2) register and evolve schemas in registry, but do not use registry client while writing data
-    (3) register schemas in the registry and use registry client to encode schema id with payload
-    "inline" is for #3, e.g. "avro" -> "avro-inline".  PravegaRecordSetProvider is interested in this
-
-    hopefully this can all go away (see linked issue 58 above)
-
-    but for now the following is our convention:
-    if "inline" exists in our properties, all data uses SR
-    else if it is a kv table, key + value may differ; both, neither, or either may use SR
-    look for "inlinekey" / "inlinevalue"
-    */
-
-    String key = GROUP_PROPERTIES_INLINE_KEY;
-
-    if (kvTable && !groupProperties.containsKey(key)) {
-        key = kvIdx == 0 ? GROUP_PROPERTIES_INLINE_KV_KEY : GROUP_PROPERTIES_INLINE_KV_VALUE;
-    }
-
-    String finalFormat = format == SerializationFormat.Custom
+    // (CSV is custom)
+    return format == SerializationFormat.Custom
             ? format.getFullTypeName().toLowerCase(Locale.ENGLISH)
             : format.name().toLowerCase(Locale.ENGLISH);
-    return finalFormat + (groupProperties.containsKey(key) ? INLINE_SUFFIX : "");
 }

 private static Optional<String> dataSchema(SerializationFormat format, SchemaWithVersion schemaWithVersion)
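For reference, a sketch of what `normalizeDataFormat` returns for each registry format; the old "-inline" suffix is gone entirely:

```java
// Worked examples (SerializationFormat from the pravega schema-registry contract):
//   normalizeDataFormat(SerializationFormat.Avro)     -> "avro"
//   normalizeDataFormat(SerializationFormat.Protobuf) -> "protobuf"
//   normalizeDataFormat(SerializationFormat.Json)     -> "json"
// a Custom format normalizes to its full type name rather than the enum name,
// which is how a stream registered with custom format "CSV" maps to "csv"
```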
io/pravega/connectors/presto/decoder/AvroSerializer.java
@@ -17,10 +17,9 @@
 package io.pravega.connectors.presto.decoder;

 import io.pravega.connectors.presto.util.ByteBufferInputStream;
-import com.google.protobuf.DynamicMessage;
 import io.pravega.client.stream.Serializer;
+import io.pravega.connectors.presto.util.PravegaSerializationUtils;
 import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
-import io.pravega.schemaregistry.serializers.SerializerFactory;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
@@ -35,9 +34,9 @@
 public class AvroSerializer
         extends KVSerializer<GenericRecord>
 {

     private static class GenericRecordSerializer
-            implements Serializer<Object>
-    {
+            implements Serializer<Object> {
         private final DatumReader<GenericRecord> datumReader;

         private final Schema schema;
@@ -49,16 +48,16 @@ private static class GenericRecordSerializer
 }

 @Override
-public ByteBuffer serialize(Object value)
+public ByteBuffer serialize(Object object)
 {
-    return ByteBuffer.wrap(((DynamicMessage) value).toByteArray());
+    return PravegaSerializationUtils.serialize((GenericRecord) object);
 }

 @Override
 public GenericRecord deserialize(ByteBuffer serializedValue)
 {
     try (DataFileStream<GenericRecord> dataFileReader =
-                 new DataFileStream<>(new ByteBufferInputStream(serializedValue), datumReader)) {
+            new DataFileStream<>(new ByteBufferInputStream(serializedValue), datumReader)) {
         // TODO: need to figure out how to auto-detect the format of the avro data
         // e.g. is the schema provided with every row? (this is how the normal presto avro decoder takes it)
         // the more typical case would be that the schema is defined once and not provided with each event
@@ -73,17 +72,14 @@ public GenericRecord deserialize(ByteBuffer serializedValue)
     }
 }

-private final Serializer<Object> delegate;
-
-public AvroSerializer(SerializerConfig config)
-{
-    this.delegate = SerializerFactory.genericDeserializer(config);
+public AvroSerializer(SerializerConfig config, String schema) {
+    super(config, schema);
 }

-public AvroSerializer(String encodedSchema)
+@Override
+public Serializer<Object> serializerForSchema(String schema)
 {
-    Schema schema = (new Schema.Parser()).parse(encodedSchema);
-    this.delegate = new GenericRecordSerializer(schema);
+    return new GenericRecordSerializer((new Schema.Parser()).parse(schema));
 }

 @Override
@@ -95,7 +91,7 @@ public ByteBuffer serialize(GenericRecord value)
 {
 @Override
 public GenericRecord deserialize(ByteBuffer serializedValue)
 {
-    return (GenericRecord) delegate.deserialize(serializedValue);
+    return super.deserialize(serializedValue);
 }
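`GenericRecordSerializer` reads Avro object-container bytes via `DataFileStream`, so plain (non-registry) events are expected in that layout. Note also that the old `serialize()` cast the value to protobuf's `DynamicMessage`, apparently a leftover from the protobuf serializer, which this change fixes. A self-contained sketch of producing such bytes and reading them back the same way (the schema and field are illustrative only):

```java
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class AvroContainerRoundTrip
{
    public static void main(String[] args) throws IOException
    {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\"," +
                "\"fields\":[{\"name\":\"city\",\"type\":\"string\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("city", "boston");

        // write one record in avro object-container format
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            writer.create(schema, out);
            writer.append(record);
        }

        // read it back the way GenericRecordSerializer.deserialize does
        try (DataFileStream<GenericRecord> reader = new DataFileStream<>(
                new ByteArrayInputStream(out.toByteArray()),
                new GenericDatumReader<>(schema))) {
            System.out.println(reader.next().get("city")); // boston
        }
    }
}
```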
io/pravega/connectors/presto/decoder/CsvSerializer.java
@@ -15,6 +15,8 @@
  */

 package io.pravega.connectors.presto.decoder;
+import io.pravega.client.stream.Serializer;

 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
@@ -23,6 +25,8 @@ public class CsvSerializer
 {
     public CsvSerializer()
     {
+        super(null, null);
     }

     @Override
@@ -39,6 +42,12 @@ public String deserialize(ByteBuffer serializedValue)
             serializedValue.remaining());
 }

+@Override
+public Serializer<Object> serializerForSchema(String schema)
+{
+    throw new UnsupportedOperationException();
+}
+
 @Override
 public DecodableEvent toEvent(Object obj)
 {
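Design note: as the code stands, CSV events are plain charset-encoded text that never goes through the schema registry, so `CsvSerializer` passes `super(null, null)` (no `SerializerConfig`, no schema string) and overrides serialize/deserialize directly. The inherited probing path is therefore never taken for CSV, which is why `serializerForSchema` can simply throw `UnsupportedOperationException`.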
io/pravega/connectors/presto/decoder/JsonSerializer.java
@@ -21,7 +21,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.pravega.client.stream.Serializer;
 import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
-import io.pravega.schemaregistry.serializers.SerializerFactory;

 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -59,16 +58,15 @@ public JsonNode deserialize(ByteBuffer serializedValue)
     }
 }

-private final Serializer<Object> delegate;
-
 public JsonSerializer(SerializerConfig config)
 {
-    this.delegate = SerializerFactory.genericDeserializer(config);
+    super(config, null);
 }

-public JsonSerializer()
+@Override
+public Serializer<Object> serializerForSchema(String schema /* null for json */)
 {
-    this.delegate = new JsonTreeSerializer();
+    return new JsonTreeSerializer();
 }

 @Override
@@ -80,7 +78,7 @@ public ByteBuffer serialize(JsonNode value)
 {
 @Override
 public JsonNode deserialize(ByteBuffer serializedValue)
 {
-    return (JsonNode) delegate.deserialize(serializedValue);
+    return super.deserialize(serializedValue);
 }
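For clarity, a minimal sketch of the tree round trip `JsonTreeSerializer` appears to perform with Jackson (the exact `ObjectMapper` configuration inside `JsonTreeSerializer` is not shown in this hunk):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.nio.ByteBuffer;

public class JsonTreeRoundTrip
{
    public static void main(String[] args) throws IOException
    {
        ObjectMapper mapper = new ObjectMapper();

        // serialize: JsonNode -> bytes
        JsonNode node = mapper.readTree("{\"city\":\"boston\"}");
        ByteBuffer buf = ByteBuffer.wrap(mapper.writeValueAsBytes(node));

        // deserialize: bytes -> JsonNode
        JsonNode back = mapper.readTree(buf.array());
        System.out.println(back.get("city").asText()); // boston
    }
}
```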
io/pravega/connectors/presto/decoder/KVSerializer.java

@@ -16,12 +16,81 @@

 package io.pravega.connectors.presto.decoder;

+import com.facebook.airlift.log.Logger;
 import io.pravega.client.stream.Serializer;
+import io.pravega.schemaregistry.client.exceptions.RegistryExceptions;
+import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
+import io.pravega.schemaregistry.serializers.SerializerFactory;
+
+import javax.ws.rs.ProcessingException;
+import java.nio.Buffer;
 import java.nio.ByteBuffer;

+// deserialize using an externally provided schema, or via the schema registry + SerializerConfig
 public abstract class KVSerializer<T>
-        implements Serializer<T>
-{
+        implements Serializer<T> {
+    private static final Logger log = Logger.get(KVSerializer.class);
+
+    protected Serializer<Object> delegate = null;
+
+    private final Serializer<Object> schemaRegistrySerializer;
+
+    private boolean schemaRegistryDeserializer;
+
+    private final String schema;
+
+    protected KVSerializer(SerializerConfig config, String schema) {
+        // construct the schema-registry serializer up front to avoid classpath issues later
+        Serializer<Object> schemaRegistrySerializer1;
+        try {
+            schemaRegistrySerializer1 = config == null
+                    ? null
+                    : SerializerFactory.genericDeserializer(config);
+        } catch (ProcessingException | RegistryExceptions.ResourceNotFoundException e) {
+            // the group will not be found if schema.table doesn't use the schema registry
+            schemaRegistrySerializer1 = null;
+        }
+        this.schemaRegistrySerializer = schemaRegistrySerializer1;
+        this.schema = schema;
+    }
+
+    public boolean schemaRegistryDeserializer()
+    {
+        return schemaRegistryDeserializer;
+    }
+
+    // the format of the data is unknown: the schema encoding id may or may not be embedded
+    // inline by the pravega schema registry.  try to deserialize with the provided schema
+    // first; if that fails, fall back to the deserializer built from SerializerConfig.
+    protected void chooseDeserializer(ByteBuffer serializedValue)
+    {
+        Serializer<Object> serializer = serializerForSchema(schema);
+        // casting to Buffer avoids NoSuchMethodError when code compiled on java 11 runs on
+        // java 8 (ByteBuffer overrides mark/reset with covariant return types as of java 9)
+        ((Buffer) serializedValue).mark();
+        try {
+            if (serializer.deserialize(serializedValue) != null) {
+                delegate = serializer;
+            }
+        }
+        catch (RuntimeException e) {
+            log.info("could not deserialize with provided schema, trying schema registry deserializer");
+            delegate = schemaRegistrySerializer;
+            schemaRegistryDeserializer = true;
+        }
+        finally {
+            ((Buffer) serializedValue).reset();
+        }
+    }
+
+    public T deserialize(ByteBuffer serializedValue)
+    {
+        if (delegate == null) {
+            chooseDeserializer(serializedValue);
+        }
+        return (T) delegate.deserialize(serializedValue);
+    }
+
+    public abstract Serializer<Object> serializerForSchema(String schema);
+
     // create an event that can be passed down to decoders
     public abstract DecodableEvent toEvent(Object obj);
 }
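Putting it together, a sketch of how the probing behaves at runtime (the config, schema string, and event buffers are placeholders; `AvroSerializer` is the concrete subclass from this PR, and the class assumes it sits in the same package as the decoders):

```java
package io.pravega.connectors.presto.decoder;

import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
import org.apache.avro.generic.GenericRecord;

import java.nio.ByteBuffer;

public class AutoDetectSketch
{
    static void readEvents(SerializerConfig config, String avroSchema,
                           ByteBuffer firstEvent, ByteBuffer nextEvent)
    {
        KVSerializer<GenericRecord> serializer = new AvroSerializer(config, avroSchema);

        // first call probes: plain schema-based decode, then schema-registry fallback
        GenericRecord a = serializer.deserialize(firstEvent);

        // after the probe, the chosen path is observable and sticky per instance
        boolean viaRegistry = serializer.schemaRegistryDeserializer();
        System.out.println("uses schema registry encoding: " + viaRegistry);

        // subsequent calls reuse the chosen delegate; no further probing
        GenericRecord b = serializer.deserialize(nextEvent);
        System.out.println(a + " / " + b);
    }
}
```

One design consequence worth noting: the decision is made once per serializer instance from the first event seen, so a stream is assumed to be uniformly encoded one way or the other.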