Skip to content

Commit

Permalink
operational tooling part 1
Browse files Browse the repository at this point in the history
this commit adds the beginning of some manual operational tooling for
working with topics outside of the api.  included are:
 - delete_topic
 - deactivate_topic
 - running_topics

the idea here is to make it a bit easier to do some common operations
to rescue ailing clusters, albeit in a sometimes destructive way.
more to come later.
  • Loading branch information
evanmcc committed Jan 19, 2018
1 parent fb3726b commit 0e01ae2
Show file tree
Hide file tree
Showing 15 changed files with 496 additions and 111 deletions.
4 changes: 3 additions & 1 deletion config/test.config
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
[time, color, " [",severity,"] ",
pid, " ",
"mod=", module,
" fun=", function, " ", message, "\e[0m\r\n"]}]}]}]},
" fun=", function, " ", message, "\e[0m\r\n"]}]}]},
{lager_truncation_size, 20000}
]},

"config/shared"
].
1 change: 1 addition & 0 deletions include/vg.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
-define(FETCH2_REQUEST, 1001).
-define(ENSURE_REQUEST, 1002).
-define(REPLICATE_REQUEST, 1003).
-define(DELETE_TOPIC_REQUEST, 1004).

-define(UNKNOWN_ERROR, -1).
-define(NO_ERROR, 0).
Expand Down
50 changes: 37 additions & 13 deletions src/vg.erl
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
-module(vg).

-export([create_topic/1,
ensure_topic/1,
%% client interface
-export([ensure_topic/1,
write/3, write/4,
fetch/1, fetch/2, fetch/4,
fetch/5]).

%% ops interface.
-export([
create_topic/1,
delete_topic/1,
describe_topic/1,
deactivate_topic/1,
running_topics/0
]).

-include("vg.hrl").

-type record() :: #{id => integer(),
Expand Down Expand Up @@ -110,9 +119,11 @@ fetch(Topic, Partition, Offset, Count) ->
%% A fetch of offset -1 returns Limit number of the records up to the
%% high watermark
fetch(Topic, Partition, -1, MaxBytes, Limit) ->
Offset = vg_topics:lookup_hwm(Topic, Partition),
fetch(Topic, Partition, erlang:max(0, Offset - Limit + 1), MaxBytes, Limit);
HWM = vg_topics:lookup_hwm(Topic, Partition),
fetch(Topic, Partition, erlang:max(0, HWM - Limit + 1), MaxBytes, Limit);
fetch(Topic, Partition, Offset, MaxBytes, Limit) ->
%% check high water mark first as it'll throw for not found
HWM = vg_topics:lookup_hwm(Topic, Partition),
{SegmentId, Position} = vg_log_segments:find_segment_offset(Topic, Partition, Offset),
Fetch =
case Limit of
Expand Down Expand Up @@ -148,12 +159,25 @@ fetch(Topic, Partition, Offset, MaxBytes, Limit) ->
_ -> min(SendBytes, MaxBytes)
end,
ErrorCode = 0,
case vg_topics:lookup_hwm(Topic, Partition) of
{error, not_found} ->
{error, not_found};
HighWaterMark ->
Response = vg_protocol:encode_fetch_topic_response(Partition, ErrorCode, HighWaterMark, Bytes),

lager:debug("sending hwm=~p bytes=~p", [HighWaterMark, Bytes]),
{erlang:iolist_size(Response)+Bytes, Response, {File, Position, Bytes}}
end.
Response = vg_protocol:encode_fetch_topic_response(Partition, ErrorCode, HWM, Bytes),

lager:debug("sending hwm=~p bytes=~p", [HWM, Bytes]),
{erlang:iolist_size(Response)+Bytes, Response, {File, Position, Bytes}}.

%% these are here mostly for ergonomics.  right now they just forward
%% the work to the cluster manager, but we might need to change that
%% later and this allows us to keep an easy-to-type interface that
%% doesn't have to change.
%% ops interface: delete Topic.  forwards to vg_cluster_mgr:delete_topic/1
%% and returns whatever that call returns.
delete_topic(Topic) ->
vg_cluster_mgr:delete_topic(Topic).

%% ops interface: describe Topic.  forwards to
%% vg_cluster_mgr:describe_topic/1 and returns whatever that call returns.
describe_topic(Topic) ->
vg_cluster_mgr:describe_topic(Topic).

%% ops interface: deactivate Topic.  forwards to
%% vg_cluster_mgr:deactivate_topic/1 and returns whatever that call returns.
deactivate_topic(Topic) ->
vg_cluster_mgr:deactivate_topic(Topic).

%% ops interface: list the running topics.  forwards to
%% vg_cluster_mgr:running_topics/0 and returns whatever that call returns.
%%
%% this is shaping up to be quite expensive and could block lazy
%% starts of deactivated topics. use in production with caution.
running_topics() ->
vg_cluster_mgr:running_topics().
46 changes: 28 additions & 18 deletions src/vg_active_segment.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

-export([start_link/3,
write/3,
write/4]).
write/4,
halt/2]).

-export([init/1,
handle_call/3,
Expand All @@ -21,27 +22,28 @@
index_max_bytes :: integer(),
index_interval_bytes :: integer()}).

-record(state, {topic_dir :: file:filename(),
next_id :: integer(),
next_brick :: atom(),
byte_count :: integer(),
pos :: integer(),
index_pos :: integer(),
log_fd :: file:fd(),
segment_id :: integer(),
index_fd :: file:fd(),
topic :: binary(),
partition :: integer(),
config :: #config{}
-record(state, {topic_dir :: file:filename(),
next_id :: integer(),
next_brick :: atom(),
byte_count :: integer(),
pos :: integer(),
index_pos :: integer(),
log_fd :: file:fd(),
segment_id :: integer(),
index_fd :: file:fd(),
topic :: binary(),
partition :: integer(),
config :: #config{},
halted = false :: boolean()
}).

%% need this until an Erlang release with `hibernate_after` spec added to gen option type
-dialyzer({nowarn_function, start_link/3}).

-define(SERVER(Topic, Partition), {via, gproc, {n, l, {Topic, Partition}}}).
-define(ACTIVE_SEG(Topic, Partition), {via, gproc, {n, l, {active, Topic, Partition}}}).

start_link(Topic, Partition, NextBrick) ->
case gen_server:start_link(?SERVER(Topic, Partition), ?MODULE, [Topic, Partition, NextBrick],
case gen_server:start_link(?ACTIVE_SEG(Topic, Partition), ?MODULE, [Topic, Partition, NextBrick],
[{hibernate_after, timer:minutes(5)}]) of % hibernate after 5 minutes with no messages
{ok, Pid} ->
{ok, Pid};
Expand All @@ -60,10 +62,8 @@ write(Topic, Partition, RecordSet) ->
write(Topic, Partition, head, RecordSet).

write(Topic, Partition, ExpectedId, RecordSet) ->
%% there clearly needs to be a lot more logic here. it's also not
%% clear that this is the right place for this
try
case gen_server:call(?SERVER(Topic, Partition), {write, ExpectedId, RecordSet}) of
case gen_server:call(?ACTIVE_SEG(Topic, Partition), {write, ExpectedId, RecordSet}) of
retry ->
write(Topic, Partition, ExpectedId, RecordSet);
R -> R
Expand All @@ -81,6 +81,11 @@ create_retry(Topic, Partition, ExpectedId, RecordSet)->
{ok, _} = vg_cluster_mgr:ensure_topic(Topic),
write(Topic, Partition, ExpectedId, RecordSet).

%% synchronously sends `halt' to the active segment process registered
%% under ?ACTIVE_SEG(Topic, Partition) and returns the server's reply:
%% `ok' when the process was not halted yet, `halted' when it already was.
halt(Topic, Partition) ->
gen_server:call(?ACTIVE_SEG(Topic, Partition), halt).

%%%%%%%%%%%%

init([Topic, Partition, NextNode]) ->
lager:info("at=init topic=~p next_server=~p", [Topic, NextNode]),
Config = setup_config(),
Expand Down Expand Up @@ -115,6 +120,11 @@ init([Topic, Partition, NextNode]) ->
config = Config
}}.

%% catch-all to keep any new writes from coming in while we delete the topic:
%% once the `halted' flag is set, every call (whatever the message, including
%% a second `halt') is answered with `halted' and the state is left untouched.
%% this clause has to stay first so that it shadows the clauses below it.
handle_call(_Msg, _From, State = #state{halted = true}) ->
{reply, halted, State};
%% `halt' request (sent by halt/2): set the flag and reply `ok'.
handle_call(halt, _From, State) ->
{reply, ok, State#state{halted = true}};
handle_call({write, ExpectedID0, RecordSet}, _From, State = #state{next_id = ID,
topic = Topic,
next_brick = NextBrick}) ->
Expand Down
21 changes: 21 additions & 0 deletions src/vg_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
ensure_topic/1,
topics/0, topics/2,
fetch/1, fetch/2, fetch/3,

%% internal-only stuff
replicate/5,
delete_topic/2,
%% end internal

produce/2, produce/3,
init/0,
setup/2,
Expand Down Expand Up @@ -172,6 +177,13 @@ replicate(Pool, Topic, ExpectedId, Records, Timeout) ->
{error, Reason}
end.

%% internal-only: issues a {delete_topic, Topic} request through the shackle
%% pool Pool with a 60 second timeout, run via scall/1.
%% returns `ok' when the wrapped call yields {ok, ok}, or passes an
%% {error, Reason} tuple through unchanged.  any other result is a
%% case_clause crash.
delete_topic(Pool, Topic) ->
    lager:debug("delete_topic pool=~p topic=~p", [Pool, Topic]),
    Request = fun() ->
                      shackle:call(Pool, {delete_topic, Topic}, timer:seconds(60))
              end,
    case scall(Request) of
        {error, _} = Error -> Error;
        {ok, ok} -> ok
    end.

-spec produce(Topic, RecordSet)
-> {ok, integer()} | {error, term()}
when Topic :: vg:topic(),
Expand Down Expand Up @@ -294,6 +306,15 @@ handle_request({replicate, Topic, Partition, ExpectedId, Data}, #state {
{ok, RequestId, [<<(iolist_size(EncodedRequest)):32/signed-integer>>, EncodedRequest],
State#state{corids = maps:put(RequestId, ?REPLICATE_REQUEST, CorIds),
request_counter = RequestCounter + 1}};
%% builds the wire request for {delete_topic, Topic}: the topic is encoded
%% with vg_protocol:encode_delete_topic/1 and wrapped by
%% vg_protocol:encode_request/4 under the ?DELETE_TOPIC_REQUEST api key.
handle_request({delete_topic, Topic}, #state{request_counter = RequestCounter,
corids = CorIds} = State) ->
RequestId = request_id(RequestCounter),
Request = vg_protocol:encode_delete_topic(Topic),
EncodedRequest = vg_protocol:encode_request(?DELETE_TOPIC_REQUEST, RequestId, ?CLIENT_ID, Request),

%% the outgoing iolist is the request size as a 32-bit signed integer followed
%% by the encoded request.  the new state stores RequestId => ?DELETE_TOPIC_REQUEST
%% in corids and bumps the request counter.
{ok, RequestId, [<<(iolist_size(EncodedRequest)):32/signed-integer>>, EncodedRequest],
State#state{corids = maps:put(RequestId, ?DELETE_TOPIC_REQUEST, CorIds),
request_counter = RequestCounter + 1}};
handle_request({produce, TopicRecords}, #state {
request_counter = RequestCounter,
corids = CorIds
Expand Down
79 changes: 77 additions & 2 deletions src/vg_cluster_mgr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@
%% API
-export([start_link/3,
get_map/0,
create_topic/1,
ensure_topic/1]).

-export([
create_topic/1,
delete_topic/1,
describe_topic/1,
deactivate_topic/1,
running_topics/0
]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
Expand Down Expand Up @@ -60,6 +67,24 @@ ensure_topic(Topic) ->
HeadNode = vg_chain_state:head(),
gen_server:call({?SERVER, HeadNode}, {ensure_topic, Topic}).

%% asks the cluster manager on the node returned by vg_chain_state:head()
%% to delete Topic.  the call uses an `infinity' timeout, so the caller
%% waits for as long as the deletion takes.
delete_topic(Topic) ->
    gen_server:call({?SERVER, vg_chain_state:head()},
                    {delete_topic, Topic},
                    infinity).

%% asks the cluster manager on the node returned by vg_chain_state:head()
%% to describe Topic, using the default gen_server call timeout.
describe_topic(Topic) ->
    gen_server:call({?SERVER, vg_chain_state:head()},
                    {describe_topic, Topic}).

%% asks the cluster manager on the node returned by vg_chain_state:head()
%% to deactivate Topic, using the default gen_server call timeout.
deactivate_topic(Topic) ->
    gen_server:call({?SERVER, vg_chain_state:head()},
                    {deactivate_topic, Topic}).

%% asks the cluster manager on the node returned by vg_chain_state:head()
%% for its running topics, using the default gen_server call timeout.
running_topics() ->
    gen_server:call({?SERVER, vg_chain_state:head()}, running_topics).

%%%%%%%%%%%%%%%%%%%%%%%%

init([ChainName, ChainNodes, DataDir]) ->
Chain = create_chain(ChainName, ChainNodes),
State = load_state([Chain], DataDir),
Expand Down Expand Up @@ -107,7 +132,47 @@ handle_call({ensure_topic, Topic}, _From, State=#state{topics=Topics,
Chain ->
start_on_all_nodes(Topic, Chain, Chains),
{reply, {ok, Chain}, State}
end.
end;
%% ops call: delete Topic.
%% replies {error, not_found} when the topic is not in the topics map.
%% otherwise replies with the result of vg_topic_mgr:delete_topic/2, stops
%% the topic on every node of its chain and drops it from the topics map.
handle_call({delete_topic, Topic}, _From, State=#state{topics=Topics,
                                                       chains=Chains}) ->
    %% have topic mgr delete the topic segments and directory
    %% deactivate the topic so that it can be recreated if desired
    {Reply, Topics1} =
        case maps:get(Topic, Topics, not_found) of
            not_found -> {{error, not_found}, Topics};
            Chain ->
                Rep =
                    try
                        vg_topic_mgr:delete_topic(Topic, 0) % eventually iterate partitions?
                    catch _:{noproc, _} ->
                            %% the topic manager process is not running;
                            %% start the topic everywhere and try once more
                            start_on_all_nodes(Topic, Chain, Chains),
                            vg_topic_mgr:delete_topic(Topic, 0)
                    end,
                stop_on_all_nodes(Topic, Chain, Chains),
                {Rep, maps:remove(Topic, Topics)}
        end,
    {reply, Reply, State#state{topics=Topics1}};
%% ops call: describe Topic.  not implemented yet; reply explicitly so the
%% caller gets an error right away instead of blocking until its call
%% times out.
%% TODO: report the hwm, the number of segments, the size on disk and
%% whether the topic is running.
handle_call({describe_topic, _Topic}, _From, State) ->
    {reply, {error, not_implemented}, State};
%% ops call: deactivate Topic.  replies {error, not_found} for an unknown
%% topic, otherwise with the result of stop_on_all_nodes/3.
handle_call({deactivate_topic, Topic}, _From, State=#state{topics=Topics,
                                                           chains=Chains}) ->
    Ret =
        case maps:get(Topic, Topics, not_found) of
            not_found -> {error, not_found};
            Chain -> stop_on_all_nodes(Topic, Chain, Chains)
        end,
    {reply, Ret, State};
%% ops call: list the topics running on this node.
handle_call(running_topics, _From, State=#state{chains=_Chains}) ->
    %% TODO: need to do this for all chains?
    Ret = vg_topics_sup:list_topics(node()),
    {reply, Ret, State};
%% unknown request: a handle_call clause that returns noreply without ever
%% calling gen_server:reply/2 leaves the caller blocked until its timeout
%% fires, so answer with an error instead.
handle_call(_Unknown, _From, State) ->
    {reply, {error, unknown_call}, State}.

handle_cast(_Msg, State) ->
{noreply, State}.
Expand All @@ -134,6 +199,16 @@ start_on_all_nodes(Topic, Chain, Chains) ->
{error, Reason} -> exit({error, Reason})
end || Node <- Nodes].

%% stops partition 0 of Topic on every node of Chain (looked up in Chains).
%% returns a sorted, de-duplicated list: `ok' stands for every node whose
%% stop_child result was [ok]; any other result is kept as
%% {Node, Topic, Result} so the operator can see what went wrong and where.
stop_on_all_nodes(Topic, Chain, Chains) ->
    #chain{nodes=Nodes} = maps:get(Chain, Chains),
    StopOn =
        fun(Node) ->
                case vg_topics_sup:stop_child(Node, Topic, [0]) of
                    [ok] -> ok;
                    %% annotate and pass on the error for user analysis
                    Other -> {Node, Topic, Other}
                end
        end,
    %% usort here to remove useless oks
    lists:usort(lists:map(StopOn, Nodes)).

%% TODO: the topic space stuff MUST be fixed before multiple chains are supported
create_chain(Name, []) ->
#chain{name = Name,
Expand Down
11 changes: 11 additions & 0 deletions src/vg_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,17 @@ handle_request(?REPLICATE_REQUEST, _Role, Data, CorrelationId, Socket) ->
ReplicateResponse = vg_protocol:encode_replicate_response(Response),
Size = erlang:iolist_size(ReplicateResponse) + 4,
gen_tcp:send(Socket, [<<Size:32/signed-integer, CorrelationId:32/signed-integer>>, ReplicateResponse]);
%% delete-topic request: the payload is a single encoded string holding the
%% topic name (any trailing bytes fail the match).  partition 0 of the topic
%% is deleted through vg_topic_mgr and the outcome is sent back as one
%% encoded string.
handle_request(?DELETE_TOPIC_REQUEST, _Role, Data, CorrelationId, Socket) ->
{Topic, <<>>} = vg_protocol:decode_string(Data),
lager:info("received request to delete topic ~p", [Topic]),
%% NOTE(review): the success branch yields a binary while io_lib:format/2
%% yields a character list -- confirm vg_protocol:encode_string/1 accepts both.
Resp = case vg_topic_mgr:delete_topic(Topic, 0) of
ok -> <<"OK">>;
{error, Reason} ->
io_lib:format("~p", [Reason])
end,
Response = vg_protocol:encode_string(Resp),
%% Size covers the response plus the 4 bytes of the correlation id
Size = erlang:iolist_size(Response) + 4,
gen_tcp:send(Socket, [<<Size:32/signed-integer, CorrelationId:32/signed-integer>>, Response]);
handle_request(?PRODUCE_REQUEST, Role, Data, CorrelationId, Socket) when Role =:= head orelse Role =:= solo ->
{_Acks, _Timeout, TopicData} = vg_protocol:decode_produce_request(Data),
Results = [{Topic, [begin
Expand Down
16 changes: 16 additions & 0 deletions src/vg_log_segments.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
-export([init_table/0,
load_existing/2,
load_all/2,
delete_segments/2,
cleanup_segments_table/2,
insert/3,
local/2,
find_log_segment/3,
Expand Down Expand Up @@ -50,6 +52,20 @@ load_segments(Topic, Partition, LogSegments) ->
insert(Topic, Partition, SegmentId)
end || LogSegment <- LogSegments].

%% deletes everything the "*" wildcard finds in the directory of
%% Topic/Partition and then removes the directory itself.
%% the results of the individual file:delete/1 calls are not checked; only
%% the final file:del_dir/1 must return ok, otherwise this crashes with a
%% badmatch.  returns ok.
delete_segments(Topic, Partition) ->
    Dir = vg_utils:topic_dir(Topic, Partition),
    Pattern = filename:join(Dir, "*"),
    _ = [file:delete(Path) || Path <- filelib:wildcard(Pattern)],
    ok = file:del_dir(Dir).

%% removes every segment entry of Topic/Partition from the segments table
%% and lowers the log_segments gauge accordingly.
%%
%% insert/3 bumps the gauge once per segment with the label values [Topic],
%% so the matching decrement has to use the same label values and subtract
%% the number of rows that were deleted.  returns the result of
%% prometheus_gauge:dec/3.
cleanup_segments_table(Topic, Partition) ->
    NumDeleted = ets:select_delete(?SEGMENTS_TABLE,
                                   [{?LOG_SEGMENT_MATCH_PATTERN(Topic, Partition),
                                     [],
                                     ?LOG_SEGMENT_RETURN}]),
    lager:info("deleted ~p segments from the table", [NumDeleted]),
    prometheus_gauge:dec(log_segments, [Topic], NumDeleted).

insert(Topic, Partition, SegmentId) ->
prometheus_gauge:inc(log_segments, [Topic]),
ets:insert(?SEGMENTS_TABLE, {Topic, Partition, SegmentId}).
Expand Down
Loading

0 comments on commit 0e01ae2

Please sign in to comment.