Skip to content

Commit

Permalink
feat: store_outbound_messages
Browse files Browse the repository at this point in the history
  • Loading branch information
FabioPinheiro committed Aug 18, 2023
1 parent 02dde1e commit 6ed1029
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 6 deletions.
5 changes: 3 additions & 2 deletions initdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ db.createUser({

const database = 'mediator';
const collectionDidAccount = 'user.account';
const collectionMessages = 'messages';
const collectionMessages = 'messages'; // TODO rename to messages.inbox
const collectionMessagesSend = 'outbox'; // TODO rename to messages.outbox

// The current database to use.
use(database);
Expand All @@ -18,5 +19,5 @@ db.createCollection(collectionMessages);
//create index
db.getCollection(collectionDidAccount).createIndex({ 'did': 1 }, { unique: true });
// Only enforce uniqueness on non-empty arrays
db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true , partialFilterExpression: { "alias.0": { $exists: true } }});
db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true, partialFilterExpression: { "alias.0": { $exists: true } } });
db.getCollection(collectionDidAccount).createIndex({ "messagesRef.hash": 1, "messagesRef.recipient": 1 });
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ object ActionUtils {
jobToRun <- mURL match
case None => ZIO.logWarning(s"No url to send message")
case Some(url) => {
ZIO.log(s"Send to url: $url") *>
messageDispatcher
for {
_ <- ZIO.log(s"Send to url: $url")
response <- messageDispatcher
.send(
msg,
url,
Expand All @@ -69,6 +70,8 @@ object ActionUtils {
// case _ => None
)
.catchAll { case DispatcherError(error) => ZIO.logWarning(s"Dispatch Error: $error") }
// FIXME !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! _ <- storeDispatchedMessage(msg, to, url, reply)
} yield ()
}

} yield (jobToRun)
Expand Down
158 changes: 157 additions & 1 deletion mediator/src/main/scala/io/iohk/atala/mediator/db/BsonImplicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package io.iohk.atala.mediator.db
import fmgp.crypto.*
import fmgp.did.*
import fmgp.did.comm.*
import fmgp.did.comm.extension.*
import fmgp.util.*
import reactivemongo.api.bson.*
import zio.json.*
import zio.json.ast.Json
import reactivemongo.api.bson.*

import scala.util.*

Expand Down Expand Up @@ -248,3 +250,157 @@ given BSONDocumentReader[EncryptedMessage] with {
override def readDocument(doc: BSONDocument): Try[EncryptedMessage] =
aux.readDocument(doc)
}

// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1

given BSONWriter[MsgID] with {
import MsgID.*
def writeTry(obj: MsgID): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[MsgID] with {
def readTry(bson: BSONValue): Try[MsgID] = bson.asTry[String].map(v => MsgID(v))
}

given BSONWriter[PIURI] with {
import PIURI.*
def writeTry(obj: PIURI): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[PIURI] with {
def readTry(bson: BSONValue): Try[PIURI] = bson.asTry[String].map(v => PIURI(v))
}

given BSONWriter[TO] with {
import TO.*
def writeTry(obj: TO): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[TO] with {
def readTry(bson: BSONValue): Try[TO] = bson.asTry[String].map(v => TO(v))
}

given BSONWriter[FROM] with {
import FROM.*
def writeTry(obj: FROM): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[FROM] with {
def readTry(bson: BSONValue): Try[FROM] = bson.asTry[String].map(v => FROM(v))
}

//JSON_RFC7159
def sequenceTrys[T](trySequence: Seq[_ <: Try[_ <: T]]): Try[Seq[T]] = {
trySequence.foldLeft(Try(Seq.empty[T])) { (acc, tryElement) =>
acc.flatMap(accSeq => tryElement.map(success => accSeq :+ success))
}
}

def toBSON(j: Json): Try[BSONValue] = j match
case Json.Obj(fields) => sequenceTrys(fields.map(e => toBSON(e._2).map(e2 => (e._1, e2)))).map(BSONDocument(_))
case Json.Arr(elements) => sequenceTrys(elements.map(toBSON(_))).map(BSONArray(_))
case Json.Bool(value) => Try(BSONBoolean(value))
case Json.Str(value) => Try(BSONString(value))
case Json.Num(value) => BSONDecimal.fromBigDecimal(value)
case zio.json.ast.Json.Null => Try(BSONNull)

def toJson(b: BSONValue): Try[Json] = b match
case doc: BSONDocument =>
sequenceTrys(doc.toMap.toSeq.map(e => toJson(e._2).map(e2 => (e._1, e2)))).map(Json.Obj(_: _*))
case array: BSONArray => sequenceTrys(array.values.map(toJson(_))).map(Json.Arr(_: _*))
case e: BSONDouble => e.toDouble.map(Json.Num(_))
case e: BSONInteger => e.toDouble.map(Json.Num(_))
case e: BSONLong => e.toDouble.map(Json.Num(_))
case e: BSONDecimal => e.toDouble.map(Json.Num(_))
case e: BSONString => Try(Json.Str(e.value))
case e: BSONBoolean => Try(Json.Bool(e.value))
case BSONUndefined => Try(Json.Null)
case BSONNull => Try(Json.Null)
// case _: BSONBinary =>
// case _: BSONDateTime =>
// case _: BSONRegex =>
// case _: BSONJavaScript =>
// case _: BSONSymbol =>
// case _: BSONJavaScriptWS =>
// case BSONMinKey =>
// case BSONMaxKey =>
case _ => ??? // FIXME

given BSONWriter[JSON_RFC7159] with {

def writeTry(obj: JSON_RFC7159): Try[BSONDocument] =
sequenceTrys(obj.fields.map(e => toBSON(e._2).map(e2 => (e._1, e2)))).map(BSONDocument(_))
}
given BSONReader[JSON_RFC7159] with {
def readTry(bson: BSONValue): Try[JSON_RFC7159] =
bson.asTry[BSONDocument].flatMap { doc =>
sequenceTrys(doc.toMap.toSeq.map(e => toJson(e._2).map(e2 => (e._1, e2)))).map(Json.Obj(_: _*))
}
}

given BSONWriter[Json] with {
def writeTry(obj: Json): Try[BSONValue] = toBSON(obj)
}
given BSONReader[Json] with {
def readTry(bson: BSONValue): Try[Json] = toJson(bson)
}

given BSONDocumentWriter[AttachmentDataJWS] = Macros.writer[AttachmentDataJWS]
given BSONDocumentReader[AttachmentDataJWS] = Macros.reader[AttachmentDataJWS]
given BSONDocumentWriter[AttachmentDataLinks] = Macros.writer[AttachmentDataLinks]
given BSONDocumentReader[AttachmentDataLinks] = Macros.reader[AttachmentDataLinks]
given BSONDocumentWriter[AttachmentDataBase64] = Macros.writer[AttachmentDataBase64]
given BSONDocumentReader[AttachmentDataBase64] = Macros.reader[AttachmentDataBase64]
given BSONDocumentWriter[AttachmentDataJson] = Macros.writer[AttachmentDataJson]
given BSONDocumentReader[AttachmentDataJson] = Macros.reader[AttachmentDataJson]
given BSONDocumentWriter[AttachmentDataAny] = Macros.writer[AttachmentDataAny]
given BSONDocumentReader[AttachmentDataAny] = Macros.reader[AttachmentDataAny]
given BSONDocumentWriter[AttachmentData] = Macros.writer[AttachmentData]
given BSONDocumentReader[AttachmentData] = Macros.reader[AttachmentData]
given BSONDocumentWriter[Attachment] = Macros.writer[Attachment]
given BSONDocumentReader[Attachment] = Macros.reader[Attachment]

given BSONWriter[ReturnRoute] with {
def writeTry(obj: ReturnRoute): Try[BSONValue] = Try(BSONString(obj.toString()))
}
given BSONReader[ReturnRoute] with {
def readTry(bson: BSONValue): Try[ReturnRoute] = bson.asTry[String].map(v => ReturnRoute.valueOf(v))
}

given BSONDocumentWriter[L10nInline] = Macros.writer[L10nInline]
given BSONDocumentReader[L10nInline] = Macros.reader[L10nInline]

given BSONDocumentWriter[L10n] = Macros.writer[L10n]
given BSONDocumentReader[L10n] = Macros.reader[L10n]

given BSONWriter[SenderOrder] with {
import SenderOrder.*
def writeTry(obj: SenderOrder): Try[BSONInteger] = Try(BSONInteger(obj.value))
}
given BSONReader[SenderOrder] with {
def readTry(bson: BSONValue): Try[SenderOrder] = bson.asTry[BSONInteger].flatMap(_.asInt.map(SenderOrder(_)))
}

given BSONWriter[SentCount] with {
import SentCount.*
def writeTry(obj: SentCount): Try[BSONInteger] = Try(BSONInteger(obj.value))
}
given BSONReader[SentCount] with {
def readTry(bson: BSONValue): Try[SentCount] = bson.asTry[BSONInteger].flatMap(_.asInt.map(SentCount(_)))
}

given BSONDocumentWriter[ReceivedOrdersElement] = Macros.writer[ReceivedOrdersElement]
given BSONDocumentReader[ReceivedOrdersElement] = Macros.reader[ReceivedOrdersElement]

// fmgp.did.comm.extension.AdvancedSequencingpackage.SenderOrder

given BSONDocumentWriter[PlaintextMessage] with {
val aux = Macros.writer[PlaintextMessageClass]
override def writeTry(obj: PlaintextMessage): Try[BSONDocument] =
obj match {
case msg: PlaintextMessageClass => aux.writeTry(msg) // Success(msg): Try[reactivemongo.api.bson.BSONDocument]
case _ => Failure(RuntimeException("Only support PlaintextMessageClass"))
}

}
given BSONDocumentReader[PlaintextMessage] with {
val aux = Macros.reader[PlaintextMessageClass]
override def readDocument(doc: BSONDocument): Try[PlaintextMessage] =
aux.readDocument(doc)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package io.iohk.atala.mediator.db
import fmgp.did.*
import fmgp.did.comm.*
import reactivemongo.api.bson.*

import java.time.Instant

type HASH = String
// messages

Expand Down Expand Up @@ -38,3 +38,53 @@ object DidAccount {
given BSONDocumentWriter[DidAccount] = Macros.writer[DidAccount]
given BSONDocumentReader[DidAccount] = Macros.reader[DidAccount]
}

// messages outbox
case class SentMessageItem(
_id: BSONObjectID = BSONObjectID.generate(),
encrypt: EncryptedMessage,
hash: HASH,
headers: ProtectedHeader,
plaintext: PlaintextMessage,
transport: Seq[SentMessageItem.TransportInfo],
)

object SentMessageItem {

def apply(
msg: EncryptedMessage,
plaintext: PlaintextMessage,
recipient: TO,
distination: String,
result: String
): SentMessageItem = {
new SentMessageItem(
encrypt = msg,
hash = msg.sha1,
headers = msg.`protected`.obj,
plaintext = plaintext,
transport = Seq(TransportInfo(recipient = recipient, distination = distination, result = Some(result)))
)
}

given BSONDocumentWriter[SentMessageItem] = {
import SentMessageItem.given_BSONDocumentWriter_TransportInfo
Macros.writer[SentMessageItem]
}
given BSONDocumentReader[SentMessageItem] = {
import SentMessageItem.given_BSONDocumentReader_TransportInfo
Macros.reader[SentMessageItem]
}

case class TransportInfo(
recipient: TO,
distination: String,
protocol: String = "HTTPS/POST",
timestamp: BSONDateTime = BSONDateTime(Instant.now().toEpochMilli()), // Long,
result: Option[String],
)
object SentMessageItem {
given BSONDocumentWriter[TransportInfo] = Macros.writer[TransportInfo]
given BSONDocumentReader[TransportInfo] = Macros.reader[TransportInfo]
}
}

0 comments on commit 6ed1029

Please sign in to comment.