From 9fb279f3e432e76b8662b650d1cb95895d8ba1e6 Mon Sep 17 00:00:00 2001 From: Denis Shevchenko Date: Wed, 20 Oct 2021 16:33:57 +0400 Subject: [PATCH] CAD-3511: datapoint-forward library. --- cabal.project | 4 + datapoint-forward/CHANGELOG.md | 3 + datapoint-forward/CODEOWNERS | 3 + datapoint-forward/LICENSE | 202 ++++++++++++++++++ datapoint-forward/NOTICE | 14 ++ datapoint-forward/README.md | 9 + datapoint-forward/datapoint-forward.cabal | 91 ++++++++ .../src/DataPoint/Forward/Acceptor.hs | 21 ++ .../src/DataPoint/Forward/Configuration.hs | 36 ++++ .../src/DataPoint/Forward/Forwarder.hs | 22 ++ .../src/DataPoint/Forward/Network/Acceptor.hs | 154 +++++++++++++ .../DataPoint/Forward/Network/Forwarder.hs | 104 +++++++++ .../DataPoint/Forward/Protocol/Acceptor.hs | 55 +++++ .../src/DataPoint/Forward/Protocol/Codec.hs | 83 +++++++ .../DataPoint/Forward/Protocol/Forwarder.hs | 49 +++++ .../src/DataPoint/Forward/Protocol/Type.hs | 122 +++++++++++ .../src/DataPoint/Forward/Utils.hs | 131 ++++++++++++ datapoint-forward/test/Main.hs | 12 ++ .../Test/DataPoint/Forward/Demo/Configs.hs | 29 +++ .../test/Test/DataPoint/Forward/Demo/Tests.hs | 103 +++++++++ .../Test/DataPoint/Forward/Protocol/Codec.hs | 35 +++ .../Test/DataPoint/Forward/Protocol/Tests.hs | 27 +++ .../test/Test/DataPoint/Forward/Types.hs | 23 ++ 23 files changed, 1332 insertions(+) create mode 100644 datapoint-forward/CHANGELOG.md create mode 100644 datapoint-forward/CODEOWNERS create mode 100644 datapoint-forward/LICENSE create mode 100644 datapoint-forward/NOTICE create mode 100644 datapoint-forward/README.md create mode 100644 datapoint-forward/datapoint-forward.cabal create mode 100644 datapoint-forward/src/DataPoint/Forward/Acceptor.hs create mode 100644 datapoint-forward/src/DataPoint/Forward/Configuration.hs create mode 100644 datapoint-forward/src/DataPoint/Forward/Forwarder.hs create mode 100644 datapoint-forward/src/DataPoint/Forward/Network/Acceptor.hs create mode 100644 datapoint-forward/src/DataPoint/Forward/Network/Forwarder.hs create mode 100644 datapoint-forward/src/DataPoint/Forward/Protocol/Acceptor.hs create mode 100644 datapoint-forward/src/DataPoint/Forward/Protocol/Codec.hs create mode 100644 datapoint-forward/src/DataPoint/Forward/Protocol/Forwarder.hs create mode 100644 datapoint-forward/src/DataPoint/Forward/Protocol/Type.hs create mode 100644 datapoint-forward/src/DataPoint/Forward/Utils.hs create mode 100644 datapoint-forward/test/Main.hs create mode 100644 datapoint-forward/test/Test/DataPoint/Forward/Demo/Configs.hs create mode 100644 datapoint-forward/test/Test/DataPoint/Forward/Demo/Tests.hs create mode 100644 datapoint-forward/test/Test/DataPoint/Forward/Protocol/Codec.hs create mode 100644 datapoint-forward/test/Test/DataPoint/Forward/Protocol/Tests.hs create mode 100644 datapoint-forward/test/Test/DataPoint/Forward/Types.hs diff --git a/cabal.project b/cabal.project index 37ffbc9825d..4354dd0f093 100644 --- a/cabal.project +++ b/cabal.project @@ -13,6 +13,7 @@ packages: bench/locli bench/tx-generator plutus-example/plutus-example + datapoint-forward trace-dispatcher trace-forward trace-resources @@ -79,6 +80,9 @@ package cardano-testnet package plutus-examples tests: True +package datapoint-forward + tests: True + package trace-resources tests: True diff --git a/datapoint-forward/CHANGELOG.md b/datapoint-forward/CHANGELOG.md new file mode 100644 index 00000000000..27316fb6d97 --- /dev/null +++ b/datapoint-forward/CHANGELOG.md @@ -0,0 +1,3 @@ +# ChangeLog + +# 0.1.0 diff --git a/datapoint-forward/CODEOWNERS b/datapoint-forward/CODEOWNERS new file mode 100644 index 00000000000..6e6b1a89e87 --- /dev/null +++ b/datapoint-forward/CODEOWNERS @@ -0,0 +1,3 @@ +# General reviewers per PR +# Denis Serge Jürgen +* @denisshevchenko @deepfire @jutaro diff --git a/datapoint-forward/LICENSE b/datapoint-forward/LICENSE new file mode 100644 index 00000000000..f471221ad3a --- /dev/null +++ b/datapoint-forward/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2021 Input Output (Hong Kong) Ltd. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/datapoint-forward/NOTICE b/datapoint-forward/NOTICE new file mode 100644 index 00000000000..fb77bb84e9d --- /dev/null +++ b/datapoint-forward/NOTICE @@ -0,0 +1,14 @@ +Copyright 2021 Input Output (Hong Kong) Ltd. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/datapoint-forward/README.md b/datapoint-forward/README.md new file mode 100644 index 00000000000..30e4b93741e --- /dev/null +++ b/datapoint-forward/README.md @@ -0,0 +1,9 @@ +# datapoint-forward + +`datapoint-forward` is a library allowing to forward different structured data from one process to another one. It is built upon [`typed-protocols`](https://github.com/input-output-hk/ouroboros-network/tree/master/typed-protocols). + +The `trace-dispatcher` library is using `datapoint-forward` to forward different structured data from the node to exernal acceptors (for example, `cardano-tracer`). The example of such a data is node's basic info. Please note that each forwarded type should provide `ToJSON` and `FromJSON` instances, because it will be "packed" to `DataPoint` and then forwarded to the acceptor by request. + +## Developers + +Benchmarking team is responsible for this library. The primary developer is [@denisshevchenko](https://github.com/denisshevchenko). diff --git a/datapoint-forward/datapoint-forward.cabal b/datapoint-forward/datapoint-forward.cabal new file mode 100644 index 00000000000..5a7a434eafd --- /dev/null +++ b/datapoint-forward/datapoint-forward.cabal @@ -0,0 +1,91 @@ +cabal-version: 2.4 +name: datapoint-forward +version: 0.1.0 +synopsis: See README for more info +description: See README for more info +license: Apache-2.0 +license-file: LICENSE +copyright: 2021 Input Output (Hong Kong) Ltd. +author: IOHK +maintainer: operations@iohk.io +build-type: Simple +extra-doc-files: README.md + CHANGELOG.md + +common base { build-depends: base >= 4.14 && < 4.15 } + +common project-config + default-language: Haskell2010 + + ghc-options: -Wall + -Wcompat + -Wincomplete-record-updates + -Wincomplete-uni-patterns + -Wno-unticked-promoted-constructors + -Wno-orphans + -Wpartial-fields + -Wredundant-constraints + -Wunused-packages + +library + import: base, project-config + hs-source-dirs: src + + exposed-modules: DataPoint.Forward.Acceptor + DataPoint.Forward.Configuration + DataPoint.Forward.Forwarder + DataPoint.Forward.Utils + + DataPoint.Forward.Network.Acceptor + DataPoint.Forward.Network.Forwarder + + DataPoint.Forward.Protocol.Acceptor + DataPoint.Forward.Protocol.Codec + DataPoint.Forward.Protocol.Forwarder + DataPoint.Forward.Protocol.Type + + build-depends: aeson + , async + , bytestring + , cborg + , contra-tracer + , extra + , io-classes + , ouroboros-network-framework + , serialise + , stm + , text + , typed-protocols + , typed-protocols-cborg + , unordered-containers + +test-suite test + import: base, project-config + type: exitcode-stdio-1.0 + main-is: Main.hs + hs-source-dirs: test + + other-modules: Test.DataPoint.Forward.Protocol.Codec + Test.DataPoint.Forward.Protocol.Tests + Test.DataPoint.Forward.Demo.Configs + Test.DataPoint.Forward.Demo.Tests + Test.DataPoint.Forward.Types + + build-depends: aeson + , async + , contra-tracer + , directory + , extra + , filepath + , ouroboros-network-framework + , datapoint-forward + , QuickCheck + , serialise + , stm + , tasty + , tasty-quickcheck + , typed-protocols + , text + + ghc-options: -rtsopts + -threaded diff --git a/datapoint-forward/src/DataPoint/Forward/Acceptor.hs b/datapoint-forward/src/DataPoint/Forward/Acceptor.hs new file mode 100644 index 00000000000..d19f039ea6f --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Acceptor.hs @@ -0,0 +1,21 @@ +{-# LANGUAGE NamedFieldPuns #-} + +-- | This top-level module will be used by the acceptor application. +-- Acceptor application asks 'DataPoint's from the forwarder application. +module DataPoint.Forward.Acceptor + ( runDataPointAcceptor + ) where + +import Ouroboros.Network.IOManager (IOManager) + +import DataPoint.Forward.Network.Acceptor (listenToForwarder) +import DataPoint.Forward.Configuration (AcceptorConfiguration (..)) +import DataPoint.Forward.Utils (DataPointAsker, runActionInLoop) + +runDataPointAcceptor + :: IOManager -- ^ 'IOManager' from the external application. + -> AcceptorConfiguration -- ^ Acceptor configuration. + -> DataPointAsker -- ^ The structure we use to ask for 'DataPoint's explicitly. + -> IO () +runDataPointAcceptor iomgr config@AcceptorConfiguration{forwarderEndpoint} dpAsker = + runActionInLoop (listenToForwarder iomgr config dpAsker) forwarderEndpoint 1 diff --git a/datapoint-forward/src/DataPoint/Forward/Configuration.hs b/datapoint-forward/src/DataPoint/Forward/Configuration.hs new file mode 100644 index 00000000000..df82501b95d --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Configuration.hs @@ -0,0 +1,36 @@ +module DataPoint.Forward.Configuration + ( AcceptorConfiguration (..) + , ForwarderConfiguration (..) + , HowToConnect (..) + ) where + +import Control.Tracer (Tracer) +import GHC.Conc (TVar) +import Ouroboros.Network.Driver (TraceSendRecv) + +import DataPoint.Forward.Protocol.Type + +-- | Specifies how to connect to the peer. +-- Currently, only local socket/pipe is used. +newtype HowToConnect = LocalPipe FilePath + deriving Show + +-- | Acceptor configuration, parameterized by trace item's type. +data AcceptorConfiguration = AcceptorConfiguration + { -- | The tracer that will be used by the acceptor in its network layer. + acceptorTracer :: !(Tracer IO (TraceSendRecv DataPointForward)) + -- | The endpoint that will be used to listen to the forwarder. + , forwarderEndpoint :: !HowToConnect + -- | 'TVar' that can be used as a brake: if an external thread sets + -- it to 'True', the acceptor will send 'MsgDone' message to the + -- forwarder and their session will be closed. + , shouldWeStop :: !(TVar Bool) + } + +-- | Forwarder configuration, parameterized by trace item's type. +data ForwarderConfiguration = ForwarderConfiguration + { -- | The tracer that will be used by the forwarder in its network layer. + forwarderTracer :: !(Tracer IO (TraceSendRecv DataPointForward)) + -- | The endpoint that will be used to connect to the acceptor. + , acceptorEndpoint :: !HowToConnect + } diff --git a/datapoint-forward/src/DataPoint/Forward/Forwarder.hs b/datapoint-forward/src/DataPoint/Forward/Forwarder.hs new file mode 100644 index 00000000000..173103a343b --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Forwarder.hs @@ -0,0 +1,22 @@ +{-# LANGUAGE NamedFieldPuns #-} + +-- This top-level module will be used by the forwarder application. +-- Forwarder application collects 'DataPoint's and sends them to +-- the acceptor application. +module DataPoint.Forward.Forwarder + ( runDataPointForwarder + ) where + +import Ouroboros.Network.IOManager (IOManager) + +import DataPoint.Forward.Configuration (ForwarderConfiguration (..)) +import DataPoint.Forward.Network.Forwarder (connectToAcceptor) +import DataPoint.Forward.Utils + +runDataPointForwarder + :: IOManager -- ^ 'IOManager' from the external application. + -> ForwarderConfiguration -- ^ Forwarder configuration. + -> DataPointStore -- ^ DataPoint store. + -> IO () +runDataPointForwarder iomgr config@ForwarderConfiguration{acceptorEndpoint} dpStore = + runActionInLoop (connectToAcceptor iomgr config dpStore) acceptorEndpoint 1 diff --git a/datapoint-forward/src/DataPoint/Forward/Network/Acceptor.hs b/datapoint-forward/src/DataPoint/Forward/Network/Acceptor.hs new file mode 100644 index 00000000000..ed0f6cbea60 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Network/Acceptor.hs @@ -0,0 +1,154 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NamedFieldPuns #-} + +module DataPoint.Forward.Network.Acceptor + ( listenToForwarder + -- | Export this function for Mux purpose. + , acceptDataPoints + , acceptDataPointsInit + ) where + +import Codec.CBOR.Term (Term) +import qualified Codec.Serialise as CBOR +import Control.Concurrent.Async (race_, wait) +import Control.Monad.Extra (ifM) +import Control.Monad.STM (atomically, check) +import Control.Concurrent.STM.TVar (modifyTVar', readTVar, readTVarIO) +import qualified Data.ByteString.Lazy as LBS +import Data.Void (Void) +import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (..), + MiniProtocolNum (..), MuxMode (..), + OuroborosApplication (..), MuxPeer (..), + RunMiniProtocol (..), + miniProtocolLimits, miniProtocolNum, miniProtocolRun) +import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) +import Ouroboros.Network.Driver.Simple (runPeer) +import Ouroboros.Network.ErrorPolicy (nullErrorPolicies) +import Ouroboros.Network.IOManager (IOManager) +import Ouroboros.Network.Snocket (Snocket, localAddressFromPath, localSnocket) +import Ouroboros.Network.Socket (AcceptedConnectionsLimit (..), + SomeResponderApplication (..), + cleanNetworkMutableState, newNetworkMutableState, + nullNetworkServerTracers, withServerNode) +import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, + noTimeLimitsHandshake) +import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..), + UnversionedProtocolData (..), + unversionedHandshakeCodec, + unversionedProtocolDataCodec) +import Ouroboros.Network.Protocol.Handshake.Type (Handshake) +import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, + simpleSingletonVersions) + +import qualified DataPoint.Forward.Protocol.Acceptor as Acceptor +import qualified DataPoint.Forward.Protocol.Codec as Acceptor +import DataPoint.Forward.Protocol.Type (DataPointName) +import DataPoint.Forward.Configuration (AcceptorConfiguration (..), HowToConnect (..)) +import DataPoint.Forward.Utils (DataPointAsker (..)) + +listenToForwarder + :: IOManager + -> AcceptorConfiguration + -> DataPointAsker + -> IO () +listenToForwarder iomgr config@AcceptorConfiguration{forwarderEndpoint} dpAsker = do + let (LocalPipe localPipe) = forwarderEndpoint + snocket = localSnocket iomgr + address = localAddressFromPath localPipe + doListenToForwarder snocket address noTimeLimitsHandshake app + where + app = + -- TODO: There's _shouldStopSTM and 'shouldWeStop' in + -- 'AcceptorConfiguration'. Currently 'ouroboros-network' does not exposes + -- the write end of `_shouldStopSTM`, if it did we could use it instead of + -- 'shouldWeStop'. + OuroborosApplication $ \_connectionId _shouldStopSTM -> + [ MiniProtocol + { miniProtocolNum = MiniProtocolNum 1 + , miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound } + , miniProtocolRun = acceptDataPoints config dpAsker + } + ] + +doListenToForwarder + :: Ord addr + => Snocket IO fd addr + -> addr + -> ProtocolTimeLimits (Handshake UnversionedProtocol Term) + -> OuroborosApplication 'ResponderMode addr LBS.ByteString IO Void () + -> IO () +doListenToForwarder snocket address timeLimits app = do + networkState <- newNetworkMutableState + race_ (cleanNetworkMutableState networkState) + $ withServerNode + snocket + nullNetworkServerTracers + networkState + (AcceptedConnectionsLimit maxBound maxBound 0) + address + unversionedHandshakeCodec + timeLimits + (cborTermVersionDataCodec unversionedProtocolDataCodec) + acceptableVersion + (simpleSingletonVersions + UnversionedProtocol + UnversionedProtocolData + (SomeResponderApplication app)) + nullErrorPolicies + $ \_ serverAsync -> wait serverAsync -- Block until async exception. + +acceptDataPoints + :: AcceptorConfiguration + -> DataPointAsker + -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () +acceptDataPoints config dpAsker = + ResponderProtocolOnly $ + MuxPeerRaw $ \channel -> + runPeer + (acceptorTracer config) + (Acceptor.codecDataPointForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Acceptor.dataPointAcceptorPeer $ acceptorActions config dpAsker []) + +acceptDataPointsInit + :: AcceptorConfiguration + -> DataPointAsker + -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void +acceptDataPointsInit config dpAsker = + InitiatorProtocolOnly $ + MuxPeerRaw $ \channel -> + runPeer + (acceptorTracer config) + (Acceptor.codecDataPointForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Acceptor.dataPointAcceptorPeer $ acceptorActions config dpAsker []) + +acceptorActions + :: AcceptorConfiguration + -> DataPointAsker + -> [DataPointName] + -> Acceptor.DataPointAcceptor IO () +acceptorActions config@AcceptorConfiguration{shouldWeStop} + dpAsker@DataPointAsker{askDataPoints, dataPointsNames, dataPointsAreHere, dataPointsReply} + dpNames = + Acceptor.SendMsgDataPointsRequest dpNames $ \replyWithDataPoints -> do + -- Reply with 'DataPoint's is here, update the asker. + atomically $ do + modifyTVar' dataPointsReply $ const replyWithDataPoints + -- To notify external context that answer was received. + modifyTVar' dataPointsAreHere $ const True + -- To prevent new automatic request. + modifyTVar' askDataPoints $ const False + ifM (readTVarIO shouldWeStop) + (return $ Acceptor.SendMsgDone $ return ()) + $ do + -- Block here until external context ask for 'DataPoint's again. + atomically $ readTVar askDataPoints >>= check + -- Ok, external context asked for 'DataPoint's again. + dpNames' <- atomically $ do + modifyTVar' dataPointsAreHere $ const False + readTVar dataPointsNames + return $ acceptorActions config dpAsker dpNames' diff --git a/datapoint-forward/src/DataPoint/Forward/Network/Forwarder.hs b/datapoint-forward/src/DataPoint/Forward/Network/Forwarder.hs new file mode 100644 index 00000000000..b66058bae6c --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Network/Forwarder.hs @@ -0,0 +1,104 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE NamedFieldPuns #-} + +module DataPoint.Forward.Network.Forwarder + ( connectToAcceptor + -- | Export this function for Mux purpose. + , forwardDataPoints + , forwardDataPointsResp + ) where + +import Codec.CBOR.Term (Term) +import qualified Codec.Serialise as CBOR +import qualified Data.ByteString.Lazy as LBS +import Data.Void (Void) +import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) +import Ouroboros.Network.Driver.Simple (runPeer) +import Ouroboros.Network.IOManager (IOManager) +import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (..), + MiniProtocolNum (..), MuxMode (..), + OuroborosApplication (..), MuxPeer (..), + RunMiniProtocol (..), + miniProtocolLimits, miniProtocolNum, miniProtocolRun) +import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, + noTimeLimitsHandshake) +import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..), + UnversionedProtocolData (..), + unversionedHandshakeCodec, + unversionedProtocolDataCodec) +import Ouroboros.Network.Protocol.Handshake.Type (Handshake) +import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, simpleSingletonVersions) +import Ouroboros.Network.Snocket (Snocket, localAddressFromPath, localSnocket) +import Ouroboros.Network.Socket (connectToNode, nullNetworkConnectTracers) + +import DataPoint.Forward.Configuration (ForwarderConfiguration (..), HowToConnect (..)) +import DataPoint.Forward.Utils +import qualified DataPoint.Forward.Protocol.Forwarder as Forwarder +import qualified DataPoint.Forward.Protocol.Codec as Forwarder + +connectToAcceptor + :: IOManager + -> ForwarderConfiguration + -> DataPointStore + -> IO () +connectToAcceptor iomgr config@ForwarderConfiguration{acceptorEndpoint} dpStore = + doConnectToAcceptor (localSnocket iomgr) (localAddressFromPath localPipe) noTimeLimitsHandshake app + where + LocalPipe localPipe = acceptorEndpoint + app = + OuroborosApplication $ \_connectionId _shouldStopSTM -> + [ MiniProtocol + { miniProtocolNum = MiniProtocolNum 1 + , miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound } + , miniProtocolRun = forwardDataPoints config dpStore + } + ] + +doConnectToAcceptor + :: Snocket IO fd addr + -> addr + -> ProtocolTimeLimits (Handshake UnversionedProtocol Term) + -> OuroborosApplication 'InitiatorMode addr LBS.ByteString IO () Void + -> IO () +doConnectToAcceptor snocket address timeLimits app = + connectToNode + snocket + unversionedHandshakeCodec + timeLimits + (cborTermVersionDataCodec unversionedProtocolDataCodec) + nullNetworkConnectTracers + acceptableVersion + (simpleSingletonVersions + UnversionedProtocol + UnversionedProtocolData + app) + Nothing + address + +forwardDataPoints + :: ForwarderConfiguration + -> DataPointStore + -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void +forwardDataPoints config dpStore = + InitiatorProtocolOnly $ + MuxPeerRaw $ \channel -> + runPeer + (forwarderTracer config) + (Forwarder.codecDataPointForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Forwarder.dataPointForwarderPeer $ readFromStore dpStore) + +forwardDataPointsResp + :: ForwarderConfiguration + -> DataPointStore + -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () +forwardDataPointsResp config dpStore = + ResponderProtocolOnly $ + MuxPeerRaw $ \channel -> + runPeer + (forwarderTracer config) + (Forwarder.codecDataPointForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Forwarder.dataPointForwarderPeer $ readFromStore dpStore) diff --git a/datapoint-forward/src/DataPoint/Forward/Protocol/Acceptor.hs b/datapoint-forward/src/DataPoint/Forward/Protocol/Acceptor.hs new file mode 100644 index 00000000000..768adba0166 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Protocol/Acceptor.hs @@ -0,0 +1,55 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE RankNTypes #-} + +-- | A view of the trace forwarding/accepting protocol +-- from the point of view of the client. +-- +-- For execution, a conversion into the typed protocol is provided. +-- +module DataPoint.Forward.Protocol.Acceptor + ( DataPointAcceptor(..) + , dataPointAcceptorPeer + ) where + +import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..), + PeerRole (..)) + +import DataPoint.Forward.Protocol.Type + +data DataPointAcceptor m a where + SendMsgDataPointsRequest + :: [DataPointName] + -> (DataPointValues -> m (DataPointAcceptor m a)) + -> DataPointAcceptor m a + + SendMsgDone + :: m a + -> DataPointAcceptor m a + +-- | Interpret a particular action sequence into the client side of the protocol. +-- +dataPointAcceptorPeer + :: Monad m + => DataPointAcceptor m a + -> Peer DataPointForward 'AsClient 'StIdle m a +dataPointAcceptorPeer = \case + SendMsgDataPointsRequest request next -> + -- Send our message (request for new 'DataPoint's from the forwarder). + Yield (ClientAgency TokIdle) (MsgDataPointsRequest request) $ + -- We're now into the 'StBusy' state, and now we'll wait for a reply + -- from the forwarder. It is assuming that the forwarder will reply + -- immediately (even there are no 'DataPoint's). + Await (ServerAgency TokBusy) $ \(MsgDataPointsReply reply) -> + Effect $ + dataPointAcceptorPeer <$> next reply + + SendMsgDone getResult -> + -- We do an actual transition using 'yield', to go from the 'StIdle' to + -- 'StDone' state. Once in the 'StDone' state we can actually stop using + -- 'done', with a return value. + Effect $ + Yield (ClientAgency TokIdle) MsgDone . Done TokDone + <$> getResult diff --git a/datapoint-forward/src/DataPoint/Forward/Protocol/Codec.hs b/datapoint-forward/src/DataPoint/Forward/Protocol/Codec.hs new file mode 100644 index 00000000000..a033bea116a --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Protocol/Codec.hs @@ -0,0 +1,83 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module DataPoint.Forward.Protocol.Codec + ( codecDataPointForward + ) where + +import qualified Codec.CBOR.Decoding as CBOR +import qualified Codec.CBOR.Encoding as CBOR +import Codec.CBOR.Read (DeserialiseFailure) +import Control.Monad.Class.MonadST (MonadST) +import qualified Data.ByteString.Lazy as LBS +import Text.Printf (printf) + +import Network.TypedProtocol.Codec (Codec, PeerHasAgency (..), + PeerRole (..), SomeMessage (..)) +import Network.TypedProtocol.Codec.CBOR (mkCodecCborLazyBS) + +import DataPoint.Forward.Protocol.Type + +codecDataPointForward + :: forall m. + MonadST m + => ([DataPointName] -> CBOR.Encoding) -- ^ Encoder for 'Request'. + -> (forall s . CBOR.Decoder s [DataPointName]) -- ^ Decoder for 'Request'. + -> (DataPointValues -> CBOR.Encoding) -- ^ Encoder for reply with list of 'DataPoint's values. + -> (forall s . CBOR.Decoder s DataPointValues) -- ^ Decoder for reply with list of 'DataPoint's values. + -> Codec DataPointForward + DeserialiseFailure m LBS.ByteString +codecDataPointForward encodeRequest decodeRequest + encodeReplyList decodeReplyList = + mkCodecCborLazyBS encode decode + where + -- Encode messages. + encode + :: forall (pr :: PeerRole) + (st :: DataPointForward) + (st' :: DataPointForward). + PeerHasAgency pr st + -> Message DataPointForward st st' + -> CBOR.Encoding + + encode (ClientAgency TokIdle) (MsgDataPointsRequest request) = + CBOR.encodeListLen 3 + <> CBOR.encodeWord 1 + <> encodeRequest request + + encode (ClientAgency TokIdle) MsgDone = + CBOR.encodeListLen 1 + <> CBOR.encodeWord 2 + + encode (ServerAgency TokBusy) (MsgDataPointsReply reply) = + CBOR.encodeListLen 2 + <> CBOR.encodeWord 4 + <> encodeReplyList reply + + -- Decode messages + decode + :: forall (pr :: PeerRole) + (st :: DataPointForward) s. + PeerHasAgency pr st + -> CBOR.Decoder s (SomeMessage st) + decode stok = do + len <- CBOR.decodeListLen + key <- CBOR.decodeWord + case (key, len, stok) of + (1, 3, ClientAgency TokIdle) -> + SomeMessage . MsgDataPointsRequest <$> decodeRequest + + (2, 1, ClientAgency TokIdle) -> + return $ SomeMessage MsgDone + + (4, 2, ServerAgency TokBusy) -> + SomeMessage . MsgDataPointsReply <$> decodeReplyList + + -- Failures per protocol state + (_, _, ClientAgency TokIdle) -> + fail (printf "codecDataPointForward (%s) unexpected key (%d, %d)" (show stok) key len) + (_, _, ServerAgency TokBusy) -> + fail (printf "codecDataPointForward (%s) unexpected key (%d, %d)" (show stok) key len) diff --git a/datapoint-forward/src/DataPoint/Forward/Protocol/Forwarder.hs b/datapoint-forward/src/DataPoint/Forward/Protocol/Forwarder.hs new file mode 100644 index 00000000000..3f2962ad7ca --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Protocol/Forwarder.hs @@ -0,0 +1,49 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE NamedFieldPuns #-} + +module DataPoint.Forward.Protocol.Forwarder + ( DataPointForwarder (..) + , dataPointForwarderPeer + ) where + +import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..), + PeerRole (..)) + +import DataPoint.Forward.Protocol.Type + +data DataPointForwarder m a = DataPointForwarder + { -- | The acceptor sent us a request for new 'DataPoint's. + recvMsgDataPointsRequest + :: [DataPointName] + -> m (DataPointValues, DataPointForwarder m a) + + -- | The acceptor terminated. Here we have a pure return value, but we + -- could have done another action in 'm' if we wanted to. + , recvMsgDone :: m a + } + +-- | Interpret a particular action sequence into the server side of the protocol. +-- +dataPointForwarderPeer + :: Monad m + => DataPointForwarder m a + -> Peer DataPointForward 'AsServer 'StIdle m a +dataPointForwarderPeer DataPointForwarder{recvMsgDataPointsRequest, recvMsgDone} = + -- In the 'StIdle' state the forwarder is awaiting a request message + -- from the acceptor. + Await (ClientAgency TokIdle) $ \case + -- The acceptor sent us a request for new 'DataPoint's, so now we're + -- in the 'StBusy' state which means it's the forwarder's turn to send + -- a reply. + MsgDataPointsRequest request -> Effect $ do + (reply, next) <- recvMsgDataPointsRequest request + return $ Yield (ServerAgency TokBusy) + (MsgDataPointsReply reply) + (dataPointForwarderPeer next) + + -- The acceptor sent the done transition, so we're in the 'StDone' state + -- so all we can do is stop using 'done', with a return value. + MsgDone -> Effect $ Done TokDone <$> recvMsgDone diff --git a/datapoint-forward/src/DataPoint/Forward/Protocol/Type.hs b/datapoint-forward/src/DataPoint/Forward/Protocol/Type.hs new file mode 100644 index 00000000000..8646c8d62c4 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Protocol/Type.hs @@ -0,0 +1,122 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE EmptyCase #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE TypeFamilies #-} + +-- | The type of the 'DataPoint' forwarding/accepting protocol. +-- +-- Since we are using a typed protocol framework this is in some sense /the/ +-- definition of the protocol: what is allowed and what is not allowed. + +module DataPoint.Forward.Protocol.Type + ( DataPointName + , DataPointValue + , DataPointValues + , DataPointForward (..) + , Message (..) + , ClientHasAgency (..) + , ServerHasAgency (..) + , NobodyHasAgency (..) + ) where + +import qualified Data.ByteString.Lazy as LBS +import Data.Text (Text) +import Network.TypedProtocol.Core (Protocol (..)) +import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) + +-- | A kind to identify our protocol, and the types of the states in the state +-- transition diagram of the protocol. +-- +-- IMPORTANT NOTE: the following terminology is used: +-- +-- 1. From the protocol's point of view, two peers talk to each other: +-- the forwarder and the acceptor. +-- 2. The forwarder is an application that collects 'DataPoint's and sends +-- them to the acceptor by request (with 'MsgDataPointsReply'). +-- 3. The acceptor is an application that receives 'DataPoint's from the +-- forwarder. +-- 4. You can think of the acceptor as a client, and the forwarder as a server. +-- After the connection is established, the acceptor asks for 'DataPoint's, +-- the forwarder replies to it. + +type DataPointName = Text +type DataPointValue = LBS.ByteString +type DataPointValues = [(DataPointName, Maybe DataPointValue)] + +data DataPointForward where + + -- | Both acceptor and forwarder are in idle state. The acceptor can send a + -- request for a list of 'DataPoint's ('MsgDataPointsRequest'); + -- the forwarder is waiting for a request, it will replay with 'MsgDataPointsReply'. + StIdle :: DataPointForward + + -- | The acceptor has sent a next request for 'DataPoint's. The acceptor is + -- now waiting for a reply, and the forwarder is busy getting ready to send a + -- reply with new list of 'DataPoint's. + StBusy :: DataPointForward + + -- | Both the acceptor and forwarder are in the terminal state. They're done. + StDone :: DataPointForward + +instance ShowProxy DataPointForward where + showProxy _ = "DataPointForward" + +instance Protocol DataPointForward where + + -- | The messages in the trace forwarding/accepting protocol. + -- + data Message DataPointForward from to where + -- | Request the list of 'DataPoint's from the forwarder. + -- State: Idle -> Busy. + MsgDataPointsRequest + :: [DataPointName] + -> Message DataPointForward 'StIdle 'StBusy + + -- | Reply with a list of 'DataPoint's for the acceptor. + -- State: Busy -> Idle. + MsgDataPointsReply + :: DataPointValues + -> Message DataPointForward 'StBusy 'StIdle + + -- | Terminating message. State: Idle -> Done. + MsgDone + :: Message DataPointForward 'StIdle 'StDone + + -- | This is an explanation of our states, in terms of which party has agency + -- in each state. + -- + -- 1. When both peers are in Idle state, the acceptor can send a message + -- to the forwarder (request for new 'DataPoint's), + -- 2. When both peers are in Busy state, the forwarder is expected to send + -- a reply to the acceptor (list of new 'DataPoint's). + -- + -- So we assume that, from __interaction__ point of view: + -- 1. ClientHasAgency (from 'Network.TypedProtocol.Core') corresponds to acceptor's agency. + -- 3. ServerHasAgency (from 'Network.TypedProtocol.Core') corresponds to forwarder's agency. + -- + data ClientHasAgency st where + TokIdle :: ClientHasAgency 'StIdle + + data ServerHasAgency st where + TokBusy :: ServerHasAgency 'StBusy + + data NobodyHasAgency st where + TokDone :: NobodyHasAgency 'StDone + + -- | Impossible cases. + exclusionLemma_ClientAndServerHaveAgency TokIdle tok = case tok of {} + exclusionLemma_NobodyAndClientHaveAgency TokDone tok = case tok of {} + exclusionLemma_NobodyAndServerHaveAgency TokDone tok = case tok of {} + +instance Show (Message DataPointForward from to) where + show MsgDataPointsRequest{} = "MsgDataPointsRequest" + show MsgDataPointsReply{} = "MsgDataPointsReply" + show MsgDone{} = "MsgDone" + +instance Show (ClientHasAgency (st :: DataPointForward)) where + show TokIdle = "TokIdle" + +instance Show (ServerHasAgency (st :: DataPointForward)) where + show TokBusy{} = "TokBusy" diff --git a/datapoint-forward/src/DataPoint/Forward/Utils.hs b/datapoint-forward/src/DataPoint/Forward/Utils.hs new file mode 100644 index 00000000000..b82db8c97b3 --- /dev/null +++ b/datapoint-forward/src/DataPoint/Forward/Utils.hs @@ -0,0 +1,131 @@ +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} + +module DataPoint.Forward.Utils + ( DataPoint (..) + , DataPointStore + , DataPointAsker (..) + , initDataPointStore + , initDataPointAsker + , writeToStore + , readFromStore + , askForDataPoints + , runActionInLoop + ) where + +import Control.Concurrent.STM (atomically, check) +import Control.Concurrent.STM.TVar +import Control.Exception (SomeAsyncException (..), fromException, tryJust) +import Control.Tracer (showTracing, stdoutTracer, traceWith) +import Data.Aeson +import qualified Data.HashMap.Strict as HM +import System.Time.Extra (sleep) + +import DataPoint.Forward.Configuration +import DataPoint.Forward.Protocol.Forwarder +import DataPoint.Forward.Protocol.Type + +-- | Type wrapper for some value of type 'v'. The only reason we need this +-- wrapper is an ability to store different values in the same 'DataPointStore'. +data DataPoint where + DataPoint :: ToJSON v => v -> DataPoint + +type DataPointStore = TVar (HM.HashMap DataPointName DataPoint) + +initDataPointStore :: IO DataPointStore +initDataPointStore = newTVarIO HM.empty + +-- | Write 'DataPoint' to the store. +writeToStore + :: DataPointStore + -> DataPointName + -> DataPoint + -> IO () +writeToStore dpStore dpName dp = atomically $ + modifyTVar' dpStore $ \store -> + if dpName `HM.member` store + then HM.adjust (const dp) dpName store + else HM.insert dpName dp store + +-- | Read 'DataPoint's from the store. Please note that we don't care what's +-- inside of 'DataPoint', we just know it can be encoded to JSON. +readFromStore + :: DataPointStore + -> DataPointForwarder IO () +readFromStore dpStore = + DataPointForwarder + { recvMsgDataPointsRequest = \dpNames -> do + store <- readTVarIO dpStore + let replyList = map (lookupDataPoint store) dpNames + return (replyList, readFromStore dpStore) + , recvMsgDone = return () + } + where + lookupDataPoint store dpName = + ( dpName + , (\(DataPoint v) -> Just $ encode v) =<< HM.lookup dpName store + ) + +-- | Since 'DataPointForward' protocol does not assume the stream of requests/replies, +-- we use the 'TVar's to provide to acceptor's side an ability to ask 'DataPoint's +-- explicitly. +data DataPointAsker = DataPointAsker + { -- | The "ask flag": we use it to notify that we want 'DataPoint's. + askDataPoints :: !(TVar Bool) + -- | The names of 'DataPoint's we need. + , dataPointsNames :: !(TVar [DataPointName]) + -- | The "ready flag": we use it to notify that 'DataPoint's was received. + , dataPointsAreHere :: !(TVar Bool) + -- | The list of received 'DataPoint's' values. + , dataPointsReply :: !(TVar DataPointValues) + } + +initDataPointAsker :: IO DataPointAsker +initDataPointAsker = DataPointAsker + <$> newTVarIO False + <*> newTVarIO [] + <*> newTVarIO False + <*> newTVarIO [] + +askForDataPoints + :: DataPointAsker + -> [DataPointName] + -> IO DataPointValues +askForDataPoints _ [] = return [] +askForDataPoints DataPointAsker{askDataPoints, dataPointsNames, dataPointsAreHere, dataPointsReply} dpNames = do + atomically $ do + modifyTVar' dataPointsNames $ const dpNames -- Fill the names of 'DataPoint's we need. + modifyTVar' askDataPoints $ const True -- Ask them! The flag for acceptor's part of the protocol. + modifyTVar' dataPointsAreHere $ const False -- There is no answer yet. + -- Check the "ready flag" until it's True. + atomically $ readTVar dataPointsAreHere >>= check + -- Return the list of 'DataPoint's' values. + readTVarIO dataPointsReply + +-- | Run monadic action in a loop. If there's an exception, it will re-run +-- the action again, after pause that grows. +runActionInLoop + :: IO () + -> HowToConnect + -> Word + -> IO () +runActionInLoop action endpoint prevDelay = + tryJust excludeAsyncExceptions action >>= \case + Left e -> do + logTrace $ "datapoint-forward, connection with " <> show endpoint <> " failed: " <> show e + sleep $ fromIntegral currentDelay + runActionInLoop action endpoint currentDelay + Right _ -> return () + where + excludeAsyncExceptions e = + case fromException e of + Just SomeAsyncException {} -> Nothing + _ -> Just e + + logTrace = traceWith $ showTracing stdoutTracer + + currentDelay = + if prevDelay < 60 + then prevDelay * 2 + else 60 -- After we reached 60+ secs delay, repeat an attempt every minute. diff --git a/datapoint-forward/test/Main.hs b/datapoint-forward/test/Main.hs new file mode 100644 index 00000000000..550ae62826a --- /dev/null +++ b/datapoint-forward/test/Main.hs @@ -0,0 +1,12 @@ +module Main (main) where + +import Test.Tasty + +import qualified Test.DataPoint.Forward.Protocol.Tests as Protocol +import qualified Test.DataPoint.Forward.Demo.Tests as Demo + +main :: IO () +main = defaultMain $ testGroup "datapoint-forward" + [ Protocol.tests + , Demo.tests + ] diff --git a/datapoint-forward/test/Test/DataPoint/Forward/Demo/Configs.hs b/datapoint-forward/test/Test/DataPoint/Forward/Demo/Configs.hs new file mode 100644 index 00000000000..74485ad33dd --- /dev/null +++ b/datapoint-forward/test/Test/DataPoint/Forward/Demo/Configs.hs @@ -0,0 +1,29 @@ +module Test.DataPoint.Forward.Demo.Configs + ( mkAcceptorConfig + , mkForwarderConfig + ) where + +import Control.Tracer (nullTracer) +import GHC.Conc (TVar) + +import DataPoint.Forward.Configuration + +mkAcceptorConfig + :: HowToConnect + -> TVar Bool + -> AcceptorConfiguration +mkAcceptorConfig ep weAreDone = + AcceptorConfiguration + { acceptorTracer = nullTracer + , forwarderEndpoint = ep + , shouldWeStop = weAreDone + } + +mkForwarderConfig + :: HowToConnect + -> ForwarderConfiguration +mkForwarderConfig ep = + ForwarderConfiguration + { forwarderTracer = nullTracer + , acceptorEndpoint = ep + } diff --git a/datapoint-forward/test/Test/DataPoint/Forward/Demo/Tests.hs b/datapoint-forward/test/Test/DataPoint/Forward/Demo/Tests.hs new file mode 100644 index 00000000000..22a13275f42 --- /dev/null +++ b/datapoint-forward/test/Test/DataPoint/Forward/Demo/Tests.hs @@ -0,0 +1,103 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Test.DataPoint.Forward.Demo.Tests + ( tests + ) where + +import Control.Concurrent.Async (withAsync) +import Control.Concurrent.STM.TVar (modifyTVar') +import qualified Data.Aeson as A +import GHC.Conc +import System.Directory (getTemporaryDirectory) +#if defined(mingw32_HOST_OS) +import System.FilePath ((), dropDrive) +import qualified Data.Text as T +#else +import System.FilePath (()) +#endif +import Test.Tasty +import Test.Tasty.QuickCheck +import System.Time.Extra (sleep) + +import Ouroboros.Network.IOManager (withIOManager) + +import DataPoint.Forward.Acceptor +import DataPoint.Forward.Configuration +import DataPoint.Forward.Forwarder +import DataPoint.Forward.Utils + +import Test.DataPoint.Forward.Demo.Configs +import Test.DataPoint.Forward.Protocol.Codec () +import Test.DataPoint.Forward.Types + +tests :: TestTree +tests = localOption (QuickCheckTests 1) $ testGroup "DataPoint.Forward.Demo" + [ testProperty "Pass DataPoints" prop_Demo + ] + +ni :: TestNodeInfo +ni = TestNodeInfo + { niName = "core-1" + , niVersion = "1.30.1" + , niCommit = "abcdefg" + , niProtocol = "Shelley" + } + +bs :: BlockchainStatus +bs = BlockchainStatus + { bsEpoch = 124 + , bsSlot = 6785 + } + +prop_Demo :: Property +prop_Demo = ioProperty . withIOManager $ \iomgr -> do + ep <- LocalPipe <$> mkLocalPipePath + weAreDone <- newTVarIO False + dpStore <- initDataPointStore + dpAsker <- initDataPointAsker + + dpValues <- withAsync (runDataPointAcceptor iomgr (mkAcceptorConfig ep weAreDone) dpAsker) $ \_ -> do + sleep 0.5 + withAsync (runDataPointForwarder iomgr (mkForwarderConfig ep) dpStore) $ \_ -> do + sleep 0.5 + writeToStore dpStore "nodeInfo" $ DataPoint ni + writeToStore dpStore "bchainStatus" $ DataPoint bs + sleep 0.1 + values <- askForDataPoints dpAsker ["nodeInfo", "bchainStatus", "wrongName"] + sleep 1.0 + atomically $ modifyTVar' weAreDone $ const True + sleep 0.5 + return values + + p1 <- case lookup "nodeInfo" dpValues of + Just (Just v) -> case A.decode v of + Nothing -> false "nodeInfo value is invalid!" + Just (ni' :: TestNodeInfo) -> return $ ni === ni' + _ -> false "No expected nodeInfo value!" + + p2 <- case lookup "bchainStatus" dpValues of + Just (Just v) -> case A.decode v of + Nothing -> false "bchainStatus value is invalid!" + Just (bs' :: BlockchainStatus) -> return $ bs === bs' + _ -> false "No expected bchainStatus value!" + + p3 <- case lookup "wrongName" dpValues of + Just Nothing -> return $ property True + _ -> false "No expected wrongName value!" + + return $ p1 .&&. p2 .&&. p3 + +mkLocalPipePath :: IO FilePath +mkLocalPipePath = do + tmpDir <- getTemporaryDirectory +#if defined(mingw32_HOST_OS) + return $ "\\\\.\\pipe\\" <> (T.unpack . T.replace "\\" "-" . T.pack) (dropDrive tmpDir) + <> "_" "trace-forward-test" +#else + return $ tmpDir "trace-forward-test.sock" +#endif + +false :: String -> IO Property +false msg = return . counterexample msg $ property False diff --git a/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Codec.hs b/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Codec.hs new file mode 100644 index 00000000000..14812a3edc6 --- /dev/null +++ b/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Codec.hs @@ -0,0 +1,35 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE OverloadedStrings #-} + +module Test.DataPoint.Forward.Protocol.Codec () where + +import qualified Data.Aeson as A +import Test.QuickCheck + +import Network.TypedProtocol.Core +import Network.TypedProtocol.Codec + +import DataPoint.Forward.Protocol.Type + +import Test.DataPoint.Forward.Types + +instance Arbitrary (AnyMessageAndAgency DataPointForward) where + arbitrary = oneof + [ pure $ AnyMessageAndAgency (ClientAgency TokIdle) (MsgDataPointsRequest ["NodeInfo"]) + , pure $ AnyMessageAndAgency (ServerAgency TokBusy) (MsgDataPointsReply [("NodeInfo", Just ni)]) + , pure $ AnyMessageAndAgency (ClientAgency TokIdle) MsgDone + ] + where + ni = A.encode $ TestNodeInfo + { niName = "core-1" + , niVersion = "1.30.1" + , niCommit = "abcdefg" + , niProtocol = "Shelley" + } + +instance Eq (AnyMessage DataPointForward) where + AnyMessage (MsgDataPointsRequest r1) == AnyMessage (MsgDataPointsRequest r2) = r1 == r2 + AnyMessage (MsgDataPointsReply r1) == AnyMessage (MsgDataPointsReply r2) = r1 == r2 + AnyMessage MsgDone == AnyMessage MsgDone = True + _ == _ = False diff --git a/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Tests.hs b/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Tests.hs new file mode 100644 index 00000000000..e0f0e28a6c3 --- /dev/null +++ b/datapoint-forward/test/Test/DataPoint/Forward/Protocol/Tests.hs @@ -0,0 +1,27 @@ +module Test.DataPoint.Forward.Protocol.Tests + ( tests + ) where + +import qualified Codec.Serialise as CBOR +import Control.Monad.ST (runST) +import Test.Tasty +import Test.Tasty.QuickCheck + +import Network.TypedProtocol.Codec + +import DataPoint.Forward.Protocol.Codec +import DataPoint.Forward.Protocol.Type + +import Test.DataPoint.Forward.Protocol.Codec () + +tests :: TestTree +tests = testGroup "DataPoint.Forward.Protocol" + [ testProperty "codec" prop_codec_DataPointForward + ] + +prop_codec_DataPointForward :: AnyMessageAndAgency DataPointForward -> Bool +prop_codec_DataPointForward msg = + runST $ prop_codecM + (codecDataPointForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + msg diff --git a/datapoint-forward/test/Test/DataPoint/Forward/Types.hs b/datapoint-forward/test/Test/DataPoint/Forward/Types.hs new file mode 100644 index 00000000000..ed51c17d1f9 --- /dev/null +++ b/datapoint-forward/test/Test/DataPoint/Forward/Types.hs @@ -0,0 +1,23 @@ +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} + +module Test.DataPoint.Forward.Types + ( TestNodeInfo (..) + , BlockchainStatus (..) + ) where + +import qualified Data.Aeson as A +import Data.Text (Text) +import GHC.Generics + +data TestNodeInfo = TestNodeInfo + { niName :: !Text + , niVersion :: !Text + , niCommit :: !Text + , niProtocol :: !Text + } deriving (Eq, Generic, A.ToJSON, A.FromJSON, Show) + +data BlockchainStatus = BlockchainStatus + { bsEpoch :: !Int + , bsSlot :: !Int + } deriving (Eq, Generic, A.ToJSON, A.FromJSON, Show)