From 304596730367ff850bf9be8944ab8ad9c6f742c4 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Tue, 10 Oct 2023 08:38:47 -0700 Subject: [PATCH 1/8] Disable flaky e2e acks test Signed-off-by: Kondaka --- .../opensearch/dataprepper/integration/PipelinesWithAcksIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java index f0098b4569..c17eb2af63 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Disabled; import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -161,6 +162,7 @@ void three_pipelines_multi_sink_multiple_records() { } @Test + @Disabled("Disabling because this test is flaky.") void one_pipeline_three_sinks_multiple_records() { setUp(ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST); final int numRecords = 100; From 7f1fb70e99cdca9bc457b3adbd44474c8aef1195 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Tue, 10 Oct 2023 10:23:28 -0700 Subject: [PATCH 2/8] Disabled another flaky test Signed-off-by: Kondaka --- .../opensearch/dataprepper/integration/PipelinesWithAcksIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java index c17eb2af63..b7183d88a2 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java @@ -56,6 +56,7 @@ void tearDown() { } @Test + @Disabled("Disabling because this test is flaky.") void simple_pipeline_with_single_record() { setUp(SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST); final int numRecords = 1; From cba386f875ad7d5255cfadd78af2cfd40282209a Mon Sep 17 00:00:00 2001 From: Kondaka Date: Tue, 10 Oct 2023 11:16:17 -0700 Subject: [PATCH 3/8] Added debug statements to debug the failing tests Signed-off-by: Kondaka --- .../dataprepper/integration/PipelinesWithAcksIT.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java index b7183d88a2..c30d39b34b 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import java.time.Instant; import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; @@ -46,23 +47,24 @@ void setUp(String configFile) { .build(); dataPrepperTestRunner.start(); + System.out.println("Data Prepper Test started at "+Instant.now()); inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor(); inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor(); } @AfterEach void tearDown() { + System.out.println("Data Prepper Test stopped at "+Instant.now()); dataPrepperTestRunner.stop(); } @Test - @Disabled("Disabling because this test is flaky.") void simple_pipeline_with_single_record() { setUp(SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST); final int numRecords = 1; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(2000, TimeUnit.MILLISECONDS) + await().atMost(20000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -78,7 +80,7 @@ void simple_pipeline_with_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(2000, TimeUnit.MILLISECONDS) + await().atMost(20000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -93,7 +95,7 @@ void two_pipelines_with_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(5000, TimeUnit.MILLISECONDS) + await().atMost(20000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); From 77eda6052fe3098fe953363b390640504613f36d Mon Sep 17 00:00:00 2001 From: Kondaka Date: Tue, 10 Oct 2023 11:54:45 -0700 Subject: [PATCH 4/8] Modified to assign unique names to pipelines Signed-off-by: Kondaka --- .../one-pipeline-ack-expiry-test.yaml | 2 +- .../acknowledgements/one-pipeline-three-sinks.yaml | 2 +- .../pipeline/acknowledgements/simple-test.yaml | 2 +- .../three-pipeline-route-test.yaml | 14 +++++++------- .../three-pipelines-test-multi-sink.yaml | 14 +++++++------- .../acknowledgements/three-pipelines-test.yaml | 14 +++++++------- .../two-parallel-pipelines-test.yaml | 14 +++++++------- .../acknowledgements/two-pipelines-test.yaml | 8 ++++---- 8 files changed, 35 insertions(+), 35 deletions(-) diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/one-pipeline-ack-expiry-test.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/one-pipeline-ack-expiry-test.yaml index 1d329ac7c2..3a43cd7485 100644 --- a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/one-pipeline-ack-expiry-test.yaml +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/one-pipeline-ack-expiry-test.yaml @@ -1,4 +1,4 @@ -pipeline1: +pipeline-expiry-test: delay: 2 source: in_memory: diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/one-pipeline-three-sinks.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/one-pipeline-three-sinks.yaml index 6f6fe666e5..eecc2d3b30 100644 --- a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/one-pipeline-three-sinks.yaml +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/one-pipeline-three-sinks.yaml @@ -1,4 +1,4 @@ -pipeline1: +pipeline-three-sinks-test: delay: 2 source: in_memory: diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/simple-test.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/simple-test.yaml index 9aba22a13f..5af7065e3d 100644 --- a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/simple-test.yaml +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/simple-test.yaml @@ -1,4 +1,4 @@ -simple-pipeline: +simple-pipeline-test: delay: 10 source: in_memory: diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipeline-route-test.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipeline-route-test.yaml index be26078ef0..2e7f45228c 100644 --- a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipeline-route-test.yaml +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipeline-route-test.yaml @@ -1,4 +1,4 @@ -pipeline1: +three-pipelines-route-test-1: delay: 2 source: in_memory: @@ -9,27 +9,27 @@ pipeline1: - other_route: '/status >= 300 or /status < 200' sink: - pipeline: - name: "pipeline2" + name: "three-pipelines-route-test-2" routes: - 2xx_route - pipeline: - name: "pipeline3" + name: "three-pipelines-route-test-3" routes: - other_route -pipeline2: +three-pipelines-route-test-2: source: pipeline: - name: "pipeline1" + name: "three-pipelines-route-test-1" sink: - in_memory: testing_key: PipelinesWithAcksIT acknowledgments: true -pipeline3: +three-pipelines-route-test-3: source: pipeline: - name: "pipeline1" + name: "three-pipelines-route-test-1" sink: - in_memory: testing_key: PipelinesWithAcksIT diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipelines-test-multi-sink.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipelines-test-multi-sink.yaml index 7f804ef3f7..1fa9ce6d4f 100644 --- a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipelines-test-multi-sink.yaml +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipelines-test-multi-sink.yaml @@ -1,4 +1,4 @@ -pipeline1: +three-pipelines-multi-sink-1: delay: 2 source: in_memory: @@ -9,23 +9,23 @@ pipeline1: testing_key: PipelinesWithAcksIT acknowledgments: true - pipeline: - name: "pipeline2" + name: "three-pipelines-multi-sink-2" -pipeline2: +three-pipelines-multi-sink-2: source: pipeline: - name: "pipeline1" + name: "three-pipelines-multi-sink-1" sink: - in_memory: testing_key: PipelinesWithAcksIT acknowledgments: true - pipeline: - name: "pipeline3" + name: "three-pipelines-multi-sink-3" -pipeline3: +three-pipelines-multi-sink-3: source: pipeline: - name: "pipeline2" + name: "three-pipelines-multi-sink-2" sink: - in_memory: testing_key: PipelinesWithAcksIT diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipelines-test.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipelines-test.yaml index 90dd938c7b..e6211fff15 100644 --- a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipelines-test.yaml +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/three-pipelines-test.yaml @@ -1,4 +1,4 @@ -pipeline1: +three-pipelines-test-1: delay: 2 source: in_memory: @@ -6,20 +6,20 @@ pipeline1: acknowledgments: true sink: - pipeline: - name: "pipeline2" + name: "three-pipelines-test-2" -pipeline2: +three-pipelines-test-2: source: pipeline: - name: "pipeline1" + name: "three-pipelines-test-1" sink: - pipeline: - name: "pipeline3" + name: "three-pipelines-test-3" -pipeline3: +three-pipelines-test-3: source: pipeline: - name: "pipeline2" + name: "three-pipelines-test-2" sink: - in_memory: testing_key: PipelinesWithAcksIT diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/two-parallel-pipelines-test.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/two-parallel-pipelines-test.yaml index dcae544e62..750f7bbf61 100644 --- a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/two-parallel-pipelines-test.yaml +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/two-parallel-pipelines-test.yaml @@ -1,4 +1,4 @@ -pipeline1: +two-parallel-pipelines-test-1: delay: 2 source: in_memory: @@ -6,23 +6,23 @@ pipeline1: acknowledgments: true sink: - pipeline: - name: "pipeline2" + name: "two-parallel-pipelines-test-2" - pipeline: - name: "pipeline3" + name: "two-parallel-pipelines-test-3" -pipeline2: +two-parallel-pipelines-test-2: source: pipeline: - name: "pipeline1" + name: "two-parallel-pipelines-test-1" sink: - in_memory: testing_key: PipelinesWithAcksIT acknowledgments: true -pipeline3: +two-parallel-pipelines-test-3: source: pipeline: - name: "pipeline1" + name: "two-parallel-pipelines-test-1" sink: - in_memory: testing_key: PipelinesWithAcksIT diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/two-pipelines-test.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/two-pipelines-test.yaml index 22af39810d..4bbd8b3244 100644 --- a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/two-pipelines-test.yaml +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/two-pipelines-test.yaml @@ -1,4 +1,4 @@ -pipeline1: +two-pipelines-test-1: delay: 2 source: in_memory: @@ -6,12 +6,12 @@ pipeline1: acknowledgments: true sink: - pipeline: - name: "pipeline2" + name: "two-pipelines-test-2" -pipeline2: +two-pipelines-test-2: source: pipeline: - name: "pipeline1" + name: "two-pipelines-test-1" sink: - in_memory: testing_key: PipelinesWithAcksIT From 93f09134962ee20a86776841996d6965cfff1a94 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Tue, 10 Oct 2023 12:23:44 -0700 Subject: [PATCH 5/8] Trying with enabling the disabled test Signed-off-by: Kondaka --- .../dataprepper/integration/PipelinesWithAcksIT.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java index c30d39b34b..69bd058efc 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java @@ -46,8 +46,8 @@ void setUp(String configFile) { .withPipelinesDirectoryOrFile(configFile) .build(); - dataPrepperTestRunner.start(); System.out.println("Data Prepper Test started at "+Instant.now()); + dataPrepperTestRunner.start(); inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor(); inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor(); } @@ -165,7 +165,6 @@ void three_pipelines_multi_sink_multiple_records() { } @Test - @Disabled("Disabling because this test is flaky.") void one_pipeline_three_sinks_multiple_records() { setUp(ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST); final int numRecords = 100; From d86f3b3812d97428758d4193a0a76a0a45e9fd91 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Tue, 10 Oct 2023 12:40:12 -0700 Subject: [PATCH 6/8] Fixed failing checkstyle error Signed-off-by: Kondaka --- .../opensearch/dataprepper/integration/PipelinesWithAcksIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java index 69bd058efc..996c199d6c 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java @@ -7,7 +7,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Disabled; import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; From ce2f66c725df6316bcd71f6860baf6446ebce252 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Tue, 10 Oct 2023 16:09:47 -0700 Subject: [PATCH 7/8] Reduced sleep time in InMemorySource Signed-off-by: Kondaka --- .../opensearch/dataprepper/integration/PipelinesWithAcksIT.java | 2 +- .../java/org/opensearch/dataprepper/plugins/InMemorySource.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java index 996c199d6c..61b7dcea55 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java @@ -45,7 +45,7 @@ void setUp(String configFile) { .withPipelinesDirectoryOrFile(configFile) .build(); - System.out.println("Data Prepper Test started at "+Instant.now()); + System.out.println("Data Prepper Test with config file "+ configFile + " started at "+Instant.now()); dataPrepperTestRunner.start(); inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor(); inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java index 133340cc39..3afd17554c 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java @@ -115,7 +115,7 @@ public void run() { try { final List> records = inMemorySourceAccessor.read(testingKey); if (records.size() == 0) { - Thread.sleep(1000); + Thread.sleep(50); continue; } AcknowledgementSet ackSet = From 45c9483513895af1bee98c19fd332ef30fd2c015 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Wed, 11 Oct 2023 11:35:14 -0700 Subject: [PATCH 8/8] Modified to use log4j Signed-off-by: Kondaka --- .../dataprepper/integration/PipelinesWithAcksIT.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java index 61b7dcea55..b8b12cb56b 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java @@ -14,6 +14,8 @@ import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.Assert.assertFalse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.TimeUnit; @@ -26,7 +28,7 @@ import static org.hamcrest.Matchers.empty; class PipelinesWithAcksIT { - + private static final Logger LOG = LoggerFactory.getLogger(PipelinesWithAcksIT.class); private static final String IN_MEMORY_IDENTIFIER = "PipelinesWithAcksIT"; private static final String SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST = "acknowledgements/simple-test.yaml"; private static final String TWO_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/two-pipelines-test.yaml"; @@ -45,7 +47,7 @@ void setUp(String configFile) { .withPipelinesDirectoryOrFile(configFile) .build(); - System.out.println("Data Prepper Test with config file "+ configFile + " started at "+Instant.now()); + LOG.info("PipelinesWithAcksIT with config file {} started at {}", configFile, Instant.now()); dataPrepperTestRunner.start(); inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor(); inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor(); @@ -53,7 +55,7 @@ void setUp(String configFile) { @AfterEach void tearDown() { - System.out.println("Data Prepper Test stopped at "+Instant.now()); + LOG.info("PipelinesWithAcksIT with stopped at {}", Instant.now()); dataPrepperTestRunner.stop(); }