diff --git a/opendc-compute/opendc-compute-telemetry/build.gradle.kts b/opendc-compute/opendc-compute-telemetry/build.gradle.kts index 10f7c610b..e86924494 100644 --- a/opendc-compute/opendc-compute-telemetry/build.gradle.kts +++ b/opendc-compute/opendc-compute-telemetry/build.gradle.kts @@ -25,12 +25,15 @@ description = "OpenDC Compute Service implementation" // Build configuration plugins { `kotlin-library-conventions` + kotlin("plugin.serialization") version "1.9.22" } dependencies { api(projects.opendcCompute.opendcComputeApi) + api(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) + implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0") implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-service"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon"))) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt new file mode 100644 index 000000000..06c04b3b3 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.telemetry.export.parquet + +import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable +import kotlinx.serialization.builtins.ListSerializer +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.buildClassSerialDescriptor +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder +import kotlinx.serialization.encoding.encodeStructure +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonDecoder +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.jsonObject +import org.opendc.common.logger.infoNewLine +import org.opendc.common.logger.logger +import org.opendc.compute.telemetry.table.HostTableReader +import org.opendc.compute.telemetry.table.ServerTableReader +import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.trace.util.parquet.exporter.ColListSerializer +import org.opendc.trace.util.parquet.exporter.ExportColumn +import org.opendc.trace.util.parquet.exporter.Exportable +import org.opendc.trace.util.parquet.exporter.columnSerializer +import org.slf4j.Logger + +/** + * Aggregates the necessary settings to personalize the output + * parquet files for compute workloads. + * + * @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file. + * @param[serverExportColumns] the columns that will be included in the `server.parquet` raw output file. + * @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file. + */ +@Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class) +public data class ComputeExportConfig( + public val hostExportColumns: List>, + public val serverExportColumns: List>, + public val serviceExportColumns: List>, +) { + /** + * @return formatted string representing the export config, to be logged with [Logger.infoNewLine]. + */ + public fun fmt(): String = + """ + | === Compute Export Config === + | Host columns : ${hostExportColumns.map { it.name }.toString().trim('[', ']')} + | Server columns : ${serverExportColumns.map { it.name }.toString().trim('[', ']')} + | Service columns : ${serviceExportColumns.map { it.name }.toString().trim('[', ']')} + """.trimIndent() + + public companion object { + internal val LOG by logger() + + /** + * Config that includes all columns defined in [DfltHostExportColumns], + * [DfltServerExportColumns], [DfltServiceExportColumns] among all other loaded + * columns for [HostTableReader], [ServerTableReader] and [ServiceTableReader]. + */ + public val ALL_COLUMNS: ComputeExportConfig by lazy { + ParquetComputeMonitor.loadDfltFields() + ComputeExportConfig( + hostExportColumns = ExportColumn.getAllLoadedColumns(), + serverExportColumns = ExportColumn.getAllLoadedColumns(), + serviceExportColumns = ExportColumn.getAllLoadedColumns(), + ) + } + + /** + * A runtime [KSerializer] is needed for reasons explained in [columnSerializer] docs. + * + * This serializer makes use of reified column serializers for the 2 properties. 
+ */ + internal object ComputeExportConfigSerializer : KSerializer { + override val descriptor: SerialDescriptor = + buildClassSerialDescriptor("org.opendc.compute.telemetry.export.parquet.ComputeExportConfig") { + element( + "hostExportColumns", + ListSerializer(columnSerializer()).descriptor, + ) + element( + "serverExportColumns", + ListSerializer(columnSerializer()).descriptor, + ) + element( + "serviceExportColumns", + ListSerializer(columnSerializer()).descriptor, + ) + } + + override fun deserialize(decoder: Decoder): ComputeExportConfig { + // Basically a recursive call with a JsonDecoder. + val jsonDec = + (decoder as? JsonDecoder) ?: let { + Json.decodeFromString(decoder.decodeString().trim('"')) + } + + // Loads the default columns so that they are available for deserialization. + ParquetComputeMonitor.loadDfltFields() + val elem = jsonDec.decodeJsonElement().jsonObject + + val hostFields: List> = elem["hostExportColumns"].toFieldList() + val serverFields: List> = elem["serverExportColumns"].toFieldList() + val serviceFields: List> = elem["serviceExportColumns"].toFieldList() + + return ComputeExportConfig( + hostExportColumns = hostFields, + serverExportColumns = serverFields, + serviceExportColumns = serviceFields, + ) + } + + override fun serialize( + encoder: Encoder, + value: ComputeExportConfig, + ) { + encoder.encodeStructure(descriptor) { + encodeSerializableElement( + descriptor, + 0, + ColListSerializer(columnSerializer()), + value.hostExportColumns, + ) + encodeSerializableElement( + descriptor, + 1, + ColListSerializer(columnSerializer()), + value.serverExportColumns, + ) + encodeSerializableElement( + descriptor, + 2, + ColListSerializer(columnSerializer()), + value.serviceExportColumns, + ) + } + } + } + } +} + +private val json = Json { ignoreUnknownKeys = true } + +private inline fun JsonElement?.toFieldList(): List> = + this?.let { + json.decodeFromJsonElement(ColListSerializer(columnSerializer()), it) + }?.ifEmpty { + ComputeExportConfig.LOG.warn( + "deserialized list of export columns for exportable ${T::class.simpleName} " + + "produced empty list, falling back to all loaded columns", + ) + ExportColumn.getAllLoadedColumns() + } ?: ExportColumn.getAllLoadedColumns() diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt new file mode 100644 index 000000000..7cb91be34 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.parquet.io.api.Binary +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.HostTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +// Object needed to solve ambiguity for field names that are included in more than 1 exportable. +// Additional fields can be added at runtime from anywhere. +public object DfltHostExportColumns { + public val TIMESTAMP: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val HOST_ID: ExportColumn = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + ) { Binary.fromString(it.host.id) } + + public val HOST_NAME: ExportColumn = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_name"), + ) { Binary.fromString(it.host.name) } + + public val CPU_COUNT: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("cpu_count"), + ) { it.host.cpuCount } + + public val MEM_CAPACITY: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("mem_capacity"), + ) { it.host.memCapacity } + + public val GUESTS_TERMINATED: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("guests_terminated"), + ) { it.guestsTerminated } + + public val GUESTS_RUNNING: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("guests_running"), + ) { it.guestsRunning } + + public val GUESTS_ERROR: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("guests_error"), + ) { it.guestsError } + + public val GUESTS_INVALID: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("guests_invalid"), + ) { it.guestsInvalid } + + public val CPU_LIMIT: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_limit"), + ) { it.cpuLimit } + + public val CPU_USAGE: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_usage"), + ) { it.cpuUsage } + + public val CPU_DEMAND: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_demand"), + ) { it.cpuDemand } + + public val CPU_UTILIZATION: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_utilization"), + ) { it.cpuUtilization } + + public val CPU_TIME_ACTIVE: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_active"), + ) { it.cpuActiveTime } + + 
public val CPU_TIME_IDLE: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_idle"), + ) { it.cpuIdleTime } + + public val CPU_TIME_STEAL: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_steal"), + ) { it.cpuStealTime } + + public val CPU_TIME_LOST: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_lost"), + ) { it.cpuLostTime } + + public val POWER_DRAW: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("power_draw"), + ) { it.powerDraw } + + public val ENERGY_USAGE: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("energy_usage"), + ) { it.energyUsage } + + public val CARBON_INTENSITY: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("carbon_intensity"), + ) { it.carbonIntensity } + + public val CARBON_EMISSION: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("carbon_emission"), + ) { it.carbonEmission } + + public val UP_TIME: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("uptime"), + ) { it.uptime } + + public val DOWN_TIME: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("downtime"), + ) { it.downtime } + + public val BOOT_TIME: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("boot_time"), + ) { it.bootTime?.toEpochMilli() } + + public val BOOT_TIME_ABS: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("boot_time_absolute"), + ) { it.bootTimeAbsolute?.toEpochMilli() } +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt new file mode 100644 index 000000000..7329eb4ea --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.parquet.io.api.Binary +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.ServerTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +// Object needed to solve ambiguity for field names that are included in more than 1 exportable. +// Additional fields can be added at runtime from anywhere. +public object DfltServerExportColumns { + public val TIMESTAMP: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val SERVER_ID: ExportColumn = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("server_id"), + ) { Binary.fromString(it.server.id) } + + public val SERVER_NAME: ExportColumn = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("server_name"), + ) { Binary.fromString(it.server.name) } + + public val CPU_COUNT: ExportColumn = + ExportColumn( + field = Types.required(INT32).named("cpu_count"), + ) { it.server.cpuCount } + + public val MEM_CAPACITY: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("mem_capacity"), + ) { it.server.memCapacity } + + public val CPU_LIMIT: ExportColumn = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_limit"), + ) { it.cpuLimit } + + public val CPU_TIME_ACTIVE: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_active"), + ) { it.cpuActiveTime } + + public val CPU_TIME_IDLE: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_idle"), + ) { it.cpuIdleTime } + + public val CPU_TIME_STEAL: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_steal"), + ) { it.cpuStealTime } + + public val CPU_TIME_LOST: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("cpu_time_lost"), + ) { it.cpuLostTime } + + public val UP_TIME: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("uptime"), + ) { it.uptime } + + public val DOWN_TIME: ExportColumn = + ExportColumn( + field = Types.required(INT64).named("downtime"), + ) { it.downtime } + + public val PROVISION_TIME: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("provision_time"), + ) { it.provisionTime?.toEpochMilli() } + + public val BOOT_TIME: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("boot_time"), + ) { it.bootTime?.toEpochMilli() } + + public val BOOT_TIME_ABS: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("boot_time_absolute"), + ) { it.bootTimeAbsolute?.toEpochMilli() } +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt new file mode 100644 index 000000000..483ee76e9 --- 
/dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +// Object needed to solve ambiguity for field names that are included in more than 1 exportable. +// Additional fields can be added at runtime from anywhere. 
+public object DfltServiceExportColumns {
+    public val TIMESTAMP: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT64).named("timestamp"),
+        ) { it.timestamp.toEpochMilli() }
+
+    public val TIMESTAMP_ABS: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT64).named("timestamp_absolute"),
+        ) { it.timestampAbsolute.toEpochMilli() }
+
+    public val HOSTS_UP: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("hosts_up"),
+        ) { it.hostsUp }
+
+    public val SERVERS_PENDING: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("servers_pending"),
+        ) { it.serversPending }
+
+    public val SERVERS_ACTIVE: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("servers_active"),
+        ) { it.serversActive }
+
+    public val ATTEMPTS_SUCCESS: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("attempts_success"),
+        ) { it.attemptsSuccess }
+
+    public val ATTEMPTS_FAILURE: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("attempts_failure"),
+        ) { it.attemptsFailure }
+
+    public val ATTEMPTS_ERROR: ExportColumn<ServiceTableReader> =
+        ExportColumn(
+            field = Types.required(INT32).named("attempts_error"),
+        ) { it.attemptsError }
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
index 1c9104975..7ea477da2 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
@@ -26,45 +26,100 @@ import org.opendc.compute.telemetry.ComputeMonitor
 import org.opendc.compute.telemetry.table.HostTableReader
 import org.opendc.compute.telemetry.table.ServerTableReader
 import org.opendc.compute.telemetry.table.ServiceTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+import org.opendc.trace.util.parquet.exporter.Exportable
+import org.opendc.trace.util.parquet.exporter.Exporter
 import java.io.File

 /**
  * A [ComputeMonitor] that logs the events to a Parquet file.
*/ -public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { - private val serverWriter = - ParquetServerDataWriter( - File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() }, - bufferSize, - ) - - private val hostWriter = - ParquetHostDataWriter( - File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() }, - bufferSize, - ) - - private val serviceWriter = - ParquetServiceDataWriter( - File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, - bufferSize, - ) - - override fun record(reader: ServerTableReader) { - serverWriter.write(reader) +public class ParquetComputeMonitor( + private val hostExporter: Exporter, + private val serverExporter: Exporter, + private val serviceExporter: Exporter, +) : ComputeMonitor, AutoCloseable { + override fun record(reader: HostTableReader) { + hostExporter.write(reader) } - override fun record(reader: HostTableReader) { - hostWriter.write(reader) + override fun record(reader: ServerTableReader) { + serverExporter.write(reader) } override fun record(reader: ServiceTableReader) { - serviceWriter.write(reader) + serviceExporter.write(reader) } override fun close() { - hostWriter.close() - serviceWriter.close() - serverWriter.close() + hostExporter.close() + serverExporter.close() + serviceExporter.close() + } + + public companion object { + public fun loadDfltFields() { + DfltHostExportColumns + DfltServerExportColumns + DfltServiceExportColumns + } + + /** + * Constructor that loads default [ExportColumn]s defined in + * [DfltHostExportColumns], [DfltServerExportColumns], [DfltServiceExportColumns] + * in case optional parameters are omitted and all fields need to be retrieved. + */ + public operator fun invoke( + base: File, + partition: String, + bufferSize: Int, + computeExportConfig: ComputeExportConfig, + ): ParquetComputeMonitor = + invoke( + base = base, + partition = partition, + bufferSize = bufferSize, + hostExportColumns = computeExportConfig.hostExportColumns, + serverExportColumns = computeExportConfig.serverExportColumns, + serviceExportColumns = computeExportConfig.serviceExportColumns, + ) + + /** + * Constructor that loads default [ExportColumn]s defined in + * [DfltHostExportColumns], [DfltServerExportColumns], [DfltServiceExportColumns] + * in case optional parameters are omitted and all fields need to be retrieved. + */ + public operator fun invoke( + base: File, + partition: String, + bufferSize: Int, + hostExportColumns: Collection>? = null, + serverExportColumns: Collection>? = null, + serviceExportColumns: Collection>? = null, + ): ParquetComputeMonitor { + // Loads the fields in case they need to be retrieved if optional params are omitted. 
+ loadDfltFields() + + return ParquetComputeMonitor( + hostExporter = + Exporter( + outputFile = File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() }, + columns = hostExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + serverExporter = + Exporter( + outputFile = File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() }, + columns = serverExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + serviceExporter = + Exporter( + outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, + columns = serviceExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + ) + } } } diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt deleted file mode 100644 index 020e67f24..000000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.telemetry.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.compute.telemetry.table.HostTableReader -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.io.File - -/** - * A Parquet event writer for [HostTableReader]s. - */ -public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, HostDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: LocalParquetWriter.Builder): ParquetWriter { - return builder - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun toString(): String = "host-writer" - - /** - * A [WriteSupport] implementation for a [HostTableReader]. 
- */ - private class HostDataWriteSupport() : WriteSupport() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: HostTableReader) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - data: HostTableReader, - ) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("timestamp_absolute", 1) - consumer.addLong(data.timestampAbsolute.toEpochMilli()) - consumer.endField("timestamp_absolute", 1) - - consumer.startField("host_id", 2) - consumer.addBinary(Binary.fromString(data.host.id)) - consumer.endField("host_id", 2) - - consumer.startField("host_name", 3) - consumer.addBinary(Binary.fromString(data.host.name)) - consumer.endField("host_name", 3) - - consumer.startField("cpu_count", 4) - consumer.addInteger(data.host.cpuCount) - consumer.endField("cpu_count", 4) - - consumer.startField("mem_capacity", 5) - consumer.addLong(data.host.memCapacity) - consumer.endField("mem_capacity", 5) - - consumer.startField("guests_terminated", 6) - consumer.addInteger(data.guestsTerminated) - consumer.endField("guests_terminated", 6) - - consumer.startField("guests_running", 7) - consumer.addInteger(data.guestsRunning) - consumer.endField("guests_running", 7) - - consumer.startField("guests_error", 8) - consumer.addInteger(data.guestsError) - consumer.endField("guests_error", 8) - - consumer.startField("guests_invalid", 9) - consumer.addInteger(data.guestsInvalid) - consumer.endField("guests_invalid", 9) - - consumer.startField("cpu_limit", 10) - consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 10) - - consumer.startField("cpu_usage", 11) - consumer.addDouble(data.cpuUsage) - consumer.endField("cpu_usage", 11) - - consumer.startField("cpu_demand", 12) - consumer.addDouble(data.cpuUsage) - consumer.endField("cpu_demand", 12) - - consumer.startField("cpu_utilization", 13) - consumer.addDouble(data.cpuUtilization) - consumer.endField("cpu_utilization", 13) - - consumer.startField("cpu_time_active", 14) - consumer.addLong(data.cpuActiveTime) - consumer.endField("cpu_time_active", 14) - - consumer.startField("cpu_time_idle", 15) - consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 15) - - consumer.startField("cpu_time_steal", 16) - consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 16) - - consumer.startField("cpu_time_lost", 17) - consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 17) - - consumer.startField("power_draw", 18) - consumer.addDouble(data.powerDraw) - consumer.endField("power_draw", 18) - - consumer.startField("energy_usage", 19) - consumer.addDouble(data.energyUsage) - consumer.endField("energy_usage", 19) - - consumer.startField("carbon_intensity", 20) - consumer.addDouble(data.carbonIntensity) - consumer.endField("carbon_intensity", 20) - - consumer.startField("carbon_emission", 21) - consumer.addDouble(data.carbonEmission) - consumer.endField("carbon_emission", 21) - - consumer.startField("uptime", 22) - consumer.addLong(data.uptime) - consumer.endField("uptime", 22) - - consumer.startField("downtime", 23) - consumer.addLong(data.downtime) - consumer.endField("downtime", 23) - - val bootTime = data.bootTime - 
if (bootTime != null) { - consumer.startField("boot_time", 24) - consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 24) - } - - val bootTimeAbsolute = data.bootTimeAbsolute - if (bootTimeAbsolute != null) { - consumer.startField("boot_time_absolute", 25) - consumer.addLong(bootTimeAbsolute.toEpochMilli()) - consumer.endField("boot_time_absolute", 25) - } - - consumer.endMessage() - } - } - - private companion object { - /** - * The schema of the host data. - */ - val SCHEMA: MessageType = - Types - .buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp_absolute"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_name"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_terminated"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_running"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_error"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_invalid"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_limit"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_demand"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_utilization"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_idle"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_steal"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_lost"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("power_draw"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("energy_usage"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("carbon_intensity"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("carbon_emission"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time_absolute"), - ) - .named("host") - } -} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt deleted file mode 100644 index e1b489ac8..000000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated 
documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.telemetry.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.compute.telemetry.table.ServerTableReader -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.io.File - -/** - * A Parquet event writer for [ServerTableReader]s. - */ -public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, ServerDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: LocalParquetWriter.Builder): ParquetWriter { - return builder - .withDictionaryEncoding("server_id", true) - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun toString(): String = "server-writer" - - /** - * A [WriteSupport] implementation for a [ServerTableReader]. 
- */ - private class ServerDataWriteSupport() : WriteSupport() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: ServerTableReader) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - data: ServerTableReader, - ) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("timestamp_absolute", 1) - consumer.addLong(data.timestampAbsolute.toEpochMilli()) - consumer.endField("timestamp_absolute", 1) - - consumer.startField("server_id", 2) - consumer.addBinary(Binary.fromString(data.server.id)) - consumer.endField("server_id", 2) - - consumer.startField("server_name", 3) - consumer.addBinary(Binary.fromString(data.server.name)) - consumer.endField("server_name", 3) - - val hostId = data.host?.id - if (hostId != null) { - consumer.startField("host_id", 4) - consumer.addBinary(Binary.fromString(hostId)) - consumer.endField("host_id", 4) - } - - consumer.startField("mem_capacity", 5) - consumer.addLong(data.server.memCapacity) - consumer.endField("mem_capacity", 5) - - consumer.startField("cpu_count", 6) - consumer.addInteger(data.server.cpuCount) - consumer.endField("cpu_count", 6) - - consumer.startField("cpu_limit", 7) - consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 7) - - consumer.startField("cpu_time_active", 8) - consumer.addLong(data.cpuActiveTime) - consumer.endField("cpu_time_active", 8) - - consumer.startField("cpu_time_idle", 9) - consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 9) - - consumer.startField("cpu_time_steal", 10) - consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 10) - - consumer.startField("cpu_time_lost", 11) - consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 11) - - consumer.startField("uptime", 12) - consumer.addLong(data.uptime) - consumer.endField("uptime", 12) - - consumer.startField("downtime", 13) - consumer.addLong(data.downtime) - consumer.endField("downtime", 13) - - val provisionTime = data.provisionTime - if (provisionTime != null) { - consumer.startField("provision_time", 14) - consumer.addLong(provisionTime.toEpochMilli()) - consumer.endField("provision_time", 14) - } - - val bootTime = data.bootTime - if (bootTime != null) { - consumer.startField("boot_time", 15) - consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 15) - } - - val bootTimeAbsolute = data.bootTimeAbsolute - if (bootTimeAbsolute != null) { - consumer.startField("boot_time_absolute", 16) - consumer.addLong(bootTimeAbsolute.toEpochMilli()) - consumer.endField("boot_time_absolute", 16) - } - - consumer.endMessage() - } - } - - private companion object { - /** - * The schema of the server data. 
- */ - val SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp_absolute"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_name"), - Types - .optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_limit"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_idle"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_steal"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_lost"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("provision_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time_absolute"), - ) - .named("server") - } -} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt deleted file mode 100644 index eba8fc4fe..000000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.compute.telemetry.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.compute.telemetry.table.ServiceTableReader -import java.io.File - -/** - * A Parquet event writer for [ServiceTableReader]s. - */ -public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, ServiceDataWriteSupport(), bufferSize) { - override fun toString(): String = "service-writer" - - /** - * A [WriteSupport] implementation for a [ServiceTableReader]. - */ - private class ServiceDataWriteSupport() : WriteSupport() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: ServiceTableReader) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - data: ServiceTableReader, - ) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("timestamp_absolute", 1) - consumer.addLong(data.timestampAbsolute.toEpochMilli()) - consumer.endField("timestamp_absolute", 1) - - consumer.startField("hosts_up", 2) - consumer.addInteger(data.hostsUp) - consumer.endField("hosts_up", 2) - - consumer.startField("hosts_down", 3) - consumer.addInteger(data.hostsDown) - consumer.endField("hosts_down", 3) - - consumer.startField("servers_pending", 4) - consumer.addInteger(data.serversPending) - consumer.endField("servers_pending", 4) - - consumer.startField("servers_active", 5) - consumer.addInteger(data.serversActive) - consumer.endField("servers_active", 5) - - consumer.startField("attempts_success", 6) - consumer.addInteger(data.attemptsSuccess) - consumer.endField("attempts_pending", 6) - - consumer.startField("attempts_failure", 7) - consumer.addInteger(data.attemptsFailure) - consumer.endField("attempts_failure", 7) - - consumer.startField("attempts_error", 8) - consumer.addInteger(data.attemptsError) - consumer.endField("attempts_error", 8) - - consumer.endMessage() - } - } - - private companion object { - private val SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp_absolute"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("hosts_up"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("hosts_down"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("servers_pending"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("servers_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_success"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_failure"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_error"), - ) - .named("service") - } -} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md 
b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md
new file mode 100644
index 000000000..a05fc3b80
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md
@@ -0,0 +1,70 @@
+### Summary
+Adds an output configuration, definable in the scenario `.json` file, that allows selecting which columns are included in the raw output files `host.parquet`, `server.parquet` and `service.parquet`.
+
+### Columns
+The default columns are defined in `DfltHostExportColumns`, `DfltServerExportColumns` and `DfltServiceExportColumns`. Any number of additional columns can be defined anywhere (as `ExportColumn`s), and they will be deserializable as long as they are loaded by the JVM (a sketch follows the regex example below).
+
+### Deserialization
+Each `ExportColumn` has a `Regex` used for deserialization. If no custom regex is provided, a default one is used. The default regex matches the column name case-insensitively, with either `_` (as in the name) or a blank space as the separator.
+
+###### E.g.:
+***column name*** = "cpu\_count"
+***default column regex*** = "\\s*(?:cpu_count|cpu count)\\s*" (case insensitive)
+***matches*** = "cpu\_count", "cpu count", "CpU\_cOuNt" etc.
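+
+### Defining additional columns (sketch)
+The snippet below is a minimal, hypothetical sketch of how an extra column could be registered at runtime, assuming the same `ExportColumn` constructor used by `DfltHostExportColumns`; the object name, the column name and the `regex` parameter name are illustrative assumptions, not shipped code. Once the object is loaded by the JVM, the column can be selected from the scenario `.json` file like any default column.
+```kotlin
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.HostTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+// Hypothetical example: referencing this object once loads the column,
+// making it available for deserialization and export.
+public object CustomHostExportColumns {
+    public val CPU_UTILIZATION_PCT: ExportColumn<HostTableReader> =
+        ExportColumn(
+            field = Types.required(DOUBLE).named("cpu_utilization_pct"),
+            // Custom regex (parameter name assumed); omit it to fall back to the
+            // default case-insensitive "_" / " " matching described above.
+            regex = Regex("\\s*cpu[_ ]utilization[_ ]pct\\s*", RegexOption.IGNORE_CASE),
+        ) { it.cpuUtilization * 100.0 }
+}
+```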
+
+### JSON Schema
+```json
+// scenario.json
+{
+  ...
+  "computeExportConfig": {
+    "type": "object",
+    "properties": {
+      "hostExportColumns": { "type": "array" },
+      "serverExportColumns": { "type": "array" },
+      "serviceExportColumns": { "type": "array" }
+    },
+    "required": [ /* NONE REQUIRED */ ]
+  },
+  ...
+  "required": [
+    ...
+    // NOT REQUIRED
+  ]
+}
+```
+
+###### Bad Formatting Cases
+- If a column name (and type) does not match any deserializable column, the entry is ignored and an error message is logged.
+- If an empty list of columns is provided, or none of the provided entries could be deserialized, all loaded columns for that `Exportable` are used and a warning message is logged.
+- If no list is provided, then all loaded columns for that `Exportable` are used.
+
+### Example
+
+```json
+// scenario.json
+{
+  ...
+  "computeExportConfig": {
+    "hostExportColumns": ["timestamp", "timestamp_absolute", "invalid-entry", "guests_invalid"],
+    "serverExportColumns": ["invalid-entry"],
+    "serviceExportColumns": ["timestamp", "servers_active", "servers_pending"]
+  },
+  ...
+```
+
+```json
+// console output
+10:51:56.561 [ERROR] ColListSerializer - no match found for column "invalid-entry", ignoring...
+10:51:56.563 [ERROR] ColListSerializer - no match found for column "invalid-entry", ignoring...
+10:51:56.564 [WARN] ComputeExportConfig - deserialized list of export columns for exportable ServerTableReader produced empty list, falling back to all loaded columns
+10:51:56.584 [INFO] ScenariosSpec -
+| === Compute Export Config ===
+| Host columns : timestamp, timestamp_absolute, guests_invalid
+| Server columns : timestamp, timestamp_absolute, server_id, server_name, cpu_count, mem_capacity, cpu_limit, cpu_time_active, cpu_time_idle, cpu_time_steal, cpu_time_lost, uptime, downtime, provision_time, boot_time, boot_time_absolute
+| Service columns : timestamp, servers_active, servers_pending
+
+```
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt
deleted file mode 100644
index a2e82df1d..000000000
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.telemetry.export.parquet
-
-import org.apache.parquet.io.api.Binary
-import java.nio.ByteBuffer
-import java.util.UUID
-
-/**
- * Helper method to convert a [UUID] into a [Binary] object consumed by Parquet.
- */
-internal fun UUID.toBinary(): Binary {
-    val bb = ByteBuffer.allocate(16)
-    bb.putLong(mostSignificantBits)
-    bb.putLong(leastSignificantBits)
-    bb.rewind()
-    return Binary.fromConstantByteBuffer(bb)
-}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
index d41c6dc08..a7b8bedba 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
@@ -22,12 +22,13 @@
 package org.opendc.compute.telemetry.table

+import org.opendc.trace.util.parquet.exporter.Exportable
 import java.time.Instant

 /**
  * An interface that is used to read a row of a host trace entry.
*/ -public interface HostTableReader { +public interface HostTableReader : Exportable { public fun copy(): HostTableReader public fun setValues(table: HostTableReader) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt index 511130251..a1aed7787 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt @@ -22,12 +22,14 @@ package org.opendc.compute.telemetry.table +import org.opendc.compute.telemetry.export.parquet.DfltServerExportColumns +import org.opendc.trace.util.parquet.exporter.Exportable import java.time.Instant /** * An interface that is used to read a row of a server trace entry. */ -public interface ServerTableReader { +public interface ServerTableReader : Exportable { public fun copy(): ServerTableReader public fun setValues(table: ServerTableReader) @@ -102,3 +104,6 @@ public interface ServerTableReader { */ public val cpuLostTime: Long } + +// Loads the default export fields for deserialization whenever this file is loaded. +private val _ignore = DfltServerExportColumns diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt index e6c2a1ae4..c3a92fc7f 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt @@ -22,12 +22,13 @@ package org.opendc.compute.telemetry.table +import org.opendc.trace.util.parquet.exporter.Exportable import java.time.Instant /** * An interface that is used to read a row of a service trace entry. 
  */
-public interface ServiceTableReader {
+public interface ServiceTableReader : Exportable {
     public fun copy(): ServiceTableReader
 
     public fun setValues(table: ServiceTableReader)
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index b48b8fe6e..0f76d5807 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -159,6 +159,7 @@ public fun addExportModel(
             File("${scenario.outputFolder}/raw-output/$index"),
             "seed=$seed",
             bufferSize = 4096,
+            computeExportConfig = scenario.computeExportConfig,
         ),
         Duration.ofSeconds(scenario.exportModelSpec.exportInterval),
         startTime,
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt
index 9ce462f4e..c31f03005 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt
@@ -22,6 +22,7 @@
 
 package org.opendc.experiments.base.scenario
 
+import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
 import org.opendc.experiments.base.scenario.specs.AllocationPolicySpec
 import org.opendc.experiments.base.scenario.specs.CheckpointModelSpec
 import org.opendc.experiments.base.scenario.specs.ExportModelSpec
@@ -41,6 +42,7 @@ import org.opendc.experiments.base.scenario.specs.WorkloadSpec
 * @property name The String representing the name of the scenario. It defaults to an empty string.
 * @property runs The Int representing the number of runs of the scenario. It defaults to 1.
 * @property initialSeed The Int representing the initial seed of the scenario. It defaults to 0.
+ * @property computeExportConfig configures which parquet columns are to be included in the output files.
 */
 public data class Scenario(
     var id: Int = -1,
@@ -52,6 +54,7 @@ public data class Scenario(
     val carbonTracePath: String? = null,
     val exportModelSpec: ExportModelSpec = ExportModelSpec(),
     val outputFolder: String = "output",
+    val computeExportConfig: ComputeExportConfig,
     val name: String = "",
     val runs: Int = 1,
     val initialSeed: Int = 0,
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt
index e47d9c58f..fb43f102d 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt
@@ -82,6 +82,7 @@ public fun getScenarios(scenariosSpec: ScenariosSpec): List<Scenario> {
                     name = scenarioID.toString(),
                     runs = scenariosSpec.runs,
                     initialSeed = scenariosSpec.initialSeed,
+                    computeExportConfig = scenarioSpec.computeExportConfig,
                 )
             trackScenario(scenarioSpec, outputFolder)
             scenarios.add(scenario)
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt
index a7cda768a..f512f833c 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt
@@ -25,6 +25,7 @@ package org.opendc.experiments.base.scenario
 import kotlinx.serialization.ExperimentalSerializationApi
 import kotlinx.serialization.json.Json
 import kotlinx.serialization.json.decodeFromStream
+import org.opendc.compute.telemetry.export.parquet.ParquetComputeMonitor
 import org.opendc.experiments.base.scenario.specs.ScenariosSpec
 import java.io.File
 import java.io.InputStream
@@ -35,25 +36,19 @@ public class ScenarioReader {
     // private val jsonReader = Json { serializersModule = failureModule }
     private val jsonReader = Json
 
-    @OptIn(ExperimentalSerializationApi::class)
-    public fun read(file: File): ScenariosSpec {
-        val input = file.inputStream()
+    public fun read(file: File): ScenariosSpec = read(file.inputStream())
 
-        return jsonReader.decodeFromStream(input)
-    }
-
-    @OptIn(ExperimentalSerializationApi::class)
-    public fun read(path: Path): ScenariosSpec {
-        val input = path.inputStream()
-
-        return jsonReader.decodeFromStream(input)
-    }
+    public fun read(path: Path): ScenariosSpec = read(path.inputStream())
 
     /**
      * Read the specified [input].
      */
     @OptIn(ExperimentalSerializationApi::class)
     public fun read(input: InputStream): ScenariosSpec {
+        // Loads the default parquet output fields,
+        // so that they can be deserialized
+        ParquetComputeMonitor.loadDfltFields()
+
         return jsonReader.decodeFromStream(input)
     }
 }
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt
index da3ceecf4..40bd6fb86 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt
@@ -23,6 +23,9 @@
 package org.opendc.experiments.base.scenario.specs
 
 import kotlinx.serialization.Serializable
+import org.opendc.common.logger.infoNewLine
+import org.opendc.common.logger.logger
+import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
 import java.util.UUID
 
 @Serializable
@@ -30,6 +33,7 @@ public data class ScenarioSpec(
     var id: Int = -1,
     var name: String = "",
     val outputFolder: String = "output",
+    val computeExportConfig: ComputeExportConfig,
     val topology: ScenarioTopologySpec,
     val workload: WorkloadSpec,
     val allocationPolicy: AllocationPolicySpec = AllocationPolicySpec(),
@@ -50,6 +54,8 @@
 * @property outputFolder
 * @property initialSeed
 * @property runs
+ * @property computeExportConfig configures which parquet columns are to
+ * be included in the output files.
 */
 @Serializable
 public data class ScenariosSpec(
@@ -65,6 +71,7 @@
     val failureModels: Set = setOf(null),
     val checkpointModels: Set = setOf(null),
     val carbonTracePaths: Set = setOf(null),
+    val computeExportConfig: ComputeExportConfig = ComputeExportConfig.ALL_COLUMNS,
 ) {
     init {
         require(runs > 0) { "The number of runs should always be positive" }
@@ -75,6 +82,8 @@ public data class ScenariosSpec(
             name = "unnamed-simulation-${UUID.randomUUID().toString().substring(0, 4)}"
             // "workload=${workloads[0].name}_topology=${topologies[0].name}_allocationPolicy=${allocationPolicies[0].name}"
         }
+
+        LOG.infoNewLine(computeExportConfig.fmt())
     }
 
     public fun getCartesian(): Sequence<ScenarioSpec> {
@@ -101,6 +110,7 @@
                         id,
                         name,
                         outputFolder,
+                        computeExportConfig = computeExportConfig,
                         topologyList[(i / topologyDiv) % topologyList.size],
                         workloadList[(i / workloadDiv) % workloadList.size],
                         allocationPolicyList[(i / allocationDiv) % allocationPolicyList.size],
@@ -113,4 +123,8 @@
             }
         }
     }
+
+    private companion object {
+        val LOG by logger()
+    }
 }
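
Taken together, the patch routes the export configuration end to end: ScenarioReader preloads the default columns before decoding, ScenariosSpec falls back to ComputeExportConfig.ALL_COLUMNS when the JSON omits the field, the cartesian expansion copies the config into every ScenarioSpec, and the runner hands it to ParquetComputeMonitor. A minimal sketch of that flow, using only calls visible in this patch; the scenario path, output directory, seed label, and the use of first() are illustrative assumptions, not part of the change:

import org.opendc.compute.telemetry.export.parquet.ParquetComputeMonitor
import org.opendc.experiments.base.scenario.ScenarioReader
import java.io.File

fun main() {
    // Reading the scenario file registers the default export columns before decoding,
    // so a computeExportConfig entry in the JSON (or its absence) deserializes cleanly.
    val scenariosSpec = ScenarioReader().read(File("scenarios/example.json")) // assumed path

    // Each concrete ScenarioSpec produced by the cartesian expansion carries the config;
    // it defaults to ComputeExportConfig.ALL_COLUMNS when the JSON omits it.
    val spec = scenariosSpec.getCartesian().first()
    println(spec.computeExportConfig.fmt())

    // The runner forwards the config to the parquet exporter, mirroring addExportModel above.
    val monitor =
        ParquetComputeMonitor(
            File("output/raw-output/0"), // assumed output directory
            "seed=0", // assumed partition name
            bufferSize = 4096,
            computeExportConfig = spec.computeExportConfig,
        )
}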
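
The `private val _ignore = DfltServerExportColumns` line and the `ParquetComputeMonitor.loadDfltFields()` call rely on the same idea: touching a Kotlin object forces its initializer to run, which is what registers the default columns before any JSON is decoded. A self-contained sketch of that pattern with made-up names (ColumnRegistry and DefaultColumns are illustrative stand-ins, not OpenDC API):

// Hypothetical registry standing in for the exporter's loaded-column bookkeeping.
object ColumnRegistry {
    val registered = mutableListOf<String>()
}

// Hypothetical stand-in for DfltServerExportColumns: its init block registers columns.
object DefaultColumns {
    init {
        ColumnRegistry.registered += listOf("timestamp", "cpu_usage")
    }
}

// Referencing the object at file scope triggers its init block as soon as the file loads.
@Suppress("unused")
private val ignore = DefaultColumns

fun main() {
    // By the time deserialization would run, the columns are already registered.
    println(ColumnRegistry.registered) // prints [timestamp, cpu_usage]
}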