Skip to content
This repository has been archived by the owner on Dec 10, 2021. It is now read-only.

Commit

Permalink
Restructure store package (#36)
Browse files Browse the repository at this point in the history
* Restructure store package

* Update CHANGES.md
  • Loading branch information
mosuka authored Mar 18, 2019
1 parent b36a0fa commit 525ac6d
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 40 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

- Add Wikipedia example #35
- Support cznicb and leveldb #34
- Add logging #33
- Add CHANGES.md #29
- Add error handling for server startup #28.

### Changed

- Add logging #33
- Restructure store package #36
- Update examples #32
- update Makefile #31

Expand Down
3 changes: 1 addition & 2 deletions kvs/client.go → kvs/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import (
"log"
"math"

"github.com/golang/protobuf/ptypes/empty"
"github.com/mosuka/blast/protobuf/kvs"
"github.com/mosuka/blast/protobuf/raft"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
)

Expand Down
5 changes: 2 additions & 3 deletions kvs/service.go → kvs/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@ import (
"github.com/golang/protobuf/ptypes/empty"
"github.com/mosuka/blast/protobuf/kvs"
"github.com/mosuka/blast/protobuf/raft"
"github.com/mosuka/blast/store"
)

type Service struct {
// wrapper and manager for db instance
store *store.KeyValueStore
store *RaftServer

logger *log.Logger
}

func NewService(store *store.KeyValueStore, logger *log.Logger) (*Service, error) {
func NewService(store *RaftServer, logger *log.Logger) (*Service, error) {
return &Service{
store: store,
logger: logger,
Expand Down
File renamed without changes.
6 changes: 3 additions & 3 deletions store/kvs.go → kvs/kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package store
package kvs

import (
"log"
"time"

"github.com/dgraph-io/badger"
blasterrors "github.com/mosuka/blast/errors"
"github.com/mosuka/blast/errors"
"github.com/mosuka/blast/protobuf/kvs"
)

Expand Down Expand Up @@ -79,7 +79,7 @@ func (b *KVS) Get(key []byte) ([]byte, error) {
})
if err == badger.ErrKeyNotFound {
b.logger.Printf("[DEBUG] %v", err)
return nil, blasterrors.ErrNotFound
return nil, errors.ErrNotFound
}
if err != nil {
return nil, err
Expand Down
22 changes: 11 additions & 11 deletions store/kvs_fsm.go → kvs/raft_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package store
package kvs

import (
"errors"
Expand All @@ -27,13 +27,13 @@ import (
"github.com/mosuka/blast/protobuf/kvs"
)

type KVSFSM struct {
type RaftFSM struct {
kvs *KVS

logger *log.Logger
}

func NewKVSFSM(path string, logger *log.Logger) (*KVSFSM, error) {
func NewRaftFSM(path string, logger *log.Logger) (*RaftFSM, error) {
// Create directory
err := os.MkdirAll(path, 0755)
if err != nil && !os.IsExist(err) {
Expand All @@ -45,13 +45,13 @@ func NewKVSFSM(path string, logger *log.Logger) (*KVSFSM, error) {
return nil, err
}

return &KVSFSM{
return &RaftFSM{
logger: logger,
kvs: kvs,
}, nil
}

func (f *KVSFSM) Close() error {
func (f *RaftFSM) Close() error {
err := f.kvs.Close()
if err != nil {
return err
Expand All @@ -60,7 +60,7 @@ func (f *KVSFSM) Close() error {
return nil
}

func (f *KVSFSM) Get(key []byte) ([]byte, error) {
func (f *RaftFSM) Get(key []byte) ([]byte, error) {
value, err := f.kvs.Get(key)
if err != nil {
return nil, err
Expand All @@ -69,7 +69,7 @@ func (f *KVSFSM) Get(key []byte) ([]byte, error) {
return value, nil
}

func (f *KVSFSM) applySet(key []byte, value []byte) interface{} {
func (f *RaftFSM) applySet(key []byte, value []byte) interface{} {
err := f.kvs.Set(key, value)
if err != nil {
f.logger.Printf("[ERR] %v", err)
Expand All @@ -79,7 +79,7 @@ func (f *KVSFSM) applySet(key []byte, value []byte) interface{} {
return nil
}

func (f *KVSFSM) applyDelete(key []byte) interface{} {
func (f *RaftFSM) applyDelete(key []byte) interface{} {
err := f.kvs.Delete(key)
if err != nil {
f.logger.Printf("[ERR] %v", err)
Expand All @@ -89,7 +89,7 @@ func (f *KVSFSM) applyDelete(key []byte) interface{} {
return nil
}

func (f *KVSFSM) Apply(l *raft.Log) interface{} {
func (f *RaftFSM) Apply(l *raft.Log) interface{} {
var c kvs.KVSCommand
err := proto.Unmarshal(l.Data, &c)
if err != nil {
Expand All @@ -106,14 +106,14 @@ func (f *KVSFSM) Apply(l *raft.Log) interface{} {
}
}

func (f *KVSFSM) Snapshot() (raft.FSMSnapshot, error) {
func (f *RaftFSM) Snapshot() (raft.FSMSnapshot, error) {
return &KVSFSMSnapshot{
kvs: f.kvs,
logger: f.logger,
}, nil
}

func (f *KVSFSM) Restore(rc io.ReadCloser) error {
func (f *RaftFSM) Restore(rc io.ReadCloser) error {
defer func() {
err := rc.Close()
if err != nil {
Expand Down
34 changes: 17 additions & 17 deletions store/kvs_store.go → kvs/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package store
package kvs

import (
"log"
Expand All @@ -23,35 +23,35 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
blasterrors "github.com/mosuka/blast/errors"
"github.com/mosuka/blast/errors"
"github.com/mosuka/blast/protobuf/kvs"
)

type KeyValueStore struct {
type RaftServer struct {
BindAddr string
DataDir string

raft *raft.Raft
fsm *KVSFSM
fsm *RaftFSM

logger *log.Logger
}

func NewKeyValueStore(bindAddr string, dataDir string, logger *log.Logger) (*KeyValueStore, error) {
fsm, err := NewKVSFSM(filepath.Join(dataDir, "kvs"), logger)
func NewRaftServer(bindAddr string, dataDir string, logger *log.Logger) (*RaftServer, error) {
fsm, err := NewRaftFSM(filepath.Join(dataDir, "kvs"), logger)
if err != nil {
return nil, err
}

return &KeyValueStore{
return &RaftServer{
BindAddr: bindAddr,
DataDir: dataDir,
fsm: fsm,
logger: logger,
}, nil
}

func (s *KeyValueStore) Open(bootstrap bool, localID string) error {
func (s *RaftServer) Open(bootstrap bool, localID string) error {
config := raft.DefaultConfig()
config.LocalID = raft.ServerID(localID)
config.SnapshotThreshold = 1024
Expand Down Expand Up @@ -99,7 +99,7 @@ func (s *KeyValueStore) Open(bootstrap bool, localID string) error {
return nil
}

func (s *KeyValueStore) Close() error {
func (s *RaftServer) Close() error {
err := s.fsm.Close()
if err != nil {
return err
Expand All @@ -108,7 +108,7 @@ func (s *KeyValueStore) Close() error {
return nil
}

func (s *KeyValueStore) Join(nodeId string, addr string) error {
func (s *RaftServer) Join(nodeId string, addr string) error {
cf := s.raft.GetConfiguration()
err := cf.Error()
if err != nil {
Expand All @@ -132,7 +132,7 @@ func (s *KeyValueStore) Join(nodeId string, addr string) error {
return nil
}

func (s *KeyValueStore) Leave(nodeId string) error {
func (s *RaftServer) Leave(nodeId string) error {
cf := s.raft.GetConfiguration()
err := cf.Error()
if err != nil {
Expand All @@ -156,7 +156,7 @@ func (s *KeyValueStore) Leave(nodeId string) error {
return nil
}

func (s *KeyValueStore) Snapshot() error {
func (s *RaftServer) Snapshot() error {
f := s.raft.Snapshot()
err := f.Error()
if err != nil {
Expand All @@ -166,7 +166,7 @@ func (s *KeyValueStore) Snapshot() error {
return nil
}

func (s *KeyValueStore) Get(key []byte) ([]byte, error) {
func (s *RaftServer) Get(key []byte) ([]byte, error) {
value, err := s.fsm.Get(key)
if err != nil {
return nil, err
Expand All @@ -175,9 +175,9 @@ func (s *KeyValueStore) Get(key []byte) ([]byte, error) {
return value, nil
}

func (s *KeyValueStore) Set(key []byte, value []byte) error {
func (s *RaftServer) Set(key []byte, value []byte) error {
if s.raft.State() != raft.Leader {
return blasterrors.ErrNotLeader
return errors.ErrNotLeader
}

c := &kvs.KVSCommand{
Expand All @@ -200,9 +200,9 @@ func (s *KeyValueStore) Set(key []byte, value []byte) error {
return nil
}

func (s *KeyValueStore) Delete(key []byte) error {
func (s *RaftServer) Delete(key []byte) error {
if s.raft.State() != raft.Leader {
return blasterrors.ErrNotLeader
return errors.ErrNotLeader
}

c := &kvs.KVSCommand{
Expand Down
5 changes: 2 additions & 3 deletions kvs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ import (
blasthttp "github.com/mosuka/blast/http"
"github.com/mosuka/blast/protobuf/kvs"
"github.com/mosuka/blast/protobuf/raft"
"github.com/mosuka/blast/store"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
)

type Server struct {
store *store.KeyValueStore
store *RaftServer

// gRPC
grpcListener net.Listener
Expand All @@ -54,7 +53,7 @@ func NewServer(nodeId string, bindAddr string, grpcAddr string, httpAddr string,
}

// store
server.store, err = store.NewKeyValueStore(bindAddr, dataDir, server.logger)
server.store, err = NewRaftServer(bindAddr, dataDir, server.logger)
if err != nil {
server.logger.Printf("[ERR] %v", err)
return nil
Expand Down

0 comments on commit 525ac6d

Please sign in to comment.