Skip to content

Commit

Permalink
Merge pull request etcd-io#83 from hexfusion/bump-3.4.16
Browse files Browse the repository at this point in the history
ETCD-199: bump etcd v3.4.16
  • Loading branch information
openshift-merge-robot committed Jun 18, 2021
2 parents 91a8b2b + 0956f6d commit 4fd092b
Show file tree
Hide file tree
Showing 61 changed files with 1,671 additions and 508 deletions.
33 changes: 24 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ sudo: required
services: docker

go:
- 1.12.12
- 1.12.17
- 1.15.11

notifications:
on_success: never
Expand All @@ -27,10 +28,23 @@ env:
matrix:
fast_finish: true
allow_failures:
- go: 1.12.12
- go: 1.12.17
env: TARGET=linux-amd64-grpcproxy
- go: 1.12.12
- go: 1.12.17
env: TARGET=linux-386-unit
exclude:
- go: 1.15.11
env: TARGET=linux-amd64-integration-1-cpu
- go: 1.15.11
env: TARGET=linux-amd64-integration-2-cpu
- go: 1.15.11
env: TARGET=linux-amd64-unit-4-cpu-race
- go: 1.15.11
env: TARGET=linux-amd64-coverage
- go: 1.15.11
env: TARGET=linux-amd64-fmt-unit-go-tip-2-cpu
- go: 1.15.11
env: TARGET=linux-386-unit-1-cpu

before_install:
- if [[ $TRAVIS_GO_VERSION == 1.* ]]; then docker pull gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION}; fi
Expand All @@ -40,6 +54,7 @@ install:

script:
- echo "TRAVIS_GO_VERSION=${TRAVIS_GO_VERSION}"
- RACE='true'; if [[ $TRAVIS_GO_VERSION == 1.15.11 ]]; then echo 'setting race off'; RACE='false'; fi
- >
case "${TARGET}" in
linux-amd64-fmt)
Expand All @@ -50,27 +65,27 @@ script:
linux-amd64-integration-1-cpu)
docker run --rm \
--volume=`pwd`:/go/src/go.etcd.io/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=amd64 CPU=1 PASSES='integration' ./test"
/bin/bash -c "GOARCH=amd64 CPU=1 PASSES='integration' RACE='${RACE}' ./test"
;;
linux-amd64-integration-2-cpu)
docker run --rm \
--volume=`pwd`:/go/src/go.etcd.io/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=amd64 CPU=2 PASSES='integration' ./test"
/bin/bash -c "GOARCH=amd64 CPU=2 PASSES='integration' RACE='${RACE}' ./test"
;;
linux-amd64-integration-4-cpu)
docker run --rm \
--volume=`pwd`:/go/src/go.etcd.io/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=amd64 CPU=4 PASSES='integration' ./test"
/bin/bash -c "GOARCH=amd64 CPU=4 PASSES='integration' RACE='${RACE}' ./test"
;;
linux-amd64-functional)
docker run --rm \
--volume=`pwd`:/go/src/go.etcd.io/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "./build && GOARCH=amd64 PASSES='functional' ./test"
/bin/bash -c "./build && GOARCH=amd64 PASSES='functional' RACE='${RACE}' ./test"
;;
linux-amd64-unit)
docker run --rm \
--volume=`pwd`:/go/src/go.etcd.io/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=amd64 PASSES='unit' ./test"
/bin/bash -c "GOARCH=amd64 PASSES='unit' RACE='${RACE}' ./test"
;;
all-build)
docker run --rm \
Expand All @@ -84,7 +99,7 @@ script:
&& GO_BUILD_FLAGS='-v' GOARCH=ppc64le ./build"
;;
linux-amd64-grpcproxy)
sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build grpcproxy'" make docker-test
sudo HOST_TMP_DIR=/tmp TEST_OPTS="PASSES='build grpcproxy'" GO_VERSION=${TRAVIS_GO_VERSION} make docker-test
;;
linux-386-unit)
docker run --rm \
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ docker-remove:



GO_VERSION ?= 1.12.12
GO_VERSION ?= 1.12.17
ETCD_VERSION ?= $(shell git rev-parse --short HEAD || echo "GitNotFound")

TEST_SUFFIX = $(shell date +%s | base64 | head -c 15)
Expand All @@ -65,11 +65,11 @@ endif


# Example:
# GO_VERSION=1.12.12 make build-docker-test
# GO_VERSION=1.12.17 make build-docker-test
# make build-docker-test
#
# gcloud docker -- login -u _json_key -p "$(cat /etc/gcp-key-etcd-development.json)" https://gcr.io
# GO_VERSION=1.12.12 make push-docker-test
# GO_VERSION=1.12.17 make push-docker-test
# make push-docker-test
#
# gsutil -m acl ch -u allUsers:R -r gs://artifacts.etcd-development.appspot.com
Expand Down
2 changes: 1 addition & 1 deletion bill-of-materials.json
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@
]
},
{
"project": "golang.org/x/sys/unix",
"project": "golang.org/x/sys",
"licenses": [
{
"type": "BSD 3-clause \"New\" or \"Revised\" License",
Expand Down
9 changes: 7 additions & 2 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128)
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
Expand Down Expand Up @@ -298,6 +299,9 @@ type Config struct {
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
// takes more time than this value.
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`

// ForceNewCluster starts a new cluster even if previously started; unsafe.
ForceNewCluster bool `json:"force-new-cluster"`
Expand Down Expand Up @@ -404,8 +408,9 @@ func NewConfig() *Config {
SnapshotCount: etcdserver.DefaultSnapshotCount,
SnapshotCatchUpEntries: etcdserver.DefaultSnapshotCatchUpEntries,

MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,

GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,
GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
Expand Down
1 change: 1 addition & 0 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
Expand Down
1 change: 1 addition & 0 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http.
return outgoing
},
),
wsproxy.WithMaxRespBodyBufferSize(0x7fffffff),
),
)
}
Expand Down
1 change: 1 addition & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")

// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
Expand Down
2 changes: 2 additions & 0 deletions etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ Experimental feature:
Skip verification of SAN field in client certificate for peer connections.
--experimental-watch-progress-notify-interval '10m'
Duration of periodical watch progress notification.
--experimental-warning-apply-duration '100ms'
Warning is generated if requests take more than this duration.
Unsafe feature:
--force-new-cluster 'false'
Expand Down
15 changes: 10 additions & 5 deletions etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,16 +763,21 @@ func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *R
if len(ems) != len(lms) {
return fmt.Errorf("member count is unequal")
}
sort.Sort(MembersByPeerURLs(ems))
sort.Sort(MembersByPeerURLs(lms))

ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
for i := range ems {
if ok, err := netutil.URLStringsEqual(ctx, lg, ems[i].PeerURLs, lms[i].PeerURLs); !ok {
return fmt.Errorf("unmatched member while checking PeerURLs (%v)", err)
var err error
ok := false
for j := range lms {
if ok, err = netutil.URLStringsEqual(ctx, lg, ems[i].PeerURLs, lms[j].PeerURLs); ok {
lms[j].ID = ems[i].ID
break
}
}
if !ok {
return fmt.Errorf("PeerURLs: no match found for existing member (%v, %v), last resolver error (%v)", ems[i].ID, ems[i].PeerURLs, err)
}
lms[i].ID = ems[i].ID
}
local.members = make(map[types.ID]*Member)
for _, m := range lms {
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/api/v2store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ func TestStoreWatchSlowConsumer(t *testing.T) {
s.Watch("/foo", true, true, 0) // stream must be true
// Fill watch channel with 100 events
for i := 1; i <= 100; i++ {
s.Set("/foo", false, string(i), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}) // ok
s.Set("/foo", false, string(rune(i)), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}) // ok
}
// testutil.AssertEqual(t, s.WatcherHub.count, int64(1))
s.Set("/foo", false, "101", v2store.TTLOptionSet{ExpireTime: v2store.Permanent}) // ok
Expand Down
55 changes: 49 additions & 6 deletions etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
return rpctypes.ErrGRPCNoLeader
}

cctx, cancel := context.WithCancel(ss.Context())
ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
ctx := newCancellableContext(ss.Context())
ss = serverStreamWithCtx{ctx: ctx, ServerStream: ss}

smap.mu.Lock()
smap.streams[ss] = struct{}{}
Expand All @@ -228,7 +228,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
smap.mu.Lock()
delete(smap.streams, ss)
smap.mu.Unlock()
cancel()
// TODO: investigate whether the reason for cancellation here is useful to know
ctx.Cancel(nil)
}()
}
}
Expand All @@ -237,10 +238,52 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
}
}

// cancellableContext wraps a context with new cancellable context that allows a
// specific cancellation error to be preserved and later retrieved using the
// Context.Err() function. This is so downstream context users can disambiguate
// the reason for the cancellation which could be from the client (for example)
// or from this interceptor code.
type cancellableContext struct {
context.Context

lock sync.RWMutex
cancel context.CancelFunc
cancelReason error
}

func newCancellableContext(parent context.Context) *cancellableContext {
ctx, cancel := context.WithCancel(parent)
return &cancellableContext{
Context: ctx,
cancel: cancel,
}
}

// Cancel stores the cancellation reason and then delegates to context.WithCancel
// against the parent context.
func (c *cancellableContext) Cancel(reason error) {
c.lock.Lock()
c.cancelReason = reason
c.lock.Unlock()
c.cancel()
}

// Err will return the preserved cancel reason error if present, and will
// otherwise return the underlying error from the parent context.
func (c *cancellableContext) Err() error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.cancelReason != nil {
return c.cancelReason
}
return c.Context.Err()
}

type serverStreamWithCtx struct {
grpc.ServerStream
ctx context.Context
cancel *context.CancelFunc

// ctx is used so that we can preserve a reason for cancellation.
ctx *cancellableContext
}

func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
Expand Down Expand Up @@ -272,7 +315,7 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
smap.mu.Lock()
for ss := range smap.streams {
if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
(*ssWithCtx.cancel)()
ssWithCtx.ctx.Cancel(rpctypes.ErrGRPCNoLeader)
<-ss.Context().Done()
}
}
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var (
ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()

ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err()

ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
Expand Down
16 changes: 13 additions & 3 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,25 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
}
}()

// TODO: There's a race here. When a stream is closed (e.g. due to a cancellation),
// the underlying error (e.g. a gRPC stream error) may be returned and handled
// through errc if the recv goroutine finishes before the send goroutine.
// When the recv goroutine wins, the stream error is retained. When recv loses
// the race, the underlying error is lost (unless the root error is propagated
// through Context.Err() which is not always the case (as callers have to decide
// to implement a custom context to do so). The stdlib context package builtins
// may be insufficient to carry semantically useful errors around and should be
// revisited.
select {
case err = <-errc:
if err == context.Canceled {
err = rpctypes.ErrGRPCWatchCanceled
}
close(sws.ctrlStream)

case <-stream.Context().Done():
err = stream.Context().Err()
// the only server-side cancellation is noleader for now.
if err == context.Canceled {
err = rpctypes.ErrGRPCNoLeader
err = rpctypes.ErrGRPCWatchCanceled
}
}

Expand Down
8 changes: 2 additions & 6 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ import (
"go.uber.org/zap"
)

const (
warnApplyDuration = 100 * time.Millisecond
)

type applyResult struct {
resp proto.Message
err error
Expand Down Expand Up @@ -115,7 +111,7 @@ func (s *EtcdServer) newApplierV3() applierV3 {
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar := &applyResult{}
defer func(start time.Time) {
warnOfExpensiveRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
warnOfExpensiveRequest(a.s.getLogger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
if ar.err != nil {
warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
}
Expand Down Expand Up @@ -185,7 +181,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
trace = traceutil.New("put",
a.s.getLogger(),
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: proto.Size(p)},
traceutil.Field{Key: "req_size", Value: p.Size()},
)
val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil {
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *EtcdServer) applyV2Request(r *RequestV2) Response {
stringer: r,
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
}
defer warnOfExpensiveRequest(s.getLogger(), time.Now(), stringer, nil, nil)
defer warnOfExpensiveRequest(s.getLogger(), s.Cfg.WarningApplyDuration, time.Now(), stringer, nil, nil)

switch r.Method {
case "POST":
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint

WarningApplyDuration time.Duration

StrictReconfigCheck bool

// ClientCertAuthEnabled is true when cert has been signed by the client CA.
Expand Down
Loading

0 comments on commit 4fd092b

Please sign in to comment.