Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change open(cmd) to return Process instead of Tuple #12807

Merged
merged 2 commits into from
May 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1324,5 +1324,18 @@ end

# END 0.6 deprecations

# BEGIN 0.7 deprecations

# 12807
start(::Union{Process, ProcessChain}) = 1
done(::Union{Process, ProcessChain}, i::Int) = (i == 3)
next(p::Union{Process, ProcessChain}, i::Int) = (getindex(p, i), i + 1)
@noinline function getindex(p::Union{Process, ProcessChain}, i::Int)
depwarn("open(cmd) now returns only a Process<:IO object", :getindex)
return i == 1 ? getfield(p, p.openstream) : p
end

# END 0.7 deprecations

# BEGIN 1.0 deprecations
# END 1.0 deprecations
4 changes: 2 additions & 2 deletions base/distributed/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,8 @@ function launch_additional(np::Integer, cmd::Cmd)
addresses = Vector{Any}(np)

for i in 1:np
io, pobj = open(pipeline(detach(cmd), stderr=STDERR), "r")
io_objs[i] = io
io = open(detach(cmd))
io_objs[i] = io.out
end

for (i,io) in enumerate(io_objs)
Expand Down
13 changes: 6 additions & 7 deletions base/distributed/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched,
# detach launches the command in a new process group, allowing it to outlive
# the initial julia process (Ctrl-C and teardown methods are handled through messages)
# for the launched processes.
io, pobj = open(pipeline(detach(cmd), stderr=STDERR), "r")
io = open(detach(cmd))

wconfig = WorkerConfig()
wconfig.io = io
wconfig.io = io.out
wconfig.host = host
wconfig.tunnel = params[:tunnel]
wconfig.sshflags = sshflags
Expand Down Expand Up @@ -321,12 +321,11 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`

for i in 1:manager.np
io, pobj = open(pipeline(detach(
setenv(`$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker $(cluster_cookie())`, dir=dir)),
stderr=STDERR), "r")
cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker $(cluster_cookie())`
io = open(detach(setenv(cmd, dir=dir)))
wconfig = WorkerConfig()
wconfig.process = pobj
wconfig.io = io
wconfig.process = io
wconfig.io = io.out
wconfig.enable_threaded_blas = params[:enable_threaded_blas]
push!(launched, wconfig)
end
Expand Down
34 changes: 17 additions & 17 deletions base/loading.jl
Original file line number Diff line number Diff line change
Expand Up @@ -599,14 +599,15 @@ function create_expr_cache(input::String, output::String, concrete_deps::Vector{
eval(Main, deserialize(STDIN))
end
"""
io, pobj = open(pipeline(detach(`$(julia_cmd()) -O0
--output-ji $output --output-incremental=yes
--startup-file=no --history-file=no
--color=$(have_color ? "yes" : "no")
--eval $code_object`), stderr=STDERR),
"w", STDOUT)
io = open(pipeline(detach(`$(julia_cmd()) -O0
--output-ji $output --output-incremental=yes
--startup-file=no --history-file=no
--color=$(have_color ? "yes" : "no")
--eval $code_object`), stderr=STDERR),
"w", STDOUT)
in = io.in
try
serialize(io, quote
serialize(in, quote
empty!(Base.LOAD_PATH)
append!(Base.LOAD_PATH, $LOAD_PATH)
empty!(Base.LOAD_CACHE_PATH)
Expand All @@ -619,22 +620,21 @@ function create_expr_cache(input::String, output::String, concrete_deps::Vector{
end)
source = source_path(nothing)
if source !== nothing
serialize(io, quote
serialize(in, quote
task_local_storage()[:SOURCE_PATH] = $(source)
end)
end
serialize(io, :(Base.include($(abspath(input)))))
serialize(in, :(Base.include($(abspath(input)))))
if source !== nothing
serialize(io, :(delete!(task_local_storage(), :SOURCE_PATH)))
serialize(in, :(delete!(task_local_storage(), :SOURCE_PATH)))
end
close(io)
wait(pobj)
return pobj
catch
kill(pobj)
close(io)
rethrow()
close(in)
catch ex
close(in)
process_running(io) && Timer(t -> kill(io), 5.0) # wait a short time before killing the process to give it a chance to clean up on its own first
rethrow(ex)
end
return io
end

compilecache(mod::Symbol) = compilecache(string(mod))
Expand Down
76 changes: 50 additions & 26 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ mutable struct Process <: AbstractPipe
termsignal::Int32
exitnotify::Condition
closenotify::Condition
openstream::Symbol # for open(cmd) deprecation
function Process(cmd::Cmd, handle::Ptr{Void},
in::Union{Redirectable, Ptr{Void}},
out::Union{Redirectable, Ptr{Void}},
Expand Down Expand Up @@ -344,7 +345,9 @@ struct ProcessChain <: AbstractPipe
in::Redirectable
out::Redirectable
err::Redirectable
openstream::Symbol # for open(cmd) deprecation
ProcessChain(stdios::StdIOSet) = new(Process[], stdios[1], stdios[2], stdios[3])
ProcessChain(chain::ProcessChain, openstream::Symbol) = new(chain.processes, chain.in, chain.out, chain.err, openstream) # for open(cmd) deprecation
end
pipe_reader(p::ProcessChain) = p.out
pipe_writer(p::ProcessChain) = p.in
Expand Down Expand Up @@ -577,38 +580,55 @@ the process's standard input and `stdio` optionally specifies the process's stan
stream.
"""
function open(cmds::AbstractCmd, mode::AbstractString="r", other::Redirectable=DevNull)
if mode == "r"
if mode == "r+" || mode == "w+"
other === DevNull || throw(ArgumentError("no other stream for mode rw+"))
in = Pipe()
out = Pipe()
processes = spawn(cmds, (in,out,STDERR))
close(in.out)
close(out.in)
elseif mode == "r"
in = other
out = io = Pipe()
out = Pipe()
processes = spawn(cmds, (in,out,STDERR))
close(out.in)
if isa(processes, ProcessChain) # for open(cmd) deprecation
processes = ProcessChain(processes, :out)
else
processes.openstream = :out
end
elseif mode == "w"
in = io = Pipe()
in = Pipe()
out = other
processes = spawn(cmds, (in,out,STDERR))
close(in.out)
if isa(processes, ProcessChain) # for open(cmd) deprecation
processes = ProcessChain(processes, :in)
else
processes.openstream = :in
end
else
throw(ArgumentError("mode must be \"r\" or \"w\", not \"$mode\""))
end
return (io, processes)
return processes
end

"""
open(f::Function, command, mode::AbstractString="r", stdio=DevNull)

Similar to `open(command, mode, stdio)`, but calls `f(stream)` on the resulting read or
write stream, then closes the stream and waits for the process to complete. Returns the
value returned by `f`.
Similar to `open(command, mode, stdio)`, but calls `f(stream)` on the resulting process
stream, then closes the input stream and waits for the process to complete.
Returns the value returned by `f`.
"""
function open(f::Function, cmds::AbstractCmd, args...)
io, P = open(cmds, args...)
P = open(cmds, args...)
ret = try
f(io)
catch
f(P)
catch e
kill(P)
rethrow()
rethrow(e)
finally
close(io)
close(P.in)
end
success(P) || pipeline_error(P)
return ret
Expand All @@ -623,15 +643,14 @@ Starts running a command asynchronously, and returns a tuple (stdout,stdin,proce
output stream and input stream of the process, and the process object itself.
"""
function readandwrite(cmds::AbstractCmd)
in = Pipe()
out, processes = open(cmds, "r", in)
(out, in, processes)
processes = open(cmds, "r+")
return (processes.out, processes.in, processes)
end

function read(cmd::AbstractCmd, stdin::Redirectable=DevNull)
out, procs = open(cmd, "r", stdin)
bytes = read(out)
!success(procs) && pipeline_error(procs)
procs = open(cmd, "r", stdin)
bytes = read(procs.out)
success(procs) || pipeline_error(procs)
return bytes
end

Expand All @@ -656,9 +675,17 @@ function run(cmds::AbstractCmd, args...)
success(ps) ? nothing : pipeline_error(ps)
end

const SIGPIPE = 13
# some common signal numbers that are usually available on all platforms
# and might be useful as arguments to `kill` or testing against `Process.termsignal`
const SIGHUP = 1
const SIGINT = 2
const SIGQUIT = 3 # !windows
const SIGKILL = 9
const SIGPIPE = 13 # !windows
const SIGTERM = 15

function test_success(proc::Process)
assert(process_exited(proc))
@assert process_exited(proc)
if proc.exitcode < 0
#TODO: this codepath is not currently tested
throw(UVError("could not start process $(string(proc.cmd))", proc.exitcode))
Expand All @@ -668,8 +695,7 @@ end

function success(x::Process)
wait(x)
kill(x)
test_success(x)
return test_success(x)
end
success(procs::Vector{Process}) = mapreduce(success, &, procs)
success(procs::ProcessChain) = success(procs.processes)
Expand Down Expand Up @@ -705,8 +731,6 @@ function pipeline_error(procs::ProcessChain)
error(msg)
end

_jl_kill(p::Process, signum::Integer) = ccall(:uv_process_kill, Int32, (Ptr{Void},Int32), p.handle, signum)

"""
kill(p::Process, signum=SIGTERM)

Expand All @@ -715,14 +739,14 @@ Send a signal to a process. The default is to terminate the process.
function kill(p::Process, signum::Integer)
if process_running(p)
@assert p.handle != C_NULL
_jl_kill(p, signum)
ccall(:uv_process_kill, Int32, (Ptr{Void}, Int32), p.handle, signum)
else
Int32(-1)
end
end
kill(ps::Vector{Process}) = map(kill, ps)
kill(ps::ProcessChain) = map(kill, ps.processes)
kill(p::Process) = kill(p, 15) #SIGTERM
kill(p::Process) = kill(p, SIGTERM)

function _contains_newline(bufptr::Ptr{Void}, len::Int32)
return (ccall(:memchr, Ptr{Void}, (Ptr{Void},Int32,Csize_t), bufptr, '\n', len) != C_NULL)
Expand Down
4 changes: 2 additions & 2 deletions examples/clustermanager/simple/UnixDomainCM.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ function launch(manager::UnixDomainCM, params::Dict, launched::Array, c::Conditi
sockname = tempname()
try
cmd = `$(params[:exename]) --startup-file=no $(@__FILE__) udwrkr $sockname $cookie`
io, pobj = open(cmd, "r")
pobj = open(cmd)

wconfig = WorkerConfig()
wconfig.userdata = Dict(:sockname=>sockname, :io=>io, :process=>pobj)
wconfig.userdata = Dict(:sockname=>sockname, :io=>pobj.out, :process=>pobj)
push!(launched, wconfig)
notify(c)
catch e
Expand Down
6 changes: 3 additions & 3 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1307,11 +1307,11 @@ function Base.launch(manager::ErrorSimulator, params::Dict, launched::Array, c::
else
error("Unknown mode")
end
io, pobj = open(pipeline(detach(setenv(cmd, dir=dir)); stderr=STDERR), "r")
io = open(detach(setenv(cmd, dir=dir)))

wconfig = WorkerConfig()
wconfig.process = pobj
wconfig.io = io
wconfig.process = io
wconfig.io = io.out
push!(launched, wconfig)
notify(c)
end
Expand Down
16 changes: 8 additions & 8 deletions test/spawn.jl
Original file line number Diff line number Diff line change
Expand Up @@ -242,26 +242,26 @@ let fname = tempname()
function thrash(handle::Ptr{Void})
# Kill the memory, but write a nice low value in the libuv type field to
# trigger the right code path
ccall(:memset,Ptr{Void},(Ptr{Void},Cint,Csize_t),handle,0xee,3*sizeof(Ptr{Void}))
unsafe_store!(convert(Ptr{Cint},handle+2*sizeof(Ptr{Void})),15)
ccall(:memset, Ptr{Void}, (Ptr{Void}, Cint, Csize_t), handle, 0xee, 3 * sizeof(Ptr{Void}))
unsafe_store!(convert(Ptr{Cint}, handle + 2 * sizeof(Ptr{Void})), 15)
nothing
end
OLD_STDERR = STDERR
redirect_stderr(open("$(escape_string(fname))","w"))
redirect_stderr(open("$(escape_string(fname))", "w"))
# Usually this would be done by GC. Do it manually, to make the failure
# case more reliable.
oldhandle = OLD_STDERR.handle
OLD_STDERR.status = Base.StatusClosing
OLD_STDERR.handle = C_NULL
ccall(:uv_close,Void,(Ptr{Void},Ptr{Void}),oldhandle,cfunction(thrash,Void,(Ptr{Void},)))
ccall(:uv_close, Void, (Ptr{Void}, Ptr{Void}), oldhandle, cfunction(thrash, Void, (Ptr{Void},)))
sleep(1)
import Base.zzzInvalidIdentifier
"""
try
(in,p) = open(pipeline(`$exename --startup-file=no`, stderr=STDERR), "w")
write(in,cmd)
close(in)
wait(p)
io = open(pipeline(`$exename --startup-file=no`, stderr=STDERR), "w")
write(io, cmd)
close(io)
wait(io)
catch
error("IOStream redirect failed. Child stderr was \n$(readstring(fname))\n")
finally
Expand Down
9 changes: 5 additions & 4 deletions test/topology.jl
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ function Base.launch(manager::TopoTestManager, params::Dict, launched::Array, c:
exename = params[:exename]
exeflags = params[:exeflags]

cmd = `$exename $exeflags --bind-to $(Base.Distributed.LPROC.bind_addr) --worker $(Base.cluster_cookie())`
cmd = pipeline(detach(setenv(cmd, dir=dir)))
for i in 1:manager.np
io, pobj = open(pipeline(detach(
setenv(`$exename $exeflags --bind-to $(Base.Distributed.LPROC.bind_addr) --worker $(Base.cluster_cookie())`, dir=dir)); stderr=STDERR), "r")
io = open(cmd)
wconfig = WorkerConfig()
wconfig.process = pobj
wconfig.io = io
wconfig.process = io
wconfig.io = io.out
wconfig.ident = i
wconfig.connect_idents = collect(i+2:2:manager.np)
push!(launched, wconfig)
Expand Down