Skip to content

Commit

Permalink
Fix e2e acks test (#3471) (#3485)
Browse files Browse the repository at this point in the history
* Disable flaky e2e acks test

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Disabled another flaky test

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Added debug statements to debug the failing tests

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Modified to assign unique names to pipelines

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Trying with enabling the disabled test

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Fixed failing checkstyle error

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Reduced sleep time in InMemorySource

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Modified to use log4j

Signed-off-by: Kondaka <krishkdk@amazon.com>

---------

Signed-off-by: Kondaka <krishkdk@amazon.com>
(cherry picked from commit 400b2a8)

Co-authored-by: kkondaka <41027584+kkondaka@users.noreply.github.com>
  • Loading branch information
opensearch-trigger-bot[bot] and kkondaka authored Oct 11, 2023
1 parent 3379fed commit 0866e80
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.Assert.assertFalse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.time.Instant;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
Expand All @@ -25,7 +28,7 @@
import static org.hamcrest.Matchers.empty;

class PipelinesWithAcksIT {

private static final Logger LOG = LoggerFactory.getLogger(PipelinesWithAcksIT.class);
private static final String IN_MEMORY_IDENTIFIER = "PipelinesWithAcksIT";
private static final String SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST = "acknowledgements/simple-test.yaml";
private static final String TWO_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/two-pipelines-test.yaml";
Expand All @@ -44,13 +47,15 @@ void setUp(String configFile) {
.withPipelinesDirectoryOrFile(configFile)
.build();

LOG.info("PipelinesWithAcksIT with config file {} started at {}", configFile, Instant.now());
dataPrepperTestRunner.start();
inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor();
inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor();
}

@AfterEach
void tearDown() {
LOG.info("PipelinesWithAcksIT with stopped at {}", Instant.now());
dataPrepperTestRunner.stop();
}

Expand All @@ -60,7 +65,7 @@ void simple_pipeline_with_single_record() {
final int numRecords = 1;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(2000, TimeUnit.MILLISECONDS)
await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -76,7 +81,7 @@ void simple_pipeline_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(2000, TimeUnit.MILLISECONDS)
await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -91,7 +96,7 @@ void two_pipelines_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(5000, TimeUnit.MILLISECONDS)
await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void run() {
try {
final List<Record<Event>> records = inMemorySourceAccessor.read(testingKey);
if (records.size() == 0) {
Thread.sleep(1000);
Thread.sleep(50);
continue;
}
AcknowledgementSet ackSet =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pipeline1:
pipeline-expiry-test:
delay: 2
source:
in_memory:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pipeline1:
pipeline-three-sinks-test:
delay: 2
source:
in_memory:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
simple-pipeline:
simple-pipeline-test:
delay: 10
source:
in_memory:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pipeline1:
three-pipelines-route-test-1:
delay: 2
source:
in_memory:
Expand All @@ -9,27 +9,27 @@ pipeline1:
- other_route: '/status >= 300 or /status < 200'
sink:
- pipeline:
name: "pipeline2"
name: "three-pipelines-route-test-2"
routes:
- 2xx_route
- pipeline:
name: "pipeline3"
name: "three-pipelines-route-test-3"
routes:
- other_route

pipeline2:
three-pipelines-route-test-2:
source:
pipeline:
name: "pipeline1"
name: "three-pipelines-route-test-1"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true

pipeline3:
three-pipelines-route-test-3:
source:
pipeline:
name: "pipeline1"
name: "three-pipelines-route-test-1"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pipeline1:
three-pipelines-multi-sink-1:
delay: 2
source:
in_memory:
Expand All @@ -9,23 +9,23 @@ pipeline1:
testing_key: PipelinesWithAcksIT
acknowledgments: true
- pipeline:
name: "pipeline2"
name: "three-pipelines-multi-sink-2"

pipeline2:
three-pipelines-multi-sink-2:
source:
pipeline:
name: "pipeline1"
name: "three-pipelines-multi-sink-1"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true
- pipeline:
name: "pipeline3"
name: "three-pipelines-multi-sink-3"

pipeline3:
three-pipelines-multi-sink-3:
source:
pipeline:
name: "pipeline2"
name: "three-pipelines-multi-sink-2"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
pipeline1:
three-pipelines-test-1:
delay: 2
source:
in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true
sink:
- pipeline:
name: "pipeline2"
name: "three-pipelines-test-2"

pipeline2:
three-pipelines-test-2:
source:
pipeline:
name: "pipeline1"
name: "three-pipelines-test-1"
sink:
- pipeline:
name: "pipeline3"
name: "three-pipelines-test-3"

pipeline3:
three-pipelines-test-3:
source:
pipeline:
name: "pipeline2"
name: "three-pipelines-test-2"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
pipeline1:
two-parallel-pipelines-test-1:
delay: 2
source:
in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true
sink:
- pipeline:
name: "pipeline2"
name: "two-parallel-pipelines-test-2"
- pipeline:
name: "pipeline3"
name: "two-parallel-pipelines-test-3"

pipeline2:
two-parallel-pipelines-test-2:
source:
pipeline:
name: "pipeline1"
name: "two-parallel-pipelines-test-1"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true

pipeline3:
two-parallel-pipelines-test-3:
source:
pipeline:
name: "pipeline1"
name: "two-parallel-pipelines-test-1"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
pipeline1:
two-pipelines-test-1:
delay: 2
source:
in_memory:
testing_key: PipelinesWithAcksIT
acknowledgments: true
sink:
- pipeline:
name: "pipeline2"
name: "two-pipelines-test-2"

pipeline2:
two-pipelines-test-2:
source:
pipeline:
name: "pipeline1"
name: "two-pipelines-test-1"
sink:
- in_memory:
testing_key: PipelinesWithAcksIT
Expand Down

0 comments on commit 0866e80

Please sign in to comment.