Supporting load distribution across a cluster #258

Closed
tsg21 opened this issue Mar 11, 2022 · 7 comments

Labels
question Further information is requested

Comments

@tsg21
Contributor

tsg21 commented Mar 11, 2022

The README states:

that said, if you want to distribute work across a cluster at point of submission, this is also supported

I do, in fact, have that exact requirement, but I don't see how to achieve it with the TransactionOutbox API as it stands. How is it supported?

I think you might argue that this functionality is outside the scope of this library, but transaction-outbox is very close to supporting this. All I think it needs is a way to schedule a work item for asynchronous processing by the background thread(s), and not immediate execution in a post-commit hook. This could be done by adding an extra flag to ParameterizedScheduleBuilder.
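To illustrate the shape of what I mean, assuming the existing outbox.with() entry point (the flag name and worker class here are purely hypothetical, not an existing API):

// Hypothetical ParameterizedScheduleBuilder flag, for illustration only: skip the
// post-commit submission and leave the entry for the background flush thread(s).
outbox.with()
    .deferToBackgroundProcessing()
    .schedule(MyWorker.class)
    .process(workItemId);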

Is this a feature you would consider adding? I might be able to spin up a PR if so.

@badgerwithagun
Member

badgerwithagun commented Mar 12, 2022

The issue is that there are too many different messaging protocols, load balancers and service discovery mechanisms, and I wouldn't want to couple the library to any particular one. All the support is there to wire it into yours, though.

I'll outline how ours works, as an example.

First, you need #260, so LGTM.

Our load balancer (Nomad) gives each registered service its own DNS name, e.g. invoicing.service.consul. Therefore all we have to do is to route processing through the load balancer to distribute it around instances of the app.

We define a REST endpoint like this:

POST /outbox/process/:entryId

Which takes a TransactionOutboxEntry as its body, handled like this:

// Deserialise the TransactionOutboxEntry using a copy of the app's ObjectMapper,
// configured with the library's Jackson module so the invocation arguments round-trip
var objectMapper = myNormalObjectMapper.copy();
objectMapper.setDefaultTyping(TransactionOutboxJacksonModule.typeResolver());
objectMapper.registerModule(new TransactionOutboxJacksonModule());
TransactionOutboxEntry entry = objectMapper.readValue(request.bodyAsBytes(), TransactionOutboxEntry.class);

// Hand the entry to a bounded local executor and process it immediately on this instance
Submitter submitter = ExecutorSubmitter.builder().executor(localExecutor).logLevelWorkQueueSaturation(Level.INFO).build();
submitter.submit(entry, outbox.get()::processNow);

Where localExecutor is an ExecutorService backed by an ArrayBlockingQueue of limited size, so the endpoint never blocks.
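For example, something along these lines (pool size and queue capacity purely illustrative):

// A fixed pool over a small bounded queue: when the queue is full, execute() throws
// RejectedExecutionException instead of blocking, and the entry simply waits for the
// normal background flush to pick it up.
ExecutorService localExecutor =
    new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));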

Then create your own Submitter implementation which pushes requests to that endpoint, again via an ExecutorService with a limited queue:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

import com.gruelbox.transactionoutbox.Submitter;
import com.gruelbox.transactionoutbox.TransactionOutboxEntry;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Builder
@AllArgsConstructor
class TransactionOutboxRemotingSubmitter implements Submitter {

  private final TransactionOutboxRemotingResource remotingResource;
  private final ExecutorService localExecutor;
  private final String url;

  @Override
  public void submit(TransactionOutboxEntry entry, Consumer<TransactionOutboxEntry> leIgnore) {
    try {
      // Hand off to the bounded local executor so the commit path never blocks
      localExecutor.execute(() -> processRemotely(entry));
      log.info("Queued {} to be sent for remote processing", entry.description());
    } catch (RejectedExecutionException e) {
      // Queue full: the entry stays in the outbox table and the background flusher
      // re-attempts it later
      log.info("Will queue {} for processing when local executor is available", entry.description());
    } catch (Exception e) {
      log.warn("Failed to queue {} for execution at {}. It will be re-attempted later.", entry.description(), url, e);
    }
  }

  private void processRemotely(TransactionOutboxEntry entry) {
    try {
      // Sends to /outbox/process/:entryId using the same objectMapper as on the receiving side
      remotingResource.process(entry.getId(), entry);
      log.info("Submitted {} for remote processing at {}", entry.description(), url);
    } catch (Exception e) {
      log.warn(
        "Failed to submit {} for remote processing at {}. It will be re-attempted later.",
        entry.description(),
        url,
        e
      );
    }
  }
}
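To wire it in, something like this should do it (the transaction manager and persistor are shown only for illustration; use whatever you already have):

TransactionOutbox outbox = TransactionOutbox.builder()
    .transactionManager(transactionManager)
    .persistor(Persistor.forDialect(Dialect.POSTGRESQL_9))
    .submitter(TransactionOutboxRemotingSubmitter.builder()
        .remotingResource(remotingResource)
        .localExecutor(localExecutor)
        .url("http://invoicing.service.consul/outbox/process")
        .build())
    .build();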

Works solidly in production (in fact, once we put this live we ended up DoS-ing the DB with the volume before we tuned everything else 😊)

@badgerwithagun
Member

@tsg21 - I've updated the README on my PR to provide more information on this: https://github.com/gruelbox/transaction-outbox/blob/fix-236/README.md#clustering

@badgerwithagun badgerwithagun added the question Further information is requested label Mar 14, 2022
@badgerwithagun
Member

This has now been merged so you should be good to give it a whirl 👍🏻

@tsg21
Contributor Author

tsg21 commented Mar 16, 2022

I will do that...

@tsg21
Contributor Author

tsg21 commented Sep 27, 2023

We now use this in production backed by AWS SQS and it works extremely reliably, with queues of over 50k items. I would in principle like to contribute the code to the project, but it mashes together Spring and AWS SQS support. I think that for it to fit into the modular structure of the project I would need to decouple the code from Spring, which would require a rewrite.
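For flavour, the core of it is roughly this (heavily simplified; class, field and queue names are illustrative, using the AWS SDK v2 SQS client):

@Slf4j
class SqsSubmitter implements Submitter {

  private final SqsClient sqs;             // software.amazon.awssdk.services.sqs.SqsClient
  private final ObjectMapper objectMapper; // configured with TransactionOutboxJacksonModule, as above
  private final String queueUrl;

  SqsSubmitter(SqsClient sqs, ObjectMapper objectMapper, String queueUrl) {
    this.sqs = sqs;
    this.objectMapper = objectMapper;
    this.queueUrl = queueUrl;
  }

  @Override
  public void submit(TransactionOutboxEntry entry, Consumer<TransactionOutboxEntry> ignored) {
    try {
      // Serialise the entry and push it onto the queue; a listener on each instance
      // deserialises it and calls outbox.processNow(entry)
      sqs.sendMessage(SendMessageRequest.builder()
          .queueUrl(queueUrl)
          .messageBody(objectMapper.writeValueAsString(entry))
          .build());
    } catch (Exception e) {
      // Safe to swallow: the entry stays in the outbox table and the background
      // flusher re-attempts it later
      log.warn("Failed to submit {} to SQS; will retry later", entry.description(), e);
    }
  }
}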

Is there any interest in this?

@badgerwithagun
Member

Very interested indeed @tsg21 .

@badgerwithagun
Member

Something you're likely to have time for, @tsg21 ?

@gruelbox gruelbox locked and limited conversation to collaborators Dec 23, 2023
@badgerwithagun badgerwithagun converted this issue into discussion #543 Dec 23, 2023
