Skip to content

Commit

Permalink
Merge pull request #12870 from ptabor/20210416-fix-flake-TestSnapshot…
Browse files Browse the repository at this point in the history
…V3RestoreMultiMemberAdd-master

Fix TestSnapshotV3RestoreMultiMemberAdd flakes (leaks)
  • Loading branch information
ptabor authored Apr 16, 2021
2 parents f3c5180 + 17b9823 commit 5744cdf
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 9 deletions.
18 changes: 12 additions & 6 deletions client/pkg/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package transport

import (
"context"
"net"
"net/http"
"strings"
Expand All @@ -31,11 +32,11 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er

t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
DialContext: (&net.Dialer{
Timeout: dialtimeoutd,
// value taken from http.DefaultTransport
KeepAlive: 30 * time.Second,
}).Dial,
}).DialContext,
// value taken from http.DefaultTransport
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: cfg,
Expand All @@ -45,15 +46,20 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er
Timeout: dialtimeoutd,
KeepAlive: 30 * time.Second,
}
dial := func(net, addr string) (net.Conn, error) {
return dialer.Dial("unix", addr)
}

dialContext := func(ctx context.Context, net, addr string) (net.Conn, error) {
return dialer.DialContext(ctx, "unix", addr)
}
tu := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: dial,
DialContext: dialContext,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: cfg,
// Cost of reopening connection on sockets is low, and they are mostly used in testing.
// Long living unix-transport connections were leading to 'leak' test flakes.
// Alternativly the returned Transport (t) should override CloseIdleConnections to
// forward it to 'tu' as well.
IdleConnTimeout: time.Microsecond,
}
ut := &unixTransport{tu}

Expand Down
4 changes: 3 additions & 1 deletion pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package proxy

import (
"context"
"fmt"
"io"
mrand "math/rand"
Expand Down Expand Up @@ -295,6 +296,7 @@ func (s *server) To() string {
func (s *server) listenAndServe() {
defer s.closeWg.Done()

ctx := context.Background()
s.lg.Info("proxy is listening on", zap.String("from", s.From()))
close(s.readyc)

Expand Down Expand Up @@ -380,7 +382,7 @@ func (s *server) listenAndServe() {
}
continue
}
out, err = tp.Dial(s.to.Scheme, s.to.Host)
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package proxy

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -615,7 +616,7 @@ func send(t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSI
if terr != nil {
t.Fatal(terr)
}
out, err = tp.Dial(scheme, addr)
out, err = tp.DialContext(context.Background(), scheme, addr)
} else {
out, err = net.Dial(scheme, addr)
}
Expand Down
4 changes: 3 additions & 1 deletion server/etcdserver/api/rafthttp/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"errors"
"io/ioutil"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -139,6 +140,7 @@ func (p *pipeline) post(data []byte) (err error) {
go func() {
select {
case <-done:
cancel()
case <-p.stopc:
waitSchedule()
cancel()
Expand Down Expand Up @@ -173,4 +175,4 @@ func (p *pipeline) post(data []byte) (err error) {
}

// waitSchedule waits other goroutines to be scheduled for a while
func waitSchedule() { time.Sleep(time.Millisecond) }
func waitSchedule() { runtime.Gosched() }

0 comments on commit 5744cdf

Please sign in to comment.