diff --git a/src/main/java/com/teragrep/aer_01/DefaultOutput.java b/src/main/java/com/teragrep/aer_01/DefaultOutput.java index 056102e..1a82414 100644 --- a/src/main/java/com/teragrep/aer_01/DefaultOutput.java +++ b/src/main/java/com/teragrep/aer_01/DefaultOutput.java @@ -45,9 +45,7 @@ */ package com.teragrep.aer_01; -import com.codahale.metrics.Counter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; +import com.codahale.metrics.*; import com.teragrep.aer_01.config.RelpConfig; import com.teragrep.rlp_01.RelpBatch; import com.teragrep.rlp_01.RelpConnection; @@ -79,15 +77,25 @@ final class DefaultOutput implements Output { private final Timer connectLatency; - DefaultOutput( - String name, - RelpConfig relpConfig, - MetricRegistry metricRegistry) { + DefaultOutput(String name, RelpConfig relpConfig, MetricRegistry metricRegistry) { + this(name, relpConfig, metricRegistry, new RelpConnection()); + } + + DefaultOutput(String name, RelpConfig relpConfig, MetricRegistry metricRegistry, RelpConnection relpConnection) { + this(name, relpConfig, metricRegistry, relpConnection, new SlidingWindowReservoir(10000), new SlidingWindowReservoir(10000)); + } + + DefaultOutput(String name, + RelpConfig relpConfig, + MetricRegistry metricRegistry, + RelpConnection relpConnection, + Reservoir sendReservoir, + Reservoir connectReservoir) { this.relpAddress = relpConfig.destinationAddress; this.relpPort = relpConfig.destinationPort; this.reconnectInterval = relpConfig.reconnectInterval; - this.relpConnection = new RelpConnection(); + this.relpConnection = relpConnection; this.relpConnection.setConnectionTimeout(relpConfig.connectionTimeout); this.relpConnection.setReadTimeout(relpConfig.readTimeout); this.relpConnection.setWriteTimeout(relpConfig.writeTimeout); @@ -97,8 +105,8 @@ final class DefaultOutput implements Output { this.resends = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "resends")); this.connects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "connects")); this.retriedConnects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "retriedConnects")); - this.sendLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "sendLatency")); - this.connectLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "connectLatency")); + this.sendLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "sendLatency"), () -> new Timer(sendReservoir)); + this.connectLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "connectLatency"), () -> new Timer(connectReservoir)); connect(); } @@ -106,8 +114,15 @@ final class DefaultOutput implements Output { private void connect() { boolean connected = false; while (!connected) { - try (final Timer.Context context = connectLatency.time()) { + final Timer.Context context = connectLatency.time(); // reset the time (new context) + try { connected = this.relpConnection.connect(relpAddress, relpPort); + /* + Not closing the context in case of an exception thrown in .connect() will leave the timer.context + for garbage collector to remove. This will happen even if the context is closed because of how + the Timer is implemented. + */ + context.close(); // manually close here, so the timer is only updated if no exceptions were thrown connects.inc(); } catch (IOException | TimeoutException e) { LOGGER.error("Exception while connecting to <[{}]>:<[{}]>", relpAddress, relpPort, e); diff --git a/src/test/java/com/teragrep/aer_01/DefaultOutputTest.java b/src/test/java/com/teragrep/aer_01/DefaultOutputTest.java new file mode 100644 index 0000000..6cf7639 --- /dev/null +++ b/src/test/java/com/teragrep/aer_01/DefaultOutputTest.java @@ -0,0 +1,153 @@ +/* + * Teragrep Azure Eventhub Reader + * Copyright (C) 2023 Suomen Kanuuna Oy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + * Additional permission under GNU Affero General Public License version 3 + * section 7 + * + * If you modify this Program, or any covered work, by linking or combining it + * with other code, such other code is not for that reason alone subject to any + * of the requirements of the GNU Affero GPL version 3 as long as this Program + * is the same Program as licensed from Suomen Kanuuna Oy without any additional + * modifications. + * + * Supplemented terms under GNU Affero General Public License version 3 + * section 7 + * + * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified + * versions must be marked as "Modified version of" The Program. + * + * Names of the licensors and authors may not be used for publicity purposes. + * + * No rights are granted for use of trade names, trademarks, or service marks + * which are in The Program if any. + * + * Licensee must indemnify licensors and authors for any liability that these + * contractual assumptions impose on licensors and authors. + * + * To the extent this program is licensed as part of the Commercial versions of + * Teragrep, the applicable Commercial License may apply to this file if you as + * a licensee so wish it. + */ + +package com.teragrep.aer_01; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SlidingWindowReservoir; +import com.codahale.metrics.Timer; +import com.teragrep.aer_01.config.RelpConfig; +import com.teragrep.aer_01.config.source.PropertySource; +import com.teragrep.aer_01.fakes.ConnectionlessRelpConnectionFake; +import com.teragrep.aer_01.fakes.RelpConnectionFake; +import com.teragrep.aer_01.fakes.ThrowingRelpConnectionFake; +import com.teragrep.rlo_14.Facility; +import com.teragrep.rlo_14.Severity; +import com.teragrep.rlo_14.SyslogMessage; +import com.teragrep.rlp_01.RelpConnection; +import org.junit.jupiter.api.*; + +import java.nio.charset.StandardCharsets; + +import static com.codahale.metrics.MetricRegistry.name; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class DefaultOutputTest { + + @Test + public void testSendLatencyMetricIsCapped() { // Should only keep information on the last 10.000 messages + SyslogMessage syslogMessage = new SyslogMessage() + .withSeverity(Severity.INFORMATIONAL) + .withFacility(Facility.LOCAL0) + .withMsgId("123") + .withMsg("test"); + + final int measurementLimit = 10000; + + // set up DefaultOutput + MetricRegistry metricRegistry = new MetricRegistry(); + SlidingWindowReservoir sendReservoir = new SlidingWindowReservoir(measurementLimit); + SlidingWindowReservoir connectReservoir = new SlidingWindowReservoir(measurementLimit); + try (DefaultOutput output = new DefaultOutput("defaultOutput", new RelpConfig(new PropertySource()), + metricRegistry, new RelpConnectionFake(), sendReservoir, connectReservoir)) { + + for (int i = 0; i < measurementLimit + 100; i++) { // send more messages than the limit is + output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); + } + } + + Assertions.assertEquals(measurementLimit, sendReservoir.size()); // should have measurementLimit amount of records saved + Assertions.assertEquals(1, connectReservoir.size()); // only connected once + } + + @Test + public void testConnectionLatencyMetricIsCapped() { // Should take information on how long it took to successfully connect + System.setProperty("relp.connection.retry.interval", "1"); + + SyslogMessage syslogMessage = new SyslogMessage() + .withSeverity(Severity.INFORMATIONAL) + .withFacility(Facility.LOCAL0) + .withMsgId("123") + .withMsg("test"); + + final int measurementLimit = 100; + final int reconnections = measurementLimit + 10; + + // set up DefaultOutput + MetricRegistry metricRegistry = new MetricRegistry(); + SlidingWindowReservoir sendReservoir = new SlidingWindowReservoir(measurementLimit); + SlidingWindowReservoir connectReservoir = new SlidingWindowReservoir(measurementLimit); + RelpConnection relpConnection = new ConnectionlessRelpConnectionFake(reconnections); // use a fake that forces reconnects + try (DefaultOutput output = new DefaultOutput("defaultOutput", new RelpConfig(new PropertySource()), + metricRegistry, relpConnection, sendReservoir, connectReservoir)) { + output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); + } + + Assertions.assertEquals(1, sendReservoir.size()); // only sent 1 message + Assertions.assertEquals(measurementLimit, connectReservoir.size()); // should have measurementLimit amount of records saved + + System.clearProperty("relp.connection.retry.interval"); + } + + @Test + public void testConnectionLatencyMetricWithException() { // should not update value if an exception was thrown from server + System.setProperty("relp.connection.retry.interval", "1"); + + SyslogMessage syslogMessage = new SyslogMessage() + .withSeverity(Severity.INFORMATIONAL) + .withFacility(Facility.LOCAL0) + .withMsgId("123") + .withMsg("test"); + + final int reconnections = 10; + + // set up DefaultOutput + MetricRegistry metricRegistry = new MetricRegistry(); + RelpConnection relpConnection = new ThrowingRelpConnectionFake(reconnections); // use a fake that throws exceptions when connecting + try (DefaultOutput output = new DefaultOutput("defaultOutput", new RelpConfig(new PropertySource()), + metricRegistry, relpConnection)) { + output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); + } + + Timer sendTimer = metricRegistry.timer(name(DefaultOutput.class, "<[defaultOutput]>", "sendLatency")); + Timer connectionTimer = metricRegistry.timer(name(DefaultOutput.class, "<[defaultOutput]>", "connectLatency")); + + Assertions.assertEquals(1, sendTimer.getCount()); // only sent 1 message + Assertions.assertEquals(1, connectionTimer.getCount()); // only 1 connection attempt without throwing recorded + + System.clearProperty("relp.connection.retry.interval"); + } +} diff --git a/src/test/java/com/teragrep/aer_01/EventContextConsumerTest.java b/src/test/java/com/teragrep/aer_01/EventContextConsumerTest.java index 7687578..04fe238 100644 --- a/src/test/java/com/teragrep/aer_01/EventContextConsumerTest.java +++ b/src/test/java/com/teragrep/aer_01/EventContextConsumerTest.java @@ -52,96 +52,100 @@ import com.teragrep.aer_01.config.MetricsConfig; import com.teragrep.aer_01.config.source.PropertySource; import com.teragrep.aer_01.config.source.Sourceable; +import com.teragrep.aer_01.fakes.CheckpointlessEventContextFactory; +import com.teragrep.aer_01.fakes.EventContextFactory; +import com.teragrep.aer_01.fakes.OutputFake; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import java.time.Instant; import static com.codahale.metrics.MetricRegistry.name; +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public class EventContextConsumerTest { + private final Sourceable configSource = new PropertySource(); + private final int prometheusPort = new MetricsConfig(configSource).prometheusPort; + @Test - public void testLatencyMetric() throws Exception { - final EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory(); - final Sourceable configSource = new PropertySource(); - final int prometheusPort = new MetricsConfig(configSource).prometheusPort; - final MetricRegistry metricRegistry = new MetricRegistry(); - - try (EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort)) { - EventContext eventContext; - final double records = 10; - for (int i = 0; i < records; i++) { - eventContext = eventContextFactory.create(); - eventContextConsumer.accept(eventContext); - } + public void testLatencyMetric() { + EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory(); + MetricRegistry metricRegistry = new MetricRegistry(); + EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort); - long latency = Instant.now().getEpochSecond(); + final double records = 10; + for (int i = 0; i < records; i++) { + EventContext eventContext = eventContextFactory.create(); + eventContextConsumer.accept(eventContext); + } - // 5 records for each partition - Gauge gauge1 = metricRegistry.gauge(name(EventContextConsumer.class, "latency-seconds", "1")); - Gauge gauge2 = metricRegistry.gauge(name(EventContextConsumer.class, "latency-seconds", "2")); + Assertions.assertDoesNotThrow(eventContextConsumer::close); - // hard to test the exact correct latency - Assertions.assertTrue(gauge1.getValue() >= latency); - Assertions.assertTrue(gauge2.getValue() >= latency); - } + long latency = Instant.now().getEpochSecond(); + + // 5 records for each partition + Gauge gauge1 = metricRegistry.gauge(name(EventContextConsumer.class, "latency-seconds", "1")); + Gauge gauge2 = metricRegistry.gauge(name(EventContextConsumer.class, "latency-seconds", "2")); + + // hard to test the exact correct latency + Assertions.assertTrue(gauge1.getValue() >= latency); + Assertions.assertTrue(gauge2.getValue() >= latency); } @Test - public void testDepthBytesMetric() throws Exception { - final EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory(); - final Sourceable configSource = new PropertySource(); - final int prometheusPort = new MetricsConfig(configSource).prometheusPort; - final MetricRegistry metricRegistry = new MetricRegistry(); - - try (EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort)) { - // FIXME: code duplication when initializing without null - EventContext eventContext = eventContextFactory.create(); - eventContextConsumer.accept(eventContext); + public void testDepthBytesMetric() { + EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory(); + MetricRegistry metricRegistry = new MetricRegistry(); - long depth1 = 0L; - final double records = 10; - for (int i = 1; i < records; i++) { // records - 1 loops - eventContext = eventContextFactory.create(); - eventContextConsumer.accept(eventContext); + long depth1 = 0L; + final double records = 10; + EventContext eventContext = eventContextFactory.create(); - if (i == 4) { - depth1 = eventContext.getLastEnqueuedEventProperties().getOffset() - eventContext.getEventData().getOffset(); - } - } + EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort); + eventContextConsumer.accept(eventContext); - long depth2 = eventContext.getLastEnqueuedEventProperties().getOffset() - eventContext.getEventData().getOffset(); - Gauge gauge1 = metricRegistry.gauge(name(EventContextConsumer.class, "depth-bytes", "1")); - Gauge gauge2 = metricRegistry.gauge(name(EventContextConsumer.class, "depth-bytes", "2")); + for (int i = 1; i < records; i++) { // records - 1 loops + if (i == 5) { // 5 records per partition + depth1 = eventContext.getLastEnqueuedEventProperties().getOffset() - eventContext.getEventData().getOffset(); + } - Assertions.assertEquals(depth1, 99L); // offsets are defined in the factory - Assertions.assertEquals(depth2, 99L); - Assertions.assertEquals(depth1, gauge1.getValue()); - Assertions.assertEquals(depth2, gauge2.getValue()); + eventContext = eventContextFactory.create(); + eventContextConsumer.accept(eventContext); } + + Assertions.assertDoesNotThrow(eventContextConsumer::close); + + long depth2 = eventContext.getLastEnqueuedEventProperties().getOffset() - eventContext.getEventData().getOffset(); + Gauge gauge1 = metricRegistry.gauge(name(EventContextConsumer.class, "depth-bytes", "1")); + Gauge gauge2 = metricRegistry.gauge(name(EventContextConsumer.class, "depth-bytes", "2")); + + Assertions.assertEquals(depth1, 99L); // offsets are defined in the factory + Assertions.assertEquals(depth2, 99L); + Assertions.assertEquals(depth1, gauge1.getValue()); + Assertions.assertEquals(depth2, gauge2.getValue()); } @Test - public void testEstimatedDataDepthMetric() throws Exception { - final EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory(); - final Sourceable configSource = new PropertySource(); - final int prometheusPort = new MetricsConfig(configSource).prometheusPort; - final MetricRegistry metricRegistry = new MetricRegistry(); - - try (EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort)) { - final double records = 10; - long length = 0L; - for (int i = 0; i < records; i++) { - EventContext eventContext = eventContextFactory.create(); - length = length + eventContext.getEventData().getBody().length; - eventContextConsumer.accept(eventContext); - } + public void testEstimatedDataDepthMetric() { + EventContextFactory eventContextFactory = new CheckpointlessEventContextFactory(); + MetricRegistry metricRegistry = new MetricRegistry(); + EventContextConsumer eventContextConsumer = new EventContextConsumer(configSource, new OutputFake(), metricRegistry, prometheusPort); + + final double records = 10; + long length = 0L; + for (int i = 0; i < records; i++) { + EventContext eventContext = eventContextFactory.create(); + length = length + eventContext.getEventData().getBody().length; + eventContextConsumer.accept(eventContext); + } - Gauge gauge = metricRegistry.gauge(MetricRegistry.name(EventContextConsumer.class, "estimated-data-depth")); - Double estimatedDepth = (length / records) / records; + Assertions.assertDoesNotThrow(eventContextConsumer::close); - Assertions.assertEquals(estimatedDepth, gauge.getValue()); - } + Gauge gauge = metricRegistry.gauge(MetricRegistry.name(EventContextConsumer.class, "estimated-data-depth")); + Double estimatedDepth = (length / records) / records; + + Assertions.assertEquals(estimatedDepth, gauge.getValue()); } } diff --git a/src/test/java/com/teragrep/aer_01/CheckpointStoreFake.java b/src/test/java/com/teragrep/aer_01/fakes/CheckpointStoreFake.java similarity index 98% rename from src/test/java/com/teragrep/aer_01/CheckpointStoreFake.java rename to src/test/java/com/teragrep/aer_01/fakes/CheckpointStoreFake.java index 1907cce..9ca22b6 100644 --- a/src/test/java/com/teragrep/aer_01/CheckpointStoreFake.java +++ b/src/test/java/com/teragrep/aer_01/fakes/CheckpointStoreFake.java @@ -44,7 +44,7 @@ * a licensee so wish it. */ -package com.teragrep.aer_01; +package com.teragrep.aer_01.fakes; import com.azure.messaging.eventhubs.CheckpointStore; import com.azure.messaging.eventhubs.models.Checkpoint; diff --git a/src/test/java/com/teragrep/aer_01/CheckpointlessEventContextFactory.java b/src/test/java/com/teragrep/aer_01/fakes/CheckpointlessEventContextFactory.java similarity index 98% rename from src/test/java/com/teragrep/aer_01/CheckpointlessEventContextFactory.java rename to src/test/java/com/teragrep/aer_01/fakes/CheckpointlessEventContextFactory.java index 46353cd..82a0bfe 100644 --- a/src/test/java/com/teragrep/aer_01/CheckpointlessEventContextFactory.java +++ b/src/test/java/com/teragrep/aer_01/fakes/CheckpointlessEventContextFactory.java @@ -44,7 +44,7 @@ * a licensee so wish it. */ -package com.teragrep.aer_01; +package com.teragrep.aer_01.fakes; import com.azure.messaging.eventhubs.CheckpointStore; import com.azure.messaging.eventhubs.EventData; diff --git a/src/test/java/com/teragrep/aer_01/fakes/ConnectionlessRelpConnectionFake.java b/src/test/java/com/teragrep/aer_01/fakes/ConnectionlessRelpConnectionFake.java new file mode 100644 index 0000000..a07b138 --- /dev/null +++ b/src/test/java/com/teragrep/aer_01/fakes/ConnectionlessRelpConnectionFake.java @@ -0,0 +1,67 @@ +/* + * Teragrep Azure Eventhub Reader + * Copyright (C) 2023 Suomen Kanuuna Oy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + * Additional permission under GNU Affero General Public License version 3 + * section 7 + * + * If you modify this Program, or any covered work, by linking or combining it + * with other code, such other code is not for that reason alone subject to any + * of the requirements of the GNU Affero GPL version 3 as long as this Program + * is the same Program as licensed from Suomen Kanuuna Oy without any additional + * modifications. + * + * Supplemented terms under GNU Affero General Public License version 3 + * section 7 + * + * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified + * versions must be marked as "Modified version of" The Program. + * + * Names of the licensors and authors may not be used for publicity purposes. + * + * No rights are granted for use of trade names, trademarks, or service marks + * which are in The Program if any. + * + * Licensee must indemnify licensors and authors for any liability that these + * contractual assumptions impose on licensors and authors. + * + * To the extent this program is licensed as part of the Commercial versions of + * Teragrep, the applicable Commercial License may apply to this file if you as + * a licensee so wish it. + */ + +package com.teragrep.aer_01.fakes; + +/** + * Fake that reconnects until the amount of reconnects reach the given limit. + */ +public class ConnectionlessRelpConnectionFake extends RelpConnectionFake { + + private final int limit; + private int timesConnected = 0; + + public ConnectionlessRelpConnectionFake(int limit) { + super(); + this.limit = limit; + } + + @Override + public boolean connect(String hostname, int port) { + timesConnected++; + return timesConnected >= limit; + } +} diff --git a/src/test/java/com/teragrep/aer_01/EventContextFactory.java b/src/test/java/com/teragrep/aer_01/fakes/EventContextFactory.java similarity index 98% rename from src/test/java/com/teragrep/aer_01/EventContextFactory.java rename to src/test/java/com/teragrep/aer_01/fakes/EventContextFactory.java index 0766878..54e9b95 100644 --- a/src/test/java/com/teragrep/aer_01/EventContextFactory.java +++ b/src/test/java/com/teragrep/aer_01/fakes/EventContextFactory.java @@ -44,7 +44,7 @@ * a licensee so wish it. */ -package com.teragrep.aer_01; +package com.teragrep.aer_01.fakes; import com.azure.messaging.eventhubs.models.EventContext; diff --git a/src/test/java/com/teragrep/aer_01/EventDataFake.java b/src/test/java/com/teragrep/aer_01/fakes/EventDataFake.java similarity index 98% rename from src/test/java/com/teragrep/aer_01/EventDataFake.java rename to src/test/java/com/teragrep/aer_01/fakes/EventDataFake.java index 99ef570..12b133b 100644 --- a/src/test/java/com/teragrep/aer_01/EventDataFake.java +++ b/src/test/java/com/teragrep/aer_01/fakes/EventDataFake.java @@ -44,7 +44,7 @@ * a licensee so wish it. */ -package com.teragrep.aer_01; +package com.teragrep.aer_01.fakes; import com.azure.messaging.eventhubs.EventData; diff --git a/src/test/java/com/teragrep/aer_01/OutputFake.java b/src/test/java/com/teragrep/aer_01/fakes/OutputFake.java similarity index 96% rename from src/test/java/com/teragrep/aer_01/OutputFake.java rename to src/test/java/com/teragrep/aer_01/fakes/OutputFake.java index 49a8839..a118e63 100644 --- a/src/test/java/com/teragrep/aer_01/OutputFake.java +++ b/src/test/java/com/teragrep/aer_01/fakes/OutputFake.java @@ -44,7 +44,9 @@ * a licensee so wish it. */ -package com.teragrep.aer_01; +package com.teragrep.aer_01.fakes; + +import com.teragrep.aer_01.Output; public final class OutputFake implements Output { @Override diff --git a/src/test/java/com/teragrep/aer_01/fakes/RelpConnectionFake.java b/src/test/java/com/teragrep/aer_01/fakes/RelpConnectionFake.java new file mode 100644 index 0000000..e83d1f1 --- /dev/null +++ b/src/test/java/com/teragrep/aer_01/fakes/RelpConnectionFake.java @@ -0,0 +1,95 @@ +/* + * Teragrep Azure Eventhub Reader + * Copyright (C) 2023 Suomen Kanuuna Oy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + * Additional permission under GNU Affero General Public License version 3 + * section 7 + * + * If you modify this Program, or any covered work, by linking or combining it + * with other code, such other code is not for that reason alone subject to any + * of the requirements of the GNU Affero GPL version 3 as long as this Program + * is the same Program as licensed from Suomen Kanuuna Oy without any additional + * modifications. + * + * Supplemented terms under GNU Affero General Public License version 3 + * section 7 + * + * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified + * versions must be marked as "Modified version of" The Program. + * + * Names of the licensors and authors may not be used for publicity purposes. + * + * No rights are granted for use of trade names, trademarks, or service marks + * which are in The Program if any. + * + * Licensee must indemnify licensors and authors for any liability that these + * contractual assumptions impose on licensors and authors. + * + * To the extent this program is licensed as part of the Commercial versions of + * Teragrep, the applicable Commercial License may apply to this file if you as + * a licensee so wish it. + */ + +package com.teragrep.aer_01.fakes; + +import com.teragrep.rlp_01.RelpBatch; +import com.teragrep.rlp_01.RelpConnection; + +import java.io.IOException; + +public class RelpConnectionFake extends RelpConnection { + + @Override + public void setReadTimeout(int readTimeout) { + // no-op in fake + } + + @Override + public void setWriteTimeout(int writeTimeout) { + // no-op in fake + } + + @Override + public void setConnectionTimeout(int timeout) { + // no-op in fake + } + + @Override + public boolean connect(String hostname, int port) throws IOException { + return true; + } + + @Override + public void tearDown() { + // no-op in fake + } + + @Override + public boolean disconnect() { + return true; + } + + @Override + public void commit(RelpBatch relpBatch) { + // remove all the requests from relpBatch in the fake + // so that the batch will return true in verifyTransactionAll() + while (relpBatch.getWorkQueueLength() > 0) { + long reqId = relpBatch.popWorkQueue(); + relpBatch.removeRequest(reqId); + } + } +} diff --git a/src/test/java/com/teragrep/aer_01/fakes/ThrowingRelpConnectionFake.java b/src/test/java/com/teragrep/aer_01/fakes/ThrowingRelpConnectionFake.java new file mode 100644 index 0000000..ed56874 --- /dev/null +++ b/src/test/java/com/teragrep/aer_01/fakes/ThrowingRelpConnectionFake.java @@ -0,0 +1,72 @@ +/* + * Teragrep Azure Eventhub Reader + * Copyright (C) 2023 Suomen Kanuuna Oy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + * Additional permission under GNU Affero General Public License version 3 + * section 7 + * + * If you modify this Program, or any covered work, by linking or combining it + * with other code, such other code is not for that reason alone subject to any + * of the requirements of the GNU Affero GPL version 3 as long as this Program + * is the same Program as licensed from Suomen Kanuuna Oy without any additional + * modifications. + * + * Supplemented terms under GNU Affero General Public License version 3 + * section 7 + * + * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified + * versions must be marked as "Modified version of" The Program. + * + * Names of the licensors and authors may not be used for publicity purposes. + * + * No rights are granted for use of trade names, trademarks, or service marks + * which are in The Program if any. + * + * Licensee must indemnify licensors and authors for any liability that these + * contractual assumptions impose on licensors and authors. + * + * To the extent this program is licensed as part of the Commercial versions of + * Teragrep, the applicable Commercial License may apply to this file if you as + * a licensee so wish it. + */ + +package com.teragrep.aer_01.fakes; + +import java.io.IOException; + +/** + * Fake that throws an exception when connecting until the amount of connect attempts reach the given limit. + */ +public class ThrowingRelpConnectionFake extends RelpConnectionFake { + + private final int limit; + private int timesConnected = 0; + + public ThrowingRelpConnectionFake(int limit) { + super(); + this.limit = limit; + } + + @Override + public boolean connect(String hostname, int port) throws IOException, IllegalStateException { + timesConnected++; + if (timesConnected < limit) { + throw new IOException("Fake exception"); + } + return super.connect(hostname, port); + } +}