From b69d0eaa83790dc93a33568c49f909d9c6a54d1e Mon Sep 17 00:00:00 2001 From: David Parsley Date: Thu, 28 Jul 2022 11:53:18 -0400 Subject: [PATCH 1/3] Fail on context cancelled, retry on timeout --- socketmode/socket_mode_managed_conn.go | 36 +++++++------------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/socketmode/socket_mode_managed_conn.go b/socketmode/socket_mode_managed_conn.go index 428999732..9c6d0b681 100644 --- a/socketmode/socket_mode_managed_conn.go +++ b/socketmode/socket_mode_managed_conn.go @@ -92,14 +92,11 @@ func (smc *Client) run(ctx context.Context, connectionCount int) error { // We're now connected so we can set up listeners var ( - wg sync.WaitGroup firstErr error firstErrOnce sync.Once ) - wg.Add(1) go func() { - defer wg.Done() defer cancel() // The response sender sends Socket Mode responses over the WebSocket conn @@ -110,9 +107,7 @@ func (smc *Client) run(ctx context.Context, connectionCount int) error { } }() - wg.Add(1) go func() { - defer wg.Done() defer cancel() // The handler reads Socket Mode requests, and enqueues responses for sending by the response sender @@ -123,9 +118,7 @@ func (smc *Client) run(ctx context.Context, connectionCount int) error { } }() - wg.Add(1) go func() { - defer wg.Done() defer cancel() // The receiver reads WebSocket messages, and enqueues parsed Socket Mode requests to be handled by @@ -137,26 +130,17 @@ func (smc *Client) run(ctx context.Context, connectionCount int) error { } }() - wg.Add(1) - go func() { - defer wg.Done() - - select { - case <-ctx.Done(): - // Detect when the connection is dead and try close connection. - if err = conn.Close(); err != nil { - smc.Debugf("Failed to close connection: %v", err) - } - case <-deadmanTimer.Elapsed(): - firstErrOnce.Do(func() { - firstErr = errors.New("ping timeout: Slack did not send us WebSocket PING for more than Client.maxInterval") - }) - - cancel() + select { + case <-ctx.Done(): + // Detect when the connection is dead and try close connection. + if err = conn.Close(); err != nil { + smc.Debugf("Failed to close connection: %v", err) } - }() - - wg.Wait() + case <-deadmanTimer.Elapsed(): + firstErrOnce.Do(func() { + firstErr = errors.New("ping timeout: Slack did not send us WebSocket PING for more than Client.maxInterval") + }) + } if firstErr == context.Canceled { return firstErr From d96792e25e981fd3c78ccf46f248ba4b70d91e70 Mon Sep 17 00:00:00 2001 From: David Parsley Date: Thu, 28 Jul 2022 12:06:15 -0400 Subject: [PATCH 2/3] Update comment, remove reference to timeout --- socketmode/socket_mode_managed_conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/socketmode/socket_mode_managed_conn.go b/socketmode/socket_mode_managed_conn.go index 9c6d0b681..e08d77642 100644 --- a/socketmode/socket_mode_managed_conn.go +++ b/socketmode/socket_mode_managed_conn.go @@ -146,7 +146,7 @@ func (smc *Client) run(ctx context.Context, connectionCount int) error { return firstErr } - // wg.Wait() finishes only after any of the above go routines finishes. + // select unblocks on first cancel or timeout. // Also, we can expect firstErr to be not nil, as goroutines can finish only on error. smc.Debugf("Reconnecting due to %v", firstErr) From 9521295006fdd196d4828486438600184d6adf20 Mon Sep 17 00:00:00 2001 From: David Parsley Date: Fri, 29 Jul 2022 08:39:13 -0400 Subject: [PATCH 3/3] Add back the waitgroup, just don't wait on ReadJSON --- socketmode/socket_mode_managed_conn.go | 39 ++++++++++++++++++-------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/socketmode/socket_mode_managed_conn.go b/socketmode/socket_mode_managed_conn.go index e08d77642..b259fd624 100644 --- a/socketmode/socket_mode_managed_conn.go +++ b/socketmode/socket_mode_managed_conn.go @@ -92,11 +92,14 @@ func (smc *Client) run(ctx context.Context, connectionCount int) error { // We're now connected so we can set up listeners var ( + wg sync.WaitGroup firstErr error firstErrOnce sync.Once ) + wg.Add(1) go func() { + defer wg.Done() defer cancel() // The response sender sends Socket Mode responses over the WebSocket conn @@ -107,7 +110,9 @@ func (smc *Client) run(ctx context.Context, connectionCount int) error { } }() + wg.Add(1) go func() { + defer wg.Done() defer cancel() // The handler reads Socket Mode requests, and enqueues responses for sending by the response sender @@ -118,6 +123,8 @@ func (smc *Client) run(ctx context.Context, connectionCount int) error { } }() + // We don't wait on runMessageReceiver because it doesn't block on a select with the context, + // so we'd have to wait for the ReadJSON to time out, which can take a while. go func() { defer cancel() @@ -130,23 +137,33 @@ func (smc *Client) run(ctx context.Context, connectionCount int) error { } }() - select { - case <-ctx.Done(): - // Detect when the connection is dead and try close connection. - if err = conn.Close(); err != nil { - smc.Debugf("Failed to close connection: %v", err) + wg.Add(1) + go func() { + defer wg.Done() + + select { + case <-ctx.Done(): + // Detect when the connection is dead and try close connection. + if err = conn.Close(); err != nil { + smc.Debugf("Failed to close connection: %v", err) + } + case <-deadmanTimer.Elapsed(): + firstErrOnce.Do(func() { + firstErr = errors.New("ping timeout: Slack did not send us WebSocket PING for more than Client.maxInterval") + }) + + cancel() } - case <-deadmanTimer.Elapsed(): - firstErrOnce.Do(func() { - firstErr = errors.New("ping timeout: Slack did not send us WebSocket PING for more than Client.maxInterval") - }) - } + }() + + wg.Wait() if firstErr == context.Canceled { return firstErr } - // select unblocks on first cancel or timeout. + // wg.Wait() finishes only after any of the above go routines finishes and cancels the + // context, allowing the other threads to shut down gracefully. // Also, we can expect firstErr to be not nil, as goroutines can finish only on error. smc.Debugf("Reconnecting due to %v", firstErr)