From ed06c38ecb4d126ae9916223311aab26d5cf4b81 Mon Sep 17 00:00:00 2001
From: Dante Niewenhuis
Date: Tue, 21 Nov 2023 12:52:05 +0100
Subject: [PATCH] updated output format of parquet exporter

---
 .../export/parquet/ParquetHostDataWriter.kt   |  9 ++-
 .../export/parquet/ParquetServerDataWriter.kt | 72 ++++++++++---------
 2 files changed, 43 insertions(+), 38 deletions(-)

diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt
index 75e792d0a..735101df0 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt
@@ -25,6 +25,7 @@ package org.opendc.experiments.compute.export.parquet
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.LogicalTypeAnnotation
 import org.apache.parquet.schema.MessageType
@@ -33,7 +34,6 @@ import org.apache.parquet.schema.Types
 import org.opendc.experiments.compute.telemetry.table.HostTableReader
 import org.opendc.trace.util.parquet.LocalParquetWriter
 import java.io.File
-import java.util.UUID
 
 /**
  * A Parquet event writer for [HostTableReader]s.
@@ -75,7 +75,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
         consumer.endField("timestamp", 0)
 
         consumer.startField("host_id", 1)
-        consumer.addBinary(UUID.fromString(data.host.id).toBinary())
+        consumer.addBinary(Binary.fromString(data.host.id))
         consumer.endField("host_id", 1)
 
         consumer.startField("cpu_count", 2)
@@ -169,9 +169,8 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
                 .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                 .named("timestamp"),
             Types
-                .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
-                .length(16)
-                .`as`(LogicalTypeAnnotation.uuidType())
+                .required(PrimitiveType.PrimitiveTypeName.BINARY)
+                .`as`(LogicalTypeAnnotation.stringType())
                 .named("host_id"),
             Types
                 .required(PrimitiveType.PrimitiveTypeName.INT32)
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt
index ed5bb64f3..ddb1d3c6f 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt
@@ -25,6 +25,7 @@ package org.opendc.experiments.compute.export.parquet
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.LogicalTypeAnnotation
 import org.apache.parquet.schema.MessageType
@@ -33,7 +34,6 @@ import org.apache.parquet.schema.Types
 import org.opendc.experiments.compute.telemetry.table.ServerTableReader
 import org.opendc.trace.util.parquet.LocalParquetWriter
 import java.io.File
-import java.util.UUID
 
 /**
  * A Parquet event writer for [ServerTableReader]s.
@@ -76,64 +76,68 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
         consumer.endField("timestamp", 0)
 
         consumer.startField("server_id", 1)
-        consumer.addBinary(UUID.fromString(data.server.id).toBinary())
+        consumer.addBinary(Binary.fromString(data.server.id))
         consumer.endField("server_id", 1)
 
+        consumer.startField("server_name", 2)
+        consumer.addBinary(Binary.fromString(data.server.name))
+        consumer.endField("server_name", 2)
+
         val hostId = data.host?.id
         if (hostId != null) {
-            consumer.startField("host_id", 2)
-            consumer.addBinary(UUID.fromString(hostId).toBinary())
-            consumer.endField("host_id", 2)
+            consumer.startField("host_id", 3)
+            consumer.addBinary(Binary.fromString(hostId))
+            consumer.endField("host_id", 3)
         }
 
-        consumer.startField("mem_capacity", 3)
+        consumer.startField("mem_capacity", 4)
         consumer.addLong(data.server.memCapacity)
-        consumer.endField("mem_capacity", 3)
+        consumer.endField("mem_capacity", 4)
 
-        consumer.startField("cpu_count", 4)
+        consumer.startField("cpu_count", 5)
         consumer.addInteger(data.server.cpuCount)
-        consumer.endField("cpu_count", 4)
+        consumer.endField("cpu_count", 5)
 
-        consumer.startField("cpu_limit", 5)
+        consumer.startField("cpu_limit", 6)
         consumer.addDouble(data.cpuLimit)
-        consumer.endField("cpu_limit", 5)
+        consumer.endField("cpu_limit", 6)
 
-        consumer.startField("cpu_time_active", 6)
+        consumer.startField("cpu_time_active", 7)
         consumer.addLong(data.cpuActiveTime)
-        consumer.endField("cpu_time_active", 6)
+        consumer.endField("cpu_time_active", 7)
 
-        consumer.startField("cpu_time_idle", 7)
+        consumer.startField("cpu_time_idle", 8)
         consumer.addLong(data.cpuIdleTime)
-        consumer.endField("cpu_time_idle", 7)
+        consumer.endField("cpu_time_idle", 8)
 
-        consumer.startField("cpu_time_steal", 8)
+        consumer.startField("cpu_time_steal", 9)
         consumer.addLong(data.cpuStealTime)
-        consumer.endField("cpu_time_steal", 8)
+        consumer.endField("cpu_time_steal", 9)
 
-        consumer.startField("cpu_time_lost", 9)
+        consumer.startField("cpu_time_lost", 10)
         consumer.addLong(data.cpuLostTime)
-        consumer.endField("cpu_time_lost", 9)
+        consumer.endField("cpu_time_lost", 10)
 
-        consumer.startField("uptime", 10)
+        consumer.startField("uptime", 11)
         consumer.addLong(data.uptime)
-        consumer.endField("uptime", 10)
+        consumer.endField("uptime", 11)
 
-        consumer.startField("downtime", 11)
+        consumer.startField("downtime", 12)
         consumer.addLong(data.downtime)
-        consumer.endField("downtime", 11)
+        consumer.endField("downtime", 12)
 
         val provisionTime = data.provisionTime
         if (provisionTime != null) {
-            consumer.startField("provision_time", 12)
+            consumer.startField("provision_time", 13)
             consumer.addLong(provisionTime.toEpochMilli())
-            consumer.endField("provision_time", 12)
+            consumer.endField("provision_time", 13)
         }
 
         val bootTime = data.bootTime
         if (bootTime != null) {
-            consumer.startField("boot_time", 13)
+            consumer.startField("boot_time", 14)
             consumer.addLong(bootTime.toEpochMilli())
-            consumer.endField("boot_time", 13)
+            consumer.endField("boot_time", 14)
         }
 
         consumer.endMessage()
@@ -151,14 +155,16 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
                 .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                 .named("timestamp"),
             Types
-                .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
-                .length(16)
-                .`as`(LogicalTypeAnnotation.uuidType())
+                .required(PrimitiveType.PrimitiveTypeName.BINARY)
+                .`as`(LogicalTypeAnnotation.stringType())
                 .named("server_id"),
             Types
-                .optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
-                .length(16)
-                .`as`(LogicalTypeAnnotation.uuidType())
+                .required(PrimitiveType.PrimitiveTypeName.BINARY)
+                .`as`(LogicalTypeAnnotation.stringType())
+                .named("server_name"),
+            Types
+                .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+                .`as`(LogicalTypeAnnotation.stringType())
                 .named("host_id"),
             Types
                 .required(PrimitiveType.PrimitiveTypeName.INT64)
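
Note on the change: before this patch, every identifier had to parse as a `java.util.UUID` and was stored as a 16-byte `FIXED_LEN_BYTE_ARRAY` annotated with `uuidType()`; afterwards the raw identifier string is stored as `BINARY` annotated with `stringType()`. A minimal sketch of the difference in encoding, where `toUuidBinary()` is a hypothetical stand-in for the removed `UUID.toBinary()` extension (not the project's actual helper):

```kotlin
import org.apache.parquet.io.api.Binary
import java.nio.ByteBuffer
import java.util.UUID

// Hypothetical stand-in for the removed `UUID.toBinary()` extension:
// the UUID's 128 bits packed into 16 bytes, matching the old
// FIXED_LEN_BYTE_ARRAY(16) / uuidType() column.
fun UUID.toUuidBinary(): Binary =
    Binary.fromConstantByteArray(
        ByteBuffer.allocate(16)
            .putLong(mostSignificantBits)
            .putLong(leastSignificantBits)
            .array()
    )

fun main() {
    // Old path: the id had to be a valid UUID, otherwise
    // UUID.fromString threw IllegalArgumentException.
    val uuid = UUID.fromString("123e4567-e89b-12d3-a456-426614174000")
    println(uuid.toUuidBinary().length()) // 16 bytes

    // New path: any identifier string round-trips unchanged,
    // including ids that are not valid UUIDs.
    val binary = Binary.fromString("host-42")
    println(binary.toStringUsingUTF8()) // host-42
}
```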
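Files written with the new schema can be sanity-checked with parquet-hadoop's example `Group` reader. This is a sketch under assumptions: the path `output/server.parquet` is hypothetical, and only required columns are read, since the optional `host_id` column may be absent on rows for servers that were never scheduled:

```kotlin
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.example.GroupReadSupport

fun main() {
    // Hypothetical location; point this at a file produced by the updated writer.
    val path = Path("output/server.parquet")

    ParquetReader.builder(GroupReadSupport(), path).build().use { reader ->
        generateSequence { reader.read() }.forEach { record ->
            // server_id and server_name now decode as plain UTF-8 strings
            // rather than 16-byte UUID values.
            val id = record.getString("server_id", 0)
            val name = record.getString("server_name", 0)
            println("$id ($name)")
        }
    }
}
```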