Skip to content

Commit

Permalink
Small tidy-ups
Browse files Browse the repository at this point in the history
  • Loading branch information
Graham Crockford committed Mar 18, 2024
1 parent 51d523f commit d983747
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,23 @@ static DefaultPersistor forDialect(Dialect dialect) {
List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, Instant now)
throws Exception;

/**
* Selects the list of topics with work awaiting processing.
*
* @param tx The current {@link Transaction}.
* @return The topics.
* @throws Exception Any exception.
*/
List<String> selectActiveTopics(final Transaction tx) throws Exception;

/**
* Fetches and locks the next available piece of work on the specified topic.
*
* @param tx The current {@link Transaction}.
* @param topic The topic.
* @return The next available piece of work on the selected topic.
* @throws Exception ANy exception.
*/
Optional<TransactionOutboxEntry> nextInTopic(Transaction tx, String topic) throws Exception;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,26 @@
import com.gruelbox.transactionoutbox.spi.Utils;
import java.lang.reflect.InvocationTargetException;
import java.time.*;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.slf4j.event.Level;

@Slf4j
class TransactionOutboxImpl implements TransactionOutbox, Validatable {

private static final int DEFAULT_FLUSH_BATCH_SIZE = 4096;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
final class TransactionOutboxImpl implements TransactionOutbox, Validatable {

private final TransactionManager transactionManager;
private final Persistor persistor;
Expand All @@ -41,39 +46,6 @@ class TransactionOutboxImpl implements TransactionOutbox, Validatable {
private final AtomicBoolean initialized = new AtomicBoolean();
private final ProxyFactory proxyFactory = new ProxyFactory();

private TransactionOutboxImpl(
TransactionManager transactionManager,
Instantiator instantiator,
Submitter submitter,
Duration attemptFrequency,
int blockAfterAttempts,
int flushBatchSize,
Supplier<Clock> clockProvider,
TransactionOutboxListener listener,
Persistor persistor,
Level logLevelTemporaryFailure,
Boolean serializeMdc,
Duration retentionThreshold,
Boolean initializeImmediately) {
this.transactionManager = transactionManager;
this.instantiator = Utils.firstNonNull(instantiator, Instantiator::usingReflection);
this.persistor = persistor;
this.submitter = Utils.firstNonNull(submitter, Submitter::withDefaultExecutor);
this.attemptFrequency = Utils.firstNonNull(attemptFrequency, () -> Duration.of(2, MINUTES));
this.blockAfterAttempts = blockAfterAttempts < 1 ? 5 : blockAfterAttempts;
this.flushBatchSize = flushBatchSize < 1 ? DEFAULT_FLUSH_BATCH_SIZE : flushBatchSize;
this.clockProvider = clockProvider == null ? Clock::systemDefaultZone : clockProvider;
this.listener = Utils.firstNonNull(listener, () -> new TransactionOutboxListener() {});
this.logLevelTemporaryFailure = Utils.firstNonNull(logLevelTemporaryFailure, () -> Level.WARN);
this.validator = new Validator(this.clockProvider);
this.serializeMdc = serializeMdc == null || serializeMdc;
this.retentionThreshold = retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold;
this.validator.validate(this);
if (initializeImmediately == null || initializeImmediately) {
initialize();
}
}

@Override
public void validate(Validator validator) {
validator.notNull("transactionManager", transactionManager);
Expand Down Expand Up @@ -424,10 +396,7 @@ private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) {
entry.setAttempts(entry.getAttempts() + 1);
var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= blockAfterAttempts);
entry.setBlocked(blocked);
entry.setLastAttemptTime(Instant.now(clockProvider.get()));
entry.setNextAttemptTime(after(attemptFrequency));
validator.validate(entry);
transactionManager.inTransactionThrows(transaction -> persistor.update(transaction, entry));
transactionManager.inTransactionThrows(tx -> pushBack(tx, entry));
listener.failure(entry, cause);
if (blocked) {
log.error(
Expand Down Expand Up @@ -462,46 +431,43 @@ static class TransactionOutboxBuilderImpl extends TransactionOutboxBuilder {
}

public TransactionOutboxImpl build() {
return new TransactionOutboxImpl(
transactionManager,
instantiator,
submitter,
attemptFrequency,
blockAfterAttempts,
flushBatchSize,
clockProvider,
listener,
persistor,
logLevelTemporaryFailure,
serializeMdc,
retentionThreshold,
initializeImmediately);
Validator validator = new Validator(this.clockProvider);
TransactionOutboxImpl impl =
new TransactionOutboxImpl(
transactionManager,
persistor,
Utils.firstNonNull(instantiator, Instantiator::usingReflection),
Utils.firstNonNull(submitter, Submitter::withDefaultExecutor),
Utils.firstNonNull(attemptFrequency, () -> Duration.of(2, MINUTES)),
Utils.firstNonNull(logLevelTemporaryFailure, () -> Level.WARN),
blockAfterAttempts < 1 ? 5 : blockAfterAttempts,
flushBatchSize < 1 ? 4096 : flushBatchSize,
clockProvider == null ? Clock::systemDefaultZone : clockProvider,
Utils.firstNonNull(listener, () -> TransactionOutboxListener.EMPTY),
serializeMdc == null || serializeMdc,
validator,
retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold);
validator.validate(impl);
if (initializeImmediately == null || initializeImmediately) {
impl.initialize();
}
return impl;
}
}

@Accessors(fluent = true, chain = true)
@Setter
private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleBuilder {

private String uniqueRequestId;
private String topic;

@Override
public ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId) {
this.uniqueRequestId = uniqueRequestId;
return this;
}

@Override
public ParameterizedScheduleBuilder ordered(String topic) {
this.topic = topic;
return this;
}
private String ordered;

@Override
public <T> T schedule(Class<T> clazz) {
if (uniqueRequestId != null && uniqueRequestId.length() > 250) {
throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters");
}
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, topic);
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
/** A listener for events fired by {@link TransactionOutbox}. */
public interface TransactionOutboxListener {

TransactionOutboxListener EMPTY = new TransactionOutboxListener() {};

/**
* Fired when a transaction outbox task is scheduled.
*
Expand Down

0 comments on commit d983747

Please sign in to comment.