From e59179e7c9a41a74a043ce2a3f5192c78c8e7171 Mon Sep 17 00:00:00 2001 From: Luis Pablo Galeas Bardales Date: Mon, 9 Dec 2024 05:08:27 +0000 Subject: [PATCH] Finalize transactional postgres and mongo --- .../commandhandler/CommandController.java | 46 +++- .../common/commandhandler/CommandHandler.java | 4 +- .../common/eventstore/EventRepository.java | 16 -- .../ambar/common/eventstore/EventStore.java | 117 --------- .../eventstore/EventStoreInitializer.java | 137 ---------- .../PostgresTransactionalEventStore.java | 237 ++++++++++++++++++ .../ambar/common/projection/MongoConfig.java | 49 ---- .../common/projection/MongoInitializer.java | 35 --- .../projection/MongoNonTransactionalApi.java | 13 - .../projection/MongoTransactionalAPI.java | 44 ---- .../MongoTransactionalProjectionOperator.java | 66 +++++ .../common/projection/ProjectedEvent.java | 14 -- .../projection/ProjectionController.java | 33 ++- .../ambar/common/queryhandler/Query.java | 4 + .../common/queryhandler/QueryController.java | 48 ++++ .../common/queryhandler/QueryHandler.java | 10 + .../common/reaction/ReactionController.java | 43 +++- .../common/reaction/ReactionHandler.java | 4 +- .../ambar/common/serializedevent/README.md | 4 +- .../ambar/common/sessionauth/Session.java | 17 -- .../common/sessionauth/SessionRepository.java | 24 +- .../common/sessionauth/SessionService.java | 2 + .../cloud/ambar/common/util/MongoConfig.java | 90 +++++++ .../ambar/common/util/MongoInitializer.java | 16 ++ .../common/util/MongoInitializerApi.java | 27 ++ .../ambar/common/util/PostgresConfig.java | 100 ++++++++ .../common/util/PostgresInitializer.java | 16 ++ .../common/util/PostgresInitializerApi.java | 120 +++++++++ .../EnrollmentCommandController.java | 19 +- .../RequestEnrollmentCommand.java | 2 +- .../RequestEnrollmentCommandHandler.java | 23 +- .../RequestEnrollmentHttpRequest.java | 2 +- .../controller/EnrollmentQueryController.java | 37 --- .../DuplicateEnrollmentException.java | 8 - .../exception/InactiveProductException.java | 8 - .../exception/InvalidIncomeException.java | 8 - .../exception/InvalidProductException.java | 9 - .../InvalidSessionTokenException.java | 8 - .../EnrollmentProjectionController.java | 12 +- .../enrollmentlist/EnrollmentRepository.java | 10 +- .../enrollmentlist/ProductNameRepository.java | 8 +- .../ProductActiveStatusRepository.java | 10 +- .../EnrollmentQueryController.java | 35 +++ .../queryhandler/GetUserEnrollmentsQuery.java | 5 +- .../GetUserEnrollmentsQueryHandler.java | 33 ++- .../EnrollmentReactionController.java | 13 +- .../ReviewEnrollmentReactionHandler.java | 19 +- .../src/main/resources/application.properties | 28 ++- 48 files changed, 1005 insertions(+), 628 deletions(-) delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventRepository.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventStore.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventStoreInitializer.java create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/PostgresTransactionalEventStore.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoConfig.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoInitializer.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoNonTransactionalApi.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoTransactionalAPI.java create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoTransactionalProjectionOperator.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/ProjectedEvent.java create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/Query.java create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/QueryController.java create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/QueryHandler.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/Session.java create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoConfig.java create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoInitializer.java create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoInitializerApi.java create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresConfig.java create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresInitializer.java create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresInitializerApi.java rename application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/{controller => commandhandler}/EnrollmentCommandController.java (67%) rename application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/{controller => commandhandler}/RequestEnrollmentHttpRequest.java (81%) delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentQueryController.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/DuplicateEnrollmentException.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InactiveProductException.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidIncomeException.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidProductException.java delete mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidSessionTokenException.java rename application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/{controller => projection}/EnrollmentProjectionController.java (79%) create mode 100644 application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/EnrollmentQueryController.java rename application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/{controller => reaction}/EnrollmentReactionController.java (70%) diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/commandhandler/CommandController.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/commandhandler/CommandController.java index b775231d..9d7eeee8 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/commandhandler/CommandController.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/commandhandler/CommandController.java @@ -1,6 +1,7 @@ package cloud.ambar.common.commandhandler; -import cloud.ambar.common.eventstore.EventStore; +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; +import cloud.ambar.common.eventstore.PostgresTransactionalEventStore; import lombok.RequiredArgsConstructor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -10,14 +11,26 @@ @RequiredArgsConstructor public class CommandController { - private final EventStore eventStore; + private final PostgresTransactionalEventStore postgresTransactionalEventStore; + private final MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator; private static final Logger log = LogManager.getLogger(CommandController.class); - public void processCommand(final Command command, final CommandHandler commandHandler) { + public void processCommand( + final Command command, + final CommandHandler commandHandler + ) { + log.info("Command controller received command: " + command); + try { - eventStore.beginTransaction(); + // We start a PG transaction because command handlers need to append to the event store transactionally. + // I.e., they need to read aggregates and append to them in an ACID fashion. + // We start a Mongo transaction because if a command handler needs to read from a projection, + // it also needs to do so transactionally to not receive dirty reads. + postgresTransactionalEventStore.beginTransaction(); + mongoTransactionalProjectionOperator.startTransaction(); commandHandler.handleCommand(command); - eventStore.commitTransaction(); + postgresTransactionalEventStore.commitTransaction(); + mongoTransactionalProjectionOperator.commitTransaction(); } catch (Exception e) { log.error("Failed to process reaction command."); log.error(e); @@ -28,10 +41,27 @@ public void processCommand(final Command command, final CommandHandler commandHa .collect(Collectors.joining("\n")); log.error(stackTraceString); - if (eventStore.isTransactionActive()) { - eventStore.abortTransaction(); - eventStore.closeSession(); + try { + if (postgresTransactionalEventStore.isTransactionActive()) { + postgresTransactionalEventStore.abortTransaction(); + } + } catch (Exception postgresException) { + log.error("Failed to abort postgres transaction."); + log.error(postgresException); + log.error(postgresException.getMessage()); + } + + try { + if (mongoTransactionalProjectionOperator.isTransactionActive()) { + mongoTransactionalProjectionOperator.abortTransaction(); + } + } catch (Exception mongoException) { + log.error("Failed to abort mongo transaction."); + log.error(mongoException); + log.error(mongoException.getMessage()); } + + throw new RuntimeException("Failed to process command with exception: " + e); } } } diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/commandhandler/CommandHandler.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/commandhandler/CommandHandler.java index 2cd07346..77beab8a 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/commandhandler/CommandHandler.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/commandhandler/CommandHandler.java @@ -1,10 +1,10 @@ package cloud.ambar.common.commandhandler; -import cloud.ambar.common.eventstore.EventStore; +import cloud.ambar.common.eventstore.PostgresTransactionalEventStore; import lombok.RequiredArgsConstructor; @RequiredArgsConstructor abstract public class CommandHandler { - final protected EventStore eventStore; + final protected PostgresTransactionalEventStore postgresTransactionalEventStore; protected abstract void handleCommand(Command command); } diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventRepository.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventRepository.java deleted file mode 100644 index 1223a5a3..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventRepository.java +++ /dev/null @@ -1,16 +0,0 @@ -package cloud.ambar.common.eventstore; - -import cloud.ambar.common.serializedevent.SerializedEvent; -import org.springframework.data.jpa.repository.JpaRepository; -import org.springframework.stereotype.Service; -import org.springframework.web.context.annotation.RequestScope; - -import java.util.List; -import java.util.Optional; - -@Service -@RequestScope -public interface EventRepository extends JpaRepository { - List findAllByAggregateId(String aggregateId); - Optional findByEventId(String eventId); -} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventStore.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventStore.java deleted file mode 100644 index d6db7e8b..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventStore.java +++ /dev/null @@ -1,117 +0,0 @@ -package cloud.ambar.common.eventstore; - -import cloud.ambar.common.aggregate.Aggregate; -import cloud.ambar.common.event.CreationEvent; -import cloud.ambar.common.event.Event; -import cloud.ambar.common.event.TransformationEvent; -import cloud.ambar.common.serializedevent.Deserializer; -import cloud.ambar.common.serializedevent.SerializedEvent; -import cloud.ambar.common.serializedevent.Serializer; -import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Service; -import org.springframework.web.context.annotation.RequestScope; - -import java.util.List; - -@Service -@RequestScope -@RequiredArgsConstructor -public class EventStore { - private final EventRepository eventRepository; - private final Deserializer deserializer; - private final Serializer serializer; - private boolean transactionActive = false; - - public AggregateAndEventIdsInLastEvent findAggregate(String aggregateId) { - if (!transactionActive) { - throw new RuntimeException("Transaction must be active to read aggregate from event store!"); - } - final List serializedEvents = eventRepository.findAllByAggregateId(aggregateId); - final List events = serializedEvents.stream() - .map(deserializer::deserialize) - .toList(); - - if (events.isEmpty()) { - throw new RuntimeException("No events found for aggregateId: " + aggregateId); - } - - final Event creationEvent = events.getFirst(); - final List transformationEvents = events.subList(1, events.size()); - - Aggregate aggregate; - if (creationEvent instanceof CreationEvent) { - aggregate = ((CreationEvent) creationEvent).createAggregate(); - } else { - throw new RuntimeException("First event is not a creation event"); - } - - String eventIdOfLastEvent = creationEvent.getEventId(); - String correlationIdOfLastEvent = creationEvent.getCorrelationId(); - - for (Event transformationEvent : transformationEvents) { - if (transformationEvent instanceof TransformationEvent) { - aggregate = ((TransformationEvent) transformationEvent).transformAggregate(aggregate); - eventIdOfLastEvent = transformationEvent.getEventId(); - correlationIdOfLastEvent = transformationEvent.getCorrelationId(); - - } else { - throw new RuntimeException("Event is not a transformation event"); - } - } - - - return new AggregateAndEventIdsInLastEvent( - aggregate, - eventIdOfLastEvent, - correlationIdOfLastEvent - ); - } - - public void saveEvent(Event event) { - if (!transactionActive) { - throw new RuntimeException("Transaction must be active to write into event store!"); - } - eventRepository.save(serializer.serialize(event)); - } - - public boolean doesEventAlreadyExist(String eventId) { - if (!transactionActive) { - throw new RuntimeException("Transaction must be active to read event from event store!"); - } - return eventRepository.findByEventId(eventId).isPresent(); - } - - public void beginTransaction() { - if (transactionActive) { - throw new RuntimeException("Transaction already active."); - } - // todo transaction begin - transactionActive = true; - } - - public void commitTransaction() { - if (!transactionActive) { - throw new RuntimeException("No transaction to commit."); - } - - // todo transaction commit - transactionActive = false; - } - - public void abortTransaction() { - if (!transactionActive) { - throw new RuntimeException("No transaction to abort"); - } - - // todo abort transaction - transactionActive = false; - } - - public void closeSession() { - // todo close session - } - - public boolean isTransactionActive() { - return transactionActive; - } -} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventStoreInitializer.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventStoreInitializer.java deleted file mode 100644 index 8c35ab13..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/EventStoreInitializer.java +++ /dev/null @@ -1,137 +0,0 @@ -package cloud.ambar.common.eventstore; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.ApplicationRunner; -import org.springframework.context.annotation.Bean; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Component; - -/** - * By making this a component, we tell Spring to initialize this class and make it available to the Application Context - * by doing this, along with ApplicationRunner bean, we can have this code run on startup of the application and ensure - * that our event store is ready for us. - */ -@Component -public class EventStoreInitializer { - private static final Logger log = LogManager.getLogger(EventStoreInitializer.class); - - private final JdbcTemplate jdbcTemplate; - - @Value("${EVENT_STORE_CREATE_TABLE_WITH_NAME}") - private String eventStoreTableName; - - @Value("${EVENT_STORE_CREATE_REPLICATION_USER_WITH_USERNAME}") - private String eventStoreCreateReplicationUserWithUsername; - - @Value("${EVENT_STORE_CREATE_REPLICATION_USER_WITH_PASSWORD}") - private String eventStoreCreateReplicationUserWithPassword; - - @Value("${EVENT_STORE_DATABASE_NAME}") - private String eventStoreDatabaseName; - - @Value("${EVENT_STORE_CREATE_REPLICATION_PUBLICATION}") - private String eventStoreCreateReplicationPublication; - - public EventStoreInitializer(JdbcTemplate jdbcTemplate) { - this.jdbcTemplate = jdbcTemplate; - } - - @Bean - ApplicationRunner initEventStore() { - return args -> { - // Create table - log.info("Creating table " + eventStoreTableName); - executeStatementIgnoreErrors( - String.format(""" - CREATE TABLE IF NOT EXISTS %s ( - id BIGSERIAL NOT NULL, - event_id TEXT NOT NULL UNIQUE, - aggregate_id TEXT NOT NULL, - aggregate_version BIGINT NOT NULL, - causation_id TEXT NOT NULL, - correlation_id TEXT NOT NULL, - recorded_on TEXT NOT NULL, - event_name TEXT NOT NULL, - json_payload TEXT NOT NULL, - json_metadata TEXT NOT NULL, - PRIMARY KEY (id));""", - eventStoreTableName) - ); - - // Create user - log.info("Creating replication user"); - executeStatementIgnoreErrors(String.format( - "CREATE USER %s REPLICATION LOGIN PASSWORD '%s';", - eventStoreCreateReplicationUserWithUsername, - eventStoreCreateReplicationUserWithPassword - )); - - // Grant permissions to user - log.info("Granting permissions to replication user"); - executeStatementIgnoreErrors(String.format( - "GRANT CONNECT ON DATABASE \"%s\" TO %s;", - eventStoreDatabaseName, - eventStoreCreateReplicationUserWithUsername - )); - - log.info("Granting select to replication user"); - executeStatementIgnoreErrors(String.format( - "GRANT SELECT ON TABLE %s TO %s;", - eventStoreTableName, - eventStoreCreateReplicationUserWithUsername - )); - - // Create publication - log.info("Creating publication for table"); - executeStatementIgnoreErrors(String.format( - "CREATE PUBLICATION %s FOR TABLE %s;", - eventStoreCreateReplicationPublication, - eventStoreTableName - )); - - // Create indices - log.info("Creating aggregate id, aggregate version index"); - executeStatementIgnoreErrors(String.format( - "CREATE UNIQUE INDEX event_store_idx_event_aggregate_id_version ON %s(aggregate_id, aggregate_version);", - eventStoreTableName - )); - log.info("Creating id index"); - executeStatementIgnoreErrors(String.format( - "CREATE UNIQUE INDEX event_store_idx_event_id ON %s(event_id);", - eventStoreTableName - )); - log.info("Creating causation index"); - executeStatementIgnoreErrors(String.format( - "CREATE INDEX event_store_idx_event_causation_id ON %s(causation_id);", - eventStoreTableName - )); - log.info("Creating correlation index"); - executeStatementIgnoreErrors(String.format( - "CREATE INDEX event_store_idx_event_correlation_id ON %s(correlation_id);", - eventStoreTableName - )); - log.info("Creating recording index"); - executeStatementIgnoreErrors(String.format( - "CREATE INDEX event_store_idx_occurred_on ON %s(recorded_on);", - eventStoreTableName - )); - log.info("Creating event name index"); - executeStatementIgnoreErrors(String.format( - "CREATE INDEX event_store_idx_event_name ON %s(event_name);", - eventStoreTableName - )); - }; - } - - private void executeStatementIgnoreErrors(final String sqlStatement) { - try { - log.info("Executing SQL: " + sqlStatement); - jdbcTemplate.execute(sqlStatement); - } catch (Exception e) { - log.warn("Caught exception when executing SQL statement."); - log.warn(e); - } - } -} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/PostgresTransactionalEventStore.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/PostgresTransactionalEventStore.java new file mode 100644 index 00000000..108ec11b --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/PostgresTransactionalEventStore.java @@ -0,0 +1,237 @@ +package cloud.ambar.common.eventstore; + +import cloud.ambar.common.aggregate.Aggregate; +import cloud.ambar.common.event.CreationEvent; +import cloud.ambar.common.event.Event; +import cloud.ambar.common.event.TransformationEvent; +import cloud.ambar.common.queryhandler.QueryController; +import cloud.ambar.common.serializedevent.Deserializer; +import cloud.ambar.common.serializedevent.SerializedEvent; +import cloud.ambar.common.serializedevent.Serializer; +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +@RequiredArgsConstructor +public class PostgresTransactionalEventStore implements AutoCloseable { + private static final Logger log = LogManager.getLogger(PostgresTransactionalEventStore.class); + + private final Connection connection; + + private final Serializer serializer; + + private final Deserializer deserializer; + + private final String eventStoreTable; + + private boolean isTransactionActive = false; + + public void beginTransaction() { + if (isTransactionActive) { + throw new RuntimeException("Transaction already active!"); + } + try { + connection.setAutoCommit(false); + try (Statement stmt = connection.createStatement()) { + stmt.execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"); + } + isTransactionActive = true; + } catch (SQLException e) { + throw new RuntimeException("Failed to start transaction", e); + } + } + + public AggregateAndEventIdsInLastEvent findAggregate(String aggregateId) { + final List serializedEvents = findAllSerializedEventsByAggregateId(aggregateId); + final List events = serializedEvents.stream() + .map(deserializer::deserialize) + .toList(); + + if (events.isEmpty()) { + throw new RuntimeException("No events found for aggregateId: " + aggregateId); + } + + final Event creationEvent = events.getFirst(); + final List transformationEvents = events.subList(1, events.size()); + + Aggregate aggregate; + if (creationEvent instanceof CreationEvent) { + aggregate = ((CreationEvent) creationEvent).createAggregate(); + } else { + throw new RuntimeException("First event is not a creation event"); + } + + String eventIdOfLastEvent = creationEvent.getEventId(); + String correlationIdOfLastEvent = creationEvent.getCorrelationId(); + + for (Event transformationEvent : transformationEvents) { + if (transformationEvent instanceof TransformationEvent) { + aggregate = ((TransformationEvent) transformationEvent).transformAggregate(aggregate); + eventIdOfLastEvent = transformationEvent.getEventId(); + correlationIdOfLastEvent = transformationEvent.getCorrelationId(); + + } else { + throw new RuntimeException("Event is not a transformation event"); + } + } + + + return new AggregateAndEventIdsInLastEvent( + aggregate, + eventIdOfLastEvent, + correlationIdOfLastEvent + ); + } + + public void saveEvent(Event event) { + saveSerializedEvent(serializer.serialize(event)); + } + + public boolean doesEventAlreadyExist(String eventId) { + return findSerializedEventByEventId(eventId).isPresent(); + } + + public void commitTransaction() { + if (!isTransactionActive) { + throw new RuntimeException("Transaction must be active to commit!"); + } + try { + connection.commit(); + isTransactionActive = false; + } catch (SQLException e) { + throw new RuntimeException("Failed to commit transaction", e); + } + } + + public void abortTransaction() { + if (!isTransactionActive) { + throw new RuntimeException("Transaction must be active to abort!"); + } + try { + connection.rollback(); + isTransactionActive = false; + } catch (SQLException e) { + throw new RuntimeException("Failed to abort transaction", e); + } + } + + public boolean isTransactionActive() { + return isTransactionActive; + } + + // IMPLEMENTATION OF AutoCloseable INTERFACE - cleanly close dangling transactions + // when the transaction event store gets garbage collected. + // I.e., it will return the event store's connection back to the connection pool. + // Note: There is need to close the connection, because that would mess with the library's connection pool. + // The transactional event store is meant to be used in @RequestScope, so the connection will be cleaned up + // by the library when the transactional event store and its connection are garbage collected. + public void close() { + if (isTransactionActive) { + abortTransaction(); + } + } + + private List findAllSerializedEventsByAggregateId(String aggregateId) { + if (!isTransactionActive) { + throw new RuntimeException("Transaction must be active to perform operations!"); + } + + List events = new ArrayList<>(); + String sql = String.format(""" + SELECT id, event_id, aggregate_id, causation_id, correlation_id, + aggregate_version, json_payload, json_metadata, recorded_on, event_name + FROM %s + WHERE aggregate_id = ? + ORDER BY aggregate_version ASC + """, eventStoreTable); + + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, aggregateId); + ResultSet rs = stmt.executeQuery(); + + while (rs.next()) { + events.add(mapResultSetToSerializedEvent(rs)); + } + return events; + } catch (SQLException e) { + throw new RuntimeException("Failed to fetch events for aggregate: " + aggregateId, e); + } + } + + private void saveSerializedEvent(SerializedEvent event) { + if (!isTransactionActive) { + throw new RuntimeException("Transaction must be active to perform operations!"); + } + + String sql = String.format(""" + INSERT INTO %s ( + event_id, aggregate_id, causation_id, correlation_id, + aggregate_version, json_payload, json_metadata, recorded_on, event_name + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, eventStoreTable); + + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, event.getEventId()); + stmt.setString(2, event.getAggregateId()); + stmt.setString(3, event.getCausationId()); + stmt.setString(4, event.getCorrelationId()); + stmt.setInt(5, event.getAggregateVersion()); + stmt.setString(6, event.getJsonPayload()); + stmt.setString(7, event.getJsonMetadata()); + stmt.setString(8, event.getRecordedOn()); + stmt.setString(9, event.getEventName()); + + stmt.executeUpdate(); + } catch (SQLException e) { + log.error(e.getClass()); + log.error(e.getMessage()); + log.error(e); + throw new RuntimeException("Failed to save event: " + event.getEventId(), e); + } + } + + private Optional findSerializedEventByEventId(String eventId) { + if (!isTransactionActive) { + throw new RuntimeException("Transaction must be active to perform operations!"); + } + + String sql = String.format(""" + SELECT id, event_id, aggregate_id, causation_id, correlation_id, + aggregate_version, json_payload, json_metadata, recorded_on, event_name + FROM %s + WHERE event_id = ? + """, eventStoreTable); + + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, eventId); + ResultSet rs = stmt.executeQuery(); + + if (rs.next()) { + return Optional.of(mapResultSetToSerializedEvent(rs)); + } + return Optional.empty(); + } catch (SQLException e) { + throw new RuntimeException("Failed to fetch event: " + eventId, e); + } + } + + private SerializedEvent mapResultSetToSerializedEvent(ResultSet rs) throws SQLException { + return SerializedEvent.builder() + .id(rs.getInt("id")) + .eventId(rs.getString("event_id")) + .aggregateId(rs.getString("aggregate_id")) + .causationId(rs.getString("causation_id")) + .correlationId(rs.getString("correlation_id")) + .aggregateVersion(rs.getInt("aggregate_version")) + .jsonPayload(rs.getString("json_payload")) + .jsonMetadata(rs.getString("json_metadata")) + .recordedOn(rs.getString("recorded_on")) + .eventName(rs.getString("event_name")) + .build(); + } +} \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoConfig.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoConfig.java deleted file mode 100644 index bc7b222c..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoConfig.java +++ /dev/null @@ -1,49 +0,0 @@ -package cloud.ambar.common.projection; - -import com.mongodb.client.ClientSession; -import com.mongodb.client.MongoClient; -import com.mongodb.client.MongoClients; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper; -import org.springframework.data.mongodb.core.convert.MappingMongoConverter; -import org.springframework.web.context.annotation.RequestScope; - - -@Configuration -public class MongoConfig { - @Value("${app.mongodb.transactional-api.uri}") - private String mongodbUri; - - @Value("${app.mongodb.transactional-api.database}") - private String mongoDatabaseName; - - @Bean - @RequestScope - public MongoTransactionalAPI mongoTransactionalAPI() { - MongoClient mongoClient = MongoClients.create(mongodbUri); - ClientSession session = mongoClient.startSession(); - MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, mongoDatabaseName).withSession(session); - - // Disable _class field in mongo documents - MappingMongoConverter converter = (MappingMongoConverter) mongoTemplate.getConverter(); - converter.setTypeMapper(new DefaultMongoTypeMapper(null)); - - return new MongoTransactionalAPI(mongoTemplate, session); - } - - @Bean - public MongoNonTransactionalApi mongoNonTransactionalApi() { - MongoClient mongoClient = MongoClients.create(mongodbUri); - MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, mongoDatabaseName); - - // Disable _class field in mongo documents - MappingMongoConverter converter = (MappingMongoConverter) mongoTemplate.getConverter(); - converter.setTypeMapper(new DefaultMongoTypeMapper(null)); - - return new MongoNonTransactionalApi(mongoTemplate); - } -} - diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoInitializer.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoInitializer.java deleted file mode 100644 index b0b0ead9..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoInitializer.java +++ /dev/null @@ -1,35 +0,0 @@ -package cloud.ambar.common.projection; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.springframework.boot.ApplicationRunner; -import org.springframework.context.annotation.Bean; -import org.springframework.data.mongodb.core.index.Index; -import org.springframework.stereotype.Component; - -@Component -public class MongoInitializer { - private static final Logger log = LogManager.getLogger(MongoInitializer.class); - - private final MongoNonTransactionalApi mongoNonTransactionalApi; - - public MongoInitializer(MongoNonTransactionalApi mongoNonTransactionalApi) { - this.mongoNonTransactionalApi = mongoNonTransactionalApi; - } - - @Bean - ApplicationRunner initMongo() { - return args -> { - log.info("Creating collections"); - mongoNonTransactionalApi.operate().createCollection("CreditCard_Enrollment_Enrollment"); - mongoNonTransactionalApi.operate().createCollection("CreditCard_Enrollment_ProductName"); - mongoNonTransactionalApi.operate().createCollection("CreditCard_Enrollment_ProductActiveStatus"); - log.info("Created collections"); - - log.info("Creating indexes"); - mongoNonTransactionalApi.operate().indexOps("CreditCard_Enrollment_Enrollment") - .ensureIndex(new Index().on("userId", org.springframework.data.domain.Sort.Direction.ASC)); - log.info("Created indexes"); - }; - } -} \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoNonTransactionalApi.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoNonTransactionalApi.java deleted file mode 100644 index 8eeb13db..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoNonTransactionalApi.java +++ /dev/null @@ -1,13 +0,0 @@ -package cloud.ambar.common.projection; - -import lombok.RequiredArgsConstructor; -import org.springframework.data.mongodb.core.MongoTemplate; - -@RequiredArgsConstructor -public class MongoNonTransactionalApi { - private final MongoTemplate mongoTemplate; - - public MongoTemplate operate() { - return mongoTemplate; - } -} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoTransactionalAPI.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoTransactionalAPI.java deleted file mode 100644 index 4ed3f71c..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoTransactionalAPI.java +++ /dev/null @@ -1,44 +0,0 @@ -package cloud.ambar.common.projection; - -import com.mongodb.ReadConcern; -import com.mongodb.ReadPreference; -import com.mongodb.TransactionOptions; -import com.mongodb.WriteConcern; -import com.mongodb.client.ClientSession; -import lombok.RequiredArgsConstructor; -import org.springframework.data.mongodb.core.MongoTemplate; - -@RequiredArgsConstructor -public class MongoTransactionalAPI { - private final MongoTemplate mongoTemplate; - private final ClientSession session; - - public void startTransaction() { - session.startTransaction( - TransactionOptions.builder() - .writeConcern(WriteConcern.MAJORITY) - .readConcern(ReadConcern.SNAPSHOT) - .readPreference(ReadPreference.primary()) - .build()); - } - - public MongoTemplate operate() { - return mongoTemplate; - } - - public void commitTransaction() { - session.commitTransaction(); - } - - public boolean isTransactionActive() { - return session.hasActiveTransaction(); - } - - public void abortTransaction() { - session.abortTransaction(); - } - - public void closeSession() { - session.close(); - } -} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoTransactionalProjectionOperator.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoTransactionalProjectionOperator.java new file mode 100644 index 00000000..b9593731 --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoTransactionalProjectionOperator.java @@ -0,0 +1,66 @@ +package cloud.ambar.common.projection; + +import com.mongodb.ReadConcern; +import com.mongodb.ReadPreference; +import com.mongodb.TransactionOptions; +import com.mongodb.WriteConcern; +import com.mongodb.client.ClientSession; +import lombok.RequiredArgsConstructor; +import org.springframework.data.mongodb.core.MongoTemplate; + +@RequiredArgsConstructor +public class MongoTransactionalProjectionOperator implements AutoCloseable { + private final MongoTemplate mongoTemplate; + + private final ClientSession session; + + public void startTransaction() { + if (session.hasActiveTransaction()) { + throw new RuntimeException("Transaction to MongoDB already active!"); + } + session.startTransaction( + TransactionOptions.builder() + .writeConcern(WriteConcern.MAJORITY) + .readConcern(ReadConcern.SNAPSHOT) + .readPreference(ReadPreference.primary()) + .build()); + } + + // This method is used to read and write to MongoDB within a transaction. + public MongoTemplate operate() { + if (!session.hasActiveTransaction()) { + throw new RuntimeException("Transaction must be active to read or write to MongoDB!"); + } + return mongoTemplate; + } + + public void commitTransaction() { + if (!session.hasActiveTransaction()) { + throw new RuntimeException("Transaction must be active to commit transaction to MongoDB!"); + } + session.commitTransaction(); + } + + public boolean isTransactionActive() { + return session.hasActiveTransaction(); + } + + public void abortTransaction() { + if (!session.hasActiveTransaction()) { + throw new RuntimeException("Transaction must be active to abort transaction for MongoDB!"); + } + session.abortTransaction(); + } + + // IMPLEMENTATION OF AutoCloseable INTERFACE - cleanly close dangling transactions + // when the transactional projection operator gets garbage collected. + // I.e., it will return the projection operator's session back to the connection pool. + // Note: There is no need to close the session, because that would mess with the library's session pool. + // The transactional projection operator is meant to be used in @RequestScope, so the session will be cleaned up + // by the library when the transactional projection operator and its session are garbage collected. + public void close() { + if (session.hasActiveTransaction()) { + session.abortTransaction(); + } + } +} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/ProjectedEvent.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/ProjectedEvent.java deleted file mode 100644 index 5d6283ac..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/ProjectedEvent.java +++ /dev/null @@ -1,14 +0,0 @@ -package cloud.ambar.common.projection; - -import lombok.Builder; -import lombok.Getter; -import lombok.NonNull; -import lombok.Setter; - -@Builder -@Setter -@Getter -public class ProjectedEvent { - @NonNull private String eventId; - @NonNull private String projectionName; -} \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/ProjectionController.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/ProjectionController.java index 73949e47..ba82ef0b 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/ProjectionController.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/ProjectionController.java @@ -4,7 +4,7 @@ import cloud.ambar.common.ambar.AmbarResponseFactory; import cloud.ambar.common.event.Event; import cloud.ambar.common.serializedevent.Deserializer; -import lombok.RequiredArgsConstructor; +import lombok.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.data.mongodb.core.query.Criteria; @@ -15,21 +15,24 @@ @RequiredArgsConstructor public abstract class ProjectionController { + private final MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator; private final Deserializer deserializer; - private final MongoTransactionalAPI mongoTransactionalAPI; private static final Logger log = LogManager.getLogger(ProjectionController.class); - protected String processHttpRequest( + protected String processProjectionHttpRequest( final AmbarHttpRequest ambarHttpRequest, final ProjectionHandler projectionHandler, final String projectionName) { - log.info("Projection received http request: {}", ambarHttpRequest); + log.info("Reaction controller received http request: " + ambarHttpRequest); try { Event event = deserializer.deserialize(ambarHttpRequest.getSerializedEvent()); - mongoTransactionalAPI.startTransaction(); - boolean isAlreadyProjected = mongoTransactionalAPI.operate().count( + // We start a Mongo transaction because if a projection handler needs to update a projection, + // it should do so idempotently by checking if the event has already been projected, + // and it should do so with a transaction to not receive dirty reads. + mongoTransactionalProjectionOperator.startTransaction(); + boolean isAlreadyProjected = mongoTransactionalProjectionOperator.operate().count( Query.query( Criteria.where("eventId").is(event.getEventId()) .and("projectionName").is(projectionName) @@ -41,7 +44,7 @@ protected String processHttpRequest( return AmbarResponseFactory.successResponse(); } - mongoTransactionalAPI.operate().save( + mongoTransactionalProjectionOperator.operate().save( ProjectedEvent.builder() .eventId(event.getEventId()) .projectionName(projectionName) @@ -51,7 +54,7 @@ protected String processHttpRequest( projectionHandler.project(event); - mongoTransactionalAPI.commitTransaction(); + mongoTransactionalProjectionOperator.commitTransaction(); return AmbarResponseFactory.successResponse(); } catch (Exception e) { if (e.getMessage() != null && e.getMessage().startsWith("Unknown event type")) { @@ -67,11 +70,19 @@ protected String processHttpRequest( .map(StackTraceElement::toString) .collect(Collectors.joining("\n")); log.error(stackTraceString); - if (mongoTransactionalAPI.isTransactionActive()) { - mongoTransactionalAPI.abortTransaction(); - mongoTransactionalAPI.closeSession(); + if (mongoTransactionalProjectionOperator.isTransactionActive()) { + mongoTransactionalProjectionOperator.abortTransaction(); } return AmbarResponseFactory.retryResponse(e); } } + + @Builder + @Setter + @Getter + public static class ProjectedEvent { + @NonNull + private String eventId; + @NonNull private String projectionName; + } } \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/Query.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/Query.java new file mode 100644 index 00000000..c2281f98 --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/Query.java @@ -0,0 +1,4 @@ +package cloud.ambar.common.queryhandler; + +abstract public class Query { +} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/QueryController.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/QueryController.java new file mode 100644 index 00000000..940469c7 --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/QueryController.java @@ -0,0 +1,48 @@ +package cloud.ambar.common.queryhandler; + +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Arrays; +import java.util.stream.Collectors; + +@RequiredArgsConstructor +public class QueryController { + private final MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator; + private static final Logger log = LogManager.getLogger(QueryController.class); + + public Object processQuery(final Query query, final QueryHandler queryHandler) { + // We start a Mongo transaction because if a query handler needs to read from a projection, + // it also needs to do so transactionally to not receive dirty reads. + try { + mongoTransactionalProjectionOperator.startTransaction(); + Object object = queryHandler.handleQuery(query); + mongoTransactionalProjectionOperator.commitTransaction(); + + return object; + } catch (Exception e) { + log.error("Failed to process reaction command."); + log.error(e); + log.error(e.getMessage()); + + String stackTraceString = Arrays.stream(e.getStackTrace()) + .map(StackTraceElement::toString) + .collect(Collectors.joining("\n")); + log.error(stackTraceString); + + try { + if (mongoTransactionalProjectionOperator.isTransactionActive()) { + mongoTransactionalProjectionOperator.abortTransaction(); + } + } catch (Exception mongoException) { + log.error("Failed to abort mongo transaction."); + log.error(mongoException); + log.error(mongoException.getMessage()); + } + + throw new RuntimeException("Failed to process query with exception: " + e); + } + } +} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/QueryHandler.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/QueryHandler.java new file mode 100644 index 00000000..353362ea --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/QueryHandler.java @@ -0,0 +1,10 @@ +package cloud.ambar.common.queryhandler; + +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +abstract public class QueryHandler { + final protected MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator; + protected abstract Object handleQuery(Query query); +} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/reaction/ReactionController.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/reaction/ReactionController.java index 8a314247..f36bf337 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/reaction/ReactionController.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/reaction/ReactionController.java @@ -2,8 +2,9 @@ import cloud.ambar.common.ambar.AmbarHttpRequest; import cloud.ambar.common.ambar.AmbarResponseFactory; -import cloud.ambar.common.eventstore.EventStore; import cloud.ambar.common.serializedevent.Deserializer; +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; +import cloud.ambar.common.eventstore.PostgresTransactionalEventStore; import lombok.RequiredArgsConstructor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -13,17 +14,24 @@ @RequiredArgsConstructor public abstract class ReactionController { - private final EventStore eventStore; + private final PostgresTransactionalEventStore postgresTransactionalEventStore; + private final MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator; private final Deserializer deserializer; private static final Logger log = LogManager.getLogger(ReactionController.class); - public String processHttpRequest(final AmbarHttpRequest ambarHttpRequest, final ReactionHandler reactionHandler) { + public String processReactionHttpRequest(final AmbarHttpRequest ambarHttpRequest, final ReactionHandler reactionHandler) { try { - log.info("Reaction received http request: " + ambarHttpRequest); + log.info("Reaction controller received http request: " + ambarHttpRequest); - eventStore.beginTransaction(); + // We start a PG transaction because reaction handlers need to append to the event store transactionally. + // I.e., they need to read aggregates and append to them in an ACID fashion. + // We start a Mongo transaction because if a reaction handler needs to read from a projection, + // it also needs to do so transactionally to not receive dirty reads. + postgresTransactionalEventStore.beginTransaction(); + mongoTransactionalProjectionOperator.startTransaction(); reactionHandler.react(deserializer.deserialize(ambarHttpRequest.getSerializedEvent())); - eventStore.commitTransaction(); + postgresTransactionalEventStore.commitTransaction(); + mongoTransactionalProjectionOperator.commitTransaction(); return AmbarResponseFactory.successResponse(); } catch (Exception e) { @@ -40,10 +48,27 @@ public String processHttpRequest(final AmbarHttpRequest ambarHttpRequest, final .map(StackTraceElement::toString) .collect(Collectors.joining("\n")); log.error(stackTraceString); - if (eventStore.isTransactionActive()) { - eventStore.abortTransaction(); - eventStore.closeSession(); + + try { + if (postgresTransactionalEventStore.isTransactionActive()) { + postgresTransactionalEventStore.abortTransaction(); + } + } catch (Exception postgresException) { + log.error("Failed to abort postgres transaction."); + log.error(postgresException); + log.error(postgresException.getMessage()); + } + + try { + if (mongoTransactionalProjectionOperator.isTransactionActive()) { + mongoTransactionalProjectionOperator.abortTransaction(); + } + } catch (Exception mongoException) { + log.error("Failed to abort mongo transaction."); + log.error(mongoException); + log.error(mongoException.getMessage()); } + return AmbarResponseFactory.retryResponse(e); } } diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/reaction/ReactionHandler.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/reaction/ReactionHandler.java index cfb34f1a..f0afc768 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/reaction/ReactionHandler.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/reaction/ReactionHandler.java @@ -1,12 +1,12 @@ package cloud.ambar.common.reaction; import cloud.ambar.common.event.Event; -import cloud.ambar.common.eventstore.EventStore; +import cloud.ambar.common.eventstore.PostgresTransactionalEventStore; import com.fasterxml.jackson.core.JsonProcessingException; import lombok.RequiredArgsConstructor; @RequiredArgsConstructor public abstract class ReactionHandler { - final protected EventStore eventStore; + final protected PostgresTransactionalEventStore postgresTransactionalEventStore; protected abstract void react(final Event event) throws JsonProcessingException; } diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/serializedevent/README.md b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/serializedevent/README.md index 9e44a2cf..73a4f84a 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/serializedevent/README.md +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/serializedevent/README.md @@ -3,4 +3,6 @@ This module contains the classes that are used to serialize and deserialize the events that are sent to the database. A Serialized Event is a representation in which additional properties, not encoded in the abstract Event are converted into fields that go into the payload or metadata fields. Additionally, the Serialized Event -contains an `event_name` which can be used to figure out which class the SerializedEvent should be deserialized into. \ No newline at end of file +contains an `event_name` which can be used to figure out which class the SerializedEvent should be deserialized into. + +Serialized Events are used when communicating with the database (Postgres) or event bus (Ambar). \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/Session.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/Session.java deleted file mode 100644 index bff29224..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/Session.java +++ /dev/null @@ -1,17 +0,0 @@ -package cloud.ambar.common.sessionauth; - -import jakarta.persistence.Id; -import lombok.*; - -import java.time.Instant; - -@Builder -@AllArgsConstructor -@Getter -public class Session { - @Id @NonNull private String id; - @NonNull private String userId; - @NonNull private String sessionToken; - @NonNull private Boolean signedOut; - @NonNull private Instant tokenLastRefreshedAt; -} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/SessionRepository.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/SessionRepository.java index cc3efabf..aa3fb181 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/SessionRepository.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/SessionRepository.java @@ -1,24 +1,27 @@ package cloud.ambar.common.sessionauth; -import cloud.ambar.common.projection.MongoNonTransactionalApi; -import lombok.RequiredArgsConstructor; +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; +import jakarta.persistence.Id; +import lombok.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Service; +import org.springframework.web.context.annotation.RequestScope; import java.time.Instant; import java.util.Optional; @Service +@RequestScope @RequiredArgsConstructor public class SessionRepository { private static final Logger log = LogManager.getLogger(SessionRepository.class); private final SessionConfig sessionConfig; - private final MongoNonTransactionalApi mongoNonTransactionalApi; + private final MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator; public Optional authenticatedUserIdFromSessionToken(String sessionToken) { - Session session = mongoNonTransactionalApi.operate().findOne( + Session session = mongoTransactionalProjectionOperator.operate().findOne( Query.query( Criteria.where("sessionToken").is(sessionToken) ), @@ -43,4 +46,17 @@ public Optional authenticatedUserIdFromSessionToken(String sessionToken) return Optional.of(session.getUserId()); } + + @Builder + @AllArgsConstructor + @Getter + public static class Session { + @Id + @NonNull + private String id; + @NonNull private String userId; + @NonNull private String sessionToken; + @NonNull private Boolean signedOut; + @NonNull private Instant tokenLastRefreshedAt; + } } \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/SessionService.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/SessionService.java index e8f4620d..6aa828de 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/SessionService.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/sessionauth/SessionService.java @@ -4,8 +4,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.web.context.annotation.RequestScope; @Service +@RequestScope @RequiredArgsConstructor public class SessionService { private static final Logger log = LogManager.getLogger(SessionService.class); diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoConfig.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoConfig.java new file mode 100644 index 00000000..c5495c9a --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoConfig.java @@ -0,0 +1,90 @@ +package cloud.ambar.common.util; + +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.client.ClientSession; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.springframework.web.context.annotation.RequestScope; + +import java.util.concurrent.TimeUnit; + +@Configuration +public class MongoConfig { + @Value("${app.mongodb.uri}") + private String mongodbUri; + + @Value("${app.mongodb.database}") + private String mongoDatabaseName; + + @Bean("MongoClientForTransactionalSupport") + public MongoClient mongoClientForTransactionalSupport() { + ConnectionString connectionString = new ConnectionString(mongodbUri); + MongoClientSettings settings = MongoClientSettings.builder() + .applyConnectionString(connectionString) + .applyToConnectionPoolSettings(builder -> + builder.maxSize(20) + .minSize(5) + .maxWaitTime(2000, TimeUnit.MILLISECONDS) + .maxConnectionLifeTime(30, TimeUnit.MINUTES) + .maxConnectionIdleTime(10, TimeUnit.MINUTES) + ) + .build(); + + return MongoClients.create(settings); + } + + @Bean("MongoClientForNonTransactionalOperations") + public MongoClient mongoClientForNonTransactionalOperations() { + ConnectionString connectionString = new ConnectionString(mongodbUri); + MongoClientSettings settings = MongoClientSettings.builder() + .applyConnectionString(connectionString) + .applyToConnectionPoolSettings(builder -> + builder.maxSize(20) + .minSize(5) + .maxWaitTime(2000, TimeUnit.MILLISECONDS) + .maxConnectionLifeTime(30, TimeUnit.MINUTES) + .maxConnectionIdleTime(10, TimeUnit.MINUTES) + ) + .build(); + + return MongoClients.create(settings); + } + + @Bean + @RequestScope + public MongoTransactionalProjectionOperator mongoTransactionalAPI( + @Qualifier("MongoClientForNonTransactionalOperations") MongoClient mongoClient + ) { + ClientSession session = mongoClient.startSession(); + MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, mongoDatabaseName).withSession(session); + + // Disable _class field in mongo documents + MappingMongoConverter converter = (MappingMongoConverter) mongoTemplate.getConverter(); + converter.setTypeMapper(new DefaultMongoTypeMapper(null)); + + return new MongoTransactionalProjectionOperator(mongoTemplate, session); + } + + @Bean + public MongoInitializerApi mongoInitalizerApi( + @Qualifier("MongoClientForNonTransactionalOperations") MongoClient mongoClient + ) { + MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, mongoDatabaseName); + + // Disable _class field in mongo documents + MappingMongoConverter converter = (MappingMongoConverter) mongoTemplate.getConverter(); + converter.setTypeMapper(new DefaultMongoTypeMapper(null)); + + return new MongoInitializerApi(mongoTemplate); + } +} + diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoInitializer.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoInitializer.java new file mode 100644 index 00000000..0d5de087 --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoInitializer.java @@ -0,0 +1,16 @@ +package cloud.ambar.common.util; + +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class MongoInitializer { + private final MongoInitializerApi mongoInitializerApi; + + @Bean + public void initMongo() { + mongoInitializerApi.initialize(); + } +} \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoInitializerApi.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoInitializerApi.java new file mode 100644 index 00000000..939b15e6 --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoInitializerApi.java @@ -0,0 +1,27 @@ +package cloud.ambar.common.util; + +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.index.Index; + +@RequiredArgsConstructor +public class MongoInitializerApi { + private static final Logger log = LogManager.getLogger(MongoInitializerApi.class); + + private final MongoTemplate mongoTemplate; + + public void initialize() { + log.info("Creating collections"); + mongoTemplate.createCollection("CreditCard_Enrollment_Enrollment"); + mongoTemplate.createCollection("CreditCard_Enrollment_ProductName"); + mongoTemplate.createCollection("CreditCard_Enrollment_ProductActiveStatus"); + log.info("Created collections"); + + log.info("Creating indexes"); + mongoTemplate.indexOps("CreditCard_Enrollment_Enrollment") + .ensureIndex(new Index().on("userId", org.springframework.data.domain.Sort.Direction.ASC)); + log.info("Created indexes"); + } +} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresConfig.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresConfig.java new file mode 100644 index 00000000..1079e0f9 --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresConfig.java @@ -0,0 +1,100 @@ +package cloud.ambar.common.util; + +import cloud.ambar.common.eventstore.PostgresTransactionalEventStore; +import cloud.ambar.common.serializedevent.Deserializer; +import cloud.ambar.common.serializedevent.Serializer; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.context.annotation.RequestScope; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; + +@Configuration +public class PostgresConfig { + @Value("${app.postgresql.uri}") + private String postgresUri; + + @Value("${app.postgresql.database}") + private String postgresDatabase; + + @Value("${app.postgresql.table}") + private String postgresTable; + + @Value("${app.postgresql.eventStoreCreateReplicationUserWithUsername}") + private String postgresReplicationUsername; + + @Value("${app.postgresql.eventStoreCreateReplicationUserWithPassword}") + private String postgresReplicationPassword; + + @Value("${app.postgresql.eventStoreCreateReplicationPublication}") + private String postgresReplicationPublicationName; + + @Bean + @Qualifier("DataSourceForTransactionalSupport") + public DataSource dataSourceForTransactionalSupport() { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(postgresUri); + config.setAutoCommit(false); // Important for transaction control + + config.setMaximumPoolSize(10); + config.setMinimumIdle(5); + config.setIdleTimeout(300000); // 5 minutes + config.setConnectionTimeout(20000); // 20 seconds + + return new HikariDataSource(config); + } + + @Bean + @Qualifier("DataSourceForNonTransactionalOperations") + public DataSource dataSourceNonTransactionalOperations() { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(postgresUri); + + config.setMaximumPoolSize(10); + config.setMinimumIdle(5); + config.setIdleTimeout(300000); + config.setConnectionTimeout(20000); + + return new HikariDataSource(config); + } + + @Bean + @RequestScope + public PostgresTransactionalEventStore postgresTransactionalAPI( + @Qualifier("DataSourceForTransactionalSupport") DataSource dataSource, + Serializer serializer, + Deserializer deserializer + ) { + try { + Connection connection = dataSource.getConnection(); + return new PostgresTransactionalEventStore(connection, serializer, deserializer, postgresTable); + } catch (SQLException e) { + throw new RuntimeException("Failed to get connection from data source for PG transactional API", e); + } + } + + @Bean + public PostgresInitializerApi postgresInitializerApi( + @Qualifier("DataSourceForNonTransactionalOperations") DataSource dataSource + ) { + try { + Connection connection = dataSource.getConnection(); + return new PostgresInitializerApi( + connection, + postgresDatabase, + postgresTable, + postgresReplicationUsername, + postgresReplicationPassword, + postgresReplicationPublicationName + ); + } catch (SQLException e) { + throw new RuntimeException("Failed to get connection for PG non transactional API", e); + } + } +} \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresInitializer.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresInitializer.java new file mode 100644 index 00000000..90f65d85 --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresInitializer.java @@ -0,0 +1,16 @@ +package cloud.ambar.common.util; + +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class PostgresInitializer { + private final PostgresInitializerApi postgresInitializerApi; + + @Bean + public void initPostgres() { + postgresInitializerApi.initialize(); + } +} \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresInitializerApi.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresInitializerApi.java new file mode 100644 index 00000000..3094de10 --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresInitializerApi.java @@ -0,0 +1,120 @@ +package cloud.ambar.common.util; + +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.context.annotation.Bean; + +import java.sql.*; + +@RequiredArgsConstructor +public class PostgresInitializerApi { + private static final Logger log = LogManager.getLogger(PostgresInitializerApi.class); + + private final Connection connection; + + private final String eventStoreDatabaseName; + + private final String eventStoreTable; + + private final String eventStoreCreateReplicationUserWithUsername; + + private final String eventStoreCreateReplicationUserWithPassword; + + private final String eventStoreCreateReplicationPublication; + + @Bean + public void initialize() { + // Create table + log.info("Creating table " + eventStoreTable); + executeStatementIgnoreErrors( + String.format(""" + CREATE TABLE IF NOT EXISTS %s ( + id BIGSERIAL NOT NULL, + event_id TEXT NOT NULL UNIQUE, + aggregate_id TEXT NOT NULL, + aggregate_version BIGINT NOT NULL, + causation_id TEXT NOT NULL, + correlation_id TEXT NOT NULL, + recorded_on TEXT NOT NULL, + event_name TEXT NOT NULL, + json_payload TEXT NOT NULL, + json_metadata TEXT NOT NULL, + PRIMARY KEY (id));""", + eventStoreTable) + ); + + // Create replication user + log.info("Creating replication user"); + executeStatementIgnoreErrors(String.format( + "CREATE USER %s REPLICATION LOGIN PASSWORD '%s';", + eventStoreCreateReplicationUserWithUsername, + eventStoreCreateReplicationUserWithPassword + )); + + // Grant permissions to user + log.info("Granting permissions to replication user"); + executeStatementIgnoreErrors(String.format( + "GRANT CONNECT ON DATABASE \"%s\" TO %s;", + eventStoreDatabaseName, + eventStoreCreateReplicationUserWithUsername + )); + + log.info("Granting select to replication user"); + executeStatementIgnoreErrors(String.format( + "GRANT SELECT ON TABLE %s TO %s;", + eventStoreTable, + eventStoreCreateReplicationUserWithUsername + )); + + // Create publication + log.info("Creating publication for table"); + executeStatementIgnoreErrors(String.format( + "CREATE PUBLICATION %s FOR TABLE %s;", + eventStoreCreateReplicationPublication, + eventStoreTable + )); + + // Create indexes + log.info("Creating aggregate id, aggregate version index"); + executeStatementIgnoreErrors(String.format( + "CREATE UNIQUE INDEX event_store_idx_event_aggregate_id_version ON %s(aggregate_id, aggregate_version);", + eventStoreTable + )); + log.info("Creating id index"); + executeStatementIgnoreErrors(String.format( + "CREATE UNIQUE INDEX event_store_idx_event_id ON %s(event_id);", + eventStoreTable + )); + log.info("Creating causation index"); + executeStatementIgnoreErrors(String.format( + "CREATE INDEX event_store_idx_event_causation_id ON %s(causation_id);", + eventStoreTable + )); + log.info("Creating correlation index"); + executeStatementIgnoreErrors(String.format( + "CREATE INDEX event_store_idx_event_correlation_id ON %s(correlation_id);", + eventStoreTable + )); + log.info("Creating recording index"); + executeStatementIgnoreErrors(String.format( + "CREATE INDEX event_store_idx_occurred_on ON %s(recorded_on);", + eventStoreTable + )); + log.info("Creating event name index"); + executeStatementIgnoreErrors(String.format( + "CREATE INDEX event_store_idx_event_name ON %s(event_name);", + eventStoreTable + )); + } + + private void executeStatementIgnoreErrors(final String sqlStatement) { + try { + log.info("Executing SQL: " + sqlStatement); + connection.createStatement().execute(sqlStatement); + } catch (Exception e) { + log.warn("Caught exception when executing SQL statement."); + log.warn(e); + } + } +} \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentCommandController.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/EnrollmentCommandController.java similarity index 67% rename from application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentCommandController.java rename to application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/EnrollmentCommandController.java index 980fd603..3cdcbbc3 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentCommandController.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/EnrollmentCommandController.java @@ -1,10 +1,8 @@ -package cloud.ambar.creditcard.enrollment.controller; +package cloud.ambar.creditcard.enrollment.commandhandler; import cloud.ambar.common.commandhandler.CommandController; -import cloud.ambar.common.eventstore.EventStore; -import cloud.ambar.common.sessionauth.SessionService; -import cloud.ambar.creditcard.enrollment.commandhandler.RequestEnrollmentCommandHandler; -import cloud.ambar.creditcard.enrollment.commandhandler.RequestEnrollmentCommand; +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; +import cloud.ambar.common.eventstore.PostgresTransactionalEventStore; import jakarta.validation.Valid; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.*; @@ -14,17 +12,14 @@ @RequestScope @RequestMapping("/api/v1/credit_card/enrollment") public class EnrollmentCommandController extends CommandController { - private final SessionService sessionService; - private final RequestEnrollmentCommandHandler requestEnrollmentCommandHandler; public EnrollmentCommandController( - EventStore eventStore, - SessionService sessionService, + PostgresTransactionalEventStore postgresTransactionalEventStore, + MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator, RequestEnrollmentCommandHandler requestEnrollmentCommandHandler ) { - super(eventStore); - this.sessionService = sessionService; + super(postgresTransactionalEventStore, mongoTransactionalProjectionOperator); this.requestEnrollmentCommandHandler = requestEnrollmentCommandHandler; } @@ -35,7 +30,7 @@ public void requestEnrollment( @Valid @RequestBody RequestEnrollmentHttpRequest request) { final RequestEnrollmentCommand command = RequestEnrollmentCommand .builder() - .userId(sessionService.authenticatedUserIdFromSessionToken(sessionToken)) + .sessionToken(sessionToken) .productId(request.getProductId()) .annualIncomeInCents(request.getAnnualIncomeInCents()) .build(); diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/RequestEnrollmentCommand.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/RequestEnrollmentCommand.java index 3cb26363..73db8418 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/RequestEnrollmentCommand.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/RequestEnrollmentCommand.java @@ -8,7 +8,7 @@ @Builder @Getter public class RequestEnrollmentCommand extends Command { - @NonNull private String userId; + @NonNull private String sessionToken; @NonNull private String productId; @NonNull private Integer annualIncomeInCents; } diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/RequestEnrollmentCommandHandler.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/RequestEnrollmentCommandHandler.java index c7f1be75..8a0c5776 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/RequestEnrollmentCommandHandler.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/RequestEnrollmentCommandHandler.java @@ -2,9 +2,9 @@ import cloud.ambar.common.commandhandler.Command; import cloud.ambar.common.commandhandler.CommandHandler; -import cloud.ambar.common.eventstore.EventStore; +import cloud.ambar.common.eventstore.PostgresTransactionalEventStore; +import cloud.ambar.common.sessionauth.SessionService; import cloud.ambar.creditcard.enrollment.event.EnrollmentRequested; -import cloud.ambar.creditcard.enrollment.exception.InactiveProductException; import cloud.ambar.creditcard.enrollment.projection.isproductactive.IsProductActive; import org.springframework.stereotype.Service; import org.springframework.web.context.annotation.RequestScope; @@ -16,10 +16,17 @@ @Service @RequestScope public class RequestEnrollmentCommandHandler extends CommandHandler { + private final SessionService sessionService; + private final IsProductActive isProductActive; - public RequestEnrollmentCommandHandler(EventStore eventStore, IsProductActive isProductActive) { - super(eventStore); + public RequestEnrollmentCommandHandler( + PostgresTransactionalEventStore postgresTransactionalEventStore, + SessionService sessionService, + IsProductActive isProductActive + ) { + super(postgresTransactionalEventStore); + this.sessionService = sessionService; this.isProductActive = isProductActive; } @@ -32,8 +39,10 @@ protected void handleCommand(Command command) { } private void handleRequestEnrollment(final RequestEnrollmentCommand command) { + String userId = sessionService.authenticatedUserIdFromSessionToken(command.getSessionToken()); + if (!isProductActive.isProductActive(command.getProductId())) { - throw new InactiveProductException(); + throw new RuntimeException("Product is inactive and not eligible for enrollment request."); } final String eventId = generateRandomId(); @@ -45,11 +54,11 @@ private void handleRequestEnrollment(final RequestEnrollmentCommand command) { .correlationId(eventId) .causationId(eventId) .recordedOn(Instant.now()) - .userId(command.getUserId()) + .userId(userId) .productId(command.getProductId()) .annualIncomeInCents(command.getAnnualIncomeInCents()) .build(); - eventStore.saveEvent(enrollmentRequested); + postgresTransactionalEventStore.saveEvent(enrollmentRequested); } } diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/RequestEnrollmentHttpRequest.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/RequestEnrollmentHttpRequest.java similarity index 81% rename from application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/RequestEnrollmentHttpRequest.java rename to application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/RequestEnrollmentHttpRequest.java index 78e2779f..6630f743 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/RequestEnrollmentHttpRequest.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/commandhandler/RequestEnrollmentHttpRequest.java @@ -1,4 +1,4 @@ -package cloud.ambar.creditcard.enrollment.controller; +package cloud.ambar.creditcard.enrollment.commandhandler; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Positive; diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentQueryController.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentQueryController.java deleted file mode 100644 index 9929284b..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentQueryController.java +++ /dev/null @@ -1,37 +0,0 @@ -package cloud.ambar.creditcard.enrollment.controller; - -import cloud.ambar.common.sessionauth.SessionService; -import cloud.ambar.creditcard.enrollment.projection.enrollmentlist.EnrollmentListItem; -import cloud.ambar.creditcard.enrollment.queryhandler.GetUserEnrollmentsQueryHandler; -import cloud.ambar.creditcard.enrollment.queryhandler.GetUserEnrollmentsQuery; -import lombok.RequiredArgsConstructor; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestHeader; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.context.annotation.RequestScope; - -import java.util.List; - -@RestController -@RequestScope -@RequiredArgsConstructor -@RequestMapping("/api/v1/credit_card/enrollment") -public class EnrollmentQueryController { - private final SessionService sessionService; - - private final GetUserEnrollmentsQueryHandler getUserEnrollmentsQueryHandler; - - @PostMapping(value = "/list-enrollments") - public List listEnrollments( - @RequestHeader("X-With-Session-Token") String sessionToken - ) { - final String userId = sessionService.authenticatedUserIdFromSessionToken(sessionToken); - - GetUserEnrollmentsQuery query = GetUserEnrollmentsQuery.builder() - .userId(userId) - .build(); - - return getUserEnrollmentsQueryHandler.handle(query); - } -} \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/DuplicateEnrollmentException.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/DuplicateEnrollmentException.java deleted file mode 100644 index b3097a5f..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/DuplicateEnrollmentException.java +++ /dev/null @@ -1,8 +0,0 @@ -package cloud.ambar.creditcard.enrollment.exception; - -import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.ResponseStatus; - -@ResponseStatus(value = HttpStatus.CONFLICT, reason = "Enrollment for product by user already present.") -public class DuplicateEnrollmentException extends RuntimeException { -} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InactiveProductException.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InactiveProductException.java deleted file mode 100644 index a84fd12f..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InactiveProductException.java +++ /dev/null @@ -1,8 +0,0 @@ -package cloud.ambar.creditcard.enrollment.exception; - -import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.ResponseStatus; - -@ResponseStatus(value = HttpStatus.BAD_REQUEST, reason = "Product is inactive and not eligible for the request.") -public class InactiveProductException extends RuntimeException { -} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidIncomeException.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidIncomeException.java deleted file mode 100644 index e9e54ddf..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidIncomeException.java +++ /dev/null @@ -1,8 +0,0 @@ -package cloud.ambar.creditcard.enrollment.exception; - -import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.ResponseStatus; - -@ResponseStatus(value = HttpStatus.BAD_REQUEST, reason = "Zero or negative value passed") -public class InvalidIncomeException extends RuntimeException { -} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidProductException.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidProductException.java deleted file mode 100644 index c61cbd5d..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidProductException.java +++ /dev/null @@ -1,9 +0,0 @@ -package cloud.ambar.creditcard.enrollment.exception; - -import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.ResponseStatus; - -@ResponseStatus(value = HttpStatus.BAD_REQUEST, reason = "Invalid Product") - -public class InvalidProductException extends RuntimeException { -} diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidSessionTokenException.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidSessionTokenException.java deleted file mode 100644 index ce261abe..00000000 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/exception/InvalidSessionTokenException.java +++ /dev/null @@ -1,8 +0,0 @@ -package cloud.ambar.creditcard.enrollment.exception; - -import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.ResponseStatus; - -@ResponseStatus(value = HttpStatus.BAD_REQUEST, reason = "InvalidSessionToken") -public class InvalidSessionTokenException extends RuntimeException { -} \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentProjectionController.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/EnrollmentProjectionController.java similarity index 79% rename from application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentProjectionController.java rename to application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/EnrollmentProjectionController.java index 2df46d6e..98f51004 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentProjectionController.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/EnrollmentProjectionController.java @@ -1,7 +1,7 @@ -package cloud.ambar.creditcard.enrollment.controller; +package cloud.ambar.creditcard.enrollment.projection; import cloud.ambar.common.ambar.AmbarHttpRequest; -import cloud.ambar.common.projection.MongoTransactionalAPI; +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; import cloud.ambar.common.projection.ProjectionController; import cloud.ambar.common.serializedevent.Deserializer; import cloud.ambar.creditcard.enrollment.projection.enrollmentlist.EnrollmentListProjectionHandler; @@ -24,10 +24,10 @@ public class EnrollmentProjectionController extends ProjectionController { public EnrollmentProjectionController( Deserializer deserializer, - MongoTransactionalAPI mongoTransactionalAPI, + MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator, IsProductActiveProjectionHandler isProductActiveProjectionHandler, EnrollmentListProjectionHandler enrollmentListProjectionHandler) { - super(deserializer, mongoTransactionalAPI); + super(mongoTransactionalProjectionOperator, deserializer); this.isProductActiveProjectionHandler = isProductActiveProjectionHandler; this.enrollmentListProjectionHandler = enrollmentListProjectionHandler; } @@ -38,7 +38,7 @@ public EnrollmentProjectionController( public String projectIsCardProductActive( @Valid @RequestBody AmbarHttpRequest request ) { - return processHttpRequest(request, isProductActiveProjectionHandler, "CreditCard_Enrollment_IsProductActive"); + return processProjectionHttpRequest(request, isProductActiveProjectionHandler, "CreditCard_Enrollment_IsProductActive"); } @PostMapping(value = "/enrollment_list", consumes = MediaType.APPLICATION_JSON_VALUE, @@ -46,6 +46,6 @@ public String projectIsCardProductActive( public String projectEnrollmentList( @Valid @RequestBody AmbarHttpRequest request ) { - return processHttpRequest(request, enrollmentListProjectionHandler, "CreditCard_Enrollment_EnrollmentList"); + return processProjectionHttpRequest(request, enrollmentListProjectionHandler, "CreditCard_Enrollment_EnrollmentList"); } } diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/enrollmentlist/EnrollmentRepository.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/enrollmentlist/EnrollmentRepository.java index 302c491d..8c9148af 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/enrollmentlist/EnrollmentRepository.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/enrollmentlist/EnrollmentRepository.java @@ -1,6 +1,6 @@ package cloud.ambar.creditcard.enrollment.projection.enrollmentlist; -import cloud.ambar.common.projection.MongoTransactionalAPI; +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; import lombok.RequiredArgsConstructor; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; @@ -14,14 +14,14 @@ @Service @RequiredArgsConstructor public class EnrollmentRepository { - private final MongoTransactionalAPI mongoTransactionalAPI; + private final MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator; public void save(final Enrollment enrollment) { - mongoTransactionalAPI.operate().save(enrollment, "CreditCard_Enrollment_Enrollment"); + mongoTransactionalProjectionOperator.operate().save(enrollment, "CreditCard_Enrollment_Enrollment"); } public Optional findOneById(final String id) { - return Optional.ofNullable(mongoTransactionalAPI.operate().findOne( + return Optional.ofNullable(mongoTransactionalProjectionOperator.operate().findOne( Query.query( Criteria.where("id").is(id) ), @@ -31,7 +31,7 @@ public Optional findOneById(final String id) { } public List findAllByUserId(final String userId) { - return mongoTransactionalAPI.operate().find( + return mongoTransactionalProjectionOperator.operate().find( Query.query( Criteria.where("userId").is(userId) ), diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/enrollmentlist/ProductNameRepository.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/enrollmentlist/ProductNameRepository.java index e0aec37f..4f4dfe9f 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/enrollmentlist/ProductNameRepository.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/enrollmentlist/ProductNameRepository.java @@ -1,6 +1,6 @@ package cloud.ambar.creditcard.enrollment.projection.enrollmentlist; -import cloud.ambar.common.projection.MongoTransactionalAPI; +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; import lombok.RequiredArgsConstructor; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; @@ -13,14 +13,14 @@ @Service @RequiredArgsConstructor public class ProductNameRepository { - private final MongoTransactionalAPI mongoTransactionalAPI; + private final MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator; public void save(final ProductName productName) { - mongoTransactionalAPI.operate().save(productName, "CreditCard_Enrollment_ProductName"); + mongoTransactionalProjectionOperator.operate().save(productName, "CreditCard_Enrollment_ProductName"); } public Optional findOneById(final String id) { - return Optional.ofNullable(mongoTransactionalAPI.operate().findOne( + return Optional.ofNullable(mongoTransactionalProjectionOperator.operate().findOne( Query.query( Criteria.where("id").is(id) ), diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/isproductactive/ProductActiveStatusRepository.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/isproductactive/ProductActiveStatusRepository.java index db5cb64a..568d1184 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/isproductactive/ProductActiveStatusRepository.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/projection/isproductactive/ProductActiveStatusRepository.java @@ -1,6 +1,6 @@ package cloud.ambar.creditcard.enrollment.projection.isproductactive; -import cloud.ambar.common.projection.MongoTransactionalAPI; +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; import lombok.RequiredArgsConstructor; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; @@ -13,10 +13,10 @@ @Service @RequiredArgsConstructor public class ProductActiveStatusRepository { - private final MongoTransactionalAPI mongoTransactionalAPI; + private final MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator; public boolean isThereAnActiveProductWithId(final String productId) { - Optional productActiveStatus = Optional.ofNullable(mongoTransactionalAPI.operate().findOne( + Optional productActiveStatus = Optional.ofNullable(mongoTransactionalProjectionOperator.operate().findOne( Query.query( Criteria.where("id").is(productId) .and("active").is(true) @@ -29,7 +29,7 @@ public boolean isThereAnActiveProductWithId(final String productId) { } public Optional findOneById(final String productId) { - return Optional.ofNullable(mongoTransactionalAPI.operate().findOne( + return Optional.ofNullable(mongoTransactionalProjectionOperator.operate().findOne( Query.query(Criteria.where("id").is(productId)), ProductActiveStatus.class, "CreditCard_Enrollment_ProductActiveStatus" @@ -37,6 +37,6 @@ public Optional findOneById(final String productId) { } public void save(final ProductActiveStatus productActiveStatus) { - mongoTransactionalAPI.operate().save(productActiveStatus, "CreditCard_Enrollment_ProductActiveStatus"); + mongoTransactionalProjectionOperator.operate().save(productActiveStatus, "CreditCard_Enrollment_ProductActiveStatus"); } } diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/EnrollmentQueryController.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/EnrollmentQueryController.java new file mode 100644 index 00000000..4af3471b --- /dev/null +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/EnrollmentQueryController.java @@ -0,0 +1,35 @@ +package cloud.ambar.creditcard.enrollment.queryhandler; + +import cloud.ambar.common.queryhandler.QueryController; +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.annotation.RequestScope; + +@RestController +@RequestScope +@RequestMapping("/api/v1/credit_card/enrollment") +public class EnrollmentQueryController extends QueryController { + private final GetUserEnrollmentsQueryHandler getUserEnrollmentsQueryHandler; + + public EnrollmentQueryController( + MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator, + GetUserEnrollmentsQueryHandler getUserEnrollmentsQueryHandler + ) { + super(mongoTransactionalProjectionOperator); + this.getUserEnrollmentsQueryHandler = getUserEnrollmentsQueryHandler; + } + + @PostMapping(value = "/list-enrollments") + public Object listEnrollments( + @RequestHeader("X-With-Session-Token") String sessionToken + ) { + GetUserEnrollmentsQuery query = GetUserEnrollmentsQuery.builder() + .sessionToken(sessionToken) + .build(); + + return processQuery(query, getUserEnrollmentsQueryHandler); + } +} \ No newline at end of file diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/GetUserEnrollmentsQuery.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/GetUserEnrollmentsQuery.java index f4fbb13b..1742ded2 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/GetUserEnrollmentsQuery.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/GetUserEnrollmentsQuery.java @@ -1,11 +1,12 @@ package cloud.ambar.creditcard.enrollment.queryhandler; +import cloud.ambar.common.queryhandler.Query; import lombok.Builder; import lombok.Getter; import lombok.NonNull; @Builder @Getter -public class GetUserEnrollmentsQuery { - @NonNull private String userId; +public class GetUserEnrollmentsQuery extends Query { + @NonNull private String sessionToken; } diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/GetUserEnrollmentsQueryHandler.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/GetUserEnrollmentsQueryHandler.java index 9da1659b..bea8cd55 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/GetUserEnrollmentsQueryHandler.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/queryhandler/GetUserEnrollmentsQueryHandler.java @@ -1,8 +1,11 @@ package cloud.ambar.creditcard.enrollment.queryhandler; +import cloud.ambar.common.queryhandler.Query; +import cloud.ambar.common.queryhandler.QueryHandler; +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; +import cloud.ambar.common.sessionauth.SessionService; import cloud.ambar.creditcard.enrollment.projection.enrollmentlist.EnrollmentListItem; import cloud.ambar.creditcard.enrollment.projection.enrollmentlist.GetEnrollmentList; -import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.web.context.annotation.RequestScope; @@ -10,11 +13,31 @@ @Service @RequestScope -@RequiredArgsConstructor -public class GetUserEnrollmentsQueryHandler { +public class GetUserEnrollmentsQueryHandler extends QueryHandler { + private final SessionService sessionService; private final GetEnrollmentList getEnrollmentList; - public List handle(final GetUserEnrollmentsQuery query) { - return getEnrollmentList.getEnrollmentList(query.getUserId()); + public GetUserEnrollmentsQueryHandler( + MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator, + SessionService sessionService, + GetEnrollmentList getEnrollmentList + ) { + super(mongoTransactionalProjectionOperator); + this.sessionService = sessionService; + this.getEnrollmentList = getEnrollmentList; + } + + @Override + protected Object handleQuery(Query query) { + if (query instanceof GetUserEnrollmentsQuery) { + return handleGetUserEnrollments((GetUserEnrollmentsQuery) query); + } else { + throw new IllegalArgumentException("Unsupported query type: " + query.getClass().getName()); + } + } + + public List handleGetUserEnrollments(final GetUserEnrollmentsQuery query) { + String userId = sessionService.authenticatedUserIdFromSessionToken(query.getSessionToken()); + return getEnrollmentList.getEnrollmentList(userId); } } diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentReactionController.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/reaction/EnrollmentReactionController.java similarity index 70% rename from application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentReactionController.java rename to application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/reaction/EnrollmentReactionController.java index dd32c80d..91b79f9e 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/controller/EnrollmentReactionController.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/reaction/EnrollmentReactionController.java @@ -1,10 +1,10 @@ -package cloud.ambar.creditcard.enrollment.controller; +package cloud.ambar.creditcard.enrollment.reaction; import cloud.ambar.common.ambar.AmbarHttpRequest; -import cloud.ambar.common.eventstore.EventStore; import cloud.ambar.common.reaction.ReactionController; import cloud.ambar.common.serializedevent.Deserializer; -import cloud.ambar.creditcard.enrollment.reaction.ReviewEnrollmentReactionHandler; +import cloud.ambar.common.projection.MongoTransactionalProjectionOperator; +import cloud.ambar.common.eventstore.PostgresTransactionalEventStore; import jakarta.validation.Valid; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.PostMapping; @@ -20,10 +20,11 @@ public class EnrollmentReactionController extends ReactionController { private final ReviewEnrollmentReactionHandler reviewEnrollmentReactionHandler; public EnrollmentReactionController( - EventStore eventStore, + PostgresTransactionalEventStore postgresTransactionalEventStore, + MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator, Deserializer deserializer, ReviewEnrollmentReactionHandler reviewEnrollmentReactionHandler) { - super(eventStore, deserializer); + super(postgresTransactionalEventStore, mongoTransactionalProjectionOperator, deserializer); this.reviewEnrollmentReactionHandler = reviewEnrollmentReactionHandler; } @@ -33,6 +34,6 @@ public EnrollmentReactionController( public String reactWithReviewEnrollment( @Valid @RequestBody AmbarHttpRequest request ) { - return processHttpRequest(request, reviewEnrollmentReactionHandler); + return processReactionHttpRequest(request, reviewEnrollmentReactionHandler); } } diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/reaction/ReviewEnrollmentReactionHandler.java b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/reaction/ReviewEnrollmentReactionHandler.java index e69e8271..2aa793b7 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/reaction/ReviewEnrollmentReactionHandler.java +++ b/application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/creditcard/enrollment/reaction/ReviewEnrollmentReactionHandler.java @@ -3,8 +3,8 @@ import cloud.ambar.common.aggregate.Aggregate; import cloud.ambar.common.event.Event; import cloud.ambar.common.eventstore.AggregateAndEventIdsInLastEvent; -import cloud.ambar.common.eventstore.EventStore; import cloud.ambar.common.reaction.ReactionHandler; +import cloud.ambar.common.eventstore.PostgresTransactionalEventStore; import cloud.ambar.creditcard.enrollment.aggregate.Enrollment; import cloud.ambar.creditcard.enrollment.aggregate.EnrollmentStatus; import cloud.ambar.creditcard.enrollment.event.EnrollmentAccepted; @@ -23,14 +23,17 @@ public class ReviewEnrollmentReactionHandler extends ReactionHandler { private final GetEnrollmentList getEnrollmentList; - public ReviewEnrollmentReactionHandler(EventStore eventStore, GetEnrollmentList getEnrollmentList) { - super(eventStore); + public ReviewEnrollmentReactionHandler( + PostgresTransactionalEventStore postgresTransactionalEventStore, + GetEnrollmentList getEnrollmentList + ) { + super(postgresTransactionalEventStore); this.getEnrollmentList = getEnrollmentList; } public void react(final Event event) { if (event instanceof EnrollmentRequested) { - final AggregateAndEventIdsInLastEvent aggregateAndEventIdsInLastEvent = eventStore.findAggregate(event.getAggregateId()); + final AggregateAndEventIdsInLastEvent aggregateAndEventIdsInLastEvent = postgresTransactionalEventStore.findAggregate(event.getAggregateId()); final Aggregate aggregate = aggregateAndEventIdsInLastEvent.getAggregate(); final String causationId = aggregateAndEventIdsInLastEvent.getEventIdOfLastEvent(); final String correlationId = aggregateAndEventIdsInLastEvent.getCorrelationIdOfLastEvent(); @@ -44,12 +47,12 @@ public void react(final Event event) { } final String reactionEventId = generateDeterministicId("ReviewedEnrollment" + event.getEventId()); - if (eventStore.doesEventAlreadyExist(reactionEventId)) { + if (postgresTransactionalEventStore.doesEventAlreadyExist(reactionEventId)) { return; } if (getEnrollmentList.isThereAnyAcceptedEnrollmentForUserAndProduct(enrollment.getUserId(), enrollment.getProductId())) { - eventStore.saveEvent(EnrollmentDeclined.builder() + postgresTransactionalEventStore.saveEvent(EnrollmentDeclined.builder() .eventId(reactionEventId) .aggregateId(enrollment.getAggregateId()) .aggregateVersion(enrollment.getAggregateVersion() + 1) @@ -64,7 +67,7 @@ public void react(final Event event) { if (enrollment.getAnnualIncomeInCents() < 1500000) { - eventStore.saveEvent(EnrollmentDeclined.builder() + postgresTransactionalEventStore.saveEvent(EnrollmentDeclined.builder() .eventId(reactionEventId) .aggregateId(enrollment.getAggregateId()) .aggregateVersion(enrollment.getAggregateVersion() + 1) @@ -75,7 +78,7 @@ public void react(final Event event) { .reasonDescription("Insufficient annual income.") .build()); } else { - eventStore.saveEvent(EnrollmentAccepted.builder() + postgresTransactionalEventStore.saveEvent(EnrollmentAccepted.builder() .eventId(reactionEventId) .aggregateId(enrollment.getAggregateId()) .aggregateVersion(enrollment.getAggregateVersion() + 1) diff --git a/application/backend-credit-card-enrollment/backend-java/src/main/resources/application.properties b/application/backend-credit-card-enrollment/backend-java/src/main/resources/application.properties index cf819e1a..2b351e7b 100644 --- a/application/backend-credit-card-enrollment/backend-java/src/main/resources/application.properties +++ b/application/backend-credit-card-enrollment/backend-java/src/main/resources/application.properties @@ -1,16 +1,26 @@ spring.application.name=CreditCard -# Configs related to our serializedEvent store, in this case in postgres -spring.datasource.url=jdbc:postgresql://${EVENT_STORE_HOST}:${EVENT_STORE_PORT}/${EVENT_STORE_DATABASE_NAME} -spring.datasource.username=${EVENT_STORE_USER} -spring.datasource.password=${EVENT_STORE_PASSWORD} -spring.datasource.driver-class-name=org.postgresql.Driver + +# Configurations related to the event store +app.postgresql.uri=jdbc:postgresql://${EVENT_STORE_HOST}:${EVENT_STORE_PORT}/${EVENT_STORE_DATABASE_NAME}?user=${EVENT_STORE_USER}&password=${EVENT_STORE_PASSWORD} +app.postgresql.database=${EVENT_STORE_DATABASE_NAME} +app.postgresql.table=${EVENT_STORE_CREATE_TABLE_WITH_NAME} +app.postgresql.eventStoreCreateReplicationUserWithUsername=${EVENT_STORE_CREATE_REPLICATION_USER_WITH_USERNAME} +app.postgresql.eventStoreCreateReplicationUserWithPassword=${EVENT_STORE_CREATE_REPLICATION_USER_WITH_PASSWORD} +app.postgresql.eventStoreCreateReplicationPublication=${EVENT_STORE_CREATE_REPLICATION_PUBLICATION} + +# Prevent updating the postgresql schema, such that we can create/update the event store schema explicitly spring.jpa.hibernate.ddl-auto=none + # Configurations related to projection data -app.mongodb.transactional-api.uri=mongodb://${MONGODB_PROJECTION_DATABASE_USERNAME}:${MONGODB_PROJECTION_DATABASE_PASSWORD}@${MONGODB_PROJECTION_HOST}:${MONGODB_PROJECTION_PORT},${MONGODB_PROJECTION_HOST}:${MONGODB_PROJECTION_PORT}/${MONGODB_PROJECTION_DATABASE_NAME}?serverSelectionTimeoutMS=10000&connectTimeoutMS=10000&authSource=admin -app.mongodb.transactional-api.database=${MONGODB_PROJECTION_DATABASE_NAME} -# Prevent Spring Boot from trying to auto wire mongo clients -spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration +app.mongodb.uri=mongodb://${MONGODB_PROJECTION_DATABASE_USERNAME}:${MONGODB_PROJECTION_DATABASE_PASSWORD}@${MONGODB_PROJECTION_HOST}:${MONGODB_PROJECTION_PORT},${MONGODB_PROJECTION_HOST}:${MONGODB_PROJECTION_PORT}/${MONGODB_PROJECTION_DATABASE_NAME}?serverSelectionTimeoutMS=10000&connectTimeoutMS=10000&authSource=admin +app.mongodb.database=${MONGODB_PROJECTION_DATABASE_NAME} + +# Prevent Spring Boot from trying to auto wire mongo clients and postgres clients +spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration,org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration,org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration,org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration,org.springframework.boot.autoconfigure.data.jpa.JpaRepositoriesAutoConfiguration,org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration +spring.data.jpa.repositories.enabled=false + # Configurations related to session tokens app.session.session_tokens_expire_after_seconds=${SESSION_TOKENS_EXPIRE_AFTER_SECONDS} + # Logging logging.level.org.springframework.web=INFO \ No newline at end of file