diff --git a/IHP/DataSync/ControllerImpl.hs b/IHP/DataSync/ControllerImpl.hs index 568dd345d..5113baf15 100644 --- a/IHP/DataSync/ControllerImpl.hs +++ b/IHP/DataSync/ControllerImpl.hs @@ -39,7 +39,6 @@ import qualified Network.WebSockets as WS runDataSyncController :: ( HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)) , ?applicationContext :: ApplicationContext - , ?connection :: WS.Connection , ?context :: ControllerContext , ?modelContext :: ModelContext , ?state :: IORef DataSyncController @@ -51,9 +50,52 @@ runDataSyncController :: runDataSyncController ensureRLSEnabled installTableChangeTriggers receiveData sendJSON handleCustomMessage = do setState DataSyncReady { subscriptions = HashMap.empty, transactions = HashMap.empty, asyncs = [] } - let pgListener = ?applicationContext |> get #pgListener + let handleMessage = buildMessageHandler ensureRLSEnabled installTableChangeTriggers sendJSON handleCustomMessage - let + forever do + message <- Aeson.eitherDecodeStrict' <$> receiveData + + case message of + Right decodedMessage -> do + let requestId = get #requestId decodedMessage + + Exception.mask \restore -> do + -- Handle the messages in an async way + -- This increases throughput as multiple queries can be fetched + -- in parallel + handlerProcess <- async $ restore do + result <- Exception.try (handleMessage decodedMessage) + + case result of + Left (e :: Exception.SomeException) -> do + let errorMessage = case fromException e of + Just (enhancedSqlError :: EnhancedSqlError) -> cs (get #sqlErrorMsg (get #sqlError enhancedSqlError)) + Nothing -> cs (displayException e) + Log.error (tshow e) + sendJSON DataSyncError { requestId, errorMessage } + Right result -> pure () + + modifyIORef' ?state (\state -> state |> modify #asyncs (handlerProcess:)) + pure () + Left errorMessage -> sendJSON FailedToDecodeMessageError { errorMessage = cs errorMessage } +{-# INLINE runDataSyncController #-} + + +buildMessageHandler :: + ( HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord)) + , ?applicationContext :: ApplicationContext + , ?context :: ControllerContext + , ?modelContext :: ModelContext + , ?state :: IORef DataSyncController + , PG.ToField (PrimaryKey (GetTableName CurrentUserRecord)) + , Typeable CurrentUserRecord + , HasNewSessionUrl CurrentUserRecord + , Show (PrimaryKey (GetTableName CurrentUserRecord)) + ) + => _ -> _ -> _ -> _ -> (DataSyncMessage -> IO ()) +buildMessageHandler ensureRLSEnabled installTableChangeTriggers sendJSON handleCustomMessage = handleMessage + where + pgListener = ?applicationContext |> get #pgListener handleMessage :: DataSyncMessage -> IO () handleMessage DataSyncQuery { query, requestId, transactionId } = do ensureRLSEnabled (get #table query) @@ -330,34 +372,6 @@ runDataSyncController ensureRLSEnabled installTableChangeTriggers receiveData se handleMessage otherwise = handleCustomMessage sendJSON otherwise - forever do - message <- Aeson.eitherDecodeStrict' <$> receiveData - - case message of - Right decodedMessage -> do - let requestId = get #requestId decodedMessage - - Exception.mask \restore -> do - -- Handle the messages in an async way - -- This increases throughput as multiple queries can be fetched - -- in parallel - handlerProcess <- async $ restore do - result <- Exception.try (handleMessage decodedMessage) - - case result of - Left (e :: Exception.SomeException) -> do - let errorMessage = case fromException e of - Just (enhancedSqlError :: EnhancedSqlError) -> cs (get #sqlErrorMsg (get #sqlError enhancedSqlError)) - Nothing -> cs (displayException e) - Log.error (tshow e) - sendJSON DataSyncError { requestId, errorMessage } - Right result -> pure () - - modifyIORef' ?state (\state -> state |> modify #asyncs (handlerProcess:)) - pure () - Left errorMessage -> sendJSON FailedToDecodeMessageError { errorMessage = cs errorMessage } -{-# INLINE runDataSyncController #-} - cleanupAllSubscriptions :: _ => (?state :: IORef DataSyncController, ?applicationContext :: ApplicationContext) => IO () cleanupAllSubscriptions = do state <- getState