From 525ac6d2306ececfcd0705156362f0bfbe0ba1a3 Mon Sep 17 00:00:00 2001 From: Minoru Osuka Date: Mon, 18 Mar 2019 09:11:07 +0900 Subject: [PATCH] Restructure store package (#36) * Restructure store package * Update CHANGES.md --- CHANGES.md | 3 ++- kvs/{client.go => grpc_client.go} | 3 +-- kvs/{service.go => grpc_service.go} | 5 ++-- kvs/{handler.go => http_handler.go} | 0 {store => kvs}/kvs.go | 6 ++--- store/kvs_fsm.go => kvs/raft_fsm.go | 22 +++++++-------- store/kvs_store.go => kvs/raft_server.go | 34 ++++++++++++------------ kvs/server.go | 5 ++-- 8 files changed, 38 insertions(+), 40 deletions(-) rename kvs/{client.go => grpc_client.go} (99%) rename kvs/{service.go => grpc_service.go} (95%) rename kvs/{handler.go => http_handler.go} (100%) rename {store => kvs}/kvs.go (97%) rename store/kvs_fsm.go => kvs/raft_fsm.go (87%) rename store/kvs_store.go => kvs/raft_server.go (82%) diff --git a/CHANGES.md b/CHANGES.md index 62305b8..4f180a4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,12 +12,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Add Wikipedia example #35 - Support cznicb and leveldb #34 +- Add logging #33 - Add CHANGES.md #29 - Add error handling for server startup #28. ### Changed -- Add logging #33 +- Restructure store package #36 - Update examples #32 - update Makefile #31 diff --git a/kvs/client.go b/kvs/grpc_client.go similarity index 99% rename from kvs/client.go rename to kvs/grpc_client.go index c3e454d..56701e9 100644 --- a/kvs/client.go +++ b/kvs/grpc_client.go @@ -19,10 +19,9 @@ import ( "log" "math" + "github.com/golang/protobuf/ptypes/empty" "github.com/mosuka/blast/protobuf/kvs" "github.com/mosuka/blast/protobuf/raft" - - "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc" ) diff --git a/kvs/service.go b/kvs/grpc_service.go similarity index 95% rename from kvs/service.go rename to kvs/grpc_service.go index 9ed1767..156eb4f 100644 --- a/kvs/service.go +++ b/kvs/grpc_service.go @@ -22,17 +22,16 @@ import ( "github.com/golang/protobuf/ptypes/empty" "github.com/mosuka/blast/protobuf/kvs" "github.com/mosuka/blast/protobuf/raft" - "github.com/mosuka/blast/store" ) type Service struct { // wrapper and manager for db instance - store *store.KeyValueStore + store *RaftServer logger *log.Logger } -func NewService(store *store.KeyValueStore, logger *log.Logger) (*Service, error) { +func NewService(store *RaftServer, logger *log.Logger) (*Service, error) { return &Service{ store: store, logger: logger, diff --git a/kvs/handler.go b/kvs/http_handler.go similarity index 100% rename from kvs/handler.go rename to kvs/http_handler.go diff --git a/store/kvs.go b/kvs/kvs.go similarity index 97% rename from store/kvs.go rename to kvs/kvs.go index e5688f1..a6d19f5 100644 --- a/store/kvs.go +++ b/kvs/kvs.go @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package store +package kvs import ( "log" "time" "github.com/dgraph-io/badger" - blasterrors "github.com/mosuka/blast/errors" + "github.com/mosuka/blast/errors" "github.com/mosuka/blast/protobuf/kvs" ) @@ -79,7 +79,7 @@ func (b *KVS) Get(key []byte) ([]byte, error) { }) if err == badger.ErrKeyNotFound { b.logger.Printf("[DEBUG] %v", err) - return nil, blasterrors.ErrNotFound + return nil, errors.ErrNotFound } if err != nil { return nil, err diff --git a/store/kvs_fsm.go b/kvs/raft_fsm.go similarity index 87% rename from store/kvs_fsm.go rename to kvs/raft_fsm.go index 9abc2ab..e4ff687 100644 --- a/store/kvs_fsm.go +++ b/kvs/raft_fsm.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package store +package kvs import ( "errors" @@ -27,13 +27,13 @@ import ( "github.com/mosuka/blast/protobuf/kvs" ) -type KVSFSM struct { +type RaftFSM struct { kvs *KVS logger *log.Logger } -func NewKVSFSM(path string, logger *log.Logger) (*KVSFSM, error) { +func NewRaftFSM(path string, logger *log.Logger) (*RaftFSM, error) { // Create directory err := os.MkdirAll(path, 0755) if err != nil && !os.IsExist(err) { @@ -45,13 +45,13 @@ func NewKVSFSM(path string, logger *log.Logger) (*KVSFSM, error) { return nil, err } - return &KVSFSM{ + return &RaftFSM{ logger: logger, kvs: kvs, }, nil } -func (f *KVSFSM) Close() error { +func (f *RaftFSM) Close() error { err := f.kvs.Close() if err != nil { return err @@ -60,7 +60,7 @@ func (f *KVSFSM) Close() error { return nil } -func (f *KVSFSM) Get(key []byte) ([]byte, error) { +func (f *RaftFSM) Get(key []byte) ([]byte, error) { value, err := f.kvs.Get(key) if err != nil { return nil, err @@ -69,7 +69,7 @@ func (f *KVSFSM) Get(key []byte) ([]byte, error) { return value, nil } -func (f *KVSFSM) applySet(key []byte, value []byte) interface{} { +func (f *RaftFSM) applySet(key []byte, value []byte) interface{} { err := f.kvs.Set(key, value) if err != nil { f.logger.Printf("[ERR] %v", err) @@ -79,7 +79,7 @@ func (f *KVSFSM) applySet(key []byte, value []byte) interface{} { return nil } -func (f *KVSFSM) applyDelete(key []byte) interface{} { +func (f *RaftFSM) applyDelete(key []byte) interface{} { err := f.kvs.Delete(key) if err != nil { f.logger.Printf("[ERR] %v", err) @@ -89,7 +89,7 @@ func (f *KVSFSM) applyDelete(key []byte) interface{} { return nil } -func (f *KVSFSM) Apply(l *raft.Log) interface{} { +func (f *RaftFSM) Apply(l *raft.Log) interface{} { var c kvs.KVSCommand err := proto.Unmarshal(l.Data, &c) if err != nil { @@ -106,14 +106,14 @@ func (f *KVSFSM) Apply(l *raft.Log) interface{} { } } -func (f *KVSFSM) Snapshot() (raft.FSMSnapshot, error) { +func (f *RaftFSM) Snapshot() (raft.FSMSnapshot, error) { return &KVSFSMSnapshot{ kvs: f.kvs, logger: f.logger, }, nil } -func (f *KVSFSM) Restore(rc io.ReadCloser) error { +func (f *RaftFSM) Restore(rc io.ReadCloser) error { defer func() { err := rc.Close() if err != nil { diff --git a/store/kvs_store.go b/kvs/raft_server.go similarity index 82% rename from store/kvs_store.go rename to kvs/raft_server.go index 9c51648..678dae1 100644 --- a/store/kvs_store.go +++ b/kvs/raft_server.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package store +package kvs import ( "log" @@ -23,27 +23,27 @@ import ( "github.com/golang/protobuf/proto" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" - blasterrors "github.com/mosuka/blast/errors" + "github.com/mosuka/blast/errors" "github.com/mosuka/blast/protobuf/kvs" ) -type KeyValueStore struct { +type RaftServer struct { BindAddr string DataDir string raft *raft.Raft - fsm *KVSFSM + fsm *RaftFSM logger *log.Logger } -func NewKeyValueStore(bindAddr string, dataDir string, logger *log.Logger) (*KeyValueStore, error) { - fsm, err := NewKVSFSM(filepath.Join(dataDir, "kvs"), logger) +func NewRaftServer(bindAddr string, dataDir string, logger *log.Logger) (*RaftServer, error) { + fsm, err := NewRaftFSM(filepath.Join(dataDir, "kvs"), logger) if err != nil { return nil, err } - return &KeyValueStore{ + return &RaftServer{ BindAddr: bindAddr, DataDir: dataDir, fsm: fsm, @@ -51,7 +51,7 @@ func NewKeyValueStore(bindAddr string, dataDir string, logger *log.Logger) (*Key }, nil } -func (s *KeyValueStore) Open(bootstrap bool, localID string) error { +func (s *RaftServer) Open(bootstrap bool, localID string) error { config := raft.DefaultConfig() config.LocalID = raft.ServerID(localID) config.SnapshotThreshold = 1024 @@ -99,7 +99,7 @@ func (s *KeyValueStore) Open(bootstrap bool, localID string) error { return nil } -func (s *KeyValueStore) Close() error { +func (s *RaftServer) Close() error { err := s.fsm.Close() if err != nil { return err @@ -108,7 +108,7 @@ func (s *KeyValueStore) Close() error { return nil } -func (s *KeyValueStore) Join(nodeId string, addr string) error { +func (s *RaftServer) Join(nodeId string, addr string) error { cf := s.raft.GetConfiguration() err := cf.Error() if err != nil { @@ -132,7 +132,7 @@ func (s *KeyValueStore) Join(nodeId string, addr string) error { return nil } -func (s *KeyValueStore) Leave(nodeId string) error { +func (s *RaftServer) Leave(nodeId string) error { cf := s.raft.GetConfiguration() err := cf.Error() if err != nil { @@ -156,7 +156,7 @@ func (s *KeyValueStore) Leave(nodeId string) error { return nil } -func (s *KeyValueStore) Snapshot() error { +func (s *RaftServer) Snapshot() error { f := s.raft.Snapshot() err := f.Error() if err != nil { @@ -166,7 +166,7 @@ func (s *KeyValueStore) Snapshot() error { return nil } -func (s *KeyValueStore) Get(key []byte) ([]byte, error) { +func (s *RaftServer) Get(key []byte) ([]byte, error) { value, err := s.fsm.Get(key) if err != nil { return nil, err @@ -175,9 +175,9 @@ func (s *KeyValueStore) Get(key []byte) ([]byte, error) { return value, nil } -func (s *KeyValueStore) Set(key []byte, value []byte) error { +func (s *RaftServer) Set(key []byte, value []byte) error { if s.raft.State() != raft.Leader { - return blasterrors.ErrNotLeader + return errors.ErrNotLeader } c := &kvs.KVSCommand{ @@ -200,9 +200,9 @@ func (s *KeyValueStore) Set(key []byte, value []byte) error { return nil } -func (s *KeyValueStore) Delete(key []byte) error { +func (s *RaftServer) Delete(key []byte) error { if s.raft.State() != raft.Leader { - return blasterrors.ErrNotLeader + return errors.ErrNotLeader } c := &kvs.KVSCommand{ diff --git a/kvs/server.go b/kvs/server.go index 27dcaf4..8bd8fef 100644 --- a/kvs/server.go +++ b/kvs/server.go @@ -24,13 +24,12 @@ import ( blasthttp "github.com/mosuka/blast/http" "github.com/mosuka/blast/protobuf/kvs" "github.com/mosuka/blast/protobuf/raft" - "github.com/mosuka/blast/store" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" ) type Server struct { - store *store.KeyValueStore + store *RaftServer // gRPC grpcListener net.Listener @@ -54,7 +53,7 @@ func NewServer(nodeId string, bindAddr string, grpcAddr string, httpAddr string, } // store - server.store, err = store.NewKeyValueStore(bindAddr, dataDir, server.logger) + server.store, err = NewRaftServer(bindAddr, dataDir, server.logger) if err != nil { server.logger.Printf("[ERR] %v", err) return nil