From 95ba93d6b16114355add1cf9f3769349918d5365 Mon Sep 17 00:00:00 2001 From: Chuckame Date: Wed, 7 Feb 2024 13:03:00 +0100 Subject: [PATCH] add writer schema for decoding --- .../com/github/avrokotlin/avro4k/Avro2.kt | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/main/kotlin/com/github/avrokotlin/avro4k/Avro2.kt b/src/main/kotlin/com/github/avrokotlin/avro4k/Avro2.kt index ef8010d8..a5e01350 100644 --- a/src/main/kotlin/com/github/avrokotlin/avro4k/Avro2.kt +++ b/src/main/kotlin/com/github/avrokotlin/avro4k/Avro2.kt @@ -183,31 +183,31 @@ sealed class AvroSerializationDelegate { /** * The created encoder is stateful, so you can use this encoder for multiple encodings. */ - fun Avro2.newEncoder(writerSchema: Schema, serializer: SerializationStrategy, outputStream: OutputStream): Encoder { + fun newEncoder(avro: Avro2, writerSchema: Schema, serializer: SerializationStrategy, outputStream: OutputStream): Encoder { val avroEncoder = when (encodedAs) { EncodedAs.BINARY -> EncoderFactory.get().binaryEncoder(outputStream, null) EncodedAs.JSON_COMPACT -> EncoderFactory.get().jsonEncoder(writerSchema, outputStream, false) EncodedAs.JSON_PRETTY -> EncoderFactory.get().jsonEncoder(writerSchema, outputStream, true) } - return newEncoder(writerSchema, serializer, avroEncoder) + return newEncoder(avro, writerSchema, serializer, avroEncoder) } /** * The created decoder is stateful, so you can use this decoder for multiple decodings. */ - fun Avro2.newDecoder(readerSchema: Schema, deserializer: DeserializationStrategy, inputStream: InputStream): Decoder { + fun newDecoder(avro: Avro2, readerSchema: Schema, writerSchema: Schema, deserializer: DeserializationStrategy, inputStream: InputStream): Decoder { val avroDecoder = when (encodedAs) { EncodedAs.BINARY -> DecoderFactory.get().binaryDecoder(inputStream, null) EncodedAs.JSON_COMPACT, EncodedAs.JSON_PRETTY -> DecoderFactory.get().jsonDecoder(readerSchema, inputStream) } - return newDecoder(readerSchema, deserializer, avroDecoder) + return newDecoder(avro, readerSchema, writerSchema, deserializer, avroDecoder) } - internal fun Avro2.newEncoder(writerSchema: Schema, serializer: SerializationStrategy, avroEncoder: AvroEncoder): Encoder { + internal fun newEncoder(avro: Avro2, writerSchema: Schema, serializer: SerializationStrategy, avroEncoder: AvroEncoder): Encoder { TODO() } - internal fun Avro2.newDecoder(readerSchema: Schema, deserializer: DeserializationStrategy, avroDecoder: AvroDecoder): Decoder { + internal fun newDecoder(avro: Avro2, readerSchema: Schema, writerSchema: Schema, deserializer: DeserializationStrategy, avroDecoder: AvroDecoder): Decoder { TODO() } @@ -229,24 +229,24 @@ sealed class AvroSerializationDelegate { */ data class Pure(override val encodedAs: EncodedAs) : AvroSerializationDelegate() { override fun encodeValue(avro: Avro2, writerSchema: Schema, serializer: SerializationStrategy, value: T, outputStream: OutputStream) { - avro.newEncoder(writerSchema, serializer, outputStream) + newEncoder(avro, writerSchema, serializer, outputStream) .encodeSerializableValue(serializer, value) } override fun encodeSequence(avro: Avro2, writerSchema: Schema, serializer: SerializationStrategy, sequence: Sequence, outputStream: OutputStream) { - val encoder = avro.newEncoder(writerSchema, serializer, outputStream) + val encoder = newEncoder(avro, writerSchema, serializer, outputStream) sequence.forEach { encoder.encodeSerializableValue(serializer, it) } } override fun decodeValue(avro: Avro2, readerSchema: Schema, deserializer: DeserializationStrategy, inputStream: InputStream): T = - avro.newDecoder(readerSchema, deserializer, inputStream) + newDecoder(avro, readerSchema, readerSchema, deserializer, inputStream) .decodeSerializableValue(deserializer) override fun decodeSequence(avro: Avro2, readerSchema: Schema, deserializer: DeserializationStrategy, inputStream: InputStream): Sequence = sequence { - val decoder = avro.newDecoder(readerSchema, deserializer, inputStream) + val decoder = newDecoder(avro, readerSchema, readerSchema, deserializer, inputStream) while (inputStream.available() > 0) { yield(decoder.decodeSerializableValue(deserializer)) } @@ -288,14 +288,14 @@ sealed class AvroSerializationDelegate { } override fun write(datum: T, encoder: AvroEncoder) { - avro.newEncoder(writerSchema, serializer, encoder) + newEncoder(avro, writerSchema, serializer, encoder) .encodeSerializableValue(serializer, datum) } } override fun decodeSequence(avro: Avro2, readerSchema: Schema, deserializer: DeserializationStrategy, inputStream: InputStream): Sequence = sequence { - val datumReader: DatumReader = KotlinxSerializationDatumReader(deserializer, avro) + val datumReader: DatumReader = KotlinxSerializationDatumReader(deserializer, avro, readerSchema) DataFileStream(inputStream, datumReader).use { dataFileStream -> yieldAll(dataFileStream.iterator()) } @@ -304,15 +304,16 @@ sealed class AvroSerializationDelegate { internal class KotlinxSerializationDatumReader( private val deserializer: DeserializationStrategy, private val avro: Avro2, + private val readerSchema: Schema, ) : DatumReader { - private lateinit var readerSchema: Schema + private lateinit var writerSchema: Schema override fun setSchema(schema: Schema) { - readerSchema = schema + writerSchema = schema } override fun read(reuse: T?, decoder: AvroDecoder): T { - return avro.newDecoder(readerSchema, deserializer, decoder) + return newDecoder(avro, readerSchema, writerSchema, deserializer, decoder) .decodeSerializableValue(deserializer) } } @@ -345,7 +346,7 @@ sealed class AvroSerializationDelegate { outputStream.write(MAGIC_BYTE) outputStream.write(FORMAT_VERSION) outputStream.write(writerSchema.crc64avro()) - avro.newEncoder(writerSchema, serializer, outputStream) + newEncoder(avro, writerSchema, serializer, outputStream) .encodeSerializableValue(serializer, value) } @@ -356,7 +357,7 @@ sealed class AvroSerializationDelegate { checkNotNull(writerSchema) { "schema not found for the given object's schema fingerprint" } - return avro.newDecoder(readerSchema, deserializer, inputStream) + return newDecoder(avro, readerSchema, writerSchema, deserializer, inputStream) .decodeSerializableValue(deserializer) }