Skip to content

Commit

Permalink
fix JuliaLang/julia#35937, serializing TaskFailedException in Distrib…
Browse files Browse the repository at this point in the history
…uted (JuliaLang/julia#35943)

(cherry picked from commit 3e4a0e7)
  • Loading branch information
JeffBezanson authored and KristofferC committed May 25, 2020
1 parent 45ccf87 commit c67df47
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
39 changes: 38 additions & 1 deletion src/clusterserialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

using Serialization: serialize_cycle, deserialize_cycle, writetag,
serialize_typename, deserialize_typename,
TYPENAME_TAG, reset_state, serialize_type
TYPENAME_TAG, TASK_TAG, reset_state, serialize_type
using Serialization.__deserialized_types__

import Serialization: object_number, lookup_object_number, remember_object
Expand Down Expand Up @@ -102,6 +102,26 @@ function serialize(s::ClusterSerializer, t::Core.TypeName)
nothing
end

function serialize(s::ClusterSerializer, t::Task)
serialize_cycle(s, t) && return
if istaskstarted(t) && !istaskdone(t)
error("cannot serialize a running Task")
end
writetag(s.io, TASK_TAG)
serialize(s, t.code)
serialize(s, t.storage)
bt = t.backtrace
if bt !== nothing
if !isa(bt, Vector{Any})
bt = Base.process_backtrace(bt, 100)
end
serialize(s, bt)
end
serialize(s, t.state)
serialize(s, t.result)
serialize(s, t.exception)
end

function serialize(s::ClusterSerializer, g::GlobalRef)
# Record if required and then invoke the default GlobalRef serializer.
sym = g.name
Expand Down Expand Up @@ -231,6 +251,23 @@ function deserialize(s::ClusterSerializer, t::Type{<:CapturedException})
return CapturedException(capex, bt)
end

function deserialize(s::ClusterSerializer, ::Type{Task})
t = Task(nothing)
deserialize_cycle(s, t)
t.code = deserialize(s)
t.storage = deserialize(s)
state_or_bt = deserialize(s)
if state_or_bt isa Symbol
t.state = state_or_bt
else
t.backtrace = state_or_bt
t.state = deserialize(s)
end
t.result = deserialize(s)
t.exception = deserialize(s)
t
end

"""
clear!(syms, pids=workers(); mod=Main)
Expand Down
13 changes: 13 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,19 @@ let (h, t) = Distributed.head_and_tail(Int[], 0)
@test collect(t) == []
end

# issue #35937
let e
try
pmap(1) do _
wait(@async error(42))
end
catch ex
e = ex
end
# check that the inner TaskFailedException is correctly formed & can be printed
@test sprint(showerror, e) isa String
end

include("splitrange.jl")

# Run topology tests last after removing all workers, since a given
Expand Down

0 comments on commit c67df47

Please sign in to comment.