|
 |
aad6a1 |
From: Luke Bakken <lbakken@pivotal.io>
|
|
 |
aad6a1 |
Date: Tue, 13 Mar 2018 09:00:50 -0700
|
|
 |
aad6a1 |
Subject: [PATCH] Add special case in handle_other for normal TCP port exit
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
Handle noport at epmd monitor startup
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
Handle EXIT from TCP port more gracefully
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
Ensure that Parent pid is matched
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
diff --git a/src/rabbit_epmd_monitor.erl b/src/rabbit_epmd_monitor.erl
|
|
 |
aad6a1 |
index 9d8044e6e..1a14a640d 100644
|
|
 |
aad6a1 |
--- a/src/rabbit_epmd_monitor.erl
|
|
 |
aad6a1 |
+++ b/src/rabbit_epmd_monitor.erl
|
|
 |
aad6a1 |
@@ -48,16 +48,26 @@
|
|
 |
aad6a1 |
%% epmd" as a shutdown or uninstall step.
|
|
 |
aad6a1 |
%% ----------------------------------------------------------------------------
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
-start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
|
 |
aad6a1 |
+start_link() ->
|
|
 |
aad6a1 |
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
init([]) ->
|
|
 |
aad6a1 |
{Me, Host} = rabbit_nodes:parts(node()),
|
|
 |
aad6a1 |
Mod = net_kernel:epmd_module(),
|
|
 |
aad6a1 |
- {port, Port, _Version} = Mod:port_please(Me, Host),
|
|
 |
aad6a1 |
- {ok, ensure_timer(#state{mod = Mod,
|
|
 |
aad6a1 |
- me = Me,
|
|
 |
aad6a1 |
- host = Host,
|
|
 |
aad6a1 |
- port = Port})}.
|
|
 |
aad6a1 |
+ init_handle_port_please(Mod:port_please(Me, Host), Mod, Me, Host).
|
|
 |
aad6a1 |
+
|
|
 |
aad6a1 |
+init_handle_port_please(noport, Mod, Me, Host) ->
|
|
 |
aad6a1 |
+ State = #state{mod = Mod,
|
|
 |
aad6a1 |
+ me = Me,
|
|
 |
aad6a1 |
+ host = Host,
|
|
 |
aad6a1 |
+ port = undefined},
|
|
 |
aad6a1 |
+ {ok, ensure_timer(State)};
|
|
 |
aad6a1 |
+init_handle_port_please({port, Port, _Version}, Mod, Me, Host) ->
|
|
 |
aad6a1 |
+ State = #state{mod = Mod,
|
|
 |
aad6a1 |
+ me = Me,
|
|
 |
aad6a1 |
+ host = Host,
|
|
 |
aad6a1 |
+ port = Port},
|
|
 |
aad6a1 |
+ {ok, ensure_timer(State)}.
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
handle_call(_Request, _From, State) ->
|
|
 |
aad6a1 |
{noreply, State}.
|
|
 |
aad6a1 |
@@ -65,9 +75,9 @@ handle_call(_Request, _From, State) ->
|
|
 |
aad6a1 |
handle_cast(_Msg, State) ->
|
|
 |
aad6a1 |
{noreply, State}.
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
-handle_info(check, State) ->
|
|
 |
aad6a1 |
- check_epmd(State),
|
|
 |
aad6a1 |
- {noreply, ensure_timer(State#state{timer = undefined})};
|
|
 |
aad6a1 |
+handle_info(check, State0) ->
|
|
 |
aad6a1 |
+ {ok, State1} = check_epmd(State0),
|
|
 |
aad6a1 |
+ {noreply, ensure_timer(State1#state{timer = undefined})};
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
handle_info(_Info, State) ->
|
|
 |
aad6a1 |
{noreply, State}.
|
|
 |
aad6a1 |
@@ -83,15 +93,18 @@ code_change(_OldVsn, State, _Extra) ->
|
|
 |
aad6a1 |
ensure_timer(State) ->
|
|
 |
aad6a1 |
rabbit_misc:ensure_timer(State, #state.timer, ?CHECK_FREQUENCY, check).
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
-check_epmd(#state{mod = Mod,
|
|
 |
aad6a1 |
- me = Me,
|
|
 |
aad6a1 |
- host = Host,
|
|
 |
aad6a1 |
- port = Port}) ->
|
|
 |
aad6a1 |
- case Mod:port_please(Me, Host) of
|
|
 |
aad6a1 |
- noport -> rabbit_log:warning(
|
|
 |
aad6a1 |
- "epmd does not know us, re-registering ~s at port ~b~n",
|
|
 |
aad6a1 |
- [Me, Port]),
|
|
 |
aad6a1 |
- rabbit_nodes:ensure_epmd(),
|
|
 |
aad6a1 |
- Mod:register_node(Me, Port);
|
|
 |
aad6a1 |
- _ -> ok
|
|
 |
aad6a1 |
- end.
|
|
 |
aad6a1 |
+check_epmd(State = #state{mod = Mod,
|
|
 |
aad6a1 |
+ me = Me,
|
|
 |
aad6a1 |
+ host = Host,
|
|
 |
aad6a1 |
+ port = Port}) ->
|
|
 |
aad6a1 |
+ Port1 = case Mod:port_please(Me, Host) of
|
|
 |
aad6a1 |
+ noport ->
|
|
 |
aad6a1 |
+ rabbit_log:warning("epmd does not know us, re-registering ~s at port ~b~n",
|
|
 |
aad6a1 |
+ [Me, Port]),
|
|
 |
aad6a1 |
+ Port;
|
|
 |
aad6a1 |
+ {port, NewPort, _Version} ->
|
|
 |
aad6a1 |
+ NewPort
|
|
 |
aad6a1 |
+ end,
|
|
 |
aad6a1 |
+ rabbit_nodes:ensure_epmd(),
|
|
 |
aad6a1 |
+ Mod:register_node(Me, Port1),
|
|
 |
aad6a1 |
+ {ok, State#state{port = Port1}}.
|
|
 |
aad6a1 |
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
|
|
 |
aad6a1 |
index 24de35e7e..a6cca9438 100644
|
|
 |
aad6a1 |
--- a/src/rabbit_reader.erl
|
|
 |
aad6a1 |
+++ b/src/rabbit_reader.erl
|
|
 |
aad6a1 |
@@ -563,9 +563,14 @@ handle_other({channel_closing, ChPid}, State) ->
|
|
 |
aad6a1 |
ok = rabbit_channel:ready_for_close(ChPid),
|
|
 |
aad6a1 |
{_, State1} = channel_cleanup(ChPid, State),
|
|
 |
aad6a1 |
maybe_close(control_throttle(State1));
|
|
 |
aad6a1 |
+handle_other({'EXIT', Parent, normal}, State = #v1{parent = Parent}) ->
|
|
 |
aad6a1 |
+ %% rabbitmq/rabbitmq-server#544
|
|
 |
aad6a1 |
+ %% The connection port process has exited due to the TCP socket being closed.
|
|
 |
aad6a1 |
+ %% Handle this case in the same manner as receiving {error, closed}
|
|
 |
aad6a1 |
+ stop(closed, State);
|
|
 |
aad6a1 |
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
|
|
 |
aad6a1 |
- terminate(io_lib:format("broker forced connection closure "
|
|
 |
aad6a1 |
- "with reason '~w'", [Reason]), State),
|
|
 |
aad6a1 |
+ Msg = io_lib:format("broker forced connection closure with reason '~w'", [Reason]),
|
|
 |
aad6a1 |
+ terminate(Msg, State),
|
|
 |
aad6a1 |
%% this is what we are expected to do according to
|
|
 |
aad6a1 |
%% http://www.erlang.org/doc/man/sys.html
|
|
 |
aad6a1 |
%%
|
|
 |
aad6a1 |
@@ -794,7 +799,7 @@ wait_for_channel_termination(N, TimerRef,
|
|
 |
aad6a1 |
wait_for_channel_termination(N-1, TimerRef, State1)
|
|
 |
aad6a1 |
end;
|
|
 |
aad6a1 |
{'EXIT', Sock, _Reason} ->
|
|
 |
aad6a1 |
- [channel_cleanup(ChPid, State) || ChPid <- all_channels()],
|
|
 |
aad6a1 |
+ clean_up_all_channels(State),
|
|
 |
aad6a1 |
exit(normal);
|
|
 |
aad6a1 |
cancel_wait ->
|
|
 |
aad6a1 |
exit(channel_termination_timeout)
|
|
 |
aad6a1 |
@@ -963,6 +968,12 @@ channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
+clean_up_all_channels(State) ->
|
|
 |
aad6a1 |
+ CleanupFun = fun(ChPid) ->
|
|
 |
aad6a1 |
+ channel_cleanup(ChPid, State)
|
|
 |
aad6a1 |
+ end,
|
|
 |
aad6a1 |
+ lists:foreach(CleanupFun, all_channels()).
|
|
 |
aad6a1 |
+
|
|
 |
aad6a1 |
%%--------------------------------------------------------------------------
|
|
 |
aad6a1 |
|
|
 |
aad6a1 |
handle_frame(Type, 0, Payload,
|