Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Store outbound messages #84

Merged
merged 3 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions initdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ db.createUser({

const database = 'mediator';
const collectionDidAccount = 'user.account';
const collectionMessages = 'messages';
const collectionMessages = 'messages'; // TODO rename to messages.inbox
const collectionMessagesSend = 'outbox'; // TODO rename to messages.outbox

// The current database to use.
use(database);

// Create collections.
db.createCollection(collectionDidAccount);
db.createCollection(collectionMessages);
db.createCollection(collectionMessagesSend);

//create index
db.getCollection(collectionDidAccount).createIndex({ 'did': 1 }, { unique: true });
// Only enforce uniqueness on non-empty arrays
db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true , partialFilterExpression: { "alias.0": { $exists: true } }});
db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true, partialFilterExpression: { "alias.0": { $exists: true } } });
db.getCollection(collectionDidAccount).createIndex({ "messagesRef.hash": 1, "messagesRef.recipient": 1 });
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ object ActionUtils {
def packResponse(
originalMessage: Option[PlaintextMessage],
action: Action
): ZIO[Operations & Agent & Resolver & MessageDispatcher, MediatorError, Option[EncryptedMessage]] =
): ZIO[
Operations & Agent & Resolver & MessageDispatcher & OutboxMessageRepo,
MediatorError,
Option[EncryptedMessage]
] =
action match {
case _: NoReply.type => ZIO.succeed(None)
case action: AnyReply =>
Expand All @@ -31,6 +35,7 @@ object ActionUtils {
case Some(value) => authEncrypt(reply)
case None => anonEncrypt(reply)
}.mapError(fail => MediatorDidError(fail))
outboxRepo <- ZIO.service[OutboxMessageRepo]
// TODO forward message
maybeSyncReplyMsg <- reply.to.map(_.toSeq) match // TODO improve
case None => ZIO.logWarning("Have a reply but the field 'to' is missing") *> ZIO.none
Expand All @@ -41,6 +46,7 @@ object ActionUtils {
val job: ZIO[MessageDispatcher & (Resolver & Any), MediatorError, Matchable] = for {
messageDispatcher <- ZIO.service[MessageDispatcher]
resolver <- ZIO.service[Resolver]

doc <- resolver
.didDocument(to)
.mapError(fail => MediatorDidError(fail))
Expand All @@ -56,8 +62,9 @@ object ActionUtils {
jobToRun <- mURL match
case None => ZIO.logWarning(s"No url to send message")
case Some(url) => {
ZIO.log(s"Send to url: $url") *>
messageDispatcher
for {
_ <- ZIO.log(s"Send to url: $url")
response <- messageDispatcher
.send(
msg,
url,
Expand All @@ -69,19 +76,58 @@ object ActionUtils {
// case _ => None
)
.catchAll { case DispatcherError(error) => ZIO.logWarning(s"Dispatch Error: $error") }

_ <- outboxRepo
.insert(
SentMessageItem(
msg = msg,
plaintext = reply,
recipient = Set(to),
distination = Some(url),
sendMethod = MessageSendMethod.HTTPS_POST,
result = response match
case str: String => Some(str)
case _: Unit => None
,
)
) // Maybe fork
.catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") }
} yield ()
}

} yield (jobToRun)
action match
case Reply(_) => job
case Reply(_) =>
job
.when( // this is +- the opposite condition as below
originalMessage
.map { oMsg => oMsg.return_route.isEmpty || oMsg.return_route.contains(ReturnRoute.none) }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shaileshp0110 this is the fix for the double reply I mention.

.getOrElse(true) // If originalMessage is None
)
case SyncReplyOnly(_) => ZIO.unit
case AsyncReplyOnly(_) => job
) *> ZIO
.succeed(msg)
.tap(msg =>
outboxRepo
.insert(
SentMessageItem(
msg = msg,
plaintext = reply,
recipient = reply.to.getOrElse(Set.empty),
distination = None,
sendMethod = MessageSendMethod.INLINE_REPLY,
result = None,
)
)
.catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") }
)
.when(
originalMessage
.map { oMsg =>
oMsg.return_route.contains(ReturnRoute.all) && // Should replies use the same transport channel?
{ // Should replies use the same transport channel?
oMsg.return_route.contains(ReturnRoute.all) || oMsg.return_route.contains(ReturnRoute.thread)
} &&
oMsg.from.map(_.asTO).exists(send2DIDs.contains) // Is the reply back to the original sender?
}
.getOrElse(false) // If originalMessage is None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ trait ProtocolExecuter[-R, +E] { // <: MediatorError | StorageError] {
}

object ProtocolExecuter {
type Services = Resolver & Agent & Operations & MessageDispatcher
type Services = Resolver & Agent & Operations & MessageDispatcher & OutboxMessageRepo
type Erros = MediatorError | StorageError
}
case class ProtocolExecuterCollection[-R <: Agent, +E](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ case class MediatorAgent(
// val resolverLayer: ULayer[DynamicResolver] =
// DynamicResolver.resolverLayer(didSocketManager)

type Services = Resolver & Agent & Operations & MessageDispatcher & UserAccountRepo & MessageItemRepo
val protocolHandlerLayer
: URLayer[UserAccountRepo & MessageItemRepo, ProtocolExecuter[Services, MediatorError | StorageError]] =
type Services = Resolver & Agent & Operations & MessageDispatcher & UserAccountRepo & MessageItemRepo &
OutboxMessageRepo
val protocolHandlerLayer: URLayer[UserAccountRepo & MessageItemRepo & OutboxMessageRepo, ProtocolExecuter[
Services,
MediatorError | StorageError
]] =
ZLayer.succeed(
ProtocolExecuterCollection[Services, MediatorError | StorageError](
BasicMessageExecuter,
Expand Down Expand Up @@ -88,7 +91,7 @@ case class MediatorAgent(
data: String,
mSocketID: Option[SocketID],
): ZIO[
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo,
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo,
MediatorError | StorageError,
Option[EncryptedMessage]
] =
Expand All @@ -109,7 +112,7 @@ case class MediatorAgent(
msg: EncryptedMessage,
mSocketID: Option[SocketID]
): ZIO[
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo,
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo,
MediatorError | StorageError,
Option[EncryptedMessage]
] =
Expand Down Expand Up @@ -203,7 +206,7 @@ case class MediatorAgent(
def createSocketApp(
annotationMap: Seq[LogAnnotation]
): ZIO[
MediatorAgent & Resolver & Operations & MessageDispatcher & MessageItemRepo & UserAccountRepo,
MediatorAgent & Resolver & Operations & MessageDispatcher & MessageItemRepo & UserAccountRepo & OutboxMessageRepo,
Nothing,
zio.http.Response
] = {
Expand Down Expand Up @@ -373,7 +376,7 @@ object MediatorAgent {
} yield ret
}
}: Http[
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo,
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo,
Throwable,
Request,
Response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ object MediatorStandalone extends ZIOAppDefault {
Runtime.removeDefaultLoggers >>> SLF4J.slf4j(mediatorColorFormat)

val app: HttpApp[ // type HttpApp[-R, +Err] = Http[R, Err, Request, Response]
Hub[String] & Operations & MessageDispatcher & MediatorAgent & Resolver & MessageItemRepo & UserAccountRepo,
Hub[String] & Operations & MessageDispatcher & MediatorAgent & Resolver & MessageItemRepo & UserAccountRepo &
OutboxMessageRepo,
Throwable
] = MediatorAgent.didCommApp
++ Http
Expand Down Expand Up @@ -117,7 +118,7 @@ object MediatorStandalone extends ZIOAppDefault {
.provideSomeLayer(
AsyncDriverResource.layer
>>> ReactiveMongoApi.layer(mediatorDbConfig.connectionString)
>>> MessageItemRepo.layer.and(UserAccountRepo.layer)
>>> MessageItemRepo.layer.and(UserAccountRepo.layer).and(OutboxMessageRepo.layer)
)
.provideSomeLayer(Operations.layerDefault)
.provideSomeLayer(client >>> MessageDispatcherJVM.layer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package io.iohk.atala.mediator.db
import fmgp.crypto.*
import fmgp.did.*
import fmgp.did.comm.*
import fmgp.did.comm.extension.*
import fmgp.util.*
import reactivemongo.api.bson.*
import zio.json.*
import zio.json.ast.Json
import reactivemongo.api.bson.*

import scala.util.*

Expand Down Expand Up @@ -230,7 +232,7 @@ given BSONDocumentReader[AuthProtectedHeader] =
Macros.reader[AuthProtectedHeader]

given BSONDocumentWriter[ProtectedHeader] =
Macros.writer[ProtectedHeader]
Macros.writer[ProtectedHeader] // TODO FIX The encoder for ProtectedHeader MUST not have the field "className"
given BSONDocumentReader[ProtectedHeader] =
Macros.reader[ProtectedHeader]

Expand All @@ -248,3 +250,157 @@ given BSONDocumentReader[EncryptedMessage] with {
override def readDocument(doc: BSONDocument): Try[EncryptedMessage] =
aux.readDocument(doc)
}

// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1

given BSONWriter[MsgID] with {
import MsgID.*
def writeTry(obj: MsgID): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[MsgID] with {
def readTry(bson: BSONValue): Try[MsgID] = bson.asTry[String].map(v => MsgID(v))
}

given BSONWriter[PIURI] with {
import PIURI.*
def writeTry(obj: PIURI): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[PIURI] with {
def readTry(bson: BSONValue): Try[PIURI] = bson.asTry[String].map(v => PIURI(v))
}

given BSONWriter[TO] with {
import TO.*
def writeTry(obj: TO): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[TO] with {
def readTry(bson: BSONValue): Try[TO] = bson.asTry[String].map(v => TO(v))
}

given BSONWriter[FROM] with {
import FROM.*
def writeTry(obj: FROM): Try[BSONValue] = Try(BSONString(obj.value))
}
given BSONReader[FROM] with {
def readTry(bson: BSONValue): Try[FROM] = bson.asTry[String].map(v => FROM(v))
}

//JSON_RFC7159
def sequenceTrys[T](trySequence: Seq[_ <: Try[_ <: T]]): Try[Seq[T]] = {
trySequence.foldLeft(Try(Seq.empty[T])) { (acc, tryElement) =>
acc.flatMap(accSeq => tryElement.map(success => accSeq :+ success))
}
}

def toBSON(j: Json): Try[BSONValue] = j match
case Json.Obj(fields) => sequenceTrys(fields.map(e => toBSON(e._2).map(e2 => (e._1, e2)))).map(BSONDocument(_))
case Json.Arr(elements) => sequenceTrys(elements.map(toBSON(_))).map(BSONArray(_))
case Json.Bool(value) => Try(BSONBoolean(value))
case Json.Str(value) => Try(BSONString(value))
case Json.Num(value) => BSONDecimal.fromBigDecimal(value)
case zio.json.ast.Json.Null => Try(BSONNull)

def toJson(b: BSONValue): Try[Json] = b match
case doc: BSONDocument =>
sequenceTrys(doc.toMap.toSeq.map(e => toJson(e._2).map(e2 => (e._1, e2)))).map(Json.Obj(_: _*))
case array: BSONArray => sequenceTrys(array.values.map(toJson(_))).map(Json.Arr(_: _*))
case e: BSONDouble => e.toDouble.map(Json.Num(_))
case e: BSONInteger => e.toDouble.map(Json.Num(_))
case e: BSONLong => e.toDouble.map(Json.Num(_))
case e: BSONDecimal => e.toDouble.map(Json.Num(_))
case e: BSONString => Try(Json.Str(e.value))
case e: BSONBoolean => Try(Json.Bool(e.value))
case BSONUndefined => Try(Json.Null)
case BSONNull => Try(Json.Null)
// case _: BSONBinary =>
// case _: BSONDateTime =>
// case _: BSONRegex =>
// case _: BSONJavaScript =>
// case _: BSONSymbol =>
// case _: BSONJavaScriptWS =>
// case BSONMinKey =>
// case BSONMaxKey =>
case _ => ??? // FIXME

given BSONWriter[JSON_RFC7159] with {

def writeTry(obj: JSON_RFC7159): Try[BSONDocument] =
sequenceTrys(obj.fields.map(e => toBSON(e._2).map(e2 => (e._1, e2)))).map(BSONDocument(_))
}
given BSONReader[JSON_RFC7159] with {
def readTry(bson: BSONValue): Try[JSON_RFC7159] =
bson.asTry[BSONDocument].flatMap { doc =>
sequenceTrys(doc.toMap.toSeq.map(e => toJson(e._2).map(e2 => (e._1, e2)))).map(Json.Obj(_: _*))
}
}

given BSONWriter[Json] with {
def writeTry(obj: Json): Try[BSONValue] = toBSON(obj)
}
given BSONReader[Json] with {
def readTry(bson: BSONValue): Try[Json] = toJson(bson)
}

given BSONDocumentWriter[AttachmentDataJWS] = Macros.writer[AttachmentDataJWS]
given BSONDocumentReader[AttachmentDataJWS] = Macros.reader[AttachmentDataJWS]
given BSONDocumentWriter[AttachmentDataLinks] = Macros.writer[AttachmentDataLinks]
given BSONDocumentReader[AttachmentDataLinks] = Macros.reader[AttachmentDataLinks]
given BSONDocumentWriter[AttachmentDataBase64] = Macros.writer[AttachmentDataBase64]
given BSONDocumentReader[AttachmentDataBase64] = Macros.reader[AttachmentDataBase64]
given BSONDocumentWriter[AttachmentDataJson] = Macros.writer[AttachmentDataJson]
given BSONDocumentReader[AttachmentDataJson] = Macros.reader[AttachmentDataJson]
given BSONDocumentWriter[AttachmentDataAny] = Macros.writer[AttachmentDataAny]
given BSONDocumentReader[AttachmentDataAny] = Macros.reader[AttachmentDataAny]
given BSONDocumentWriter[AttachmentData] = Macros.writer[AttachmentData]
given BSONDocumentReader[AttachmentData] = Macros.reader[AttachmentData]
given BSONDocumentWriter[Attachment] = Macros.writer[Attachment]
given BSONDocumentReader[Attachment] = Macros.reader[Attachment]

given BSONWriter[ReturnRoute] with {
def writeTry(obj: ReturnRoute): Try[BSONValue] = Try(BSONString(obj.toString()))
}
given BSONReader[ReturnRoute] with {
def readTry(bson: BSONValue): Try[ReturnRoute] = bson.asTry[String].map(v => ReturnRoute.valueOf(v))
}

given BSONDocumentWriter[L10nInline] = Macros.writer[L10nInline]
given BSONDocumentReader[L10nInline] = Macros.reader[L10nInline]

given BSONDocumentWriter[L10n] = Macros.writer[L10n]
given BSONDocumentReader[L10n] = Macros.reader[L10n]

given BSONWriter[SenderOrder] with {
import SenderOrder.*
def writeTry(obj: SenderOrder): Try[BSONInteger] = Try(BSONInteger(obj.value))
}
given BSONReader[SenderOrder] with {
def readTry(bson: BSONValue): Try[SenderOrder] = bson.asTry[BSONInteger].flatMap(_.asInt.map(SenderOrder(_)))
}

given BSONWriter[SentCount] with {
import SentCount.*
def writeTry(obj: SentCount): Try[BSONInteger] = Try(BSONInteger(obj.value))
}
given BSONReader[SentCount] with {
def readTry(bson: BSONValue): Try[SentCount] = bson.asTry[BSONInteger].flatMap(_.asInt.map(SentCount(_)))
}

given BSONDocumentWriter[ReceivedOrdersElement] = Macros.writer[ReceivedOrdersElement]
given BSONDocumentReader[ReceivedOrdersElement] = Macros.reader[ReceivedOrdersElement]

// fmgp.did.comm.extension.AdvancedSequencingpackage.SenderOrder

given BSONDocumentWriter[PlaintextMessage] with {
val aux = Macros.writer[PlaintextMessageClass]
override def writeTry(obj: PlaintextMessage): Try[BSONDocument] =
obj match {
case msg: PlaintextMessageClass => aux.writeTry(msg) // Success(msg): Try[reactivemongo.api.bson.BSONDocument]
case _ => Failure(RuntimeException("Only support PlaintextMessageClass"))
}

}
given BSONDocumentReader[PlaintextMessage] with {
val aux = Macros.reader[PlaintextMessageClass]
override def readDocument(doc: BSONDocument): Try[PlaintextMessage] =
aux.readDocument(doc)
}
Loading
Loading