diff --git a/.github/workflows/010_build_and_test.yaml b/.github/workflows/010_build_and_test.yaml
index 7f150a3df..2c96f8af1 100644
--- a/.github/workflows/010_build_and_test.yaml
+++ b/.github/workflows/010_build_and_test.yaml
@@ -8,7 +8,7 @@ jobs:
   build-and-test:
     runs-on: [self-hosted, tfchainrunner01]
     container:
-      image: threefolddev/tfchain:4
+      image: threefolddev/tfchain:5
     env:
       DEBIAN_FRONTEND: noninteractive
       PATH: /root/.cargo/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/go/bin
diff --git a/.github/workflows/020_lint_and_test_go_client.yaml b/.github/workflows/020_lint_and_test_go_client.yaml
index 5694ef912..d2a7243d1 100644
--- a/.github/workflows/020_lint_and_test_go_client.yaml
+++ b/.github/workflows/020_lint_and_test_go_client.yaml
@@ -21,21 +21,22 @@ jobs:
       - name: Set up Go
         uses: actions/setup-go@v5
         with:
-          go-version: "1.20"
+          go-version: "1.21"
           cache: false
           # cache-dependency-path: clients/tfchain-client-go/go.sum
         id: go

      - name: golangci-lint
-        uses: golangci/golangci-lint-action@v3.7.0
+        uses: golangci/golangci-lint-action@v6
        with:
          args: --timeout 3m --verbose
          working-directory: clients/tfchain-client-go

      - name: staticcheck
-        uses: dominikh/staticcheck-action@v1.3.0
+        uses: dominikh/staticcheck-action@v1
        with:
-          version: "2022.1.3"
+          version: "latest"
+          install-go: false
          working-directory: clients/tfchain-client-go
        env:
          GO111MODULE: on
diff --git a/.github/workflows/build_test.Dockerfile b/.github/workflows/build_test.Dockerfile
index e8b37454a..43fdb2e54 100644
--- a/.github/workflows/build_test.Dockerfile
+++ b/.github/workflows/build_test.Dockerfile
@@ -1,4 +1,4 @@
-FROM ubuntu:20.04
+FROM ubuntu:22.04
 ENV DEBIAN_FRONTEND=noninteractive
 RUN apt update && \
     apt install -y \
@@ -16,8 +16,8 @@ RUN apt update && \
     zstd \
     wget \
     protobuf-compiler && \
-    wget https://go.dev/dl/go1.20.2.linux-amd64.tar.gz && \
-    tar -xvf go1.20.2.linux-amd64.tar.gz && \
+    wget https://go.dev/dl/go1.21.13.linux-amd64.tar.gz && \
+    tar -xvf go1.21.13.linux-amd64.tar.gz && \
     mv go /usr/local && \
     echo "GOPATH=/usr/local/go" >> ~/.bashrc && \
     echo "PATH=\$PATH:\$GOPATH/bin" >> ~/.bashrc && \
diff --git a/clients/tfchain-client-go/.github/workflows/lint.yml b/clients/tfchain-client-go/.github/workflows/lint.yml
deleted file mode 100644
index d8d8c7fff..000000000
--- a/clients/tfchain-client-go/.github/workflows/lint.yml
+++ /dev/null
@@ -1,35 +0,0 @@
-name: Lint
-on:
-  pull_request:
-    paths-ignore:
-      - "readme.md"
-  push:
-    paths-ignore:
-      - "readme.md"
-
-jobs:
-  lint:
-    name: lint
-    runs-on: ubuntu-latest
-    timeout-minutes: 5
-    steps:
-      - name: Set up Go
-        uses: actions/setup-go@v3
-        with:
-          go-version: "1.19"
-        id: go
-
-      - name: Check out code into the Go module directory
-        uses: actions/checkout@v3.2.0
-        with:
-          submodules: "true"
-
-      - name: golangci-lint
-        uses: golangci/golangci-lint-action@v3
-        with:
-          args: --timeout 3m --verbose
-
-      - name: gofmt
-        uses: Jerome1337/gofmt-action@v1.0.5
-        with:
-          gofmt-flags: "-l -d"
\ No newline at end of file
diff --git a/clients/tfchain-client-go/.github/workflows/test.yml b/clients/tfchain-client-go/.github/workflows/test.yml
deleted file mode 100644
index 951e7a782..000000000
--- a/clients/tfchain-client-go/.github/workflows/test.yml
+++ /dev/null
@@ -1,24 +0,0 @@
-name: test-substrate-client
-
-on:
-  push:
-
-jobs:
-  build:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v3
-
-      - name: install go
-        uses: actions/setup-go@v3
-        with:
-          go-version: 1.18
-
-      - name: run docker image
-        run: docker run -d -p 9944:9944 threefolddev/tfchain:2.3.0-rc2 --dev --ws-external
-
-      - name: wait for node initialization
-        run: sleep 3
-
-      - name: run test
-        run: go test -v ./...
diff --git a/clients/tfchain-client-go/impl.go b/clients/tfchain-client-go/impl.go
index b6911a932..369f5cee5 100644
--- a/clients/tfchain-client-go/impl.go
+++ b/clients/tfchain-client-go/impl.go
@@ -1,9 +1,12 @@
 package substrate

 import (
+    "context"
     "fmt"
     "math/rand"
+    "slices"
     "sync"
+    "sync/atomic"
     "time"

     "github.com/cenkalti/backoff"
@@ -13,20 +16,16 @@ import (
     "github.com/rs/zerolog/log"
 )

-const (
-    // acceptable delay is amount of blocks (in second) that a node can
-    // be behind before we don't accept it. block time is 6 seconds, so
-    // right now we only allow 2 blocks delay
-    acceptableDelay = 2 * 6 * time.Second
+var (
+    ErrInvalidVersion         = fmt.Errorf("invalid version")
+    ErrUnknownVersion         = fmt.Errorf("unknown version")
+    ErrNotFound               = fmt.Errorf("object not found")
+    ErrNoConnectionsAvailable = fmt.Errorf("no healthy connections available")
+    ErrMaxPoolSizeReached     = fmt.Errorf("max pool size reached")
 )

-var (
-    //ErrInvalidVersion is returned if version 4bytes is invalid
-    ErrInvalidVersion = fmt.Errorf("invalid version")
-    //ErrUnknownVersion is returned if version number is not supported
-    ErrUnknownVersion = fmt.Errorf("unknown version")
-    //ErrNotFound is returned if an object is not found
-    ErrNotFound = fmt.Errorf("object not found")
+const (
+    AcceptableDelay = 2 * 6 * time.Second
 )

 // Versioned base for all types
@@ -37,145 +36,437 @@ type Versioned struct {
 type Conn = *gsrpc.SubstrateAPI
 type Meta = *types.Metadata

+// Pool connection
+type poolConn struct {
+    conn     Conn
+    meta     Meta
+    url      string
+    lastUsed atomic.Int64 // Unix timestamp
+    inUse    atomic.Bool
+}
+
+func (pc *poolConn) isHealthy() bool {
+    if pc == nil || pc.conn == nil || pc.meta == nil {
+        return false
+    }
+    _, err := getTime(pc.conn, pc.meta)
+    return err == nil
+}
+
+func (pc *poolConn) close() {
+    if pc != nil && pc.conn != nil {
+        pc.conn.Client.Close()
+        pc.conn = nil
+        pc.meta = nil
+        log.Debug().Str("url", pc.url).Msg("closed connection")
+    }
+}
+
 type Manager interface {
+    GetConnection(ctx context.Context) (*Substrate, error)
+    Close() error
+
+    // Deprecated methods
     Raw() (Conn, Meta, error)
     Substrate() (*Substrate, error)
 }

-type mgrImpl struct {
-    urls []string
+type ManagerConfig struct {
+    // Maximum number of connections in the pool
+    MaxPoolSize int
+    // Minimum number of connections to maintain
+    MinPoolSize int
+    // Maximum time a connection can be idle before being closed (if the pool has more than MinPoolSize)
+    MaxIdleTime time.Duration
+    // Interval between health checks.
+    // We deliberately do not check connection health periodically,
+    // because doing so creates a lot of overhead; instead, health is
+    // checked on demand and maintenance runs only when needed.
+    // HealthCheckInterval time.Duration
+    // Timeout for creating new connections
+    ConnectionTimeout time.Duration
+}
+
+// Default configuration
+var DefaultConfig = ManagerConfig{
+    MaxPoolSize: 5,
+    MinPoolSize: 2,
+    MaxIdleTime: 30 * time.Minute,
+    // HealthCheckInterval: 120 * time.Second,
+    ConnectionTimeout: 10 * time.Second,
+}
+
+type manager struct {
+    urls      []string
+    pool      []*poolConn
+    mu        sync.RWMutex
+    ctx       context.Context
+    cancel    context.CancelFunc
+    wg        sync.WaitGroup
+    config    ManagerConfig
+    checkChan chan struct{}
+}

-    r int
-    m sync.Mutex
+func NewManager(urls ...string) Manager {
+    return NewManagerWithConfig(DefaultConfig, urls...)
 }

-func NewManager(url ...string) Manager {
-    if len(url) == 0 {
-        panic("at least one url is required")
+func NewManagerWithConfig(config ManagerConfig, urls ...string) Manager {
+    if len(urls) == 0 {
+        panic("at least one URL required")
     }

-    // the shuffle is needed so if one endpoints fails, and the next one
-    // is tried, we will end up moving all connections to the "next" endpoint
-    // which will get overloaded. Instead the shuffle helps to make the "next"
-    // different for reach instace of the pool.
-    rand.Shuffle(len(url), func(i, j int) {
-        url[i], url[j] = url[j], url[i]
-    })
+    // Validate and adjust configuration
+    if config.MaxPoolSize < 1 {
+        config.MaxPoolSize = DefaultConfig.MaxPoolSize
+    }
+    if config.MinPoolSize < 1 || config.MinPoolSize > config.MaxPoolSize {
+        config.MinPoolSize = min(DefaultConfig.MinPoolSize, config.MaxPoolSize)
+    }
+    if config.MaxIdleTime <= 0 {
+        config.MaxIdleTime = DefaultConfig.MaxIdleTime
+    }
+    // if config.HealthCheckInterval <= 0 {
+    //     config.HealthCheckInterval = DefaultConfig.HealthCheckInterval
+    // }
+    if config.ConnectionTimeout <= 0 {
+        config.ConnectionTimeout = DefaultConfig.ConnectionTimeout
+    }

-    return &mgrImpl{
-        urls: url,
-        r:    rand.Intn(len(url)), // start with random url, then roundrobin
+    ctx, cancel := context.WithCancel(context.Background())
+    m := &manager{
+        urls:      shuffle(urls),
+        pool:      make([]*poolConn, 0, config.MaxPoolSize),
+        ctx:       ctx,
+        cancel:    cancel,
+        config:    config,
+        checkChan: make(chan struct{}, 1),
     }
+
+    m.initializePool()
+    m.wg.Add(1)
+    go m.healthChecker()
+
+    return m
 }

-// endpoint return the next endpoint to use
-// in roundrobin fashion. need to be called
-// while lock is acquired.
-func (p *mgrImpl) endpoint() string {
-    defer func() {
-        p.r = (p.r + 1) % len(p.urls)
-    }()
+func (m *manager) initializePool() {
+    log.Debug().Msg("initializing connection pool")
+    for i := 0; i < m.config.MinPoolSize; i++ {
+        select {
+        case m.checkChan <- struct{}{}:
+        default:
+        }
+    }
+}

-    return p.urls[p.r]
+func (m *manager) createConnection(url string) (*poolConn, error) {
+    log.Debug().Str("url", url).Msg("attempting to create a new connection")
+    if conn, meta, err := createSubstrateConn(url); err == nil {
+        log.Debug().Str("url", url).Msg("created new connection")
+        return &poolConn{
+            conn:     conn,
+            meta:     meta,
+            url:      url,
+            lastUsed: atomic.Int64{},
+            inUse:    atomic.Bool{},
+        }, nil
+    } else {
+        log.Error().Str("url", url).Err(err).Msg("failed to create connection")
+    }
+    return nil, fmt.Errorf("failed to create connection to %s", url)
 }

-// Substrate return a new wrapped substrate connection
-// the connection must be closed after you are done using it
-func (p *mgrImpl) Substrate() (*Substrate, error) {
-    cl, meta, err := p.Raw()
+func (m *manager) GetConnection(ctx context.Context) (*Substrate, error) {
+    log.Debug().Msg("getting a connection")
+    conn, err := m.getHealthyConn()
     if err != nil {
-        return nil, err
-    }
-
-    return newSubstrate(cl, meta, p.put)
-}
-
-// Raw returns a RPC substrate client. plus meta. The returned connection
-// is not tracked by the pool, nor reusable. It's the caller responsibility
-// to close the connection when done
-func (p *mgrImpl) Raw() (Conn, Meta, error) {
-    // right now this pool implementation just tests the connection
-    // makes sure that it is still active, otherwise, tries again
-    // until the connection is restored.
-    // A better pool implementation can be done later were multiple connections
-    // can be handled
-    // TODO: thread safety!
-    p.m.Lock()
-    defer p.m.Unlock()
-
-    boff := backoff.WithMaxRetries(
-        backoff.NewConstantBackOff(200*time.Millisecond),
-        2*uint64(len(p.urls)),
-    )
-
-    var (
-        cl   *gsrpc.SubstrateAPI
-        meta *types.Metadata
-        err  error
-    )
-
-    err = backoff.RetryNotify(func() error {
-        endpoint := p.endpoint()
-        log.Debug().Str("url", endpoint).Msg("connecting")
-        cl, err = newSubstrateAPI(endpoint)
-        if err != nil {
-            return errors.Wrapf(err, "error connecting to substrate at '%s'", endpoint)
+        log.Error().Err(err).Msg("failed to get connection")
+        return nil, fmt.Errorf("failed to get connection: %w", err)
+    }
+    log.Debug().Str("url", conn.url).Msg("successfully obtained connection")
+    return newSubstrate(conn, m), nil
+}
+
+func (m *manager) getHealthyConn() (*poolConn, error) {
+    log.Debug().Int("pool_size", len(m.pool)).Int("aquired_count", m.aquiredConnCount()).
+        Msg("checking for healthy connections")
+
+    // Try getting existing connection first
+    if conn := m.getExistingConn(); conn != nil {
+        return conn, nil
+    }
+
+    b := backoff.NewExponentialBackOff()
+    b.MaxInterval = 2 * time.Second
+    b.MaxElapsedTime = 4 * time.Second
+    b.InitialInterval = 500 * time.Millisecond
+    b.Multiplier = 2
+
+    var conn *poolConn
+    err := backoff.Retry(func() error {
+        // Check if we can get an existing connection
+        if c := m.getExistingConn(); c != nil {
+            conn = c
+            return nil
         }

-        meta, err = cl.RPC.State.GetMetadataLatest()
-        if err != nil {
-            return errors.Wrapf(err, "error getting latest metadata at '%s'", endpoint)
+        m.mu.RLock()
+        poolSize := len(m.pool)
+        m.mu.RUnlock()
+
+        if poolSize >= m.config.MaxPoolSize {
+            return backoff.Permanent(ErrMaxPoolSizeReached)
         }

-        t, err := getTime(cl, meta)
-        if err != nil {
-            return errors.Wrapf(err, "error getting node time at '%s'", endpoint)
+        select {
+        case m.checkChan <- struct{}{}:
+            log.Debug().Msg("triggered connection check")
+        default:
+            log.Debug().Msg("connection check already pending")
         }

-        if time.Since(t) > acceptableDelay {
-            return fmt.Errorf("node '%s' is behind acceptable delay with timestamp '%s'", endpoint, t)
+        // time.Sleep(50 * time.Millisecond)
+        return ErrNoConnectionsAvailable
+    }, b)
+
+    return conn, err
+}
+
+func (m *manager) healthChecker() {
+    defer m.wg.Done()
+    // ticker := time.NewTicker(m.config.HealthCheckInterval)
+    // defer ticker.Stop()
+
+    for {
+        select {
+        case <-m.ctx.Done():
+            return
+        // case <-ticker.C:
+        //     m.checkConnections()
+        case <-m.checkChan:
+            m.checkConnections()
         }
+    }
+}

-        return nil
+func (m *manager) checkConnections() {
+    m.mu.Lock()
+    healthy := make([]*poolConn, 0, len(m.pool))
+    for _, conn := range m.pool {
+        if conn == nil {
+            continue
+        }

-    }, boff, func(err error, _ time.Duration) {
-        log.Error().Err(err).Msg("failed to connect to endpoint, retrying")
-    })
+        if !conn.isHealthy() {
+            log.Debug().Str("url", conn.url).Msg("closing unhealthy connection")
+            conn.close()
+            continue
+        }
+
+        // Check if connection is idle for too long if we have more than min pool size
+        if !conn.inUse.Load() && time.Since(time.Unix(conn.lastUsed.Load(), 0)) > m.config.MaxIdleTime && len(m.pool) > m.config.MinPoolSize {
+            log.Debug().Str("url", conn.url).Msg("closing idle connection")
+            conn.close()
+            continue
+        }
+
+        healthy = append(healthy, conn)
+    }
+
+    m.pool = healthy
+    m.mu.Unlock()
+    m.ensureMinConnections()

-    return cl, meta, err
 }

-// TODO: implement reusable connections instead of
-// closing the connection.
-func (p *mgrImpl) put(cl *Substrate) {
-    // naive put implementation for now
-    // we just immediately kill the connection
-    if cl.cl != nil {
-        cl.cl.Client.Close()
+func (m *manager) ensureMinConnections() {
+    log.Debug().Msg("ensuring minimum connections in the pool")
+    inUseCount := m.aquiredConnCount()
+    urls := shuffle(m.unusedURLs())
+    urls = append(urls, m.urls...)
+
+    for _, url := range urls {
+        poolSize := len(m.pool)
+
+        if poolSize < m.config.MinPoolSize || (poolSize < m.config.MaxPoolSize && poolSize == inUseCount) {
+            if conn, err := m.createConnection(url); err == nil {
+                m.mu.Lock()
+                m.pool = append(m.pool, conn)
+                m.mu.Unlock()
+                log.Debug().Str("url", url).Msg("added new connection to pool")
+            }
+        } else {
+            break
+        }
     }
-    cl.cl = nil
-    cl.meta = nil
 }

-// Substrate client
-type Substrate struct {
-    cl   Conn
-    meta Meta
+func (m *manager) Close() error {
+    m.cancel()
+    m.wg.Wait()
+
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    for _, conn := range m.pool {
+        conn.close()
+    }
+    m.pool = nil
+    return nil
+}

-    close func(s *Substrate)
+// Helper methods
+func (m *manager) unusedURLs() []string {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    // get all urls that are not in the pool
+    used := make([]string, 0, len(m.pool))
+    for _, conn := range m.pool {
+        used = append(used, conn.url)
+    }
+    unused := make([]string, 0, len(m.urls))
+    for _, url := range m.urls {
+        if !slices.Contains(used, url) {
+            unused = append(unused, url)
+        }
+    }
+    return unused
+}
+
+func (m *manager) aquiredConnCount() int {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    count := 0
+    for _, conn := range m.pool {
+        if conn.inUse.Load() {
+            count++
+        }
+    }
+    return count
 }

-// NewSubstrate creates a substrate client
-func newSubstrate(cl Conn, meta Meta, close func(*Substrate)) (*Substrate, error) {
-    return &Substrate{cl: cl, meta: meta, close: close}, nil
+func (m *manager) getExistingConn() *poolConn {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    for _, conn := range m.pool {
+        if conn.isHealthy() && !conn.inUse.Load() {
+            if conn.inUse.CompareAndSwap(false, true) {
+                conn.lastUsed.Store(time.Now().Unix())
+                return conn
+            }
+        }
+    }
+    return nil
 }

-func (s *Substrate) Close() {
-    s.close(s)
+func shuffle(urls []string) []string {
+    result := make([]string, len(urls))
+    copy(result, urls)
+    rand.Shuffle(len(result), func(i, j int) {
+        result[i], result[j] = result[j], result[i]
+    })
+    return result
+}
+
+// Deprecated methods implementation
+func (m *manager) Raw() (Conn, Meta, error) {
+    conn, err := m.GetConnection(context.Background())
+    if err != nil {
+        return nil, nil, err
+    }
+    return conn.conn.conn, conn.conn.meta, nil
+}
+
+func (m *manager) Substrate() (*Substrate, error) {
+    return m.GetConnection(context.Background())
+}
+
+type Substrate struct {
+    conn   *poolConn
+    mgr    *manager
+    mu     sync.Mutex
+    closed atomic.Bool
+}
+
+func newSubstrate(conn *poolConn, mgr *manager) *Substrate {
+    return &Substrate{
+        conn: conn,
+        mgr:  mgr,
+    }
+}
+
+func createSubstrateConn(url string) (Conn, Meta, error) {
+    cl, err := newSubstrateAPI(url)
+    if err != nil {
+        return nil, nil, err
+    }
+
+    meta, err := cl.RPC.State.GetMetadataLatest()
+    if err != nil {
+        cl.Client.Close()
+        return nil, nil, err
+    }
+
+    t, err := getTime(cl, meta)
+    if err != nil || time.Since(t) > AcceptableDelay {
+        cl.Client.Close()
+        return nil, nil, fmt.Errorf("node health check failed")
+    }
+
+    return cl, meta, nil
 }

 func (s *Substrate) GetClient() (Conn, Meta, error) {
-    return s.cl, s.meta, nil
+    if s.closed.Load() {
log.Error().Msg("attempted to get client from closed substrate") + return nil, nil, fmt.Errorf("substrate connection closed") + } + + s.mu.Lock() + if s.conn != nil && s.conn.isHealthy() { + conn := s.conn.conn + meta := s.conn.meta + s.conn.lastUsed.Store(time.Now().Unix()) + s.mu.Unlock() + return conn, meta, nil + } + if s.conn != nil { + s.conn.inUse.Store(false) + } + s.mu.Unlock() + + conn, err := s.mgr.getHealthyConn() + if err != nil { + log.Error().Err(err).Msg("failed to get healthy connection for client") + return nil, nil, err + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.conn = conn + + log.Debug().Str("url", conn.url).Msg("swapped connection") + return conn.conn, conn.meta, nil +} + +func (s *Substrate) Release() { + if !s.closed.CompareAndSwap(false, true) { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + if s.conn != nil { + s.conn.inUse.Store(false) + log.Debug().Str("url", s.conn.url).Msg("releasing connection to pool") + s.conn = nil + } } func (s *Substrate) getVersion(b types.StorageDataRaw) (uint32, error) { @@ -196,6 +487,11 @@ func (s *Substrate) Time() (t time.Time, err error) { return getTime(cl, meta) } +// deprecated methods +func (s *Substrate) Close() { + s.Release() +} + func getTime(cl Conn, meta Meta) (t time.Time, err error) { key, err := types.CreateStorageKey(meta, "Timestamp", "Now", nil) if err != nil { diff --git a/clients/tfchain-client-go/impl_test.go b/clients/tfchain-client-go/impl_test.go new file mode 100644 index 000000000..cc751c581 --- /dev/null +++ b/clients/tfchain-client-go/impl_test.go @@ -0,0 +1,201 @@ +package substrate + +import ( + "context" + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPoolInitialization(t *testing.T) { + urls := []string{getUrlBasedOnEnv()} + mgr := NewManager(urls...) + defer mgr.Close() + + time.Sleep(100 * time.Millisecond) + + mgrImpl := mgr.(*manager) + mgrImpl.mu.RLock() + defer mgrImpl.mu.RUnlock() + + assert.LessOrEqual(t, len(mgrImpl.pool), mgrImpl.config.MinPoolSize) + assert.Greater(t, len(mgrImpl.pool), 0) +} + +func TestConnectionReuse(t *testing.T) { + urls := []string{getUrlBasedOnEnv()} + mgr := NewManager(urls...) + defer mgr.Close() + + // Wait for pool initialization + time.Sleep(100 * time.Millisecond) + + // Get first connection + sub1, err := mgr.GetConnection(context.Background()) + require.NoError(t, err) + + // Store connection details for comparison + conn1 := sub1.conn + url1 := sub1.conn.url + + // Release it back to pool properly + sub1.Release() + + // Small delay to ensure connection is properly released + time.Sleep(10 * time.Millisecond) + + // Get another connection + sub2, err := mgr.GetConnection(context.Background()) + require.NoError(t, err) + defer sub2.Release() + + // Should be the same underlying connection + assert.Equal(t, conn1, sub2.conn) + assert.Equal(t, url1, sub2.conn.url) +} + +func TestConcurrentAccess(t *testing.T) { + urls := []string{getUrlBasedOnEnv()} + mgr := NewManager(urls...) 
+    defer mgr.Close()
+
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+
+    var wg sync.WaitGroup
+
+    for i := 0; i < 10; i++ {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            sub, err := mgr.GetConnection(ctx)
+            if err != nil {
+                return
+            }
+            defer sub.Release()
+            _, err = sub.Time()
+            assert.NoError(t, err)
+            time.Sleep(10 * time.Millisecond)
+        }()
+    }
+
+    wg.Wait()
+}
+
+func TestFailover(t *testing.T) {
+    urls := []string{"ws://fail1", getUrlBasedOnEnv()}
+    mgr := NewManager(urls...)
+    defer mgr.Close()
+
+    sub1, err := mgr.GetConnection(context.Background())
+    require.NoError(t, err)
+    defer sub1.Release()
+    sub2, err := mgr.GetConnection(context.Background())
+    require.NoError(t, err)
+    defer sub2.Release()
+    assert.Equal(t, sub1.conn.url, urls[1])
+    assert.Equal(t, sub2.conn.url, urls[1])
+}
+
+func TestHealthChecking(t *testing.T) {
+    urls := []string{getUrlBasedOnEnv()}
+    mgr := NewManager(urls...)
+    defer mgr.Close()
+
+    sub, err := mgr.GetConnection(context.Background())
+    require.NoError(t, err)
+    defer sub.Release()
+
+    // Simulate connection failure
+    old := sub.conn.conn
+    old.Client.Close()
+    // simulate usage of the client
+    _, err = sub.Time()
+    assert.NoError(t, err)
+    assert.NotEqual(t, old, sub.conn.conn)
+}
+
+func TestStressWithFailures(t *testing.T) {
+    if testing.Short() {
+        t.Skip("Skipping stress test in short mode")
+    }
+
+    // Use test-specific configuration
+    config := ManagerConfig{
+        MaxPoolSize: 30,
+        MinPoolSize: 3,
+        MaxIdleTime: time.Minute,
+        // HealthCheckInterval: time.Second,
+        ConnectionTimeout: time.Second,
+    }
+
+    urls := []string{getUrlBasedOnEnv()}
+    mgr := NewManagerWithConfig(config, urls...)
+    defer mgr.Close()
+
+    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+    defer cancel()
+
+    var (
+        wg     sync.WaitGroup
+        mu     sync.Mutex
+        errors []error
+    )
+
+    for i := 0; i < 30; i++ {
+        wg.Add(1)
+        go func(id int) {
+            defer wg.Done()
+
+            retryBackoff := time.Millisecond * 100
+            maxBackoff := time.Second
+
+            for ctx.Err() == nil {
+                sub, err := mgr.GetConnection(ctx)
+                if err != nil {
+                    mu.Lock()
+                    errors = append(errors, fmt.Errorf("goroutine %d: %w", id, err))
+                    mu.Unlock()
+
+                    jitter := time.Duration(rand.Int63n(int64(retryBackoff)))
+                    time.Sleep(retryBackoff + jitter)
+                    retryBackoff *= 2
+                    if retryBackoff > maxBackoff {
+                        retryBackoff = maxBackoff
+                    }
+                    continue
+                }
+
+                // Reset backoff on success
+                retryBackoff = time.Millisecond * 100
+
+                // Simulate work
+                _, err = sub.Time()
+                assert.NoError(t, err)
+                time.Sleep(time.Duration(rand.Intn(250)+50) * time.Millisecond)
+
+                if id%2 == 0 && rand.Float32() < 0.1 {
+                    sub.conn.conn.Client.Close()
+                }
+
+                sub.Release()
+            }
+        }(i)
+    }
+
+    wg.Wait()
+
+    // Log and check errors
+    for _, err := range errors {
+        t.Logf("Error: %v", err)
+    }
+
+    assert.Less(t, len(errors), 10,
+        "Too many errors occurred during stress test: %d", len(errors))
+}
diff --git a/clients/tfchain-client-go/transfer.go b/clients/tfchain-client-go/transfer.go
index 5648a851e..c739a5fef 100644
--- a/clients/tfchain-client-go/transfer.go
+++ b/clients/tfchain-client-go/transfer.go
@@ -20,9 +20,6 @@ func (s *Substrate) Transfer(identity Identity, amount uint64, destination Accou
     bal := big.NewInt(int64(amount))

     c, err := types.NewCall(meta, "Balances.transfer", dest, types.NewUCompact(bal))
-    if err != nil {
-        panic(err)
-    }

     if err != nil {
         return errors.Wrap(err, "failed to create call")
diff --git a/clients/tfchain-client-go/utils_test.go b/clients/tfchain-client-go/utils_test.go
index a42343f99..530f227f8 100644
--- a/clients/tfchain-client-go/utils_test.go
+++ b/clients/tfchain-client-go/utils_test.go
@@ -63,6 +63,14 @@ func startLocalConnection(t *testing.T) *Substrate {
     return cl
 }

+func getUrlBasedOnEnv() string {
+    if _, ok := os.LookupEnv("CI"); ok {
+        return "ws://127.0.0.1:9944"
+    } else {
+        return "wss://tfchain.dev.grid.tf"
+    }
+}
+
 func assertCreateTwin(t *testing.T, cl *Substrate, user AccountUser) uint32 {
     u := Accounts[user]
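
Reviewer note: the sketch below shows how the pool-based API added in impl.go is intended to be used end to end (NewManagerWithConfig, GetConnection, Release, Close). It is illustrative only and not part of the diff; the module import path and the devnet endpoint are assumptions taken from the repository layout and from getUrlBasedOnEnv above, so adjust them to your setup.

package main

import (
    "context"
    "fmt"
    "time"

    // Assumed import path for this client package; not confirmed by the diff.
    substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
)

func main() {
    // Pool sizing mirrors DefaultConfig in impl.go; the values are illustrative.
    cfg := substrate.ManagerConfig{
        MaxPoolSize:       5,
        MinPoolSize:       2,
        MaxIdleTime:       30 * time.Minute,
        ConnectionTimeout: 10 * time.Second,
    }

    // Endpoint borrowed from getUrlBasedOnEnv in utils_test.go (devnet).
    mgr := substrate.NewManagerWithConfig(cfg, "wss://tfchain.dev.grid.tf")
    defer mgr.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // GetConnection hands out a pooled connection; Release returns it to the pool.
    sub, err := mgr.GetConnection(ctx)
    if err != nil {
        panic(err)
    }
    defer sub.Release()

    // Any call that needs the client (here: reading chain time) reuses the pooled connection.
    t, err := sub.Time()
    if err != nil {
        panic(err)
    }
    fmt.Println("chain time:", t)
}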