Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose ClientCall in ClientReaderHandler and ClientRWHandler #87

Merged
merged 1 commit into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions core/src/Network/GRPC/LowLevel/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ compileNormalRequestResults x =
-- clientReader (client side of server streaming mode)

-- | First parameter is initial server metadata.
type ClientReaderHandler = MetadataMap -> StreamRecv ByteString -> IO ()
type ClientReaderHandler = ClientCall -> MetadataMap -> StreamRecv ByteString -> IO ()
type ClientReaderResult = (MetadataMap, C.StatusCode, StatusDetails)

clientReader :: Client
Expand All @@ -269,13 +269,13 @@ clientReader :: Client
clientReader cl@Client{ clientCQ = cq } rm tm body initMeta f =
withClientCall cl rm tm go
where
go (unsafeCC -> c) = runExceptT $ do
go cc@(unsafeCC -> c) = runExceptT $ do
void $ runOps' c cq [ OpSendInitialMetadata initMeta
, OpSendMessage body
, OpSendCloseFromClient
]
srvMD <- recvInitialMetadata c cq
liftIO $ f srvMD (streamRecvPrim c cq)
liftIO $ f cc srvMD (streamRecvPrim c cq)
recvStatusOnClient c cq

--------------------------------------------------------------------------------
Expand Down Expand Up @@ -326,7 +326,8 @@ pattern CWRFinal mmsg initMD trailMD st ds
-- clientRW (client side of bidirectional streaming mode)

type ClientRWHandler
= IO (Either GRPCIOError MetadataMap)
= ClientCall
-> IO (Either GRPCIOError MetadataMap)
-> StreamRecv ByteString
-> StreamSend ByteString
-> WritesDone
Expand All @@ -352,7 +353,7 @@ clientRW' :: Client
-> MetadataMap
-> ClientRWHandler
-> IO (Either GRPCIOError ClientRWResult)
clientRW' (clientCQ -> cq) (unsafeCC -> c) initMeta f = runExceptT $ do
clientRW' (clientCQ -> cq) cc@(unsafeCC -> c) initMeta f = runExceptT $ do
sendInitialMetadata c cq initMeta

-- 'mdmv' is used to synchronize between callers of 'getMD' and 'recv'
Expand Down Expand Up @@ -412,7 +413,7 @@ clientRW' (clientCQ -> cq) (unsafeCC -> c) initMeta f = runExceptT $ do
-- programmer.
writesDone = writesDonePrim c cq

liftIO (f getMD recv send writesDone)
liftIO (f cc getMD recv send writesDone)
recvStatusOnClient c cq -- Finish()

--------------------------------------------------------------------------------
Expand Down
8 changes: 4 additions & 4 deletions core/tests/LowLevelTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ testServerStreaming =

client c = do
rm <- clientRegisterMethodServerStreaming c "/feed"
eea <- clientReader c rm 10 clientPay clientInitMD $ \initMD recv -> do
eea <- clientReader c rm 10 clientPay clientInitMD $ \_cc initMD recv -> do
checkMD "Server initial metadata mismatch" serverInitMD initMD
forM_ pays $ \p -> recv `is` Right (Just p)
recv `is` Right Nothing
Expand Down Expand Up @@ -436,7 +436,7 @@ testServerStreamingUnregistered =

client c = do
rm <- clientRegisterMethodServerStreaming c "/feed"
eea <- clientReader c rm 10 clientPay clientInitMD $ \initMD recv -> do
eea <- clientReader c rm 10 clientPay clientInitMD $ \_cc initMD recv -> do
checkMD "Server initial metadata mismatch" serverInitMD initMD
forM_ pays $ \p -> recv `is` Right (Just p)
recv `is` Right Nothing
Expand Down Expand Up @@ -517,7 +517,7 @@ testBiDiStreaming =

client c = do
rm <- clientRegisterMethodBiDiStreaming c "/bidi"
eea <- clientRW c rm 10 clientInitMD $ \getMD recv send writesDone -> do
eea <- clientRW c rm 10 clientInitMD $ \_cc getMD recv send writesDone -> do
either clientFail (checkMD "Server rsp metadata mismatch" serverInitMD) =<< getMD
send "cw0" `is` Right ()
recv `is` Right (Just "sw0")
Expand Down Expand Up @@ -553,7 +553,7 @@ testBiDiStreamingUnregistered =

client c = do
rm <- clientRegisterMethodBiDiStreaming c "/bidi"
eea <- clientRW c rm 10 clientInitMD $ \getMD recv send writesDone -> do
eea <- clientRW c rm 10 clientInitMD $ \_cc getMD recv send writesDone -> do
either clientFail (checkMD "Server rsp metadata mismatch" serverInitMD) =<< getMD
send "cw0" `is` Right ()
recv `is` Right (Just "sw0")
Expand Down
4 changes: 2 additions & 2 deletions examples/hellos/hellos-client/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ doHelloSS c n = do
let pay = SSRqt "server streaming mode" (fromIntegral n)
enc = BL.toStrict . toLazyByteString $ pay
err desc e = fail $ "doHelloSS: " ++ desc ++ " error: " ++ show e
eea <- clientReader c rm n enc mempty $ \_md recv -> do
eea <- clientReader c rm n enc mempty $ \_cc _md recv -> do
n' <- flip fix (0::Int) $ \go i -> recv >>= \case
Left e -> err "recv" e
Right Nothing -> return i
Expand Down Expand Up @@ -84,7 +84,7 @@ doHelloBi c n = do
let pay = BiRqtRpy "bidi payload"
enc = BL.toStrict . toLazyByteString $ pay
err desc e = fail $ "doHelloBi: " ++ desc ++ " error: " ++ show e
eea <- clientRW c rm n mempty $ \_getMD recv send writesDone -> do
eea <- clientRW c rm n mempty $ \_cc _getMD recv send writesDone -> do
-- perform n writes on a worker thread
thd <- async $ do
replicateM_ n $ send enc >>= \case
Expand Down
10 changes: 5 additions & 5 deletions src/Network/GRPC/HighLevel/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ data ClientRequest (streamType :: GRPCMethodType) request response where
-- | The final field will be invoked once, and it should repeatedly
-- invoke its final argument (of type @(StreamRecv response)@)
-- in order to obtain the streaming response incrementally.
ClientReaderRequest :: request -> TimeoutSeconds -> MetadataMap -> (MetadataMap -> StreamRecv response -> IO ()) -> ClientRequest 'ServerStreaming request response
ClientBiDiRequest :: TimeoutSeconds -> MetadataMap -> (MetadataMap -> StreamRecv response -> StreamSend request -> WritesDone -> IO ()) -> ClientRequest 'BiDiStreaming request response
ClientReaderRequest :: request -> TimeoutSeconds -> MetadataMap -> (LL.ClientCall -> MetadataMap -> StreamRecv response -> IO ()) -> ClientRequest 'ServerStreaming request response
ClientBiDiRequest :: TimeoutSeconds -> MetadataMap -> (LL.ClientCall -> MetadataMap -> StreamRecv response -> StreamSend request -> WritesDone -> IO ()) -> ClientRequest 'BiDiStreaming request response

data ClientResult (streamType :: GRPCMethodType) response where
ClientNormalResponse :: response -> MetadataMap -> MetadataMap -> StatusCode -> StatusDetails -> ClientResult 'Normal response
Expand Down Expand Up @@ -125,13 +125,13 @@ clientRequest client (RegisteredMethod method) (ClientWriterRequest timeout meta
Right parsedRsp ->
ClientWriterResponse parsedRsp initMD_ trailMD_ rspCode_ details_
clientRequest client (RegisteredMethod method) (ClientReaderRequest req timeout meta handler) =
mkResponse <$> LL.clientReader client method timeout (BL.toStrict (toLazyByteString req)) meta (\m recv -> handler m (convertRecv recv))
mkResponse <$> LL.clientReader client method timeout (BL.toStrict (toLazyByteString req)) meta (\cc m recv -> handler cc m (convertRecv recv))
where
mkResponse (Left ioError_) = ClientErrorResponse (ClientIOError ioError_)
mkResponse (Right (meta_, rspCode_, details_)) =
ClientReaderResponse meta_ rspCode_ details_
clientRequest client (RegisteredMethod method) (ClientBiDiRequest timeout meta handler) =
mkResponse <$> LL.clientRW client method timeout meta (\_m recv send writesDone -> handler meta (convertRecv recv) (convertSend send) writesDone)
mkResponse <$> LL.clientRW client method timeout meta (\cc _m recv send writesDone -> handler cc meta (convertRecv recv) (convertSend send) writesDone)
where
mkResponse (Left ioError_) = ClientErrorResponse (ClientIOError ioError_)
mkResponse (Right (meta_, rspCode_, details_)) =
Expand Down Expand Up @@ -164,7 +164,7 @@ simplifyServerStreaming :: TimeoutSeconds
-- ^ Endpoint implementation (typically generated by grpc-haskell)
-> request
-- ^ Request payload
-> (MetadataMap -> StreamRecv response -> IO ())
-> (LL.ClientCall -> MetadataMap -> StreamRecv response -> IO ())
-- ^ Stream handler; note that the 'StreamRecv'
-- action must be called repeatedly in order to
-- consume the stream
Expand Down
4 changes: 2 additions & 2 deletions tests/TestClient.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ testServerStreamingCall client = testCase "Server-streaming call" $
checkResults nums recv
res <- simpleServiceServerStreamingCall client $
ClientReaderRequest (SimpleServiceRequest "Test" (fromList nums)) 10 mempty
(\_ -> checkResults nums)
(\_ _ -> checkResults nums)
case res of
ClientErrorResponse err -> assertFailure ("ClientErrorResponse: " <> show err)
ClientReaderResponse _ sts _ ->
Expand All @@ -114,7 +114,7 @@ testBiDiStreamingCall client = testCase "Bidi-streaming call" $
iterations <- randomRIO (50, 500)

res <- simpleServiceBiDiStreamingCall client $
ClientBiDiRequest 10 mempty (\_ -> handleRequests iterations)
ClientBiDiRequest 10 mempty (\_ _ -> handleRequests iterations)
case res of
ClientErrorResponse err -> assertFailure ("ClientErrorResponse: " <> show err)
ClientBiDiResponse _ sts _ ->
Expand Down