From 322da47016e00262f4f850eb80f979aa4dfbb93a Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 13 Aug 2019 11:53:44 +0530 Subject: [PATCH] do not use myid() to differentiate master & worker Occasionally while adding a large number of workers and particularly worker-worker connections are not lazy, it is possible to encounter the following error: ``` ERROR (unhandled task failure): MethodError: no method matching manage(::Base.Distributed.DefaultClusterManager, ::Int64, ::WorkerConfig, ::Symbol) Closest candidates are: manage(!Matched::Base.Distributed.SSHManager, ::Integer, ::WorkerConfig, ::Symbol) at distributed/managers.jl:224 manage(!Matched::Base.Distributed.LocalManager, ::Integer, ::WorkerConfig, ::Symbol) at distributed/managers.jl:337 manage(!Matched::Union{ClusterManagers.PBSManager, ClusterManagers.QRSHManager, ClusterManagers.SGEManager}, ::Int64, ::WorkerConfig, ::Symbol) at /home/jrun/.julia/v0.6/ClusterManagers/src/qsub.jl:115 ... Stacktrace: [1] deregister_worker(::Base.Distributed.ProcessGroup, ::Int64) at ./distributed/cluster.jl:903 [2] message_handler_loop(::TCPSocket, ::TCPSocket, ::Bool) at ./distributed/process_messages.jl:220 [3] process_tcp_streams(::TCPSocket, ::TCPSocket, ::Bool) at ./distributed/process_messages.jl:118 [4] (::Base.Distributed.##101#102{TCPSocket,TCPSocket,Bool})() at ./event.jl:73 ``` It can be simulated with this exact sequence of events: - worker2 in process of connecting to master - master has received the worker2s listen port, connected to it, sent the JoinPGRP message to it - master is now aware of worker2, and has added it to its list of workers - worker2 has still not processed the JoinPGRP message, so it is still unaware of its worker id - worker3 now connects to master - master sends the JoinPGRP message along with list of existing workers that includes worker2 - worker3 connects to worker2 - worker2 receives a new connection from worker3 and attempts to process it - worker3 faces an error and exits, thus breaking the connection - worker2 gets an error processing message from worker3 - goes into error handling - the current error handling code sees the self pid as 1 and incorrectly thinks it is the master - attempts to process the worker disconnection as a master and gets the error we see The MethodError prevents proper cleanup at the worker where it happens. The issue seems to be that it is not correct to identify whether a Julia process is master or worker by looking at the process id. Instead we should have a dedicated indicator for that. This change adds a new local process role variable that is set to `:master` by default, but is set to `:worker` when `start_worker` is invoked. This allows a process to know that it is running as a worker irrespective of whether it has received a process id or not. --- stdlib/Distributed/src/cluster.jl | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index 3caf002b3cd30..d70424100a161 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -224,6 +224,7 @@ It does not return. """ start_worker(cookie::AbstractString=readline(stdin); kwargs...) = start_worker(stdout, cookie; kwargs...) function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true) + myrole!(:worker) init_multi() close_stdin && close(stdin) # workers will not use it @@ -783,12 +784,19 @@ end # globals const LPROC = LocalProcess() +const LPROCROLE = Ref{Symbol}(:master) const HDR_VERSION_LEN=16 const HDR_COOKIE_LEN=16 const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}() const map_sock_wrkr = IdDict() const map_del_wrkr = Set{Int}() +# whether process is a master or worker in a distributed setup +myrole() = LPROCROLE[] +function myrole!(proctype::Symbol) + LPROCROLE[] = proctype +end + # cluster management related API """ myid() @@ -1108,7 +1116,7 @@ function deregister_worker(pg, pid) end end - if myid() == 1 && isdefined(w, :config) + if myid() == 1 && (myrole() === :master) && isdefined(w, :config) # Notify the cluster manager of this workers death manage(w.manager, w.id, w.config, :deregister) if PGRP.topology != :all_to_all || isclusterlazy()