diff --git a/base/deprecated.jl b/base/deprecated.jl index 2049cf5ade283..d34057c4e4b0c 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -207,16 +207,17 @@ const MemoryError = OutOfMemoryError #9295 @deprecate push!(t::Associative, key, v) setindex!(t, v, key) -@deprecate (|>)(src::AbstractCmd, dest::AbstractCmd) pipe(src, dest) -@deprecate (.>)(src::AbstractCmd, dest::AbstractCmd) pipe(src, stderr=dest) -@deprecate (|>)(src::Redirectable, dest::AbstractCmd) pipe(src, dest) -@deprecate (|>)(src::AbstractCmd, dest::Redirectable) pipe(src, dest) -@deprecate (.>)(src::AbstractCmd, dest::Redirectable) pipe(src, stderr=dest) -@deprecate (|>)(src::AbstractCmd, dest::AbstractString) pipe(src, dest) -@deprecate (|>)(src::AbstractString, dest::AbstractCmd) pipe(src, dest) -@deprecate (.>)(src::AbstractCmd, dest::AbstractString) pipe(src, stderr=dest) -@deprecate (>>)(src::AbstractCmd, dest::AbstractString) pipe(src, stdout=dest, append=true) -@deprecate (.>>)(src::AbstractCmd, dest::AbstractString) pipe(src, stderr=dest, append=true) +@deprecate (|>)(src::AbstractCmd, dest::AbstractCmd) pipeline(src, dest) +@deprecate (.>)(src::AbstractCmd, dest::AbstractCmd) pipeline(src, stderr=dest) +@deprecate (|>)(src::Redirectable, dest::AbstractCmd) pipeline(src, dest) +@deprecate (|>)(src::AbstractCmd, dest::Redirectable) pipeline(src, dest) +@deprecate (.>)(src::AbstractCmd, dest::Redirectable) pipeline(src, stderr=dest) +@deprecate (|>)(src::AbstractCmd, dest::AbstractString) pipeline(src, dest) +@deprecate (|>)(src::AbstractString, dest::AbstractCmd) pipeline(src, dest) +@deprecate (.>)(src::AbstractCmd, dest::AbstractString) pipeline(src, stderr=dest) +@deprecate (>>)(src::AbstractCmd, dest::AbstractString) pipeline(src, stdout=dest, append=true) +@deprecate (.>>)(src::AbstractCmd, dest::AbstractString) pipeline(src, stderr=dest, append=true) +@deprecate pipe pipeline # 10314 @deprecate filter!(r::Regex, d::Dict) filter!((k,v)->ismatch(r,k), d) diff --git a/base/docs/helpdb.jl b/base/docs/helpdb.jl index 609a8ab5064ef..a16be90d9b31c 100644 --- a/base/docs/helpdb.jl +++ b/base/docs/helpdb.jl @@ -3548,9 +3548,9 @@ doc""" Connect to the host ``host`` on port ``port`` :: - connect(path) -> Pipe + connect(path) -> PipeEndpoint -Connect to the Named Pipe/Domain Socket at ``path`` +Connect to the Named Pipe / Domain Socket at ``path`` :: connect(manager::FooManager, pid::Int, config::WorkerConfig) -> (instrm::AsyncStream, outstrm::AsyncStream) @@ -12163,7 +12163,7 @@ To listen on all interfaces pass ``IPv4(0)`` or ``IPv6(0)`` as appropriate. :: listen(path) -> PipeServer -Listens on/Creates a Named Pipe/Domain Socket +Create and listen on a Named Pipe / Domain Socket ``` """ listen diff --git a/base/exports.jl b/base/exports.jl index cf3f19fb1568d..b0cfb2ef6ec8a 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1160,7 +1160,8 @@ export nb_available, ntoh, open, - pipe, + pipeline, + PipeEndpoint, PipeBuffer, poll_fd, poll_file, diff --git a/base/interactiveutil.jl b/base/interactiveutil.jl index 1d4c8c0f111c4..41b06768ad22a 100644 --- a/base/interactiveutil.jl +++ b/base/interactiveutil.jl @@ -87,7 +87,7 @@ end global _clipboardcmd _clipboardcmd !== nothing && return _clipboardcmd for cmd in (:xclip, :xsel) - success(pipe(`which $cmd`, DevNull)) && return _clipboardcmd = cmd + success(pipeline(`which $cmd`, DevNull)) && return _clipboardcmd = cmd end error("no clipboard command found, please install xsel or xclip") end @@ -165,7 +165,7 @@ function versioninfo(io::IO=STDOUT, verbose::Bool=false) println(io, " WORD_SIZE: ", Sys.WORD_SIZE) if verbose lsb = "" - @linux_only try lsb = readchomp(pipe(`lsb_release -ds`, stderr=DevNull)) end + @linux_only try lsb = readchomp(pipeline(`lsb_release -ds`, stderr=DevNull)) end @windows_only try lsb = strip(readall(`$(ENV["COMSPEC"]) /c ver`)) end if lsb != "" println(io, " ", lsb) @@ -343,7 +343,7 @@ downloadcmd = nothing global downloadcmd if downloadcmd === nothing for checkcmd in (:curl, :wget, :fetch) - if success(pipe(`which $checkcmd`, DevNull)) + if success(pipeline(`which $checkcmd`, DevNull)) downloadcmd = checkcmd break end diff --git a/base/pkg/entry.jl b/base/pkg/entry.jl index 036f7d24d95bb..6b17f0a28db92 100644 --- a/base/pkg/entry.jl +++ b/base/pkg/entry.jl @@ -55,7 +55,7 @@ function add(pkg::AbstractString, vers::VersionSet) outdated = :yes else try - run(pipe(Git.cmd(`fetch -q --all`, dir="METADATA"),stdout=DevNull,stderr=DevNull)) + run(pipeline(Git.cmd(`fetch -q --all`, dir="METADATA"),stdout=DevNull,stderr=DevNull)) outdated = Git.success(`diff --quiet origin/$branch`, dir="METADATA") ? (:no) : (:yes) end diff --git a/base/pkg/generate.jl b/base/pkg/generate.jl index 738fb3af1c974..8de5321149295 100644 --- a/base/pkg/generate.jl +++ b/base/pkg/generate.jl @@ -11,7 +11,7 @@ github_user() = readchomp(ignorestatus(`git config --global --get github.user`)) function git_contributors(dir::AbstractString, n::Int=typemax(Int)) contrib = Dict() tty = @windows? "CON:" : "/dev/tty" - for line in eachline(pipe(tty, Git.cmd(`shortlog -nes`, dir=dir))) + for line in eachline(pipeline(tty, Git.cmd(`shortlog -nes`, dir=dir))) m = match(r"\s*(\d+)\s+(.+?)\s+\<(.+?)\>\s*$", line) m === nothing && continue commits, name, email = m.captures diff --git a/base/pkg/git.jl b/base/pkg/git.jl index 5959f674dffde..32e492f074dc1 100644 --- a/base/pkg/git.jl +++ b/base/pkg/git.jl @@ -21,7 +21,7 @@ function git(d) end cmd(args::Cmd; dir="") = `$(git(dir)) $args` -run(args::Cmd; dir="", out=STDOUT) = Base.run(pipe(cmd(args,dir=dir), out)) +run(args::Cmd; dir="", out=STDOUT) = Base.run(pipeline(cmd(args,dir=dir), out)) readall(args::Cmd; dir="") = Base.readall(cmd(args,dir=dir)) readchomp(args::Cmd; dir="") = Base.readchomp(cmd(args,dir=dir)) diff --git a/base/process.jl b/base/process.jl index 2034a3c5971e7..4bdd2635c78e8 100644 --- a/base/process.jl +++ b/base/process.jl @@ -50,7 +50,7 @@ function show(io::IO, cmd::Cmd) end function show(io::IO, cmds::Union{OrCmds,ErrOrCmds}) - print(io, "pipe(") + print(io, "pipeline(") show(io, cmds.a) print(io, ", ") print(io, isa(cmds, ErrOrCmds) ? "stderr=" : "stdout=") @@ -100,7 +100,7 @@ type CmdRedirect <: AbstractCmd end function show(io::IO, cr::CmdRedirect) - print(io, "pipe(") + print(io, "pipeline(") show(io, cr.cmd) print(io, ", ") if cr.stream_no == STDOUT_NO @@ -149,7 +149,7 @@ redir_err(src::AbstractCmd, dest::AbstractString) = CmdRedirect(src, FileRedirec redir_out_append(src::AbstractCmd, dest::AbstractString) = CmdRedirect(src, FileRedirect(dest, true), STDOUT_NO) redir_err_append(src::AbstractCmd, dest::AbstractString) = CmdRedirect(src, FileRedirect(dest, true), STDERR_NO) -function pipe(cmd::AbstractCmd; stdin=nothing, stdout=nothing, stderr=nothing, append::Bool=false) +function pipeline(cmd::AbstractCmd; stdin=nothing, stdout=nothing, stderr=nothing, append::Bool=false) if append && stdout === nothing && stderr === nothing error("append set to true, but no output redirections specified") end @@ -165,10 +165,10 @@ function pipe(cmd::AbstractCmd; stdin=nothing, stdout=nothing, stderr=nothing, a return cmd end -pipe(cmd::AbstractCmd, dest) = pipe(cmd, stdout=dest) -pipe(src::Union{Redirectable,AbstractString}, cmd::AbstractCmd) = pipe(cmd, stdin=src) +pipeline(cmd::AbstractCmd, dest) = pipeline(cmd, stdout=dest) +pipeline(src::Union{Redirectable,AbstractString}, cmd::AbstractCmd) = pipeline(cmd, stdin=src) -pipe(a, b, c, d...) = pipe(pipe(a,b), c, d...) +pipeline(a, b, c, d...) = pipeline(pipeline(a,b), c, d...) typealias RawOrBoxedHandle Union{UVHandle,AsyncStream,Redirectable,IOStream} typealias StdIOSet NTuple{3,RawOrBoxedHandle} @@ -309,9 +309,11 @@ macro setup_stdio() quote close_in,close_out,close_err = false,false,false in,out,err = stdios - if isa(stdios[1], Pipe) + if isa(stdios[1], PipeEndpoint) if stdios[1].handle == C_NULL - error("pipes passed to spawn must be initialized") + in = box(Ptr{Void},Intrinsics.jl_alloca(_sizeof_uv_named_pipe)) + link_pipe(in,false,stdios[1],true) + close_in = true end elseif isa(stdios[1], FileRedirect) in = FS.open(stdios[1].filename, JL_O_RDONLY) @@ -319,9 +321,11 @@ macro setup_stdio() elseif isa(stdios[1], IOStream) in = FS.File(RawFD(fd(stdios[1]))) end - if isa(stdios[2], Pipe) + if isa(stdios[2], PipeEndpoint) if stdios[2].handle == C_NULL - error("pipes passed to spawn must be initialized") + out = box(Ptr{Void},Intrinsics.jl_alloca(_sizeof_uv_named_pipe)) + link_pipe(stdios[2],true,out,false) + close_out = true end elseif isa(stdios[2], FileRedirect) out = FS.open(stdios[2].filename, JL_O_WRONLY | JL_O_CREAT | (stdios[2].append?JL_O_APPEND:JL_O_TRUNC), S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) @@ -329,9 +333,11 @@ macro setup_stdio() elseif isa(stdios[2], IOStream) out = FS.File(RawFD(fd(stdios[2]))) end - if isa(stdios[3], Pipe) + if isa(stdios[3], PipeEndpoint) if stdios[3].handle == C_NULL - error("pipes passed to spawn must be initialized") + err = box(Ptr{Void},Intrinsics.jl_alloca(_sizeof_uv_named_pipe)) + link_pipe(stdios[3],true,err,false) + close_err = true end elseif isa(stdios[3], FileRedirect) err = FS.open(stdios[3].filename, JL_O_WRONLY | JL_O_CREAT | (stdios[3].append?JL_O_APPEND:JL_O_TRUNC), S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) @@ -345,9 +351,9 @@ end macro cleanup_stdio() esc( quote - close_in && close(in) - close_out && close(out) - close_err && close(err) + close_in && (isa(in,Ptr) ? close_pipe_sync(in) : close(in)) + close_out && (isa(out,Ptr) ? close_pipe_sync(out) : close(out)) + close_err && (isa(err,Ptr) ? close_pipe_sync(err) : close(err)) end) end @@ -405,8 +411,8 @@ spawn(cmds::AbstractCmd, args...) = spawn(false, cmds, spawn_opts_swallow(args.. macro tmp_rpipe(pipe, tmppipe, code, args...) esc(quote - $pipe = Pipe(C_NULL) - $tmppipe = Pipe(C_NULL) + $pipe = PipeEndpoint() + $tmppipe = PipeEndpoint() link_pipe($pipe, true, $tmppipe, false) r = begin $code @@ -418,8 +424,8 @@ end macro tmp_wpipe(tmppipe, pipe, code) esc(quote - $pipe = Pipe(C_NULL) - $tmppipe = Pipe(C_NULL) + $pipe = PipeEndpoint() + $tmppipe = PipeEndpoint() link_pipe($tmppipe, false, $pipe, true) r = begin $code @@ -438,7 +444,7 @@ function eachline(cmd::AbstractCmd, stdin) end eachline(cmd::AbstractCmd) = eachline(cmd, DevNull) -# return a (Pipe,Process) pair to write/read to/from the pipeline +# return a (PipeEndpoint,Process) pair to write/read to/from the pipeline function open(cmds::AbstractCmd, mode::AbstractString="r", stdio::AsyncStream=DevNull) if mode == "r" processes = @tmp_rpipe out tmp spawn(false, cmds, (stdio,tmp,STDERR)) diff --git a/base/random.jl b/base/random.jl index da6ffff361da9..397aebb600999 100644 --- a/base/random.jl +++ b/base/random.jl @@ -142,7 +142,7 @@ function make_seed() seed = reinterpret(UInt64, time()) seed = hash(seed, UInt64(getpid())) try - seed = hash(seed, parse(UInt64, readall(pipe(`ifconfig`, `sha1sum`))[1:40], 16)) + seed = hash(seed, parse(UInt64, readall(pipeline(`ifconfig`, `sha1sum`))[1:40], 16)) end return make_seed(seed) end diff --git a/base/socket.jl b/base/socket.jl index 99ae4eb4b78cb..5b8d1fb866781 100644 --- a/base/socket.jl +++ b/base/socket.jl @@ -343,7 +343,12 @@ _jl_sockaddr_set_port(ptr::Ptr{Void},port::UInt16) = ccall(:jl_sockaddr_set_port,Void,(Ptr{Void},UInt16),ptr,port) accept(server::TCPServer) = accept(server, TCPSocket()) -accept(server::PipeServer) = accept(server, Pipe()) + +# Libuv will internally reset the readable and writable flags on +# this pipe after it has successfully accepted the connection, to +# remember that before that this is an invalid pipe +accept(server::PipeServer) = accept(server, init_pipe!(PipeEndpoint(); + readable=false, writable=false, julia_only=true)) ## @@ -388,7 +393,7 @@ function UDPSocket() this end -function uvfinalize(uv::Union{TTY,Pipe,PipeServer,TCPServer,TCPSocket,UDPSocket}) +function uvfinalize(uv::Union{TTY,PipeEndpoint,PipeServer,TCPServer,TCPSocket,UDPSocket}) if (uv.status != StatusUninit && uv.status != StatusInit) close(uv) end diff --git a/base/stream.jl b/base/stream.jl index fe74e456118b4..786df861b3eda 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -102,7 +102,7 @@ uv_req_data(handle) = ccall(:jl_uv_req_data,Ptr{Void},(Ptr{Void},),handle) uv_req_set_data(req,data) = ccall(:jl_uv_req_set_data,Void,(Ptr{Void},Any),req,data) uv_req_set_data(req,data::Ptr{Void}) = ccall(:jl_uv_req_set_data,Void,(Ptr{Void},Ptr{Void}),req,data) -type Pipe <: AsyncStream +type PipeEndpoint <: AsyncStream handle::Ptr{Void} status::Int buffer::IOBuffer @@ -117,7 +117,7 @@ type Pipe <: AsyncStream lock::ReentrantLock throttle::Int - Pipe(handle) = new( + PipeEndpoint(handle::Ptr{Void} = C_NULL) = new( handle, StatusUninit, PipeBuffer(), @@ -128,18 +128,6 @@ type Pipe <: AsyncStream nothing, ReentrantLock(), DEFAULT_READ_BUFFER_SZ) end -function Pipe() - handle = Libc.malloc(_sizeof_uv_named_pipe) - try - ret = Pipe(handle) - associate_julia_struct(ret.handle,ret) - finalizer(ret,uvfinalize) - return init_pipe!(ret;readable=true) - catch - Libc.free(handle) - rethrow() - end -end type PipeServer <: UVServer handle::Ptr{Void} @@ -155,10 +143,11 @@ type PipeServer <: UVServer false,Condition()) end -function init_pipe!(pipe::Union{Pipe,PipeServer};readable::Bool=false,writable=false,julia_only=true) +function init_pipe!(pipe::Union{PipeEndpoint,PipeServer};readable::Bool=false,writable=false,julia_only=true) if pipe.handle == C_NULL - error("failed to initialize pipe") - elseif pipe.status != StatusUninit + malloc_julia_pipe!(pipe) + end + if pipe.status != StatusUninit error("pipe is already initialized") end uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), pipe.handle, writable,readable,julia_only)) @@ -179,7 +168,7 @@ function PipeServer() end end -show(io::IO,stream::Pipe) = print(io,"Pipe(",uv_status_string(stream),", ", +show(io::IO,stream::PipeEndpoint) = print(io,"PipeEndpoint(",uv_status_string(stream),", ", nb_available(stream.buffer)," bytes waiting)") show(io::IO,stream::PipeServer) = print(io,"PipeServer(",uv_status_string(stream),")") @@ -226,9 +215,9 @@ end # note that uv_is_readable/writable work for any subtype of # uv_stream_t, including uv_tty_t and uv_pipe_t -isreadable(io::Union{Pipe,TTY}) = +isreadable(io::Union{PipeEndpoint,TTY}) = ccall(:uv_is_readable, Cint, (Ptr{Void},), io.handle)!=0 -iswritable(io::Union{Pipe,TTY}) = +iswritable(io::Union{PipeEndpoint,TTY}) = ccall(:uv_is_writable, Cint, (Ptr{Void},), io.handle)!=0 nb_available(stream::AsyncStream) = nb_available(stream.buffer) @@ -271,7 +260,7 @@ function init_stdio(handle) elseif t == UV_TCP ret = TCPSocket(handle) elseif t == UV_NAMED_PIPE - ret = Pipe(handle) + ret = PipeEndpoint(handle) else throw(ArgumentError("invalid stdio type: $t")) end @@ -652,7 +641,7 @@ function process_events(block::Bool) end ## pipe functions ## -function malloc_julia_pipe(x) +function malloc_julia_pipe!(x) x.handle = Libc.malloc(_sizeof_uv_named_pipe) associate_julia_struct(x.handle,x) finalizer(x,uvfinalize) @@ -674,30 +663,30 @@ function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::Ptr{ _link_pipe(read_end,write_end) end -function link_pipe(read_end::Pipe,readable_julia_only::Bool,write_end::Ptr{Void},writable_julia_only::Bool) +function link_pipe(read_end::PipeEndpoint,readable_julia_only::Bool,write_end::Ptr{Void},writable_julia_only::Bool) if read_end.handle == C_NULL - malloc_julia_pipe(read_end) + malloc_julia_pipe!(read_end) end init_pipe!(read_end; readable = true, writable = false, julia_only = readable_julia_only) uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), write_end, 1, 0, writable_julia_only)) _link_pipe(read_end.handle,write_end) read_end.status = StatusOpen end -function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::Pipe,writable_julia_only::Bool) +function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::PipeEndpoint,writable_julia_only::Bool) if write_end.handle == C_NULL - malloc_julia_pipe(write_end) + malloc_julia_pipe!(write_end) end uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), read_end, 0, 1, readable_julia_only)) init_pipe!(write_end; readable = false, writable = true, julia_only = writable_julia_only) _link_pipe(read_end,write_end.handle) write_end.status = StatusOpen end -function link_pipe(read_end::Pipe,readable_julia_only::Bool,write_end::Pipe,writable_julia_only::Bool) +function link_pipe(read_end::PipeEndpoint,readable_julia_only::Bool,write_end::PipeEndpoint,writable_julia_only::Bool) if write_end.handle == C_NULL - malloc_julia_pipe(write_end) + malloc_julia_pipe!(write_end) end if read_end.handle == C_NULL - malloc_julia_pipe(read_end) + malloc_julia_pipe!(read_end) end init_pipe!(read_end; readable = true, writable = false, julia_only = readable_julia_only) init_pipe!(write_end; readable = false, writable = true, julia_only = writable_julia_only) @@ -706,7 +695,7 @@ function link_pipe(read_end::Pipe,readable_julia_only::Bool,write_end::Pipe,writ read_end.status = StatusOpen nothing end -close_pipe_sync(p::Pipe) = (ccall(:uv_pipe_close_sync,Void,(Ptr{Void},),p.handle); p.status = StatusClosed) +close_pipe_sync(p::PipeEndpoint) = (ccall(:uv_pipe_close_sync,Void,(Ptr{Void},),p.handle); p.status = StatusClosed) close_pipe_sync(handle::UVHandle) = ccall(:uv_pipe_close_sync,Void,(UVHandle,),handle) function close(stream::Union{AsyncStream,UVServer}) @@ -832,11 +821,11 @@ function readuntil(this::AsyncStream,c::UInt8) readuntil(buf,c) end -#function finish_read(pipe::Pipe) +#function finish_read(pipe::PipeEndpoint) # close(pipe) #handles to UV and ios will be invalid after this point #end # -#function finish_read(state::(Pipe,ByteString)) +#function finish_read(state::(PipeEndpoint,ByteString)) # finish_read(state...) #end @@ -949,7 +938,7 @@ show(io::IO, e::UVError) = print(io, e.prefix*": "*struverror(e)*" ("*uverrornam ## server functions ## -function accept_nonblock(server::PipeServer,client::Pipe) +function accept_nonblock(server::PipeServer,client::PipeEndpoint) if client.status != StatusInit error(client.status == StatusUninit ? "client is not initialized" : "client is already in use or has been closed") @@ -961,7 +950,7 @@ function accept_nonblock(server::PipeServer,client::Pipe) err end function accept_nonblock(server::PipeServer) - client = Pipe() + client = init_pipe!(PipeEndpoint(); readable=true, writable=true, julia_only=true) uv_error("accept", accept_nonblock(server,client) != 0) client end @@ -1016,7 +1005,7 @@ function listen(path::AbstractString) sock end -function connect!(sock::Pipe, path::AbstractString) +function connect!(sock::PipeEndpoint, path::AbstractString) @assert sock.status == StatusInit req = Libc.malloc(_sizeof_uv_connect) uv_req_set_data(req,C_NULL) @@ -1031,7 +1020,9 @@ function connect(sock::AsyncStream, args...) sock end -connect(path::AbstractString) = connect(Pipe(),path) +# Libuv will internally reset read/writability, which is uses to +# mark that this is an invalid pipe. +connect(path::AbstractString) = connect(init_pipe!(PipeEndpoint(); readable=false, writable=false, julia_only=true),path) _fd(x::IOStream) = RawFD(fd(x)) @unix_only _fd(x::AsyncStream) = RawFD(ccall(:jl_uv_handle,Int32,(Ptr{Void},),x.handle)) @@ -1057,7 +1048,7 @@ for (x,writable,unix_fd,c_symbol) in ((:STDIN,false,0,:jl_uv_stdin),(:STDOUT,tru handle end function ($f)() - read,write = (Pipe(C_NULL), Pipe(C_NULL)) + read,write = (PipeEndpoint(), PipeEndpoint()) link_pipe(read,$(writable),write,$(!writable)) ($f)($(writable? :write : :read)) (read,write) diff --git a/doc/stdlib/io-network.rst b/doc/stdlib/io-network.rst index 750c7f7800497..7d1f5e32fcecf 100644 --- a/doc/stdlib/io-network.rst +++ b/doc/stdlib/io-network.rst @@ -440,8 +440,7 @@ General I/O :: redirect_stdout(stream) - Replace STDOUT by stream for all C and julia level output to STDOUT. Note that ``stream`` must be a TTY, a Pipe or a - TcpSocket. + Replace STDOUT by stream for all C and julia level output to STDOUT. Note that ``stream`` must be a TTY, a PipeEndpoint, or a TcpSocket. .. function:: redirect_stdout(stream) @@ -456,8 +455,7 @@ General I/O :: redirect_stdout(stream) - Replace STDOUT by stream for all C and julia level output to STDOUT. Note that ``stream`` must be a TTY, a Pipe or a - TcpSocket. + Replace STDOUT by stream for all C and julia level output to STDOUT. Note that ``stream`` must be a TTY, a PipeEndpoint, or a TcpSocket. .. function:: redirect_stderr([stream]) @@ -1264,9 +1262,9 @@ Network I/O Connect to the host ``host`` on port ``port`` :: - connect(path) -> Pipe + connect(path) -> PipeEndpoint - Connect to the Named Pipe/Domain Socket at ``path`` + Connect to the Named Pipe / Domain Socket at ``path`` :: connect(manager::FooManager, pid::Int, config::WorkerConfig) -> (instrm::AsyncStream, outstrm::AsyncStream) @@ -1277,7 +1275,7 @@ Network I/O must ensure that messages are delivered and received completely and in order. ``Base.connect(manager::ClusterManager.....)`` sets up TCP/IP socket connections in-between workers. -.. function:: connect(path) -> Pipe +.. function:: connect(path) -> PipeEndpoint :: connect([host],port) -> TcpSocket @@ -1285,9 +1283,9 @@ Network I/O Connect to the host ``host`` on port ``port`` :: - connect(path) -> Pipe + connect(path) -> PipeEndpoint - Connect to the Named Pipe/Domain Socket at ``path`` + Connect to the Named Pipe / Domain Socket at ``path`` :: connect(manager::FooManager, pid::Int, config::WorkerConfig) -> (instrm::AsyncStream, outstrm::AsyncStream) @@ -1306,7 +1304,7 @@ Network I/O :: listen(path) -> PipeServer - Listens on/Creates a Named Pipe/Domain Socket + Create and listens on a Named Pipe / Domain Socket .. function:: listen(path) -> PipeServer @@ -1319,7 +1317,7 @@ Network I/O :: listen(path) -> PipeServer - Listens on/Creates a Named Pipe/Domain Socket + Create and listens on a Named Pipe / Domain Socket .. function:: getaddrinfo(host) diff --git a/doc/stdlib/parallel.rst b/doc/stdlib/parallel.rst index 6140b97b3b0ed..1a2849d267cae 100644 --- a/doc/stdlib/parallel.rst +++ b/doc/stdlib/parallel.rst @@ -755,9 +755,9 @@ Cluster Manager Interface Connect to the host ``host`` on port ``port`` :: - connect(path) -> Pipe + connect(path) -> PipeEndpoint - Connect to the Named Pipe/Domain Socket at ``path`` + Connect to the Named Pipe / Domain Socket at ``path`` :: connect(manager::FooManager, pid::Int, config::WorkerConfig) -> (instrm::AsyncStream, outstrm::AsyncStream) diff --git a/test/repl.jl b/test/repl.jl index 307ad3445310a..51968f0e07d27 100644 --- a/test/repl.jl +++ b/test/repl.jl @@ -9,9 +9,9 @@ function fake_repl() # Use pipes so we can easily do blocking reads # In the future if we want we can add a test that the right object # gets displayed by intercepting the display - stdin_read,stdin_write = (Base.Pipe(C_NULL), Base.Pipe(C_NULL)) - stdout_read,stdout_write = (Base.Pipe(C_NULL), Base.Pipe(C_NULL)) - stderr_read,stderr_write = (Base.Pipe(C_NULL), Base.Pipe(C_NULL)) + stdin_read,stdin_write = (Base.PipeEndpoint(), Base.PipeEndpoint()) + stdout_read,stdout_write = (Base.PipeEndpoint(), Base.PipeEndpoint()) + stderr_read,stderr_write = (Base.PipeEndpoint(), Base.PipeEndpoint()) Base.link_pipe(stdin_read,true,stdin_write,true) Base.link_pipe(stdout_read,true,stdout_write,true) Base.link_pipe(stderr_read,true,stderr_write,true) diff --git a/test/spawn.jl b/test/spawn.jl index 313d03ed10012..82b37d2451df4 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -19,23 +19,23 @@ yes = `perl -le 'while (1) {print STDOUT "y"}'` #### Examples used in the manual #### @test readall(`echo hello | sort`) == "hello | sort\n" -@test readall(pipe(`echo hello`, `sort`)) == "hello\n" -@test length(spawn(pipe(`echo hello`, `sort`)).processes) == 2 +@test readall(pipeline(`echo hello`, `sort`)) == "hello\n" +@test length(spawn(pipeline(`echo hello`, `sort`)).processes) == 2 out = readall(`echo hello` & `echo world`) @test search(out,"world") != (0,0) @test search(out,"hello") != (0,0) -@test readall(pipe(`echo hello` & `echo world`, `sort`)) == "hello\nworld\n" +@test readall(pipeline(`echo hello` & `echo world`, `sort`)) == "hello\nworld\n" @test (run(`printf " \033[34m[stdio passthrough ok]\033[0m\n"`); true) # Test for SIGPIPE being treated as normal termination (throws an error if broken) -@unix_only @test (run(pipe(yes,`head`,DevNull)); true) +@unix_only @test (run(pipeline(yes,`head`,DevNull)); true) begin a = Base.Condition() @schedule begin - p = spawn(pipe(yes,DevNull)) + p = spawn(pipeline(yes,DevNull)) Base.notify(a,p) @test !success(p) end @@ -51,30 +51,30 @@ end if false prefixer(prefix, sleep) = `perl -nle '$|=1; print "'$prefix' ", $_; sleep '$sleep';'` - @test success(pipe(`perl -le '$|=1; for(0..2){ print; sleep 1 }'`, + @test success(pipeline(`perl -le '$|=1; for(0..2){ print; sleep 1 }'`, prefixer("A",2) & prefixer("B",2))) - @test success(pipe(`perl -le '$|=1; for(0..2){ print; sleep 1 }'`, + @test success(pipeline(`perl -le '$|=1; for(0..2){ print; sleep 1 }'`, prefixer("X",3) & prefixer("Y",3) & prefixer("Z",3), prefixer("A",2) & prefixer("B",2))) end @test success(`true`) @test !success(`false`) -@test success(pipe(`true`, `true`)) +@test success(pipeline(`true`, `true`)) if false @test success(ignorestatus(`false`)) - @test success(pipe(ignorestatus(`false`), `true`)) - @test !success(pipe(ignorestatus(`false`), `false`)) + @test success(pipeline(ignorestatus(`false`), `true`)) + @test !success(pipeline(ignorestatus(`false`), `false`)) @test !success(ignorestatus(`false`) & `false`) - @test success(ignorestatus(pipe(`false`, `false`))) + @test success(ignorestatus(pipeline(`false`, `false`))) @test success(ignorestatus(`false` & `false`)) end # STDIN Redirection file = tempname() -run(pipe(`echo hello world`, file)) -@test readall(pipe(file, `cat`)) == "hello world\n" -@test open(readall, pipe(file, `cat`), "r") == "hello world\n" +run(pipeline(`echo hello world`, file)) +@test readall(pipeline(file, `cat`)) == "hello world\n" +@test open(readall, pipeline(file, `cat`), "r") == "hello world\n" rm(file) # Stream Redirection @@ -84,12 +84,12 @@ rm(file) port, server = listenany(2326) put!(r,port) client = accept(server) - @test readall(pipe(client, `cat`)) == "hello world\n" + @test readall(pipeline(client, `cat`)) == "hello world\n" close(server) end @async begin sock = connect(fetch(r)) - run(pipe(`echo hello world`, sock)) + run(pipeline(`echo hello world`, sock)) close(sock) end end @@ -118,7 +118,7 @@ str2 = readall(stdout) # This test hangs if the end of run walk across uv streams calls shutdown on a stream that is shutting down. file = tempname() -open(pipe(`cat -`, file), "w") do io +open(pipeline(`cat -`, file), "w") do io write(io, str) end rm(file) @@ -169,18 +169,22 @@ unmark(sock) close(sock) # issue #4535 -exename = joinpath(JULIA_HOME, Base.julia_exename()) +exename = Base.julia_cmd() if valgrind_off # If --trace-children=yes is passed to valgrind, we will get a # valgrind banner here, not "Hello World\n". - @test readall(pipe(`$exename -f -e 'println(STDERR,"Hello World")'`, stderr=`cat`)) == "Hello World\n" + @test readall(pipeline(`$exename -f -e 'println(STDERR,"Hello World")'`, stderr=`cat`)) == "Hello World\n" + out = PipeEndpoint() + proc = spawn(pipeline(`$exename -f -e 'println(STDERR,"Hello World")'`, stderr = out)) + @test readall(out) == "Hello World\n" + @test success(proc) end # issue #6310 -@test readall(pipe(`echo "2+2"`, `$exename -f`)) == "4\n" +@test readall(pipeline(`echo "2+2"`, `$exename -f`)) == "4\n" # issue #5904 -@test run(pipe(ignorestatus(`false`), `true`)) === nothing +@test run(pipeline(ignorestatus(`false`), `true`)) === nothing # issue #6010 @@ -244,16 +248,15 @@ let bad = "bad\0name" end # issue #8529 -@test_throws ErrorException run(pipe(Base.Pipe(C_NULL), `cat`)) let fname = tempname() open(fname, "w") do f println(f, "test") end code = """ for line in eachline(STDIN) - run(pipe(`echo asdf`,`cat`)) + run(pipeline(`echo asdf`,`cat`)) end """ - @test success(pipe(`cat $fname`, `$exename -e $code`)) + @test success(pipeline(`cat $fname`, `$exename -e $code`)) rm(fname) end diff --git a/test/strings/io.jl b/test/strings/io.jl index 1677094d93f9e..dc315fb3bb62e 100644 --- a/test/strings/io.jl +++ b/test/strings/io.jl @@ -155,7 +155,7 @@ else for encoding in ["UTF-32LE", "UTF-16BE", "UTF-16LE", "UTF-8"] output_path = joinpath(unicodedir, encoding*".unicode") f = Base.FS.open(output_path,Base.JL_O_WRONLY|Base.JL_O_CREAT,Base.S_IRUSR | Base.S_IWUSR | Base.S_IRGRP | Base.S_IROTH) - run(pipe(`iconv -f $primary_encoding -t $encoding $primary_path`, f)) + run(pipeline(`iconv -f $primary_encoding -t $encoding $primary_path`, f)) Base.FS.close(f) end