Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Please add context.Context support for RecvMsg() #5401

Closed
GiedriusS opened this issue Jun 6, 2022 · 4 comments
Closed

Please add context.Context support for RecvMsg() #5401

GiedriusS opened this issue Jun 6, 2022 · 4 comments
Labels

Comments

@GiedriusS
Copy link

GiedriusS commented Jun 6, 2022

Use case(s) - what problem will this feature solve?

We want to write a high-performance loop that receives messages from a stream as fast as possible whilst at the same time protecting ourselves from one operation taking too long i.e. we want to return a partial response to our users if one stream is taking too long.

Proposed Solution

RecvMsg() needs to accept a context.Context.

Alternatives Considered

The proposed alternative to spawn a goroutine for each RecvMsg() seems horrible for performance. Adding context.Context support seems straightforward and it isn't such a huge breaking change for users to simply pass context.Background().

Additional Context

#445 it's really not a solution to spawn new goroutines for each RecvMsg() if you are receiving hundreds of thousands of messages potentially.

@GiedriusS GiedriusS added the Type: Feature New features or improvements in behavior label Jun 6, 2022
@dfawley
Copy link
Member

dfawley commented Jun 6, 2022

This is no different from a typical io.Reader, which doesn't accept a context, either. You shouldn't need more than one goroutine for all of your reads in this case. Consider:

func readFromStream(stream pb.Service_MyHandlerServer, ch chan pb.MyMessageType) {
	defer close(ch)
	for {
		res, err := stream.RecvMsg()
		if err != nil {
			// error handling
			return
		}
		select {
		case ch<-res:
		case <-stream.Context().Done():
			// In case the receiver from the channel stops and cancels the RPC.
			// Don't do anything here and let the stream return an error.
		}
	}
}

func myClient() {
	// ...
	ch := make(chan pb.MyMessageType)
	go readFromStream(stream, ch)
	for {
		ctx, cancel := context.WithTimeout(ctx, time.Second)
		// ensure cancel() is called after this loop iteration
		select {
		case msg:=<-ch:
			// Process msg
		case <-ctx.Done():
			// Process timeout
			streamCtxCancel() // unblocks readFromStream by canceling the RPC
		}
	}
}

@github-actions
Copy link

This issue is labeled as requiring an update from the reporter, and no update has been received after 6 days. If no update is provided in the next 7 days, this issue will be automatically closed.

@GiedriusS
Copy link
Author

Unfortunately but using a separate goroutine isn't a good solution performance-wise. There is a huge reduction in performance when you try to pass tens of thousands of received messages via the channel. In our case doing:

for {
  res, err := stream.RecvMsg()
  ...
}

vs. doing that via a channel reduces the query duration by a few seconds (thanos-io/thanos#5296). That's why I have created this issue (: perhaps there is some other solution?

@dfawley
Copy link
Member

dfawley commented Jul 18, 2022

Other ideas which avoid the channel:

Using a per-operation timer:

ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.StreamingRPC(ctx)

for {
	t := time.AfterFunc(2*time.Second, cancel)
	res, err := stream.RecvMsg()
	t.Stop()
	// Handle res/err inline as before
}

Using a monitoring goroutine to cancel the stream context when progress stops (may not work if stream response processing is slow/unpredictable):

ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.StreamingRPC(ctx)

// Monitor messages received and cancel the stream if progress
// isn't made every two seconds.
msgs := int32(0)
go func() {
	lastMsgs := int32(0)
	for ctx.Err() == nil {
		time.Sleep(2*time.Seconds)
		t := atomic.LoadInt32(msgs)
		if lastMsgs == t {
			cancel()
			return
		}
		lastMsgs = t
	}
}()

for {
	res, err := stream.RecvMsg()
	atomic.AddInt32(msgs,1)
	// Handle res/err inline as before
}

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Jan 15, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

2 participants