Refactored host, server and service exporters, allowing selection of which columns are included in their raw output by listing them in `scenario.json`.

The default columns are defined in `DfltHostExportColumns`, `DfltServerExportColumns` and `DfltServiceExportColumns`. Any number of additional columns can be defined anywhere (as `ExportColumn<Exportable>`), and they will be deserializable as long as they are loaded by the JVM.

Each `ExportColumn` has a `Regex` used for deserialization. If no custom regex is provided, a default one is used; it matches the column name case-insensitively, with either `_` (as in the name) or a blank space (` `) between words.
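For illustration, a user-defined column might look like the sketch below. It follows the constructor pattern used in `DfltHostExportColumns` further down; the `MyHostExportColumns` object, the column itself, and the `regex` parameter name are illustrative assumptions, not part of this commit:

```kotlin
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
import org.apache.parquet.schema.Types
import org.opendc.compute.telemetry.table.HostTableReader
import org.opendc.trace.util.parquet.exporter.ExportColumn

public object MyHostExportColumns {
    // Hypothetical column derived from two default host readings. Once this
    // object is loaded by the JVM, the column can be selected in
    // `scenario.json` by any name its regex matches.
    public val CPU_USAGE_RATIO: ExportColumn<HostTableReader> =
        ExportColumn(
            field = Types.required(DOUBLE).named("cpu_usage_ratio"),
            // Assumed parameter name; omit it to fall back to the default
            // case-insensitive name matcher described above.
            regex = Regex("cpu[_ ]usage[_ ]ratio", RegexOption.IGNORE_CASE),
        ) { it.cpuUsage / it.cpuLimit }
}
```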

Check `opendc.compute.telemetry.export.parquet.README.md` for more info.
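As an illustration, a `scenario.json` fragment selecting columns by name might look like the following. The three key names match the serializer in `ComputeExportConfig` below; the exact placement of this object inside the scenario file, and the server/service column names, are assumptions:

```json
{
    "hostExportColumns": ["timestamp", "cpu_usage", "power_draw"],
    "serverExportColumns": ["timestamp", "cpu_usage"],
    "serviceExportColumns": ["timestamp"]
}
```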
T0mexX committed Aug 12, 2024
1 parent 28cce05 commit 80497cd
Showing 19 changed files with 734 additions and 723 deletions.
3 changes: 3 additions & 0 deletions opendc-compute/opendc-compute-telemetry/build.gradle.kts
@@ -25,12 +25,15 @@ description = "OpenDC Compute Service implementation"
// Build configuration
plugins {
`kotlin-library-conventions`
kotlin("plugin.serialization") version "1.9.22"
}

dependencies {
api(projects.opendcCompute.opendcComputeApi)
api(projects.opendcTrace.opendcTraceParquet)
implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0")
implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-service")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon")))
@@ -0,0 +1,174 @@
/*
* Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.opendc.compute.telemetry.export.parquet

import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.builtins.ListSerializer
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.descriptors.buildClassSerialDescriptor
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.encoding.encodeStructure
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonDecoder
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.jsonObject
import org.opendc.common.logger.infoNewLine
import org.opendc.common.logger.logger
import org.opendc.compute.telemetry.table.HostTableReader
import org.opendc.compute.telemetry.table.ServerTableReader
import org.opendc.compute.telemetry.table.ServiceTableReader
import org.opendc.trace.util.parquet.exporter.ColListSerializer
import org.opendc.trace.util.parquet.exporter.ExportColumn
import org.opendc.trace.util.parquet.exporter.Exportable
import org.opendc.trace.util.parquet.exporter.columnSerializer
import org.slf4j.Logger

/**
 * Aggregates the settings needed to customize the output
 * parquet files for compute workloads.
*
* @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file.
* @param[serverExportColumns] the columns that will be included in the `server.parquet` raw output file.
* @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file.
*/
@Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class)
public data class ComputeExportConfig(
public val hostExportColumns: List<ExportColumn<HostTableReader>>,
public val serverExportColumns: List<ExportColumn<ServerTableReader>>,
public val serviceExportColumns: List<ExportColumn<ServiceTableReader>>,
) {
/**
* @return formatted string representing the export config, to be logged with [Logger.infoNewLine].
*/
public fun fmt(): String =
"""
| === Compute Export Config ===
| Host columns : ${hostExportColumns.map { it.name }.toString().trim('[', ']')}
| Server columns : ${serverExportColumns.map { it.name }.toString().trim('[', ']')}
| Service columns : ${serviceExportColumns.map { it.name }.toString().trim('[', ']')}
""".trimIndent()

public companion object {
internal val LOG by logger()

/**
* Config that includes all columns defined in [DfltHostExportColumns],
* [DfltServerExportColumns], [DfltServiceExportColumns] among all other loaded
* columns for [HostTableReader], [ServerTableReader] and [ServiceTableReader].
*/
public val ALL_COLUMNS: ComputeExportConfig by lazy {
ParquetComputeMonitor.loadDfltFields()
ComputeExportConfig(
hostExportColumns = ExportColumn.getAllLoadedColumns(),
serverExportColumns = ExportColumn.getAllLoadedColumns(),
serviceExportColumns = ExportColumn.getAllLoadedColumns(),
)
}

/**
* A runtime [KSerializer] is needed for reasons explained in [columnSerializer] docs.
*
 * This serializer makes use of reified column serializers for the 3 properties.
*/
internal object ComputeExportConfigSerializer : KSerializer<ComputeExportConfig> {
override val descriptor: SerialDescriptor =
buildClassSerialDescriptor("org.opendc.compute.telemetry.export.parquet.ComputeExportConfig") {
element(
"hostExportColumns",
ListSerializer(columnSerializer<HostTableReader>()).descriptor,
)
element(
"serverExportColumns",
ListSerializer(columnSerializer<ServerTableReader>()).descriptor,
)
element(
"serviceExportColumns",
ListSerializer(columnSerializer<ServiceTableReader>()).descriptor,
)
}

override fun deserialize(decoder: Decoder): ComputeExportConfig {
            // If the decoder is not a JsonDecoder, re-enter deserialization
            // through one (basically a recursive call with a JsonDecoder).
            val jsonDec =
                (decoder as? JsonDecoder) ?: let {
                    return Json.decodeFromString(decoder.decodeString().trim('"'))
                }

// Loads the default columns so that they are available for deserialization.
ParquetComputeMonitor.loadDfltFields()
val elem = jsonDec.decodeJsonElement().jsonObject

val hostFields: List<ExportColumn<HostTableReader>> = elem["hostExportColumns"].toFieldList()
val serverFields: List<ExportColumn<ServerTableReader>> = elem["serverExportColumns"].toFieldList()
val serviceFields: List<ExportColumn<ServiceTableReader>> = elem["serviceExportColumns"].toFieldList()

return ComputeExportConfig(
hostExportColumns = hostFields,
serverExportColumns = serverFields,
serviceExportColumns = serviceFields,
)
}

override fun serialize(
encoder: Encoder,
value: ComputeExportConfig,
) {
encoder.encodeStructure(descriptor) {
encodeSerializableElement(
descriptor,
0,
ColListSerializer(columnSerializer<HostTableReader>()),
value.hostExportColumns,
)
encodeSerializableElement(
descriptor,
1,
ColListSerializer(columnSerializer<ServerTableReader>()),
value.serverExportColumns,
)
encodeSerializableElement(
descriptor,
2,
ColListSerializer(columnSerializer<ServiceTableReader>()),
value.serviceExportColumns,
)
}
}
}
}
}

private val json = Json { ignoreUnknownKeys = true }

private inline fun <reified T : Exportable> JsonElement?.toFieldList(): List<ExportColumn<T>> =
this?.let {
json.decodeFromJsonElement(ColListSerializer(columnSerializer<T>()), it)
}?.ifEmpty {
ComputeExportConfig.LOG.warn(
"deserialized list of export columns for exportable ${T::class.simpleName} " +
"produced empty list, falling back to all loaded columns",
)
ExportColumn.getAllLoadedColumns()
} ?: ExportColumn.getAllLoadedColumns()
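A hedged usage sketch of the serializer above: `deserialize` loads the default columns itself, and keys missing from the JSON (here the server and service columns) fall back to all loaded columns via `toFieldList`, so a partial config deserializes without error:

```kotlin
import kotlinx.serialization.json.Json

// Illustrative only: decode a partial config. The absent server and service
// keys fall back to ExportColumn.getAllLoadedColumns() via toFieldList().
val config: ComputeExportConfig =
    Json.decodeFromString(
        """{ "hostExportColumns": ["timestamp", "cpu_usage"] }""",
    )
```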
@@ -0,0 +1,173 @@
/*
* Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.opendc.compute.telemetry.export.parquet

import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
import org.apache.parquet.schema.Types
import org.opendc.compute.telemetry.table.HostTableReader
import org.opendc.trace.util.parquet.exporter.ExportColumn

// This object is needed to disambiguate field names that appear in more than one Exportable.
// Additional fields can be added at runtime from anywhere.
public object DfltHostExportColumns {
public val TIMESTAMP: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT64).named("timestamp"),
) { it.timestamp.toEpochMilli() }

public val TIMESTAMP_ABS: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT64).named("timestamp_absolute"),
) { it.timestampAbsolute.toEpochMilli() }

public val HOST_ID: ExportColumn<HostTableReader> =
ExportColumn(
field =
Types.required(BINARY)
.`as`(LogicalTypeAnnotation.stringType())
.named("host_id"),
) { Binary.fromString(it.host.id) }

public val HOST_NAME: ExportColumn<HostTableReader> =
ExportColumn(
field =
Types.required(BINARY)
.`as`(LogicalTypeAnnotation.stringType())
.named("host_name"),
) { Binary.fromString(it.host.name) }

public val CPU_COUNT: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT32).named("cpu_count"),
) { it.host.cpuCount }

public val MEM_CAPACITY: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT64).named("mem_capacity"),
) { it.host.memCapacity }

public val GUESTS_TERMINATED: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT32).named("guests_terminated"),
) { it.guestsTerminated }

public val GUESTS_RUNNING: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT32).named("guests_running"),
) { it.guestsRunning }

public val GUESTS_ERROR: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT32).named("guests_error"),
) { it.guestsError }

public val GUESTS_INVALID: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT32).named("guests_invalid"),
) { it.guestsInvalid }

public val CPU_LIMIT: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(DOUBLE).named("cpu_limit"),
) { it.cpuLimit }

public val CPU_USAGE: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(DOUBLE).named("cpu_usage"),
) { it.cpuUsage }

public val CPU_DEMAND: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(DOUBLE).named("cpu_demand"),
) { it.cpuDemand }

public val CPU_UTILIZATION: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(DOUBLE).named("cpu_utilization"),
) { it.cpuUtilization }

public val CPU_TIME_ACTIVE: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT64).named("cpu_time_active"),
) { it.cpuActiveTime }

public val CPU_TIME_IDLE: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT64).named("cpu_time_idle"),
) { it.cpuIdleTime }

public val CPU_TIME_STEAL: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT64).named("cpu_time_steal"),
) { it.cpuStealTime }

public val CPU_TIME_LOST: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT64).named("cpu_time_lost"),
) { it.cpuLostTime }

public val POWER_DRAW: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(DOUBLE).named("power_draw"),
) { it.powerDraw }

public val ENERGY_USAGE: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(DOUBLE).named("energy_usage"),
) { it.energyUsage }

public val CARBON_INTENSITY: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(DOUBLE).named("carbon_intensity"),
) { it.carbonIntensity }

public val CARBON_EMISSION: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(DOUBLE).named("carbon_emission"),
) { it.carbonEmission }

public val UP_TIME: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT64).named("uptime"),
) { it.uptime }

public val DOWN_TIME: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.required(INT64).named("downtime"),
) { it.downtime }

public val BOOT_TIME: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.optional(INT64).named("boot_time"),
) { it.bootTime?.toEpochMilli() }

public val BOOT_TIME_ABS: ExportColumn<HostTableReader> =
ExportColumn(
field = Types.optional(INT64).named("boot_time_absolute"),
) { it.bootTimeAbsolute?.toEpochMilli() }
}
