diff --git a/src/main/java/com/teragrep/aer_01/EventContextConsumer.java b/src/main/java/com/teragrep/aer_01/EventContextConsumer.java index bbe40c1..e942231 100644 --- a/src/main/java/com/teragrep/aer_01/EventContextConsumer.java +++ b/src/main/java/com/teragrep/aer_01/EventContextConsumer.java @@ -59,6 +59,7 @@ import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.util.Map; import java.util.UUID; import java.util.function.Consumer; @@ -110,16 +111,28 @@ public void accept(EventContext eventContext) { .addSDParam("uuid", eventUuid) .addSDParam("unixtime", Instant.now().toString()) .addSDParam("id_source", "source"); + + SDElement sdPartition = new SDElement("aer_01_partition@48577") + .addSDParam("fully_qualified_namespace", eventContext.getPartitionContext().getFullyQualifiedNamespace()) + .addSDParam("eventhub_name", eventContext.getPartitionContext().getEventHubName()) + .addSDParam("partition_id", eventContext.getPartitionContext().getPartitionId()) + .addSDParam("consumer_group", eventContext.getPartitionContext().getConsumerGroup()); + + Long offset = eventContext.getEventData().getOffset(); + Instant enqueuedTime = eventContext.getEventData().getEnqueuedTime(); + String partitionKey = eventContext.getEventData().getPartitionKey(); + String correlationId = eventContext.getEventData().getCorrelationId(); + Map properties = eventContext.getEventData().getProperties(); + SDElement sdEvent = new SDElement("aer_01_event@48577") + .addSDParam("offset", offset == null ? "" : String.valueOf(offset)) + .addSDParam("enqueued_time", enqueuedTime == null ? "" : enqueuedTime.toString()) + .addSDParam("partition_key", partitionKey == null ? "" : partitionKey) + .addSDParam("correlation_id", correlationId == null ? "" : correlationId); + properties.forEach((key, value) -> sdEvent.addSDParam("property_" + key, value.toString())); /* // TODO add this too as SDElement SDElement sdCorId = new SDElement("id@123").addSDParam("corId", eventContext.getEventData().getCorrelationId()); - // TODO add azure stuff - eventContext.getPartitionContext().getFullyQualifiedNamespace(); - eventContext.getPartitionContext().getEventHubName(); - eventContext.getPartitionContext().getPartitionId(); - eventContext.getPartitionContext().getConsumerGroup(); - // TODO metrics about these vs last retrieved, these are tracked per partition!: eventContext.getLastEnqueuedEventProperties().getEnqueuedTime(); eventContext.getLastEnqueuedEventProperties().getSequenceNumber(); @@ -139,6 +152,8 @@ public void accept(EventContext eventContext) { .withHostname(syslogConfig.hostname) .withAppName(syslogConfig.appName) .withSDElement(sdId) + .withSDElement(sdPartition) + .withSDElement(sdEvent) //.withSDElement(sdCorId) .withMsgId(eventContext.getEventData().getSequenceNumber().toString()) .withMsg(eventContext.getEventData().getBodyAsString());