From fd61f6ba9cd50dfd71942821139f5269c9b44f6c Mon Sep 17 00:00:00 2001 From: Fabio Pinheiro Date: Mon, 21 Aug 2023 17:11:25 +0100 Subject: [PATCH] feat: Store outbound messages (#84) Store all outbound messages Minor bug fix also: - The reply must only be in one way --- initdb.js | 6 +- .../atala/mediator/actions/ActionUtils.scala | 56 +++++- .../mediator/actions/ProtocolExecute.scala | 2 +- .../atala/mediator/app/MediatorAgent.scala | 17 +- .../mediator/app/MediatorStandalone.scala | 5 +- .../atala/mediator/db/BsonImplicits.scala | 160 +++++++++++++++++- .../iohk/atala/mediator/db/DataModels.scala | 71 +++++++- .../atala/mediator/db/OutboxMessageRepo.scala | 39 +++++ 8 files changed, 337 insertions(+), 19 deletions(-) create mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/db/OutboxMessageRepo.scala diff --git a/initdb.js b/initdb.js index 49145635..6cd4ca2c 100644 --- a/initdb.js +++ b/initdb.js @@ -9,14 +9,18 @@ db.createUser({ const database = 'mediator'; const collectionDidAccount = 'user.account'; const collectionMessages = 'messages'; +const collectionMessagesSend = 'messages.outbound'; // The current database to use. use(database); + // Create collections. db.createCollection(collectionDidAccount); db.createCollection(collectionMessages); +db.createCollection(collectionMessagesSend); + //create index db.getCollection(collectionDidAccount).createIndex({ 'did': 1 }, { unique: true }); // Only enforce uniqueness on non-empty arrays -db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true , partialFilterExpression: { "alias.0": { $exists: true } }}); +db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true, partialFilterExpression: { "alias.0": { $exists: true } } }); db.getCollection(collectionDidAccount).createIndex({ "messagesRef.hash": 1, "messagesRef.recipient": 1 }); diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala index c334e911..6a0bd2b8 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala @@ -20,7 +20,11 @@ object ActionUtils { def packResponse( originalMessage: Option[PlaintextMessage], action: Action - ): ZIO[Operations & Agent & Resolver & MessageDispatcher, MediatorError, Option[EncryptedMessage]] = + ): ZIO[ + Operations & Agent & Resolver & MessageDispatcher & OutboxMessageRepo, + MediatorError, + Option[EncryptedMessage] + ] = action match { case _: NoReply.type => ZIO.succeed(None) case action: AnyReply => @@ -31,6 +35,7 @@ object ActionUtils { case Some(value) => authEncrypt(reply) case None => anonEncrypt(reply) }.mapError(fail => MediatorDidError(fail)) + outboxRepo <- ZIO.service[OutboxMessageRepo] // TODO forward message maybeSyncReplyMsg <- reply.to.map(_.toSeq) match // TODO improve case None => ZIO.logWarning("Have a reply but the field 'to' is missing") *> ZIO.none @@ -41,6 +46,7 @@ object ActionUtils { val job: ZIO[MessageDispatcher & (Resolver & Any), MediatorError, Matchable] = for { messageDispatcher <- ZIO.service[MessageDispatcher] resolver <- ZIO.service[Resolver] + doc <- resolver .didDocument(to) .mapError(fail => MediatorDidError(fail)) @@ -56,8 +62,9 @@ object ActionUtils { jobToRun <- mURL match case None => ZIO.logWarning(s"No url to send message") case Some(url) => { - ZIO.log(s"Send to url: $url") *> - messageDispatcher + for { + _ <- ZIO.log(s"Send to url: $url") + response <- messageDispatcher .send( msg, url, @@ -69,19 +76,58 @@ object ActionUtils { // case _ => None ) .catchAll { case DispatcherError(error) => ZIO.logWarning(s"Dispatch Error: $error") } + + _ <- outboxRepo + .insert( + SentMessageItem( + msg = msg, + plaintext = reply, + recipient = Set(to), + distination = Some(url), + sendMethod = MessageSendMethod.HTTPS_POST, + result = response match + case str: String => Some(str) + case _: Unit => None + , + ) + ) // Maybe fork + .catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") } + } yield () } } yield (jobToRun) action match - case Reply(_) => job + case Reply(_) => + job + .when( // this is +- the opposite condition as below + originalMessage + .map { oMsg => oMsg.return_route.isEmpty || oMsg.return_route.contains(ReturnRoute.none) } + .getOrElse(true) // If originalMessage is None + ) case SyncReplyOnly(_) => ZIO.unit case AsyncReplyOnly(_) => job ) *> ZIO .succeed(msg) + .tap(msg => + outboxRepo + .insert( + SentMessageItem( + msg = msg, + plaintext = reply, + recipient = reply.to.getOrElse(Set.empty), + distination = None, + sendMethod = MessageSendMethod.INLINE_REPLY, + result = None, + ) + ) + .catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") } + ) .when( originalMessage .map { oMsg => - oMsg.return_route.contains(ReturnRoute.all) && // Should replies use the same transport channel? + { // Should replies use the same transport channel? + oMsg.return_route.contains(ReturnRoute.all) || oMsg.return_route.contains(ReturnRoute.thread) + } && oMsg.from.map(_.asTO).exists(send2DIDs.contains) // Is the reply back to the original sender? } .getOrElse(false) // If originalMessage is None diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala index bf122039..f6120954 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala @@ -30,7 +30,7 @@ trait ProtocolExecuter[-R, +E] { // <: MediatorError | StorageError] { } object ProtocolExecuter { - type Services = Resolver & Agent & Operations & MessageDispatcher + type Services = Resolver & Agent & Operations & MessageDispatcher & OutboxMessageRepo type Erros = MediatorError | StorageError } case class ProtocolExecuterCollection[-R <: Agent, +E]( diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala index 9c0b8eda..ea29d72d 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala @@ -36,9 +36,12 @@ case class MediatorAgent( // val resolverLayer: ULayer[DynamicResolver] = // DynamicResolver.resolverLayer(didSocketManager) - type Services = Resolver & Agent & Operations & MessageDispatcher & UserAccountRepo & MessageItemRepo - val protocolHandlerLayer - : URLayer[UserAccountRepo & MessageItemRepo, ProtocolExecuter[Services, MediatorError | StorageError]] = + type Services = Resolver & Agent & Operations & MessageDispatcher & UserAccountRepo & MessageItemRepo & + OutboxMessageRepo + val protocolHandlerLayer: URLayer[UserAccountRepo & MessageItemRepo & OutboxMessageRepo, ProtocolExecuter[ + Services, + MediatorError | StorageError + ]] = ZLayer.succeed( ProtocolExecuterCollection[Services, MediatorError | StorageError]( BasicMessageExecuter, @@ -88,7 +91,7 @@ case class MediatorAgent( data: String, mSocketID: Option[SocketID], ): ZIO[ - Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo, + Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo, MediatorError | StorageError, Option[EncryptedMessage] ] = @@ -109,7 +112,7 @@ case class MediatorAgent( msg: EncryptedMessage, mSocketID: Option[SocketID] ): ZIO[ - Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo, + Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo, MediatorError | StorageError, Option[EncryptedMessage] ] = @@ -203,7 +206,7 @@ case class MediatorAgent( def createSocketApp( annotationMap: Seq[LogAnnotation] ): ZIO[ - MediatorAgent & Resolver & Operations & MessageDispatcher & MessageItemRepo & UserAccountRepo, + MediatorAgent & Resolver & Operations & MessageDispatcher & MessageItemRepo & UserAccountRepo & OutboxMessageRepo, Nothing, zio.http.Response ] = { @@ -373,7 +376,7 @@ object MediatorAgent { } yield ret } }: Http[ - Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo, + Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo, Throwable, Request, Response diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorStandalone.scala b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorStandalone.scala index d64a5404..71b75ece 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorStandalone.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorStandalone.scala @@ -60,7 +60,8 @@ object MediatorStandalone extends ZIOAppDefault { Runtime.removeDefaultLoggers >>> SLF4J.slf4j(mediatorColorFormat) val app: HttpApp[ // type HttpApp[-R, +Err] = Http[R, Err, Request, Response] - Hub[String] & Operations & MessageDispatcher & MediatorAgent & Resolver & MessageItemRepo & UserAccountRepo, + Hub[String] & Operations & MessageDispatcher & MediatorAgent & Resolver & MessageItemRepo & UserAccountRepo & + OutboxMessageRepo, Throwable ] = MediatorAgent.didCommApp ++ Http @@ -117,7 +118,7 @@ object MediatorStandalone extends ZIOAppDefault { .provideSomeLayer( AsyncDriverResource.layer >>> ReactiveMongoApi.layer(mediatorDbConfig.connectionString) - >>> MessageItemRepo.layer.and(UserAccountRepo.layer) + >>> MessageItemRepo.layer.and(UserAccountRepo.layer).and(OutboxMessageRepo.layer) ) .provideSomeLayer(Operations.layerDefault) .provideSomeLayer(client >>> MessageDispatcherJVM.layer) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/db/BsonImplicits.scala b/mediator/src/main/scala/io/iohk/atala/mediator/db/BsonImplicits.scala index 29a2a9e0..afa43fa3 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/db/BsonImplicits.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/db/BsonImplicits.scala @@ -3,9 +3,11 @@ package io.iohk.atala.mediator.db import fmgp.crypto.* import fmgp.did.* import fmgp.did.comm.* +import fmgp.did.comm.extension.* import fmgp.util.* -import reactivemongo.api.bson.* import zio.json.* +import zio.json.ast.Json +import reactivemongo.api.bson.* import scala.util.* @@ -230,7 +232,7 @@ given BSONDocumentReader[AuthProtectedHeader] = Macros.reader[AuthProtectedHeader] given BSONDocumentWriter[ProtectedHeader] = - Macros.writer[ProtectedHeader] + Macros.writer[ProtectedHeader] // TODO FIX The encoder for ProtectedHeader MUST not have the field "className" given BSONDocumentReader[ProtectedHeader] = Macros.reader[ProtectedHeader] @@ -248,3 +250,157 @@ given BSONDocumentReader[EncryptedMessage] with { override def readDocument(doc: BSONDocument): Try[EncryptedMessage] = aux.readDocument(doc) } + +// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1 + +given BSONWriter[MsgID] with { + import MsgID.* + def writeTry(obj: MsgID): Try[BSONValue] = Try(BSONString(obj.value)) +} +given BSONReader[MsgID] with { + def readTry(bson: BSONValue): Try[MsgID] = bson.asTry[String].map(v => MsgID(v)) +} + +given BSONWriter[PIURI] with { + import PIURI.* + def writeTry(obj: PIURI): Try[BSONValue] = Try(BSONString(obj.value)) +} +given BSONReader[PIURI] with { + def readTry(bson: BSONValue): Try[PIURI] = bson.asTry[String].map(v => PIURI(v)) +} + +given BSONWriter[TO] with { + import TO.* + def writeTry(obj: TO): Try[BSONValue] = Try(BSONString(obj.value)) +} +given BSONReader[TO] with { + def readTry(bson: BSONValue): Try[TO] = bson.asTry[String].map(v => TO(v)) +} + +given BSONWriter[FROM] with { + import FROM.* + def writeTry(obj: FROM): Try[BSONValue] = Try(BSONString(obj.value)) +} +given BSONReader[FROM] with { + def readTry(bson: BSONValue): Try[FROM] = bson.asTry[String].map(v => FROM(v)) +} + +//JSON_RFC7159 +def sequenceTrys[T](trySequence: Seq[_ <: Try[_ <: T]]): Try[Seq[T]] = { + trySequence.foldLeft(Try(Seq.empty[T])) { (acc, tryElement) => + acc.flatMap(accSeq => tryElement.map(success => accSeq :+ success)) + } +} + +def toBSON(j: Json): Try[BSONValue] = j match + case Json.Obj(fields) => sequenceTrys(fields.map(e => toBSON(e._2).map(e2 => (e._1, e2)))).map(BSONDocument(_)) + case Json.Arr(elements) => sequenceTrys(elements.map(toBSON(_))).map(BSONArray(_)) + case Json.Bool(value) => Try(BSONBoolean(value)) + case Json.Str(value) => Try(BSONString(value)) + case Json.Num(value) => BSONDecimal.fromBigDecimal(value) + case zio.json.ast.Json.Null => Try(BSONNull) + +def toJson(b: BSONValue): Try[Json] = b match + case doc: BSONDocument => + sequenceTrys(doc.toMap.toSeq.map(e => toJson(e._2).map(e2 => (e._1, e2)))).map(Json.Obj(_: _*)) + case array: BSONArray => sequenceTrys(array.values.map(toJson(_))).map(Json.Arr(_: _*)) + case e: BSONDouble => e.toDouble.map(Json.Num(_)) + case e: BSONInteger => e.toDouble.map(Json.Num(_)) + case e: BSONLong => e.toDouble.map(Json.Num(_)) + case e: BSONDecimal => e.toDouble.map(Json.Num(_)) + case e: BSONString => Try(Json.Str(e.value)) + case e: BSONBoolean => Try(Json.Bool(e.value)) + case BSONUndefined => Try(Json.Null) + case BSONNull => Try(Json.Null) + // case _: BSONBinary => + // case _: BSONDateTime => + // case _: BSONRegex => + // case _: BSONJavaScript => + // case _: BSONSymbol => + // case _: BSONJavaScriptWS => + // case BSONMinKey => + // case BSONMaxKey => + case _ => ??? // FIXME + +given BSONWriter[JSON_RFC7159] with { + + def writeTry(obj: JSON_RFC7159): Try[BSONDocument] = + sequenceTrys(obj.fields.map(e => toBSON(e._2).map(e2 => (e._1, e2)))).map(BSONDocument(_)) +} +given BSONReader[JSON_RFC7159] with { + def readTry(bson: BSONValue): Try[JSON_RFC7159] = + bson.asTry[BSONDocument].flatMap { doc => + sequenceTrys(doc.toMap.toSeq.map(e => toJson(e._2).map(e2 => (e._1, e2)))).map(Json.Obj(_: _*)) + } +} + +given BSONWriter[Json] with { + def writeTry(obj: Json): Try[BSONValue] = toBSON(obj) +} +given BSONReader[Json] with { + def readTry(bson: BSONValue): Try[Json] = toJson(bson) +} + +given BSONDocumentWriter[AttachmentDataJWS] = Macros.writer[AttachmentDataJWS] +given BSONDocumentReader[AttachmentDataJWS] = Macros.reader[AttachmentDataJWS] +given BSONDocumentWriter[AttachmentDataLinks] = Macros.writer[AttachmentDataLinks] +given BSONDocumentReader[AttachmentDataLinks] = Macros.reader[AttachmentDataLinks] +given BSONDocumentWriter[AttachmentDataBase64] = Macros.writer[AttachmentDataBase64] +given BSONDocumentReader[AttachmentDataBase64] = Macros.reader[AttachmentDataBase64] +given BSONDocumentWriter[AttachmentDataJson] = Macros.writer[AttachmentDataJson] +given BSONDocumentReader[AttachmentDataJson] = Macros.reader[AttachmentDataJson] +given BSONDocumentWriter[AttachmentDataAny] = Macros.writer[AttachmentDataAny] +given BSONDocumentReader[AttachmentDataAny] = Macros.reader[AttachmentDataAny] +given BSONDocumentWriter[AttachmentData] = Macros.writer[AttachmentData] +given BSONDocumentReader[AttachmentData] = Macros.reader[AttachmentData] +given BSONDocumentWriter[Attachment] = Macros.writer[Attachment] +given BSONDocumentReader[Attachment] = Macros.reader[Attachment] + +given BSONWriter[ReturnRoute] with { + def writeTry(obj: ReturnRoute): Try[BSONValue] = Try(BSONString(obj.toString())) +} +given BSONReader[ReturnRoute] with { + def readTry(bson: BSONValue): Try[ReturnRoute] = bson.asTry[String].map(v => ReturnRoute.valueOf(v)) +} + +given BSONDocumentWriter[L10nInline] = Macros.writer[L10nInline] +given BSONDocumentReader[L10nInline] = Macros.reader[L10nInline] + +given BSONDocumentWriter[L10n] = Macros.writer[L10n] +given BSONDocumentReader[L10n] = Macros.reader[L10n] + +given BSONWriter[SenderOrder] with { + import SenderOrder.* + def writeTry(obj: SenderOrder): Try[BSONInteger] = Try(BSONInteger(obj.value)) +} +given BSONReader[SenderOrder] with { + def readTry(bson: BSONValue): Try[SenderOrder] = bson.asTry[BSONInteger].flatMap(_.asInt.map(SenderOrder(_))) +} + +given BSONWriter[SentCount] with { + import SentCount.* + def writeTry(obj: SentCount): Try[BSONInteger] = Try(BSONInteger(obj.value)) +} +given BSONReader[SentCount] with { + def readTry(bson: BSONValue): Try[SentCount] = bson.asTry[BSONInteger].flatMap(_.asInt.map(SentCount(_))) +} + +given BSONDocumentWriter[ReceivedOrdersElement] = Macros.writer[ReceivedOrdersElement] +given BSONDocumentReader[ReceivedOrdersElement] = Macros.reader[ReceivedOrdersElement] + +// fmgp.did.comm.extension.AdvancedSequencingpackage.SenderOrder + +given BSONDocumentWriter[PlaintextMessage] with { + val aux = Macros.writer[PlaintextMessageClass] + override def writeTry(obj: PlaintextMessage): Try[BSONDocument] = + obj match { + case msg: PlaintextMessageClass => aux.writeTry(msg) // Success(msg): Try[reactivemongo.api.bson.BSONDocument] + case _ => Failure(RuntimeException("Only support PlaintextMessageClass")) + } + +} +given BSONDocumentReader[PlaintextMessage] with { + val aux = Macros.reader[PlaintextMessageClass] + override def readDocument(doc: BSONDocument): Try[PlaintextMessage] = + aux.readDocument(doc) +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala b/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala index 993257b0..7656c274 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala @@ -3,8 +3,9 @@ package io.iohk.atala.mediator.db import fmgp.did.* import fmgp.did.comm.* import reactivemongo.api.bson.* - import java.time.Instant +import scala.util.Try + type HASH = String // messages @@ -38,3 +39,71 @@ object DidAccount { given BSONDocumentWriter[DidAccount] = Macros.writer[DidAccount] given BSONDocumentReader[DidAccount] = Macros.reader[DidAccount] } + +// messages outbox +case class SentMessageItem( + _id: BSONObjectID = BSONObjectID.generate(), + encrypt: EncryptedMessage, + hash: HASH, + headers: ProtectedHeader, + plaintext: PlaintextMessage, + transport: Seq[SentMessageItem.TransportInfo], +) + +object SentMessageItem { + + def apply( + msg: EncryptedMessage, + plaintext: PlaintextMessage, + recipient: Set[TO], + distination: Option[String], + sendMethod: MessageSendMethod, + result: Option[String] + ): SentMessageItem = { + new SentMessageItem( + encrypt = msg, + hash = msg.sha1, + headers = msg.`protected`.obj, + plaintext = plaintext, + transport = Seq( + TransportInfo(recipient = recipient, distination = distination, sendMethod = sendMethod, result = result) + ) + ) + } + + given BSONDocumentWriter[SentMessageItem] = { + import SentMessageItem.given_BSONDocumentWriter_TransportInfo + Macros.writer[SentMessageItem] + } + given BSONDocumentReader[SentMessageItem] = { + import SentMessageItem.given_BSONDocumentReader_TransportInfo + Macros.reader[SentMessageItem] + } + + case class TransportInfo( + recipient: Set[TO], + distination: Option[String], + sendMethod: MessageSendMethod, + timestamp: BSONDateTime = BSONDateTime(Instant.now().toEpochMilli()), // Long, + result: Option[String], + ) + object SentMessageItem { + given BSONDocumentWriter[TransportInfo] = Macros.writer[TransportInfo] + given BSONDocumentReader[TransportInfo] = Macros.reader[TransportInfo] + } +} + +enum MessageSendMethod { + case HTTPS_POST extends MessageSendMethod + case INLINE_REPLY extends MessageSendMethod +} +object MessageSendMethod { + given BSONWriter[MessageSendMethod] with { + def writeTry(obj: MessageSendMethod): Try[BSONValue] = + Try(BSONString(obj.toString)) + } + given BSONReader[MessageSendMethod] with { + def readTry(bson: BSONValue): Try[MessageSendMethod] = + bson.asTry[String].flatMap(v => Try(MessageSendMethod.valueOf(v))) + } +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/db/OutboxMessageRepo.scala b/mediator/src/main/scala/io/iohk/atala/mediator/db/OutboxMessageRepo.scala new file mode 100644 index 00000000..8617a55c --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/db/OutboxMessageRepo.scala @@ -0,0 +1,39 @@ +package io.iohk.atala.mediator.db + +import fmgp.did.* +import io.iohk.atala.mediator.{StorageCollection, StorageError, StorageThrowable} +import reactivemongo.api.bson.* +import reactivemongo.api.bson.collection.BSONCollection +import reactivemongo.api.commands.WriteResult +import reactivemongo.api.{Cursor, CursorProducer} +import zio.* + +import scala.concurrent.ExecutionContext +object OutboxMessageRepo { + def layer: ZLayer[ReactiveMongoApi, Throwable, OutboxMessageRepo] = + ZLayer { + for { + ref <- ZIO.service[ReactiveMongoApi] + } yield OutboxMessageRepo(ref)(using scala.concurrent.ExecutionContext.global) + } +} + +class OutboxMessageRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionContext) { + def collectionName: String = "outbox" + + def collection: IO[StorageCollection, BSONCollection] = reactiveMongoApi.database + .map(_.collection(collectionName)) + .mapError(ex => StorageCollection(ex)) + + def insert(value: SentMessageItem): IO[StorageError, WriteResult] = { + for { + _ <- ZIO.logInfo("insert") + coll <- collection + result <- ZIO + .fromFuture(implicit ec => coll.insert.one(value)) + .tapError(err => ZIO.logError(s"insert : ${err.getMessage}")) + .mapError(ex => StorageThrowable(ex)) + } yield result + } + +}