Add support for processing tasks in order within groups #476

Closed
wants to merge 10 commits into from
@@ -94,7 +94,21 @@ class DefaultMigrationManager {
"Update length of invocation column on outbox for MySQL dialects only.",
"ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT",
Map.of(
Dialect.POSTGRESQL_9, "", Dialect.H2, "", Dialect.ORACLE, "SELECT * FROM dual")));
Dialect.POSTGRESQL_9, "", Dialect.H2, "", Dialect.ORACLE, "SELECT * FROM dual")),
new Migration(
9,
"Add createTime column to outbox",
"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6) NULL AFTER invocation",
Member:

TIMESTAMP(6) gives microsecond precision, but it is based on the system clock of the box writing the record. If you have two boxes writing records at roughly the same time, the ordering will be easily screwed up by clock skew.

To my mind, the only way to guarantee order is to have a group-specific incrementing counter with write-locking semantics, e.g:

CREATE TABLE PARTITION_SEQ (
  groupId VARCHAR(250) NOT NULL PRIMARY KEY,
  nextSequence BIGINT NOT NULL
)

and

SELECT nextSequence FROM PARTITION_SEQ WHERE groupId = ? FOR UPDATE

then write your record then

UPDATE PARTITION_SEQ SET nextSequence = ?

This all has one big negative: heavy contention on writes. But I don't think it's avoidable.
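The counter-table scheme above can be made concrete. What follows is an illustrative Python/SQLite model, not code from this PR: SQLite has no SELECT ... FOR UPDATE, so BEGIN IMMEDIATE stands in for the row-level write lock that MySQL or PostgreSQL would take, and the function name is invented for the demo.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode, so the
# transaction boundaries below are exactly the ones we issue.
db = sqlite3.connect(":memory:", isolation_level=None)
db.executescript("""
    CREATE TABLE PARTITION_SEQ (
      groupId VARCHAR(250) NOT NULL PRIMARY KEY,
      nextSequence BIGINT NOT NULL
    );
    CREATE TABLE TXNO_OUTBOX (groupId TEXT, seq BIGINT, payload TEXT);
""")

def write_ordered(db, group_id, payload):
    # Lock, read the counter, write the record, bump the counter --
    # all in one transaction, so concurrent writers serialize per group.
    db.execute("BEGIN IMMEDIATE")  # stand-in for SELECT ... FOR UPDATE
    db.execute("INSERT OR IGNORE INTO PARTITION_SEQ VALUES (?, 1)", (group_id,))
    (seq,) = db.execute(
        "SELECT nextSequence FROM PARTITION_SEQ WHERE groupId = ?",
        (group_id,)).fetchone()
    db.execute("INSERT INTO TXNO_OUTBOX VALUES (?, ?, ?)",
               (group_id, seq, payload))
    db.execute("UPDATE PARTITION_SEQ SET nextSequence = ? WHERE groupId = ?",
               (seq + 1, group_id))
    db.execute("COMMIT")
    return seq

seqs = [write_ordered(db, "g1", p) for p in ("a", "b", "c")]
replay = [p for (p,) in db.execute(
    "SELECT payload FROM TXNO_OUTBOX WHERE groupId = 'g1' ORDER BY seq")]
```

Replaying ORDER BY seq reproduces insert order exactly, at the cost noted above: every writer in a group serializes on that group's counter row.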

Author (@markushc, Sep 27, 2023):

Wouldn't a single global sequence (CREATE SEQUENCE ordered_sequence + INSERT INTO ... nextval('ordered_sequence')) to replace the createTime column do the same job with minimum complexity? If so, we could try that instead.

I'm actually not sure why using a sequence wasn't the approach taken in the original pull request (#252). With an 8-byte BIGINT wraparound seems unlikely to become an issue.
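The single-global-sequence alternative can be sketched similarly. SQLite has no CREATE SEQUENCE, so an AUTOINCREMENT key plays the role of nextval('ordered_sequence') in this hypothetical model:

```python
import sqlite3

db = sqlite3.connect(":memory:")
# One monotonically increasing counter for the whole table replaces the
# createTime column; AUTOINCREMENT guarantees values are never reused.
db.execute("""CREATE TABLE TXNO_OUTBOX (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    groupId TEXT,
    payload TEXT)""")
db.executemany("INSERT INTO TXNO_OUTBOX (groupId, payload) VALUES (?, ?)",
               [("g1", "a"), ("g2", "x"), ("g1", "b")])
# Within any one group, ordering by seq reproduces insert order.
g1 = [p for (p,) in db.execute(
    "SELECT payload FROM TXNO_OUTBOX WHERE groupId = 'g1' ORDER BY seq")]
```

On the wraparound point: an 8-byte counter holds about 9.2e18 values, so even at a million inserts per second it would take on the order of 290,000 years to exhaust.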

Contributor:

@badgerwithagun I guess having a sequence per group ID could result in a lot of sequences if the following use case is supported:

Suppose you have on the order of a million entities for which you want to publish CRUD events. The events per entity need to be ordered, but the order of events for different entities does not matter. You would then use the entity ID as group ID, resulting in on the order of a million sequences?

Having just one sequence as @markushc suggests would be simpler, but I don't know if it would be a critical performance limitation. What if an outbox entry is assigned a sequence number if and only if its group ID is non-null? On the other hand, I don't know if there is a use case for combining schedule calls with and without group ID even though the current API would allow for that.

Author (@markushc, Oct 22, 2023):

> If you have two boxes writing records at roughly the same time, the ordering will be easily screwed up by clock skew.

Updated the branch to use the database timestamp, with microsecond precision where available, for initializing the task creation time to eliminate the possibility of clock skew between nodes (see dialect.getCurrentTimestamp() in DefaultPersistor.java). Staying with the approach of ordering based on creation time avoids the complexity related to group-specific locks. Thoughts?
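The database-clock idea can be illustrated with any database that exposes its own timestamp. The sketch below uses SQLite's strftime as a stand-in (the actual per-dialect SQL lives in the PR's dialect code and is not reproduced here):

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE TXNO_OUTBOX (payload TEXT, createTime TEXT)")
for p in ("first", "second"):
    # The timestamp is computed by the database at insert time, so every
    # writer shares one clock and inter-node skew cannot reorder rows.
    db.execute(
        "INSERT INTO TXNO_OUTBOX VALUES (?, strftime('%Y-%m-%d %H:%M:%f','now'))",
        (p,))
# rowid breaks ties when two inserts land in the same fractional second.
order = [p for (p,) in db.execute(
    "SELECT payload FROM TXNO_OUTBOX ORDER BY createTime, rowid")]
```

Note that even a shared clock only orders rows to its precision, so same-instant writes still need a tiebreaker.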

Map.of(
Dialect.POSTGRESQL_9,
"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6)",
Dialect.ORACLE,
"ALTER TABLE TXNO_OUTBOX ADD createTime TIMESTAMP(6)")),
new Migration(
Member:

You can do this with a single migration:

ALTER TABLE TXNO_OUTBOX
  ADD COLUMN createTime TIMESTAMP(6),
  ADD COLUMN groupId VARCHAR(250)

10,
"Add groupId column to outbox",
"ALTER TABLE TXNO_OUTBOX ADD COLUMN groupId VARCHAR(250)",
markushc marked this conversation as resolved.
Map.of(Dialect.ORACLE, "ALTER TABLE TXNO_OUTBOX ADD groupId VARCHAR2(250)")));

static void migrate(TransactionManager transactionManager, Dialect dialect) {
transactionManager.inTransaction(
@@ -36,7 +36,8 @@
public class DefaultPersistor implements Persistor, Validatable {

private static final String ALL_FIELDS =
"id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version";
"id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version, "
Member:

For clarity elsewhere, can we use the order

id, uniqueRequestId, invocation,
createTime, groupId,
lastAttemptTime, nextAttemptTime,
attempts, blocked, processed, version

+ "createTime, groupId";

/**
* @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write
@@ -98,7 +99,11 @@ public void migrate(TransactionManager transactionManager) {
public void save(Transaction tx, TransactionOutboxEntry entry)
throws SQLException, AlreadyScheduledException {
var insertSql =
"INSERT INTO " + tableName + " (" + ALL_FIELDS + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
"INSERT INTO "
+ tableName
+ " ("
+ ALL_FIELDS
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
var writer = new StringWriter();
serializer.serializeInvocation(entry.getInvocation(), writer);
if (entry.getUniqueRequestId() == null) {
@@ -139,6 +144,9 @@ private void setupInsert(
stmt.setBoolean(7, entry.isBlocked());
stmt.setBoolean(8, entry.isProcessed());
stmt.setInt(9, entry.getVersion());
stmt.setTimestamp(
10, entry.getCreateTime() == null ? null : Timestamp.from(entry.getCreateTime()));
stmt.setString(11, entry.getGroupId());
}

@Override
@@ -248,6 +256,13 @@ public boolean unblock(Transaction tx, String entryId) throws Exception {
@Override
public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, Instant now)
throws Exception {
List<TransactionOutboxEntry> results = selectBatchUnordered(tx, batchSize, now);
Member (@badgerwithagun, Sep 11, 2023):

We have a problem here I think. When flush() runs, it doesn't run the items it loads; it submits them. That means that even if it submits the work to run in the right order, those threads might not run in the submitted order. Firstly because they might "race", but more likely because one might fail, causing it to go back on the queue and get run later.

We need to run one entry at a time, synchronously, and if that fails, keep retrying it forever.

That means:

  • The block threshold has to be ignored for ordered processing. We can't ever safely skip a record.
  • That means we probably need a means of manually blocking a record if necessary, something the API lacks atm
  • We have to handle ordered processing completely separately from unordered processing.
  • No calling Submitter in flush() when dealing with ordered entries. Simply call processNow()

As for the most efficient way to fetch the data given that we need to only process one-at-a-time for each groupId, there are few different approaches:

  • EITHER Don't fetch ordered records in batches. Fetch them one at a time for each group. You then have to block until they all complete before you can process the next batch, but you _can_ process all of these in parallel safely (using a fork/join behaviour - easy with CompletableFuture)
SELECT * FROM TXNO_OUTBOX a
WHERE groupId IS NOT NULL
AND sequence =
 (SELECT MIN(sequence) FROM TXNO_OUTBOX b
  WHERE a.groupId = b.groupId
  AND processed = false
  AND blocked = false)
  • OR you could first fetch a list of all the distinct group ids and spin up independent parallel processors for each which simply fetch one record at a time.
  • More ideas welcome of course!
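The first option can be exercised directly. Below is a runnable SQLite rendition of the head-of-each-group fetch, with createTime standing in for the hypothetical sequence column:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE TXNO_OUTBOX (
    id TEXT, groupId TEXT, createTime INTEGER,
    processed INTEGER DEFAULT 0, blocked INTEGER DEFAULT 0)""")
db.executemany(
    "INSERT INTO TXNO_OUTBOX (id, groupId, createTime) VALUES (?, ?, ?)",
    [("a1", "g1", 1), ("a2", "g1", 2), ("b1", "g2", 5), ("b2", "g2", 6)])

def heads():
    # One row per group: the oldest unprocessed, unblocked entry.
    return [i for (i,) in db.execute("""
        SELECT id FROM TXNO_OUTBOX a
        WHERE groupId IS NOT NULL
        AND createTime =
          (SELECT MIN(createTime) FROM TXNO_OUTBOX b
           WHERE a.groupId = b.groupId
           AND processed = 0
           AND blocked = 0)
        ORDER BY id""")]

batch1 = heads()                    # one entry per group, processable in parallel
db.execute("UPDATE TXNO_OUTBOX SET processed = 1 WHERE id = 'a1'")
batch2 = heads()                    # g1 advances to its next entry
```

This also shows the fork/join rhythm: once a batch's heads complete, re-running the query yields each group's next entry.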

Author:

Thanks for the comments. So separating flush for ordered processing and invoking processNow seems to be the way to go here. For failed tasks, it seems that as long as tasks are processed in order within the group and we keep retrying the first task in the group until it succeeds we should be fine.

Contributor:

It would be nice to merge #484 (maybe not necessarily as-is currently, but the idea seems solid) as I believe that would make the code easier to work with, especially when introducing additional SQL as in this PR.

It might also be nice to merge #492 in case sequences are needed directly as the "create sequence" functionality is improved in later versions.

Author (@markushc, Oct 22, 2023):

Updated the branch to process ordered tasks synchronously, while unordered tasks are still submitted asynchronously as before (see TransactionOutboxImpl.java line 153). Also added a test to ensure that a blocked ordered task will prevent other tasks in the same ordered group from progressing. Any remaining concerns here?
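A toy model of the agreed semantics (illustrative names only; flush_group is not the library's API): within a group, only the head entry may run, and a failing head halts the rest of its group until a later flush retries it.

```python
from collections import deque

def flush_group(entries, process):
    """Run entries strictly in order; stop at the first failure."""
    done = []
    while entries:
        head = entries[0]
        if not process(head):
            break          # head failed: never skip it, retry on next flush
        done.append(entries.popleft())
    return done

attempts = {}
def flaky(task):
    # "t1" fails on its first attempt; the rest of the group must wait.
    attempts[task] = attempts.get(task, 0) + 1
    return task != "t1" or attempts[task] > 1

group = deque(["t1", "t2", "t3"])
first = flush_group(group, flaky)   # t1 fails, so nothing completes
second = flush_group(group, flaky)  # t1 retried and succeeds, group drains
```

The key property is the break rather than a continue: ordered processing may never skip a failed head, which is exactly why the block threshold discussion above matters.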

results.addAll(selectBatchOrdered(tx, batchSize - results.size(), now));
return results;
}

public List<TransactionOutboxEntry> selectBatchUnordered(
Transaction tx, int batchSize, Instant now) throws Exception {
String forUpdate = dialect.isSupportsSkipLock() ? " FOR UPDATE SKIP LOCKED" : "";
//noinspection resource
try (PreparedStatement stmt =
@@ -262,6 +277,7 @@ public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, I
+ dialect.booleanValue(false)
+ " AND processed = "
+ dialect.booleanValue(false)
+ " AND groupid IS NULL "
+ dialect.getLimitCriteria()
+ forUpdate)) {
stmt.setTimestamp(1, Timestamp.from(now));
@@ -270,6 +286,56 @@ public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, I
}
}

public List<TransactionOutboxEntry> selectBatchOrdered(
final Transaction tx, final int batchSize, final Instant now) throws Exception {
String forUpdate = dialect.isSupportsSkipLock() ? " FOR UPDATE SKIP LOCKED" : "";
try (PreparedStatement stmt =
tx.connection()
.prepareStatement(
dialect.isSupportsWindowFunctions()
? "WITH t AS"
+ " ("
+ " SELECT RANK() OVER (PARTITION BY groupid ORDER BY createtime) AS rn, "
+ ALL_FIELDS
+ " FROM "
+ tableName
+ " WHERE processed = "
+ dialect.booleanValue(false)
+ " )"
+ " SELECT "
+ ALL_FIELDS
+ " FROM t "
+ " WHERE rn = 1 AND nextAttemptTime < ? AND blocked = "
+ dialect.booleanValue(false)
+ " AND groupid IS NOT NULL "
+ dialect.getLimitCriteria()
+ (dialect.isSupportsWindowFunctionsForUpdate() ? forUpdate : "")
: "SELECT "
+ ALL_FIELDS
+ " FROM"
+ " ("
+ " SELECT groupid AS group_id, MIN(createtime) AS min_create_time"
+ " FROM "
+ tableName
+ " WHERE processed = "
+ dialect.booleanValue(false)
+ " GROUP BY group_id"
+ " ) AS t"
+ " JOIN "
+ tableName
+ " t1"
+ " ON t1.groupid = t.group_id AND t1.createtime = t.min_create_time"
+ " WHERE nextAttemptTime < ? AND blocked = "
+ dialect.booleanValue(false)
+ " AND groupid IS NOT NULL "
+ dialect.getLimitCriteria()
+ forUpdate)) {
stmt.setTimestamp(1, Timestamp.from(now));
stmt.setInt(2, batchSize);
return gatherResults(batchSize, stmt);
}
}
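The window-function branch of selectBatchOrdered can be exercised in miniature. SQLite supports window functions from 3.25, so this cut-down sketch (no nextAttemptTime, blocked, or locking clauses) should run on any recent Python build:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE TXNO_OUTBOX (
    id TEXT, groupId TEXT, createTime INTEGER, processed INTEGER)""")
db.executemany("INSERT INTO TXNO_OUTBOX VALUES (?, ?, ?, ?)",
               [("a1", "g1", 1, 1),   # already processed
                ("a2", "g1", 2, 0),   # current head of g1
                ("b1", "g2", 5, 0),   # current head of g2
                ("b2", "g2", 6, 0)])
# RANK() restarts at 1 per partition, so rn = 1 selects each group's
# oldest unprocessed entry, mirroring the CTE in selectBatchOrdered.
batch = [i for (i,) in db.execute("""
    WITH t AS (
      SELECT RANK() OVER (PARTITION BY groupId ORDER BY createTime) AS rn, id
      FROM TXNO_OUTBOX
      WHERE processed = 0
    )
    SELECT id FROM t WHERE rn = 1 ORDER BY id""")]
```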

@Override
public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now)
throws Exception {
@@ -311,6 +377,11 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio
.blocked(rs.getBoolean("blocked"))
.processed(rs.getBoolean("processed"))
.version(rs.getInt("version"))
.createTime(
rs.getTimestamp("createTime") == null
? null
: rs.getTimestamp("createTime").toInstant())
.groupId(rs.getString("groupId"))
.build();
log.debug("Found {}", entry);
return entry;
@@ -13,15 +13,34 @@
@Getter
@Beta
public enum Dialect {
MY_SQL_5(false, Constants.DEFAULT_DELETE_EXPIRED_STMT, Constants.DEFAULT_LIMIT_CRITERIA), //
MY_SQL_8(true, Constants.DEFAULT_DELETE_EXPIRED_STMT, Constants.DEFAULT_LIMIT_CRITERIA), //
MY_SQL_5(
false,
false,
false,
Constants.DEFAULT_DELETE_EXPIRED_STMT,
Constants.DEFAULT_LIMIT_CRITERIA), //
MY_SQL_8(
true,
true,
true,
Constants.DEFAULT_DELETE_EXPIRED_STMT,
Constants.DEFAULT_LIMIT_CRITERIA), //
POSTGRESQL_9(
true,
true,
true,
"DELETE FROM {{table}} WHERE id IN (SELECT id FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?)",
Constants.DEFAULT_LIMIT_CRITERIA), //
H2(false, Constants.DEFAULT_DELETE_EXPIRED_STMT, Constants.DEFAULT_LIMIT_CRITERIA), //
H2(
false,
true,
true,
Constants.DEFAULT_DELETE_EXPIRED_STMT,
Constants.DEFAULT_LIMIT_CRITERIA), //
ORACLE(
true,
true,
false,
"DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0 AND ROWNUM <= ?",
Constants.ORACLE_LIMIT_CRITERIA);

@@ -33,6 +52,19 @@ public enum Dialect {
@SuppressWarnings("JavaDoc")
private final boolean supportsSkipLock;

/**
* @return True if dialect supports use of SQL window functions.
*/
@SuppressWarnings("JavaDoc")
private final boolean supportsWindowFunctions;

/**
* @return True if dialect supports use of ({@code FOR UPDATE}) in queries using SQL window
* functions.
*/
@SuppressWarnings("JavaDoc")
private final boolean supportsWindowFunctionsForUpdate;

/**
* @return Format string for the SQL required to delete expired retained records.
*/
@@ -312,6 +312,18 @@ interface ParameterizedScheduleBuilder {
*/
ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId);

/**
* Specifies a group id for the request. This defaults to {@code null}, but if non-null, enables
* requests to be executed in the order created within the group of requests using the same
group id. If non-null, the request will not be submitted immediately. Instead, ordered
* requests are processed only with {@link TransactionOutbox#flush()}.
*
* @param groupId The group id identifying requests that must be processed in order. May be
* {@code null}, in which case no ordering is enforced.
* @return Builder.
*/
ParameterizedScheduleBuilder groupId(String groupId);

/**
* Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters
* to the request as configured using {@link TransactionOutbox#with()}.
@@ -101,6 +101,10 @@ public class TransactionOutboxEntry implements Validatable {
@Setter
private int version;

@Getter private Instant createTime;

@Getter private String groupId;

@EqualsAndHashCode.Exclude @ToString.Exclude private volatile boolean initialized;
@EqualsAndHashCode.Exclude @ToString.Exclude private String description;

@@ -106,7 +106,7 @@ public void initialize() {

@Override
public <T> T schedule(Class<T> clazz) {
return schedule(clazz, null);
return schedule(clazz, null, null);
}

@Override
@@ -220,7 +220,7 @@ public boolean unblock(String entryId, Object transactionContext) {
}
}

private <T> T schedule(Class<T> clazz, String uniqueRequestId) {
private <T> T schedule(Class<T> clazz, String uniqueRequestId, String groupId) {
if (!initialized.get()) {
throw new IllegalStateException("Not initialized");
}
@@ -236,16 +236,19 @@ private <T> T schedule(Class<T> clazz, String uniqueRequestId) {
extracted.getMethodName(),
extracted.getParameters(),
extracted.getArgs(),
uniqueRequestId);
uniqueRequestId,
groupId);
validator.validate(entry);
persistor.save(extracted.getTransaction(), entry);
extracted
.getTransaction()
.addPostCommitHook(
() -> {
listener.scheduled(entry);
submitNow(entry);
});
if (groupId == null) {
extracted
markushc marked this conversation as resolved.
.getTransaction()
.addPostCommitHook(
() -> {
listener.scheduled(entry);
submitNow(entry);
});
}
log.debug(
"Scheduled {} for running after transaction commit", entry.description());
return null;
@@ -311,7 +314,12 @@ private void invoke(TransactionOutboxEntry entry, Transaction transaction)
}

private TransactionOutboxEntry newEntry(
Class<?> clazz, String methodName, Class<?>[] params, Object[] args, String uniqueRequestId) {
Class<?> clazz,
String methodName,
Class<?>[] params,
Object[] args,
String uniqueRequestId,
String groupId) {
return TransactionOutboxEntry.builder()
.id(UUID.randomUUID().toString())
.invocation(
@@ -324,6 +332,8 @@ private TransactionOutboxEntry newEntry(
.lastAttemptTime(null)
.nextAttemptTime(after(attemptFrequency))
.uniqueRequestId(uniqueRequestId)
.createTime(clockProvider.get().instant())
.groupId(groupId)
.build();
}

@@ -409,18 +419,29 @@ private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleB

private String uniqueRequestId;

private String groupId;

@Override
public ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId) {
this.uniqueRequestId = uniqueRequestId;
return this;
}

@Override
public ParameterizedScheduleBuilder groupId(final String groupId) {
this.groupId = groupId;
return this;
}

@Override
public <T> T schedule(Class<T> clazz) {
if (uniqueRequestId != null && uniqueRequestId.length() > 250) {
throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters");
}
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId);
if (groupId != null && groupId.length() > 250) {
throw new IllegalArgumentException("groupId may be up to 250 characters");
}
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, groupId);
}
}
}
@@ -2,6 +2,7 @@

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.*;
import static org.junit.jupiter.api.Assertions.assertThrows;

@@ -243,6 +244,55 @@ public void success(TransactionOutboxEntry entry) {
MatcherAssert.assertThat(ids, containsInAnyOrder("1", "2", "4", "6"));
}

@Test
void orderedRequests() {

TransactionManager transactionManager = simpleTxnManager();

List<String> ids = new ArrayList<>();
AtomicReference<Clock> clockProvider = new AtomicReference<>(Clock.systemDefaultZone());

TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.listener(
new TransactionOutboxListener() {
@Override
public void success(TransactionOutboxEntry entry) {
ids.add((String) entry.getInvocation().getArgs()[0]);
}
})
.submitter(Submitter.withExecutor(Runnable::run))
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.clockProvider(clockProvider::get)
.build();

clearOutbox();

String groupId = "groupId1";
transactionManager.inTransaction(
() -> outbox.with().groupId(groupId).schedule(ClassProcessor.class).process("2"));
clockProvider.set(
Clock.fixed(clockProvider.get().instant().minusSeconds(1), clockProvider.get().getZone()));
transactionManager.inTransaction(
() -> outbox.with().groupId(groupId).schedule(ClassProcessor.class).process("1"));
clockProvider.set(
Clock.fixed(clockProvider.get().instant().plusSeconds(2), clockProvider.get().getZone()));
transactionManager.inTransaction(
() -> outbox.with().groupId(groupId).schedule(ClassProcessor.class).process("3"));

// Ensure no tasks were processed immediately, as a group id was provided
MatcherAssert.assertThat(ids, equalTo(new ArrayList<>(List.of())));

// Run the clock over the threshold
clockProvider.set(
Clock.fixed(clockProvider.get().instant().plusSeconds(120), clockProvider.get().getZone()));
while (outbox.flush()) {}

// Ensure tasks were processed in order, as all shared the same group id
MatcherAssert.assertThat(ids, equalTo(new ArrayList<>(List.of("1", "2", "3"))));
}

/**
* Uses a simple data source transaction manager and attempts to fire a concrete class via
* reflection.