Skip to content

Commit

Permalink
Added Exportable, Exporter, ExportColumn, to allow to determine…
Browse files Browse the repository at this point in the history
… a parquet output schema for a certain exportable at runtime

`ExportColumn`s can be defined anywhere in the code and
included in the output file for a certain `Exportable` providing the
column to the `Exporter`.

`ExportColumns` are also deserializable, as long as they are loaded by the
jvm.
  • Loading branch information
T0mexX committed Aug 12, 2024
1 parent 206f474 commit ae73520
Show file tree
Hide file tree
Showing 4 changed files with 475 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.opendc.trace.util.parquet.exporter

import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
import org.apache.parquet.schema.Type
import org.opendc.common.logger.logger
import org.slf4j.Logger
import kotlin.reflect.KClass

/**
* A column that can be used to build a parquet schema to export [T] records.
*
* See [columnSerializer] for deserialization of this class.
*
* ```kotlin
* class Foo: Exportable<Foo> {
* ...
* val MY_FIELD = ExportColumn<Foo>(
* field = Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("my_field_name")
* ) { exportable: Foo -> addDouble(exportable.getMyValue()) }
* ```
*
* @param[field]
* The apache parquet field, it includes information such as:
* - Required (not)
* - Field name
* - [PrimitiveType] (e.g. [INT32], [DOUBLE] etc.)
* - [LogicalTypeAnnotation] (e.g. TIMESTAMP, etc.)
*
* @param[getValue]
* Retrieves the value to be exported from the [Exportable] of [T] passed as param.
* The value returned needs to match the expected [PrimitiveType] defined in the field.
*
* A second type parameter could have been added to the class to enforce the correct type at compile time,
* however it would have added too much complexity to the interface. `ExportColumn<Exportable>` -> `ExportColumn<Exportable, *>`
*
* @param[regex] The pattern used to determine whether a string refers to this column.
* The default one matches the column name with either underscores or blank
* spaces between words in a case-insensitive manner.
*
* @param[exportableClass]
* The [KClass] of the [Exportable]. Used for intuitive lof messages. This class
* can be instantiated with inline constructor [Companion.invoke] without providing this parameter.
*/
public class ExportColumn<T : Exportable>
@PublishedApi
internal constructor(
public val field: Type,
@PublishedApi internal val regex: Regex,
@PublishedApi internal val exportableClass: KClass<T>,
internal val getValue: (T) -> Any?,
) {
/**
* The name of the column (e.g. "timestamp").
*/
public val name: String by lazy { field.name }

/**
* The primitive type of the field (e.g. INT32).
*/
public val primitiveTypeName: PrimitiveTypeName by lazy { field.asPrimitiveType().primitiveTypeName }

init {
// Adds the column among those that can be deserialized.
addField(this)
}

override fun toString(): String = "[ExportColumn: name=$name, exportable=${exportableClass.simpleName}]"

public companion object {
@PublishedApi
internal val LOG: Logger by logger()

/**
* Reified constructor, needed to store [T] class without providing it as parameter.
*/
public inline operator fun <reified T : Exportable> invoke(
field: Type,
regex: Regex = Regex("\\s*(?:${field.name}|${field.name.replace('_', ' ')})\\s*", RegexOption.IGNORE_CASE),
noinline getValue: (T) -> Any?,
): ExportColumn<T> =
ExportColumn(
field = field,
getValue = getValue,
exportableClass = T::class,
regex = regex,
)

/**
* All the columns that have been instantiated. They are added in `init` block.
* Keep in mind that in order to deserialize to a column, that column needs to be loaded by the jvm.
*/
@PublishedApi
internal val allColumns: MutableSet<ExportColumn<*>> = mutableSetOf()

@PublishedApi
internal val allColumnsLock: Mutex = Mutex()

/**
* Function invoked in the `init` block of each [ExportColumn].
* Adds the column to those that can be deserialized.
*/
private fun <T : Exportable> addField(column: ExportColumn<T>): Unit =
runBlocking {
allColumnsLock.withLock { allColumns.add(column) }
}

/**
* @return the [ExportColumn] whose [ExportColumn.regex] matches [columnName] **and**
* whose generic type ([Exportable]) is [T] if any, `null` otherwise.
*
* This method needs to be inlined and reified cause of runtime type erasure
* that does not allow to type check the generic class parameter.
*/
@Suppress("UNCHECKED_CAST") // I do not know why it is needed since the cast is nullable.
@PublishedApi
internal inline fun <reified T : Exportable> matchingColOrNull(columnName: String): ExportColumn<T>? =
runBlocking {
val allColumns = allColumnsLock.withLock { allColumns.toSet() }

allColumns.forEach { column ->
// If it is an ExportColumn of same type.
if (column.exportableClass == T::class) {
// Just a smart cast that always succeeds at runtime cause
// of type erasure but is needed at compile time.
(column as? ExportColumn<T>)
?.regex
// If fieldName matches the field regex.
?.matchEntire(columnName)
?.let { return@runBlocking column }
}
}

null
}

/**
* Returns all [ExportColumn]s of type [T] that have been loaded up until now.
*/
@Suppress("UNCHECKED_CAST")
public inline fun <reified T : Exportable> getAllLoadedColumns(): List<ExportColumn<T>> =
runBlocking {
allColumnsLock.withLock { allColumns.filter { it.exportableClass == T::class } as List<ExportColumn<T>> }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.opendc.trace.util.parquet.exporter

import kotlinx.serialization.KSerializer
import kotlinx.serialization.SerializationException
import kotlinx.serialization.builtins.ListSerializer
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.descriptors.serialDescriptor
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonDecoder
import kotlinx.serialization.json.jsonArray
import org.opendc.common.logger.errAndNull
import org.opendc.common.logger.logger

/**
* Returns a serializer for [ExportColumn] of [T] based on [ExportColumn.name]. Export columns can be
* deserialized from string values if the string matches a [ExportColumn.regex].
*
* ###### Note:
* - **In order to deserialize columns, they need to be loaded at runtime**.
* - **The serializer needs the reified type [T], meaning static deserialization
* (e.g. `@Serializable`, `@Serializer`) will not work. The serializer for [ExportColumn] of [T] needs to be retrieved with this method.**
*
* It is assumed the user always know what type of column is needed from deserialization,
* so that column can be encoded only by their name, not including their type (which would be tedious to write in json).
*
* ```kotlin
* // Decode column of Foo
* class Foo: Exportable
* json.decodeFrom<smth>(deserializer = columnSerializer<Foo>(), <smth>)
*
* // Decode a collection of columns of Foo
* json.decodeFrom<smth>(deserializer = ListSerializer(columnSerializer<Foo>()), <smth>)
* ```
*/
public inline fun <reified T : Exportable> columnSerializer(): KSerializer<ExportColumn<T>> =
object : KSerializer<ExportColumn<T>> {
override val descriptor: SerialDescriptor = serialDescriptor<String>()

override fun deserialize(decoder: Decoder): ExportColumn<T> {
val strValue = decoder.decodeString().trim('"')
return ExportColumn.matchingColOrNull<T>(strValue)
?: throw SerializationException(
"unable to deserialize export column '$strValue'." +
"Keep in mind that export columns need to be loaded by the jvm in order to be deserialized",
)
}

override fun serialize(
encoder: Encoder,
value: ExportColumn<T>,
) {
encoder.encodeString(value.name)
}
}

/**
* Serializer for a [List] of [ExportColumn] of [T], with the peculiarity of
* ignoring unrecognized column names (logging an error when an
* unrecognized column is encountered).
*/
public class ColListSerializer<T : Exportable>(
private val columnSerializer: KSerializer<ExportColumn<T>>,
) : KSerializer<List<ExportColumn<T>>> {
private val listSerializer = ListSerializer(columnSerializer)
override val descriptor: SerialDescriptor = ListSerializer(columnSerializer).descriptor

/**
* Unrecognized columns are ignored and an error message is logged.
*
* @return the decoded list of [ExportColumn]s (might be empty).
* @throws[SerializationException] if the current element is not a [jsonArray] or its string representation.
*/
override fun deserialize(decoder: Decoder): List<ExportColumn<T>> =
(decoder as? JsonDecoder)?.decodeJsonElement()?.jsonArray?.mapNotNull {
try {
Json.decodeFromJsonElement(columnSerializer, it)
} catch (_: Exception) {
LOG.errAndNull("no match found for column $it, ignoring...")
}
} ?: let {
val strValue = decoder.decodeString().trim('"')
// Basically a recursive call with a json decoder instead of the argument decoder.
Json.decodeFromString(strValue)
}

override fun serialize(
encoder: Encoder,
value: List<ExportColumn<T>>,
) {
listSerializer.serialize(encoder, value)
}

private companion object {
val LOG by logger()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.opendc.trace.util.parquet.exporter

/**
* Classes that implement this interface can be exported
* as records in a parquet file through an [Exporter].
*/
public interface Exportable {
public companion object {
public inline fun <reified T : Exportable> getAllLoadedColumns(): List<ExportColumn<T>> {
return ExportColumn.getAllLoadedColumns()
}
}
}
Loading

0 comments on commit ae73520

Please sign in to comment.