Fixed processor dispose race condition #135


echistyakov

Fix race condition in RequestResponse (with processor Dispose())

Motivation:

I caught this bug while stress-testing an RSocket-based Client/Server implementation in Facebook Thrift: https://github.com/facebook/fbthrift/blob/main/thrift/lib/go/thrift/stress/server_test.go

It's a pretty simple stress test: 100K concurrent RequestResponse calls against a single server, with at most 100 concurrent RSocket connections at a time and a fresh connection created for each request.
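
For illustration, a minimal sketch of that load pattern (not the actual fbthrift test; makeRequest is a hypothetical helper) might look like this:

    package main

    import "sync"

    func main() {
        const totalRequests = 100_000
        const maxConcurrent = 100

        sem := make(chan struct{}, maxConcurrent) // bounds in-flight requests
        var wg sync.WaitGroup

        for i := 0; i < totalRequests; i++ {
            sem <- struct{}{} // acquire a slot
            wg.Add(1)
            go func() {
                defer wg.Done()
                defer func() { <-sem }() // release the slot
                makeRequest() // dial a fresh connection, issue one RequestResponse, close
            }()
        }
        wg.Wait()
    }

    func makeRequest() { /* hypothetical: one request over a fresh connection */ }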

The symptom of the stress-test failure was that a small portion of requests would fail with a use of closed network connection error:

2024/12/16 15:21:02 flush failed drain: flush failed: write tcp [::1]:43993->[::1]:40658: use of closed network connection
2024/12/16 15:21:02 flush failed drain: flush failed: write tcp [::1]:43993->[::1]:35820: use of closed network connection
2024/12/16 15:21:02 flush failed drain: flush failed: write tcp [::1]:43993->[::1]:36604: use of closed network connection
2024/12/16 15:21:02 flush failed drain: flush failed: write tcp [::1]:43993->[::1]:36780: use of closed network connection
2024/12/16 15:21:02 flush failed drain: flush failed: write tcp [::1]:43993->[::1]:35856: use of closed network connection

This is a race condition between two clients, and it goes like this:

  1. Client makes a RequestResponse request here:
    func (dc *DuplexConnection) RequestResponse(req payload.Payload) (res mono.Mono) {
  2. A mono processor is created by pulling it from a global processor pool:
    m, s, _ := mono.NewProcessor(dc.reqSche, onFinally)
  3. A callback handler is registered; the processor is assigned to its sink field:
    handler.sink = s
    dc.register(sid, handler)
  4. The client gets a response back from the server (no issues).
  5. The processor invokes the onFinally callback (since the Stream Sequence is now complete):
    onFinally := func(s reactor.SignalType, d reactor.Disposable) {
        common.TryRelease(handler.cache)
        d.Dispose()
        if s == reactor.SignalTypeCancel {
            dc.sendFrame(framing.NewWriteableCancelFrame(sid))
        }
        dc.unregister(sid)
    }
  6. The processor gets disposed on the following line (it is placed back into the global processor pool and becomes available for any other RSocket client to use):
    d.Dispose()
  7. Immediately after the above line, our current goroutine gets preempted. It does not get a chance to unregister the handler callback (which still holds a pointer to the sink we just released into the global pool):
    dc.unregister(sid)
  8. Another goroutine starts running.
    a. This goroutine creates a completely separate RSocket client to make a separate RequestResponse.
    b. This RSocket client happens to get the same sink/processor (the one we just disposed) from the global pool.
  9. We call Close() on our original client from the earlier steps (since the RequestResponse sequence had already completed).
    a. A destroyHandler method gets invoked:
    err := dc.GetError()
    if err == nil {
        dc.destroyHandler(errSocketClosed)
    } else {
        dc.destroyHandler(err)
    }

    func (dc *DuplexConnection) destroyHandler(err error) {

    b. It in turn invokes the stopWithError method of our handler (which we have not yet unregistered, because our goroutine was preempted in step 7):
    func (s requestResponseCallback) stopWithError(err error) {
        s.sink.Error(err)
        common.TryRelease(s.cache)
    }

    c. However, the sink is already in use by another client. We are sending an Error to a completely unrelated client! Race condition!
    d. The other (unrelated) client gets a false-positive error saying the socket is closed!
  10. At some point after Close() executes, the goroutine from step 7 is scheduled and is finally able to unregister the handler - but it's too late: the race condition has already occurred. (A distilled sketch of this pattern follows below.)
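
To make the hazard concrete, here is a distilled, self-contained sketch of the same pattern (hypothetical types, not rsocket-go's actual code): returning a pooled object before unregistering it leaves a window in which a stale registration still points at an object another goroutine may already be reusing.

    package main

    import (
        "fmt"
        "sync"
    )

    type sink struct{ owner string }

    var pool = sync.Pool{New: func() any { return new(sink) }}

    var (
        mu       sync.Mutex
        handlers = map[uint32]*sink{}
    )

    func main() {
        // Client A checks a sink out of the pool and registers a handler for it.
        s := pool.Get().(*sink)
        s.owner = "client A"
        mu.Lock()
        handlers[1] = s
        mu.Unlock()

        // Buggy order: dispose first - the sink is now up for grabs...
        pool.Put(s)

        // ...and client B grabs it before A has unregistered.
        s2 := pool.Get().(*sink) // likely the very same object as s
        s2.owner = "client B"

        // Close() on A's connection still finds the stale registration and
        // "errors" a sink that now belongs to client B.
        mu.Lock()
        stale := handlers[1]
        mu.Unlock()
        fmt.Println("sending error to sink owned by:", stale.owner) // client B

        // The unregister finally happens - too late, the cross-talk occurred.
        mu.Lock()
        delete(handlers, 1)
        mu.Unlock()
    }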

Modifications/Fix:

Reordered the relevant operations to avoid the race condition (see the sketch after this list):

  1. Unregister the handler callback (with its sink, i.e. the processor) first.
  2. Dispose of the sink (i.e. place the processor back into the global pool) last.
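
Assembled from the snippets above (the exact change lives in the diff), the reordered callback looks roughly like this:

    onFinally := func(s reactor.SignalType, d reactor.Disposable) {
        common.TryRelease(handler.cache)
        if s == reactor.SignalTypeCancel {
            dc.sendFrame(framing.NewWriteableCancelFrame(sid))
        }
        dc.unregister(sid) // 1. drop the registration first
        d.Dispose()        // 2. only then return the processor to the pool
    }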

Result:

The stress test succeeds after this change.
