Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1235] add /transaction/watch endpoint & background tx status process #1304

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ changes.
## [Unreleased]

- Added 'sentryenv' field in backend config file [Issue 1401](https://github.com/IntersectMBO/govtool/issues/1401)
- Add websocket transaction status endpoint /transaction/watch/<TX-HASH> [Issue 1235](https://github.com/IntersectMBO/govtool/issues/1235)
- Add wallet connector package [Issue 898](https://github.com/IntersectMBO/govtool/issues/898)
- Change DRep without metadata name from "Sole Voter" to "Direct Voter" [Issue 880](https://github.com/IntersectMBO/govtool/issues/880)
- Inicialize Usersnap into App [Issue 546](https://github.com/IntersectMBO/govtool/issues/546)
Expand All @@ -25,6 +26,7 @@ changes.

### Added

- added `websocketlifetimeseconds` config field for webscoket connections, and watched transactions. [Issue 1235](https://github.com/IntersectMBO/govtool/issues/1235)
- added separate async process that fetches new voting_anchors, validates their metadata using metadata-validation service, and then stores it in Redis database [Issue 1234](https://github.com/IntersectMBO/govtool/issues/1234)
- added `bio` `dRepName` `email` `references` `metadataValid` and `metadataStatus` fields to `drep/list`
- added `metadatavalidationmaxconcurrentrequests` field to the backend config
Expand Down
17 changes: 16 additions & 1 deletion govtool/backend/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

module Main where

import GHC.Conc (newTVarIO)
import Control.Concurrent (forkIO)
import Control.Concurrent.QSem (newQSem)
import Control.Exception (Exception,
Expand Down Expand Up @@ -77,6 +78,7 @@ import VVA.Types (AppEnv (..),
CacheEnv (..))
import Network.HTTP.Client hiding (Proxy, Request)
import Network.HTTP.Client.TLS
import VVA.Transaction (processTransactionStatuses, timeoutStaleWebsocketConnections)

proxyAPI :: Proxy (VVAApi :<|> SwaggerAPI)
proxyAPI = Proxy
Expand Down Expand Up @@ -137,14 +139,27 @@ startApp vvaConfig = do
connectionPool <- createPool (connectPostgreSQL (encodeUtf8 (dbSyncConnectionString $ getter vvaConfig))) close 1 1 60
vvaTlsManager <- newManager tlsManagerSettings
qsem <- newQSem (metadataValidationMaxConcurrentRequests vvaConfig)
let appEnv = AppEnv {vvaConfig=vvaConfig, vvaCache=cacheEnv, vvaConnectionPool=connectionPool, vvaTlsManager, vvaMetadataQSem=qsem}
websocketConnectionsTVar <- newTVarIO mempty
let appEnv = AppEnv {vvaConfig=vvaConfig, vvaCache=cacheEnv, vvaConnectionPool=connectionPool, vvaTlsManager, vvaMetadataQSem=qsem, vvaWebSocketConnections=websocketConnectionsTVar}

_ <- forkIO $ do
result <- runReaderT (runExceptT startFetchProcess) appEnv
case result of
Left e -> throw e
Right _ -> return ()

_ <- forkIO $ do
result <- runReaderT (runExceptT $ processTransactionStatuses websocketConnectionsTVar) appEnv
case result of
Left e -> throw e
Right _ -> return ()

_ <- forkIO $ do
result <- runReaderT (runExceptT $ timeoutStaleWebsocketConnections websocketConnectionsTVar) appEnv
case result of
Left e -> throw e
Right _ -> return ()

server' <- mkVVAServer appEnv
runSettings settings server'

Expand Down
3 changes: 2 additions & 1 deletion govtool/backend/example-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
"host" : "localhost",
"port" : 8094,
"password": null
}
},
"websocketlifetimeseconds": 60
}
22 changes: 22 additions & 0 deletions govtool/backend/src/VVA/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE MultiParamTypeClasses #-}

module VVA.API where

import Control.Monad.Loops (iterateUntil)
import qualified Network.WebSockets.Connection as WS
import Servant.API.WebSocket (WebSocket)
import Control.Concurrent.QSem (waitQSem, signalQSem)
import Control.Concurrent.Async (mapConcurrently)
import Control.Exception (throw, throwIO)
Expand Down Expand Up @@ -44,6 +48,7 @@ import VVA.Types (App, AppEnv (..),
AppError (CriticalError, ValidationError, InternalError),
CacheEnv (..))
import qualified VVA.Metadata as Metadata
import Servant.OpenApi (HasOpenApi, toOpenApi)

type VVAApi =
"drep" :> "list"
Expand Down Expand Up @@ -78,6 +83,8 @@ type VVAApi =
:<|> "network" :> "metrics" :> Get '[JSON] GetNetworkMetricsResponse
:<|> "proposal" :> "metadata" :> "validate" :> ReqBody '[JSON] MetadataValidationParams :> Post '[JSON] MetadataValidationResponse
:<|> "drep" :> "metadata" :> "validate" :> ReqBody '[JSON] MetadataValidationParams :> Post '[JSON] MetadataValidationResponse
:<|> "transaction" :> "watch" :> Capture "transactionId" HexText :> WebSocket

server :: App m => ServerT VVAApi m
server = drepList
:<|> getVotingPower
Expand All @@ -93,6 +100,21 @@ server = drepList
:<|> getNetworkMetrics
:<|> getProposalMetadataValidationResponse
:<|> getDRepMetadataValidationResponse
:<|> transactionWatch

instance HasOpenApi WebSocket where
toOpenApi _ = mempty


transactionWatch :: App m => HexText -> WS.Connection -> m ()
transactionWatch (unHexText -> transactionId) c = do
tvar <- asks vvaWebSocketConnections
uuid <- Transaction.watchTransaction tvar transactionId c
liftIO $ iterateUntil (== ("ACK" :: Text)) $ Text.stripEnd <$> WS.receiveData c
Transaction.removeWebsocketConnection tvar uuid "Tx confirmed. Closing connection."
return ()




mapDRepType :: Types.DRepType -> DRepType
Expand Down
17 changes: 15 additions & 2 deletions govtool/backend/src/VVA/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ module VVA.Config
, vvaConfigToText
, getMetadataValidationHost
, getMetadataValidationPort
, getWebsocketLifetimeSeconds
) where

import Conferer
Expand Down Expand Up @@ -101,6 +102,8 @@ data VVAConfigInternal
, vVAConfigInternalMetadataValidationMaxConcurrentRequests :: Int
-- | Redis config
, vVAConfigInternalRedisConfig :: RedisInternalConfig
-- | WebSocket lifetime in seconds
, vVAConfigInternalWebsocketLifetimeSeconds :: Int
}
deriving (FromConfig, Generic, Show)

Expand All @@ -116,7 +119,8 @@ instance DefaultConfig VVAConfigInternal where
vVAConfigInternalMetadataValidationHost = "localhost",
vVAConfigInternalMetadataValidationPort = 3001,
vVAConfigInternalMetadataValidationMaxConcurrentRequests = 10,
vVAConfigInternalRedisConfig = RedisInternalConfig "localhost" 6379 Nothing
vVAConfigInternalRedisConfig = RedisInternalConfig "localhost" 6379 Nothing,
vVAConfigInternalWebsocketLifetimeSeconds = 60 * 1
}

data RedisConfig
Expand Down Expand Up @@ -150,6 +154,8 @@ data VVAConfig
, metadataValidationMaxConcurrentRequests :: Int
-- | Redis config
, redisConfig :: RedisConfig
-- | WebSocket lifetime in seconds
, websocketLifetimeSeconds :: Int
}
deriving (Generic, Show, ToJSON)

Expand Down Expand Up @@ -198,7 +204,8 @@ convertConfig VVAConfigInternal {..} =
{ redisHost = redisInternalConfigHost $ vVAConfigInternalRedisConfig,
redisPort = redisInternalConfigPort $ vVAConfigInternalRedisConfig,
redisPassword = redisInternalConfigPassword $ vVAConfigInternalRedisConfig
}
},
websocketLifetimeSeconds = vVAConfigInternalWebsocketLifetimeSeconds
}

-- | Load configuration from a file specified on the command line. Load from
Expand Down Expand Up @@ -265,3 +272,9 @@ getMetadataValidationPort ::
(Has VVAConfig r, MonadReader r m) =>
m Int
getMetadataValidationPort = asks (metadataValidationPort . getter)

-- | Access websocket lifetime in seconds
getWebsocketLifetimeSeconds ::
(Has VVAConfig r, MonadReader r m) =>
m Int
getWebsocketLifetimeSeconds = asks (websocketLifetimeSeconds . getter)
2 changes: 0 additions & 2 deletions govtool/backend/src/VVA/Metadata.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ startFetchProcess ::
startFetchProcess = go 0
where
go latestKnownId = do
liftIO $ putStrLn "Fetching metadata..."

anchors <- getNewVotingAnchors latestKnownId
if null anchors
then do
Expand Down
144 changes: 141 additions & 3 deletions govtool/backend/src/VVA/Transaction.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE BlockArguments #-}

module VVA.Transaction where

import Control.Exception (throw)
import Data.Time
import qualified Data.Map as Map
import Data.Map (Map)
import qualified Network.WebSockets.Connection as WS
import GHC.Conc (TVar, threadDelay, readTVar, readTVarIO, writeTVar, atomically)
import Data.Maybe
import Data.UUID.V4 (nextRandom)
import Network.WebSockets.Connection (Connection)
import qualified Database.Redis as Redis
import Control.Exception (throw, try, SomeException)
import Control.Monad.Except (MonadError, throwError)
import Control.Monad.Reader

Expand All @@ -20,8 +31,8 @@ import qualified Database.PostgreSQL.Simple as SQL

import VVA.Config
import VVA.Pool (ConnectionPool, withPool)
import VVA.Types (AppError (..),
TransactionStatus (..))
import VVA.Types (AppError (..), AppEnv (..), WebsocketTvar,
TransactionStatus (..), vvaWebSocketConnections)

sqlFrom :: ByteString -> SQL.Query
sqlFrom bs = fromString $ unpack $ Text.decodeUtf8 bs
Expand All @@ -39,3 +50,130 @@ getTransactionStatus transactionId = withPool $ \conn -> do
[SQL.Only True] -> return TransactionConfirmed
[SQL.Only False] -> return TransactionUnconfirmed
x -> throwError $ CriticalError ("Expected exactly one result from get-transaction-status.sql but got " <> pack (show (length x)) <> " of them. This should never happen")

timeoutStaleWebsocketConnections ::
(Has AppEnv r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m)
=> WebsocketTvar
-> m ()
timeoutStaleWebsocketConnections tvar = do
currentTime <- liftIO getCurrentTime
connections <- liftIO $ readTVarIO tvar
websocketLifetimeSeconds <- getWebsocketLifetimeSeconds
let staleConnections = Map.filter (\(_, time) -> diffUTCTime currentTime time > fromIntegral websocketLifetimeSeconds) connections
forM_ (Map.keys staleConnections) $ \uuid -> do
removeWebsocketConnection tvar uuid "Websocket timed out."

liftIO $ threadDelay (30 * 1000000)
timeoutStaleWebsocketConnections tvar

processTransactionStatuses ::
(Has AppEnv r, Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m)
=> WebsocketTvar
-> m ()
processTransactionStatuses tvar = do
txs <- getWatchedTransactions
forM_ txs $ \(txHash, uuid) -> do
status <- getTransactionStatus txHash
case status of
TransactionConfirmed -> do
connection <- getWebsocketConnection tvar uuid
case connection of
Just (conn, _) -> do
liftIO $ try @SomeException $ WS.sendTextData conn ("{\"status\": \"confirmed\"}" :: Text)
return ()
Nothing -> return ()
TransactionUnconfirmed -> return ()

liftIO $ threadDelay (20 * 1000000)
processTransactionStatuses tvar


watchTransaction ::
(Has AppEnv r, Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m)
=> WebsocketTvar
-> Text
-> Connection
-> m Text
watchTransaction tVar txHash connection = do

uuid <- (pack . show) <$> liftIO nextRandom

port <- getRedisPort
host <- getRedisHost
conn <- liftIO $ Redis.checkedConnect $ Redis.defaultConnectInfo {Redis.connectHost = unpack host, Redis.connectPort = Redis.PortNumber $ fromIntegral port, Redis.connectDatabase = 1}
websocketLifetimeSeconds <- getWebsocketLifetimeSeconds

liftIO $ Redis.runRedis conn $ do
_ <- Redis.set (Text.encodeUtf8 txHash) (Text.encodeUtf8 uuid)
Redis.expire (Text.encodeUtf8 txHash) $ fromIntegral websocketLifetimeSeconds
return ()

setWebsocketConnection tVar uuid connection
return uuid

getWatchedTransactions ::
(Has ConnectionPool r, Has VVAConfig r, MonadReader r m, MonadIO m, MonadError AppError m)
=> m [(Text, Text)]
getWatchedTransactions = do
port <- getRedisPort
host <- getRedisHost
conn <- liftIO $ Redis.checkedConnect $ Redis.defaultConnectInfo {Redis.connectHost = unpack host, Redis.connectPort = Redis.PortNumber $ fromIntegral port, Redis.connectDatabase = 1}
keysResult <- liftIO $ Redis.runRedis conn $ Redis.keys "*"
case keysResult of
Left err -> do
-- handle error, maybe throw an AppError
throwError $ CriticalError $ "Error fetching keys: " <> (pack . show) err
Right keys -> do
keyValuePairs <- liftIO $ Redis.runRedis conn $ mapM getKeyValue keys
let filteredPairs = catMaybes keyValuePairs
pure filteredPairs


getKeyValue :: ByteString -> Redis.Redis (Maybe (Text, Text))
getKeyValue key = do
valueResult <- Redis.get key
return $ case valueResult of
Left _ -> Nothing
Right Nothing -> Nothing
Right (Just value) -> Just (Text.decodeUtf8 key, Text.decodeUtf8 value)

setWebsocketConnection ::
(MonadReader r m, MonadIO m, MonadError AppError m)
=> WebsocketTvar
-> Text
-> Connection
-> m ()
setWebsocketConnection tvar txHash connection = do
liftIO do
currentTime <- getCurrentTime
atomically $ do
connections <- readTVar tvar
writeTVar tvar $ Map.insert txHash (connection, currentTime) connections


getWebsocketConnection ::
(MonadReader r m, MonadIO m, MonadError AppError m)
=> WebsocketTvar
-> Text
-> m (Maybe (Connection, UTCTime))
getWebsocketConnection tvar txHash = do
connections <- liftIO $ readTVarIO tvar
return $ Map.lookup txHash connections

removeWebsocketConnection ::
(MonadReader r m, MonadIO m, MonadError AppError m)
=> WebsocketTvar
-> Text
-> Text
-> m ()
removeWebsocketConnection tvar txHash message = liftIO $ do
mConn <- atomically $ do
connections <- readTVar tvar
writeTVar tvar $ Map.delete txHash connections
return $ Map.lookup txHash connections

case mConn of
Just (conn, _) -> do
liftIO $ try @SomeException $ WS.sendClose conn message
return ()
Nothing -> return ()
11 changes: 11 additions & 0 deletions govtool/backend/src/VVA/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

module VVA.Types where

import GHC.Conc (TVar)
import qualified Network.WebSockets.Connection as WS
import Data.Aeson.TH (deriveJSON)
import VVA.API.Utils (jsonOptions)
import GHC.Generics (Generic)
Expand All @@ -24,6 +26,7 @@ import Data.Has
import Data.Pool (Pool)
import Data.Text (Text)
import Data.Time (UTCTime)
import Data.Map (Map)

import Database.PostgreSQL.Simple (Connection)

Expand Down Expand Up @@ -205,13 +208,16 @@ data VotingAnchor

type App m = (MonadReader AppEnv m, MonadIO m, MonadFail m, MonadError AppError m)

type WebsocketTvar = TVar (Map Text (WS.Connection, UTCTime))

data AppEnv
= AppEnv
{ vvaConfig :: VVAConfig
, vvaCache :: CacheEnv
, vvaConnectionPool :: Pool Connection
, vvaTlsManager :: Manager
, vvaMetadataQSem :: QSem
, vvaWebSocketConnections :: WebsocketTvar
}

instance Has VVAConfig AppEnv where
Expand All @@ -233,3 +239,8 @@ instance Has Manager AppEnv where
instance Has QSem AppEnv where
getter AppEnv {vvaMetadataQSem} = vvaMetadataQSem
modifier f a@AppEnv {vvaMetadataQSem} = a {vvaMetadataQSem = f vvaMetadataQSem}

instance Has (TVar (Map Text (WS.Connection, UTCTime))) AppEnv where
getter AppEnv {vvaWebSocketConnections} = vvaWebSocketConnections
modifier f a@AppEnv {vvaWebSocketConnections} = a {vvaWebSocketConnections = f vvaWebSocketConnections}

Loading
Loading