diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/DeleteManyIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/DeleteManyIntegrationTest.java index 6df31b5b21..3113107ba3 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/DeleteManyIntegrationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/DeleteManyIntegrationTest.java @@ -19,6 +19,7 @@ import io.stargate.sgv2.jsonapi.testresource.DseTestResource; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.RepeatedTest; @@ -297,37 +298,57 @@ public void concurrentDeletes() throws Exception { """; // start all threads AtomicInteger reportedDeletions = new AtomicInteger(0); + AtomicReferenceArray exceptions = new AtomicReferenceArray<>(threads); for (int i = 0; i < threads; i++) { + int index = i; new Thread( () -> { - Integer deletedCount = - given() - .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) - .contentType(ContentType.JSON) - .body(deleteJson) - .when() - .post( - CollectionResource.BASE_PATH, keyspaceId.asInternal(), collectionName) - .then() - .statusCode(200) - .body( - "status.deletedCount", - anyOf(greaterThanOrEqualTo(0), lessThanOrEqualTo(totalDocuments))) - .body("errors", is(nullValue())) - .extract() - .path("status.deletedCount"); - - // add reported deletes - reportedDeletions.addAndGet(deletedCount); - - // count down - latch.countDown(); + try { + Integer deletedCount = + given() + .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) + .contentType(ContentType.JSON) + .body(deleteJson) + .when() + .post( + CollectionResource.BASE_PATH, + keyspaceId.asInternal(), + collectionName) + .then() + .statusCode(200) + .body( + "status.deletedCount", + anyOf(greaterThanOrEqualTo(0), lessThanOrEqualTo(totalDocuments))) + .body("errors", is(nullValue())) + .extract() + .path("status.deletedCount"); + + // add reported deletes + reportedDeletions.addAndGet(deletedCount); + } catch (Exception e) { + + // set exception so we can rethrow + exceptions.set(index, e); + } finally { + + // count down + latch.countDown(); + } }) .start(); } latch.await(); + // check if there are any exceptions + // throw first that is seen + for (int i = 0; i < threads; i++) { + Exception exception = exceptions.get(i); + if (null != exception) { + throw exception; + } + } + // assert reported deletes are exactly one assertThat(reportedDeletions.get()).isEqualTo(totalDocuments); diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/DeleteOneIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/DeleteOneIntegrationTest.java index a0b8f23a6c..d1f6e34988 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/DeleteOneIntegrationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/DeleteOneIntegrationTest.java @@ -16,6 +16,7 @@ import io.stargate.sgv2.jsonapi.testresource.DseTestResource; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; @@ -326,35 +327,55 @@ public void concurrentDeletes() throws Exception { """; // start all threads AtomicInteger reportedDeletions = new AtomicInteger(0); + AtomicReferenceArray exceptions = new AtomicReferenceArray<>(threads); for (int i = 0; i < threads; i++) { + int index = i; new Thread( () -> { - Integer deletedCount = - given() - .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) - .contentType(ContentType.JSON) - .body(deleteJson) - .when() - .post( - CollectionResource.BASE_PATH, keyspaceId.asInternal(), collectionName) - .then() - .statusCode(200) - .body("status.deletedCount", anyOf(is(0), is(1))) - .body("errors", is(nullValue())) - .extract() - .path("status.deletedCount"); - - // add reported deletes - reportedDeletions.addAndGet(deletedCount); - - // count down - latch.countDown(); + try { + Integer deletedCount = + given() + .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) + .contentType(ContentType.JSON) + .body(deleteJson) + .when() + .post( + CollectionResource.BASE_PATH, + keyspaceId.asInternal(), + collectionName) + .then() + .statusCode(200) + .body("status.deletedCount", anyOf(is(0), is(1))) + .body("errors", is(nullValue())) + .extract() + .path("status.deletedCount"); + + // add reported deletes + reportedDeletions.addAndGet(deletedCount); + } catch (Exception e) { + + // set exception so we can rethrow + exceptions.set(index, e); + } finally { + + // count down + latch.countDown(); + } }) .start(); } latch.await(); + // check if there are any exceptions + // throw first that is seen + for (int i = 0; i < threads; i++) { + Exception exception = exceptions.get(i); + if (null != exception) { + throw exception; + } + } + // assert reported deletes are exactly one assertThat(reportedDeletions.get()).isOne(); diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateManyIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateManyIntegrationTest.java index a9eef184f6..9061926a14 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateManyIntegrationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateManyIntegrationTest.java @@ -14,6 +14,7 @@ import io.stargate.sgv2.api.common.config.constants.HttpConstants; import io.stargate.sgv2.jsonapi.testresource.DseTestResource; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReferenceArray; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.RepeatedTest; @@ -496,29 +497,47 @@ public void concurrentUpdates() throws Exception { } """; // start all threads + AtomicReferenceArray exceptions = new AtomicReferenceArray<>(threads); for (int i = 0; i < threads; i++) { + int index = i; new Thread( () -> { - given() - .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) - .contentType(ContentType.JSON) - .body(updateJson) - .when() - .post(CollectionResource.BASE_PATH, keyspaceId.asInternal(), collectionName) - .then() - .statusCode(200) - .body("status.matchedCount", is(5)) - .body("status.modifiedCount", is(5)) - .body("errors", is(nullValue())); - - // count down - latch.countDown(); + try { + given() + .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) + .contentType(ContentType.JSON) + .body(updateJson) + .when() + .post(CollectionResource.BASE_PATH, keyspaceId.asInternal(), collectionName) + .then() + .statusCode(200) + .body("status.matchedCount", is(5)) + .body("status.modifiedCount", is(5)) + .body("errors", is(nullValue())); + } catch (Exception e) { + + // set exception so we can rethrow + exceptions.set(index, e); + } finally { + + // count down + latch.countDown(); + } }) .start(); } latch.await(); + // check if there are any exceptions + // throw first that is seen + for (int i = 0; i < threads; i++) { + Exception exception = exceptions.get(i); + if (null != exception) { + throw exception; + } + } + // assert state after all updates String findJson = """ diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateOneIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateOneIntegrationTest.java index a5d8fdd5a1..2eb3f18343 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateOneIntegrationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateOneIntegrationTest.java @@ -13,6 +13,7 @@ import io.stargate.sgv2.api.common.config.constants.HttpConstants; import io.stargate.sgv2.jsonapi.testresource.DseTestResource; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReferenceArray; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.RepeatedTest; @@ -1744,29 +1745,47 @@ public void concurrentUpdates() throws Exception { } """; // start all threads + AtomicReferenceArray exceptions = new AtomicReferenceArray<>(threads); for (int i = 0; i < threads; i++) { + int index = i; new Thread( () -> { - given() - .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) - .contentType(ContentType.JSON) - .body(updateJson) - .when() - .post(CollectionResource.BASE_PATH, keyspaceId.asInternal(), collectionName) - .then() - .statusCode(200) - .body("status.matchedCount", is(1)) - .body("status.modifiedCount", is(1)) - .body("errors", is(nullValue())); - - // count down - latch.countDown(); + try { + given() + .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) + .contentType(ContentType.JSON) + .body(updateJson) + .when() + .post(CollectionResource.BASE_PATH, keyspaceId.asInternal(), collectionName) + .then() + .statusCode(200) + .body("status.matchedCount", is(1)) + .body("status.modifiedCount", is(1)) + .body("errors", is(nullValue())); + } catch (Exception e) { + + // set exception so we can rethrow + exceptions.set(index, e); + } finally { + + // count down + latch.countDown(); + } }) .start(); } latch.await(); + // check if there are any exceptions + // throw first that is seen + for (int i = 0; i < threads; i++) { + Exception exception = exceptions.get(i); + if (null != exception) { + throw exception; + } + } + // assert state after all updates String expectedDoc = """