Skip to content

Commit

Permalink
add writer schema for decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
Chuckame committed Apr 10, 2024
1 parent 9ef010d commit 95ba93d
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions src/main/kotlin/com/github/avrokotlin/avro4k/Avro2.kt
Original file line number Diff line number Diff line change
Expand Up @@ -183,31 +183,31 @@ sealed class AvroSerializationDelegate {
/**
* The created encoder is stateful, so you can use this encoder for multiple encodings.
*/
fun <T> Avro2.newEncoder(writerSchema: Schema, serializer: SerializationStrategy<T>, outputStream: OutputStream): Encoder {
fun <T> newEncoder(avro: Avro2, writerSchema: Schema, serializer: SerializationStrategy<T>, outputStream: OutputStream): Encoder {
val avroEncoder = when (encodedAs) {
EncodedAs.BINARY -> EncoderFactory.get().binaryEncoder(outputStream, null)
EncodedAs.JSON_COMPACT -> EncoderFactory.get().jsonEncoder(writerSchema, outputStream, false)
EncodedAs.JSON_PRETTY -> EncoderFactory.get().jsonEncoder(writerSchema, outputStream, true)
}
return newEncoder(writerSchema, serializer, avroEncoder)
return newEncoder(avro, writerSchema, serializer, avroEncoder)
}

/**
* The created decoder is stateful, so you can use this decoder for multiple decodings.
*/
fun <T> Avro2.newDecoder(readerSchema: Schema, deserializer: DeserializationStrategy<T>, inputStream: InputStream): Decoder {
fun <T> newDecoder(avro: Avro2, readerSchema: Schema, writerSchema: Schema, deserializer: DeserializationStrategy<T>, inputStream: InputStream): Decoder {
val avroDecoder = when (encodedAs) {
EncodedAs.BINARY -> DecoderFactory.get().binaryDecoder(inputStream, null)
EncodedAs.JSON_COMPACT, EncodedAs.JSON_PRETTY -> DecoderFactory.get().jsonDecoder(readerSchema, inputStream)
}
return newDecoder(readerSchema, deserializer, avroDecoder)
return newDecoder(avro, readerSchema, writerSchema, deserializer, avroDecoder)
}

internal fun <T> Avro2.newEncoder(writerSchema: Schema, serializer: SerializationStrategy<T>, avroEncoder: AvroEncoder): Encoder {
internal fun <T> newEncoder(avro: Avro2, writerSchema: Schema, serializer: SerializationStrategy<T>, avroEncoder: AvroEncoder): Encoder {
TODO()
}

internal fun <T> Avro2.newDecoder(readerSchema: Schema, deserializer: DeserializationStrategy<T>, avroDecoder: AvroDecoder): Decoder {
internal fun <T> newDecoder(avro: Avro2, readerSchema: Schema, writerSchema: Schema, deserializer: DeserializationStrategy<T>, avroDecoder: AvroDecoder): Decoder {
TODO()
}

Expand All @@ -229,24 +229,24 @@ sealed class AvroSerializationDelegate {
*/
data class Pure(override val encodedAs: EncodedAs) : AvroSerializationDelegate() {
override fun <T> encodeValue(avro: Avro2, writerSchema: Schema, serializer: SerializationStrategy<T>, value: T, outputStream: OutputStream) {
avro.newEncoder(writerSchema, serializer, outputStream)
newEncoder(avro, writerSchema, serializer, outputStream)
.encodeSerializableValue(serializer, value)
}

override fun <T> encodeSequence(avro: Avro2, writerSchema: Schema, serializer: SerializationStrategy<T>, sequence: Sequence<T>, outputStream: OutputStream) {
val encoder = avro.newEncoder(writerSchema, serializer, outputStream)
val encoder = newEncoder(avro, writerSchema, serializer, outputStream)
sequence.forEach {
encoder.encodeSerializableValue(serializer, it)
}
}

override fun <T> decodeValue(avro: Avro2, readerSchema: Schema, deserializer: DeserializationStrategy<T>, inputStream: InputStream): T =
avro.newDecoder(readerSchema, deserializer, inputStream)
newDecoder(avro, readerSchema, readerSchema, deserializer, inputStream)
.decodeSerializableValue(deserializer)

override fun <T> decodeSequence(avro: Avro2, readerSchema: Schema, deserializer: DeserializationStrategy<T>, inputStream: InputStream): Sequence<T> =
sequence<T> {
val decoder = avro.newDecoder(readerSchema, deserializer, inputStream)
val decoder = newDecoder(avro, readerSchema, readerSchema, deserializer, inputStream)
while (inputStream.available() > 0) {
yield(decoder.decodeSerializableValue(deserializer))
}
Expand Down Expand Up @@ -288,14 +288,14 @@ sealed class AvroSerializationDelegate {
}

override fun write(datum: T, encoder: AvroEncoder) {
avro.newEncoder(writerSchema, serializer, encoder)
newEncoder(avro, writerSchema, serializer, encoder)
.encodeSerializableValue(serializer, datum)
}
}

override fun <T> decodeSequence(avro: Avro2, readerSchema: Schema, deserializer: DeserializationStrategy<T>, inputStream: InputStream): Sequence<T> =
sequence {
val datumReader: DatumReader<T> = KotlinxSerializationDatumReader(deserializer, avro)
val datumReader: DatumReader<T> = KotlinxSerializationDatumReader(deserializer, avro, readerSchema)
DataFileStream(inputStream, datumReader).use { dataFileStream ->
yieldAll(dataFileStream.iterator())
}
Expand All @@ -304,15 +304,16 @@ sealed class AvroSerializationDelegate {
internal class KotlinxSerializationDatumReader<T>(
private val deserializer: DeserializationStrategy<T>,
private val avro: Avro2,
private val readerSchema: Schema,
) : DatumReader<T> {
private lateinit var readerSchema: Schema
private lateinit var writerSchema: Schema

override fun setSchema(schema: Schema) {
readerSchema = schema
writerSchema = schema
}

override fun read(reuse: T?, decoder: AvroDecoder): T {
return avro.newDecoder(readerSchema, deserializer, decoder)
return newDecoder(avro, readerSchema, writerSchema, deserializer, decoder)
.decodeSerializableValue(deserializer)
}
}
Expand Down Expand Up @@ -345,7 +346,7 @@ sealed class AvroSerializationDelegate {
outputStream.write(MAGIC_BYTE)
outputStream.write(FORMAT_VERSION)
outputStream.write(writerSchema.crc64avro())
avro.newEncoder(writerSchema, serializer, outputStream)
newEncoder(avro, writerSchema, serializer, outputStream)
.encodeSerializableValue(serializer, value)
}

Expand All @@ -356,7 +357,7 @@ sealed class AvroSerializationDelegate {

checkNotNull(writerSchema) { "schema not found for the given object's schema fingerprint" }

return avro.newDecoder(readerSchema, deserializer, inputStream)
return newDecoder(avro, readerSchema, writerSchema, deserializer, inputStream)
.decodeSerializableValue(deserializer)
}

Expand Down

0 comments on commit 95ba93d

Please sign in to comment.