Skip to content

Commit

Permalink
HACK/WIP: add a connection_listener concept
Browse files Browse the repository at this point in the history
  • Loading branch information
Hades32 committed May 16, 2015
1 parent 340d9d6 commit e7cc33a
Showing 1 changed file with 30 additions and 13 deletions.
43 changes: 30 additions & 13 deletions src/emqttc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
port = 1883 :: inet:port_number(),
socket :: inet:socket(),
receiver :: pid(),
connection_listener :: pid() | undefined,
proto_state :: emqttc_protocol:proto_state(),
subscribers = [] :: list(),
pubsub_map = #{} :: map(),
Expand Down Expand Up @@ -127,17 +128,30 @@ start_link() ->
MqttOpts :: [mqttc_opt()],
Client :: pid().
start_link(MqttOpts) when is_list(MqttOpts) ->
gen_fsm:start_link(?MODULE, [undefined, MqttOpts], []).
gen_fsm:start_link(?MODULE, [undefined, MqttOpts, undefined], []).

%%------------------------------------------------------------------------------
%% @doc Start emqttc client with options. And connection_listener
%% @end
%%------------------------------------------------------------------------------
-spec start_link(MqttOpts, ConnListener) -> {ok, Client} | ignore | {error, any()} when
MqttOpts :: [mqttc_opt()],
ConnListener :: pid(),
Client :: pid().
start_link(MqttOpts, ConnListener) when is_list(MqttOpts), is_pid(ConnListener) ->
gen_fsm:start_link(?MODULE, [undefined, MqttOpts, ConnListener], []).

%%------------------------------------------------------------------------------
%% @doc Start emqttc client with name, options.
%% @end
%%------------------------------------------------------------------------------
-spec start_link(Name, MqttOpts) -> {ok, pid()} | ignore | {error, any()} when
-spec start_link(Name, MqttOpts, ConnListener) -> {ok, pid()} | ignore | {error, any()} when
Name :: atom(),
ConnListener :: pid(),
MqttOpts :: [mqttc_opt()].
start_link(Name, MqttOpts) when is_atom(Name), is_list(MqttOpts) ->
gen_fsm:start_link({local, Name}, ?MODULE, [Name, MqttOpts], []).
start_link(Name, MqttOpts, ConnListener) when is_atom(Name), is_list(MqttOpts), is_pid(ConnListener) ->
gen_fsm:start_link({local, Name}, ?MODULE, [Name, MqttOpts, ConnListener], []).


%%------------------------------------------------------------------------------
%% @doc Publish message to broker with QoS0.
Expand Down Expand Up @@ -249,10 +263,10 @@ disconnect(Client) ->
{ok, StateName :: atom(), StateData :: #state{}} |
{ok, StateName :: atom(), StateData :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([undefined, MqttOpts]) ->
init([pid_to_list(self()), MqttOpts]);
init([undefined, MqttOpts, ConnListener]) ->
init([pid_to_list(self()), MqttOpts, ConnListener]);

init([Name, MqttOpts]) ->
init([Name, MqttOpts, ConnListener]) ->

process_flag(trap_exit, true),

Expand All @@ -270,12 +284,13 @@ init([Name, MqttOpts]) ->
IsSSL = get_value(ssl, MqttOpts1, false),

State = init(MqttOpts1, #state{
name = Name,
host = "localhost",
port = if IsSSL -> 8883;
true -> 1883 end,
proto_state = ProtoState,
logger = Logger}),
name = Name,
host = "localhost",
port = if IsSSL -> 8883;
true -> 1883 end,
proto_state = ProtoState,
connection_listener = ConnListener,
logger = Logger}),

{ok, connecting, State, 0}.

Expand Down Expand Up @@ -333,12 +348,14 @@ waiting_for_connack(?CONNACK_PACKET(?CONNACK_ACCEPT), State = #state{
pending_pubsub = Pending,
proto_state = ProtoState,
keepalive = KeepAlive,
connection_listener = ConnListener,
logger = Logger}) ->
Logger:info("[Client ~s] RECV: CONNACK_ACCEPT", [Name]),
{ok, ProtoState1} = emqttc_protocol:received('CONNACK', ProtoState),
[gen_fsm:send_event(self(), Event) || Event <- lists:reverse(Pending)],
%% start keepalive
KeepAlive1 = emqttc_keepalive:start(KeepAlive),
ConnListener ! connected,
{next_state, connected, State#state{proto_state = ProtoState1,
keepalive = KeepAlive1,
pending_pubsub = []}};
Expand Down

0 comments on commit e7cc33a

Please sign in to comment.