Skip to content

Commit

Permalink
Extracted DataSync message handler
Browse files Browse the repository at this point in the history
  • Loading branch information
mpscholten committed Jul 22, 2022
1 parent f799009 commit aab5745
Showing 1 changed file with 45 additions and 31 deletions.
76 changes: 45 additions & 31 deletions IHP/DataSync/ControllerImpl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import qualified Network.WebSockets as WS
runDataSyncController ::
( HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
, ?applicationContext :: ApplicationContext
, ?connection :: WS.Connection
, ?context :: ControllerContext
, ?modelContext :: ModelContext
, ?state :: IORef DataSyncController
Expand All @@ -51,9 +50,52 @@ runDataSyncController ::
runDataSyncController ensureRLSEnabled installTableChangeTriggers receiveData sendJSON handleCustomMessage = do
setState DataSyncReady { subscriptions = HashMap.empty, transactions = HashMap.empty, asyncs = [] }

let pgListener = ?applicationContext |> get #pgListener
let handleMessage = buildMessageHandler ensureRLSEnabled installTableChangeTriggers sendJSON handleCustomMessage

let
forever do
message <- Aeson.eitherDecodeStrict' <$> receiveData

case message of
Right decodedMessage -> do
let requestId = get #requestId decodedMessage

Exception.mask \restore -> do
-- Handle the messages in an async way
-- This increases throughput as multiple queries can be fetched
-- in parallel
handlerProcess <- async $ restore do
result <- Exception.try (handleMessage decodedMessage)

case result of
Left (e :: Exception.SomeException) -> do
let errorMessage = case fromException e of
Just (enhancedSqlError :: EnhancedSqlError) -> cs (get #sqlErrorMsg (get #sqlError enhancedSqlError))
Nothing -> cs (displayException e)
Log.error (tshow e)
sendJSON DataSyncError { requestId, errorMessage }
Right result -> pure ()

modifyIORef' ?state (\state -> state |> modify #asyncs (handlerProcess:))
pure ()
Left errorMessage -> sendJSON FailedToDecodeMessageError { errorMessage = cs errorMessage }
{-# INLINE runDataSyncController #-}


buildMessageHandler ::
( HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
, ?applicationContext :: ApplicationContext
, ?context :: ControllerContext
, ?modelContext :: ModelContext
, ?state :: IORef DataSyncController
, PG.ToField (PrimaryKey (GetTableName CurrentUserRecord))
, Typeable CurrentUserRecord
, HasNewSessionUrl CurrentUserRecord
, Show (PrimaryKey (GetTableName CurrentUserRecord))
)
=> _ -> _ -> _ -> _ -> (DataSyncMessage -> IO ())
buildMessageHandler ensureRLSEnabled installTableChangeTriggers sendJSON handleCustomMessage = handleMessage
where
pgListener = ?applicationContext |> get #pgListener
handleMessage :: DataSyncMessage -> IO ()
handleMessage DataSyncQuery { query, requestId, transactionId } = do
ensureRLSEnabled (get #table query)
Expand Down Expand Up @@ -330,34 +372,6 @@ runDataSyncController ensureRLSEnabled installTableChangeTriggers receiveData se
handleMessage otherwise = handleCustomMessage sendJSON otherwise
forever do
message <- Aeson.eitherDecodeStrict' <$> receiveData
case message of
Right decodedMessage -> do
let requestId = get #requestId decodedMessage
Exception.mask \restore -> do
-- Handle the messages in an async way
-- This increases throughput as multiple queries can be fetched
-- in parallel
handlerProcess <- async $ restore do
result <- Exception.try (handleMessage decodedMessage)
case result of
Left (e :: Exception.SomeException) -> do
let errorMessage = case fromException e of
Just (enhancedSqlError :: EnhancedSqlError) -> cs (get #sqlErrorMsg (get #sqlError enhancedSqlError))
Nothing -> cs (displayException e)
Log.error (tshow e)
sendJSON DataSyncError { requestId, errorMessage }
Right result -> pure ()
modifyIORef' ?state (\state -> state |> modify #asyncs (handlerProcess:))
pure ()
Left errorMessage -> sendJSON FailedToDecodeMessageError { errorMessage = cs errorMessage }
{-# INLINE runDataSyncController #-}
cleanupAllSubscriptions :: _ => (?state :: IORef DataSyncController, ?applicationContext :: ApplicationContext) => IO ()
cleanupAllSubscriptions = do
state <- getState
Expand Down

0 comments on commit aab5745

Please sign in to comment.