
I/O streams need to be able to read and write simultaneously #11165

Closed · lilyball opened this issue Dec 27, 2013 · 52 comments
Labels: P-medium (Medium priority)

Comments

@lilyball (Contributor)

TcpStream is conceptually two distinct parts: a reader and a writer. In theory you should be able to write data while in the middle of being blocked trying to read. But a TcpStream cannot be shared between tasks, so there is no way to write code to do this.

To that end, TcpStream should have a split() method that returns (TcpWriter, TcpReader) which each implement one half of the TcpStream. This would allow each half to be sent to a separate task.

One practical reason for this is to be able to wrap a TcpStream in a (Port<~[u8]>, Chan<~[u8]>) pair. This requires a split stream and two tasks.
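
For illustration, here is a minimal sketch of the pattern this issue asks for, written against today's std API rather than anything that existed at the time: TcpStream::try_clone stands in for the proposed split(), std::sync::mpsc stands in for Port/Chan, and the address and line-based framing are made up for the example.

use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::sync::mpsc;
use std::thread;

fn main() -> std::io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8080")?; // placeholder address
    let reader_half = stream.try_clone()?;              // second handle to the same socket
    let mut writer_half = stream;

    // Reader task: forwards incoming lines to a channel.
    let (tx, rx) = mpsc::channel::<String>();
    thread::spawn(move || {
        let mut lines = BufReader::new(reader_half).lines();
        while let Some(Ok(line)) = lines.next() {
            if tx.send(line).is_err() {
                break;
            }
        }
    });

    // Writer side: free to write even while the reader thread is blocked in read.
    writer_half.write_all(b"hello\n")?;
    if let Ok(line) = rx.recv() {
        println!("got: {}", line);
    }
    Ok(())
}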

@lilyball (Contributor, Author)

cc @alexcrichton

@alexcrichton (Member)

I am still not entirely convinced that this is the correct way to go about this. I would rather explore other pathways first, such as an I/O select(), instead of having a split.

The good thing about select() is that it's generic, while split() would have to be implemented for all readers/writers separately (with different native and green implementations as well).

This is definitely a problem that needs to be fixed before 1.0 (so I'm nominating this), but I want to approach this carefully and avoid blindly implementing a solution which won't extend very well into the future.

@lilyball (Contributor, Author)

Having select() work with I/O in addition to ports/chans would be quite nice. I suggested split() because that seems simpler.

@derekchiang (Contributor)

@alexcrichton are you working on a solution? I would like to help resolve this issue quickly, as my project is blocked by this :)

@brson (Contributor) commented Jan 9, 2014

Updated title to reflect that the solution isn't known.

@pnkfelix (Member) commented Jan 9, 2014

Assigning to P-high.

@Matthias247 (Contributor)

I stumbled across the same issue today while starting to implement a websocket library for Rust.
My design idea was to have a websocket task which owns the TcpStream and does all receiving and sending through the socket. It receives data-to-send through a channel from the user and pushes received data through another channel towards the user. Now I discovered that as soon as I start to receive() anything in the task, I can never send (unless the remote side sends something, because I'm blocked).
Also, as long as I'm waiting for data to send via port.recv(), I can't receive anything from the socket.

I guess this state of affairs makes it impossible to implement any protocol that does not follow a request -> response -> request -> response pattern.
It also doesn't allow for a graceful shutdown of a connection, because as long as you are in a blocking receive you can't close the connection.

I basically see the same possible solutions as you:

  • Allow splitting TcpStream into a TcpWriteStream and a TcpReadStream. That is probably equivalent to what most simpler programs in other languages do. I also don't like it that much because it would require a blocking background task in addition to the task that writes, and you somehow have to take care of synchronization between the two parts.
  • Update select() into a version that works on ports as well as all kinds of I/O. I would absolutely favour this and have some more ideas about it that I will write down later.

@derekchiang (Contributor)

For those of you looking for a workaround in the meantime, the following seems to work for me:

Say you have a TcpStream called tcp, then you do:

use std::sync::arc::UnsafeArc;
let (tcp_send_arc, tcp_recv_arc) = UnsafeArc::new2(tcp);

Then, you can use tcp_send_arc and tcp_recv_arc in different tasks. For example:

do spawn {
    unsafe {
        let tcp_recv_ptr = tcp_recv_arc.get();
        loop {
            // Use the stream as you normally would, through (*tcp_recv_ptr)
            let bytes = (*tcp_recv_ptr).read_to_end();
            // ...
        }
    }
}

The thing here is that the underlying libc implementation of sockets uses different buffers for read and write, so it's safe to read in one thread and write in another.

@ehsanul (Contributor) commented Jan 13, 2014

Having select work with OS objects (file descriptors and handles) in addition to ports/channels has come up in the context of timers as well: #11294 (comment)

Is this likely to happen? And we are definitely all talking about this select, right?

@Matthias247 (Contributor)

The problem with your solution is that you can't close the socket, so your connection will persist unless the remote side closes it.
This is because TcpStream does not expose a close() method, and only dropping the writing side's reference would not cause a close.
I guess if splitting is implemented the writing side should expose a close() method which would cause the read end to return. I don't know whether this would work with the libuv bindings, or whether there is a restriction that you can't own a handle to a connection after you have closed it.

Yesterday evening I started experimenting a little with how a select() for sockets and Channels could look. My first impression is that a good API design is far from easy.
I basically started with native I/O (1:1) because that's easier for me to understand than all the implications of the scheduler. My idea there was to use epoll (or comparable mechanisms like IOCP on Windows and kqueue on OSX) as the backend for a Selector, so basically the same thing that libuv does. Integrating this with channels should not be that hard: if new data arrives at a port, simply look up whether that port is registered with a Selector and, if so, wake it up. Timers can also be easily integrated by providing a timeout to the select.

But then there's the question of the API design for sockets and other I/O. Should the user register for I/O readiness, with select unblocking when a read/write is possible, or should it unblock on I/O completion? So basically the Unix model vs. the Windows model. Both have their pros and cons.
Then TcpStream (as an example) must expose functions that register at the selector. My first thoughts were going in the direction of having something like async_read(self, selector: &Selector, buffer: ~[u8], offset: uint, length: uint), which would register the I/O stream at the selector and start an asynchronous transfer. The stream should not be available for any more synchronous or asynchronous reads as long as this operation is pending - but still for writes. That could be done either by simply setting a flag which would cause an IoError on subsequent reads, or by consuming the TcpStream and returning an object without read methods.

Selector.select() would somehow extract finished operations. The question is what this returns and how to propagate the completion information to the original caller / socket owner. Classic APIs like boost asio or libuv do this by allowing a callback to be stored together with the start of the I/O, which can be called (or will be automatically called) when the operation finishes. I'm not sure this works well with Rust, because it's easy to self-destruct objects from inside such callbacks, and there goes your safety. We could also use a trait object (*Trait) to define which object has to be signaled when the operation is completed, but that is essentially the same as a callback.

Another idea would be to return IDs when the async operation is started and have select() return the ID of the finished operation. This would be used to call something like finish_async_read(id) on the socket. It would however be quite complicated to associate the ID with the corresponding object that sent the request and owns the I/O object in bigger codebases. With a "user data" field in the ID the user can still store a pointer to a callback if he wants to go the unsafe route.

What is obvious to me is that when the I/O object gets destructed it should be automatically deregistered from the Selector. Maybe also when it gets moved, but I don't know if that's possible.
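
As a point of reference for the readiness-based ("Unix model") alternative mentioned above, the raw OS interface looks roughly like this in Rust; this is only a sketch using the libc crate's poll(2) binding (an assumed dependency), not an API proposed in this thread.

use std::io;
use std::net::TcpStream;
use std::os::unix::io::AsRawFd;

// Readiness-based waiting: block until the socket is readable and/or writable.
fn wait_ready(stream: &TcpStream) -> io::Result<(bool, bool)> {
    let mut fds = [libc::pollfd {
        fd: stream.as_raw_fd(),
        events: libc::POLLIN | libc::POLLOUT,
        revents: 0,
    }];
    let rc = unsafe { libc::poll(fds.as_mut_ptr(), 1, -1) };
    if rc < 0 {
        return Err(io::Error::last_os_error());
    }
    let readable = fds[0].revents & libc::POLLIN != 0;
    let writable = fds[0].revents & libc::POLLOUT != 0;
    Ok((readable, writable))
}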

@lilyball (Contributor, Author)

@derekchiang I'm not convinced that approach is safe. When using libgreen, TcpStream is presumably implemented on top of libuv, and I have no idea whether the libuv data is safe to be accessed from multiple threads.

@Matthias247 (Contributor)

@kballard When using libgreen, the scheduler should guarantee that all calls to the libuv streams are done from the (single) native thread which owns the associated libuv event loop. Therefore I think the threading is no problem. However, the scheduler and libuv wrapper implementations are probably not designed to support parallel requests on the same libuv stream, so what happens is still pretty much undefined.

@sw17ch (Contributor) commented Jan 16, 2014

Just to clarify something, the proposed select would work on any Port<T> as well as Unix-like file handles? This implies the following pseudo-usage pattern:

// Do things up here to setup processor tasks.

let request_processor_ports = [...];
let socket = bind_listen_and_accept();

while true {
    let selected = select(socket, request_processor_ports);

    if (selected == socket) {
        available_request_processor.process(socket.read());
    } else {
        let msg = selected.read();
        socket.write(msg);
    }
}

Am I misunderstanding the proposal for select? Sorry if I botched the syntax a bit.

Edit: as a followup, would there be a way to do this entirely without tasks?

@Matthias247 (Contributor)

Basically you won't need extra tasks. You would select directly on the TcpStream somehow.
Your example also describes an API that works on I/O readiness (like select or (e)poll at a low level).
I'm currently leaning more in the direction of I/O completion, because

  • I/O readiness does not work well on Windows
  • I think I/O completion also works better for M:N scheduled green threads, because you won't have to yield twice into the I/O thread.

I have started working on a solution and can tell you what my current API looks like, but it's far from finished.

// Create a selector which is a new object
let selector = Selector::new().unwrap();

// First create a TcpStream object.
// In my current implementation this is pretty much the same as the native I/O TCP stream
// - a very thin wrapper on top of the FD. It provides the same read and write methods.
let mut nativeStream = TcpStream::connect(socketaddr).unwrap();

nativeStream.set_blocking(false); // Switch to non-blocking I/O if desired

// This is new. It "upgrades" the TcpStream into a SelectableTcpStream object by consuming it.
// This class features additional async read/write methods that will be performed by using the
// associated selector.
// I like that approach quite a lot because if you don't want async I/O you don't need to use it.
let mut selectableStream = nativeStream.associate_selector(&selector);

// By performing selectableStream.disassociate_selector() the original TcpStream can be restored.
// This will only work when no async I/O operation is pending

// Start an async I/O operation.
// In contrast to the existing APIs the operation needs an owned buffer 
// in order to manipulate it in the background.
let handle = selectableStream.read_some_async(
    ByteBuffer { buffer: ~[0, ..100], offset: 0, length: 20}
);
// handle is an IoResult<uint>: either an IoError or a uint that serves as a handle.

// Query the selector for finished I/O when required
let result = selector.wait();
// This blocks until one of the started async operations finishes. It also returns an IoResult<uint>
// which contains in case of success the handle of the operation that finished.

// If the handles match, the result of the async operation can be retrieved
match (handle, result) {
    (Ok(h1), Ok(h2)) if h1 == h2 => {
        let read_result = selectableStream.end_read_async();
        // This returns either an IoError or the result of the operation, which consists of the ByteBuffer
        // that was passed at the start of the operation and the number of bytes that were read.
    },...
}
}

While an async read is in progress no other sync or async read can be started.
I played with transforming the SelectableSocket into a WriteOnlySocket then, but in the end mutating the object through 4 different types only gives headaches.

So that's basically my current state, and I have it about 60% implemented using native I/O on Linux.

What I like is the association and disassociation of the TcpStream with the Selector.
What I'm not convinced of yet is what the read_some_async and selector.wait() operations should return.
A handle is ok in easy cases, but in advanced cases associating the handle with the actual Stream/Channel/Timer/... might be quite hard.
The C# async APIs allow passing some user state to the read call which can be retrieved later.
Maybe that's a possibility, but it screams unsafe because it would probably be something like *void.
Another idea I had was like this:

let pending_op = selectableStream.read_some_async(
    ByteBuffer { buffer: ~[0, ..100], offset: 0, length: 20}
).unwrap();
let result = selector.wait().unwrap();

if (result.pending_op == pending_op) {
    let read_result = pending_op.end_read_async();
}

So the end methods are called directly on the handle that is returned. But returning complex handles probably leads to very complex lifetime semantics.

Some things that are also not that easy to decide and implement are what should happen if the user closes the socket or if it gets destructed -> should the selector still return the aborted op or not?

@alexcrichton (Member)

@Matthias247, that sounds promising! I'm not quite understanding how everything works right now, and I'm not sure how async reads play into all this (or how they work in general).

For me, I would start designing all this with an understanding of what primitives we actually have to deal with:

  • libgreen - uv_poll_t and uv_read_start/uv_read_stop (we can't cancel writes). Note that using uv_read_start and uv_read_stop is currently not possible because you're not guaranteed that all I/O handles are on the same event loop. This means that for libgreen the only solution I can think of is uv_poll_t which is sadly super slow on windows (according to the documentation)
  • libnative linux - epoll
  • libnative osx - kqueue (which I believe is very similar to epoll)
  • libnative - select (probably want to use epoll instead)

That's what I know of; I don't really know what Windows has to offer on this front. Do you know of any good docs I can read up on about the Windows APIs?

I very much like the idea of a Selector object; I think that any design will end up using one. I'm not entirely sure what the API would look like, though. I like your idea of adding arbitrary kinds of events to the object. I'm not really a fan of dealing with set_blocking or a read_async method (what does read_async do?).

I don't think you necessarily need to consume the I/O handle; you could instead just take a mutable loan on it to prevent further usage. You would regain usage when you drop the select handle returned to you. Regardless, something along these lines sounds like a good idea.

All in all, this sounds really promising!

@lilyball (Contributor, Author)

I would really like the ability to select on both I/O and Chans simultaneously. I also need to be able to wrap TcpStream in a BufferedReader (or BufferedStream) and have it still work.

@Matthias247 Assuming your proposal does not allow for selecting on Chans as well, and it doesn't appear to, then the only alternative is to spawn an extra task in order to provide a Chan-based interface to the stream. This is less than ideal, especially when using libnative. And I need to be able to buffer the stream because I need a line-based interface and that's what BufferedReader provides.

@sw17ch (Contributor) commented Jan 16, 2014

@alexcrichton could you explain how select would be generic where split would not? I don't think I follow, and understanding this would help me understand a select interface over a split interface just a bit better. :)

Edit: initial comment about being generic was here: #11165 (comment)

@alexcrichton (Member)

@kballard selecting over I/O and chans is pretty difficult; the only reasonable solution I know of is to convert a Chan to an OS pipe underneath and spawn a task that recvs and then writes, or something like that. You're fundamentally dealing with file descriptors and "something else", and all interfaces that I know of only deal with file descriptors.

@sw17ch You'd only need to implement select once, but all I/O objects would have to opt in to being selectable. There'd be a small amount of glue for what an I/O object needs to tell the Selector in order to be selected on. With split, you'd have to manage the split halves on all primitives, meaning you're reimplementing the same sort of dup/arc/refcnt code across all of them. Additionally, it's unclear what split should return. Should it return a TcpReader and a TcpWriter, a ~Reader and ~Writer, a TcpReader and a ~Writer, or should it actually return two TcpStream objects? I'm not really happy with any of the answers, but they all have their merits.
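
To make the "small amount of glue" idea concrete, here is a rough sketch of what an opt-in selectable trait plus one generic select() could look like; all names are hypothetical, the libc crate's poll(2) is assumed as the backend, and only the readiness side is shown (a completion-based design would look different).

use std::io;
use std::os::unix::io::{AsRawFd, RawFd};

// Hypothetical opt-in glue: an I/O object only has to say how to poll it.
trait Selectable {
    fn selectable_fd(&self) -> RawFd;
}

impl<T: AsRawFd> Selectable for T {
    fn selectable_fd(&self) -> RawFd {
        self.as_raw_fd()
    }
}

// One generic select() over any set of selectable objects: returns the index
// of the first object that became readable.
fn select(objects: &[&dyn Selectable]) -> io::Result<usize> {
    let mut fds: Vec<libc::pollfd> = objects
        .iter()
        .map(|o| libc::pollfd { fd: o.selectable_fd(), events: libc::POLLIN, revents: 0 })
        .collect();
    let rc = unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::nfds_t, -1) };
    if rc < 0 {
        return Err(io::Error::last_os_error());
    }
    Ok(fds.iter().position(|p| p.revents != 0).unwrap_or(0))
}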

@lilyball (Contributor, Author)

@alexcrichton Cocoa's NSRunLoop manages to handle all sorts of different "sources", including file descriptors, mach messages, timers, and arbitrary user-defined sources.

@alexcrichton (Member)

Something like NSRunLoop is much more specific, though, because you have prior knowledge of being in an event loop and hence all events are synchronized. We're dealing with a much more parallel context where we have no such knowledge (all events can happen simultaneously).

@lilyball (Contributor, Author)

@alexcrichton What do you mean? Multiple sources in NSRunLoop can fire simultaneously. Events can be delivered, fds become available for reading/writing, and custom CFRunLoopSources may be signaled all at the same time, and when the runloop wakes up (or returns from processing the previous callback) it has to pick among the available sources to fire the next callback. The analogous behavior in a select-style API would be to pick from among the available sources to return an index (presumably picking randomly).

@Matthias247 (Contributor)

@alexcrichton
For the I/O primitives you are basically right.
The Windows equivalent of epoll and kqueue is I/O completion ports (IOCP).
There are various resources on the net, but I don't know which are really good. If you want to see how it's used - you know the libuv sources :-)
IOCP has one major difference from epoll and kqueue: you don't wait for the OS to tell you that an object is ready to do something; instead you just start the operation and let the OS tell you when it's done. Therefore designing a common API for the two worlds is difficult. And all those who have built one (libuv, boost::asio) have gone for the completion-based model, which I tried to mimic here in Rust -> 'read_async' starts the asynchronous operation. The main difference is that I don't automatically invoke callbacks.

If libnative gets a good async-capable I/O implementation, that could also be the basis for the libgreen main threads instead of using uv. However, that's probably a far-future vision.
The main target I see currently is to get a good API in one implementation so that the others can be adjusted to match. Regarding set_blocking: I see switching between blocking and non-blocking as a fundamental feature for very fast I/O. In my experience the lowest-latency I/O comes from trying non-blocking calls first and only falling back to the Selector when they fail. If you don't want to deal with it you don't have to, and can just stay with blocking and async reads.

@kballard Of course the target is to get this running on Channels too. And on Timers. Probably not on files.
I just haven't implemented it yet; I want to finish sockets first.
@alexcrichton It should be doable. At least libuv can already do it. The basic challenge is to wake up the Selector when the Channel is associated with one and a message is delivered. As you already said, it can be done with self pipes, or on Linux with eventfd, and on Windows somehow as well. I'm also sure that kqueue provides a sophisticated way.
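
For what it's worth, the "self pipe" trick mentioned above is small enough to sketch directly; this is only an assumed illustration using the libc crate on unix, not code from any of the implementations discussed here.

use std::io;
use std::os::unix::io::RawFd;

// Classic self-pipe trick: the read end is registered with the Selector's
// poll/epoll set; the channel sender writes one byte to the write end to
// wake up a blocked select.
fn self_pipe() -> io::Result<(RawFd, RawFd)> {
    let mut fds = [0 as libc::c_int; 2];
    if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
        return Err(io::Error::last_os_error());
    }
    Ok((fds[0], fds[1])) // (read end to poll, write end to signal)
}

fn wake(write_fd: RawFd) {
    let byte = 1u8;
    unsafe { libc::write(write_fd, &byte as *const u8 as *const libc::c_void, 1) };
}

fn drain_wakeups(read_fd: RawFd) {
    let mut buf = [0u8; 64];
    unsafe { libc::read(read_fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
}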

@Matthias247 (Contributor)

@sw17ch
I even think both approaches are possible in parallel.
In my approach I first create a TcpStream that is a very simple (non-threadsafe and not async-capable) object, which can be freely moved between threads.
Now in my example I have gone in one direction and mutated it into a 'SelectableTcpStream' that adds the async and select capabilities.
Another approach would be to mutate from a 'TcpStream' into a split ReadStream and WriteStream, which could work by creating new objects that use a mutex to share the underlying TcpStream.

What I like better about the selector-based approach is that a split TcpStream still does not solve things like receiving and listening to a timer in parallel.

@sw17ch (Contributor) commented Jan 16, 2014

What I like better about the selector-based approach is that a split TcpStream still does not solve things like receiving and listening to a timer in parallel.

Ah, good point. It would be nice to have a better 'event' API in general, I suppose.

@Matthias247 (Contributor)

In the meantime I'm one step further.
What I also didn't like that much about my last approach was the statefulness and the complexity of using the async send and receive methods. I then decided to review what others do and had a look at the Go, D and Erlang libraries (but they didn't help too much).

I then had another idea that I decided to try. It's basically a quite old (aka rusty) concept, a mixture of what the Qt or AS3 networking APIs or the Windows wndproc look like.

But as code says more than 1000 words, I'll simply start with an example that I have running (on Linux):

fn main() {
    // Create an event queue. That's the central element instead of Select
    let mut ev_queue = EventQueue::new(); 
    // Create a port/chan pair. Same as rust std API, but different in implementation
    let (port,chan): (Port<~str>, Chan<~str>) = Chan::new();
    // Upgrade the port to a selectable port. Still need another name for that
    let mut selport = SelectablePort::from_port(port, &ev_queue);

    // Create an event based timer 
    let mut main_timer = Timer::create(&ev_queue).unwrap();
    main_timer.set_interval(2000);  
    main_timer.start();

    // Start a subtask that communicates with us
    do native::task::spawn() {
        subtask(chan);
    }

    loop {
        // Wait for events to arrive
        let event = ev_queue.next_event().unwrap();         

        // Look for the origin and the type and then check what to do
        if event.originates_from(selport) {
            match event.event_type {
                event::ChannelMessageEvent => {
                    // We know that we received a message and can fetch it in a nonblocking style
                    let msg = selport.recv().unwrap();
                    println!("Message from subtask: {}", msg);
                },
                event::ChannelClosedEvent => {
                    // Oh, the channel seems dead now
                    println!("Subtask closed");
                    return;
                },
                _ => ()
            }
        }
        else if event.originates_from(main_timer) {
            // The timer sent a tick event
            println!("main_timer::tick()");
        }
    }
}

fn subtask(chan: Chan<~str>) {
    // The other task also gets an event queue and a timer
    let mut ev_queue = EventQueue::new();
    let mut sub_timer = Timer::create(&ev_queue).unwrap();
    let mut iterations = 3;
    let mut stream_alive = false;

    // We will play with TCP here, so let's connect to somewhere. This is currently blocking
    let opt_ipaddr:Option<IpAddr> = FromStr::from_str("192.168.1.99");
    let socketaddr = SocketAddr {ip: opt_ipaddr.unwrap(), port: 8000};
    let mut rawstream = TcpStream::connect(socketaddr).unwrap();
    let mut stream = SelectableTcpStream::from_tcp_stream(rawstream, &ev_queue);
    stream_alive = true;

    // Start a timer
    sub_timer.set_interval(3000);   
    sub_timer.start();

    // Send a request. This is also currently blocking
    let request = ~"GET /index.html HTTP/1.1\r\n\r\n";
    stream.write(request.as_bytes());
    iterations -= 1;

    loop {
        // Fetch events and checkout what to do
        let event = ev_queue.next_event().unwrap();

        if event.originates_from(sub_timer) {
            // Send something to the mainthread.
            chan.send(~"subtimer::tick()");
            if !stream_alive {              
                if iterations > 0 {
                    iterations -= 1;
                    // Create a new stream. The old one will be killed through RAII here
                    rawstream = TcpStream::connect(socketaddr).unwrap();
                    stream = SelectableTcpStream::from_tcp_stream(rawstream, &ev_queue);
                    stream_alive = true;
                    stream.write(request.as_bytes());
                }
                else {
                    return;                     
                }
            }
        }
        else if event.originates_from(stream) {
            match event.event_type {
                event::StreamClosedEvent => {
                    // Oops, the TCP connection was closed by the remote
                    chan.send(~"TCP connection closed");
                    stream_alive = false;
                },
                event::DataAvailableEvent(nr_bytes) => {
                    // Yay, we know that we received at least nr_bytes and can "safely" read them in a nonblocking fashion
                    let mut buffer: ~[u8] = std::vec::from_elem::<u8>(nr_bytes, 0);
                    let read_res = stream.read(buffer);
                    match read_res {
                        Err(err) => {
                            chan.send(err.desc.to_owned());
                        }
                        Ok(nr_read) => {
                            let txt = std::str::from_utf8(buffer.slice(0, nr_read));
                            chan.send(txt.to_owned());
                        }
                    }
                },
                _ => ()
            }
        }
    }

    // Stop my I/O-performing objects. This will also be done in RAII style if you forget it.
    sub_timer.stop();
    stream.close();
}

Ooh, and of course the glorious output:

Message from subtask: HTTP/1.1 500 Internal Server Error

Message from subtask: TCP connection closed
main_timer::tick()
Message from subtask: subtimer::tick()
Message from subtask: HTTP/1.1 500 Internal Server Error

Message from subtask: TCP connection closed
main_timer::tick()
main_timer::tick()
Message from subtask: subtimer::tick()
Message from subtask: HTTP/1.1 500 Internal Server Error

Message from subtask: TCP connection closed
main_timer::tick()
Message from subtask: subtimer::tick()
Subtask closed

@Matthias247 (Contributor)

Ok, so after the code example some more info about it.

The concept

As you can see, the whole asynchronous functionality is built around events which are dispatched from an event queue. The events notify the user that something has happened, is ready or was finished.

An event is something quite simple and lightweight:

pub struct Event
{
    event_type: EventKind, // The type of the event which occurred and all necessary info
    is_valid: bool, // Internally used
    source: Rc<bool> // source of the event. Type is not relevant for the user
}

// All types of events that are known. This is what I used in the examples, but there would be many more.
// Rust's enums are really a great way to describe different kinds of events and their associated data
pub enum EventKind
{
    StreamClosedEvent,
    IoErrorEvent(IoError),
    DataAvailableEvent(uint),
    TimerEvent,
    ChannelClosedEvent,
    ChannelMessageEvent,
    ...,
    SignalReceived(uint),
    DnsQueryResolved(),
    PacketReceived(uint)
}

That makes the API somewhat similar to select/poll/..., but it is at a higher level and therefore much easier to use. Qt works in much the same way. The difference is that Qt sends a signal/callback that there is data to read and then you can start reading it; here you must find the mapping from the Event to the associated object yourself. But I want to add 2 more fields to the Event so that you can automate it:

pub struct Event
{
    ...
    callback_fn: fn(ev: &Event),
    user_data: ~Any
}

Then you could build a callback style eventloop as simple as

loop {
        // Wait for events to arrive
        let event = ev_queue.next_event().unwrap(); 
        event.callback_fn(event);
}

Things that are also relevant to me:

  • An event that is not processed should not be harmful. Accessing I/O objects without waiting for the events shouldn't be harmful either.
  • The EventQueue should only return one event at a time. Only in this way can it be guaranteed that the event is valid at the moment it is passed to the user.
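
As a self-contained illustration of the Event/EventKind shape described above (with a plain std::sync::mpsc Receiver standing in for the hypothetical EventQueue, and the field names made up), dispatching on such events in Rust could look like this:

use std::sync::mpsc::{channel, Receiver};

// Stand-ins for the hypothetical EventQueue/Event types described above,
// using a plain mpsc Receiver so the sketch is self-contained.
enum EventKind {
    DataAvailable(usize),
    StreamClosed,
    Timer,
}

struct Event {
    source_id: u32,
    kind: EventKind,
}

fn run(queue: Receiver<Event>) {
    // One event at a time, dispatched by source and kind.
    while let Ok(event) = queue.recv() {
        match (event.source_id, event.kind) {
            (0, EventKind::DataAvailable(n)) => println!("socket: {} bytes readable", n),
            (0, EventKind::StreamClosed) => return,
            (1, EventKind::Timer) => println!("timer tick"),
            _ => {}
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    tx.send(Event { source_id: 1, kind: EventKind::Timer }).unwrap();
    tx.send(Event { source_id: 0, kind: EventKind::StreamClosed }).unwrap();
    run(rx);
}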

@Matthias247 (Contributor)

The implementation

I currently only have a Linux 1:1 implementation, but I also have some ideas of how it would work in other environments.
Here the EventQueue is based on epoll. The I/O objects use fds (timerfd, eventfd, socket fd) and register at the EventQueue. When an fd is readable, a callback is invoked on the I/O object, which produces new events.
For the port/channel I'm currently doing two-way signaling. As long as the port is freestanding (not associated with an EventQueue) I'm using a condition variable to wake up the port from the channel. If it's associated, I send a message through eventfd.

The hardest thing to get right are the sockets; the APIs differ very much here. I decided to give the user a high-level view of what happened, e.g. directly whether and how many bytes are readable, whether the connection was closed, or what error occurred. It might not be the most performant approach, but all OSes can provide the information in some way and it gives the user an easy-to-understand API.

  • For OSX the API is very good, because kqueue immediately delivers all the information that is required for producing an event.
  • For Linux the API is not that good, because epoll doesn't deliver information about how many bytes are readable. It doesn't even tell whether the connection was closed - only that the fd is readable. Delivering only the information that a socket is readable, and not what will then happen, on all OSes sounded less appealing, so I rejected it. As a result I need an extra ioctl() to retrieve the number of readable bytes. I think the performance drawback depends on the application's protocol. Some applications always want to read as much data as they can; these might suffer a little. Others might profit because they know from the start how many bytes can be safely read and thereby save the cost of an unnecessary read() call.
  • On Windows the situation is the most complicated, because it won't give any direct information about readable data. There are 2 workarounds for this:
    a) Start an asynchronous read for 0 bytes after the socket is connected. When that completes we know that there is data available to read. For the amount, an ioctl would also be needed.
    b) Create one or more socket-internal buffers, start the asynchronous read with one of them, and when it completes you will know how much data is there. When the application reads, it would read from that buffer (and not from the socket buffer). The advantage of b) is higher throughput and lower latency; the disadvantage is higher memory usage (you need to preallocate memory for each connection).
    I'm in favor of a), or perhaps a setting that allows b) optionally as a "buffered mode".

A question that also arises from the last point is what should happen if the application calls socket.read() even though it didn't get a notification about readable bytes. It could either block until something is readable or it could return 0. I decided to go for the latter, because:

  • It eliminates possibly unintended blocking
  • It is easier to implement on Windows, because what should you do when you have a pending 0-byte read (that will be finished later in the same thread) and now must perform another read? I currently don't know whether such reads are cancellable.

I also decided to automatically close connections after the first error, so that users don't get further spurious errors.

Writes are currently blocking because that's the easiest thing to implement: you don't have to care about how to keep the data-to-send alive until a background write finishes. That's probably suboptimal for applications which send lots of data (which would cause the socket buffer to fill and thereby block), and also suboptimal as a base for an M:N scheduler.
Maybe an option would be to implement a non-blocking setting that still allows users to call write synchronously but queues any data that could not be sent immediately and writes it in the background (see the sketch below):

  • For owned buffers the application could pass them to the socket, and the socket queues them until they can be sent on following epoll calls. After they are fully sent, a MessageSentEvent(~[u8]) could be produced to give the buffer back to the application for reuse. That's similar to what libuv does.
  • For referenced buffers the socket would have to copy the data into an internal buffer in order to send it later.
    Basically it's an optimization; for a start I think most people would be happy with blocking writes.
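
A minimal sketch of that non-blocking "queue what didn't fit" write path, using today's std::net::TcpStream (set_nonblocking, WouldBlock) purely as an illustration - the QueuedWriter type and method names are made up, and flush_pending is meant to be called when the selector/event queue reports the socket writable again:

use std::collections::VecDeque;
use std::io::{self, Write};
use std::net::TcpStream;

struct QueuedWriter {
    stream: TcpStream,
    pending: VecDeque<Vec<u8>>, // owned buffers waiting to be written
}

impl QueuedWriter {
    fn new(stream: TcpStream) -> io::Result<QueuedWriter> {
        stream.set_nonblocking(true)?;
        Ok(QueuedWriter { stream, pending: VecDeque::new() })
    }

    // Try to write immediately; whatever does not fit is queued for later.
    fn send(&mut self, mut buf: Vec<u8>) -> io::Result<()> {
        if !self.pending.is_empty() {
            self.pending.push_back(buf); // keep ordering: older data goes first
            return Ok(());
        }
        match self.stream.write(&buf) {
            Ok(n) if n == buf.len() => Ok(()),
            Ok(n) => {
                buf.drain(..n); // partial write: queue the rest
                self.pending.push_back(buf);
                Ok(())
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.pending.push_back(buf);
                Ok(())
            }
            Err(e) => Err(e),
        }
    }

    // Called when the socket is reported writable: drain as much as possible.
    fn flush_pending(&mut self) -> io::Result<()> {
        while let Some(mut front) = self.pending.pop_front() {
            match self.stream.write(&front) {
                Ok(n) if n == front.len() => continue, // buffer fully written
                Ok(n) => {
                    front.drain(..n); // partial write: keep the remainder queued
                    self.pending.push_front(front);
                    return Ok(());
                }
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    self.pending.push_front(front);
                    return Ok(());
                }
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
}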

alexcrichton added a commit that referenced this issue Jan 23, 2014
This is part of the overall strategy I would like to take when approaching
issue #11165. The only two I/O objects that reasonably want to be "split" are
the network stream objects. Everything else can be "split" by just creating
another version.

The initial idea I had was to literally split the object into a reader and a
writer half, but that would just introduce lots of clutter with extra interfaces
that were a little unnecessary, or it would return a ~Reader and a ~Writer which
means you couldn't access things like the remote peer name or local socket name.

The solution I found to be nicer was to just clone the stream itself. The clone
is just a clone of the handle, nothing fancy going on at the kernel level.
Conceptually I found this very easy to wrap my head around (everything else
supports clone()), and it solved the "split" problem at the same time.

The cloning support is pretty specific per platform/lib combination:

* native/win32 - uses some specific WSA apis to clone the SOCKET handle
* native/unix - uses dup() to get another file descriptor
* green/all - This is where things get interesting. When we support full clones
              of a handle, this implies that we're allowing simultaneous writes
              and reads to happen. It turns out that libuv doesn't support two
              simultaneous reads or writes of the same object. It does support
              *one* read and *one* write at the same time, however. Some extra
              infrastructure was added to just block concurrent writers/readers
              until the previous read/write operation was completed.

I've added tests to the tcp/unix modules to make sure that this functionality is
supported everywhere.
@sw17ch (Contributor) commented Feb 10, 2014

@Matthias247 @dwrensha @kballard

I also like the single-reader-on-socket model, but only with green threads. Does the current API require the use of a native thread for the socket? I didn't read that into the implementation, but didn't look too hard.

Is there something about the current model that doesn't behave nicely (other than the possibility of leaking the socket)?

@lilyball (Contributor, Author)

@sw17ch The current implementation does not require native threads for sockets. When using libgreen, sockets will go through libuv. The problem is that if you're writing library code, it can't control which threading model it's invoked in, and depending on your API you may not have the luxury of spinning up a libgreen thread pool for your socket tasks.

@sw17ch (Contributor) commented Feb 10, 2014

@kballard ah, thank you. This clears things up for me a lot.

alexcrichton added a commit to alexcrichton/rust that referenced this issue Apr 25, 2014
alexcrichton added a commit that referenced this issue Apr 28, 2014
Two new methods were added to TcpStream and UnixStream:

    fn close_read(&mut self) -> IoResult<()>;
    fn close_write(&mut self) -> IoResult<()>;

These two methods map to shutdown()'s behavior (the system call on unix),
closing the reading or writing half of a duplex stream. These methods are
primarily added to allow waking up a pending read in another task. By closing
the reading half of a connection, all pending readers will be woken up and will
return with EndOfFile. The close_write() method was added for symmetry with
close_read(), and I imagine that it will be quite useful at some point.

Implementation-wise, librustuv got the short end of the stick this time. The
native versions just delegate to the shutdown() syscall (easy). The uv versions
can leverage uv_shutdown() for tcp/unix streams, but only for closing the
writing half. Closing the reading half is done through some careful dancing to
wake up a pending reader.

As usual, windows likes to be different from unix. The windows implementation
uses shutdown() for sockets, but shutdown() is not available for named pipes.
Instead, CancelIoEx was used with some fancy synchronization to make sure
everyone knows what's up.

cc #11165
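
The close_read/close_write described in the commit above corresponds to shutdown(); in today's std the same effect is available as TcpStream::shutdown. A minimal sketch of using it to wake a reader that is blocked in read() (the address is a placeholder, and exact wake-up behaviour can vary by platform):

use std::io::Read;
use std::net::{Shutdown, TcpStream};
use std::thread;

fn main() -> std::io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8080")?; // placeholder address
    let closer = stream.try_clone()?;

    let reader = thread::spawn(move || {
        let mut stream = stream;
        let mut buf = [0u8; 1024];
        // Blocks until data arrives or the read half is shut down, in which
        // case read() typically returns Ok(0) (end of file).
        let n = stream.read(&mut buf).unwrap_or(0);
        println!("reader woke up with {} bytes", n);
    });

    // From another thread: shutting down the reading half wakes the blocked read.
    closer.shutdown(Shutdown::Read)?;
    reader.join().unwrap();
    Ok(())
}
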
alexcrichton added a commit to alexcrichton/rust that referenced this issue May 7, 2014
bors added a commit that referenced this issue May 7, 2014
alexcrichton added a commit to alexcrichton/rust that referenced this issue May 8, 2014
bors added a commit that referenced this issue May 8, 2014
@alexcrichton (Member)

Nominating for closure. We're in a much better state than when this was opened:

  1. I/O objects are cloneable (simultaneous reads/writes)
  2. I/O objects have timeouts
  3. I/O objects can close other ones (close_{read,write})

I believe that this set of functionality encompasses the intent of this issue.

@lilyball (Contributor, Author) commented May 8, 2014

@alexcrichton I believe that what you have listed does meet the original intent of this issue. But besides the ability to read/write simultaneously, there's a need for the ability to select over multiple I/O channels at once (or over the reading and writing halves of the same channel). Perhaps that should be covered under a separate ticket, but there was a lot of discussion in here already about doing just that.

@brson (Contributor) commented May 8, 2014

Closing! Yay!

michaelwoerister pushed a commit to michaelwoerister/rust that referenced this issue Jun 5, 2014
@sw17ch (Contributor) commented Jun 25, 2014

👏

flip1995 pushed a commit to flip1995/rust that referenced this issue Sep 25, 2023
[`len_without_is_empty`]: follow type alias to find inherent `is_empty` method

Fixes rust-lang#11165

When we see an `impl B` and `B` is a type alias to some type `A`, then we need to follow the type alias to look for an `is_empty` method on the aliased type `A`. Before this PR, it would get the inherent impls of `B`, of which there aren't any, so it would warn that there isn't an `is_empty` method even if there was one.
Passing the type alias `DefId` to `TyCtxt::type_of` gives us the aliased `DefId` (or simply returns the type itself if it wasn't a type alias), so we can just use that.

changelog: [`len_without_is_empty`]: follow type alias to find inherent `is_empty` method