From 55036500a9144746fbea364b30c5ea0275868d3b Mon Sep 17 00:00:00 2001 From: t0m3x Date: Mon, 12 Aug 2024 11:25:44 +0200 Subject: [PATCH] Refactored exporters. Allows output column selection in scenario (#241) --- opendc-common/build.gradle.kts | 4 + .../kotlin/org/opendc/common/logger/Logger.kt | 131 ++++++++ opendc-common/src/main/resources/log4j2.xml | 43 +++ .../opendc-compute-telemetry/build.gradle.kts | 3 + .../export/parquet/ComputeExportConfig.kt | 192 ++++++++++++ .../export/parquet/DfltHostExportColumns.kt | 195 ++++++++++++ .../export/parquet/DfltServerExportColumns.kt | 153 ++++++++++ .../parquet/DfltServiceExportColumns.kt | 95 ++++++ .../export/parquet/ParquetComputeMonitor.kt | 109 +++++-- .../export/parquet/ParquetHostDataWriter.kt | 280 ------------------ .../export/parquet/ParquetServerDataWriter.kt | 224 -------------- .../parquet/ParquetServiceDataWriter.kt | 139 --------- .../telemetry/export/parquet/README.md | 70 +++++ .../telemetry/table/HostTableReader.kt | 3 +- .../telemetry/table/ServerTableReader.kt | 7 +- .../telemetry/table/ServiceTableReader.kt | 3 +- .../experiments/base/runner/ScenarioRunner.kt | 1 + .../experiments/base/scenario/Scenario.kt | 3 + .../base/scenario/ScenarioFactories.kt | 1 + .../base/scenario/ScenarioReader.kt | 19 +- .../base/scenario/specs/ScenariosSpec.kt | 14 + .../opendc-trace-parquet/build.gradle.kts | 8 + .../trace/util}/parquet/ParquetDataWriter.kt | 3 +- .../util/parquet/exporter/ExportColumn.kt | 174 +++++++++++ .../exporter/ExportColumnSerializer.kt | 120 ++++++++ .../trace/util/parquet/exporter/Exportable.kt | 23 +- .../trace/util/parquet/exporter/Exporter.kt | 146 +++++++++ 27 files changed, 1463 insertions(+), 700 deletions(-) create mode 100644 opendc-common/src/main/kotlin/org/opendc/common/logger/Logger.kt create mode 100644 opendc-common/src/main/resources/log4j2.xml create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt delete mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt delete mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt delete mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt create mode 100644 opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md rename {opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export => opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util}/parquet/ParquetDataWriter.kt (97%) create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt rename 
opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt => opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt (69%) create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt diff --git a/opendc-common/build.gradle.kts b/opendc-common/build.gradle.kts index 2dd35d83b..aeb9bc4d4 100644 --- a/opendc-common/build.gradle.kts +++ b/opendc-common/build.gradle.kts @@ -36,5 +36,9 @@ dependencies { implementation(libs.kotlin.logging) implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:$serializationVersion") + api(libs.log4j.core) + api(libs.log4j.slf4j) + api(libs.kotlin.logging) + testImplementation(projects.opendcSimulator.opendcSimulatorCore) } diff --git a/opendc-common/src/main/kotlin/org/opendc/common/logger/Logger.kt b/opendc-common/src/main/kotlin/org/opendc/common/logger/Logger.kt new file mode 100644 index 000000000..ee2f828a8 --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/logger/Logger.kt @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.common.logger + +import mu.KotlinLogging +import org.slf4j.Logger + +/** + * @return a slf4j logger named as the calling class simple name. + * Can also be used in the companion object to limit the instances of loggers. + * + * + * ```kotlin + * class Foo { + * val LOG by logger() // Same as: KotlinLogging.logger(name = "Foo") + * + * companion object { + * val LOG by logger() // Same as: KotlinLogging.logger(name = "Foo") + * val LOG by logger("smth") // Same as: KotlinLogging.logger(name = "smth") + * } + * } + * ``` + */ +public fun T.logger(name: String? = null): Lazy { + return lazy { + KotlinLogging.logger( + name + ?: runCatching { this::class.java.enclosingClass.simpleName } + .getOrNull() + ?: "unknown", + ) + } +} + +/** + * Logs [msg] with WARN level and returns null. + * ```kotlin + * // Replace + * LOG.warn() + * return null + * // With + * return LOG.warnAndNull() + */ +public fun Logger.warnAndNull(msg: String): Nothing? { + this.warn(msg) + return null +} + +/** + * Logs [msg] with ERROR level and returns null. + * ```kotlin + * // Replace + * LOG.error() + * return null + * // With + * return LOG.errAndNull() + */ +public fun Logger.errAndNull(msg: String): Nothing? 
{ + this.error(msg) + return null +} + +/** + * Logs [msg] with *WARN* level and returns [obj]. + * + * + * ```kotlin + * // Replace + * if ( !in map) { + * LOG.warn("warn-message") + * return + * } else map[] + * // With + * map.getOrDefault(, LOG.withWarn(, "")) + * ``` + */ +public fun Logger.withWarn( + obj: T, + msg: String, +): T { + this.warn(msg) + return obj +} + +/** + * Logs [msg] with *ERROR* level and returns [obj]. + */ +public fun Logger.withErr( + obj: T, + msg: String, +): T { + this.error(msg) + return obj +} + +/** + * Logs [msg] with *INFO* level on a new line. + * ```console + * + * 09:28:08.830 [INFO] ScenariosSpec - + * | === Compute Export Config === + * | Host Fields (columns) : timestamp, + * ... + * // Instead of + * 09:28:08.830 [INFO] ScenariosSpec - | === Compute Export Config === + * | Host Fields (columns) : timestamp, + * ``` + */ +public fun Logger.infoNewLine(msg: String) { + info("\n" + msg) +} diff --git a/opendc-common/src/main/resources/log4j2.xml b/opendc-common/src/main/resources/log4j2.xml new file mode 100644 index 000000000..073893607 --- /dev/null +++ b/opendc-common/src/main/resources/log4j2.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + + diff --git a/opendc-compute/opendc-compute-telemetry/build.gradle.kts b/opendc-compute/opendc-compute-telemetry/build.gradle.kts index 10f7c610b..e86924494 100644 --- a/opendc-compute/opendc-compute-telemetry/build.gradle.kts +++ b/opendc-compute/opendc-compute-telemetry/build.gradle.kts @@ -25,12 +25,15 @@ description = "OpenDC Compute Service implementation" // Build configuration plugins { `kotlin-library-conventions` + kotlin("plugin.serialization") version "1.9.22" } dependencies { api(projects.opendcCompute.opendcComputeApi) + api(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) + implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0") implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-service"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon"))) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt new file mode 100644 index 000000000..02e3e0bbe --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable +import kotlinx.serialization.builtins.ListSerializer +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.buildClassSerialDescriptor +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder +import kotlinx.serialization.encoding.encodeStructure +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonDecoder +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.jsonObject +import org.opendc.common.logger.logger +import org.opendc.compute.telemetry.table.HostTableReader +import org.opendc.compute.telemetry.table.ServerTableReader +import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.trace.util.parquet.exporter.ColListSerializer +import org.opendc.trace.util.parquet.exporter.ExportColumn +import org.opendc.trace.util.parquet.exporter.Exportable +import org.opendc.trace.util.parquet.exporter.columnSerializer + +/** + * Aggregates the necessary settings to personalize the output + * parquet files for compute workloads. + * + * @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file. + * @param[serverExportColumns] the columns that will be included in the `server.parquet` raw output file. + * @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file. + */ +@Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class) +public data class ComputeExportConfig( + public val hostExportColumns: Set>, + public val serverExportColumns: Set>, + public val serviceExportColumns: Set>, +) { + public constructor( + hostExportColumns: Collection>, + serverExportColumns: Collection>, + serviceExportColumns: Collection>, + ) : this( + hostExportColumns.toSet() + DfltHostExportColumns.BASE_EXPORT_COLUMNS, + serverExportColumns.toSet() + DfltServerExportColumns.BASE_EXPORT_COLUMNS, + serviceExportColumns.toSet() + DfltServiceExportColumns.BASE_EXPORT_COLUMNS, + ) + + /** + * @return formatted string representing the export config. + */ + public fun fmt(): String = + """ + | === Compute Export Config === + | Host columns : ${hostExportColumns.map { it.name }.toString().trim('[', ']')} + | Server columns : ${serverExportColumns.map { it.name }.toString().trim('[', ']')} + | Service columns : ${serviceExportColumns.map { it.name }.toString().trim('[', ']')} + """.trimIndent() + + public companion object { + internal val LOG by logger() + + /** + * Force the jvm to load the default [ExportColumn]s relevant to compute export, + * so that they are available for deserialization. + */ + public fun loadDfltColumns() { + DfltHostExportColumns + DfltServerExportColumns + DfltServiceExportColumns + } + + /** + * Config that includes all columns defined in [DfltHostExportColumns], + * [DfltServerExportColumns], [DfltServiceExportColumns] among all other loaded + * columns for [HostTableReader], [ServerTableReader] and [ServiceTableReader]. 
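+         *
+         * A minimal usage sketch, assuming the scenario supplies no column selection of its own:
+         * ```kotlin
+         * // export every column currently loaded for hosts, servers and services
+         * val config: ComputeExportConfig = ComputeExportConfig.ALL_COLUMNS
+         * ```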
+ */ + public val ALL_COLUMNS: ComputeExportConfig by lazy { + loadDfltColumns() + ComputeExportConfig( + hostExportColumns = ExportColumn.getAllLoadedColumns(), + serverExportColumns = ExportColumn.getAllLoadedColumns(), + serviceExportColumns = ExportColumn.getAllLoadedColumns(), + ) + } + + /** + * A runtime [KSerializer] is needed for reasons explained in [columnSerializer] docs. + * + * This serializer makes use of reified column serializers for the 2 properties. + */ + internal object ComputeExportConfigSerializer : KSerializer { + override val descriptor: SerialDescriptor = + buildClassSerialDescriptor("org.opendc.compute.telemetry.export.parquet.ComputeExportConfig") { + element( + "hostExportColumns", + ListSerializer(columnSerializer()).descriptor, + ) + element( + "serverExportColumns", + ListSerializer(columnSerializer()).descriptor, + ) + element( + "serviceExportColumns", + ListSerializer(columnSerializer()).descriptor, + ) + } + + override fun deserialize(decoder: Decoder): ComputeExportConfig { + val jsonDec = + (decoder as? JsonDecoder) ?: let { + // Basically a recursive call with a JsonDecoder. + return json.decodeFromString(decoder.decodeString().trim('"')) + } + + // Loads the default columns so that they are available for deserialization. + loadDfltColumns() + val elem = jsonDec.decodeJsonElement().jsonObject + + val hostFields: List> = elem["hostExportColumns"].toFieldList() + val serverFields: List> = elem["serverExportColumns"].toFieldList() + val serviceFields: List> = elem["serviceExportColumns"].toFieldList() + + return ComputeExportConfig( + hostExportColumns = hostFields, + serverExportColumns = serverFields, + serviceExportColumns = serviceFields, + ) + } + + override fun serialize( + encoder: Encoder, + value: ComputeExportConfig, + ) { + encoder.encodeStructure(descriptor) { + encodeSerializableElement( + descriptor, + 0, + ColListSerializer(columnSerializer()), + value.hostExportColumns.toList(), + ) + encodeSerializableElement( + descriptor, + 1, + ColListSerializer(columnSerializer()), + value.serverExportColumns.toList(), + ) + encodeSerializableElement( + descriptor, + 2, + ColListSerializer(columnSerializer()), + value.serviceExportColumns.toList(), + ) + } + } + } + } +} + +private val json = Json { ignoreUnknownKeys = true } + +private inline fun JsonElement?.toFieldList(): List> = + this?.let { + json.decodeFromJsonElement(ColListSerializer(columnSerializer()), it) + }?.ifEmpty { + ComputeExportConfig.LOG.warn( + "deserialized list of export columns for exportable ${T::class.simpleName} " + + "produced empty list, falling back to all loaded columns", + ) + ExportColumn.getAllLoadedColumns() + } ?: ExportColumn.getAllLoadedColumns() diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt new file mode 100644 index 000000000..68b5a6645 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, 
sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.parquet.io.api.Binary +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.HostTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +/** + * This object wraps the [ExportColumn]s to solves ambiguity for field + * names that are included in more than 1 exportable. + * + * Additionally, it allows to load all the fields at once by just its symbol, + * so that these columns can be deserialized. Additional fields can be added + * from anywhere, and they are deserializable as long as they are loaded by the jvm. + * + * ```kotlin + * ... + * // Loads the column + * DfltHostExportColumns + * ... 
+ * ``` + */ +public object DfltHostExportColumns { + public val TIMESTAMP: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val HOST_ID: ExportColumn = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + ) { Binary.fromString(it.host.id) } + + public val HOST_NAME: ExportColumn = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_name"), + ) { Binary.fromString(it.host.name) } + + public val CPU_COUNT: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("cpu_count"), + ) { it.host.cpuCount } + + public val MEM_CAPACITY: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("mem_capacity"), + ) { it.host.memCapacity } + + public val GUESTS_TERMINATED: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("guests_terminated"), + ) { it.guestsTerminated } + + public val GUESTS_RUNNING: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("guests_running"), + ) { it.guestsRunning } + + public val GUESTS_ERROR: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("guests_error"), + ) { it.guestsError } + + public val GUESTS_INVALID: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("guests_invalid"), + ) { it.guestsInvalid } + + public val CPU_LIMIT: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_limit"), + ) { it.cpuLimit } + + public val CPU_USAGE: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_usage"), + ) { it.cpuUsage } + + public val CPU_DEMAND: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_demand"), + ) { it.cpuDemand } + + public val CPU_UTILIZATION: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_utilization"), + ) { it.cpuUtilization } + + public val CPU_TIME_ACTIVE: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_active"), + ) { it.cpuActiveTime } + + public val CPU_TIME_IDLE: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_idle"), + ) { it.cpuIdleTime } + + public val CPU_TIME_STEAL: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_steal"), + ) { it.cpuStealTime } + + public val CPU_TIME_LOST: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_lost"), + ) { it.cpuLostTime } + + public val POWER_DRAW: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("power_draw"), + ) { it.powerDraw } + + public val ENERGY_USAGE: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("energy_usage"), + ) { it.energyUsage } + + public val CARBON_INTENSITY: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("carbon_intensity"), + ) { it.carbonIntensity } + + public val CARBON_EMISSION: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("carbon_emission"), + ) { it.carbonEmission } + + public val UP_TIME: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("uptime"), + ) { it.uptime } + + public val DOWN_TIME: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("downtime"), + ) { it.downtime } + + public val 
BOOT_TIME: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("boot_time"), + ) { it.bootTime?.toEpochMilli() } + + public val BOOT_TIME_ABS: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("boot_time_absolute"), + ) { it.bootTimeAbsolute?.toEpochMilli() } + + /** + * The columns that are always included in the output file. + */ + internal val BASE_EXPORT_COLUMNS = + setOf( + TIMESTAMP_ABS, + TIMESTAMP, + ) +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt new file mode 100644 index 000000000..188f7c190 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.parquet.io.api.Binary +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.ServerTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +/** + * This object wraps the [ExportColumn]s to solves ambiguity for field + * names that are included in more than 1 exportable. + * + * Additionally, it allows to load all the fields at once by just its symbol, + * so that these columns can be deserialized. Additional fields can be added + * from anywhere, and they are deserializable as long as they are loaded by the jvm. + * + * ```kotlin + * ... + * // Loads the column + * DfltServerExportColumns + * ... 
+ * ``` + */ +public object DfltServerExportColumns { + public val TIMESTAMP: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val SERVER_ID: ExportColumn = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("server_id"), + ) { Binary.fromString(it.server.id) } + + public val HOST_ID: ExportColumn = + ExportColumn( + field = + Types.optional(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + ) { it.host?.id?.let { Binary.fromString(it) } } + + public val SERVER_NAME: ExportColumn = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("server_name"), + ) { Binary.fromString(it.server.name) } + + public val CPU_COUNT: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("cpu_count"), + ) { it.server.cpuCount } + + public val MEM_CAPACITY: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("mem_capacity"), + ) { it.server.memCapacity } + + public val CPU_LIMIT: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_limit"), + ) { it.cpuLimit } + + public val CPU_TIME_ACTIVE: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_active"), + ) { it.cpuActiveTime } + + public val CPU_TIME_IDLE: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_idle"), + ) { it.cpuIdleTime } + + public val CPU_TIME_STEAL: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_steal"), + ) { it.cpuStealTime } + + public val CPU_TIME_LOST: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_lost"), + ) { it.cpuLostTime } + + public val UP_TIME: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("uptime"), + ) { it.uptime } + + public val DOWN_TIME: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("downtime"), + ) { it.downtime } + + public val PROVISION_TIME: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("provision_time"), + ) { it.provisionTime?.toEpochMilli() } + + public val BOOT_TIME: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("boot_time"), + ) { it.bootTime?.toEpochMilli() } + + public val BOOT_TIME_ABS: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("boot_time_absolute"), + ) { it.bootTimeAbsolute?.toEpochMilli() } + + /** + * The columns that are always included in the output file. 
+ */ + internal val BASE_EXPORT_COLUMNS = + setOf( + TIMESTAMP_ABS, + TIMESTAMP, + ) +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt new file mode 100644 index 000000000..893965454 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +/** + * This object wraps the [ExportColumn]s to solves ambiguity for field + * names that are included in more than 1 exportable. + * + * Additionally, it allows to load all the fields at once by just its symbol, + * so that these columns can be deserialized. Additional fields can be added + * from anywhere, and they are deserializable as long as they are loaded by the jvm. + * + * ```kotlin + * ... + * // Loads the column + * DfltServiceExportColumns + * ... 
+ * ```
+ */
+public object DfltServiceExportColumns {
+    public val TIMESTAMP: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT64).named("timestamp"),
+        ) { it.timestamp.toEpochMilli() }
+
+    public val TIMESTAMP_ABS: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT64).named("timestamp_absolute"),
+        ) { it.timestampAbsolute.toEpochMilli() }
+
+    public val HOSTS_UP: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("hosts_up"),
+        ) { it.hostsUp }
+
+    public val SERVERS_PENDING: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("servers_pending"),
+        ) { it.serversPending }
+
+    public val SERVERS_ACTIVE: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("servers_active"),
+        ) { it.serversActive }
+
+    public val ATTEMPTS_SUCCESS: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("attempts_success"),
+        ) { it.attemptsSuccess }
+
+    public val ATTEMPTS_FAILURE: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("attempts_failure"),
+        ) { it.attemptsFailure }
+
+    public val ATTEMPTS_ERROR: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("attempts_error"),
+        ) { it.attemptsError }
+
+    /**
+     * The columns that are always included in the output file.
+     */
+    internal val BASE_EXPORT_COLUMNS =
+        setOf(
+            TIMESTAMP_ABS,
+            TIMESTAMP,
+        )
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
index 1c9104975..6bea4cc2a 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
@@ -26,45 +26,100 @@ import org.opendc.compute.telemetry.ComputeMonitor
 import org.opendc.compute.telemetry.table.HostTableReader
 import org.opendc.compute.telemetry.table.ServerTableReader
 import org.opendc.compute.telemetry.table.ServiceTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+import org.opendc.trace.util.parquet.exporter.Exportable
+import org.opendc.trace.util.parquet.exporter.Exporter
 import java.io.File

 /**
  * A [ComputeMonitor] that logs the events to a Parquet file.
*/ -public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { - private val serverWriter = - ParquetServerDataWriter( - File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() }, - bufferSize, - ) - - private val hostWriter = - ParquetHostDataWriter( - File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() }, - bufferSize, - ) - - private val serviceWriter = - ParquetServiceDataWriter( - File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, - bufferSize, - ) - - override fun record(reader: ServerTableReader) { - serverWriter.write(reader) +public class ParquetComputeMonitor( + private val hostExporter: Exporter, + private val serverExporter: Exporter, + private val serviceExporter: Exporter, +) : ComputeMonitor, AutoCloseable { + override fun record(reader: HostTableReader) { + hostExporter.write(reader) } - override fun record(reader: HostTableReader) { - hostWriter.write(reader) + override fun record(reader: ServerTableReader) { + serverExporter.write(reader) } override fun record(reader: ServiceTableReader) { - serviceWriter.write(reader) + serviceExporter.write(reader) } override fun close() { - hostWriter.close() - serviceWriter.close() - serverWriter.close() + hostExporter.close() + serverExporter.close() + serviceExporter.close() + } + + public companion object { + /** + * Overloaded constructor with [ComputeExportConfig] as parameter. + * + * @param[base] parent pathname for output file. + * @param[partition] child pathname for output file. + * @param[bufferSize] size of the buffer used by the writer thread. + */ + public operator fun invoke( + base: File, + partition: String, + bufferSize: Int, + computeExportConfig: ComputeExportConfig, + ): ParquetComputeMonitor = + invoke( + base = base, + partition = partition, + bufferSize = bufferSize, + hostExportColumns = computeExportConfig.hostExportColumns, + serverExportColumns = computeExportConfig.serverExportColumns, + serviceExportColumns = computeExportConfig.serviceExportColumns, + ) + + /** + * Constructor that loads default [ExportColumn]s defined in + * [DfltHostExportColumns], [DfltServerExportColumns], [DfltServiceExportColumns] + * in case optional parameters are omitted and all fields need to be retrieved. + * + * @param[base] parent pathname for output file. + * @param[partition] child pathname for output file. + * @param[bufferSize] size of the buffer used by the writer thread. + */ + public operator fun invoke( + base: File, + partition: String, + bufferSize: Int, + hostExportColumns: Collection>? = null, + serverExportColumns: Collection>? = null, + serviceExportColumns: Collection>? = null, + ): ParquetComputeMonitor { + // Loads the fields in case they need to be retrieved if optional params are omitted. 
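+            // Any column collection that is left null below falls back to every ExportColumn
+            // currently loaded for that table type (see ComputeExportConfig and README.md).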
+ ComputeExportConfig.loadDfltColumns() + + return ParquetComputeMonitor( + hostExporter = + Exporter( + outputFile = File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() }, + columns = hostExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + serverExporter = + Exporter( + outputFile = File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() }, + columns = serverExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + serviceExporter = + Exporter( + outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, + columns = serviceExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + ) + } } } diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt deleted file mode 100644 index 020e67f24..000000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.telemetry.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.compute.telemetry.table.HostTableReader -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.io.File - -/** - * A Parquet event writer for [HostTableReader]s. - */ -public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, HostDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: LocalParquetWriter.Builder): ParquetWriter { - return builder - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun toString(): String = "host-writer" - - /** - * A [WriteSupport] implementation for a [HostTableReader]. 
- */ - private class HostDataWriteSupport() : WriteSupport() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: HostTableReader) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - data: HostTableReader, - ) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("timestamp_absolute", 1) - consumer.addLong(data.timestampAbsolute.toEpochMilli()) - consumer.endField("timestamp_absolute", 1) - - consumer.startField("host_id", 2) - consumer.addBinary(Binary.fromString(data.host.id)) - consumer.endField("host_id", 2) - - consumer.startField("host_name", 3) - consumer.addBinary(Binary.fromString(data.host.name)) - consumer.endField("host_name", 3) - - consumer.startField("cpu_count", 4) - consumer.addInteger(data.host.cpuCount) - consumer.endField("cpu_count", 4) - - consumer.startField("mem_capacity", 5) - consumer.addLong(data.host.memCapacity) - consumer.endField("mem_capacity", 5) - - consumer.startField("guests_terminated", 6) - consumer.addInteger(data.guestsTerminated) - consumer.endField("guests_terminated", 6) - - consumer.startField("guests_running", 7) - consumer.addInteger(data.guestsRunning) - consumer.endField("guests_running", 7) - - consumer.startField("guests_error", 8) - consumer.addInteger(data.guestsError) - consumer.endField("guests_error", 8) - - consumer.startField("guests_invalid", 9) - consumer.addInteger(data.guestsInvalid) - consumer.endField("guests_invalid", 9) - - consumer.startField("cpu_limit", 10) - consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 10) - - consumer.startField("cpu_usage", 11) - consumer.addDouble(data.cpuUsage) - consumer.endField("cpu_usage", 11) - - consumer.startField("cpu_demand", 12) - consumer.addDouble(data.cpuUsage) - consumer.endField("cpu_demand", 12) - - consumer.startField("cpu_utilization", 13) - consumer.addDouble(data.cpuUtilization) - consumer.endField("cpu_utilization", 13) - - consumer.startField("cpu_time_active", 14) - consumer.addLong(data.cpuActiveTime) - consumer.endField("cpu_time_active", 14) - - consumer.startField("cpu_time_idle", 15) - consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 15) - - consumer.startField("cpu_time_steal", 16) - consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 16) - - consumer.startField("cpu_time_lost", 17) - consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 17) - - consumer.startField("power_draw", 18) - consumer.addDouble(data.powerDraw) - consumer.endField("power_draw", 18) - - consumer.startField("energy_usage", 19) - consumer.addDouble(data.energyUsage) - consumer.endField("energy_usage", 19) - - consumer.startField("carbon_intensity", 20) - consumer.addDouble(data.carbonIntensity) - consumer.endField("carbon_intensity", 20) - - consumer.startField("carbon_emission", 21) - consumer.addDouble(data.carbonEmission) - consumer.endField("carbon_emission", 21) - - consumer.startField("uptime", 22) - consumer.addLong(data.uptime) - consumer.endField("uptime", 22) - - consumer.startField("downtime", 23) - consumer.addLong(data.downtime) - consumer.endField("downtime", 23) - - val bootTime = data.bootTime - 
if (bootTime != null) { - consumer.startField("boot_time", 24) - consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 24) - } - - val bootTimeAbsolute = data.bootTimeAbsolute - if (bootTimeAbsolute != null) { - consumer.startField("boot_time_absolute", 25) - consumer.addLong(bootTimeAbsolute.toEpochMilli()) - consumer.endField("boot_time_absolute", 25) - } - - consumer.endMessage() - } - } - - private companion object { - /** - * The schema of the host data. - */ - val SCHEMA: MessageType = - Types - .buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp_absolute"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_name"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_terminated"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_running"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_error"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_invalid"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_limit"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_demand"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_utilization"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_idle"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_steal"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_lost"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("power_draw"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("energy_usage"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("carbon_intensity"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("carbon_emission"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time_absolute"), - ) - .named("host") - } -} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt deleted file mode 100644 index e1b489ac8..000000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated 
documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.telemetry.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.compute.telemetry.table.ServerTableReader -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.io.File - -/** - * A Parquet event writer for [ServerTableReader]s. - */ -public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, ServerDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: LocalParquetWriter.Builder): ParquetWriter { - return builder - .withDictionaryEncoding("server_id", true) - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun toString(): String = "server-writer" - - /** - * A [WriteSupport] implementation for a [ServerTableReader]. 
- */ - private class ServerDataWriteSupport() : WriteSupport() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: ServerTableReader) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - data: ServerTableReader, - ) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("timestamp_absolute", 1) - consumer.addLong(data.timestampAbsolute.toEpochMilli()) - consumer.endField("timestamp_absolute", 1) - - consumer.startField("server_id", 2) - consumer.addBinary(Binary.fromString(data.server.id)) - consumer.endField("server_id", 2) - - consumer.startField("server_name", 3) - consumer.addBinary(Binary.fromString(data.server.name)) - consumer.endField("server_name", 3) - - val hostId = data.host?.id - if (hostId != null) { - consumer.startField("host_id", 4) - consumer.addBinary(Binary.fromString(hostId)) - consumer.endField("host_id", 4) - } - - consumer.startField("mem_capacity", 5) - consumer.addLong(data.server.memCapacity) - consumer.endField("mem_capacity", 5) - - consumer.startField("cpu_count", 6) - consumer.addInteger(data.server.cpuCount) - consumer.endField("cpu_count", 6) - - consumer.startField("cpu_limit", 7) - consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 7) - - consumer.startField("cpu_time_active", 8) - consumer.addLong(data.cpuActiveTime) - consumer.endField("cpu_time_active", 8) - - consumer.startField("cpu_time_idle", 9) - consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 9) - - consumer.startField("cpu_time_steal", 10) - consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 10) - - consumer.startField("cpu_time_lost", 11) - consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 11) - - consumer.startField("uptime", 12) - consumer.addLong(data.uptime) - consumer.endField("uptime", 12) - - consumer.startField("downtime", 13) - consumer.addLong(data.downtime) - consumer.endField("downtime", 13) - - val provisionTime = data.provisionTime - if (provisionTime != null) { - consumer.startField("provision_time", 14) - consumer.addLong(provisionTime.toEpochMilli()) - consumer.endField("provision_time", 14) - } - - val bootTime = data.bootTime - if (bootTime != null) { - consumer.startField("boot_time", 15) - consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 15) - } - - val bootTimeAbsolute = data.bootTimeAbsolute - if (bootTimeAbsolute != null) { - consumer.startField("boot_time_absolute", 16) - consumer.addLong(bootTimeAbsolute.toEpochMilli()) - consumer.endField("boot_time_absolute", 16) - } - - consumer.endMessage() - } - } - - private companion object { - /** - * The schema of the server data. 
- */ - val SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp_absolute"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_name"), - Types - .optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_limit"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_idle"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_steal"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_lost"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("provision_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time_absolute"), - ) - .named("server") - } -} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt deleted file mode 100644 index eba8fc4fe..000000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.compute.telemetry.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.compute.telemetry.table.ServiceTableReader -import java.io.File - -/** - * A Parquet event writer for [ServiceTableReader]s. - */ -public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, ServiceDataWriteSupport(), bufferSize) { - override fun toString(): String = "service-writer" - - /** - * A [WriteSupport] implementation for a [ServiceTableReader]. - */ - private class ServiceDataWriteSupport() : WriteSupport() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: ServiceTableReader) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - data: ServiceTableReader, - ) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("timestamp_absolute", 1) - consumer.addLong(data.timestampAbsolute.toEpochMilli()) - consumer.endField("timestamp_absolute", 1) - - consumer.startField("hosts_up", 2) - consumer.addInteger(data.hostsUp) - consumer.endField("hosts_up", 2) - - consumer.startField("hosts_down", 3) - consumer.addInteger(data.hostsDown) - consumer.endField("hosts_down", 3) - - consumer.startField("servers_pending", 4) - consumer.addInteger(data.serversPending) - consumer.endField("servers_pending", 4) - - consumer.startField("servers_active", 5) - consumer.addInteger(data.serversActive) - consumer.endField("servers_active", 5) - - consumer.startField("attempts_success", 6) - consumer.addInteger(data.attemptsSuccess) - consumer.endField("attempts_pending", 6) - - consumer.startField("attempts_failure", 7) - consumer.addInteger(data.attemptsFailure) - consumer.endField("attempts_failure", 7) - - consumer.startField("attempts_error", 8) - consumer.addInteger(data.attemptsError) - consumer.endField("attempts_error", 8) - - consumer.endMessage() - } - } - - private companion object { - private val SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp_absolute"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("hosts_up"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("hosts_down"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("servers_pending"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("servers_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_success"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_failure"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_error"), - ) - .named("service") - } -} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md 
b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md new file mode 100644 index 000000000..f48bc2294 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md @@ -0,0 +1,70 @@
+### Summary
+Added an output configuration that can be defined in the scenario `.json` file and that allows selecting which columns are included in the raw output files `host.parquet`, `server.parquet` and `service.parquet`.
+
+### Columns
+The 'default' columns are defined in `DfltHostExportColumns`, `DfltServerExportColumns` and `DfltServiceExportColumns`. Any number of additional columns can be defined anywhere (as `ExportColumn`s), and they are deserializable as long as they are loaded by the JVM.
+
+### Deserialization
+Each `ExportColumn` has a `Regex` used for deserialization. If no custom regex is provided, the default one is used. The default regex matches the column name in a case-insensitive manner, with the words separated either by `_` (as in the name) or by a blank space.
+
+###### E.g.:
+***column name*** = "cpu\_count"
+***default column regex*** = "\\s*(?:cpu_count|cpu count)\\s*" (case insensitive)
+***matches*** = "cpu\_count", "cpu count", "CpU\_cOuNt" etc.
+
+### JSON Schema
+```json
+// scenario.json
+{
+  ...
+  "computeExportConfig": {
+    "type": "object",
+    "properties": {
+      "hostExportColumns": { "type": "array" },
+      "serverExportColumns": { "type": "array" },
+      "serviceExportColumns": { "type": "array" }
+    },
+    "required": [ /* NONE REQUIRED */ ]
+  },
+  ...
+  "required": [
+    ...
+    // NOT REQUIRED
+  ]
+}
+```
+
+&nbsp;
+###### Bad Formatting Cases
+- If a column name (and type) does not match any deserializable column, the entry is ignored and an error message is logged.
+- If an empty list of columns is provided, or none of the provided entries could be deserialized, then all loaded columns for that `Exportable` are used and a warning message is logged.
+- If no list is provided, then all loaded columns for that `Exportable` are used.
+
+
+### Example
+
+```json
+// scenario.json
+{
+  ...
+  "computeExportConfig": {
+    "hostExportColumns": ["timestamp", "timestamp_absolute", "invalid-entry1", "guests_invalid"],
+    "serverExportColumns": ["invalid-entry2"],
+    "serviceExportColumns": ["timestamp", "servers_active", "servers_pending"]
+  },
+  ...
+```
+
+```json
+// console output
+10:51:56.561 [ERROR] ColListSerializer - no match found for column "invalid-entry1", ignoring...
+10:51:56.563 [ERROR] ColListSerializer - no match found for column "invalid-entry2", ignoring...
+10:51:56.564 [WARN] ComputeExportConfig - deserialized list of export columns for exportable ServerTableReader produced empty list, falling back to all loaded columns +10:51:56.584 [INFO] ScenariosSpec - +| === Compute Export Config === +| Host columns : timestamp, timestamp_absolute, guests_invalid +| Server columns : timestamp, timestamp_absolute, server_id, server_name, cpu_count, mem_capacity, cpu_limit, cpu_time_active, cpu_time_idle, cpu_time_steal, cpu_time_lost, uptime, downtime, provision_time, boot_time, boot_time_absolute +| Service columns : timestamp, servers_active, servers_pending + +``` + diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt index d41c6dc08..a7b8bedba 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt @@ -22,12 +22,13 @@ package org.opendc.compute.telemetry.table +import org.opendc.trace.util.parquet.exporter.Exportable import java.time.Instant /** * An interface that is used to read a row of a host trace entry. */ -public interface HostTableReader { +public interface HostTableReader : Exportable { public fun copy(): HostTableReader public fun setValues(table: HostTableReader) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt index 511130251..a1aed7787 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt @@ -22,12 +22,14 @@ package org.opendc.compute.telemetry.table +import org.opendc.compute.telemetry.export.parquet.DfltServerExportColumns +import org.opendc.trace.util.parquet.exporter.Exportable import java.time.Instant /** * An interface that is used to read a row of a server trace entry. */ -public interface ServerTableReader { +public interface ServerTableReader : Exportable { public fun copy(): ServerTableReader public fun setValues(table: ServerTableReader) @@ -102,3 +104,6 @@ public interface ServerTableReader { */ public val cpuLostTime: Long } + +// Loads the default export fields for deserialization whenever this file is loaded. +private val _ignore = DfltServerExportColumns diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt index e6c2a1ae4..c3a92fc7f 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt @@ -22,12 +22,13 @@ package org.opendc.compute.telemetry.table +import org.opendc.trace.util.parquet.exporter.Exportable import java.time.Instant /** * An interface that is used to read a row of a service trace entry. 
*/ -public interface ServiceTableReader { +public interface ServiceTableReader : Exportable { public fun copy(): ServiceTableReader public fun setValues(table: ServiceTableReader) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index b48b8fe6e..0f76d5807 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -159,6 +159,7 @@ public fun addExportModel( File("${scenario.outputFolder}/raw-output/$index"), "seed=$seed", bufferSize = 4096, + computeExportConfig = scenario.computeExportConfig, ), Duration.ofSeconds(scenario.exportModelSpec.exportInterval), startTime, diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt index 9ce462f4e..c31f03005 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.base.scenario +import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig import org.opendc.experiments.base.scenario.specs.AllocationPolicySpec import org.opendc.experiments.base.scenario.specs.CheckpointModelSpec import org.opendc.experiments.base.scenario.specs.ExportModelSpec @@ -41,6 +42,7 @@ import org.opendc.experiments.base.scenario.specs.WorkloadSpec * @property name The String representing the name of the scenario. It defaults to an empty string. * @property runs The Int representing the number of runs of the scenario. It defaults to 1. * @property initialSeed The Int representing the initial seed of the scenario. It defaults to 0. + * @property computeExportConfig configures which parquet columns are to be included in the output files. */ public data class Scenario( var id: Int = -1, @@ -52,6 +54,7 @@ public data class Scenario( val carbonTracePath: String? 
= null, val exportModelSpec: ExportModelSpec = ExportModelSpec(), val outputFolder: String = "output", + val computeExportConfig: ComputeExportConfig, val name: String = "", val runs: Int = 1, val initialSeed: Int = 0, diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt index e47d9c58f..fb43f102d 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt @@ -82,6 +82,7 @@ public fun getScenarios(scenariosSpec: ScenariosSpec): List { name = scenarioID.toString(), runs = scenariosSpec.runs, initialSeed = scenariosSpec.initialSeed, + computeExportConfig = scenarioSpec.computeExportConfig, ) trackScenario(scenarioSpec, outputFolder) scenarios.add(scenario) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt index a7cda768a..1fba71d1b 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt @@ -25,6 +25,7 @@ package org.opendc.experiments.base.scenario import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.json.Json import kotlinx.serialization.json.decodeFromStream +import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig import org.opendc.experiments.base.scenario.specs.ScenariosSpec import java.io.File import java.io.InputStream @@ -35,25 +36,19 @@ public class ScenarioReader { // private val jsonReader = Json { serializersModule = failureModule } private val jsonReader = Json - @OptIn(ExperimentalSerializationApi::class) - public fun read(file: File): ScenariosSpec { - val input = file.inputStream() + public fun read(file: File): ScenariosSpec = read(file.inputStream()) - return jsonReader.decodeFromStream(input) - } - - @OptIn(ExperimentalSerializationApi::class) - public fun read(path: Path): ScenariosSpec { - val input = path.inputStream() - - return jsonReader.decodeFromStream(input) - } + public fun read(path: Path): ScenariosSpec = read(path.inputStream()) /** * Read the specified [input]. 
*/ @OptIn(ExperimentalSerializationApi::class) public fun read(input: InputStream): ScenariosSpec { + // Loads the default parquet output fields, + // so that they can be deserialized + ComputeExportConfig.loadDfltColumns() + return jsonReader.decodeFromStream(input) } } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt index da3ceecf4..cb4491b6e 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt @@ -23,6 +23,9 @@ package org.opendc.experiments.base.scenario.specs import kotlinx.serialization.Serializable +import org.opendc.common.logger.infoNewLine +import org.opendc.common.logger.logger +import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig import java.util.UUID @Serializable @@ -30,6 +33,7 @@ public data class ScenarioSpec( var id: Int = -1, var name: String = "", val outputFolder: String = "output", + val computeExportConfig: ComputeExportConfig, val topology: ScenarioTopologySpec, val workload: WorkloadSpec, val allocationPolicy: AllocationPolicySpec = AllocationPolicySpec(), @@ -50,6 +54,8 @@ public data class ScenarioSpec( * @property outputFolder * @property initialSeed * @property runs + * @property computeExportConfig configures which parquet columns are to + * be included in the output files. */ @Serializable public data class ScenariosSpec( @@ -65,6 +71,7 @@ public data class ScenariosSpec( val failureModels: Set = setOf(null), val checkpointModels: Set = setOf(null), val carbonTracePaths: Set = setOf(null), + val computeExportConfig: ComputeExportConfig = ComputeExportConfig.ALL_COLUMNS, ) { init { require(runs > 0) { "The number of runs should always be positive" } @@ -75,6 +82,8 @@ public data class ScenariosSpec( name = "unnamed-simulation-${UUID.randomUUID().toString().substring(0, 4)}" // "workload=${workloads[0].name}_topology=${topologies[0].name}_allocationPolicy=${allocationPolicies[0].name}" } + + LOG.infoNewLine(computeExportConfig.fmt()) } public fun getCartesian(): Sequence { @@ -101,6 +110,7 @@ public data class ScenariosSpec( id, name, outputFolder, + computeExportConfig = computeExportConfig, topologyList[(i / topologyDiv) % topologyList.size], workloadList[(i / workloadDiv) % workloadList.size], allocationPolicyList[(i / allocationDiv) % allocationPolicyList.size], @@ -113,4 +123,8 @@ public data class ScenariosSpec( } } } + + internal companion object { + private val LOG by logger() + } } diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts index 4cdd43506..0a0507ef3 100644 --- a/opendc-trace/opendc-trace-parquet/build.gradle.kts +++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts @@ -25,9 +25,17 @@ description = "Parquet helpers for traces in OpenDC" // Build configuration plugins { `kotlin-library-conventions` + kotlin("plugin.serialization") version "1.9.22" } dependencies { + // Needed for ParquetDataWriter + implementation(libs.kotlin.logging) + + implementation(projects.opendcCommon) + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1") + implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0") + // This configuration is necessary for 
a slim dependency on Apache Parquet api(libs.parquet) { exclude(group = "org.apache.hadoop") diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt similarity index 97% rename from opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt rename to opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt index b96ee28b5..e4b9a1472 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.compute.telemetry.export.parquet +package org.opendc.trace.util.parquet import mu.KotlinLogging import org.apache.parquet.column.ParquetProperties @@ -28,7 +28,6 @@ import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt new file mode 100644 index 000000000..90e00f4bb --- /dev/null +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.Type
+import org.opendc.common.logger.logger
+import org.slf4j.Logger
+import kotlin.reflect.KClass
+
+/**
+ * A column that can be used to build a parquet schema to export [T] records.
+ *
+ * See [columnSerializer] for deserialization of this class.
+ *
+ * ```kotlin
+ * class Foo: Exportable {
+ * ...
+ * val MY_FIELD = ExportColumn(
+ *      field = Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("my_field_name")
+ * ) { exportable: Foo -> exportable.getMyValue() }
+ * ```
+ *
+ * @param[field]
+ * The Apache Parquet field; it includes information such as:
+ * - whether the field is required or optional
+ * - the field name
+ * - the [PrimitiveType] (e.g. [INT32], [DOUBLE] etc.)
+ * - the [LogicalTypeAnnotation] (e.g. TIMESTAMP, etc.)
+ *
+ * @param[getValue]
+ * Retrieves the value to be exported from the [Exportable] of type [T] passed as parameter.
+ * The value returned needs to match the expected [PrimitiveType] defined in the field.
+ *
+ * A second type parameter could have been added to the class to enforce the correct value type at
+ * compile time, but it would have added too much complexity to the interface.
+ *
+ * @param[regex] The pattern used to determine whether a string refers to this column.
+ * The default one matches the column name with either underscores or blank
+ * spaces between words in a case-insensitive manner.
+ *
+ * @param[exportableClass]
+ * The [KClass] of the [Exportable]. Used for intuitive log messages. This class
+ * can be instantiated with the inline constructor [Companion.invoke] without providing this parameter.
+ */
+public class ExportColumn
+    @PublishedApi
+    internal constructor(
+        public val field: Type,
+        @PublishedApi internal val regex: Regex,
+        @PublishedApi internal val exportableClass: KClass,
+        internal val getValue: (T) -> Any?,
+    ) {
+        /**
+         * The name of the column (e.g. "timestamp").
+         */
+        public val name: String by lazy { field.name }
+
+        /**
+         * The primitive type of the field (e.g. INT32).
+         */
+        public val primitiveTypeName: PrimitiveTypeName by lazy { field.asPrimitiveType().primitiveTypeName }
+
+        init {
+            // Adds the column among those that can be deserialized.
+            addField(this)
+        }
+
+        override fun toString(): String = "[ExportColumn: name=$name, exportable=${exportableClass.simpleName}]"
+
+        public companion object {
+            @PublishedApi
+            internal val LOG: Logger by logger()
+
+            /**
+             * Reified constructor, needed to store the [T] class without providing it as a parameter.
+             */
+            public inline operator fun invoke(
+                field: Type,
+                regex: Regex = Regex("\\s*(?:${field.name}|${field.name.replace('_', ' ')})\\s*", RegexOption.IGNORE_CASE),
+                noinline getValue: (T) -> Any?,
+            ): ExportColumn =
+                ExportColumn(
+                    field = field,
+                    getValue = getValue,
+                    exportableClass = T::class,
+                    regex = regex,
+                )
+
+            /**
+             * All the columns that have been instantiated. They are added in the `init` block.
+             * Keep in mind that, in order to deserialize a column, that column needs to be loaded by the JVM.
+ */ + @PublishedApi + internal val allColumns: MutableSet> = mutableSetOf() + + @PublishedApi + internal val allColumnsLock: Mutex = Mutex() + + /** + * Function invoked in the `init` block of each [ExportColumn]. + * Adds the column to those that can be deserialized. + */ + private fun addField(column: ExportColumn): Unit = + runBlocking { + allColumnsLock.withLock { allColumns.add(column) } + } + + /** + * @return the [ExportColumn] whose [ExportColumn.regex] matches [columnName] **and** + * whose generic type ([Exportable]) is [T] if any, `null` otherwise. + * + * This method needs to be inlined and reified cause of runtime type erasure + * that does not allow to type check the generic class parameter. + */ + @Suppress("UNCHECKED_CAST") // I do not know why it is needed since the cast is nullable. + @PublishedApi + internal inline fun matchingColOrNull(columnName: String): ExportColumn? = + runBlocking { + val allColumns = allColumnsLock.withLock { allColumns.toSet() } + + allColumns.forEach { column -> + // If it is an ExportColumn of same type. + if (column.exportableClass == T::class) { + // Just a smart cast that always succeeds at runtime cause + // of type erasure but is needed at compile time. + (column as? ExportColumn) + ?.regex + // If fieldName matches the field regex. + ?.matchEntire(columnName) + ?.let { return@runBlocking column } + } + } + + null + } + + /** + * Returns all [ExportColumn]s of type [T] that have been loaded up until now. + */ + @Suppress("UNCHECKED_CAST") + public inline fun getAllLoadedColumns(): List> = + runBlocking { + allColumnsLock.withLock { allColumns.filter { it.exportableClass == T::class } as List> } + } + } + } diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt new file mode 100644 index 000000000..e07980f91 --- /dev/null +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+import kotlinx.serialization.KSerializer
+import kotlinx.serialization.SerializationException
+import kotlinx.serialization.builtins.ListSerializer
+import kotlinx.serialization.descriptors.SerialDescriptor
+import kotlinx.serialization.descriptors.serialDescriptor
+import kotlinx.serialization.encoding.Decoder
+import kotlinx.serialization.encoding.Encoder
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.json.JsonDecoder
+import kotlinx.serialization.json.jsonArray
+import org.opendc.common.logger.errAndNull
+import org.opendc.common.logger.logger
+
+/**
+ * Returns a serializer for [ExportColumn] of [T] based on [ExportColumn.name]. Export columns can be
+ * deserialized from string values if the string matches an [ExportColumn.regex].
+ *
+ * ###### Note:
+ * - **In order to deserialize columns, they need to be loaded at runtime**.
+ * - **The serializer needs the reified type [T], meaning static deserialization
+ * (e.g. `@Serializable`, `@Serializer`) will not work. The serializer for [ExportColumn] of [T] needs to be retrieved with this method.**
+ *
+ * It is assumed the user always knows what type of column is needed from deserialization,
+ * so that columns can be encoded only by their name, without including their type (which would be tedious to write in JSON).
+ *
+ * ```kotlin
+ * // Decode column of Foo
+ * class Foo: Exportable
+ * json.decodeFrom(deserializer = columnSerializer(), )
+ *
+ * // Decode a collection of columns of Foo
+ * json.decodeFrom(deserializer = ListSerializer(columnSerializer()), )
+ * ```
+ */
+public inline fun columnSerializer(): KSerializer> =
+    object : KSerializer> {
+        override val descriptor: SerialDescriptor = serialDescriptor()
+
+        override fun deserialize(decoder: Decoder): ExportColumn {
+            val strValue = decoder.decodeString().trim('"')
+            return ExportColumn.matchingColOrNull(strValue)
+                ?: throw SerializationException(
+                    "unable to deserialize export column '$strValue'. " +
+                        "Keep in mind that export columns need to be loaded by the JVM in order to be deserialized",
+                )
+        }
+
+        override fun serialize(
+            encoder: Encoder,
+            value: ExportColumn,
+        ) {
+            encoder.encodeString(value.name)
+        }
+    }
+
+/**
+ * Serializer for a [List] of [ExportColumn] of [T], with the peculiarity of
+ * ignoring unrecognized column names (logging an error when an
+ * unrecognized column is encountered).
+ */
+public class ColListSerializer(
+    private val columnSerializer: KSerializer>,
+) : KSerializer>> {
+    private val listSerializer = ListSerializer(columnSerializer)
+    override val descriptor: SerialDescriptor = ListSerializer(columnSerializer).descriptor
+
+    /**
+     * Unrecognized columns are ignored and an error message is logged.
+     *
+     * @return the decoded list of [ExportColumn]s (might be empty).
+     * @throws[SerializationException] if the current element is not a [jsonArray] or its string representation.
+     */
+    override fun deserialize(decoder: Decoder): List> =
+        (decoder as? JsonDecoder)?.decodeJsonElement()?.jsonArray?.mapNotNull {
+            try {
+                Json.decodeFromJsonElement(columnSerializer, it)
+            } catch (_: Exception) {
+                LOG.errAndNull("no match found for column $it, ignoring...")
+            }
+        } ?: let {
+            val strValue = decoder.decodeString().trim('"')
+            // Basically a recursive call with a json decoder instead of the argument decoder.
+ Json.decodeFromString(strValue) + } + + override fun serialize( + encoder: Encoder, + value: List>, + ) { + listSerializer.serialize(encoder, value) + } + + private companion object { + val LOG by logger() + } +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt similarity index 69% rename from opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt rename to opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt index a2e82df1d..61e766d09 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 AtLarge Research + * Copyright (c) 2024 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,16 @@ * SOFTWARE. */ -package org.opendc.compute.telemetry.export.parquet - -import org.apache.parquet.io.api.Binary -import java.nio.ByteBuffer -import java.util.UUID +package org.opendc.trace.util.parquet.exporter /** - * Helper method to convert a [UUID] into a [Binary] object consumed by Parquet. + * Classes that implement this interface can be exported + * as records in a parquet file through an [Exporter]. */ -internal fun UUID.toBinary(): Binary { - val bb = ByteBuffer.allocate(16) - bb.putLong(mostSignificantBits) - bb.putLong(leastSignificantBits) - bb.rewind() - return Binary.fromConstantByteBuffer(bb) +public interface Exportable { + public companion object { + public inline fun getAllLoadedColumns(): List> { + return ExportColumn.getAllLoadedColumns() + } + } } diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt new file mode 100644 index 000000000..05f36530e --- /dev/null +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.util.parquet.exporter + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.trace.util.parquet.ParquetDataWriter +import java.io.File + +public class Exporter + @PublishedApi + internal constructor( + outputFile: File, + writeSupp: WriteSupport, + bufferSize: Int, + ) : ParquetDataWriter( + path = outputFile, + writeSupport = writeSupp, + bufferSize = bufferSize, + ) { + public companion object { + /** + * Reified constructor that allows to use the runtime [Class.getSimpleName] name of [T] as the schema name. + * @param[outputFile] the output file where the [Exportable]s will be written. + * @param[columns] the columns that will be included in the output parquet file. + * @param[schemaName] the name of the schema of the output parquet file. + */ + public inline operator fun invoke( + outputFile: File, + vararg columns: ExportColumn = emptyArray(), + schemaName: String? = null, + bufferSize: Int = 4096, + ): Exporter = + Exporter( + outputFile = outputFile, + writeSupp = writeSuppFor(columns.toSet(), schemaName = schemaName ?: T::class.simpleName ?: "unknown"), + bufferSize = bufferSize, + ) + + /** + * Reified constructor that allows to use the runtime [Class.getSimpleName] name of [T] as the schema name. + * @param[outputFile] the output file where the [Exportable]s will be written. + * @param[columns] the columns that will be included in the output parquet file. + * @param[schemaName] the name of the schema of the output parquet file. + */ + public inline operator fun invoke( + outputFile: File, + columns: Collection> = emptySet(), + schemaName: String? = null, + bufferSize: Int = 4096, + ): Exporter = + Exporter( + outputFile = outputFile, + writeSupp = writeSuppFor(columns.toSet(), schemaName = schemaName ?: T::class.simpleName ?: "unknown"), + bufferSize = bufferSize, + ) + + /** + * @return an anonymous [WriteSupport] for [T] with only the columns included in [columns]. + */ + @PublishedApi + internal fun writeSuppFor( + columns: Set>, + schemaName: String, + ): WriteSupport = + object : WriteSupport() { + private lateinit var cons: RecordConsumer + + private val schema: MessageType = + Types + .buildMessage() + .addFields(*columns.map { it.field }.toTypedArray()) + .named(schemaName) + + override fun init(configuration: Configuration): WriteContext = WriteContext(schema, emptyMap()) + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + cons = recordConsumer + } + + override fun write(record: T) = + with(cons) { + startMessage() + + columns.forEachIndexed { idx, column -> + fun Any.castedOrThrow(): T { + @Suppress("UNCHECKED_CAST") + return (this as? 
T) ?: throw TypeCastException(
+                                        "attempt to add value of type ${this::class} to export " +
+                                            "field $column which requires a different type",
+                                    )
+                                }
+                                val valueToAdd: Any =
+                                    column.getValue(
+                                        record,
+                                    ) ?: return@forEachIndexed // Maybe add explicit check for optional fields
+
+                                startField(column.name, idx)
+                                when (column.primitiveTypeName) {
+                                    INT32 -> addInteger(valueToAdd.castedOrThrow())
+                                    INT64 -> addLong(valueToAdd.castedOrThrow())
+                                    DOUBLE -> addDouble(valueToAdd.castedOrThrow())
+                                    BINARY -> addBinary(valueToAdd.castedOrThrow())
+                                    FLOAT -> addFloat(valueToAdd.castedOrThrow())
+                                    BOOLEAN -> addBoolean(valueToAdd.castedOrThrow())
+                                    else -> throw RuntimeException(
+                                        "parquet primitive type name '${column.primitiveTypeName}' is not supported",
+                                    )
+                                }
+                                endField(column.name, idx)
+                            }
+
+                            cons.endMessage()
+                        }
+                }
+        }
+    }
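
For reference, a minimal sketch of how the pieces introduced by this patch fit together: an `Exportable` record type, a set of `ExportColumn`s, and an `Exporter` that writes only the selected columns. `FooTableReader`, `FooExportColumns`, the column names and the output path are hypothetical names used only for illustration; the example also assumes that `ExportColumn` and `Exporter` are parameterized by the `Exportable` type as in their constructors above, and that `ParquetDataWriter` exposes `write` and `close` as the previous writer implementations did.

```kotlin
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Types
import org.opendc.trace.util.parquet.exporter.ExportColumn
import org.opendc.trace.util.parquet.exporter.Exportable
import org.opendc.trace.util.parquet.exporter.Exporter
import java.io.File

// Hypothetical Exportable record, for illustration only.
class FooTableReader(val timestamp: Long, val cpuUsage: Double) : Exportable

// Columns become deserializable (e.g. from a scenario .json) once this object is
// loaded by the JVM, since each ExportColumn registers itself in its init block.
object FooExportColumns {
    val TIMESTAMP =
        ExportColumn<FooTableReader>(
            field = Types.required(PrimitiveTypeName.INT64).named("timestamp"),
        ) { it.timestamp }

    val CPU_USAGE =
        ExportColumn<FooTableReader>(
            field = Types.required(PrimitiveTypeName.DOUBLE).named("cpu_usage"),
        ) { it.cpuUsage }
}

fun main() {
    // Only the selected columns end up in the parquet schema.
    val exporter =
        Exporter<FooTableReader>(
            File("foo.parquet"),
            FooExportColumns.TIMESTAMP,
            FooExportColumns.CPU_USAGE,
        )

    exporter.write(FooTableReader(timestamp = 0L, cpuUsage = 0.5))
    exporter.close()
}
```

Note that the `getValue` lambda only returns the raw value; the generated `WriteSupport` calls the matching `addLong`/`addDouble` on the record consumer, so the returned value has to match the declared `PrimitiveType` of the field.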