diff --git a/etcdmain/config.go b/etcdmain/config.go index 117ba4a9bfd6..30d3c15f556b 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -115,6 +115,8 @@ type config struct { printVersion bool + v3demo bool + ignored []string } @@ -208,6 +210,9 @@ func NewConfig() *config { // version fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit") + // demo flag + fs.BoolVar(&cfg.v3demo, "v3demo", false, "Enable v3 demo") + // backwards-compatibility with v0.4.6 fs.Var(&flags.IPAddressPort{}, "addr", "DEPRECATED: Use -advertise-client-urls instead.") fs.Var(&flags.IPAddressPort{}, "bind-addr", "DEPRECATED: Use -listen-client-urls instead.") diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 3c925bf64cbb..77619237d41f 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -31,9 +31,12 @@ import ( systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/etcdserver/etcdhttp" + "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/cors" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/osutil" @@ -233,6 +236,15 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { clns = append(clns, l) } + var v3l net.Listener + if cfg.v3demo { + v3l, err = net.Listen("tcp", "127.0.0.1:12379") + if err != nil { + plog.Fatal(err) + } + plog.Infof("listening for client rpc on 127.0.0.1:12379") + } + srvcfg := &etcdserver.ServerConfig{ Name: cfg.name, ClientURLs: cfg.acurls, @@ -250,6 +262,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { Transport: pt, TickMs: cfg.TickMs, ElectionTicks: cfg.electionTicks(), + V3demo: cfg.v3demo, } var s *etcdserver.EtcdServer s, err = etcdserver.NewServer(srvcfg) @@ -281,6 +294,14 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { plog.Fatal(serveHTTP(l, ch, 0)) }(l) } + + if cfg.v3demo { + // set up v3 demo rpc + grpcServer := grpc.NewServer() + etcdserverpb.RegisterEtcdServer(grpcServer, v3rpc.New(s)) + go plog.Fatal(grpcServer.Serve(v3l)) + } + return s.StopNotify(), nil } diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go new file mode 100644 index 000000000000..da029fac721d --- /dev/null +++ b/etcdserver/api/v3rpc/key.go @@ -0,0 +1,54 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3rpc + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/etcdserver" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type handler struct { + server etcdserver.Server +} + +func New(s etcdserver.Server) pb.EtcdServer { + return &handler{s} +} + +func (h *handler) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r}) + return resp.(*pb.RangeResponse), nil +} + +func (h *handler) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r}) + return resp.(*pb.PutResponse), nil +} + +func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + panic("not implemented") + return nil, nil +} + +func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { + panic("not implemented") + return nil, nil +} + +func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + panic("not implemented") + return nil, nil +} diff --git a/etcdserver/config.go b/etcdserver/config.go index 9b6132670ecb..b19eae375c2e 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -43,6 +43,8 @@ type ServerConfig struct { TickMs uint ElectionTicks int + + V3demo bool } // VerifyBootstrapConfig sanity-checks the initial config for bootstrap case diff --git a/etcdserver/server.go b/etcdserver/server.go index 330029bdad06..fbfc20b9876a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -20,6 +20,7 @@ import ( "fmt" "math/rand" "net/http" + "os" "path" "regexp" "sync/atomic" @@ -27,6 +28,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" // owner can make/remove files inside the directory "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" @@ -43,13 +45,13 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/snap" + dstorage "github.com/coreos/etcd/storage" "github.com/coreos/etcd/store" "github.com/coreos/etcd/version" "github.com/coreos/etcd/wal" ) const ( - // owner can make/remove files inside the directory privateDirMode = 0700 defaultSyncTimeout = time.Second @@ -106,6 +108,7 @@ type Server interface { Leader() types.ID // Do takes a request and attempts to fulfill it, returning a Response. Do(ctx context.Context, r pb.Request) (Response, error) + V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message // Process takes a raft message and applies it to the server's raft state // machine, respecting any timeout of the given context. Process(ctx context.Context, m raftpb.Message) error @@ -156,6 +159,7 @@ type EtcdServer struct { cluster *cluster store store.Store + kv dstorage.KV stats *stats.ServerStats lstats *stats.LeaderStats @@ -313,6 +317,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { forceVersionC: make(chan struct{}), } + if cfg.V3demo { + srv.kv = dstorage.New(path.Join(cfg.DataDir, "member", "v3demo")) + } else { + // we do not care about the error of the removal + os.RemoveAll(path.Join(cfg.DataDir, "member", "v3demo")) + } + // TODO: move transport initialization near the definition of remote tr := rafthttp.NewTransporter(cfg.Transport, id, cl.ID(), srv, srv.errorc, sstats, lstats) // add all remotes into transport diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go new file mode 100644 index 000000000000..0f0fa8a2acce --- /dev/null +++ b/etcdserver/v3demo_server.go @@ -0,0 +1,53 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message { + switch { + case r.Range != nil: + rr := r.Range + resp := &pb.RangeResponse{} + resp.Header = &pb.ResponseHeader{} + kvs, rev, err := s.kv.Range(rr.Key, rr.RangeEnd, rr.Limit, 0) + if err != nil { + panic("not handled error") + } + + resp.Header.Index = rev + for i := range kvs { + resp.Kvs = append(resp.Kvs, &kvs[i]) + } + return resp + case r.Put != nil: + rp := r.Put + resp := &pb.PutResponse{} + resp.Header = &pb.ResponseHeader{} + rev := s.kv.Put(rp.Key, rp.Value) + resp.Header.Index = rev + return resp + case r.DeleteRange != nil: + case r.Txn != nil: + panic("not implemented") + default: + panic("not implemented") + } + return nil +}