Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2 emulation over v3 #8407

Merged
merged 11 commits into from
Aug 31, 2017
1 change: 1 addition & 0 deletions e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
}
args = addV2Args(args)
if cfg.forceNewCluster {
args = append(args, "--force-new-cluster")
}
Expand Down
19 changes: 19 additions & 0 deletions e2e/v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !v2v3

package e2e

func addV2Args(args []string) []string { return args }
21 changes: 21 additions & 0 deletions e2e/v2v3_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build v2v3

package e2e

func addV2Args(args []string) []string {
return append(args, "--experimental-enable-v2v3", "v2/")
}
1 change: 1 addition & 0 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ type Config struct {
// Experimental flags

ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
}

// configYAML holds the config suitable for yaml parsing
Expand Down
9 changes: 8 additions & 1 deletion embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/etcdserver/api/v2v3"
"github.com/coreos/etcd/etcdserver/api/v3client"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/pkg/debugutil"
Expand Down Expand Up @@ -409,7 +411,12 @@ func (e *Etcd) serve() (err error) {
// Start a client server goroutine for each listen address
var h http.Handler
if e.Config().EnableV2 {
h = v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout())
if len(e.Config().ExperimentalEnableV2V3) > 0 {
srv := v2v3.NewServer(v3client.New(e.Server), e.cfg.ExperimentalEnableV2V3)
h = v2http.NewClientHandler(srv, e.Server.Cfg.ReqTimeout())
} else {
h = v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout())
}
} else {
mux := http.NewServeMux()
etcdhttp.HandleBasic(mux, e.Server)
Expand Down
14 changes: 3 additions & 11 deletions etcdctl/ctlv3/command/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,9 @@ func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
}
}

func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
toTTLOptions(r)
func applyRequest(req *pb.Request, applyV2 etcdserver.ApplierV2) {
r := (*etcdserver.RequestV2)(req)
r.TTLOptions()
switch r.Method {
case "POST":
applyV2.Post(r)
Expand All @@ -236,15 +237,6 @@ func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
}
}

func toTTLOptions(r *pb.Request) store.TTLOptionSet {
refresh, _ := pbutil.GetBool(r.Refresh)
ttlOptions := store.TTLOptionSet{Refresh: refresh}
if r.Expiration != 0 {
ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
}
return ttlOptions
}

func writeStore(w io.Writer, st store.Store) uint64 {
all, err := st.Get("/1", true, true)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func newConfig() *config {

fs.BoolVar(&cfg.StrictReconfigCheck, "strict-reconfig-check", cfg.StrictReconfigCheck, "Reject reconfiguration requests that would cause quorum loss.")
fs.BoolVar(&cfg.EnableV2, "enable-v2", true, "Accept etcd V2 client requests.")
fs.StringVar(&cfg.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")

// proxy
fs.Var(cfg.proxy, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(cfg.proxy.Values, ", ")))
Expand Down
2 changes: 2 additions & 0 deletions etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,7 @@ auth flags:
experimental flags:
--experimental-corrupt-check-time '0s'
duration of time between cluster corruption check passes.
--experimental-enable-v2v3 ''
serve v2 requests through the v3 backend under a given prefix.
`
)
3 changes: 0 additions & 3 deletions etcdserver/api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ type Cluster interface {
// Member retrieves a particular member based on ID, or nil if the
// member does not exist in the cluster
Member(id types.ID) *membership.Member
// IsIDRemoved checks whether the given ID has been removed from this
// cluster at some point in the past
IsIDRemoved(id types.ID) bool
// Version is the cluster-wide minimum major.minor version.
Version() *semver.Version
}
2 changes: 1 addition & 1 deletion etcdserver/api/etcdhttp/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (

// HandleBasic adds handlers to a mux for serving JSON etcd client requests
// that do not access the v2 store.
func HandleBasic(mux *http.ServeMux, server *etcdserver.EtcdServer) {
func HandleBasic(mux *http.ServeMux, server etcdserver.ServerPeer) {
mux.HandleFunc(varsPath, serveVars)
mux.HandleFunc(configPath+"/local/log", logHandleFunc)
HandleMetricsHealth(mux, server)
Expand Down
6 changes: 3 additions & 3 deletions etcdserver/api/etcdhttp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
)

// HandleMetricsHealth registers metrics and health handlers.
func HandleMetricsHealth(mux *http.ServeMux, srv *etcdserver.EtcdServer) {
func HandleMetricsHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
mux.Handle(pathMetrics, prometheus.Handler())
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
}
Expand All @@ -44,7 +44,7 @@ func HandlePrometheus(mux *http.ServeMux) {
}

// HandleHealth registers health handler on '/health'.
func HandleHealth(mux *http.ServeMux, srv *etcdserver.EtcdServer) {
func HandleHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
}

Expand Down Expand Up @@ -74,7 +74,7 @@ type Health struct {
Errors []string `json:"errors,omitempty"`
}

func checkHealth(srv *etcdserver.EtcdServer) Health {
func checkHealth(srv etcdserver.ServerV2) Health {
h := Health{Health: false}

as := srv.Alarms()
Expand Down
9 changes: 2 additions & 7 deletions etcdserver/api/etcdhttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,8 @@ const (
)

// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
var lh http.Handler
l := s.Lessor()
if l != nil {
lh = leasehttp.NewHandler(l, func() <-chan struct{} { return s.ApplyWait() })
}
return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
func NewPeerHandler(s etcdserver.ServerPeer) http.Handler {
return newPeerHandler(s.Cluster(), s.RaftHandler(), s.LeaseHandler())
}

func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
Expand Down
1 change: 0 additions & 1 deletion etcdserver/api/etcdhttp/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (c *fakeCluster) Members() []*membership.Member {
return []*membership.Member(ms)
}
func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] }
func (c *fakeCluster) IsIDRemoved(id types.ID) bool { return false }
func (c *fakeCluster) Version() *semver.Version { return nil }

// TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that
Expand Down
34 changes: 17 additions & 17 deletions etcdserver/api/v2http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,21 @@ const (
)

// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler {
func NewClientHandler(server etcdserver.ServerPeer, timeout time.Duration) http.Handler {
mux := http.NewServeMux()
etcdhttp.HandleBasic(mux, server)
handleV2(mux, server, timeout)
return requestLogger(mux)
}

func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Duration) {
func handleV2(mux *http.ServeMux, server etcdserver.ServerV2, timeout time.Duration) {
sec := auth.NewStore(server, timeout)
kh := &keysHandler{
sec: sec,
server: server,
cluster: server.Cluster(),
timer: server,
timeout: timeout,
clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
clientCertAuthEnabled: server.ClientCertAuthEnabled(),
}

sh := &statsHandler{
Expand All @@ -78,15 +77,15 @@ func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Du
cluster: server.Cluster(),
timeout: timeout,
clock: clockwork.NewRealClock(),
clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
clientCertAuthEnabled: server.ClientCertAuthEnabled(),
}

mah := &machinesHandler{cluster: server.Cluster()}

sech := &authHandler{
sec: sec,
cluster: server.Cluster(),
clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
clientCertAuthEnabled: server.ClientCertAuthEnabled(),
}
mux.HandleFunc("/", http.NotFound)
mux.Handle(keysPrefix, kh)
Expand All @@ -102,9 +101,8 @@ func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Du

type keysHandler struct {
sec auth.Store
server etcdserver.Server
server etcdserver.ServerV2
cluster api.Cluster
timer etcdserver.RaftTimer
timeout time.Duration
clientCertAuthEnabled bool
}
Expand Down Expand Up @@ -142,15 +140,15 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
switch {
case resp.Event != nil:
if err := writeKeyEvent(w, resp.Event, noValueOnSuccess, h.timer); err != nil {
if err := writeKeyEvent(w, resp, noValueOnSuccess); err != nil {
// Should never be reached
plog.Errorf("error writing event (%v)", err)
}
reportRequestCompleted(rr, resp, startTime)
case resp.Watcher != nil:
ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
defer cancel()
handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
handleKeyWatch(ctx, w, resp, rr.Stream)
default:
writeKeyError(w, errors.New("received response with no Event/Watcher!"))
}
Expand All @@ -170,7 +168,7 @@ func (h *machinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

type membersHandler struct {
sec auth.Store
server etcdserver.Server
server etcdserver.ServerV2
cluster api.Cluster
timeout time.Duration
clock clockwork.Clock
Expand Down Expand Up @@ -503,14 +501,15 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
// writeKeyEvent trims the prefix of key path in a single Event under
// StoreKeysPrefix, serializes it and writes the resulting JSON to the given
// ResponseWriter, along with the appropriate headers.
func writeKeyEvent(w http.ResponseWriter, ev *store.Event, noValueOnSuccess bool, rt etcdserver.RaftTimer) error {
func writeKeyEvent(w http.ResponseWriter, resp etcdserver.Response, noValueOnSuccess bool) error {
ev := resp.Event
if ev == nil {
return errors.New("cannot write empty Event!")
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index))
w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term))

if ev.IsCreated() {
w.WriteHeader(http.StatusCreated)
Expand Down Expand Up @@ -552,7 +551,8 @@ func writeKeyError(w http.ResponseWriter, err error) {
}
}

func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
func handleKeyWatch(ctx context.Context, w http.ResponseWriter, resp etcdserver.Response, stream bool) {
wa := resp.Watcher
defer wa.Remove()
ech := wa.EventChan()
var nch <-chan bool
Expand All @@ -562,8 +562,8 @@ func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher

w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index))
w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term))
w.WriteHeader(http.StatusOK)

// Ensure headers are flushed early, in case of long polling
Expand Down
Loading