Skip to content

Commit

Permalink
fix: async write callback with wrong result (#30)
Browse files Browse the repository at this point in the history
* fix: response in wrong format for async callback

* test: add test

* fix: finish spec
  • Loading branch information
killme2008 authored Dec 4, 2023
1 parent 2caf6c2 commit a3668a6
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/greptimedb_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ write_request(Stream, Request) ->
end.

%% @doc Finish the gRPC stream and wait the result.
-spec finish(Stream :: map()) -> {ok, term()} | {error, term()}.
-spec finish(Stream :: map()) -> {ok, term()} | {error, term(), term()} | timeout | stream_finished.
finish(Stream) ->
finish(Stream, 10_000).

%% @doc Finish the gRPC stream and wait the result with timeout in milliseconds.
-spec finish(Stream :: map(), Timeout :: integer()) -> {ok, term()} | {error, term()}.
-spec finish(Stream :: map(), Timeout :: integer()) -> {ok, term()} | {error, term(), term()} | timeout | stream_finished.
finish(Stream, Timeout) ->
try
ok = grpcbox_client:close_send(Stream),
Expand Down
10 changes: 9 additions & 1 deletion src/greptimedb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,15 @@ shoot(Stream, ?REQ(Req, _), #state{requests = #{pending_count := 0}} = State, Re
%% Write the last request and finish stream
case greptimedb_stream:write_request(Stream, Req) of
ok ->
Result = greptimedb_stream:finish(Stream),
Result = case greptimedb_stream:finish(Stream) of
{ok, Resp} ->
{ok, Resp};
{error, {?GRPC_STATUS_UNAUTHENTICATED, Msg}, Other} ->
{error, {unauth, Msg, Other}};
Err ->
{error, Err}
end,

lists:foreach(fun(ReplyTo) ->
reply(ReplyTo, Result)
end, ReplyToList);
Expand Down
15 changes: 15 additions & 0 deletions test/greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,22 @@ t_auth_error(_) ->
{pool_type, random},
{auth, {basic, #{username => <<"greptime_user">>, password => <<"wrong_pwd">>}}}],
{ok, Client} = greptimedb:start_client(Options),

%% sync write
{error, {unauth, _, _}} = greptimedb:write(Client, Metric, Points),
%% async write
Ref = make_ref(),
TestPid = self(),
ResultCallback = {fun(Reply) -> TestPid ! {{Ref, reply}, Reply} end, []},

ok = greptimedb:async_write(Client, Metric, Points, ResultCallback),
receive
{{Ref, reply}, {error, {unauth, _, _}}} ->
ok;
{{Ref, reply}, _Other} ->
?assert(false)
end,

greptimedb:stop_client(Client),
ok.

Expand Down

0 comments on commit a3668a6

Please sign in to comment.