Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit tests for operation attempt #1472

Merged
merged 29 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a22b355
Start refactoring CollectionSchemaObject
amorton Sep 13, 2024
04031fb
WIP refactoring to add OperationAttempt
amorton Sep 19, 2024
5fc7124
WIP - OperationAttempt with read path
amorton Sep 22, 2024
10a9581
WIP - added SchemaOperationAttempt
amorton Sep 23, 2024
d126132
Fix failing unit test shredderFailure()
amorton Sep 23, 2024
f207951
fmt:fmt
amorton Sep 23, 2024
b83a16f
WIP GeneralOperation and InsertAttemptPage
amorton Sep 25, 2024
81c3e09
Integration test fixes
amorton Sep 25, 2024
13b1008
unit test fixes
amorton Sep 25, 2024
a17d2f3
adding comments and code tidy
amorton Sep 26, 2024
f643396
trying to get the response for insertId's correct
amorton Sep 26, 2024
bcc3a9c
trying to get insertedId's correct again
amorton Sep 26, 2024
5cedeba
Test fixes
amorton Sep 26, 2024
5812835
fmt
amorton Sep 26, 2024
93a1369
comments and cleanup
amorton Sep 26, 2024
cf249c3
Merge branch 'main' into ajm/start-collection-refactor
amorton Sep 26, 2024
b347d40
Merge branch 'ajm/start-collection-refactor' into ajm/#1424-refactor-…
amorton Sep 26, 2024
71e20fb
Add CommandResultBuilder
amorton Sep 26, 2024
5cb12bd
Include errorID in response
amorton Sep 26, 2024
7ad4be0
remove log that should not be there
amorton Sep 26, 2024
260282b
move RequestContext into CommandQueryExecutor
amorton Sep 26, 2024
482b44a
comments and tidy
amorton Sep 26, 2024
b4e777a
fix: SchemaAttempt was calling wrong execute
amorton Sep 26, 2024
5b8bea7
compile fix
amorton Sep 26, 2024
57da595
Merge branch 'main' into ajm/#1424-refactor-TableInsertAttempt
amorton Sep 27, 2024
a5a2d66
Unit tests for OperationAttempt
amorton Sep 30, 2024
4b32475
spelling
amorton Oct 1, 2024
346afa9
Merge branch 'main' into ajm/unit-tests-for-OperationAttempt
amorton Oct 1, 2024
848ca55
Merge branch 'main' into ajm/unit-tests-for-OperationAttempt
amorton Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected InsertAttempt(
}

@Override
protected Uni<AsyncResultSet> execute(CommandQueryExecutor queryExecutor) {
protected Uni<AsyncResultSet> executeStatement(CommandQueryExecutor queryExecutor) {
// bind and execute
var statement = buildInsertStatement();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,23 +121,15 @@ public Uni<SubT> execute(
getClass().getSimpleName(),
positionAndAttemptId());
}
// First things first: did we already fail? If so we do not execute, we can just return self.
// Note we do not return a Failure UNI, we just return self, because the state of the attempt is
// tracked in object
if (status() == OperationStatus.ERROR) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"execute() - state was {}, will not execute {}", status(), positionAndAttemptId());
}

if (!swapStatus("start execute()", OperationStatus.READY, OperationStatus.IN_PROGRESS)) {
return Uni.createFrom().item(downcast());
}

swapStatus("start execute()", OperationStatus.READY, OperationStatus.IN_PROGRESS);

return Uni.createFrom()
.item(() -> execute(queryExecutor)) // Wraps any error from execute() into a Uni
.item(() -> executeIfInProgress(queryExecutor)) // Wraps any error from execute() into a Uni
.flatMap(uni -> uni) // Unwrap Uni<Uni<AsyncResultSet>> to Uni<AsyncResultSet>
.onFailure(throwable -> decideRetry(throwable))
.onFailure(this::decideRetry)
.retry()
.withBackOff(retryPolicy.delay(), retryPolicy.delay())
.atMost(retryPolicy.maxRetries())
Expand All @@ -154,16 +146,39 @@ public Uni<SubT> execute(
});
}

protected Uni<AsyncResultSet> executeIfInProgress(CommandQueryExecutor queryExecutor) {
// First things first: did we already fail? If so we do not execute, we can just return self.
// In practice this should not happen, because the execute() ensures the state is READY before
// starting it is
// here incase we hae missed something in the retry
if (status() == OperationStatus.ERROR) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"executeIfInProgress() - state was {}, will not execute {}",
status(),
positionAndAttemptId());
}
return Uni.createFrom().item(null);
}

// same as above for this checkProgress , we should only be IN_PROGRESS is anything else
// checkProgress will
// set a failure and return false.
return checkStatus("executeIfInProgress", OperationStatus.IN_PROGRESS)
? executeStatement(queryExecutor)
: Uni.createFrom().item(null);
}

/**
* Sublasses must implement this method to build the query and execute it, they should do anything
* with Uni for retry etc, that is handled in the base class {@link #execute(CommandQueryExecutor,
* DriverExceptionHandler)}.
* Sublasses must implement this method to build the query and execute it, they should not do
* anything with Uni for retry etc, that is handled in the base class {@link
* #execute(CommandQueryExecutor, DriverExceptionHandler)}.
*
* @param queryExecutor the {@link CommandQueryExecutor} , subclasses should call the appropriate
* execute method
* @return A {@link Uni} of the {@link AsyncResultSet} for processing the query.
*/
protected abstract Uni<AsyncResultSet> execute(CommandQueryExecutor queryExecutor);
protected abstract Uni<AsyncResultSet> executeStatement(CommandQueryExecutor queryExecutor);

/**
* Decides if the operation should be retried, using the {@link #retryPolicy} and the {@link
Expand All @@ -175,6 +190,17 @@ public Uni<SubT> execute(
*/
protected boolean decideRetry(Throwable throwable) {

// we should only be called when IN_PROGRESS, anything else is an invalid state e.g. if we were
// in ERROR it means
// we tracked an error, and then kept retrying.
// because this function can only return boolean, we throw
if (!checkStatus("decideRetry", OperationStatus.IN_PROGRESS)) {
throw new IllegalStateException(
String.format(
"decideRetry() called when not IN_PROGRESS status=%s, %s",
status(), positionAndAttemptId()));
}

var shouldRetry = retryPolicy.shouldRetry(throwable);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
Expand All @@ -183,6 +209,7 @@ protected boolean decideRetry(Throwable throwable) {
positionAndAttemptId(),
throwable.toString());
}

return shouldRetry;
}

Expand All @@ -205,19 +232,43 @@ protected SubT onCompletion(
AsyncResultSet resultSet,
Throwable throwable) {

if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"onCompletion() - resultSetIsNull={}, throwable={}, {}",
resultSet == null,
Objects.toString(throwable, "NULL"),
positionAndAttemptId());
}

// sanity check, if we do not have a result set then we should have an exception
if (resultSet == null && throwable == null) {
throw new IllegalStateException(
String.format(
"onCompletion() - resultSet and throwable are both null, %s",
positionAndAttemptId()));
}

var handledException =
throwable instanceof RuntimeException
? exceptionHandler.maybeHandle(schemaObject, (RuntimeException) throwable)
: throwable;

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("onCompletion() - throwable={}, {}", throwable, positionAndAttemptId());
if (handledException != throwable) {
LOGGER.debug(
"onCompletion() - handledException={}, {}", handledException, positionAndAttemptId());
}
}
return throwable == null ? onSuccess(resultSet) : maybeAddFailure(handledException);

return switch (status()) {
case IN_PROGRESS ->
handledException == null ? onSuccess(resultSet) : maybeAddFailure(handledException);
case ERROR -> downcast();
default ->
throw new IllegalStateException(
String.format(
"onCompletion() unsupported status=%s, %s", status(), positionAndAttemptId()));
};
}

/**
Expand Down Expand Up @@ -269,18 +320,21 @@ protected SubT setStatus(OperationStatus newStatus) {
*
* @param context short descriptive text about what is being checked, used in the exception
* @param expectedStatus The status that is expected
* @return this object, cast to {@link SubT} for chaining methods.
* @return True if the attempt is in the expected state, otherwise a {@link IllegalStateException}
* is added to the attempt, ERROR state set, and false is returned.
*/
protected SubT checkStatus(String context, OperationStatus expectedStatus) {
protected boolean checkStatus(String context, OperationStatus expectedStatus) {

if (status().equals(expectedStatus)) {
return downcast();
return true;
}
;
throw new IllegalStateException(
String.format(
"OperationAttempt: checkStatus() failed for %s, expected %s but actual %s for %s",
context, expectedStatus, status(), positionAndAttemptId()));

maybeAddFailure(
new IllegalStateException(
String.format(
"OperationAttempt: checkStatus() failed for %s, expected %s but actual %s for %s",
context, expectedStatus, status(), positionAndAttemptId())));
return false;
}

/**
Expand All @@ -305,13 +359,18 @@ public SubT checkTerminal(String context) {
}

/**
* Swap the status of the attempt, if the current status is the expected status then set the new
* status, otherwise throw an {@link IllegalStateException}.
* Swap the status of the attempt
*
* @return True is the status was swapped, false otherwise
*/
protected SubT swapStatus(
protected boolean swapStatus(
String context, OperationStatus expectedStatus, OperationStatus newStatus) {
checkStatus(context, expectedStatus);
return setStatus(newStatus);

if (checkStatus(context, expectedStatus)) {
setStatus(newStatus);
return true;
}
return false;
}

/**
Expand All @@ -325,7 +384,8 @@ protected SubT swapStatus(
* @return this object, cast to {@link SubT} for chaining methods.
*/
public SubT setSkippedIfReady() {
return swapStatus("setSkippedIfReady()", OperationStatus.READY, OperationStatus.SKIPPED);
swapStatus("setSkippedIfReady()", OperationStatus.READY, OperationStatus.SKIPPED);
return downcast();
}

/**
Expand Down Expand Up @@ -354,7 +414,8 @@ public Optional<Throwable> failure() {
* <p>OK to add a failure to the attempt before calling execute, we do this for shredding errors,
* because the attempt will not execute if there was an error.
*
* @param failure An error that happened when trying to process the attempt, ok to pass <code>null
* @param throwable An error that happened when trying to process the attempt, ok to pass <code>
* null
* </code> it will be ignored. If a non-null failure has already been added this call will be
* ignored.
* @return This object, cast to {@link SubT} for chaining methods.
Expand Down Expand Up @@ -449,9 +510,9 @@ public int compareTo(SubT other) {
* <p>To implement a custom retry policy, subclass this class and override {@link
* #shouldRetry(Throwable)}.
*/
protected static class RetryPolicy {
public static class RetryPolicy {

static final RetryPolicy NO_RETRY = new RetryPolicy();
public static final RetryPolicy NO_RETRY = new RetryPolicy();

private final int maxRetries;
private final Duration delay;
Expand All @@ -466,7 +527,7 @@ private RetryPolicy() {
* @param maxRetries the number of retries after the initial attempt, must be >= 1
* @param delay the delay between retries, must not be <code>null</code>
*/
RetryPolicy(int maxRetries, Duration delay) {
public RetryPolicy(int maxRetries, Duration delay) {
// This is a requirement of UniRetryAtMost that is it >= 1, however UniRetry.atMost() says it
// must be >= 0
if (maxRetries < 1) {
Expand All @@ -476,15 +537,15 @@ private RetryPolicy() {
this.delay = Objects.requireNonNull(delay, "delay cannot be null");
}

int maxRetries() {
public int maxRetries() {
return maxRetries;
}

Duration delay() {
public Duration delay() {
return delay;
}

boolean shouldRetry(Throwable throwable) {
public boolean shouldRetry(Throwable throwable) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public CqlPagingState resultPagingState() {
}

@Override
protected Uni<AsyncResultSet> execute(CommandQueryExecutor queryExecutor) {
protected Uni<AsyncResultSet> executeStatement(CommandQueryExecutor queryExecutor) {

var statement = buildReadStatement();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ protected SchemaAttempt(int position, SchemaT schemaObject, RetryPolicy retryPol
}

@Override
protected Uni<AsyncResultSet> execute(CommandQueryExecutor queryExecutor) {
protected Uni<AsyncResultSet> executeStatement(CommandQueryExecutor queryExecutor) {
var statement = buildStatement();

if (LOGGER.isDebugEnabled()) {
Expand All @@ -45,7 +45,7 @@ public SchemaRetryPolicy(int maxRetries, Duration delay) {
}

@Override
boolean shouldRetry(Throwable throwable) {
public boolean shouldRetry(Throwable throwable) {
// AARON - this is copied from QueryExecutor.executeSchemaChange()
return throwable instanceof DriverTimeoutException
|| throwable instanceof InvalidQueryException
Expand Down
Loading