Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LocalTxMonitor: Add support for GetMeasures #4918

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ library

build-depends: base >=4.14 && <4.21,
bytestring >=0.10 && <0.13,
containers,
cborg >=0.2.1 && <0.3,
deepseq,
quiet,
Expand All @@ -107,6 +108,7 @@ library
ouroboros-network-api
^>=0.9.0,
serialise,
text,
typed-protocols ^>=0.1.1,
typed-protocols-cborg
^>=0.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ module Ouroboros.Network.Protocol.LocalTxMonitor.Codec
, codecLocalTxMonitorId
) where

import Control.Monad
import Control.Monad.Class.MonadST

import Network.TypedProtocol.Codec.CBOR

import Data.ByteString.Lazy (ByteString)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map

import Codec.CBOR.Decoding qualified as CBOR
import Codec.CBOR.Encoding qualified as CBOR
Expand All @@ -44,7 +47,7 @@ codecLocalTxMonitor encodeTxId decodeTxId
mkCodecCborLazyBS encode decode
where
encode ::
forall (pr :: PeerRole) (st :: ptcl) (st' :: ptcl). ()
forall (pr :: PeerRole) (st :: ptcl) (st' :: ptcl). ()
=> PeerHasAgency pr st
-> Message ptcl st st'
-> CBOR.Encoding
Expand All @@ -65,6 +68,8 @@ codecLocalTxMonitor encodeTxId decodeTxId
CBOR.encodeListLen 2 <> CBOR.encodeWord 7 <> encodeTxId txid
MsgGetSizes ->
CBOR.encodeListLen 1 <> CBOR.encodeWord 9
MsgGetMeasures ->
CBOR.encodeListLen 1 <> CBOR.encodeWord 11 -- why increments of 2?

encode (ServerAgency TokAcquiring) = \case
MsgAcquired slot ->
Expand All @@ -89,6 +94,14 @@ codecLocalTxMonitor encodeTxId decodeTxId
<> CBOR.encodeWord32 (sizeInBytes sz)
<> CBOR.encodeWord32 (numberOfTxs sz)

encode (ServerAgency (TokBusy TokGetMeasures)) = \case
MsgReplyGetMeasures measures ->
CBOR.encodeListLen 2
<> CBOR.encodeWord 12
<> CBOR.encodeListLen 2
<> CBOR.encodeWord32 (txCount measures)
<> encodeMeasureMap (measuresMap measures)

decode ::
forall s (pr :: PeerRole) (st :: ptcl). ()
=> PeerHasAgency pr st
Expand All @@ -113,6 +126,8 @@ codecLocalTxMonitor encodeTxId decodeTxId
return (SomeMessage (MsgHasTx txid))
(ClientAgency TokAcquired, 1, 9) ->
return (SomeMessage MsgGetSizes)
(ClientAgency TokAcquired, 1, 11) ->
return (SomeMessage MsgGetMeasures)

(ServerAgency TokAcquiring, 2, 2) -> do
slot <- decodeSlot
Expand All @@ -136,6 +151,13 @@ codecLocalTxMonitor encodeTxId decodeTxId
let sizes = MempoolSizeAndCapacity { capacityInBytes, sizeInBytes, numberOfTxs }
return (SomeMessage (MsgReplyGetSizes sizes))

(ServerAgency (TokBusy TokGetMeasures), 2, 12) -> do
_len <- CBOR.decodeListLen
txCount <- CBOR.decodeWord32
measuresMap <- decodeMeasureMap
let measures = MempoolMeasures { txCount, measuresMap }
pure (SomeMessage (MsgReplyGetMeasures measures))

(ClientAgency TokIdle, _, _) ->
fail (printf "codecLocalTxMonitor (%s) unexpected key (%d, %d)" (show stok) key len)
(ClientAgency TokAcquired, _, _) ->
Expand All @@ -145,6 +167,40 @@ codecLocalTxMonitor encodeTxId decodeTxId
(ServerAgency (TokBusy _), _, _) ->
fail (printf "codecLocalTxMonitor (%s) unexpected key (%d, %d)" (show stok) key len)

encodeMeasureMap :: Map MeasureName (SizeAndCapacity Integer) -> CBOR.Encoding
encodeMeasureMap m =
CBOR.encodeMapLen (fromIntegral (Map.size m)) <>
Map.foldMapWithKey f m
where
f mn sc =
encodeMeasureName mn <> encodeSizeAndCapacity sc

decodeMeasureMap :: CBOR.Decoder s (Map MeasureName (SizeAndCapacity Integer))
decodeMeasureMap = do
len <- CBOR.decodeMapLen
mapContents <- replicateM len $
(,) <$> decodeMeasureName <*> decodeSizeAndCapacity
pure $ Map.fromList mapContents

encodeMeasureName :: MeasureName -> CBOR.Encoding
encodeMeasureName (MeasureName t) = CBOR.encodeString t

decodeMeasureName :: CBOR.Decoder s MeasureName
decodeMeasureName = MeasureName <$> CBOR.decodeString

encodeSizeAndCapacity :: SizeAndCapacity Integer -> CBOR.Encoding
encodeSizeAndCapacity sc =
CBOR.encodeListLen 2 <>
CBOR.encodeInteger (size sc) <>
CBOR.encodeInteger (capacity sc)

decodeSizeAndCapacity :: CBOR.Decoder s (SizeAndCapacity Integer)
decodeSizeAndCapacity = do
_len <- CBOR.decodeListLen
size <- CBOR.decodeInteger
capacity <- CBOR.decodeInteger
pure SizeAndCapacity { size, capacity }

-- | An identity 'Codec' for the 'LocalTxMonitor' protocol. It does not do
-- any serialisation. It keeps the typed messages, wrapped in 'AnyMessage'.
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ data ServerStAcquired txid tx slot m a = ServerStAcquired
{ recvMsgNextTx :: m (ServerStBusy NextTx txid tx slot m a)
, recvMsgHasTx :: txid -> m (ServerStBusy HasTx txid tx slot m a)
, recvMsgGetSizes :: m (ServerStBusy GetSizes txid tx slot m a)
, recvMsgGetMeasures :: m (ServerStBusy GetMeasures txid tx slot m a)
, recvMsgAwaitAcquire :: m (ServerStAcquiring txid tx slot m a)
, recvMsgRelease :: m (ServerStIdle txid tx slot m a)
}
Expand All @@ -94,6 +95,11 @@ data ServerStBusy (kind :: StBusyKind) txid tx slot m a where
-> ServerStAcquired txid tx slot m a
-> ServerStBusy GetSizes txid tx slot m a

SendMsgReplyGetMeasures
:: MempoolMeasures
-> ServerStAcquired txid tx slot m a
-> ServerStBusy GetMeasures txid tx slot m a

-- | Interpret a 'LocalTxMonitorServer' action sequence as a 'Peer' on the
-- client-side of the 'LocalTxMonitor' protocol.
--
Expand Down Expand Up @@ -133,6 +139,7 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) =
{ recvMsgNextTx
, recvMsgHasTx
, recvMsgGetSizes
, recvMsgGetMeasures
, recvMsgAwaitAcquire
, recvMsgRelease
} -> Await (ClientAgency TokAcquired) $ \case
Expand All @@ -142,6 +149,8 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) =
Effect $ handleHasTx <$> recvMsgHasTx txid
MsgGetSizes ->
Effect $ handleGetSizes <$> recvMsgGetSizes
MsgGetMeasures ->
Effect $ handleGetMeasures <$> recvMsgGetMeasures
MsgAwaitAcquire ->
Effect $ handleStAcquiring <$> recvMsgAwaitAcquire
MsgRelease ->
Expand Down Expand Up @@ -170,3 +179,11 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) =
SendMsgReplyGetSizes sizes serverStAcquired ->
Yield (ServerAgency (TokBusy TokGetSizes)) (MsgReplyGetSizes sizes) $
handleStAcquired serverStAcquired

handleGetMeasures ::
ServerStBusy GetMeasures txid tx slot m a
-> Peer (LocalTxMonitor txid tx slot) AsServer (StBusy GetMeasures) m a
handleGetMeasures = \case
SendMsgReplyGetMeasures measures serverStAcquired ->
Yield (ServerAgency (TokBusy TokGetMeasures)) (MsgReplyGetMeasures measures) $
handleStAcquired serverStAcquired
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
-- @
module Ouroboros.Network.Protocol.LocalTxMonitor.Type where


import Data.Map.Strict (Map)
import Data.Text (Text)
import Data.Word
import GHC.Generics (Generic)

Expand Down Expand Up @@ -109,6 +110,9 @@ data StBusyKind where
-- | The server is busy looking for the current size and max capacity of the
-- mempool
GetSizes :: StBusyKind
-- | The server is busy looking for the current size and max capacity of the
-- mempool
GetMeasures :: StBusyKind

-- | Describes the MemPool sizes and capacity for a given snapshot.
data MempoolSizeAndCapacity = MempoolSizeAndCapacity
Expand All @@ -121,6 +125,22 @@ data MempoolSizeAndCapacity = MempoolSizeAndCapacity
-- ^ The number of transactions in the mempool
} deriving (Generic, Eq, Show, NFData)

data SizeAndCapacity a = SizeAndCapacity
{ size :: !a
, capacity :: !a
} deriving (Generic, Eq, Show, NFData)

instance Functor SizeAndCapacity where
fmap f (SizeAndCapacity s c) = SizeAndCapacity (f s) (f c)

newtype MeasureName = MeasureName Text
deriving (Generic, Eq, Ord, Show, NFData)

data MempoolMeasures = MempoolMeasures
{ txCount :: !Word32
, measuresMap :: !(Map MeasureName (SizeAndCapacity Integer))
} deriving (Generic, Eq, Show, NFData)

instance Protocol (LocalTxMonitor txid tx slot) where

-- | The messages in the transaction monitoring protocol.
Expand Down Expand Up @@ -201,6 +221,16 @@ instance Protocol (LocalTxMonitor txid tx slot) where
:: MempoolSizeAndCapacity
-> Message (LocalTxMonitor txid tx slot) (StBusy GetSizes) StAcquired

-- | The client asks the server about the mempool current size and max
-- capacity
--
MsgGetMeasures
:: Message (LocalTxMonitor txid tx slot) StAcquired (StBusy GetMeasures)

MsgReplyGetMeasures
:: MempoolMeasures
-> Message (LocalTxMonitor txid tx slot) (StBusy GetMeasures) StAcquired

-- | Release the acquired snapshot, in order to loop back to the idle state.
--
MsgRelease
Expand Down Expand Up @@ -249,27 +279,31 @@ instance ( NFData txid
, NFData tx
, NFData slot
) => NFData (Message (LocalTxMonitor txid tx slot) from to) where
rnf MsgAcquire = ()
rnf (MsgAcquired slot) = rnf slot
rnf MsgAwaitAcquire = ()
rnf MsgNextTx = ()
rnf (MsgReplyNextTx mbTx) = rnf mbTx
rnf (MsgHasTx txid) = rnf txid
rnf (MsgReplyHasTx b) = rnf b
rnf MsgGetSizes = ()
rnf (MsgReplyGetSizes msc) = rnf msc
rnf MsgRelease = ()
rnf MsgDone = ()
rnf MsgAcquire = ()
rnf (MsgAcquired slot) = rnf slot
rnf MsgAwaitAcquire = ()
rnf MsgNextTx = ()
rnf (MsgReplyNextTx mbTx) = rnf mbTx
rnf (MsgHasTx txid) = rnf txid
rnf (MsgReplyHasTx b) = rnf b
rnf MsgGetSizes = ()
rnf (MsgReplyGetSizes msc) = rnf msc
rnf MsgGetMeasures = ()
rnf (MsgReplyGetMeasures msc) = rnf msc
rnf MsgRelease = ()
rnf MsgDone = ()

data TokBusyKind (k :: StBusyKind) where
TokNextTx :: TokBusyKind NextTx
TokHasTx :: TokBusyKind HasTx
TokGetSizes :: TokBusyKind GetSizes
TokNextTx :: TokBusyKind NextTx
TokHasTx :: TokBusyKind HasTx
TokGetSizes :: TokBusyKind GetSizes
TokGetMeasures :: TokBusyKind GetMeasures

instance NFData (TokBusyKind k) where
rnf TokNextTx = ()
rnf TokHasTx = ()
rnf TokGetSizes = ()
rnf TokNextTx = ()
rnf TokHasTx = ()
rnf TokGetSizes = ()
rnf TokGetMeasures = ()

deriving instance (Show txid, Show tx, Show slot)
=> Show (Message (LocalTxMonitor txid tx slot) from to)
Expand All @@ -281,7 +315,8 @@ instance Show (ClientHasAgency (st :: LocalTxMonitor txid tx slot)) where

instance Show (ServerHasAgency (st :: LocalTxMonitor txid tx slot)) where
show = \case
TokAcquiring -> "TokAcquiring"
TokBusy TokNextTx -> "TokBusy TokNextTx"
TokBusy TokHasTx -> "TokBusy TokHasTx"
TokBusy TokGetSizes -> "TokBusy TokGetSizes"
TokAcquiring -> "TokAcquiring"
TokBusy TokNextTx -> "TokBusy TokNextTx"
TokBusy TokHasTx -> "TokBusy TokHasTx"
TokBusy TokGetSizes -> "TokBusy TokGetSizes"
TokBusy TokGetMeasures -> "TokBusy TokGetMeasures"
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,10 @@ localTxMonitorServer txId (slot, allTxs) =
, numberOfTxs = fromIntegral (length allTxs)
}
in pure $ SendMsgReplyGetSizes sizes (serverStAcquired txs)
, recvMsgGetMeasures =
let measures = MempoolMeasures
{ txCount = fromIntegral (length allTxs)
, measuresMap = mempty
}
in pure $ SendMsgReplyGetMeasures measures (serverStAcquired txs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import Ouroboros.Network.Protocol.LocalTxMonitor.Examples
import Ouroboros.Network.Protocol.LocalTxMonitor.Server
import Ouroboros.Network.Protocol.LocalTxMonitor.Type

import Data.Text qualified as Text
import Test.ChainGenerators ()
import Test.Ouroboros.Network.Testing.Utils (prop_codec_cborM,
prop_codec_valid_cbor_encoding, splits2, splits3)
Expand Down Expand Up @@ -209,11 +210,40 @@ instance (Arbitrary txid, Arbitrary tx, Arbitrary slot)
, AnyMessageAndAgency (ClientAgency TokAcquired) . MsgHasTx <$> arbitrary
, AnyMessageAndAgency (ServerAgency (TokBusy TokHasTx)) . MsgReplyHasTx <$> arbitrary
, pure $ AnyMessageAndAgency (ClientAgency TokAcquired) MsgGetSizes
, pure $ AnyMessageAndAgency (ClientAgency TokAcquired) MsgGetMeasures
, AnyMessageAndAgency (ServerAgency (TokBusy TokGetSizes)) . MsgReplyGetSizes <$> arbitrary
, AnyMessageAndAgency (ServerAgency (TokBusy TokGetMeasures)) . MsgReplyGetMeasures <$> arbitrary
, pure $ AnyMessageAndAgency (ClientAgency TokAcquired) MsgRelease
, pure $ AnyMessageAndAgency (ClientAgency TokIdle) MsgDone
]

instance Arbitrary MempoolMeasures where
arbitrary =
MempoolMeasures
<$> arbitrary
<*> arbitrary

instance Arbitrary MeasureName where
arbitrary = MeasureName <$> frequency
[ (9, genKnownMeasureName)
, (1, genUnknownMeasureName)
]
where
genKnownMeasureName =
Text.pack <$> elements
[ "transaction_bytes"
, "reference_scripts"
, "ex_units_memory"
, "ex_units_steps"
]
genUnknownMeasureName = Text.pack <$> arbitrary

instance Arbitrary a => Arbitrary (SizeAndCapacity a) where
arbitrary =
SizeAndCapacity
<$> arbitrary
<*> arbitrary

instance Arbitrary MempoolSizeAndCapacity where
arbitrary =
MempoolSizeAndCapacity
Expand All @@ -224,15 +254,17 @@ instance Arbitrary MempoolSizeAndCapacity where
instance (Eq txid, Eq tx, Eq slot)
=> Eq (AnyMessage (LocalTxMonitor txid tx slot))
where
AnyMessage MsgAcquire == AnyMessage MsgAcquire = True
AnyMessage (MsgAcquired a) == AnyMessage (MsgAcquired b) = a == b
AnyMessage MsgAwaitAcquire == AnyMessage MsgAwaitAcquire = True
AnyMessage MsgNextTx == AnyMessage MsgNextTx = True
AnyMessage (MsgReplyNextTx a) == AnyMessage (MsgReplyNextTx b) = a == b
AnyMessage (MsgHasTx a) == AnyMessage (MsgHasTx b) = a == b
AnyMessage (MsgReplyHasTx a) == AnyMessage (MsgReplyHasTx b) = a == b
AnyMessage MsgGetSizes == AnyMessage MsgGetSizes = True
AnyMessage (MsgReplyGetSizes a) == AnyMessage (MsgReplyGetSizes b) = a == b
AnyMessage MsgRelease == AnyMessage MsgRelease = True
AnyMessage MsgDone == AnyMessage MsgDone = True
AnyMessage _ == AnyMessage _ = False
AnyMessage MsgAcquire == AnyMessage MsgAcquire = True
AnyMessage (MsgAcquired a) == AnyMessage (MsgAcquired b) = a == b
AnyMessage MsgAwaitAcquire == AnyMessage MsgAwaitAcquire = True
AnyMessage MsgNextTx == AnyMessage MsgNextTx = True
AnyMessage (MsgReplyNextTx a) == AnyMessage (MsgReplyNextTx b) = a == b
AnyMessage (MsgHasTx a) == AnyMessage (MsgHasTx b) = a == b
AnyMessage (MsgReplyHasTx a) == AnyMessage (MsgReplyHasTx b) = a == b
AnyMessage MsgGetSizes == AnyMessage MsgGetSizes = True
AnyMessage (MsgReplyGetSizes a) == AnyMessage (MsgReplyGetSizes b) = a == b
AnyMessage MsgGetMeasures == AnyMessage MsgGetMeasures = True
AnyMessage (MsgReplyGetMeasures a) == AnyMessage (MsgReplyGetMeasures b) = a == b
AnyMessage MsgRelease == AnyMessage MsgRelease = True
AnyMessage MsgDone == AnyMessage MsgDone = True
AnyMessage _ == AnyMessage _ = False
Loading