Skip to content

Commit

Permalink
Finalize transactional postgres and mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
galeaspablo committed Dec 9, 2024
1 parent b5a1535 commit e59179e
Show file tree
Hide file tree
Showing 48 changed files with 1,005 additions and 628 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cloud.ambar.common.commandhandler;

import cloud.ambar.common.eventstore.EventStore;
import cloud.ambar.common.projection.MongoTransactionalProjectionOperator;
import cloud.ambar.common.eventstore.PostgresTransactionalEventStore;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -10,14 +11,26 @@

@RequiredArgsConstructor
public class CommandController {
private final EventStore eventStore;
private final PostgresTransactionalEventStore postgresTransactionalEventStore;
private final MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator;
private static final Logger log = LogManager.getLogger(CommandController.class);

public void processCommand(final Command command, final CommandHandler commandHandler) {
public void processCommand(
final Command command,
final CommandHandler commandHandler
) {
log.info("Command controller received command: " + command);

try {
eventStore.beginTransaction();
// We start a PG transaction because command handlers need to append to the event store transactionally.
// I.e., they need to read aggregates and append to them in an ACID fashion.
// We start a Mongo transaction because if a command handler needs to read from a projection,
// it also needs to do so transactionally to not receive dirty reads.
postgresTransactionalEventStore.beginTransaction();
mongoTransactionalProjectionOperator.startTransaction();
commandHandler.handleCommand(command);
eventStore.commitTransaction();
postgresTransactionalEventStore.commitTransaction();
mongoTransactionalProjectionOperator.commitTransaction();
} catch (Exception e) {
log.error("Failed to process reaction command.");
log.error(e);
Expand All @@ -28,10 +41,27 @@ public void processCommand(final Command command, final CommandHandler commandHa
.collect(Collectors.joining("\n"));
log.error(stackTraceString);

if (eventStore.isTransactionActive()) {
eventStore.abortTransaction();
eventStore.closeSession();
try {
if (postgresTransactionalEventStore.isTransactionActive()) {
postgresTransactionalEventStore.abortTransaction();
}
} catch (Exception postgresException) {
log.error("Failed to abort postgres transaction.");
log.error(postgresException);
log.error(postgresException.getMessage());
}

try {
if (mongoTransactionalProjectionOperator.isTransactionActive()) {
mongoTransactionalProjectionOperator.abortTransaction();
}
} catch (Exception mongoException) {
log.error("Failed to abort mongo transaction.");
log.error(mongoException);
log.error(mongoException.getMessage());
}

throw new RuntimeException("Failed to process command with exception: " + e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package cloud.ambar.common.commandhandler;

import cloud.ambar.common.eventstore.EventStore;
import cloud.ambar.common.eventstore.PostgresTransactionalEventStore;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
abstract public class CommandHandler {
final protected EventStore eventStore;
final protected PostgresTransactionalEventStore postgresTransactionalEventStore;
protected abstract void handleCommand(Command command);
}

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit e59179e

Please sign in to comment.