Skip to content

Commit

Permalink
Fix timer metrics in DefaultOutput (#29)
Browse files Browse the repository at this point in the history
* Move fakes to their own folder, refactor EventContextConsumerTest

* Use SlidingWindowReservoir in timer metrics

* Rework connectLatency timer to only record connection attempts that did not throw an error in server

* Refactor RelpConnection fakes in unit tests

* Refactor unit tests

* Added a comment about how the Timer.Context is handled in DefaultOutput
  • Loading branch information
51-code authored Aug 30, 2024
1 parent de16d61 commit 670352b
Show file tree
Hide file tree
Showing 11 changed files with 489 additions and 81 deletions.
37 changes: 26 additions & 11 deletions src/main/java/com/teragrep/aer_01/DefaultOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@
*/
package com.teragrep.aer_01;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.*;
import com.teragrep.aer_01.config.RelpConfig;
import com.teragrep.rlp_01.RelpBatch;
import com.teragrep.rlp_01.RelpConnection;
Expand Down Expand Up @@ -79,15 +77,25 @@ final class DefaultOutput implements Output {
private final Timer connectLatency;


DefaultOutput(
String name,
RelpConfig relpConfig,
MetricRegistry metricRegistry) {
DefaultOutput(String name, RelpConfig relpConfig, MetricRegistry metricRegistry) {
this(name, relpConfig, metricRegistry, new RelpConnection());
}

DefaultOutput(String name, RelpConfig relpConfig, MetricRegistry metricRegistry, RelpConnection relpConnection) {
this(name, relpConfig, metricRegistry, relpConnection, new SlidingWindowReservoir(10000), new SlidingWindowReservoir(10000));
}

DefaultOutput(String name,
RelpConfig relpConfig,
MetricRegistry metricRegistry,
RelpConnection relpConnection,
Reservoir sendReservoir,
Reservoir connectReservoir) {
this.relpAddress = relpConfig.destinationAddress;
this.relpPort = relpConfig.destinationPort;
this.reconnectInterval = relpConfig.reconnectInterval;

this.relpConnection = new RelpConnection();
this.relpConnection = relpConnection;
this.relpConnection.setConnectionTimeout(relpConfig.connectionTimeout);
this.relpConnection.setReadTimeout(relpConfig.readTimeout);
this.relpConnection.setWriteTimeout(relpConfig.writeTimeout);
Expand All @@ -97,17 +105,24 @@ final class DefaultOutput implements Output {
this.resends = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "resends"));
this.connects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "connects"));
this.retriedConnects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "retriedConnects"));
this.sendLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "sendLatency"));
this.connectLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "connectLatency"));
this.sendLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "sendLatency"), () -> new Timer(sendReservoir));
this.connectLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "connectLatency"), () -> new Timer(connectReservoir));

connect();
}

private void connect() {
boolean connected = false;
while (!connected) {
try (final Timer.Context context = connectLatency.time()) {
final Timer.Context context = connectLatency.time(); // reset the time (new context)
try {
connected = this.relpConnection.connect(relpAddress, relpPort);
/*
Not closing the context in case of an exception thrown in .connect() will leave the timer.context
for garbage collector to remove. This will happen even if the context is closed because of how
the Timer is implemented.
*/
context.close(); // manually close here, so the timer is only updated if no exceptions were thrown
connects.inc();
} catch (IOException | TimeoutException e) {
LOGGER.error("Exception while connecting to <[{}]>:<[{}]>", relpAddress, relpPort, e);
Expand Down
153 changes: 153 additions & 0 deletions src/test/java/com/teragrep/aer_01/DefaultOutputTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Teragrep Azure Eventhub Reader
* Copyright (C) 2023 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://github.com/teragrep/teragrep/blob/main/LICENSE>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/

package com.teragrep.aer_01;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.Timer;
import com.teragrep.aer_01.config.RelpConfig;
import com.teragrep.aer_01.config.source.PropertySource;
import com.teragrep.aer_01.fakes.ConnectionlessRelpConnectionFake;
import com.teragrep.aer_01.fakes.RelpConnectionFake;
import com.teragrep.aer_01.fakes.ThrowingRelpConnectionFake;
import com.teragrep.rlo_14.Facility;
import com.teragrep.rlo_14.Severity;
import com.teragrep.rlo_14.SyslogMessage;
import com.teragrep.rlp_01.RelpConnection;
import org.junit.jupiter.api.*;

import java.nio.charset.StandardCharsets;

import static com.codahale.metrics.MetricRegistry.name;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class DefaultOutputTest {

@Test
public void testSendLatencyMetricIsCapped() { // Should only keep information on the last 10.000 messages
SyslogMessage syslogMessage = new SyslogMessage()
.withSeverity(Severity.INFORMATIONAL)
.withFacility(Facility.LOCAL0)
.withMsgId("123")
.withMsg("test");

final int measurementLimit = 10000;

// set up DefaultOutput
MetricRegistry metricRegistry = new MetricRegistry();
SlidingWindowReservoir sendReservoir = new SlidingWindowReservoir(measurementLimit);
SlidingWindowReservoir connectReservoir = new SlidingWindowReservoir(measurementLimit);
try (DefaultOutput output = new DefaultOutput("defaultOutput", new RelpConfig(new PropertySource()),
metricRegistry, new RelpConnectionFake(), sendReservoir, connectReservoir)) {

for (int i = 0; i < measurementLimit + 100; i++) { // send more messages than the limit is
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
}
}

Assertions.assertEquals(measurementLimit, sendReservoir.size()); // should have measurementLimit amount of records saved
Assertions.assertEquals(1, connectReservoir.size()); // only connected once
}

@Test
public void testConnectionLatencyMetricIsCapped() { // Should take information on how long it took to successfully connect
System.setProperty("relp.connection.retry.interval", "1");

SyslogMessage syslogMessage = new SyslogMessage()
.withSeverity(Severity.INFORMATIONAL)
.withFacility(Facility.LOCAL0)
.withMsgId("123")
.withMsg("test");

final int measurementLimit = 100;
final int reconnections = measurementLimit + 10;

// set up DefaultOutput
MetricRegistry metricRegistry = new MetricRegistry();
SlidingWindowReservoir sendReservoir = new SlidingWindowReservoir(measurementLimit);
SlidingWindowReservoir connectReservoir = new SlidingWindowReservoir(measurementLimit);
RelpConnection relpConnection = new ConnectionlessRelpConnectionFake(reconnections); // use a fake that forces reconnects
try (DefaultOutput output = new DefaultOutput("defaultOutput", new RelpConfig(new PropertySource()),
metricRegistry, relpConnection, sendReservoir, connectReservoir)) {
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
}

Assertions.assertEquals(1, sendReservoir.size()); // only sent 1 message
Assertions.assertEquals(measurementLimit, connectReservoir.size()); // should have measurementLimit amount of records saved

System.clearProperty("relp.connection.retry.interval");
}

@Test
public void testConnectionLatencyMetricWithException() { // should not update value if an exception was thrown from server
System.setProperty("relp.connection.retry.interval", "1");

SyslogMessage syslogMessage = new SyslogMessage()
.withSeverity(Severity.INFORMATIONAL)
.withFacility(Facility.LOCAL0)
.withMsgId("123")
.withMsg("test");

final int reconnections = 10;

// set up DefaultOutput
MetricRegistry metricRegistry = new MetricRegistry();
RelpConnection relpConnection = new ThrowingRelpConnectionFake(reconnections); // use a fake that throws exceptions when connecting
try (DefaultOutput output = new DefaultOutput("defaultOutput", new RelpConfig(new PropertySource()),
metricRegistry, relpConnection)) {
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
}

Timer sendTimer = metricRegistry.timer(name(DefaultOutput.class, "<[defaultOutput]>", "sendLatency"));
Timer connectionTimer = metricRegistry.timer(name(DefaultOutput.class, "<[defaultOutput]>", "connectLatency"));

Assertions.assertEquals(1, sendTimer.getCount()); // only sent 1 message
Assertions.assertEquals(1, connectionTimer.getCount()); // only 1 connection attempt without throwing recorded

System.clearProperty("relp.connection.retry.interval");
}
}
134 changes: 69 additions & 65 deletions src/test/java/com/teragrep/aer_01/EventContextConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,96 +52,100 @@
import com.teragrep.aer_01.config.MetricsConfig;
import com.teragrep.aer_01.config.source.PropertySource;
import com.teragrep.aer_01.config.source.Sourceable;
import com.teragrep.aer_01.fakes.CheckpointlessEventContextFactory;
import com.teragrep.aer_01.fakes.EventContextFactory;
import com.teragrep.aer_01.fakes.OutputFake;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.time.Instant;

import static com.codahale.metrics.MetricRegistry.name;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class EventContextConsumerTest {

private final Sourceable configSource = new PropertySource();
private final int prometheusPort = new MetricsConfig(configSource).prometheusPort;

@Test
public void testLatencyMetric() throws Exception {
final EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory();
final Sourceable configSource = new PropertySource();
final int prometheusPort = new MetricsConfig(configSource).prometheusPort;
final MetricRegistry metricRegistry = new MetricRegistry();

try (EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort)) {
EventContext eventContext;
final double records = 10;
for (int i = 0; i < records; i++) {
eventContext = eventContextFactory.create();
eventContextConsumer.accept(eventContext);
}
public void testLatencyMetric() {
EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory();
MetricRegistry metricRegistry = new MetricRegistry();
EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort);

long latency = Instant.now().getEpochSecond();
final double records = 10;
for (int i = 0; i < records; i++) {
EventContext eventContext = eventContextFactory.create();
eventContextConsumer.accept(eventContext);
}

// 5 records for each partition
Gauge<Long> gauge1 = metricRegistry.gauge(name(EventContextConsumer.class, "latency-seconds", "1"));
Gauge<Long> gauge2 = metricRegistry.gauge(name(EventContextConsumer.class, "latency-seconds", "2"));
Assertions.assertDoesNotThrow(eventContextConsumer::close);

// hard to test the exact correct latency
Assertions.assertTrue(gauge1.getValue() >= latency);
Assertions.assertTrue(gauge2.getValue() >= latency);
}
long latency = Instant.now().getEpochSecond();

// 5 records for each partition
Gauge<Long> gauge1 = metricRegistry.gauge(name(EventContextConsumer.class, "latency-seconds", "1"));
Gauge<Long> gauge2 = metricRegistry.gauge(name(EventContextConsumer.class, "latency-seconds", "2"));

// hard to test the exact correct latency
Assertions.assertTrue(gauge1.getValue() >= latency);
Assertions.assertTrue(gauge2.getValue() >= latency);
}

@Test
public void testDepthBytesMetric() throws Exception {
final EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory();
final Sourceable configSource = new PropertySource();
final int prometheusPort = new MetricsConfig(configSource).prometheusPort;
final MetricRegistry metricRegistry = new MetricRegistry();

try (EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort)) {
// FIXME: code duplication when initializing without null
EventContext eventContext = eventContextFactory.create();
eventContextConsumer.accept(eventContext);
public void testDepthBytesMetric() {
EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory();
MetricRegistry metricRegistry = new MetricRegistry();

long depth1 = 0L;
final double records = 10;
for (int i = 1; i < records; i++) { // records - 1 loops
eventContext = eventContextFactory.create();
eventContextConsumer.accept(eventContext);
long depth1 = 0L;
final double records = 10;
EventContext eventContext = eventContextFactory.create();

if (i == 4) {
depth1 = eventContext.getLastEnqueuedEventProperties().getOffset() - eventContext.getEventData().getOffset();
}
}
EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort);
eventContextConsumer.accept(eventContext);

long depth2 = eventContext.getLastEnqueuedEventProperties().getOffset() - eventContext.getEventData().getOffset();
Gauge<Long> gauge1 = metricRegistry.gauge(name(EventContextConsumer.class, "depth-bytes", "1"));
Gauge<Long> gauge2 = metricRegistry.gauge(name(EventContextConsumer.class, "depth-bytes", "2"));
for (int i = 1; i < records; i++) { // records - 1 loops
if (i == 5) { // 5 records per partition
depth1 = eventContext.getLastEnqueuedEventProperties().getOffset() - eventContext.getEventData().getOffset();
}

Assertions.assertEquals(depth1, 99L); // offsets are defined in the factory
Assertions.assertEquals(depth2, 99L);
Assertions.assertEquals(depth1, gauge1.getValue());
Assertions.assertEquals(depth2, gauge2.getValue());
eventContext = eventContextFactory.create();
eventContextConsumer.accept(eventContext);
}

Assertions.assertDoesNotThrow(eventContextConsumer::close);

long depth2 = eventContext.getLastEnqueuedEventProperties().getOffset() - eventContext.getEventData().getOffset();
Gauge<Long> gauge1 = metricRegistry.gauge(name(EventContextConsumer.class, "depth-bytes", "1"));
Gauge<Long> gauge2 = metricRegistry.gauge(name(EventContextConsumer.class, "depth-bytes", "2"));

Assertions.assertEquals(depth1, 99L); // offsets are defined in the factory
Assertions.assertEquals(depth2, 99L);
Assertions.assertEquals(depth1, gauge1.getValue());
Assertions.assertEquals(depth2, gauge2.getValue());
}

@Test
public void testEstimatedDataDepthMetric() throws Exception {
final EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory();
final Sourceable configSource = new PropertySource();
final int prometheusPort = new MetricsConfig(configSource).prometheusPort;
final MetricRegistry metricRegistry = new MetricRegistry();

try (EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort)) {
final double records = 10;
long length = 0L;
for (int i = 0; i < records; i++) {
EventContext eventContext = eventContextFactory.create();
length = length + eventContext.getEventData().getBody().length;
eventContextConsumer.accept(eventContext);
}
public void testEstimatedDataDepthMetric() {
EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory();
MetricRegistry metricRegistry = new MetricRegistry();
EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort);

final double records = 10;
long length = 0L;
for (int i = 0; i < records; i++) {
EventContext eventContext = eventContextFactory.create();
length = length + eventContext.getEventData().getBody().length;
eventContextConsumer.accept(eventContext);
}

Gauge<Long> gauge = metricRegistry.gauge(MetricRegistry.name(EventContextConsumer.class, "estimated-data-depth"));
Double estimatedDepth = (length / records) / records;
Assertions.assertDoesNotThrow(eventContextConsumer::close);

Assertions.assertEquals(estimatedDepth, gauge.getValue());
}
Gauge<Long> gauge = metricRegistry.gauge(MetricRegistry.name(EventContextConsumer.class, "estimated-data-depth"));
Double estimatedDepth = (length / records) / records;

Assertions.assertEquals(estimatedDepth, gauge.getValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* a licensee so wish it.
*/

package com.teragrep.aer_01;
package com.teragrep.aer_01.fakes;

import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.models.Checkpoint;
Expand Down
Loading

0 comments on commit 670352b

Please sign in to comment.