Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored parquet exporters, dynamic selection of output columns #241

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions opendc-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,9 @@ dependencies {
implementation(libs.kotlin.logging)
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:$serializationVersion")

api(libs.log4j.core)
api(libs.log4j.slf4j)
api(libs.kotlin.logging)

testImplementation(projects.opendcSimulator.opendcSimulatorCore)
}
131 changes: 131 additions & 0 deletions opendc-common/src/main/kotlin/org/opendc/common/logger/Logger.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.opendc.common.logger

import mu.KotlinLogging
import org.slf4j.Logger

/**
 * Creates a lazily-initialized slf4j [Logger] named after the class that declares it.
 *
 * When [name] is given it is used verbatim. Otherwise the name is derived from the receiver:
 * - if the receiver's class is nested inside another class (e.g. it is a companion object),
 *   the simple name of the enclosing class is used;
 * - otherwise (e.g. a top-level class) the simple name of the receiver's own class is used.
 *
 * If no name can be determined, `"unknown"` is used.
 *
 * Can also be used in the companion object to limit the instances of loggers.
 *
 * ```kotlin
 * class Foo {
 *     val LOG by logger() // Same as: KotlinLogging.logger(name = "Foo")
 *
 *     companion object {
 *         val LOG by logger() // Same as: KotlinLogging.logger(name = "Foo")
 *         val OTHER by logger("smth") // Same as: KotlinLogging.logger(name = "smth")
 *     }
 * }
 * ```
 *
 * @param name explicit logger name; when `null` the name is derived from the receiver's class.
 * @return a [Lazy] that creates the logger on first access.
 */
public fun <T : Any> T.logger(name: String? = null): Lazy<Logger> {
    return lazy {
        KotlinLogging.logger(
            name
                ?: runCatching {
                    val ownClass = this::class.java
                    // `enclosingClass` is null for top-level classes: fall back to the class itself
                    // rather than dereferencing null and silently ending up with "unknown".
                    (ownClass.enclosingClass ?: ownClass).simpleName
                }.getOrNull()
                ?: "unknown",
        )
    }
}

/**
 * Emits [msg] at WARN level, then evaluates to `null`.
 *
 * Collapses a "log and bail out" pair into a single expression:
 * ```kotlin
 * // Before
 * LOG.warn(<msg>)
 * return null
 * // After
 * return LOG.warnAndNull(<msg>)
 * ```
 *
 * @param msg the message to log.
 * @return always `null`.
 */
public fun Logger.warnAndNull(msg: String): Nothing? = null.also { warn(msg) }

/**
 * Emits [msg] at ERROR level, then evaluates to `null`.
 *
 * Collapses a "log and bail out" pair into a single expression:
 * ```kotlin
 * // Before
 * LOG.error(<msg>)
 * return null
 * // After
 * return LOG.errAndNull(<msg>)
 * ```
 *
 * @param msg the message to log.
 * @return always `null`.
 */
public fun Logger.errAndNull(msg: String): Nothing? = null.also { error(msg) }

/**
 * Emits [msg] at *WARN* level and hands [obj] back unchanged.
 *
 * Useful for logging at the exact point where a fallback value is chosen.
 * The message is logged as soon as this call is evaluated, so place the call
 * where it only runs when the fallback is actually taken:
 * ```kotlin
 * // Before
 * if (<key> !in map) {
 *     LOG.warn("<warn-message>")
 *     return <default-value>
 * } else map[<key>]
 * // After
 * map.getOrElse(<key>) { LOG.withWarn(<default-value>, "<warn-message>") }
 * ```
 *
 * @param obj the value to return.
 * @param msg the message to log.
 * @return [obj], untouched.
 */
public fun <T> Logger.withWarn(
    obj: T,
    msg: String,
): T = obj.also { warn(msg) }

/**
 * Emits [msg] at *ERROR* level and hands [obj] back unchanged.
 *
 * @param obj the value to return.
 * @param msg the message to log.
 * @return [obj], untouched.
 */
public fun <T> Logger.withErr(
    obj: T,
    msg: String,
): T = obj.also { error(msg) }

/**
 * Logs [msg] at *INFO* level, prefixed with a line break so that the message
 * body starts on its own line below the log header.
 * ```console
 *
 * 09:28:08.830 [INFO] ScenariosSpec -
 * | === Compute Export Config ===
 * | Host Fields (columns) : timestamp,
 * ...
 * // Instead of
 * 09:28:08.830 [INFO] ScenariosSpec - | === Compute Export Config ===
 * | Host Fields (columns) : timestamp,
 * ```
 *
 * @param msg the (typically multi-line) message to log.
 */
public fun Logger.infoNewLine(msg: String) {
    val prefixed = "\n$msg"
    info(prefixed)
}
43 changes: 43 additions & 0 deletions opendc-common/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ MIT License
~
~ Copyright (c) 2020 atlarge-research
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->

<Configuration status="WARN">
<!-- status="WARN": Log4j's own internal diagnostics are reported from WARN level upwards. -->
<Appenders>
<!-- Single console appender writing to standard output as "time [LEVEL] logger - message"; the level is highlighted and ANSI output is left enabled. -->
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
</Console>
</Appenders>
<Loggers>
<!-- additivity="false" stops events of these loggers from also being handed to the Root logger, which references the same Console appender and would print them a second time. -->
<Logger name="org.opendc" level="warn" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.apache.hadoop" level="warn" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<!-- Fallback for every logger not matched above: WARN and higher go to the console. -->
<Root level="warn">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
3 changes: 3 additions & 0 deletions opendc-compute/opendc-compute-telemetry/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ description = "OpenDC Compute Service implementation"
// Build configuration
plugins {
// Presumably the shared convention plugin for library modules of this build; TODO confirm where it is defined.
`kotlin-library-conventions`
// Kotlin serialization compiler plugin, pinned to an explicit version here.
// NOTE(review): presumably required by the @Serializable export config in this module; confirm.
kotlin("plugin.serialization") version "1.9.22"
}

dependencies {
api(projects.opendcCompute.opendcComputeApi)
api(projects.opendcTrace.opendcTraceParquet)
implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0")
implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-service")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon")))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.opendc.compute.telemetry.export.parquet

import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.builtins.ListSerializer
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.descriptors.buildClassSerialDescriptor
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.encoding.encodeStructure
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonDecoder
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.jsonObject
import org.opendc.common.logger.logger
import org.opendc.compute.telemetry.table.HostTableReader
import org.opendc.compute.telemetry.table.ServerTableReader
import org.opendc.compute.telemetry.table.ServiceTableReader
import org.opendc.trace.util.parquet.exporter.ColListSerializer
import org.opendc.trace.util.parquet.exporter.ExportColumn
import org.opendc.trace.util.parquet.exporter.Exportable
import org.opendc.trace.util.parquet.exporter.columnSerializer

/**
 * Aggregates the necessary settings to personalize the output
 * parquet files for compute workloads.
 *
 * The primary constructor stores the given sets as they are. The secondary,
 * [Collection]-based constructor additionally adds the `BASE_EXPORT_COLUMNS`
 * of the matching default column holder to each set.
 *
 * @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file.
 * @param[serverExportColumns] the columns that will be included in the `server.parquet` raw output file.
 * @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file.
 */
@Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class)
public data class ComputeExportConfig(
    public val hostExportColumns: Set<ExportColumn<HostTableReader>>,
    public val serverExportColumns: Set<ExportColumn<ServerTableReader>>,
    public val serviceExportColumns: Set<ExportColumn<ServiceTableReader>>,
) {
    /**
     * Builds a config from arbitrary collections of columns, always including
     * the base export columns of each table on top of the requested ones.
     */
    public constructor(
        hostExportColumns: Collection<ExportColumn<HostTableReader>>,
        serverExportColumns: Collection<ExportColumn<ServerTableReader>>,
        serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>,
    ) : this(
        hostExportColumns.toSet() + DfltHostExportColumns.BASE_EXPORT_COLUMNS,
        serverExportColumns.toSet() + DfltServerExportColumns.BASE_EXPORT_COLUMNS,
        serviceExportColumns.toSet() + DfltServiceExportColumns.BASE_EXPORT_COLUMNS,
    )

    /**
     * Column names are joined with `", "`. The string is only de-indented
     * (`trimIndent`), so every line keeps its leading `|`.
     *
     * @return formatted string representing the export config.
     */
    public fun fmt(): String =
        """
        | === Compute Export Config ===
        | Host columns : ${hostExportColumns.map { it.name }.joinToString()}
        | Server columns : ${serverExportColumns.map { it.name }.joinToString()}
        | Service columns : ${serviceExportColumns.map { it.name }.joinToString()}
        """.trimIndent()

    public companion object {
        internal val LOG by logger()

        /**
         * Force the jvm to load the default [ExportColumn]s relevant to compute export,
         * so that they are available for deserialization.
         */
        public fun loadDfltColumns() {
            // Merely referencing the objects triggers their initialization.
            DfltHostExportColumns
            DfltServerExportColumns
            DfltServiceExportColumns
        }

        /**
         * Config that includes all columns defined in [DfltHostExportColumns],
         * [DfltServerExportColumns], [DfltServiceExportColumns] among all other loaded
         * columns for [HostTableReader], [ServerTableReader] and [ServiceTableReader].
         */
        public val ALL_COLUMNS: ComputeExportConfig by lazy {
            loadDfltColumns()
            ComputeExportConfig(
                hostExportColumns = ExportColumn.getAllLoadedColumns(),
                serverExportColumns = ExportColumn.getAllLoadedColumns(),
                serviceExportColumns = ExportColumn.getAllLoadedColumns(),
            )
        }

        /**
         * A runtime [KSerializer] is needed for reasons explained in [columnSerializer] docs.
         *
         * This serializer makes use of reified column serializers for the 3 properties.
         */
        internal object ComputeExportConfigSerializer : KSerializer<ComputeExportConfig> {
            override val descriptor: SerialDescriptor =
                buildClassSerialDescriptor("org.opendc.compute.telemetry.export.parquet.ComputeExportConfig") {
                    element(
                        "hostExportColumns",
                        ListSerializer(columnSerializer<HostTableReader>()).descriptor,
                    )
                    element(
                        "serverExportColumns",
                        ListSerializer(columnSerializer<ServerTableReader>()).descriptor,
                    )
                    element(
                        "serviceExportColumns",
                        ListSerializer(columnSerializer<ServiceTableReader>()).descriptor,
                    )
                }

            /**
             * Decodes a config from JSON. A missing or empty column list falls back to
             * all loaded columns of that table (see the `toFieldList` helper in this file).
             *
             * When [decoder] is not a [JsonDecoder], the input is read as a string that
             * contains the JSON document and decoding is retried through the file-level
             * [Json] instance.
             */
            override fun deserialize(decoder: Decoder): ComputeExportConfig {
                val jsonDec =
                    (decoder as? JsonDecoder)
                        // Basically a recursive call with a JsonDecoder.
                        ?: return json.decodeFromString(decoder.decodeString().trim('"'))

                // Loads the default columns so that they are available for deserialization.
                loadDfltColumns()
                val elem = jsonDec.decodeJsonElement().jsonObject

                val hostFields: List<ExportColumn<HostTableReader>> = elem["hostExportColumns"].toFieldList()
                val serverFields: List<ExportColumn<ServerTableReader>> = elem["serverExportColumns"].toFieldList()
                val serviceFields: List<ExportColumn<ServiceTableReader>> = elem["serviceExportColumns"].toFieldList()

                // Lists resolve to the Collection-based constructor, which adds the base columns.
                return ComputeExportConfig(
                    hostExportColumns = hostFields,
                    serverExportColumns = serverFields,
                    serviceExportColumns = serviceFields,
                )
            }

            /**
             * Encodes the three column sets as the elements declared in [descriptor],
             * in declaration order (indices 0, 1, 2).
             */
            override fun serialize(
                encoder: Encoder,
                value: ComputeExportConfig,
            ) {
                encoder.encodeStructure(descriptor) {
                    encodeSerializableElement(
                        descriptor,
                        0,
                        ColListSerializer(columnSerializer<HostTableReader>()),
                        value.hostExportColumns.toList(),
                    )
                    encodeSerializableElement(
                        descriptor,
                        1,
                        ColListSerializer(columnSerializer<ServerTableReader>()),
                        value.serverExportColumns.toList(),
                    )
                    encodeSerializableElement(
                        descriptor,
                        2,
                        ColListSerializer(columnSerializer<ServiceTableReader>()),
                        value.serviceExportColumns.toList(),
                    )
                }
            }
        }
    }
}

/**
 * [Json] instance shared by the export-config deserialization code in this file.
 * Configured with `ignoreUnknownKeys = true`, so JSON keys without a matching
 * property are skipped instead of causing a failure.
 */
private val json = Json { ignoreUnknownKeys = true }

/**
 * Decodes this JSON element into the list of export columns for exportable [T].
 *
 * Falls back to every loaded column of [T] when the element is absent (`null`)
 * or when it decodes to an empty list; the latter case is reported with a warning.
 *
 * @return the decoded columns, or all loaded columns of [T] as a fallback.
 */
private inline fun <reified T : Exportable> JsonElement?.toFieldList(): List<ExportColumn<T>> {
    // Key not present in the config: nothing to decode.
    if (this == null) return ExportColumn.getAllLoadedColumns<T>()

    val decodedColumns = json.decodeFromJsonElement(ColListSerializer(columnSerializer<T>()), this)
    if (decodedColumns.isNotEmpty()) return decodedColumns

    ComputeExportConfig.LOG.warn(
        "deserialized list of export columns for exportable ${T::class.simpleName} " +
            "produced empty list, falling back to all loaded columns",
    )
    return ExportColumn.getAllLoadedColumns<T>()
}
Loading
Loading