diff --git a/cmd/blastd/indexer.go b/cmd/blastd/indexer.go index e72dda1..8d2f7cb 100644 --- a/cmd/blastd/indexer.go +++ b/cmd/blastd/indexer.go @@ -49,6 +49,7 @@ func startIndexer(c *cli.Context) error { grpcAddr := c.String("grpc-addr") httpAddr := c.String("http-addr") dataDir := c.String("data-dir") + raftStorageType := c.String("raft-storage-type") peerAddr := c.String("peer-addr") indexMappingFile := c.String("index-mapping-file") @@ -132,7 +133,7 @@ func startIndexer(c *cli.Context) error { "index_storage_type": indexStorageType, } - svr, err := indexer.NewServer(managerAddr, clusterId, nodeId, metadata, peerAddr, indexConfig, logger.Named(nodeId), httpAccessLogger) + svr, err := indexer.NewServer(managerAddr, clusterId, nodeId, metadata, raftStorageType, peerAddr, indexConfig, logger.Named(nodeId), httpAccessLogger) if err != nil { return err } diff --git a/cmd/blastd/main.go b/cmd/blastd/main.go index ccb949a..27788f1 100644 --- a/cmd/blastd/main.go +++ b/cmd/blastd/main.go @@ -130,6 +130,11 @@ func main() { Value: "/tmp/blast-index", Usage: "Data directory", }, + cli.StringFlag{ + Name: "raft-storage-type", + Value: "boltdb", + Usage: "Raft log storage type to use", + }, cli.StringFlag{ Name: "peer-addr", Value: "", @@ -182,6 +187,11 @@ func main() { Value: "./", Usage: "Data directory", }, + cli.StringFlag{ + Name: "raft-storage-type", + Value: "boltdb", + Usage: "Raft log storage type to use", + }, cli.StringFlag{ Name: "peer-addr", Value: "", diff --git a/cmd/blastd/manager.go b/cmd/blastd/manager.go index 33dce6a..6e48b27 100644 --- a/cmd/blastd/manager.go +++ b/cmd/blastd/manager.go @@ -46,6 +46,7 @@ func startManager(c *cli.Context) error { grpcAddr := c.String("grpc-addr") httpAddr := c.String("http-addr") dataDir := c.String("data-dir") + raftStorageType := c.String("raft-storage-type") peerAddr := c.String("peer-addr") indexMappingFile := c.String("index-mapping-file") @@ -129,7 +130,7 @@ func startManager(c *cli.Context) error { "index_storage_type": indexStorageType, } - svr, err := manager.NewServer(nodeId, metadata, peerAddr, indexConfig, logger.Named(nodeId), httpAccessLogger) + svr, err := manager.NewServer(nodeId, metadata, raftStorageType, peerAddr, indexConfig, logger.Named(nodeId), httpAccessLogger) if err != nil { return err } diff --git a/example/wiki_search_request_simple.json b/example/wiki_search_request_simple.json index e4cac4d..9ed3040 100644 --- a/example/wiki_search_request_simple.json +++ b/example/wiki_search_request_simple.json @@ -1,6 +1,6 @@ { "query": { - "query": "+_all:search" + "query": "+text_en:search" }, "size": 10, "from": 0, diff --git a/go.mod b/go.mod index 49ff791..3866069 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,8 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hashicorp/golang-lru v0.5.1 // indirect - github.com/hashicorp/raft v1.0.0 + github.com/hashicorp/raft v1.1.0 + github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477 github.com/hashicorp/raft-mdb v0.0.0-20180824152511-9ee9663b6ffa github.com/ikawaha/kagome.ipadic v1.0.1 // indirect github.com/imdario/mergo v0.3.7 @@ -31,7 +32,6 @@ require ( github.com/mash/go-accesslog v0.0.0-20180522074327-610c2be04217 github.com/mosuka/bbadger v0.0.0-20190319122948-67a91aedfe68 github.com/natefinch/lumberjack v2.0.0+incompatible - github.com/pascaldekloe/goe v0.1.0 // indirect github.com/prometheus/client_golang v0.9.2 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect github.com/prometheus/common v0.2.0 // indirect @@ -46,7 +46,6 @@ require ( go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 golang.org/x/net v0.0.0-20190327214358-63eda1eb0650 // indirect - golang.org/x/sys v0.0.0-20190329044733-9eb1bfa1ce65 // indirect google.golang.org/genproto v0.0.0-20190327125643-d831d65fe17d // indirect google.golang.org/grpc v1.19.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 95a8517..a38a8a7 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,7 @@ github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkBy github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/RoaringBitmap/roaring v0.4.17 h1:oCYFIFEMSQZrLHpywH7919esI1VSrQZ0pJXkZPGIJ78= github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI= github.com/Smerity/govarint v0.0.0-20150407073650-7265e41f48f1 h1:G/NOANWMQev0CftoyxQwtRakdyNNNMB3qxkt/tj1HGs= @@ -12,6 +13,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= +github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= +github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/armon/gomdb v0.0.0-20180202201627-75f545a47e89 h1:A1SPjPcl2LdF2Skv9Zt41jWu4XYQAyvBDzrveQjlkhQ= github.com/armon/gomdb v0.0.0-20180202201627-75f545a47e89/go.mod h1:wSblbytRgcqD+U+gGCKz5145DyjUYPh5fqh2uyXxfZw= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= @@ -30,6 +33,8 @@ github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d h1:iPCfLX github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d/go.mod h1:cdytUvf6FKWA9NpXJihYdZq8TN2AiQ5HOS0UZUz0C9g= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= +github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/couchbase/ghistogram v0.0.0-20170308220240-d910dd063dd6 h1:T7Qykid5GIoDEVTZL0NcbimcT2qmzjo5mNGhe8i0/5M= github.com/couchbase/ghistogram v0.0.0-20170308220240-d910dd063dd6/go.mod h1:s1Jhy76zqfEecpNWJfWUiKZookAFaiGOEoyzgHt9i7k= @@ -87,10 +92,16 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmo github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= +github.com/hashicorp/go-hclog v0.9.1 h1:9PZfAcVEvez4yhLH2TBU64/h/z4xlFI80cWXRrxuKuM= +github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= +github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -98,6 +109,10 @@ github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/raft v1.0.0 h1:htBVktAOtGs4Le5Z7K8SF5H2+oWsQFYVmOgH5loro7Y= github.com/hashicorp/raft v1.0.0/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcbNAQLkI= +github.com/hashicorp/raft v1.1.0 h1:qPMePEczgbkiQsqCsRfuHRqvDUO+zmAInDaD5ptXlq0= +github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM= +github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477 h1:bLsrEmB2NUwkHH18FOJBIa04wOV2RQalJrcafTYu6Lg= +github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477/go.mod h1:aUF6HQr8+t3FC/ZHAC+pZreUBhTaxumuu3L+d37uRxk= github.com/hashicorp/raft-mdb v0.0.0-20180824152511-9ee9663b6ffa h1:ccwcWyXHTaonH6yzx+t/3p9aNm/ogSTfd6YobZOtHmE= github.com/hashicorp/raft-mdb v0.0.0-20180824152511-9ee9663b6ffa/go.mod h1:ooP3NrrH0GG/sVjF9pbRvhF6nVHRR4mkkwscLqReN1o= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= @@ -179,6 +194,7 @@ github.com/tecbot/gorocksdb v0.0.0-20181010114359-8752a9433481 h1:HOxvxvnntLiPn1 github.com/tecbot/gorocksdb v0.0.0-20181010114359-8752a9433481/go.mod h1:ahpPrc7HpcfEWDQRZEmnXMzHY03mLDYMCxeDzy46i+8= github.com/tinylib/msgp v1.1.0 h1:9fQd+ICuRIu/ue4vxJZu6/LzxN0HwMds2nq/0cFvxHU= github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= +github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc= @@ -217,6 +233,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190316082340-a2f829d7f35f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190329044733-9eb1bfa1ce65 h1:hOY+O8MxdkPV10pNf7/XEHaySCiPKxixMKUshfHsGn0= golang.org/x/sys v0.0.0-20190329044733-9eb1bfa1ce65/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:uPxWBzB3+mlnjy9W58qY1j/cjyFjutgw/Vhan2zLy/A= +golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/indexer/raft_server.go b/indexer/raft_server.go index 335e595..565480b 100644 --- a/indexer/raft_server.go +++ b/indexer/raft_server.go @@ -25,6 +25,7 @@ import ( "github.com/blevesearch/bleve" "github.com/hashicorp/raft" + raftboltdb "github.com/hashicorp/raft-boltdb" raftbadgerdb "github.com/markthethomas/raft-badger" _ "github.com/mosuka/blast/config" blasterrors "github.com/mosuka/blast/errors" @@ -33,10 +34,10 @@ import ( ) type RaftServer struct { - id string - metadata map[string]interface{} - - bootstrap bool + id string + metadata map[string]interface{} + raftStorageType string + bootstrap bool raft *raft.Raft fsm *RaftFSM @@ -46,12 +47,12 @@ type RaftServer struct { logger *zap.Logger } -func NewRaftServer(id string, metadata map[string]interface{}, bootstrap bool, indexConfig map[string]interface{}, logger *zap.Logger) (*RaftServer, error) { +func NewRaftServer(id string, metadata map[string]interface{}, raftStorageType string, bootstrap bool, indexConfig map[string]interface{}, logger *zap.Logger) (*RaftServer, error) { return &RaftServer{ - id: id, - metadata: metadata, - - bootstrap: bootstrap, + id: id, + metadata: metadata, + raftStorageType: raftStorageType, + bootstrap: bootstrap, indexConfig: indexConfig, logger: logger, @@ -107,7 +108,7 @@ func (s *RaftServer) Start() error { return err } - snapshotPath := filepath.Join(dataDir, "snapshots") + snapshotPath := dataDir s.logger.Info("create snapshot store", zap.String("path", snapshotPath)) snapshotStore, err := raft.NewFileSnapshotStore(snapshotPath, 2, ioutil.Discard) if err != nil { @@ -115,26 +116,59 @@ func (s *RaftServer) Start() error { return err } - logStore := filepath.Join(dataDir, "raft.db") - s.logger.Info("create Raft log store", zap.String("path", logStore)) - //raftLogStore, err := raftboltdb.NewBoltStore(logStore) - err = os.MkdirAll(filepath.Join(logStore, "badger"), 0755) - if err != nil { - s.logger.Fatal(err.Error()) - return err - } - raftLogStore, err := raftbadgerdb.NewBadgerStore(logStore) - //raftLogStore, err := raftmdb.NewMDBStore(logStore) - if err != nil { - s.logger.Fatal(err.Error()) - return err - } - s.logger.Info("create Raft machine") - s.raft, err = raft.NewRaft(config, s.fsm, raftLogStore, raftLogStore, snapshotStore, transport) - if err != nil { - s.logger.Fatal(err.Error()) - return err + switch s.raftStorageType { + case "boltdb": + logStorePath := filepath.Join(dataDir, "raft", "boltdb.db") + err = os.MkdirAll(filepath.Join(dataDir, "raft"), 0755) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + logStore, err := raftboltdb.NewBoltStore(logStorePath) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + s.raft, err = raft.NewRaft(config, s.fsm, logStore, logStore, snapshotStore, transport) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + case "badger": + logStorePath := filepath.Join(dataDir, "raft") + err = os.MkdirAll(filepath.Join(logStorePath, "badger"), 0755) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + logStore, err := raftbadgerdb.NewBadgerStore(logStorePath) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + s.raft, err = raft.NewRaft(config, s.fsm, logStore, logStore, snapshotStore, transport) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + default: + logStorePath := filepath.Join(dataDir, "raft", "boltdb.db") + err = os.MkdirAll(filepath.Join(dataDir, "raft"), 0755) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + logStore, err := raftboltdb.NewBoltStore(logStorePath) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + s.raft, err = raft.NewRaft(config, s.fsm, logStore, logStore, snapshotStore, transport) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } } if s.bootstrap { diff --git a/indexer/server.go b/indexer/server.go index 2168dda..9f6fc65 100644 --- a/indexer/server.go +++ b/indexer/server.go @@ -28,10 +28,10 @@ type Server struct { managerAddr string clusterId string - id string - metadata map[string]interface{} - - peerAddr string + id string + metadata map[string]interface{} + raftStorageType string + peerAddr string indexConfig map[string]interface{} @@ -45,15 +45,15 @@ type Server struct { httpLogger accesslog.Logger } -func NewServer(managerAddr string, clusterId string, id string, metadata map[string]interface{}, peerAddr string, indexConfig map[string]interface{}, logger *zap.Logger, httpLogger accesslog.Logger) (*Server, error) { +func NewServer(managerAddr string, clusterId string, id string, metadata map[string]interface{}, raftStorageType, peerAddr string, indexConfig map[string]interface{}, logger *zap.Logger, httpLogger accesslog.Logger) (*Server, error) { return &Server{ managerAddr: managerAddr, clusterId: clusterId, - id: id, - metadata: metadata, - - peerAddr: peerAddr, + id: id, + metadata: metadata, + raftStorageType: raftStorageType, + peerAddr: peerAddr, indexConfig: indexConfig, @@ -168,20 +168,12 @@ func (s *Server) Start() { s.logger.Fatal(err.Error()) return } - - //ins, err := protobuf.MarshalAny(resp.IndexConfig) - //if err != nil { - // s.logger.Fatal(err.Error()) - // return - //} - // - //s.indexConfig = *ins.(*map[string]interface{}) } var err error // create raft server - s.raftServer, err = NewRaftServer(s.id, s.metadata, bootstrap, s.indexConfig, s.logger) + s.raftServer, err = NewRaftServer(s.id, s.metadata, s.raftStorageType, bootstrap, s.indexConfig, s.logger) if err != nil { s.logger.Fatal(err.Error()) return diff --git a/indexer/server_test.go b/indexer/server_test.go index 2103ca6..38fb242 100644 --- a/indexer/server_test.go +++ b/indexer/server_test.go @@ -87,7 +87,7 @@ func TestIndexserStandalone(t *testing.T) { } // create logger - logger := logutils.NewLogger("DEBUG", "", 500, 3, 30, false) + logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) @@ -133,7 +133,7 @@ func TestIndexserStandalone(t *testing.T) { "data_dir": dataDir, } - server, err := NewServer("", "", nodeId, metadata, peerAddr, indexConfig, logger, httpAccessLogger) + server, err := NewServer("", "", nodeId, metadata, "boltdb", peerAddr, indexConfig, logger, httpAccessLogger) defer func() { server.Stop() }() diff --git a/manager/raft_server.go b/manager/raft_server.go index 145c340..4c07ff0 100644 --- a/manager/raft_server.go +++ b/manager/raft_server.go @@ -25,6 +25,7 @@ import ( "time" "github.com/hashicorp/raft" + raftboltdb "github.com/hashicorp/raft-boltdb" raftbadgerdb "github.com/markthethomas/raft-badger" _ "github.com/mosuka/blast/config" blasterrors "github.com/mosuka/blast/errors" @@ -33,10 +34,10 @@ import ( ) type RaftServer struct { - id string - metadata map[string]interface{} - - bootstrap bool + id string + metadata map[string]interface{} + raftStorageType string + bootstrap bool raft *raft.Raft fsm *RaftFSM @@ -47,12 +48,12 @@ type RaftServer struct { mu sync.RWMutex } -func NewRaftServer(id string, metadata map[string]interface{}, bootstrap bool, indexConfig map[string]interface{}, logger *zap.Logger) (*RaftServer, error) { +func NewRaftServer(id string, metadata map[string]interface{}, raftStorageType string, bootstrap bool, indexConfig map[string]interface{}, logger *zap.Logger) (*RaftServer, error) { return &RaftServer{ - id: id, - metadata: metadata, - - bootstrap: bootstrap, + id: id, + metadata: metadata, + raftStorageType: raftStorageType, + bootstrap: bootstrap, indexConfig: indexConfig, logger: logger, @@ -109,7 +110,7 @@ func (s *RaftServer) Start() error { return err } - snapshotPath := filepath.Join(dataDir, "snapshots") + snapshotPath := dataDir s.logger.Info("create snapshot store", zap.String("path", snapshotPath)) snapshotStore, err := raft.NewFileSnapshotStore(snapshotPath, 2, ioutil.Discard) if err != nil { @@ -117,26 +118,59 @@ func (s *RaftServer) Start() error { return err } - logStore := filepath.Join(dataDir, "raft.db") - s.logger.Info("create Raft log store", zap.String("path", logStore)) - //raftLogStore, err := raftboltdb.NewBoltStore(logStore) - err = os.MkdirAll(filepath.Join(logStore, "badger"), 0755) - if err != nil { - s.logger.Fatal(err.Error()) - return err - } - raftLogStore, err := raftbadgerdb.NewBadgerStore(logStore) - //raftLogStore, err := raftmdb.NewMDBStore(logStore) - if err != nil { - s.logger.Fatal(err.Error()) - return err - } - s.logger.Info("create Raft machine") - s.raft, err = raft.NewRaft(config, s.fsm, raftLogStore, raftLogStore, snapshotStore, transport) - if err != nil { - s.logger.Fatal(err.Error()) - return err + switch s.raftStorageType { + case "boltdb": + logStorePath := filepath.Join(dataDir, "raft", "boltdb.db") + err = os.MkdirAll(filepath.Join(dataDir, "raft"), 0755) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + logStore, err := raftboltdb.NewBoltStore(logStorePath) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + s.raft, err = raft.NewRaft(config, s.fsm, logStore, logStore, snapshotStore, transport) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + case "badger": + logStorePath := filepath.Join(dataDir, "raft") + err = os.MkdirAll(filepath.Join(logStorePath, "badger"), 0755) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + logStore, err := raftbadgerdb.NewBadgerStore(logStorePath) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + s.raft, err = raft.NewRaft(config, s.fsm, logStore, logStore, snapshotStore, transport) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + default: + logStorePath := filepath.Join(dataDir, "raft", "boltdb.db") + err = os.MkdirAll(filepath.Join(dataDir, "raft"), 0755) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + logStore, err := raftboltdb.NewBoltStore(logStorePath) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } + s.raft, err = raft.NewRaft(config, s.fsm, logStore, logStore, snapshotStore, transport) + if err != nil { + s.logger.Fatal(err.Error()) + return err + } } if s.bootstrap { diff --git a/manager/server.go b/manager/server.go index 938e947..76ca672 100644 --- a/manager/server.go +++ b/manager/server.go @@ -22,10 +22,10 @@ import ( ) type Server struct { - id string - metadata map[string]interface{} - - peerAddr string + id string + metadata map[string]interface{} + raftStorageType string + peerAddr string indexConfig map[string]interface{} @@ -39,14 +39,15 @@ type Server struct { httpLogger accesslog.Logger } -func NewServer(id string, metadata map[string]interface{}, peerAddr string, indexConfig map[string]interface{}, logger *zap.Logger, httpLogger accesslog.Logger) (*Server, error) { +func NewServer(id string, metadata map[string]interface{}, raftStorageType string, peerAddr string, indexConfig map[string]interface{}, logger *zap.Logger, httpLogger accesslog.Logger) (*Server, error) { return &Server{ - id: id, - metadata: metadata, - peerAddr: peerAddr, - indexConfig: indexConfig, - logger: logger, - httpLogger: httpLogger, + id: id, + metadata: metadata, + raftStorageType: raftStorageType, + peerAddr: peerAddr, + indexConfig: indexConfig, + logger: logger, + httpLogger: httpLogger, }, nil } @@ -58,7 +59,7 @@ func (s *Server) Start() { s.logger.Info("bootstrap", zap.Bool("bootstrap", bootstrap)) // create raft server - s.raftServer, err = NewRaftServer(s.id, s.metadata, bootstrap, s.indexConfig, s.logger) + s.raftServer, err = NewRaftServer(s.id, s.metadata, s.raftStorageType, bootstrap, s.indexConfig, s.logger) if err != nil { s.logger.Fatal(err.Error()) return diff --git a/manager/server_test.go b/manager/server_test.go index d9e0d02..4abe102 100644 --- a/manager/server_test.go +++ b/manager/server_test.go @@ -87,7 +87,7 @@ func TestManagerStandalone(t *testing.T) { } // create logger - logger := logutils.NewLogger("DEBUG", "", 500, 3, 30, false) + logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) @@ -135,7 +135,7 @@ func TestManagerStandalone(t *testing.T) { "data_dir": dataDir, } - server, err := NewServer(nodeId, metadata, peerAddr, indexConfig, logger.Named(nodeId), httpAccessLogger) + server, err := NewServer(nodeId, metadata, "boltdb", peerAddr, indexConfig, logger.Named(nodeId), httpAccessLogger) defer func() { server.Stop() }() @@ -341,7 +341,7 @@ func TestManagerCluster(t *testing.T) { } // create logger - logger := logutils.NewLogger("DEBUG", "", 500, 3, 30, false) + logger := logutils.NewLogger("WARN", "", 500, 3, 30, false) // create HTTP access logger httpAccessLogger := logutils.NewApacheCombinedLogger("", 500, 3, 30, false) @@ -380,7 +380,7 @@ func TestManagerCluster(t *testing.T) { "http_addr": manager1HttpAddr, "data_dir": manager1DataDir, } - manager1, err := NewServer(manager1NodeId, manager1Metadata, manager1PeerAddr, indexConfig, logger.Named(manager1NodeId), httpAccessLogger) + manager1, err := NewServer(manager1NodeId, manager1Metadata, "boltdb", manager1PeerAddr, indexConfig, logger.Named(manager1NodeId), httpAccessLogger) defer func() { manager1.Stop() }() @@ -422,7 +422,7 @@ func TestManagerCluster(t *testing.T) { "http_addr": manager2HttpAddr, "data_dir": manager2DataDir, } - manager2, err := NewServer(manager2NodeId, manager2Metadata, manager2PeerAddr, nil, logger.Named(manager2NodeId), httpAccessLogger) + manager2, err := NewServer(manager2NodeId, manager2Metadata, "boltdb", manager2PeerAddr, nil, logger.Named(manager2NodeId), httpAccessLogger) defer func() { manager2.Stop() }() @@ -464,7 +464,7 @@ func TestManagerCluster(t *testing.T) { "http_addr": manager3HttpAddr, "data_dir": manager3DataDir, } - manager3, err := NewServer(manager3NodeId, manager3Metadata, manager3PeerAddr, nil, logger.Named(manager3NodeId), httpAccessLogger) + manager3, err := NewServer(manager3NodeId, manager3Metadata, "boltdb", manager3PeerAddr, nil, logger.Named(manager3NodeId), httpAccessLogger) defer func() { manager3.Stop() }()