Skip to content

Commit

Permalink
[JSON-API] Websockets fix for matchedQueries (#11361)
Browse files Browse the repository at this point in the history
* Changes to ensure matchedQueries are returned correctly when queries contain a mix of offsets and no offsets.

CHANGELOG_BEGIN
[JSON-API] fixes a bug related to the matchedQueries value returned for websocket multiqueries,
this only happens for patterns where the multiqueries contain a mixture of queries with and without
offsets.
CHANGELOG_END

* changes based on code review comments
  • Loading branch information
akshayshirahatti-da authored Oct 26, 2021
1 parent 4a34b68 commit e474b2d
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,81 @@ abstract class AbstractWebsocketServiceIntegrationTest
).toJson
}

"matchedQueries should be correct for multiqueries with per-query offsets" in withHttpService {
(uri, _, _, _) =>
import spray.json._

val (party, headers) = getUniquePartyAndAuthHeaders("Alice")
val initialCreate = initialIouCreate(uri, party, headers)

//initial query without offset
val query =
"""[
{"templateIds": ["Iou:Iou"], "query": {"currency": "USD"}}
]"""

@nowarn("msg=pattern var evtsWrapper .* is never used")
def resp(
iouCid: domain.ContractId,
kill: UniqueKillSwitch,
): Sink[JsValue, Future[domain.Offset]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume
.interpret(
for {
evtsWrapper @ ContractDelta(Vector((ctid, _)), Vector(), None) <- readOne
_ = {
(ctid: String) shouldBe (iouCid.unwrap: String)
inside(evtsWrapper) { case JsObject(obj) =>
inside(obj get "events") {
case Some(
JsArray(
Vector(
Created(IouAmount(amt), MatchedQueries(NumList(ix), _))
)
)
) =>
//matchedQuery should be 0 for the initial query supplied
Set((amt, ix)) should ===(Set((BigDecimal("999.99"), Vector(BigDecimal(0)))))
}
}
}
ContractDelta(Vector(), _, Some(offset)) <- readOne

_ = kill.shutdown()
_ <- drain

} yield offset
)
}

for {
creation <- initialCreate
_ = creation._1 shouldBe a[StatusCodes.Success]
iouCid = getContractId(getResult(creation._2))
jwt = jwtForParties(List(party.unwrap), List(), testId)
(kill, source) = singleClientQueryStream(jwt, uri, query)
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
lastSeen <- source via parseResp runWith resp(iouCid, kill)

//construct a new multiquery with one of them having an offset while the other doesn't
multiquery = s"""[
{"templateIds": ["Iou:Iou"], "query": {"currency": "USD"}, "offset": "${lastSeen.unwrap}"},
{"templateIds": ["Iou:Iou"]}
]"""

clientMsg <- singleClientQueryStream(jwt, uri, multiquery)
.take(1)
.runWith(collectResultsAsTextMessageSkipOffsetTicks)
} yield inside(clientMsg) { case Vector(result) =>
//we should expect to have matchedQueries [1] to indicate a match for the new template query only.
result should include(s"""$iouCid""")
result should include(""""matchedQueries":[1]""")
}
}

"query should receive deltas as contracts are archived/created" in withHttpService {
(uri, _, _, _) =>
import spray.json._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ object WebSocketService {
import util.Collections._

val indexedOffsets: Vector[Option[domain.Offset]] =
request.queries.map(_.offset).toVector
request.queriesWithPos.map { case (q, _) => q.offset }.toVector

def matchesOffset(queryIndex: Int, maybeEventOffset: Option[domain.Offset]): Boolean = {
import domain.Offset.`Offset ordering`
Expand All @@ -283,31 +283,31 @@ object WebSocketService {
}

def fn(
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), Int)]]
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), (Int, Int))]]
)(a: domain.ActiveContract[LfV], o: Option[domain.Offset]): Option[Positive] = {
q.get(a.templateId).flatMap { preds =>
preds.collect(Function unlift { case ((_, p), ix) =>
preds.collect(Function unlift { case ((_, p), (ix, pos)) =>
val matchesPredicate = p(a.payload)
(matchesPredicate && matchesOffset(ix, o)).option(ix)
(matchesPredicate && matchesOffset(ix, o)).option(pos)
})
}
}

def dbQueriesPlan(
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), Int)]]
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), (Int, Int))]]
)(implicit
sjd: dbbackend.SupportedJdbcDriver.TC
): (Seq[(domain.TemplateId.RequiredPkg, doobie.Fragment)], Map[Int, Int]) = {
val annotated = q.toSeq.flatMap { case (tpid, nel) =>
nel.toVector.map { case ((vp, _), pos) => (tpid, vp.toSqlWhereClause, pos) }
nel.toVector.map { case ((vp, _), (_, pos)) => (tpid, vp.toSqlWhereClause, pos) }
}
val posMap = annotated.iterator.zipWithIndex.map { case ((_, _, pos), ix) =>
(ix, pos)
}.toMap
(annotated map { case (tpid, sql, _) => (tpid, sql) }, posMap)
}

val query = (gacr: domain.SearchForeverQuery, ix: Int) =>
val query = (gacr: domain.SearchForeverQuery, pos: Int, ix: Int) =>
for {
res <-
gacr.templateIds.toList
Expand All @@ -322,10 +322,11 @@ object WebSocketService {
)
(resolved, unresolved) = res
q = prepareFilters(resolved, gacr.query, lookupType): CompiledQueries
} yield (resolved, unresolved, q transform ((_, p) => NonEmptyList((p, ix))))
} yield (resolved, unresolved, q transform ((_, p) => NonEmptyList((p, (ix, pos)))))
for {
res <-
request.queries.zipWithIndex
request.queriesWithPos.zipWithIndex //index is used to ensure matchesOffset works properly
.map { case ((q, pos), ix) => (q, pos, ix) }
.foldMapM(query.tupled)
(resolved, unresolved, q) = res
} yield StreamPredicate(
Expand Down Expand Up @@ -363,8 +364,8 @@ object WebSocketService {
request: SearchForeverRequest,
): Option[SearchForeverRequest] = {
import scalaz.std.list
val withoutOffset = request.queries.toList.filter(_.offset.isEmpty)
list.toNel(withoutOffset).map(SearchForeverRequest(_))
val withoutOffset = request.queriesWithPos.toList.filter { case (q, _) => q.offset.isEmpty }
list.toNel(withoutOffset).map(SearchForeverRequest)
}

override def adjustRequest(
Expand All @@ -373,9 +374,9 @@ object WebSocketService {
): SearchForeverRequest =
prefix.fold(request)(prefix =>
request.copy(
queries = request.queries.map(query =>
query.copy(offset = query.offset.orElse(Some(prefix.offset)))
)
queriesWithPos = request.queriesWithPos.map {
_ leftMap (q => q.copy(offset = q.offset.orElse(Some(prefix.offset))))
}
)
)

Expand All @@ -388,8 +389,8 @@ object WebSocketService {
prefix: Option[domain.StartingOffset],
request: SearchForeverRequest,
): Option[domain.StartingOffset] =
request.queries
.map(_.offset)
request.queriesWithPos
.map { case (q, _) => q.offset }
.minimumBy1(identity)
.map(domain.StartingOffset(_))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ object domain extends com.daml.fetchcontracts.domain.Aliases {
)

final case class SearchForeverRequest(
queries: NonEmptyList[SearchForeverQuery]
queriesWithPos: NonEmptyList[(SearchForeverQuery, Int)]
)

final case class PartyDetails(identifier: Party, displayName: Option[String], isLocal: Boolean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,10 @@ object JsonProtocol extends DefaultJsonProtocol with ExtraFormats {

implicit val SearchForeverRequestFormat: RootJsonReader[domain.SearchForeverRequest] = {
case multi @ JsArray(_) =>
domain.SearchForeverRequest(multi.convertTo[NonEmptyList[domain.SearchForeverQuery]])
val queriesWithPos = multi.convertTo[NonEmptyList[domain.SearchForeverQuery]].zipWithIndex
domain.SearchForeverRequest(queriesWithPos)
case single =>
domain.SearchForeverRequest(NonEmptyList(single.convertTo[domain.SearchForeverQuery]))
domain.SearchForeverRequest(NonEmptyList((single.convertTo[domain.SearchForeverQuery], 0)))
}

implicit val CommandMetaFormat: RootJsonFormat[domain.CommandMeta] = jsonFormat1(
Expand Down

0 comments on commit e474b2d

Please sign in to comment.