
Add support for processing tasks in order within groups #476

Closed
wants to merge 10 commits into from

Conversation

markushc

@markushc markushc commented Aug 23, 2023

Add support for processing tasks in order within groups: allow optionally setting a group ID on tasks, which is used to ensure ordering among tasks sharing the same group ID. Tasks with a group ID are not submitted immediately; instead, they are processed separately during the flush operation to ensure ordering.

Useful for example to generate change events for entities. The group ID is similar in concept to AWS SNS/SQS message group IDs and Kafka partitions. Existing functionality and interfaces should continue working the same as before.

Builds on the ideas and implementation of the related draft pull request #252. Provides functionality requested in issue #198.

amseager and others added 4 commits February 17, 2022 17:59
Also add indicator for supporting window functions with FOR UPDATE to
account for Oracle.

Make flush process both unordered and ordered requests, to make
straightforward usage simpler.
@mkjensen
Contributor

This PR looks very interesting. We would like to use the library to generate change events for entities so ordering is required. We would actually like a strict ordering (order in which the original transactions were written, as mentioned in #198), but this seems to be a step in the right direction. The grouping functionality allowing you to e.g. use entity ID as group ID is nice.

new Migration(
9,
"Add createTime column to outbox",
"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6) NULL AFTER invocation",
Member

TIMESTAMP(6) gives microsecond precision, but the value is based on the system clock of the box writing the record. If you have two boxes writing records at roughly the same time, the ordering will easily be screwed up by clock skew.

To my mind, the only way to guarantee order is to have a group-specific incrementing counter with write-locking semantics, e.g.:

CREATE TABLE PARTITION_SEQ (
  groupId VARCHAR(250) NOT NULL PRIMARY KEY,
  nextSequence BIGINT NOT NULL
)

and

SELECT nextSequence FROM PARTITION_SEQ WHERE groupId = ? FOR UPDATE

then write your record then

UPDATE PARTITION_SEQ SET nextSequence = ? WHERE groupId = ?

This all has a big negative: there is heavy contention on writes. But I don't think it's avoidable.
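The semantics of that counter scheme can be sketched in-memory. This is only an illustration: a real implementation would take the row lock with the `SELECT ... FOR UPDATE` above, and `PartitionSeq`/`claim` are hypothetical names, not part of the library:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory sketch of the per-group counter semantics described above.
// A real implementation would take a row lock via
// "SELECT nextSequence FROM PARTITION_SEQ WHERE groupId = ? FOR UPDATE";
// here the atomic per-key merge() stands in for that write lock.
public class PartitionSeq {
  private final Map<String, Long> nextSequence = new ConcurrentHashMap<>();

  /** Atomically claims and returns the next sequence number for a group. */
  public long claim(String groupId) {
    return nextSequence.merge(groupId, 1L, Long::sum);
  }

  public static void main(String[] args) {
    var seq = new PartitionSeq();
    System.out.println(seq.claim("order-42")); // 1
    System.out.println(seq.claim("order-42")); // 2
    System.out.println(seq.claim("order-99")); // 1 (groups are independent)
  }
}
```

The write contention mentioned above shows up here too: every writer in a group serializes on the same key.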

Author

@markushc markushc Sep 27, 2023

Wouldn't a single global sequence (CREATE SEQUENCE ordered_sequence + INSERT INTO ... nextval('ordered_sequence')) to replace the createTime column do the same job with minimal complexity? If so, we could try that instead.

I'm actually not sure why using a sequence wasn't the approach taken in the original pull request (#252). With an 8-byte BIGINT wraparound seems unlikely to become an issue.

Contributor

@badgerwithagun I guess having a sequence per group ID could result in a lot of sequences if the following use case is supported:

Suppose you have on the order of a million entities for which you want to publish CRUD events. The events per entity need to be ordered, but the order of events for different entities does not matter. You would then use the entity ID as the group ID, resulting in on the order of a million sequences?

Having just one sequence as @markushc suggests would be simpler, but I don't know if it would be a critical performance limitation. What if an outbox entry is assigned a sequence number if and only if its group ID is non-null? On the other hand, I don't know if there is a use case for combining schedule calls with and without group ID even though the current API would allow for that.

Author

@markushc markushc Oct 22, 2023

If you have two boxes writing records at roughly the same time, the ordering will be easily screwed up by clock skew.

Updated the branch to use the database timestamp, with microsecond precision where available, for initializing the task creation time to eliminate the possibility of clock skew between nodes (see dialect.getCurrentTimestamp() in DefaultPersistor.java). Staying with the approach of ordering based on creation time avoids the complexity related to group-specific locks. Thoughts?

@@ -248,6 +256,13 @@ public boolean unblock(Transaction tx, String entryId) throws Exception {
@Override
public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, Instant now)
throws Exception {
List<TransactionOutboxEntry> results = selectBatchUnordered(tx, batchSize, now);
Member

@badgerwithagun badgerwithagun Sep 11, 2023

We have a problem here I think. When flush() runs, it doesn't run the items it loads; it submits them. That means that even if it submits the work to run in the right order, those threads might not run in the submitted order. Firstly because they might "race", but more likely because one might fail, causing it to go back on the queue and get run later.

We need to run one entry at a time, synchronously, and if that fails, keep retrying it forever.

That means:

  • The block threshold has to be ignored for ordered processing. We can't ever safely skip a record.
  • That means we probably need a means of manually blocking a record if necessary, something the API lacks atm
  • We have to handle ordered processing completely separately from unordered processing.
  • No calling Submitter in flush() when dealing with ordered entries. Simply call processNow()

As for the most efficient way to fetch the data given that we need to only process one at a time for each groupId, there are a few different approaches:

  • EITHER Don't fetch ordered records in batches. Fetch them one at a time for each group. You then have to block until they all complete before you can process the next batch, but you _can_ process all of these in parallel safely (using a fork/join behaviour - easy with CompletableFuture)
SELECT * FROM TXNO_OUTBOX a
WHERE a.groupId IS NOT NULL
AND a.sequence =
  (SELECT MIN(b.sequence) FROM TXNO_OUTBOX b
   WHERE a.groupId = b.groupId
   AND b.processed = false
   AND b.blocked = false)
  • OR you could first fetch a list of all the distinct group ids and spin up independent parallel processors for each which simply fetch one record at a time.
  • More ideas welcome of course!
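The second option above - independent per-group processors - might look roughly like the following sketch. `GroupedFlush`, `Entry`, and its `work` Runnable are hypothetical stand-ins for `TransactionOutboxEntry` and the real `processNow()` call, not the library's API:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class GroupedFlush {
  // Hypothetical stand-in for TransactionOutboxEntry.
  public record Entry(String groupId, long sequence, Runnable work) {}

  /** Runs each group's entries in sequence order; distinct groups run concurrently. */
  public static void flushOrdered(List<Entry> pending) {
    Map<String, List<Entry>> byGroup =
        pending.stream().collect(Collectors.groupingBy(Entry::groupId));
    CompletableFuture<?>[] groups = byGroup.values().stream()
        .map(entries -> CompletableFuture.runAsync(() ->
            entries.stream()
                .sorted(Comparator.comparingLong(Entry::sequence))
                .forEach(e -> e.work().run()))) // one at a time within the group
        .toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(groups).join(); // fork/join across the groups
  }

  public static void main(String[] args) {
    List<Long> seen = java.util.Collections.synchronizedList(new java.util.ArrayList<>());
    flushOrdered(List.of(
        new Entry("g1", 2, () -> seen.add(2L)),
        new Entry("g1", 1, () -> seen.add(1L))));
    System.out.println(seen); // [1, 2] - sequence order, not submission order
  }
}
```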

Author

Thanks for the comments. So separating flush for ordered processing and invoking processNow seems to be the way to go here. For failed tasks, as long as tasks are processed in order within the group and we keep retrying the first task in the group until it succeeds, we should be fine.

Contributor

It would be nice to merge #484 (maybe not necessarily as-is currently, but the idea seems solid) as I believe that would make the code easier to work with, especially when introducing additional SQL as in this PR.

It might also be nice to merge #492 in case sequences are needed directly as the "create sequence" functionality is improved in later versions.

Author

@markushc markushc Oct 22, 2023

Updated the branch to process ordered tasks synchronously, while unordered tasks are still submitted asynchronously as before (see TransactionOutboxImpl.java line 153). Also added a test to ensure that a blocked ordered task will prevent other tasks in the same ordered group from progressing. Any remaining concerns here?

@mkjensen
Contributor

Hi @markushc,

Do you plan to implement the suggestions/requirements from @badgerwithagun?

If not, I would be interested in helping out as I have a setup that requires all of this (strict ordering/a sequence instead of timestamp, separate queue for grouped entries/retry forever).

@markushc
Author

Do you plan to implement the suggestions/requirements from @badgerwithagun?

If not, I would be interested in helping out as I have a setup that requires all of this (strict ordering/a sequence instead of timestamp, separate queue for grouped entries/retry forever).

I'm still interested in getting this finished and merged, but not sure when I can get around to it. If you could help getting this finished, that would be welcome. Using sequences seems good to me.

@mkjensen
Contributor

mkjensen commented Sep 28, 2023

Do you plan to implement the suggestions/requirements from @badgerwithagun?
If not, I would be interested in helping out as I have a setup that requires all of this (strict ordering/a sequence instead of timestamp, separate queue for grouped entries/retry forever).

I'm still interested in getting this finished and merged, but not sure when I can get around to it. If you could help getting this finished, that would be welcome. Using sequences seems good to me.

I have started experimenting with this. Should I add my suggestions to a new branch off of your branch (note that GitHub currently blocks me from forking your fork because I already have my own fork with pending PRs) or what do you think?

Edit: Ha, I was using a sequence, but it seems MySQL is the only one of the supported DBMSes that does not support sequences. So another setup would be needed there (or another approach in general), so maybe I'm not as close to having something as I initially thought.

mkjensen added a commit to mkjensen/transaction-outbox that referenced this pull request Oct 2, 2023
@mkjensen
Contributor

mkjensen commented Oct 2, 2023

I pushed my experiment which is based on this PR to mkjensen@5349ff9

I copied the Dialect and window SQL.

There are some TODOs where I would like input.

I would be happy to contribute to this branch/PR if you want it, but I'm a bit unsure about how as stated in #476 (comment)

@badgerwithagun
Member

@mkjensen

A few things I spotted on reading:

  • The next sequence number should be for an outbox id and group combined (composite key) otherwise you'll be forced to make the groups unique between calls, which seems unnecessarily limiting.
  • When inserting a new sequence number if the read fails, you might get a race condition causing two parallel INSERTs, one of which will fail. You need to detect a PK violation and retry (once only).

I've not got very far through the code though
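The second point - detecting the lost INSERT race and retrying the read once - could be sketched in-memory like this. `SequenceBootstrap` and `readOrInsert` are hypothetical names, with `putIfAbsent()` standing in for an INSERT that may hit a primary-key violation:

```java
import java.util.concurrent.ConcurrentHashMap;

public class SequenceBootstrap {
  // Stand-in for the PARTITION_SEQ table; the map key mirrors the primary key.
  private final ConcurrentHashMap<String, Long> table = new ConcurrentHashMap<>();

  /**
   * Reads the current sequence row, inserting it on first use. If two callers
   * race past the initial read, one INSERT "fails" (putIfAbsent returns the
   * winner's value) and we retry the read once by taking that value.
   */
  public long readOrInsert(String key) {
    Long existing = table.get(key);          // SELECT ... FOR UPDATE
    if (existing != null) {
      return existing;
    }
    Long raced = table.putIfAbsent(key, 1L); // INSERT; non-null means PK violation
    return raced != null ? raced : 1L;       // on violation, use the re-read value
  }

  public static void main(String[] args) {
    var seq = new SequenceBootstrap();
    System.out.println(seq.readOrInsert("group-a")); // 1 (row inserted)
    System.out.println(seq.readOrInsert("group-a")); // 1 (row read back)
  }
}
```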

@markushc
Author

markushc commented Oct 6, 2023

I have started experimenting with this. Should I add my suggestions to a new branch off of your branch (note that GitHub currently blocks me from forking your fork because I already have my own fork with pending PRs) or what do you think?

@mkjensen You have been added as a collaborator on my fork now, so you should be able to push your changes onto my branch. That way the work can continue in this pull request, preserving the discussion context. Feel free to push changes at will.

@mkjensen
Contributor

mkjensen commented Oct 8, 2023

I have started experimenting with this. Should I add my suggestions to a new branch off of your branch (note that GitHub currently blocks me from forking your fork because I already have my own fork with pending PRs) or what do you think?

@mkjensen You have been added as a collaborator on my fork now, so you should be able to push your changes onto my branch. That way the work can continue in this pull request, preserving the discussion context. Feel free to push changes at will.

Thanks. In the meantime we have implemented the outbox pattern using Spring Integration instead, so I'm not sure I will get the chance to do more here for now. Feel free to use what I did if it makes sense.

Ensures that outbox entries that are created concurrently use the same
clock (that of the database instance), instead of the clocks of the
nodes creating the entries which could potentially be out of sync.
Ensures that earlier tasks within a group have been processed before
attempting to process the next tasks within a group as part of the
next flush.
Ensure that if an ordered task is blocked, any remaining tasks are
processed only after the blocked task is completed.
@markushc
Author

markushc commented Oct 22, 2023

Updated the branch in an attempt to address all outstanding comments (choosing a slightly different approach than @mkjensen by keeping the creation time for ordering, but borrowing the synchronous processing of tasks from his branch). Could this be a first version of ordered-task support, to be improved on later? @badgerwithagun Let me know what, if anything, still needs to be addressed.

@MaksimMyshkin

Hi @badgerwithagun . How soon do you plan to look at new changes? Also considering to use this library but missing message ordering feature is critical for our project 😢.

@badgerwithagun
Member

Hi @MaksimMyshkin - looking at it now!

Using the database timestamp is definitely an improvement, but probably (for our usage) not reliable enough. I think we can probably view that as an improvement later on.

"ALTER TABLE TXNO_OUTBOX ADD COLUMN createTime TIMESTAMP(6)",
Dialect.ORACLE,
"ALTER TABLE TXNO_OUTBOX ADD createTime TIMESTAMP(6)")),
new Migration(
Member

You can do this with a single migration:

ALTER TABLE TXNO_OUTBOX
  ADD COLUMN createTime TIMESTAMP(6),
  ADD COLUMN groupId VARCHAR(250)

@@ -36,7 +36,8 @@
public class DefaultPersistor implements Persistor, Validatable {

private static final String ALL_FIELDS =
"id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version";
"id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version, "
Member

For clarity elsewhere, can we use the order

id, uniqueRequestId, invocation,
createTime, groupId,
lastAttemptTime, nextAttemptTime,
attempts, blocked, processed, version

List<CompletableFuture<Void>> orderedFutures =
batch.stream()
.filter(entry -> entry.getGroupId() != null)
.map(entry -> CompletableFuture.runAsync(() -> this.processNow(entry)))
Member

@badgerwithagun badgerwithagun Nov 12, 2023

Several issues I can see here:

  1. We want to make sure that ordered messages never get blocked automatically, because that would cause the next message in the group to be processed out of order. Blocking should only ever happen manually if the application can be confident it is safe to do so. So we need to tweak that behaviour but also make sure there is a well-defined mechanism for blocking an arbitrary entry, e.g. outbox.block(String id) and outbox.unblock(String id). Your code currently automatically blocks messages.
  2. CompletableFuture.runAsync uses the common fork/join pool, which might not be how an application is managing its threads, or might cause the library to block core processing threads which the application isn't expecting. We have the Submitter interface for this.
  3. I know for a fact that several people have implemented the Submitter interface in ways that don't use local threads and therefore there would be no reliable way to join on them (see https://github.com/gruelbox/transaction-outbox#clustering)
  4. Error handling scenarios are quite different between unordered and ordered. For unordered, we don't really care about errors from flush(). If any jobs fail and get blocked, we get errors logged and notifications we can act on, but we basically don't care until then. With ordered flushing, if any work fails, we need to alert immediately; this is a serious situation which is blocking processing.
  5. Finally, because it's necessary for the caller to call flush() much more often, we are wasting quite a lot of processing time and hammering the database fetching unordered items simply in order to clear queues of ordered items.

All in all I think the structure here needs a bit of a rethink; I'll post a reply in a moment with some ideas.

@badgerwithagun
Member

badgerwithagun commented Nov 12, 2023

OK, my suggestion here is to:

  • Don't automatically mark ordered items as blocked when they go past the block threshold. Just push back the nextAttemptTime, fire the blocked event and log an error every time the item is attempted from then on. Make a lot of noise!
  • Add block and unblock methods to allow clients to selectively block records if they are 100% confident that a record can be skipped within a group.
  • We should have two methods:
    -- boolean flush(), which does exactly what it does now, needing to be called repeatedly until false, and skips all items with a groupId.
    -- boolean flushOrdered(Executor) which handles items with a groupId, does the repeated fetching and avoids repeatedly attempting failing items within that call. They can be tried again next time.
public boolean flushOrdered(Executor executor) {
    var failingGroups = new HashSet<String>();
    var lastSetOfResults = Set.<TransactionOutboxEntry>of();
    var count = new AtomicInteger();
    do {
        var nextSetOfResults = fetchNextItemInAllGroups();
        // Any entry seen in two consecutive fetches failed last time; park its group.
        failingGroups.addAll(
            Sets.intersection(lastSetOfResults, nextSetOfResults).stream()
                .map(TransactionOutboxEntry::getGroupId)
                .toList());
        var work = nextSetOfResults.stream()
            .filter(it -> !failingGroups.contains(it.getGroupId()))
            .peek(it -> count.incrementAndGet())
            .map(it -> CompletableFuture.runAsync(() -> outbox.processNow(it), executor))
            .toList();
        lastSetOfResults = nextSetOfResults;
        if (work.isEmpty()) {
            break;
        }
        CompletableFuture.allOf(work.toArray(CompletableFuture[]::new)).join();
    } while (count.get() < batchSize);
    return count.get() > 0;
}

@badgerwithagun
Member

As this is needed at my place of work, I'm picking up the work on this feature.

Closing this PR and will open another.
