From 5edc128f72598d3b45f96f69b1bd0677e17ef654 Mon Sep 17 00:00:00 2001 From: Shailesh Patil <53746241+mineme0110@users.noreply.github.com> Date: Tue, 23 Apr 2024 11:13:25 +0100 Subject: [PATCH] feat(mediator) : Add TTL for message collection (#290) Add TTL for message collection to do house keeping update time stamp field from String to Intant Update initdb.js Add migration script for ttl Signed-off-by: Shailesh --- README.md | 15 ++++++++ .../charts/mediator/templates/mongodb.yaml | 10 ++++++ initdb.js | 11 ++++++ .../mediator/AgentExecutorMediator.scala | 2 +- .../identus/mediator/db/DataModels.scala | 34 ++++++++++++++++--- .../identus/mediator/db/MessageItemRepo.scala | 9 ++--- .../protocols/ForwardMessageExecuter.scala | 3 +- migration_mediator_collection.js | 31 +++++++++++++++++ 8 files changed, 105 insertions(+), 10 deletions(-) create mode 100644 migration_mediator_collection.js diff --git a/README.md b/README.md index 79d3f0fe..fa6646d1 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,21 @@ To set up the mediator storage (MongoDB): - `MONGODB_PASSWORD` - is the password used by the Mediator service to connect to the database. - `MONGODB_DB_NAME` - is the name of the database used by the Mediator. +#### Mediator storage +- The `messages` collection contains two types of messages: `Mediator` and `User`. +1. **Mediator Messages**: + - These messages received by mediator for any interactions with the mediator. + - Examples include messages for setting up mediation, requesting mediation, or picking up messages from the mediator. + - These messages stored in collection can be used for debugging purpose mediator functionality and interactions with the mediator. Hence they can be deleted after a period of time. + - This message type `Mediator` can be setup to have a configurable Time-To-Live (TTL) value, after which they can expire. + - This is how the TTL can be configured for the collection messages [initdb.js](initdb.js) +2. **User Messages**: + - These are the actual messages e.g like the Forward message from the mediator, contain a `User` message inside. This inside message is stored as type `User` to be delivered to user. + - They do not have a TTL, and will persist in the system until the user retrieves them using a pickup protocol and deletes them. + - The mediator is responsible for storing and making these user messages available for delivery to the intended recipients. + + ℹ️ For existing users, please utilize the migration script [migration_mediator_collection.js](migration_mediator_collection.js) to migrate the collection. + ## Run The DIDComm Mediator comprises two elements: a backend service and a database. diff --git a/infrastructure/charts/mediator/templates/mongodb.yaml b/infrastructure/charts/mediator/templates/mongodb.yaml index 4d25cf28..67882388 100644 --- a/infrastructure/charts/mediator/templates/mongodb.yaml +++ b/infrastructure/charts/mediator/templates/mongodb.yaml @@ -62,6 +62,16 @@ data: // Only enforce uniqueness on non-empty arrays db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true , partialFilterExpression: { "alias.0": { $exists: true } }}); db.getCollection(collectionDidAccount).createIndex({ "messagesRef.hash": 1, "messagesRef.recipient": 1 }); + // 7 day * 24 hours * 60 minutes * 60 seconds + const expireAfterSeconds = 7 * 24 * 60 * 60; + db.getCollection(collectionMessages).createIndex( + { ts: 1 }, + { + name: "message-ttl-index", + partialFilterExpression: { "message_type" : "Mediator" }, + expireAfterSeconds: expireAfterSeconds + } + ); --- apiVersion: v1 kind: Service diff --git a/initdb.js b/initdb.js index 6cd4ca2c..5d9bd69d 100644 --- a/initdb.js +++ b/initdb.js @@ -24,3 +24,14 @@ db.getCollection(collectionDidAccount).createIndex({ 'did': 1 }, { unique: true // Only enforce uniqueness on non-empty arrays db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true, partialFilterExpression: { "alias.0": { $exists: true } } }); db.getCollection(collectionDidAccount).createIndex({ "messagesRef.hash": 1, "messagesRef.recipient": 1 }); + +// There are 2 message types `Mediator` and `User` Please follow the Readme for more details in the section Mediator storage +const expireAfterSeconds = 7 * 24 * 60 * 60; // 7 day * 24 hours * 60 minutes * 60 seconds +db.getCollection(collectionMessages).createIndex( + { ts: 1 }, + { + name: "message-ttl-index", + partialFilterExpression: { "message_type" : "Mediator" }, + expireAfterSeconds: expireAfterSeconds + } +) \ No newline at end of file diff --git a/mediator/src/main/scala/org/hyperledger/identus/mediator/AgentExecutorMediator.scala b/mediator/src/main/scala/org/hyperledger/identus/mediator/AgentExecutorMediator.scala index aa6fdd66..5f89819d 100644 --- a/mediator/src/main/scala/org/hyperledger/identus/mediator/AgentExecutorMediator.scala +++ b/mediator/src/main/scala/org/hyperledger/identus/mediator/AgentExecutorMediator.scala @@ -287,7 +287,7 @@ object AgentExecutorMediator { em.`protected`.obj match case AnonProtectedHeader(epk, apv, typ, enc, alg) => ops.anonDecrypt(em) case AuthProtectedHeader(epk, apv, skid, apu, typ, enc, alg) => ops.authDecrypt(em) - }.flatMap(decrypt _) + }.flatMap(decrypt) case sm: SignedMessage => ops.verify(sm).flatMap { case false => ZIO.fail(ValidationFailed) diff --git a/mediator/src/main/scala/org/hyperledger/identus/mediator/db/DataModels.scala b/mediator/src/main/scala/org/hyperledger/identus/mediator/db/DataModels.scala index 4a20b017..6e0a3f74 100644 --- a/mediator/src/main/scala/org/hyperledger/identus/mediator/db/DataModels.scala +++ b/mediator/src/main/scala/org/hyperledger/identus/mediator/db/DataModels.scala @@ -13,26 +13,33 @@ type HASH = String // messages type XRequestID = String // x-request-id +enum MessageType { + case Mediator, User +} + case class MessageItem( _id: HASH, msg: SignedMessage | EncryptedMessage, headers: ProtectedHeader | Seq[SignProtectedHeader], - ts: String, + ts: Instant, + message_type: MessageType, xRequestId: Option[XRequestID] ) object MessageItem { - def apply(msg: SignedMessage | EncryptedMessage, xRequestId: Option[XRequestID]): MessageItem = + def apply(msg: SignedMessage | EncryptedMessage, messageType: MessageType, xRequestId: Option[XRequestID]): MessageItem = + val now = Instant.now() msg match { case sMsg: SignedMessage => new MessageItem( msg.sha256, msg, sMsg.signatures.map(_.`protected`.obj), - Instant.now().toString, + now, + messageType, xRequestId ) case eMsg: EncryptedMessage => - new MessageItem(msg.sha256, msg, eMsg.`protected`.obj, Instant.now().toString, xRequestId) + new MessageItem(msg.sha256, msg, eMsg.`protected`.obj, now, messageType, xRequestId) } given BSONWriter[ProtectedHeader | Seq[SignProtectedHeader]] with { @@ -72,6 +79,25 @@ object MessageItem { } } + given BSONWriter[MessageType] with + def writeTry(value: MessageType): Try[BSONValue] = Try { + value match { + case MessageType.Mediator => BSONString("Mediator") + case MessageType.User => BSONString("User") + } + } + + given BSONReader[MessageType] with + def readTry(bson: BSONValue): Try[MessageType] = Try { + bson match { + case BSONString("Mediator") => MessageType.Mediator + case BSONString("User") => MessageType.User + case _ => throw new RuntimeException("Invalid MessagePurpose value in BSON") + } + } + + + given BSONDocumentWriter[MessageItem] = Macros.writer[MessageItem] given BSONDocumentReader[MessageItem] = Macros.reader[MessageItem] } diff --git a/mediator/src/main/scala/org/hyperledger/identus/mediator/db/MessageItemRepo.scala b/mediator/src/main/scala/org/hyperledger/identus/mediator/db/MessageItemRepo.scala index ed118e25..3c159063 100644 --- a/mediator/src/main/scala/org/hyperledger/identus/mediator/db/MessageItemRepo.scala +++ b/mediator/src/main/scala/org/hyperledger/identus/mediator/db/MessageItemRepo.scala @@ -1,7 +1,8 @@ package org.hyperledger.identus.mediator.db import fmgp.did.* -import fmgp.did.comm.{SignedMessage, EncryptedMessage} +import fmgp.did.comm.{EncryptedMessage, SignedMessage} +import org.hyperledger.identus.mediator.db.MessageType.Mediator import org.hyperledger.identus.mediator.{DuplicateMessage, StorageCollection, StorageError, StorageThrowable} import reactivemongo.api.bson.* import reactivemongo.api.bson.collection.BSONCollection @@ -28,13 +29,13 @@ class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon .map(_.collection(collectionName)) .mapError(ex => StorageCollection(ex)) - def insert(msg: SignedMessage | EncryptedMessage): IO[StorageError, WriteResult] = { + def insert(msg: SignedMessage | EncryptedMessage, messageType: MessageType = Mediator): IO[StorageError, WriteResult] = { for { - _ <- ZIO.logInfo("insert") + _ <- ZIO.logInfo(s"insert $messageType") xRequestId <- ZIO.logAnnotations.map(_.get(XRequestId.value)) coll <- collection result <- ZIO - .fromFuture(implicit ec => coll.insert.one(MessageItem(msg, xRequestId))) + .fromFuture(implicit ec => coll.insert.one(MessageItem(msg, messageType, xRequestId))) .tapError(err => ZIO.logError(s"insert : ${err.getMessage}")) .mapError { case ex: DatabaseException if (ex.code.contains(DuplicateMessage.code)) => DuplicateMessage(ex) diff --git a/mediator/src/main/scala/org/hyperledger/identus/mediator/protocols/ForwardMessageExecuter.scala b/mediator/src/main/scala/org/hyperledger/identus/mediator/protocols/ForwardMessageExecuter.scala index c821f07b..e8293cc1 100644 --- a/mediator/src/main/scala/org/hyperledger/identus/mediator/protocols/ForwardMessageExecuter.scala +++ b/mediator/src/main/scala/org/hyperledger/identus/mediator/protocols/ForwardMessageExecuter.scala @@ -11,6 +11,7 @@ import org.hyperledger.identus.mediator.db.* import zio.* import zio.json.* import fmgp.did.comm.protocol.pickup3.MessageDelivery +import org.hyperledger.identus.mediator.db.MessageType.User object ForwardMessageExecuter extends ProtocolExecuter[ @@ -36,7 +37,7 @@ object ForwardMessageExecuter msg <- if (numbreOfUpdated > 0) { // Or maybe we can add all the time for { - _ <- repoMessageItem.insert(m.msg) + _ <- repoMessageItem.insert(m.msg, User) _ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo") // For Live Mode diff --git a/migration_mediator_collection.js b/migration_mediator_collection.js new file mode 100644 index 00000000..56ea7c77 --- /dev/null +++ b/migration_mediator_collection.js @@ -0,0 +1,31 @@ +// migration script +// Please utilize the following script to update your existing collection for Mediator release v0.14.5 and beyond. +const collectionName = 'messages'; +const collectionNameUserAccount = 'user.account'; +let userHashes = []; + +db.getCollection(collectionNameUserAccount).find({}).forEach(function(user) { + user.messagesRef.forEach(function(messageRef) { + userHashes.push(messageRef.hash); + }); +}); + +db.getCollection('messages').find({}).forEach(function(message) { + let newTimestamp = new Date(message.ts); + if(userHashes.includes(message._id)) { + db.getCollection('messages').updateOne({ _id: message._id }, { $set: { message_type: 'User', ts: newTimestamp } }); + } else { + db.getCollection('messages').updateOne({ _id: message._id }, { $set: { message_type: 'Mediator', ts: newTimestamp } }); + } +}); + +// There are 2 message types `Mediator` and `User` Please follow the Readme for more details in the section Mediator storage +const expireAfterSeconds = 7 * 24 * 60 * 60; // 7 day * 24 hours * 60 minutes * 60 seconds +db.getCollection(collectionMessages).createIndex( + { ts: 1 }, + { + name: "message-ttl-index", + partialFilterExpression: { "message_type" : "Mediator" }, + expireAfterSeconds: expireAfterSeconds + } +) \ No newline at end of file