From ae9ebd74ede31ce5c0758847269e52d68d2b9054 Mon Sep 17 00:00:00 2001 From: jankun4 Date: Sun, 16 Jun 2024 23:26:02 +0200 Subject: [PATCH 1/6] [#1235] add /transaction/watch endpoint & background tx status process --- CHANGELOG.md | 1 + govtool/backend/app/Main.hs | 11 ++- govtool/backend/src/VVA/API.hs | 20 +++++ govtool/backend/src/VVA/Transaction.hs | 115 ++++++++++++++++++++++++- govtool/backend/src/VVA/Types.hs | 12 +++ govtool/backend/vva-be.cabal | 4 + 6 files changed, 160 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index becd8dc0a..f85f4a6d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ changes. ## [Unreleased] - Added 'sentryenv' field in backend config file [Issue 1401](https://github.com/IntersectMBO/govtool/issues/1401) +- Add websocket transaction status endpoint /transaction/watch/ [Issue 1235](https://github.com/IntersectMBO/govtool/issues/1235) - Add wallet connector package [Issue 898](https://github.com/IntersectMBO/govtool/issues/898) - Change DRep without metadata name from "Sole Voter" to "Direct Voter" [Issue 880](https://github.com/IntersectMBO/govtool/issues/880) - Inicialize Usersnap into App [Issue 546](https://github.com/IntersectMBO/govtool/issues/546) diff --git a/govtool/backend/app/Main.hs b/govtool/backend/app/Main.hs index cc08afdd3..5e10d2aa6 100644 --- a/govtool/backend/app/Main.hs +++ b/govtool/backend/app/Main.hs @@ -8,6 +8,7 @@ module Main where +import GHC.Conc (newTVarIO) import Control.Concurrent (forkIO) import Control.Concurrent.QSem (newQSem) import Control.Exception (Exception, @@ -77,6 +78,7 @@ import VVA.Types (AppEnv (..), CacheEnv (..)) import Network.HTTP.Client hiding (Proxy, Request) import Network.HTTP.Client.TLS +import VVA.Transaction (processTransactionStatuses) proxyAPI :: Proxy (VVAApi :<|> SwaggerAPI) proxyAPI = Proxy @@ -137,7 +139,8 @@ startApp vvaConfig = do connectionPool <- createPool (connectPostgreSQL (encodeUtf8 (dbSyncConnectionString $ getter vvaConfig))) close 1 1 60 vvaTlsManager <- newManager tlsManagerSettings qsem <- newQSem (metadataValidationMaxConcurrentRequests vvaConfig) - let appEnv = AppEnv {vvaConfig=vvaConfig, vvaCache=cacheEnv, vvaConnectionPool=connectionPool, vvaTlsManager, vvaMetadataQSem=qsem} + websocketConnectionsTVar <- newTVarIO mempty + let appEnv = AppEnv {vvaConfig=vvaConfig, vvaCache=cacheEnv, vvaConnectionPool=connectionPool, vvaTlsManager, vvaMetadataQSem=qsem, vvaWebSocketConnections=websocketConnectionsTVar} _ <- forkIO $ do result <- runReaderT (runExceptT startFetchProcess) appEnv @@ -145,6 +148,12 @@ startApp vvaConfig = do Left e -> throw e Right _ -> return () + _ <- forkIO $ do + result <- runReaderT (runExceptT $ processTransactionStatuses websocketConnectionsTVar) appEnv + case result of + Left e -> throw e + Right _ -> return () + server' <- mkVVAServer appEnv runSettings settings server' diff --git a/govtool/backend/src/VVA/API.hs b/govtool/backend/src/VVA/API.hs index f5e29cc63..e5642a63d 100644 --- a/govtool/backend/src/VVA/API.hs +++ b/govtool/backend/src/VVA/API.hs @@ -5,9 +5,12 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE MultiParamTypeClasses #-} module VVA.API where +import qualified Network.WebSockets.Connection as WS +import Servant.API.WebSocket (WebSocket) import Control.Concurrent.QSem (waitQSem, signalQSem) import Control.Concurrent.Async (mapConcurrently) import Control.Exception (throw, throwIO) @@ -44,6 +47,7 @@ import VVA.Types (App, AppEnv (..), AppError (CriticalError, ValidationError, InternalError), CacheEnv (..)) import qualified VVA.Metadata as Metadata +import Servant.OpenApi (HasOpenApi, toOpenApi) type VVAApi = "drep" :> "list" @@ -78,6 +82,8 @@ type VVAApi = :<|> "network" :> "metrics" :> Get '[JSON] GetNetworkMetricsResponse :<|> "proposal" :> "metadata" :> "validate" :> ReqBody '[JSON] MetadataValidationParams :> Post '[JSON] MetadataValidationResponse :<|> "drep" :> "metadata" :> "validate" :> ReqBody '[JSON] MetadataValidationParams :> Post '[JSON] MetadataValidationResponse + :<|> "transaction" :> "watch" :> Capture "transactionId" HexText :> WebSocket + server :: App m => ServerT VVAApi m server = drepList :<|> getVotingPower @@ -93,6 +99,20 @@ server = drepList :<|> getNetworkMetrics :<|> getProposalMetadataValidationResponse :<|> getDRepMetadataValidationResponse + :<|> transactionWatch + +instance HasOpenApi WebSocket where + toOpenApi _ = mempty + + +transactionWatch :: App m => HexText -> WS.Connection -> m () +transactionWatch (unHexText -> transactionId) c = do + tvar <- asks vvaWebSocketConnections + Transaction.watchTransaction tvar transactionId c + liftIO $ forever $ do + msg <- WS.receiveData c + putStrLn $ Text.unpack $ ("Received: " <> msg) + WS.sendTextData c (msg :: Text) mapDRepType :: Types.DRepType -> DRepType diff --git a/govtool/backend/src/VVA/Transaction.hs b/govtool/backend/src/VVA/Transaction.hs index 73ebc594a..25d3fdc08 100644 --- a/govtool/backend/src/VVA/Transaction.hs +++ b/govtool/backend/src/VVA/Transaction.hs @@ -1,9 +1,19 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TypeApplications #-} module VVA.Transaction where +import qualified Data.Map as Map +import Data.Map (Map) +import qualified Network.WebSockets.Connection as WS +import GHC.Conc (TVar, threadDelay, readTVar, readTVarIO, writeTVar, atomically) +import Data.Maybe +import Data.Either.Extra (mapRight) +import Data.UUID.V4 (nextRandom) +import Network.WebSockets.Connection (Connection) +import qualified Database.Redis as Redis import Control.Exception (throw) import Control.Monad.Except (MonadError, throwError) import Control.Monad.Reader @@ -20,8 +30,8 @@ import qualified Database.PostgreSQL.Simple as SQL import VVA.Config import VVA.Pool (ConnectionPool, withPool) -import VVA.Types (AppError (..), - TransactionStatus (..)) +import VVA.Types (AppError (..), AppEnv (..), WebsocketTvar, + TransactionStatus (..), vvaWebSocketConnections) sqlFrom :: ByteString -> SQL.Query sqlFrom bs = fromString $ unpack $ Text.decodeUtf8 bs @@ -39,3 +49,104 @@ getTransactionStatus transactionId = withPool $ \conn -> do [SQL.Only True] -> return TransactionConfirmed [SQL.Only False] -> return TransactionUnconfirmed x -> throwError $ CriticalError ("Expected exactly one result from get-transaction-status.sql but got " <> pack (show (length x)) <> " of them. This should never happen") + +processTransactionStatuses :: + (Has AppEnv r, Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> m () +processTransactionStatuses tvar = do + txs <- getWatchedTransactions + forM_ txs $ \(txHash, uuid) -> do + status <- getTransactionStatus txHash + case status of + TransactionConfirmed -> do + connection <- getWebsocketConnection tvar uuid + case connection of + Just conn -> do + liftIO $ WS.sendTextData conn ("{\"status\": \"confirmed\"}" :: Text) + removeWebsocketConnection tvar uuid + Nothing -> return () + TransactionUnconfirmed -> return () + + liftIO $ threadDelay (20 * 1000000) + processTransactionStatuses tvar + + +watchTransaction :: + (Has AppEnv r, Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> Text + -> Connection + -> m () +watchTransaction tVar txHash connection = do + + uuid <- (pack . show) <$> liftIO nextRandom + + port <- getRedisPort + host <- getRedisHost + conn <- liftIO $ Redis.checkedConnect $ Redis.defaultConnectInfo {Redis.connectHost = unpack host, Redis.connectPort = Redis.PortNumber $ fromIntegral port, Redis.connectDatabase = 1} + + liftIO $ Redis.runRedis conn $ do + _ <- Redis.set (Text.encodeUtf8 txHash) (Text.encodeUtf8 uuid) + + return () + setWebsocketConnection tVar uuid connection + + + +getWatchedTransactions :: + (Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) + => m [(Text, Text)] +getWatchedTransactions = do + port <- getRedisPort + host <- getRedisHost + conn <- liftIO $ Redis.checkedConnect $ Redis.defaultConnectInfo {Redis.connectHost = unpack host, Redis.connectPort = Redis.PortNumber $ fromIntegral port, Redis.connectDatabase = 1} + keysResult <- liftIO $ Redis.runRedis conn $ Redis.keys "*" + case keysResult of + Left err -> do + -- handle error, maybe throw an AppError + throwError $ CriticalError $ "Error fetching keys: " <> (pack . show) err + Right keys -> do + keyValuePairs <- liftIO $ Redis.runRedis conn $ mapM getKeyValue keys + let filteredPairs = catMaybes keyValuePairs + pure filteredPairs + + +getKeyValue :: ByteString -> Redis.Redis (Maybe (Text, Text)) +getKeyValue key = do + valueResult <- Redis.get key + return $ case valueResult of + Left _ -> Nothing + Right Nothing -> Nothing + Right (Just value) -> Just (Text.decodeUtf8 key, Text.decodeUtf8 value) + +setWebsocketConnection :: + (MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> Text + -> Connection + -> m () +setWebsocketConnection tvar txHash connection = do + liftIO $ atomically $ do + connections <- readTVar tvar + writeTVar tvar $ Map.insert txHash connection connections + + +getWebsocketConnection :: + (MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> Text + -> m (Maybe Connection) +getWebsocketConnection tvar txHash = do + connections <- liftIO $ readTVarIO tvar + return $ Map.lookup txHash connections + +removeWebsocketConnection :: + (MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> Text + -> m () +removeWebsocketConnection tvar txHash = do + liftIO $ atomically $ do + connections <- readTVar tvar + writeTVar tvar $ Map.delete txHash connections \ No newline at end of file diff --git a/govtool/backend/src/VVA/Types.hs b/govtool/backend/src/VVA/Types.hs index ef8e97303..275094ce8 100644 --- a/govtool/backend/src/VVA/Types.hs +++ b/govtool/backend/src/VVA/Types.hs @@ -9,6 +9,10 @@ module VVA.Types where + + +import GHC.Conc (TVar) +import qualified Network.WebSockets.Connection as WS import Data.Aeson.TH (deriveJSON) import VVA.API.Utils (jsonOptions) import GHC.Generics (Generic) @@ -24,6 +28,7 @@ import Data.Has import Data.Pool (Pool) import Data.Text (Text) import Data.Time (UTCTime) +import Data.Map (Map) import Database.PostgreSQL.Simple (Connection) @@ -205,6 +210,8 @@ data VotingAnchor type App m = (MonadReader AppEnv m, MonadIO m, MonadFail m, MonadError AppError m) +type WebsocketTvar = TVar (Map Text WS.Connection) + data AppEnv = AppEnv { vvaConfig :: VVAConfig @@ -212,6 +219,7 @@ data AppEnv , vvaConnectionPool :: Pool Connection , vvaTlsManager :: Manager , vvaMetadataQSem :: QSem + , vvaWebSocketConnections :: WebsocketTvar } instance Has VVAConfig AppEnv where @@ -233,3 +241,7 @@ instance Has Manager AppEnv where instance Has QSem AppEnv where getter AppEnv {vvaMetadataQSem} = vvaMetadataQSem modifier f a@AppEnv {vvaMetadataQSem} = a {vvaMetadataQSem = f vvaMetadataQSem} + +instance Has (TVar (Map Text WS.Connection)) AppEnv where + getter AppEnv {vvaWebSocketConnections} = vvaWebSocketConnections + modifier f a@AppEnv {vvaWebSocketConnections} = a {vvaWebSocketConnections = f vvaWebSocketConnections} diff --git a/govtool/backend/vva-be.cabal b/govtool/backend/vva-be.cabal index 0a5ef69d6..5f965ed49 100644 --- a/govtool/backend/vva-be.cabal +++ b/govtool/backend/vva-be.cabal @@ -105,6 +105,10 @@ library , vector , async , hedis + , websockets + , uuid + , servant-websockets + , servant-openapi3 exposed-modules: VVA.Config , VVA.CommandLine From bf8834cdcfff7ce08abc0362e3c6b55a6f92887a Mon Sep 17 00:00:00 2001 From: jankun4 Date: Tue, 11 Jun 2024 22:19:43 +0200 Subject: [PATCH 2/6] [#1234] Add reddis support for storing metadata validation results --- govtool/backend/app/Main.hs | 7 +++++++ govtool/backend/src/VVA/Types.hs | 3 +-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/govtool/backend/app/Main.hs b/govtool/backend/app/Main.hs index 5e10d2aa6..276347ef7 100644 --- a/govtool/backend/app/Main.hs +++ b/govtool/backend/app/Main.hs @@ -139,8 +139,12 @@ startApp vvaConfig = do connectionPool <- createPool (connectPostgreSQL (encodeUtf8 (dbSyncConnectionString $ getter vvaConfig))) close 1 1 60 vvaTlsManager <- newManager tlsManagerSettings qsem <- newQSem (metadataValidationMaxConcurrentRequests vvaConfig) +<<<<<<< HEAD websocketConnectionsTVar <- newTVarIO mempty let appEnv = AppEnv {vvaConfig=vvaConfig, vvaCache=cacheEnv, vvaConnectionPool=connectionPool, vvaTlsManager, vvaMetadataQSem=qsem, vvaWebSocketConnections=websocketConnectionsTVar} +======= + let appEnv = AppEnv {vvaConfig=vvaConfig, vvaCache=cacheEnv, vvaConnectionPool=connectionPool, vvaTlsManager, vvaMetadataQSem=qsem} +>>>>>>> b45e63d1... [#1234] Add reddis support for storing metadata validation results _ <- forkIO $ do result <- runReaderT (runExceptT startFetchProcess) appEnv @@ -148,12 +152,15 @@ startApp vvaConfig = do Left e -> throw e Right _ -> return () +<<<<<<< HEAD _ <- forkIO $ do result <- runReaderT (runExceptT $ processTransactionStatuses websocketConnectionsTVar) appEnv case result of Left e -> throw e Right _ -> return () +======= +>>>>>>> b45e63d1... [#1234] Add reddis support for storing metadata validation results server' <- mkVVAServer appEnv runSettings settings server' diff --git a/govtool/backend/src/VVA/Types.hs b/govtool/backend/src/VVA/Types.hs index 275094ce8..29b140da4 100644 --- a/govtool/backend/src/VVA/Types.hs +++ b/govtool/backend/src/VVA/Types.hs @@ -9,8 +9,6 @@ module VVA.Types where - - import GHC.Conc (TVar) import qualified Network.WebSockets.Connection as WS import Data.Aeson.TH (deriveJSON) @@ -245,3 +243,4 @@ instance Has QSem AppEnv where instance Has (TVar (Map Text WS.Connection)) AppEnv where getter AppEnv {vvaWebSocketConnections} = vvaWebSocketConnections modifier f a@AppEnv {vvaWebSocketConnections} = a {vvaWebSocketConnections = f vvaWebSocketConnections} + From 58f13b81ec0494528397c9490627bafd8228fb94 Mon Sep 17 00:00:00 2001 From: jankun4 Date: Mon, 17 Jun 2024 11:50:09 +0200 Subject: [PATCH 3/6] [#1235] merge with #1234 branch --- govtool/backend/app/Main.hs | 7 ------- govtool/backend/src/VVA/Transaction.hs | 1 - 2 files changed, 8 deletions(-) diff --git a/govtool/backend/app/Main.hs b/govtool/backend/app/Main.hs index 276347ef7..5e10d2aa6 100644 --- a/govtool/backend/app/Main.hs +++ b/govtool/backend/app/Main.hs @@ -139,12 +139,8 @@ startApp vvaConfig = do connectionPool <- createPool (connectPostgreSQL (encodeUtf8 (dbSyncConnectionString $ getter vvaConfig))) close 1 1 60 vvaTlsManager <- newManager tlsManagerSettings qsem <- newQSem (metadataValidationMaxConcurrentRequests vvaConfig) -<<<<<<< HEAD websocketConnectionsTVar <- newTVarIO mempty let appEnv = AppEnv {vvaConfig=vvaConfig, vvaCache=cacheEnv, vvaConnectionPool=connectionPool, vvaTlsManager, vvaMetadataQSem=qsem, vvaWebSocketConnections=websocketConnectionsTVar} -======= - let appEnv = AppEnv {vvaConfig=vvaConfig, vvaCache=cacheEnv, vvaConnectionPool=connectionPool, vvaTlsManager, vvaMetadataQSem=qsem} ->>>>>>> b45e63d1... [#1234] Add reddis support for storing metadata validation results _ <- forkIO $ do result <- runReaderT (runExceptT startFetchProcess) appEnv @@ -152,15 +148,12 @@ startApp vvaConfig = do Left e -> throw e Right _ -> return () -<<<<<<< HEAD _ <- forkIO $ do result <- runReaderT (runExceptT $ processTransactionStatuses websocketConnectionsTVar) appEnv case result of Left e -> throw e Right _ -> return () -======= ->>>>>>> b45e63d1... [#1234] Add reddis support for storing metadata validation results server' <- mkVVAServer appEnv runSettings settings server' diff --git a/govtool/backend/src/VVA/Transaction.hs b/govtool/backend/src/VVA/Transaction.hs index 25d3fdc08..8305c5e14 100644 --- a/govtool/backend/src/VVA/Transaction.hs +++ b/govtool/backend/src/VVA/Transaction.hs @@ -10,7 +10,6 @@ import Data.Map (Map) import qualified Network.WebSockets.Connection as WS import GHC.Conc (TVar, threadDelay, readTVar, readTVarIO, writeTVar, atomically) import Data.Maybe -import Data.Either.Extra (mapRight) import Data.UUID.V4 (nextRandom) import Network.WebSockets.Connection (Connection) import qualified Database.Redis as Redis From ac356d6c36956a3ade3799faf1d47e66b2123db0 Mon Sep 17 00:00:00 2001 From: jankun4 Date: Sun, 23 Jun 2024 20:27:13 +0200 Subject: [PATCH 4/6] [#1235] add lifetime for websocket connections --- CHANGELOG.md | 1 + govtool/backend/app/Main.hs | 8 ++- govtool/backend/example-config.json | 3 +- govtool/backend/src/VVA/Config.hs | 15 ++++- govtool/backend/src/VVA/Metadata.hs | 2 - govtool/backend/src/VVA/Transaction.hs | 62 +++++++++++++++---- govtool/backend/src/VVA/Types.hs | 4 +- .../config/templates/backend-config.json.tpl | 3 +- 8 files changed, 79 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f85f4a6d1..684551e02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ changes. ### Added +- added `websocketlifetimeseconds` config field for webscoket connections, and watched transactions. [Issue 1235](https://github.com/IntersectMBO/govtool/issues/1235) - added separate async process that fetches new voting_anchors, validates their metadata using metadata-validation service, and then stores it in Redis database [Issue 1234](https://github.com/IntersectMBO/govtool/issues/1234) - added `bio` `dRepName` `email` `references` `metadataValid` and `metadataStatus` fields to `drep/list` - added `metadatavalidationmaxconcurrentrequests` field to the backend config diff --git a/govtool/backend/app/Main.hs b/govtool/backend/app/Main.hs index 5e10d2aa6..4a08ec583 100644 --- a/govtool/backend/app/Main.hs +++ b/govtool/backend/app/Main.hs @@ -78,7 +78,7 @@ import VVA.Types (AppEnv (..), CacheEnv (..)) import Network.HTTP.Client hiding (Proxy, Request) import Network.HTTP.Client.TLS -import VVA.Transaction (processTransactionStatuses) +import VVA.Transaction (processTransactionStatuses, timeoutStaleWebsocketConnections) proxyAPI :: Proxy (VVAApi :<|> SwaggerAPI) proxyAPI = Proxy @@ -154,6 +154,12 @@ startApp vvaConfig = do Left e -> throw e Right _ -> return () + _ <- forkIO $ do + result <- runReaderT (runExceptT $ timeoutStaleWebsocketConnections websocketConnectionsTVar) appEnv + case result of + Left e -> throw e + Right _ -> return () + server' <- mkVVAServer appEnv runSettings settings server' diff --git a/govtool/backend/example-config.json b/govtool/backend/example-config.json index 338b8db54..71ec04571 100644 --- a/govtool/backend/example-config.json +++ b/govtool/backend/example-config.json @@ -18,5 +18,6 @@ "host" : "localhost", "port" : 8094, "password": null - } + }, + "websocketlifetimeseconds": 60 } diff --git a/govtool/backend/src/VVA/Config.hs b/govtool/backend/src/VVA/Config.hs index 557ffbcd3..fdb5cf644 100644 --- a/govtool/backend/src/VVA/Config.hs +++ b/govtool/backend/src/VVA/Config.hs @@ -29,6 +29,7 @@ module VVA.Config , vvaConfigToText , getMetadataValidationHost , getMetadataValidationPort + , getWebsocketLifetimeSeconds ) where import Conferer @@ -101,6 +102,8 @@ data VVAConfigInternal , vVAConfigInternalMetadataValidationMaxConcurrentRequests :: Int -- | Redis config , vVAConfigInternalRedisConfig :: RedisInternalConfig + -- | WebSocket lifetime in seconds + , vVAConfigInternalWebsocketLifetimeSeconds :: Int } deriving (FromConfig, Generic, Show) @@ -116,7 +119,8 @@ instance DefaultConfig VVAConfigInternal where vVAConfigInternalMetadataValidationHost = "localhost", vVAConfigInternalMetadataValidationPort = 3001, vVAConfigInternalMetadataValidationMaxConcurrentRequests = 10, - vVAConfigInternalRedisConfig = RedisInternalConfig "localhost" 6379 Nothing + vVAConfigInternalRedisConfig = RedisInternalConfig "localhost" 6379 Nothing, + vVAConfigInternalWebsocketLifetimeSeconds = 60 * 1 } data RedisConfig @@ -150,6 +154,8 @@ data VVAConfig , metadataValidationMaxConcurrentRequests :: Int -- | Redis config , redisConfig :: RedisConfig + -- | WebSocket lifetime in seconds + , websocketLifetimeSeconds :: Int } deriving (Generic, Show, ToJSON) @@ -199,6 +205,7 @@ convertConfig VVAConfigInternal {..} = redisPort = redisInternalConfigPort $ vVAConfigInternalRedisConfig, redisPassword = redisInternalConfigPassword $ vVAConfigInternalRedisConfig } + websocketLifetimeSeconds = vVAConfigInternalWebsocketLifetimeSeconds } -- | Load configuration from a file specified on the command line. Load from @@ -265,3 +272,9 @@ getMetadataValidationPort :: (Has VVAConfig r, MonadReader r m) => m Int getMetadataValidationPort = asks (metadataValidationPort . getter) + +-- | Access websocket lifetime in seconds +getWebsocketLifetimeSeconds :: + (Has VVAConfig r, MonadReader r m) => + m Int +getWebsocketLifetimeSeconds = asks (websocketLifetimeSeconds . getter) diff --git a/govtool/backend/src/VVA/Metadata.hs b/govtool/backend/src/VVA/Metadata.hs index d87fe2f37..56bc030a2 100644 --- a/govtool/backend/src/VVA/Metadata.hs +++ b/govtool/backend/src/VVA/Metadata.hs @@ -58,8 +58,6 @@ startFetchProcess :: startFetchProcess = go 0 where go latestKnownId = do - liftIO $ putStrLn "Fetching metadata..." - anchors <- getNewVotingAnchors latestKnownId if null anchors then do diff --git a/govtool/backend/src/VVA/Transaction.hs b/govtool/backend/src/VVA/Transaction.hs index 8305c5e14..4c3db95a2 100644 --- a/govtool/backend/src/VVA/Transaction.hs +++ b/govtool/backend/src/VVA/Transaction.hs @@ -2,9 +2,11 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TypeApplications #-} +{-# LANGUAGE BlockArguments #-} module VVA.Transaction where +import Data.Time import qualified Data.Map as Map import Data.Map (Map) import qualified Network.WebSockets.Connection as WS @@ -49,6 +51,20 @@ getTransactionStatus transactionId = withPool $ \conn -> do [SQL.Only False] -> return TransactionUnconfirmed x -> throwError $ CriticalError ("Expected exactly one result from get-transaction-status.sql but got " <> pack (show (length x)) <> " of them. This should never happen") +timeoutStaleWebsocketConnections :: + (Has AppEnv r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> m () +timeoutStaleWebsocketConnections tvar = do + currentTime <- liftIO getCurrentTime + connections <- liftIO $ readTVarIO tvar + websocketLifetimeSeconds <- getWebsocketLifetimeSeconds + let staleConnections = Map.filter (\(_, time) -> diffUTCTime currentTime time > fromIntegral websocketLifetimeSeconds) connections + forM_ (Map.keys staleConnections) $ \txHash -> do + removeWebsocketConnection tvar txHash "Websocket timed out." + liftIO $ threadDelay (30 * 1000000) + timeoutStaleWebsocketConnections tvar + processTransactionStatuses :: (Has AppEnv r, Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) => WebsocketTvar @@ -61,10 +77,11 @@ processTransactionStatuses tvar = do TransactionConfirmed -> do connection <- getWebsocketConnection tvar uuid case connection of - Just conn -> do + Just (conn, _) -> do liftIO $ WS.sendTextData conn ("{\"status\": \"confirmed\"}" :: Text) - removeWebsocketConnection tvar uuid - Nothing -> return () + unWatchTransaciton tvar uuid + removeWebsocketConnection tvar uuid "Tx confirmed. Closing connection." + Nothing -> unWatchTransaciton tvar uuid TransactionUnconfirmed -> return () liftIO $ threadDelay (20 * 1000000) @@ -84,13 +101,28 @@ watchTransaction tVar txHash connection = do port <- getRedisPort host <- getRedisHost conn <- liftIO $ Redis.checkedConnect $ Redis.defaultConnectInfo {Redis.connectHost = unpack host, Redis.connectPort = Redis.PortNumber $ fromIntegral port, Redis.connectDatabase = 1} + websocketLifetimeSeconds <- getWebsocketLifetimeSeconds liftIO $ Redis.runRedis conn $ do _ <- Redis.set (Text.encodeUtf8 txHash) (Text.encodeUtf8 uuid) - + Redis.expire (Text.encodeUtf8 txHash) $ fromIntegral websocketLifetimeSeconds return () + setWebsocketConnection tVar uuid connection +unWatchTransaciton :: + (Has AppEnv r, Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> Text + -> m () +unWatchTransaciton tVar uuid = do + port <- getRedisPort + host <- getRedisHost + conn <- liftIO $ Redis.checkedConnect $ Redis.defaultConnectInfo {Redis.connectHost = unpack host, Redis.connectPort = Redis.PortNumber $ fromIntegral port, Redis.connectDatabase = 1} + + liftIO $ Redis.runRedis conn $ do + _ <- Redis.del [Text.encodeUtf8 uuid] + return () getWatchedTransactions :: @@ -126,16 +158,18 @@ setWebsocketConnection :: -> Connection -> m () setWebsocketConnection tvar txHash connection = do - liftIO $ atomically $ do - connections <- readTVar tvar - writeTVar tvar $ Map.insert txHash connection connections + liftIO do + currentTime <- getCurrentTime + atomically $ do + connections <- readTVar tvar + writeTVar tvar $ Map.insert txHash (connection, currentTime) connections getWebsocketConnection :: (MonadReader r m, MonadIO m, MonadError AppError m) => WebsocketTvar -> Text - -> m (Maybe Connection) + -> m (Maybe (Connection, UTCTime)) getWebsocketConnection tvar txHash = do connections <- liftIO $ readTVarIO tvar return $ Map.lookup txHash connections @@ -144,8 +178,14 @@ removeWebsocketConnection :: (MonadReader r m, MonadIO m, MonadError AppError m) => WebsocketTvar -> Text + -> Text -> m () -removeWebsocketConnection tvar txHash = do - liftIO $ atomically $ do +removeWebsocketConnection tvar txHash message = liftIO $ do + mConn <- atomically $ do connections <- readTVar tvar - writeTVar tvar $ Map.delete txHash connections \ No newline at end of file + writeTVar tvar $ Map.delete txHash connections + return $ Map.lookup txHash connections + + case mConn of + Just (conn, _) -> liftIO $ WS.sendClose conn message + Nothing -> return () \ No newline at end of file diff --git a/govtool/backend/src/VVA/Types.hs b/govtool/backend/src/VVA/Types.hs index 29b140da4..f5d5952b2 100644 --- a/govtool/backend/src/VVA/Types.hs +++ b/govtool/backend/src/VVA/Types.hs @@ -208,7 +208,7 @@ data VotingAnchor type App m = (MonadReader AppEnv m, MonadIO m, MonadFail m, MonadError AppError m) -type WebsocketTvar = TVar (Map Text WS.Connection) +type WebsocketTvar = TVar (Map Text (WS.Connection, UTCTime)) data AppEnv = AppEnv @@ -240,7 +240,7 @@ instance Has QSem AppEnv where getter AppEnv {vvaMetadataQSem} = vvaMetadataQSem modifier f a@AppEnv {vvaMetadataQSem} = a {vvaMetadataQSem = f vvaMetadataQSem} -instance Has (TVar (Map Text WS.Connection)) AppEnv where +instance Has (TVar (Map Text (WS.Connection, UTCTime))) AppEnv where getter AppEnv {vvaWebSocketConnections} = vvaWebSocketConnections modifier f a@AppEnv {vvaWebSocketConnections} = a {vvaWebSocketConnections = f vvaWebSocketConnections} diff --git a/scripts/govtool/config/templates/backend-config.json.tpl b/scripts/govtool/config/templates/backend-config.json.tpl index bda65e26d..3f6cd00b5 100644 --- a/scripts/govtool/config/templates/backend-config.json.tpl +++ b/scripts/govtool/config/templates/backend-config.json.tpl @@ -18,5 +18,6 @@ "host" : "http://redis", "port" : 8094, "password": "" - } + }, + "websocketlifetimeseconds": 60 } From fdfc7d2ed3aa3551f8b6e7d5d45ac4b2c7a843dd Mon Sep 17 00:00:00 2001 From: jankun4 Date: Sun, 23 Jun 2024 21:59:59 +0200 Subject: [PATCH 5/6] [#1235] minor refactor --- govtool/backend/src/VVA/API.hs | 12 +++++---- govtool/backend/src/VVA/Transaction.hs | 36 +++++++++----------------- govtool/backend/vva-be.cabal | 1 + 3 files changed, 20 insertions(+), 29 deletions(-) diff --git a/govtool/backend/src/VVA/API.hs b/govtool/backend/src/VVA/API.hs index e5642a63d..b6835d958 100644 --- a/govtool/backend/src/VVA/API.hs +++ b/govtool/backend/src/VVA/API.hs @@ -9,6 +9,7 @@ module VVA.API where +import Control.Monad.Loops (iterateUntil) import qualified Network.WebSockets.Connection as WS import Servant.API.WebSocket (WebSocket) import Control.Concurrent.QSem (waitQSem, signalQSem) @@ -108,11 +109,12 @@ instance HasOpenApi WebSocket where transactionWatch :: App m => HexText -> WS.Connection -> m () transactionWatch (unHexText -> transactionId) c = do tvar <- asks vvaWebSocketConnections - Transaction.watchTransaction tvar transactionId c - liftIO $ forever $ do - msg <- WS.receiveData c - putStrLn $ Text.unpack $ ("Received: " <> msg) - WS.sendTextData c (msg :: Text) + uuid <- Transaction.watchTransaction tvar transactionId c + liftIO $ iterateUntil (== ("ACK" :: Text)) $ Text.stripEnd <$> WS.receiveData c + Transaction.removeWebsocketConnection tvar uuid "Tx confirmed. Closing connection." + return () + + mapDRepType :: Types.DRepType -> DRepType diff --git a/govtool/backend/src/VVA/Transaction.hs b/govtool/backend/src/VVA/Transaction.hs index 4c3db95a2..dd987211c 100644 --- a/govtool/backend/src/VVA/Transaction.hs +++ b/govtool/backend/src/VVA/Transaction.hs @@ -15,7 +15,7 @@ import Data.Maybe import Data.UUID.V4 (nextRandom) import Network.WebSockets.Connection (Connection) import qualified Database.Redis as Redis -import Control.Exception (throw) +import Control.Exception (throw, try, SomeException) import Control.Monad.Except (MonadError, throwError) import Control.Monad.Reader @@ -60,8 +60,9 @@ timeoutStaleWebsocketConnections tvar = do connections <- liftIO $ readTVarIO tvar websocketLifetimeSeconds <- getWebsocketLifetimeSeconds let staleConnections = Map.filter (\(_, time) -> diffUTCTime currentTime time > fromIntegral websocketLifetimeSeconds) connections - forM_ (Map.keys staleConnections) $ \txHash -> do - removeWebsocketConnection tvar txHash "Websocket timed out." + forM_ (Map.keys staleConnections) $ \uuid -> do + removeWebsocketConnection tvar uuid "Websocket timed out." + liftIO $ threadDelay (30 * 1000000) timeoutStaleWebsocketConnections tvar @@ -78,10 +79,9 @@ processTransactionStatuses tvar = do connection <- getWebsocketConnection tvar uuid case connection of Just (conn, _) -> do - liftIO $ WS.sendTextData conn ("{\"status\": \"confirmed\"}" :: Text) - unWatchTransaciton tvar uuid - removeWebsocketConnection tvar uuid "Tx confirmed. Closing connection." - Nothing -> unWatchTransaciton tvar uuid + liftIO $ try @SomeException $ WS.sendTextData conn ("{\"status\": \"confirmed\"}" :: Text) + return () + Nothing -> return () TransactionUnconfirmed -> return () liftIO $ threadDelay (20 * 1000000) @@ -93,7 +93,7 @@ watchTransaction :: => WebsocketTvar -> Text -> Connection - -> m () + -> m Text watchTransaction tVar txHash connection = do uuid <- (pack . show) <$> liftIO nextRandom @@ -109,21 +109,7 @@ watchTransaction tVar txHash connection = do return () setWebsocketConnection tVar uuid connection - -unWatchTransaciton :: - (Has AppEnv r, Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) - => WebsocketTvar - -> Text - -> m () -unWatchTransaciton tVar uuid = do - port <- getRedisPort - host <- getRedisHost - conn <- liftIO $ Redis.checkedConnect $ Redis.defaultConnectInfo {Redis.connectHost = unpack host, Redis.connectPort = Redis.PortNumber $ fromIntegral port, Redis.connectDatabase = 1} - - liftIO $ Redis.runRedis conn $ do - _ <- Redis.del [Text.encodeUtf8 uuid] - return () - + return uuid getWatchedTransactions :: (Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) @@ -187,5 +173,7 @@ removeWebsocketConnection tvar txHash message = liftIO $ do return $ Map.lookup txHash connections case mConn of - Just (conn, _) -> liftIO $ WS.sendClose conn message + Just (conn, _) -> do + liftIO $ try @SomeException $ WS.sendClose conn message + return () Nothing -> return () \ No newline at end of file diff --git a/govtool/backend/vva-be.cabal b/govtool/backend/vva-be.cabal index 5f965ed49..d131cec33 100644 --- a/govtool/backend/vva-be.cabal +++ b/govtool/backend/vva-be.cabal @@ -109,6 +109,7 @@ library , uuid , servant-websockets , servant-openapi3 + , monad-loops exposed-modules: VVA.Config , VVA.CommandLine From 3d4f1658d56b972f73a07a117fbfc1ef7bf67a71 Mon Sep 17 00:00:00 2001 From: jankun4 Date: Mon, 1 Jul 2024 13:56:55 +0200 Subject: [PATCH 6/6] fix missing comma in config --- govtool/backend/src/VVA/Config.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/govtool/backend/src/VVA/Config.hs b/govtool/backend/src/VVA/Config.hs index fdb5cf644..f68471232 100644 --- a/govtool/backend/src/VVA/Config.hs +++ b/govtool/backend/src/VVA/Config.hs @@ -204,7 +204,7 @@ convertConfig VVAConfigInternal {..} = { redisHost = redisInternalConfigHost $ vVAConfigInternalRedisConfig, redisPort = redisInternalConfigPort $ vVAConfigInternalRedisConfig, redisPassword = redisInternalConfigPassword $ vVAConfigInternalRedisConfig - } + }, websocketLifetimeSeconds = vVAConfigInternalWebsocketLifetimeSeconds }