diff --git a/base/deprecated.jl b/base/deprecated.jl index 442a01cac7bd9..b29917a958919 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -1324,5 +1324,18 @@ end # END 0.6 deprecations +# BEGIN 0.7 deprecations + +# 12807 +start(::Union{Process, ProcessChain}) = 1 +done(::Union{Process, ProcessChain}, i::Int) = (i == 3) +next(p::Union{Process, ProcessChain}, i::Int) = (getindex(p, i), i + 1) +@noinline function getindex(p::Union{Process, ProcessChain}, i::Int) + depwarn("open(cmd) now returns only a Process<:IO object", :getindex) + return i == 1 ? getfield(p, p.openstream) : p +end + +# END 0.7 deprecations + # BEGIN 1.0 deprecations # END 1.0 deprecations diff --git a/base/distributed/cluster.jl b/base/distributed/cluster.jl index a2ca068a7161e..686eb8d2a4fff 100644 --- a/base/distributed/cluster.jl +++ b/base/distributed/cluster.jl @@ -525,8 +525,8 @@ function launch_additional(np::Integer, cmd::Cmd) addresses = Vector{Any}(np) for i in 1:np - io, pobj = open(pipeline(detach(cmd), stderr=STDERR), "r") - io_objs[i] = io + io = open(detach(cmd)) + io_objs[i] = io.out end for (i,io) in enumerate(io_objs) diff --git a/base/distributed/managers.jl b/base/distributed/managers.jl index 7591f51292cf2..cbd8316a75941 100644 --- a/base/distributed/managers.jl +++ b/base/distributed/managers.jl @@ -202,10 +202,10 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched, # detach launches the command in a new process group, allowing it to outlive # the initial julia process (Ctrl-C and teardown methods are handled through messages) # for the launched processes. - io, pobj = open(pipeline(detach(cmd), stderr=STDERR), "r") + io = open(detach(cmd)) wconfig = WorkerConfig() - wconfig.io = io + wconfig.io = io.out wconfig.host = host wconfig.tunnel = params[:tunnel] wconfig.sshflags = sshflags @@ -321,12 +321,11 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)` for i in 1:manager.np - io, pobj = open(pipeline(detach( - setenv(`$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker $(cluster_cookie())`, dir=dir)), - stderr=STDERR), "r") + cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker $(cluster_cookie())` + io = open(detach(setenv(cmd, dir=dir))) wconfig = WorkerConfig() - wconfig.process = pobj - wconfig.io = io + wconfig.process = io + wconfig.io = io.out wconfig.enable_threaded_blas = params[:enable_threaded_blas] push!(launched, wconfig) end diff --git a/base/loading.jl b/base/loading.jl index 5601825f94b68..5491b120d95e7 100644 --- a/base/loading.jl +++ b/base/loading.jl @@ -599,14 +599,15 @@ function create_expr_cache(input::String, output::String, concrete_deps::Vector{ eval(Main, deserialize(STDIN)) end """ - io, pobj = open(pipeline(detach(`$(julia_cmd()) -O0 - --output-ji $output --output-incremental=yes - --startup-file=no --history-file=no - --color=$(have_color ? "yes" : "no") - --eval $code_object`), stderr=STDERR), - "w", STDOUT) + io = open(pipeline(detach(`$(julia_cmd()) -O0 + --output-ji $output --output-incremental=yes + --startup-file=no --history-file=no + --color=$(have_color ? "yes" : "no") + --eval $code_object`), stderr=STDERR), + "w", STDOUT) + in = io.in try - serialize(io, quote + serialize(in, quote empty!(Base.LOAD_PATH) append!(Base.LOAD_PATH, $LOAD_PATH) empty!(Base.LOAD_CACHE_PATH) @@ -619,22 +620,21 @@ function create_expr_cache(input::String, output::String, concrete_deps::Vector{ end) source = source_path(nothing) if source !== nothing - serialize(io, quote + serialize(in, quote task_local_storage()[:SOURCE_PATH] = $(source) end) end - serialize(io, :(Base.include($(abspath(input))))) + serialize(in, :(Base.include($(abspath(input))))) if source !== nothing - serialize(io, :(delete!(task_local_storage(), :SOURCE_PATH))) + serialize(in, :(delete!(task_local_storage(), :SOURCE_PATH))) end - close(io) - wait(pobj) - return pobj - catch - kill(pobj) - close(io) - rethrow() + close(in) + catch ex + close(in) + process_running(io) && Timer(t -> kill(io), 5.0) # wait a short time before killing the process to give it a chance to clean up on its own first + rethrow(ex) end + return io end compilecache(mod::Symbol) = compilecache(string(mod)) diff --git a/base/process.jl b/base/process.jl index fa1cce13f27a9..d0fefa923df3e 100644 --- a/base/process.jl +++ b/base/process.jl @@ -315,6 +315,7 @@ mutable struct Process <: AbstractPipe termsignal::Int32 exitnotify::Condition closenotify::Condition + openstream::Symbol # for open(cmd) deprecation function Process(cmd::Cmd, handle::Ptr{Void}, in::Union{Redirectable, Ptr{Void}}, out::Union{Redirectable, Ptr{Void}}, @@ -344,7 +345,9 @@ struct ProcessChain <: AbstractPipe in::Redirectable out::Redirectable err::Redirectable + openstream::Symbol # for open(cmd) deprecation ProcessChain(stdios::StdIOSet) = new(Process[], stdios[1], stdios[2], stdios[3]) + ProcessChain(chain::ProcessChain, openstream::Symbol) = new(chain.processes, chain.in, chain.out, chain.err, openstream) # for open(cmd) deprecation end pipe_reader(p::ProcessChain) = p.out pipe_writer(p::ProcessChain) = p.in @@ -577,38 +580,55 @@ the process's standard input and `stdio` optionally specifies the process's stan stream. """ function open(cmds::AbstractCmd, mode::AbstractString="r", other::Redirectable=DevNull) - if mode == "r" + if mode == "r+" || mode == "w+" + other === DevNull || throw(ArgumentError("no other stream for mode rw+")) + in = Pipe() + out = Pipe() + processes = spawn(cmds, (in,out,STDERR)) + close(in.out) + close(out.in) + elseif mode == "r" in = other - out = io = Pipe() + out = Pipe() processes = spawn(cmds, (in,out,STDERR)) close(out.in) + if isa(processes, ProcessChain) # for open(cmd) deprecation + processes = ProcessChain(processes, :out) + else + processes.openstream = :out + end elseif mode == "w" - in = io = Pipe() + in = Pipe() out = other processes = spawn(cmds, (in,out,STDERR)) close(in.out) + if isa(processes, ProcessChain) # for open(cmd) deprecation + processes = ProcessChain(processes, :in) + else + processes.openstream = :in + end else throw(ArgumentError("mode must be \"r\" or \"w\", not \"$mode\"")) end - return (io, processes) + return processes end """ open(f::Function, command, mode::AbstractString="r", stdio=DevNull) -Similar to `open(command, mode, stdio)`, but calls `f(stream)` on the resulting read or -write stream, then closes the stream and waits for the process to complete. Returns the -value returned by `f`. +Similar to `open(command, mode, stdio)`, but calls `f(stream)` on the resulting process +stream, then closes the input stream and waits for the process to complete. +Returns the value returned by `f`. """ function open(f::Function, cmds::AbstractCmd, args...) - io, P = open(cmds, args...) + P = open(cmds, args...) ret = try - f(io) - catch + f(P) + catch e kill(P) - rethrow() + rethrow(e) finally - close(io) + close(P.in) end success(P) || pipeline_error(P) return ret @@ -623,15 +643,14 @@ Starts running a command asynchronously, and returns a tuple (stdout,stdin,proce output stream and input stream of the process, and the process object itself. """ function readandwrite(cmds::AbstractCmd) - in = Pipe() - out, processes = open(cmds, "r", in) - (out, in, processes) + processes = open(cmds, "r+") + return (processes.out, processes.in, processes) end function read(cmd::AbstractCmd, stdin::Redirectable=DevNull) - out, procs = open(cmd, "r", stdin) - bytes = read(out) - !success(procs) && pipeline_error(procs) + procs = open(cmd, "r", stdin) + bytes = read(procs.out) + success(procs) || pipeline_error(procs) return bytes end @@ -656,9 +675,17 @@ function run(cmds::AbstractCmd, args...) success(ps) ? nothing : pipeline_error(ps) end -const SIGPIPE = 13 +# some common signal numbers that are usually available on all platforms +# and might be useful as arguments to `kill` or testing against `Process.termsignal` +const SIGHUP = 1 +const SIGINT = 2 +const SIGQUIT = 3 # !windows +const SIGKILL = 9 +const SIGPIPE = 13 # !windows +const SIGTERM = 15 + function test_success(proc::Process) - assert(process_exited(proc)) + @assert process_exited(proc) if proc.exitcode < 0 #TODO: this codepath is not currently tested throw(UVError("could not start process $(string(proc.cmd))", proc.exitcode)) @@ -668,8 +695,7 @@ end function success(x::Process) wait(x) - kill(x) - test_success(x) + return test_success(x) end success(procs::Vector{Process}) = mapreduce(success, &, procs) success(procs::ProcessChain) = success(procs.processes) @@ -705,8 +731,6 @@ function pipeline_error(procs::ProcessChain) error(msg) end -_jl_kill(p::Process, signum::Integer) = ccall(:uv_process_kill, Int32, (Ptr{Void},Int32), p.handle, signum) - """ kill(p::Process, signum=SIGTERM) @@ -715,14 +739,14 @@ Send a signal to a process. The default is to terminate the process. function kill(p::Process, signum::Integer) if process_running(p) @assert p.handle != C_NULL - _jl_kill(p, signum) + ccall(:uv_process_kill, Int32, (Ptr{Void}, Int32), p.handle, signum) else Int32(-1) end end kill(ps::Vector{Process}) = map(kill, ps) kill(ps::ProcessChain) = map(kill, ps.processes) -kill(p::Process) = kill(p, 15) #SIGTERM +kill(p::Process) = kill(p, SIGTERM) function _contains_newline(bufptr::Ptr{Void}, len::Int32) return (ccall(:memchr, Ptr{Void}, (Ptr{Void},Int32,Csize_t), bufptr, '\n', len) != C_NULL) diff --git a/examples/clustermanager/simple/UnixDomainCM.jl b/examples/clustermanager/simple/UnixDomainCM.jl index d1e27e2c6fa2c..e3a1f7f9d350a 100644 --- a/examples/clustermanager/simple/UnixDomainCM.jl +++ b/examples/clustermanager/simple/UnixDomainCM.jl @@ -13,10 +13,10 @@ function launch(manager::UnixDomainCM, params::Dict, launched::Array, c::Conditi sockname = tempname() try cmd = `$(params[:exename]) --startup-file=no $(@__FILE__) udwrkr $sockname $cookie` - io, pobj = open(cmd, "r") + pobj = open(cmd) wconfig = WorkerConfig() - wconfig.userdata = Dict(:sockname=>sockname, :io=>io, :process=>pobj) + wconfig.userdata = Dict(:sockname=>sockname, :io=>pobj.out, :process=>pobj) push!(launched, wconfig) notify(c) catch e diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 64f28c1e379e7..87247a89ff6c3 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -1307,11 +1307,11 @@ function Base.launch(manager::ErrorSimulator, params::Dict, launched::Array, c:: else error("Unknown mode") end - io, pobj = open(pipeline(detach(setenv(cmd, dir=dir)); stderr=STDERR), "r") + io = open(detach(setenv(cmd, dir=dir))) wconfig = WorkerConfig() - wconfig.process = pobj - wconfig.io = io + wconfig.process = io + wconfig.io = io.out push!(launched, wconfig) notify(c) end diff --git a/test/spawn.jl b/test/spawn.jl index f6c514c21bc1a..84cccddbce632 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -242,26 +242,26 @@ let fname = tempname() function thrash(handle::Ptr{Void}) # Kill the memory, but write a nice low value in the libuv type field to # trigger the right code path - ccall(:memset,Ptr{Void},(Ptr{Void},Cint,Csize_t),handle,0xee,3*sizeof(Ptr{Void})) - unsafe_store!(convert(Ptr{Cint},handle+2*sizeof(Ptr{Void})),15) + ccall(:memset, Ptr{Void}, (Ptr{Void}, Cint, Csize_t), handle, 0xee, 3 * sizeof(Ptr{Void})) + unsafe_store!(convert(Ptr{Cint}, handle + 2 * sizeof(Ptr{Void})), 15) nothing end OLD_STDERR = STDERR - redirect_stderr(open("$(escape_string(fname))","w")) + redirect_stderr(open("$(escape_string(fname))", "w")) # Usually this would be done by GC. Do it manually, to make the failure # case more reliable. oldhandle = OLD_STDERR.handle OLD_STDERR.status = Base.StatusClosing OLD_STDERR.handle = C_NULL - ccall(:uv_close,Void,(Ptr{Void},Ptr{Void}),oldhandle,cfunction(thrash,Void,(Ptr{Void},))) + ccall(:uv_close, Void, (Ptr{Void}, Ptr{Void}), oldhandle, cfunction(thrash, Void, (Ptr{Void},))) sleep(1) import Base.zzzInvalidIdentifier """ try - (in,p) = open(pipeline(`$exename --startup-file=no`, stderr=STDERR), "w") - write(in,cmd) - close(in) - wait(p) + io = open(pipeline(`$exename --startup-file=no`, stderr=STDERR), "w") + write(io, cmd) + close(io) + wait(io) catch error("IOStream redirect failed. Child stderr was \n$(readstring(fname))\n") finally diff --git a/test/topology.jl b/test/topology.jl index c3c92c51a7a1f..8e5cbcaf93459 100644 --- a/test/topology.jl +++ b/test/topology.jl @@ -41,12 +41,13 @@ function Base.launch(manager::TopoTestManager, params::Dict, launched::Array, c: exename = params[:exename] exeflags = params[:exeflags] + cmd = `$exename $exeflags --bind-to $(Base.Distributed.LPROC.bind_addr) --worker $(Base.cluster_cookie())` + cmd = pipeline(detach(setenv(cmd, dir=dir))) for i in 1:manager.np - io, pobj = open(pipeline(detach( - setenv(`$exename $exeflags --bind-to $(Base.Distributed.LPROC.bind_addr) --worker $(Base.cluster_cookie())`, dir=dir)); stderr=STDERR), "r") + io = open(cmd) wconfig = WorkerConfig() - wconfig.process = pobj - wconfig.io = io + wconfig.process = io + wconfig.io = io.out wconfig.ident = i wconfig.connect_idents = collect(i+2:2:manager.np) push!(launched, wconfig)