Skip to content

Commit

Permalink
add tail function for debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
evanmcc committed Jan 25, 2018
1 parent 7f2d48a commit d7138d8
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
35 changes: 35 additions & 0 deletions src/vg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
describe_topic/1,
deactivate_topic/1,
regenerate_topic_index/1,
tail_topic/1, tail_topic/2,
running_topics/0
]).

Expand Down Expand Up @@ -186,7 +187,41 @@ deactivate_topic(Topic) ->
regenerate_topic_index(Topic) ->
vg_topic_mgr:regenerate_index(Topic, 0).

tail_topic(Topic) ->
tail_topic(Topic, #{}).

-spec tail_topic(binary(), Opts) -> ok when
Opts :: #{records => pos_integer(), % default 10 records
time => pos_integer()}. % default 30 seconds
tail_topic(Topic, Opts) ->
Printer = erlang:spawn_opt(fun() -> tail_printer(Topic, Opts) end,
[{max_heap_size, 1024 * 1024}]),
vg_active_segment:tail(Topic, 0, Printer).

%% this is shaping up to be quite expensive and could block lazy
%% starts of deactivated topics. use in production with caution.
running_topics() ->
vg_cluster_mgr:running_topics().

tail_printer(Topic, Opts) ->
Records = maps:get(records, Opts, 10),
Time = maps:get(time, Opts, timer:seconds(30)),
EndTime = erlang:monotonic_time(milli_seconds) + Time,
F = fun Loop(0, _End) ->
io:format("printed ~p records, terminating~n", [Records]);
Loop(R, End) ->
Left = End - erlang:monotonic_time(milli_seconds),
case Left > 0 of
true ->
receive
{'$print', Term} ->
io:format("~p: ~p~n", [Topic, Term]),
Loop(R - 1, End)
after Left ->
io:format("tail session timed out~n")
end;
false ->
io:format("tail session timed out~n")
end
end,
F(Records, EndTime).
23 changes: 22 additions & 1 deletion src/vg_active_segment.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
write/3,
write/4,
halt/2,
tail/3,
where/2,
stop_indexing/2,
resume_indexing/2]).

Expand Down Expand Up @@ -37,7 +39,8 @@
partition :: integer(),
config :: #config{},
halted = false :: boolean(),
index = true :: boolean()
index = true :: boolean(),
tailer :: pid() | undefined
}).

%% need this until an Erlang release with `hibernate_after` spec added to gen option type
Expand Down Expand Up @@ -87,6 +90,12 @@ create_retry(Topic, Partition, ExpectedId, RecordSet)->
halt(Topic, Partition) ->
gen_server:call(?ACTIVE_SEG(Topic, Partition), halt).

tail(Topic, Partition, Printer) ->
gen_server:call(?ACTIVE_SEG(Topic, Partition), {tail, Printer}).

where(Topic, Partition) ->
gproc:where(?ACTIVE_SEG(Topic, Partition).

stop_indexing(Topic, Partition) ->
gen_server:call(?ACTIVE_SEG(Topic, Partition), stop_indexing).

Expand Down Expand Up @@ -134,6 +143,9 @@ handle_call(_Msg, _From, State = #state{halted = true}) ->
{reply, halted, State};
handle_call(halt, _From, State) ->
{reply, ok, State#state{halted = true}};
handle_call({tail, Printer}, _From, State) ->
monitor(process, Printer),
{reply, ok, State#state{tailer = Printer}};
handle_call(stop_indexing, _From, #state{index_fd = undefined} = State) ->
{reply, ok, State#state{index = false}};
handle_call(stop_indexing, _From, #state{index_fd = FD} = State) ->
Expand All @@ -143,6 +155,7 @@ handle_call(stop_indexing, _From, #state{index_fd = FD} = State) ->
handle_call(resume_indexing, _From, State) ->
{reply, ok, State#state{index = true}};
handle_call({write, ExpectedID0, RecordSet}, _From, State = #state{next_id = ID,
tailer = Tailer,
topic = Topic,
next_brick = NextBrick}) ->
%% TODO: add pipelining of requests
Expand Down Expand Up @@ -191,6 +204,12 @@ handle_call({write, ExpectedID0, RecordSet}, _From, State = #state{next_id = ID,
Go when Go =:= proceed orelse
element(1, Go) =:= ok ->
State1 = write_record_set(RecordSet, State),
case Tailer of
undefined ->
ok;
Pid ->
Pid ! {'$print', {State1#state.next_id - 1, RecordSet}}
end,
{reply, {ok, State1#state.next_id - 1}, State1};
{write_repair, RepairSet} ->
prometheus_counter:inc(write_repairs),
Expand Down Expand Up @@ -219,6 +238,8 @@ handle_cast(_Msg, State) ->
lager:info("bad cast ~p", [_Msg]),
{noreply, State}.

handle_info({'DOWN', _MonitorRef, _Type, _Object, _Info}, State) ->
{noreply, State#state{tailer = undefined}};
handle_info(_, State) ->
{noreply, State}.

Expand Down

0 comments on commit d7138d8

Please sign in to comment.