diff --git a/base/exports.jl b/base/exports.jl index 285134b6e5ac4..d190e4bdd2c36 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1147,6 +1147,7 @@ export ntoh, open, pipe, + Pipe, PipeBuffer, poll_fd, poll_file, diff --git a/base/process.jl b/base/process.jl index dfe7e34bd7d03..af03f409eca24 100644 --- a/base/process.jl +++ b/base/process.jl @@ -311,7 +311,9 @@ macro setup_stdio() in,out,err = stdios if isa(stdios[1], Pipe) if stdios[1].handle == C_NULL - error("pipes passed to spawn must be initialized") + in = box(Ptr{Void},Intrinsics.jl_alloca(_sizeof_uv_named_pipe)) + link_pipe(in,false,stdios[1],true) + close_in = true end elseif isa(stdios[1], FileRedirect) in = FS.open(stdios[1].filename, JL_O_RDONLY) @@ -321,7 +323,9 @@ macro setup_stdio() end if isa(stdios[2], Pipe) if stdios[2].handle == C_NULL - error("pipes passed to spawn must be initialized") + out = box(Ptr{Void},Intrinsics.jl_alloca(_sizeof_uv_named_pipe)) + link_pipe(stdios[2],true,out,false) + close_out = true end elseif isa(stdios[2], FileRedirect) out = FS.open(stdios[2].filename, JL_O_WRONLY | JL_O_CREAT | (stdios[2].append?JL_O_APPEND:JL_O_TRUNC), S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) @@ -331,7 +335,9 @@ macro setup_stdio() end if isa(stdios[3], Pipe) if stdios[3].handle == C_NULL - error("pipes passed to spawn must be initialized") + err = box(Ptr{Void},Intrinsics.jl_alloca(_sizeof_uv_named_pipe)) + link_pipe(stdios[3],true,err,false) + close_err = true end elseif isa(stdios[3], FileRedirect) err = FS.open(stdios[3].filename, JL_O_WRONLY | JL_O_CREAT | (stdios[3].append?JL_O_APPEND:JL_O_TRUNC), S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) @@ -345,9 +351,9 @@ end macro cleanup_stdio() esc( quote - close_in && close(in) - close_out && close(out) - close_err && close(err) + close_in && (isa(in,Ptr) ? close_pipe_sync(in) : close(in)) + close_out && (isa(out,Ptr) ? close_pipe_sync(out) : close(out)) + close_err && (isa(err,Ptr) ? close_pipe_sync(err) : close(err)) end) end diff --git a/base/socket.jl b/base/socket.jl index ec9e31d0d99cb..b8e5bfd578839 100644 --- a/base/socket.jl +++ b/base/socket.jl @@ -349,7 +349,12 @@ _jl_sockaddr_set_port(ptr::Ptr{Void},port::UInt16) = ccall(:jl_sockaddr_set_port,Void,(Ptr{Void},UInt16),ptr,port) accept(server::TCPServer) = accept(server, TCPSocket()) -accept(server::PipeServer) = accept(server, Pipe()) + +# Libuv will internally reset the readable and writable flags on +# this pipe after it has successfully accepted the connection, to +# remember that before that this is an invalid pipe +accept(server::PipeServer) = accept(server, init_pipe!(Pipe(); + readable=false, writable=false, julia_only=true)) ## diff --git a/base/stream.jl b/base/stream.jl index 261863c9dbe2f..3d50336f6ad50 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -127,18 +127,7 @@ type Pipe <: AsyncStream nothing, ReentrantLock(), DEFAULT_READ_BUFFER_SZ) end -function Pipe() - handle = Libc.malloc(_sizeof_uv_named_pipe) - try - ret = Pipe(handle) - associate_julia_struct(ret.handle,ret) - finalizer(ret,uvfinalize) - return init_pipe!(ret;readable=true) - catch - Libc.free(handle) - rethrow() - end -end +Pipe() = Pipe(C_NULL) type PipeServer <: UVServer handle::Ptr{Void} @@ -156,8 +145,9 @@ end function init_pipe!(pipe::Union{Pipe,PipeServer};readable::Bool=false,writable=false,julia_only=true) if pipe.handle == C_NULL - error("failed to initialize pipe") - elseif pipe.status != StatusUninit + malloc_julia_pipe!(pipe) + end + if pipe.status != StatusUninit error("pipe is already initialized") end uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), pipe.handle, writable,readable,julia_only)) @@ -643,7 +633,7 @@ function process_events(block::Bool) end ## pipe functions ## -function malloc_julia_pipe(x) +function malloc_julia_pipe!(x) x.handle = Libc.malloc(_sizeof_uv_named_pipe) associate_julia_struct(x.handle,x) finalizer(x,uvfinalize) @@ -667,7 +657,7 @@ end function link_pipe(read_end::Pipe,readable_julia_only::Bool,write_end::Ptr{Void},writable_julia_only::Bool) if read_end.handle == C_NULL - malloc_julia_pipe(read_end) + malloc_julia_pipe!(read_end) end init_pipe!(read_end; readable = true, writable = false, julia_only = readable_julia_only) uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), write_end, 1, 0, writable_julia_only)) @@ -676,7 +666,7 @@ function link_pipe(read_end::Pipe,readable_julia_only::Bool,write_end::Ptr{Void} end function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::Pipe,writable_julia_only::Bool) if write_end.handle == C_NULL - malloc_julia_pipe(write_end) + malloc_julia_pipe!(write_end) end uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), read_end, 0, 1, readable_julia_only)) init_pipe!(write_end; readable = false, writable = true, julia_only = writable_julia_only) @@ -685,10 +675,10 @@ function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::Pipe end function link_pipe(read_end::Pipe,readable_julia_only::Bool,write_end::Pipe,writable_julia_only::Bool) if write_end.handle == C_NULL - malloc_julia_pipe(write_end) + malloc_julia_pipe!(write_end) end if read_end.handle == C_NULL - malloc_julia_pipe(read_end) + malloc_julia_pipe!(read_end) end init_pipe!(read_end; readable = true, writable = false, julia_only = readable_julia_only) init_pipe!(write_end; readable = false, writable = true, julia_only = writable_julia_only) @@ -952,7 +942,7 @@ function accept_nonblock(server::PipeServer,client::Pipe) err end function accept_nonblock(server::PipeServer) - client = Pipe() + client = init_pipe!(Pipe(); readable=true, writable=true, julia_only=true) uv_error("accept", accept_nonblock(server,client) != 0) client end @@ -1022,7 +1012,9 @@ function connect(sock::AsyncStream, args...) sock end -connect(path::AbstractString) = connect(Pipe(),path) +# Libuv will internally reset read/writability, which is uses to +# mark that this is an invalid pipe. +connect(path::AbstractString) = connect(init_pipe!(Pipe(); readable=false, writable=false, julia_only=true),path) _fd(x::IOStream) = RawFD(fd(x)) @unix_only _fd(x::AsyncStream) = RawFD(ccall(:jl_uv_handle,Int32,(Ptr{Void},),x.handle)) diff --git a/test/spawn.jl b/test/spawn.jl index c0c3863682672..99fef922149e2 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -174,6 +174,9 @@ if valgrind_off # If --trace-children=yes is passed to valgrind, we will get a # valgrind banner here, not "Hello World\n". @test readall(pipe(`$exename -f -e 'println(STDERR,"Hello World")'`, stderr=`cat`)) == "Hello World\n" + p = Pipe() + run(pipe(`$exename -f -e 'println(STDERR,"Hello World")'`, stderr = p)) + @test readall(p) == "Hello World\n" end # issue #6310