diff --git a/CHANGELOG.md b/CHANGELOG.md index becd8dc0a..684551e02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ changes. ## [Unreleased] - Added 'sentryenv' field in backend config file [Issue 1401](https://github.com/IntersectMBO/govtool/issues/1401) +- Add websocket transaction status endpoint /transaction/watch/ [Issue 1235](https://github.com/IntersectMBO/govtool/issues/1235) - Add wallet connector package [Issue 898](https://github.com/IntersectMBO/govtool/issues/898) - Change DRep without metadata name from "Sole Voter" to "Direct Voter" [Issue 880](https://github.com/IntersectMBO/govtool/issues/880) - Inicialize Usersnap into App [Issue 546](https://github.com/IntersectMBO/govtool/issues/546) @@ -25,6 +26,7 @@ changes. ### Added +- added `websocketlifetimeseconds` config field for webscoket connections, and watched transactions. [Issue 1235](https://github.com/IntersectMBO/govtool/issues/1235) - added separate async process that fetches new voting_anchors, validates their metadata using metadata-validation service, and then stores it in Redis database [Issue 1234](https://github.com/IntersectMBO/govtool/issues/1234) - added `bio` `dRepName` `email` `references` `metadataValid` and `metadataStatus` fields to `drep/list` - added `metadatavalidationmaxconcurrentrequests` field to the backend config diff --git a/govtool/backend/app/Main.hs b/govtool/backend/app/Main.hs index cc08afdd3..4a08ec583 100644 --- a/govtool/backend/app/Main.hs +++ b/govtool/backend/app/Main.hs @@ -8,6 +8,7 @@ module Main where +import GHC.Conc (newTVarIO) import Control.Concurrent (forkIO) import Control.Concurrent.QSem (newQSem) import Control.Exception (Exception, @@ -77,6 +78,7 @@ import VVA.Types (AppEnv (..), CacheEnv (..)) import Network.HTTP.Client hiding (Proxy, Request) import Network.HTTP.Client.TLS +import VVA.Transaction (processTransactionStatuses, timeoutStaleWebsocketConnections) proxyAPI :: Proxy (VVAApi :<|> SwaggerAPI) proxyAPI = Proxy @@ -137,7 +139,8 @@ startApp vvaConfig = do connectionPool <- createPool (connectPostgreSQL (encodeUtf8 (dbSyncConnectionString $ getter vvaConfig))) close 1 1 60 vvaTlsManager <- newManager tlsManagerSettings qsem <- newQSem (metadataValidationMaxConcurrentRequests vvaConfig) - let appEnv = AppEnv {vvaConfig=vvaConfig, vvaCache=cacheEnv, vvaConnectionPool=connectionPool, vvaTlsManager, vvaMetadataQSem=qsem} + websocketConnectionsTVar <- newTVarIO mempty + let appEnv = AppEnv {vvaConfig=vvaConfig, vvaCache=cacheEnv, vvaConnectionPool=connectionPool, vvaTlsManager, vvaMetadataQSem=qsem, vvaWebSocketConnections=websocketConnectionsTVar} _ <- forkIO $ do result <- runReaderT (runExceptT startFetchProcess) appEnv @@ -145,6 +148,18 @@ startApp vvaConfig = do Left e -> throw e Right _ -> return () + _ <- forkIO $ do + result <- runReaderT (runExceptT $ processTransactionStatuses websocketConnectionsTVar) appEnv + case result of + Left e -> throw e + Right _ -> return () + + _ <- forkIO $ do + result <- runReaderT (runExceptT $ timeoutStaleWebsocketConnections websocketConnectionsTVar) appEnv + case result of + Left e -> throw e + Right _ -> return () + server' <- mkVVAServer appEnv runSettings settings server' diff --git a/govtool/backend/example-config.json b/govtool/backend/example-config.json index 338b8db54..71ec04571 100644 --- a/govtool/backend/example-config.json +++ b/govtool/backend/example-config.json @@ -18,5 +18,6 @@ "host" : "localhost", "port" : 8094, "password": null - } + }, + "websocketlifetimeseconds": 60 } diff --git a/govtool/backend/src/VVA/API.hs b/govtool/backend/src/VVA/API.hs index f5e29cc63..b6835d958 100644 --- a/govtool/backend/src/VVA/API.hs +++ b/govtool/backend/src/VVA/API.hs @@ -5,9 +5,13 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE MultiParamTypeClasses #-} module VVA.API where +import Control.Monad.Loops (iterateUntil) +import qualified Network.WebSockets.Connection as WS +import Servant.API.WebSocket (WebSocket) import Control.Concurrent.QSem (waitQSem, signalQSem) import Control.Concurrent.Async (mapConcurrently) import Control.Exception (throw, throwIO) @@ -44,6 +48,7 @@ import VVA.Types (App, AppEnv (..), AppError (CriticalError, ValidationError, InternalError), CacheEnv (..)) import qualified VVA.Metadata as Metadata +import Servant.OpenApi (HasOpenApi, toOpenApi) type VVAApi = "drep" :> "list" @@ -78,6 +83,8 @@ type VVAApi = :<|> "network" :> "metrics" :> Get '[JSON] GetNetworkMetricsResponse :<|> "proposal" :> "metadata" :> "validate" :> ReqBody '[JSON] MetadataValidationParams :> Post '[JSON] MetadataValidationResponse :<|> "drep" :> "metadata" :> "validate" :> ReqBody '[JSON] MetadataValidationParams :> Post '[JSON] MetadataValidationResponse + :<|> "transaction" :> "watch" :> Capture "transactionId" HexText :> WebSocket + server :: App m => ServerT VVAApi m server = drepList :<|> getVotingPower @@ -93,6 +100,21 @@ server = drepList :<|> getNetworkMetrics :<|> getProposalMetadataValidationResponse :<|> getDRepMetadataValidationResponse + :<|> transactionWatch + +instance HasOpenApi WebSocket where + toOpenApi _ = mempty + + +transactionWatch :: App m => HexText -> WS.Connection -> m () +transactionWatch (unHexText -> transactionId) c = do + tvar <- asks vvaWebSocketConnections + uuid <- Transaction.watchTransaction tvar transactionId c + liftIO $ iterateUntil (== ("ACK" :: Text)) $ Text.stripEnd <$> WS.receiveData c + Transaction.removeWebsocketConnection tvar uuid "Tx confirmed. Closing connection." + return () + + mapDRepType :: Types.DRepType -> DRepType diff --git a/govtool/backend/src/VVA/Config.hs b/govtool/backend/src/VVA/Config.hs index 557ffbcd3..f68471232 100644 --- a/govtool/backend/src/VVA/Config.hs +++ b/govtool/backend/src/VVA/Config.hs @@ -29,6 +29,7 @@ module VVA.Config , vvaConfigToText , getMetadataValidationHost , getMetadataValidationPort + , getWebsocketLifetimeSeconds ) where import Conferer @@ -101,6 +102,8 @@ data VVAConfigInternal , vVAConfigInternalMetadataValidationMaxConcurrentRequests :: Int -- | Redis config , vVAConfigInternalRedisConfig :: RedisInternalConfig + -- | WebSocket lifetime in seconds + , vVAConfigInternalWebsocketLifetimeSeconds :: Int } deriving (FromConfig, Generic, Show) @@ -116,7 +119,8 @@ instance DefaultConfig VVAConfigInternal where vVAConfigInternalMetadataValidationHost = "localhost", vVAConfigInternalMetadataValidationPort = 3001, vVAConfigInternalMetadataValidationMaxConcurrentRequests = 10, - vVAConfigInternalRedisConfig = RedisInternalConfig "localhost" 6379 Nothing + vVAConfigInternalRedisConfig = RedisInternalConfig "localhost" 6379 Nothing, + vVAConfigInternalWebsocketLifetimeSeconds = 60 * 1 } data RedisConfig @@ -150,6 +154,8 @@ data VVAConfig , metadataValidationMaxConcurrentRequests :: Int -- | Redis config , redisConfig :: RedisConfig + -- | WebSocket lifetime in seconds + , websocketLifetimeSeconds :: Int } deriving (Generic, Show, ToJSON) @@ -198,7 +204,8 @@ convertConfig VVAConfigInternal {..} = { redisHost = redisInternalConfigHost $ vVAConfigInternalRedisConfig, redisPort = redisInternalConfigPort $ vVAConfigInternalRedisConfig, redisPassword = redisInternalConfigPassword $ vVAConfigInternalRedisConfig - } + }, + websocketLifetimeSeconds = vVAConfigInternalWebsocketLifetimeSeconds } -- | Load configuration from a file specified on the command line. Load from @@ -265,3 +272,9 @@ getMetadataValidationPort :: (Has VVAConfig r, MonadReader r m) => m Int getMetadataValidationPort = asks (metadataValidationPort . getter) + +-- | Access websocket lifetime in seconds +getWebsocketLifetimeSeconds :: + (Has VVAConfig r, MonadReader r m) => + m Int +getWebsocketLifetimeSeconds = asks (websocketLifetimeSeconds . getter) diff --git a/govtool/backend/src/VVA/Metadata.hs b/govtool/backend/src/VVA/Metadata.hs index d87fe2f37..56bc030a2 100644 --- a/govtool/backend/src/VVA/Metadata.hs +++ b/govtool/backend/src/VVA/Metadata.hs @@ -58,8 +58,6 @@ startFetchProcess :: startFetchProcess = go 0 where go latestKnownId = do - liftIO $ putStrLn "Fetching metadata..." - anchors <- getNewVotingAnchors latestKnownId if null anchors then do diff --git a/govtool/backend/src/VVA/Transaction.hs b/govtool/backend/src/VVA/Transaction.hs index 73ebc594a..dd987211c 100644 --- a/govtool/backend/src/VVA/Transaction.hs +++ b/govtool/backend/src/VVA/Transaction.hs @@ -1,10 +1,21 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE BlockArguments #-} module VVA.Transaction where -import Control.Exception (throw) +import Data.Time +import qualified Data.Map as Map +import Data.Map (Map) +import qualified Network.WebSockets.Connection as WS +import GHC.Conc (TVar, threadDelay, readTVar, readTVarIO, writeTVar, atomically) +import Data.Maybe +import Data.UUID.V4 (nextRandom) +import Network.WebSockets.Connection (Connection) +import qualified Database.Redis as Redis +import Control.Exception (throw, try, SomeException) import Control.Monad.Except (MonadError, throwError) import Control.Monad.Reader @@ -20,8 +31,8 @@ import qualified Database.PostgreSQL.Simple as SQL import VVA.Config import VVA.Pool (ConnectionPool, withPool) -import VVA.Types (AppError (..), - TransactionStatus (..)) +import VVA.Types (AppError (..), AppEnv (..), WebsocketTvar, + TransactionStatus (..), vvaWebSocketConnections) sqlFrom :: ByteString -> SQL.Query sqlFrom bs = fromString $ unpack $ Text.decodeUtf8 bs @@ -39,3 +50,130 @@ getTransactionStatus transactionId = withPool $ \conn -> do [SQL.Only True] -> return TransactionConfirmed [SQL.Only False] -> return TransactionUnconfirmed x -> throwError $ CriticalError ("Expected exactly one result from get-transaction-status.sql but got " <> pack (show (length x)) <> " of them. This should never happen") + +timeoutStaleWebsocketConnections :: + (Has AppEnv r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> m () +timeoutStaleWebsocketConnections tvar = do + currentTime <- liftIO getCurrentTime + connections <- liftIO $ readTVarIO tvar + websocketLifetimeSeconds <- getWebsocketLifetimeSeconds + let staleConnections = Map.filter (\(_, time) -> diffUTCTime currentTime time > fromIntegral websocketLifetimeSeconds) connections + forM_ (Map.keys staleConnections) $ \uuid -> do + removeWebsocketConnection tvar uuid "Websocket timed out." + + liftIO $ threadDelay (30 * 1000000) + timeoutStaleWebsocketConnections tvar + +processTransactionStatuses :: + (Has AppEnv r, Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> m () +processTransactionStatuses tvar = do + txs <- getWatchedTransactions + forM_ txs $ \(txHash, uuid) -> do + status <- getTransactionStatus txHash + case status of + TransactionConfirmed -> do + connection <- getWebsocketConnection tvar uuid + case connection of + Just (conn, _) -> do + liftIO $ try @SomeException $ WS.sendTextData conn ("{\"status\": \"confirmed\"}" :: Text) + return () + Nothing -> return () + TransactionUnconfirmed -> return () + + liftIO $ threadDelay (20 * 1000000) + processTransactionStatuses tvar + + +watchTransaction :: + (Has AppEnv r, Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> Text + -> Connection + -> m Text +watchTransaction tVar txHash connection = do + + uuid <- (pack . show) <$> liftIO nextRandom + + port <- getRedisPort + host <- getRedisHost + conn <- liftIO $ Redis.checkedConnect $ Redis.defaultConnectInfo {Redis.connectHost = unpack host, Redis.connectPort = Redis.PortNumber $ fromIntegral port, Redis.connectDatabase = 1} + websocketLifetimeSeconds <- getWebsocketLifetimeSeconds + + liftIO $ Redis.runRedis conn $ do + _ <- Redis.set (Text.encodeUtf8 txHash) (Text.encodeUtf8 uuid) + Redis.expire (Text.encodeUtf8 txHash) $ fromIntegral websocketLifetimeSeconds + return () + + setWebsocketConnection tVar uuid connection + return uuid + +getWatchedTransactions :: + (Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m) + => m [(Text, Text)] +getWatchedTransactions = do + port <- getRedisPort + host <- getRedisHost + conn <- liftIO $ Redis.checkedConnect $ Redis.defaultConnectInfo {Redis.connectHost = unpack host, Redis.connectPort = Redis.PortNumber $ fromIntegral port, Redis.connectDatabase = 1} + keysResult <- liftIO $ Redis.runRedis conn $ Redis.keys "*" + case keysResult of + Left err -> do + -- handle error, maybe throw an AppError + throwError $ CriticalError $ "Error fetching keys: " <> (pack . show) err + Right keys -> do + keyValuePairs <- liftIO $ Redis.runRedis conn $ mapM getKeyValue keys + let filteredPairs = catMaybes keyValuePairs + pure filteredPairs + + +getKeyValue :: ByteString -> Redis.Redis (Maybe (Text, Text)) +getKeyValue key = do + valueResult <- Redis.get key + return $ case valueResult of + Left _ -> Nothing + Right Nothing -> Nothing + Right (Just value) -> Just (Text.decodeUtf8 key, Text.decodeUtf8 value) + +setWebsocketConnection :: + (MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> Text + -> Connection + -> m () +setWebsocketConnection tvar txHash connection = do + liftIO do + currentTime <- getCurrentTime + atomically $ do + connections <- readTVar tvar + writeTVar tvar $ Map.insert txHash (connection, currentTime) connections + + +getWebsocketConnection :: + (MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> Text + -> m (Maybe (Connection, UTCTime)) +getWebsocketConnection tvar txHash = do + connections <- liftIO $ readTVarIO tvar + return $ Map.lookup txHash connections + +removeWebsocketConnection :: + (MonadReader r m, MonadIO m, MonadError AppError m) + => WebsocketTvar + -> Text + -> Text + -> m () +removeWebsocketConnection tvar txHash message = liftIO $ do + mConn <- atomically $ do + connections <- readTVar tvar + writeTVar tvar $ Map.delete txHash connections + return $ Map.lookup txHash connections + + case mConn of + Just (conn, _) -> do + liftIO $ try @SomeException $ WS.sendClose conn message + return () + Nothing -> return () \ No newline at end of file diff --git a/govtool/backend/src/VVA/Types.hs b/govtool/backend/src/VVA/Types.hs index ef8e97303..f5d5952b2 100644 --- a/govtool/backend/src/VVA/Types.hs +++ b/govtool/backend/src/VVA/Types.hs @@ -9,6 +9,8 @@ module VVA.Types where +import GHC.Conc (TVar) +import qualified Network.WebSockets.Connection as WS import Data.Aeson.TH (deriveJSON) import VVA.API.Utils (jsonOptions) import GHC.Generics (Generic) @@ -24,6 +26,7 @@ import Data.Has import Data.Pool (Pool) import Data.Text (Text) import Data.Time (UTCTime) +import Data.Map (Map) import Database.PostgreSQL.Simple (Connection) @@ -205,6 +208,8 @@ data VotingAnchor type App m = (MonadReader AppEnv m, MonadIO m, MonadFail m, MonadError AppError m) +type WebsocketTvar = TVar (Map Text (WS.Connection, UTCTime)) + data AppEnv = AppEnv { vvaConfig :: VVAConfig @@ -212,6 +217,7 @@ data AppEnv , vvaConnectionPool :: Pool Connection , vvaTlsManager :: Manager , vvaMetadataQSem :: QSem + , vvaWebSocketConnections :: WebsocketTvar } instance Has VVAConfig AppEnv where @@ -233,3 +239,8 @@ instance Has Manager AppEnv where instance Has QSem AppEnv where getter AppEnv {vvaMetadataQSem} = vvaMetadataQSem modifier f a@AppEnv {vvaMetadataQSem} = a {vvaMetadataQSem = f vvaMetadataQSem} + +instance Has (TVar (Map Text (WS.Connection, UTCTime))) AppEnv where + getter AppEnv {vvaWebSocketConnections} = vvaWebSocketConnections + modifier f a@AppEnv {vvaWebSocketConnections} = a {vvaWebSocketConnections = f vvaWebSocketConnections} + diff --git a/govtool/backend/vva-be.cabal b/govtool/backend/vva-be.cabal index 0a5ef69d6..d131cec33 100644 --- a/govtool/backend/vva-be.cabal +++ b/govtool/backend/vva-be.cabal @@ -105,6 +105,11 @@ library , vector , async , hedis + , websockets + , uuid + , servant-websockets + , servant-openapi3 + , monad-loops exposed-modules: VVA.Config , VVA.CommandLine diff --git a/scripts/govtool/config/templates/backend-config.json.tpl b/scripts/govtool/config/templates/backend-config.json.tpl index bda65e26d..3f6cd00b5 100644 --- a/scripts/govtool/config/templates/backend-config.json.tpl +++ b/scripts/govtool/config/templates/backend-config.json.tpl @@ -18,5 +18,6 @@ "host" : "http://redis", "port" : 8094, "password": "" - } + }, + "websocketlifetimeseconds": 60 }