Deadlock in statusConnectionPool #20

Closed
kuningfellow opened this issue Apr 7, 2024 · 5 comments

@kuningfellow

Deadlock occurs at statusConnectionPool.OnFailure.

https://github.com/elastic/elastic-transport-go/blob/v8.5.0/elastictransport/connection.go#L184

	sort.Slice(cp.dead, func(i, j int) bool {
		c1 := cp.dead[i]
		c2 := cp.dead[j]
		c1.Lock()
		c2.Lock() // <- here
		defer c1.Unlock()
		defer c2.Unlock()

		res := c1.Failures > c2.Failures
		return res
	})

Here is my hypothesis:

	/*
		Initially using Pool 1

			Goroutine 1:                                        |  DiscoverNodes()  | Pool 2
			Goroutine 2: get Pool 1 node        OnFailure()     |                   |  ...............{resurrects on Pool 1}
			Goroutine 3: get Pool 1 node                        |                   |  OnFailure()
			Goroutine 4: get Pool 1 node                        |                   |  ....................................OnFailure()
	*/
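
For illustration, here is a minimal standalone program (made-up names, not library code) showing the lock-order inversion I believe this interleaving produces: two OnFailure calls sorting the dead slices of different pools that share connections can lock the same two connection mutexes in opposite orders, and each goroutine then waits forever on the lock the other holds.

package main

import "sync"

func main() {
	var a, b sync.Mutex // stand-ins for two connection mutexes reachable from both pools

	aHeld, bHeld := make(chan struct{}), make(chan struct{})
	done := make(chan struct{})

	// Like the comparator sorting pool 1's dead slice: locks a, then b.
	go func() {
		a.Lock()
		close(aHeld)
		<-bHeld  // make sure the other goroutine already holds b
		b.Lock() // blocks forever: b's holder is waiting for a
		b.Unlock()
		a.Unlock()
		done <- struct{}{}
	}()

	// Like the comparator sorting pool 2's dead slice: locks b, then a.
	go func() {
		b.Lock()
		close(bHeld)
		<-aHeld
		a.Lock() // blocks forever: a's holder is waiting for b
		a.Unlock()
		b.Unlock()
		done <- struct{}{}
	}()

	<-done // never returns; the runtime reports "all goroutines are asleep - deadlock!"
}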

Here is my test function:

package elastictransport

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	_ "net/http/pprof"
	"net/url"
	"sync"
	"testing"
	"time"
)

type ctxKey int

const shouldError, onTransport = ctxKey(1), ctxKey(2)

var someErr = errors.New("some error")

type mockTransport struct {
	t *testing.T
}

func (m *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	switch req.URL.Path {
	case "/_nodes/http":
		n := nodeInfo{}
		n.HTTP.PublishAddress = "localhost:1234"
		n2 := nodeInfo{}
		n2.HTTP.PublishAddress = "localhost:5678"
		return m.fromJson(map[string]map[string]nodeInfo{
			"nodes": {
				"es0": n,
				"es1": n2,
			},
		}), nil
	default:
		if f, ok := req.Context().Value(onTransport).(func()); ok {
			f()
		}
		if y, _ := req.Context().Value(shouldError).(bool); y {
			return nil, someErr
		}
		return &http.Response{
			StatusCode: 200,
			Body:       io.NopCloser(bytes.NewReader(nil)),
		}, nil
	}
}

func (m *mockTransport) fromJson(v interface{}) *http.Response {
	b, err := json.Marshal(v)
	if err != nil {
		m.t.Fatal(err)
	}
	return &http.Response{
		StatusCode: 200,
		Body:       io.NopCloser(bytes.NewReader(b)),
	}
}

type lg struct{}

func (lg) Log(a ...interface{}) error {
	fmt.Println(a...)
	return nil
}
func (lg) Logf(format string, a ...interface{}) error {
	fmt.Printf(format, a...)
	return nil
}

func TestDeadlock(t *testing.T) {
	go func() {
		http.ListenAndServe(":8080", nil)
	}()
	debugLogger = lg{}
	defaultResurrectTimeoutInitial = 10 * time.Second

	var (
		transport http.RoundTripper = &mockTransport{t}
		makeReq                     = func(ctx context.Context) *http.Request {
			t.Helper()
			req, err := http.NewRequestWithContext(ctx, "GET", "", io.NopCloser(bytes.NewReader(nil)))
			if err != nil {
				t.Fatal(err)
			}
			return req
		}
	)
	u, err := url.Parse("http://localhost:9200") // just the initial node
	if err != nil {
		t.Fatal(err)
	}

	c, err := New(Config{
		Transport:    transport,
		URLs:         []*url.URL{u, u},
		DisableRetry: true,
	})
	if err != nil {
		t.Fatal(err)
	}

	c.DiscoverNodes()

	/*
		Initially using Pool 1

			Goroutine 1:                                        |  DiscoverNodes()  | Pool 2
			Goroutine 2: get Pool 1 node        OnFailure()     |                   |  ...............{resurrects on Pool 1}
			Goroutine 3: get Pool 1 node                        |                   |  OnFailure()
			Goroutine 4: get Pool 1 node                        |                   |  ....................................OnFailure()
	*/

	var (
		firstPhaseWg,
		secondPhaseWg,
		thirdPhaseWg,
		getPoolWg,
		discoverNodesWg sync.WaitGroup
	)

	var firstPhases = func(c *Client) {
		defer firstPhaseWg.Done()
		ctx := context.Background()
		ctx = context.WithValue(ctx, shouldError, true)
		ctx = context.WithValue(ctx, onTransport, func() {
			getPoolWg.Done()
		})
		c.Perform(makeReq(ctx))
	}

	var discoverNodesPhase = func(c *Client) {
		defer discoverNodesWg.Done()
		getPoolWg.Wait()
		c.DiscoverNodes()
	}

	var secondPhases = func(c *Client) {
		defer secondPhaseWg.Done()
		ctx := context.Background()
		ctx = context.WithValue(ctx, shouldError, true)
		ctx = context.WithValue(ctx, onTransport, func() {
			getPoolWg.Done()
			firstPhaseWg.Wait()
			discoverNodesWg.Wait()
		})
		c.Perform(makeReq(ctx))
	}

	var thirdPhases = func(c *Client) {
		defer thirdPhaseWg.Done()
		ctx := context.Background()
		ctx = context.WithValue(ctx, shouldError, true)
		ctx = context.WithValue(ctx, onTransport, func() {
			getPoolWg.Done()
			secondPhaseWg.Wait()
			time.Sleep(10 * time.Second)
		})
		c.Perform(makeReq(ctx))
	}

	runPar := func(n int, wg []*sync.WaitGroup, f func(*Client), c *Client) {
		for _, w := range wg {
			w.Add(n)
		}
		for i := 0; i < n; i++ {
			go f(c)
		}
	}

	var N = 10
	runPar(N, []*sync.WaitGroup{&firstPhaseWg, &getPoolWg}, firstPhases, c)
	runPar(N, []*sync.WaitGroup{&secondPhaseWg, &getPoolWg}, secondPhases, c)
	runPar(N, []*sync.WaitGroup{&thirdPhaseWg, &getPoolWg}, thirdPhases, c)
	runPar(1, []*sync.WaitGroup{&discoverNodesWg}, discoverNodesPhase, c)

	thirdPhaseWg.Wait()

	fmt.Println("pool urls", c.pool.URLs())
	fmt.Println("deads", c.pool.(*statusConnectionPool).dead)
}

I can consistently get it to deadlock, and when I open http://localhost:8080/debug/pprof/goroutine?debug=1 I get this goroutine profile:

1 @ 0x43b356 0x44ca0f 0x44c9e6 0x4694e6 0x474065 0x73d745 0x73d71d 0x490251 0x49083d 0x48fd9a 0x73d429 0x741c09 0x74bda9 0x46d701
#	0x4694e5	sync.runtime_SemacquireMutex+0x25										/home/<user>/go1.20.11/go/src/runtime/sema.go:77
#	0x474064	sync.(*Mutex).lockSlow+0x164											/home/<user>/go1.20.11/go/src/sync/mutex.go:171
#	0x73d744	sync.(*Mutex).Lock+0xa4												/home/<user>/go1.20.11/go/src/sync/mutex.go:90
#	0x73d71c	github.com/elastic/elastic-transport-go/v8/elastictransport.(*statusConnectionPool).OnFailure.func1+0x7c	/home/<user>/github.com/elastic/elastic-transport-go/elastictransport/connection.go:184
#	0x490250	sort.insertionSort_func+0xb0											/home/<user>/go1.20.11/go/src/sort/zsortfunc.go:12
#	0x49083c	sort.pdqsort_func+0x2dc												/home/<user>/go1.20.11/go/src/sort/zsortfunc.go:73
#	0x48fd99	sort.Slice+0xf9													/home/<user>/go1.20.11/go/src/sort/slice.go:26
#	0x73d428	github.com/elastic/elastic-transport-go/v8/elastictransport.(*statusConnectionPool).OnFailure+0x268		/home/<user>/github.com/elastic/elastic-transport-go/elastictransport/connection.go:180
#	0x741c08	github.com/elastic/elastic-transport-go/v8/elastictransport.(*Client).Perform+0x9a8				/home/<user>/github.com/elastic/elastic-transport-go/elastictransport/elastictransport.go:386
#	0x74bda8	github.com/elastic/elastic-transport-go/v8/elastictransport.TestDeadlock.func6+0x148				/home/<user>/github.com/elastic/elastic-transport-go/elastictransport/a_test.go:172

Suggested fix in *Client.Perform (rough sketch below):

  1. Save the pool to a local variable when calling .Next()
  2. Use the local var when calling OnSuccess() or OnFailure()
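
For illustration only, a tiny self-contained sketch of that pattern (toy types, not the real Client; locking and retries elided):

package main

import "fmt"

type pool struct{ name string }

func (p *pool) Next() string          { return p.name + "-conn" }
func (p *pool) OnFailure(conn string) { fmt.Printf("%s: failure recorded for %s\n", p.name, conn) }

type client struct{ pool *pool } // toy stand-in for Client

func (c *client) perform() {
	p := c.pool // 1. save the pool to a local variable when calling .Next()
	conn := p.Next()

	c.pool = &pool{name: "pool2"} // imagine DiscoverNodes swapping the pool mid-request

	p.OnFailure(conn) // 2. report on the local var, so the failure goes back to pool1
}

func main() {
	c := &client{pool: &pool{name: "pool1"}}
	c.perform() // prints: pool1: failure recorded for pool1-conn
}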

Thanks.

@Anaethelion self-assigned this Apr 23, 2024
@Anaethelion (Contributor)

Hi @kuningfellow,

Thank you for this very detailed issue; I agree there does seem to be a deadlock.

I also think there's a race condition in your test code; runPar should read:

runPar := func(n int, wg []*sync.WaitGroup, f func(*Client), c *Client) {
	for i := 0; i < n; i++ {
		for _, w := range wg {
			w.Add(1)
		}
		go f(c)
	}
}

Otherwise there's a discrepancy between the number of active goroutines and the number declared in the WaitGroup.

Using this version, I believe locking the connections found in the dead slice is unneeded. The connection pool is already locked at the beginning of OnFailure, which should prevent concurrent access while the slice is reordered.

My tests confirm that removing the locks like this solves the issue:

	sort.Slice(cp.dead, func(i, j int) bool {
		c1 := cp.dead[i]
		c2 := cp.dead[j]

		res := c1.Failures > c2.Failures
		return res
	})

Can I ask you to test on your end and report your findings?
Thanks again!

@kuningfellow (Author) commented Apr 25, 2024

Hi @Anaethelion,

There is no discrepancy between the WaitGroup counts. Had there been any, my test code would've panicked with a negative WaitGroup counter.

It's true that the connection pool is locked before OnFailure is called, but the problem is that OnFailure might operate on a different pool than the one the initial call to Next was made on.
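
To make that concrete, here is a tiny standalone model (made-up types, not the actual transport code) of what the current flow can do:

package main

import "fmt"

type pool struct{ name string }

func (p *pool) Next() string          { return p.name + "-conn" }
func (p *pool) OnFailure(conn string) { fmt.Printf("%s records a failure for %s\n", p.name, conn) }

type client struct{ pool *pool }

func main() {
	c := &client{pool: &pool{name: "pool1"}}

	conn := c.pool.Next() // the connection is handed out by pool1

	c.pool = &pool{name: "pool2"} // DiscoverNodes replaces the pool in between

	// Prints "pool2 records a failure for pool1-conn": the failure is recorded
	// against a pool that never handed out this connection.
	c.pool.OnFailure(conn)
}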

Thanks for the reply!

@Anaethelion (Contributor)

I've revamped the logic to better handle discovery and prevent the connection pool from being overwritten while it is in use.
Let me know if you have the opportunity to try it; I believe it solves our issue here.

@kuningfellow (Author)

@Anaethelion Couldn't we just save the pool to a variable like so?

This fixes the race condition, but I'm not sure whether it will break something else.

diff --git a/elastictransport/elastictransport.go b/elastictransport/elastictransport.go
index d9b2f85..89be42c 100644
--- a/elastictransport/elastictransport.go
+++ b/elastictransport/elastictransport.go
@@ -335,12 +335,16 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) {
                        conn            *Connection
                        shouldRetry     bool
                        shouldCloseBody bool
+                       pool            ConnectionPool
                )
 
-               // Get connection from the pool
+               // Get pool from client
                c.Lock()
-               conn, err = c.pool.Next()
+               pool = c.pool
                c.Unlock()
+
+               // Get connection from the pool
+               conn, err = pool.Next()
                if err != nil {
                        if c.logger != nil {
                                c.logRoundTrip(req, nil, err, time.Time{}, time.Duration(0))
@@ -382,9 +386,7 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) {
                        }
 
                        // Report the connection as unsuccessful
-                       c.Lock()
-                       c.pool.OnFailure(conn)
-                       c.Unlock()
+                       pool.OnFailure(conn)
 
                        // Retry upon decision by the user
                        if !c.disableRetry && (c.retryOnError == nil || c.retryOnError(req, err)) {
@@ -392,9 +394,7 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) {
                        }
                } else {
                        // Report the connection as succesfull
-                       c.Lock()
-                       c.pool.OnSuccess(conn)
-                       c.Unlock()
+                       pool.OnSuccess(conn)
                }
 
                if res != nil && c.metrics != nil {

Sorry for the diff. I haven't got permission to push a branch.

@Anaethelion (Contributor)

Saving the pool to a variable solves the race condition, but it doesn't really solve the concurrency problem: discovery would still periodically overwrite the statusConnectionPool underneath the transport, resulting in the loss of the known nodes' state.

Saving the current connection to a local variable feels more like a performance optimization. It could prove faster at a small memory cost, but that would need to be benchmarked!
