From e3295be350fa75b1be92d5db746bb57507c506aa Mon Sep 17 00:00:00 2001 From: Minoru Osuka Date: Mon, 5 Aug 2019 12:58:34 +0900 Subject: [PATCH] Change server arguments (#96) --- cmd/blast/dispatcher_start.go | 15 +------ config/cluster_config.go | 25 ------------ config/cluster_config_test.go | 76 ----------------------------------- config/node_config.go | 57 -------------------------- config/node_config_test.go | 54 ------------------------- dispatcher/grpc_service.go | 2 - dispatcher/server.go | 33 +++++++-------- dispatcher/server_test.go | 39 ++++++++---------- indexer/grpc_service.go | 3 +- indexer/raft_fsm.go | 7 +++- indexer/raft_server.go | 7 +++- manager/grpc_service.go | 16 ++++++-- manager/raft_server.go | 14 ++++++- testutils/testutils.go | 21 +++++----- 14 files changed, 83 insertions(+), 286 deletions(-) delete mode 100644 config/cluster_config.go delete mode 100644 config/cluster_config_test.go delete mode 100644 config/node_config.go delete mode 100644 config/node_config_test.go diff --git a/cmd/blast/dispatcher_start.go b/cmd/blast/dispatcher_start.go index 9c9540f..534bea1 100644 --- a/cmd/blast/dispatcher_start.go +++ b/cmd/blast/dispatcher_start.go @@ -19,7 +19,6 @@ import ( "os/signal" "syscall" - "github.com/mosuka/blast/config" "github.com/mosuka/blast/dispatcher" "github.com/mosuka/blast/logutils" "github.com/urfave/cli" @@ -80,19 +79,7 @@ func dispatcherStart(c *cli.Context) error { httpLogCompress, ) - // create cluster config - clusterConfig := config.DefaultClusterConfig() - if managerAddr != "" { - clusterConfig.ManagerAddr = managerAddr - } - - // create node config - nodeConfig := &config.NodeConfig{ - GRPCAddr: grpcAddr, - HTTPAddr: httpAddr, - } - - svr, err := dispatcher.NewServer(clusterConfig, nodeConfig, logger, grpcLogger, httpAccessLogger) + svr, err := dispatcher.NewServer(managerAddr, grpcAddr, httpAddr, logger, grpcLogger, httpAccessLogger) if err != nil { return err } diff --git a/config/cluster_config.go b/config/cluster_config.go deleted file mode 100644 index fb56a5d..0000000 --- a/config/cluster_config.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) 2019 Minoru Osuka -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -type ClusterConfig struct { - ManagerAddr string `json:"manager_addr,omitempty"` - ClusterId string `json:"cluster_id,omitempty"` - PeerAddr string `json:"peer_addr,omitempty"` -} - -func DefaultClusterConfig() *ClusterConfig { - return &ClusterConfig{} -} diff --git a/config/cluster_config_test.go b/config/cluster_config_test.go deleted file mode 100644 index 24d4c2e..0000000 --- a/config/cluster_config_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright (c) 2019 Minoru Osuka -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "reflect" - "testing" -) - -func TestDefaultClusterConfig(t *testing.T) { - exp := &ClusterConfig{} - act := DefaultClusterConfig() - if !reflect.DeepEqual(exp, act) { - t.Fatalf("expected content to see %v, saw %v", exp, act) - } - - expManagerAddr := "" - actManagerAddr := act.ManagerAddr - if expManagerAddr != actManagerAddr { - t.Fatalf("expected content to see %v, saw %v", expManagerAddr, actManagerAddr) - } - - expClusterId := "" - actClusterId := act.ClusterId - if expClusterId != actClusterId { - t.Fatalf("expected content to see %v, saw %v", expClusterId, actClusterId) - } - - expPeerAddr := "" - actPeerAddr := act.PeerAddr - if expPeerAddr != actPeerAddr { - t.Fatalf("expected content to see %v, saw %v", expPeerAddr, actPeerAddr) - } -} - -func TestClusterConfig_1(t *testing.T) { - expConfig := &ClusterConfig{ - ManagerAddr: ":12000", - ClusterId: "cluster1", - PeerAddr: ":5000", - } - actConfig := DefaultClusterConfig() - actConfig.ManagerAddr = ":12000" - actConfig.ClusterId = "cluster1" - actConfig.PeerAddr = ":5000" - - expManagerAddr := expConfig.ManagerAddr - actManagerAddr := actConfig.ManagerAddr - if expManagerAddr != actManagerAddr { - t.Fatalf("expected content to see %v, saw %v", expManagerAddr, actManagerAddr) - } - - expClusterId := expConfig.ClusterId - actClusterId := actConfig.ClusterId - if expClusterId != actClusterId { - t.Fatalf("expected content to see %v, saw %v", expClusterId, actClusterId) - } - - expPeerAddr := expConfig.PeerAddr - actPeerAddr := actConfig.PeerAddr - if expPeerAddr != actPeerAddr { - t.Fatalf("expected content to see %v, saw %v", expPeerAddr, actPeerAddr) - } -} diff --git a/config/node_config.go b/config/node_config.go deleted file mode 100644 index 7b0b61b..0000000 --- a/config/node_config.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) 2019 Minoru Osuka -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "encoding/json" - "fmt" - - "github.com/mosuka/blast/strutils" -) - -type NodeConfig struct { - NodeId string `json:"node_id,omitempty"` - BindAddr string `json:"bind_addr,omitempty"` - GRPCAddr string `json:"grpc_addr,omitempty"` - HTTPAddr string `json:"http_addr,omitempty"` - DataDir string `json:"data_dir,omitempty"` - RaftStorageType string `json:"raft_storage_type,omitempty"` -} - -func DefaultNodeConfig() *NodeConfig { - return &NodeConfig{ - NodeId: fmt.Sprintf("node-%s", strutils.RandStr(5)), - BindAddr: ":2000", - GRPCAddr: ":5000", - HTTPAddr: ":8000", - DataDir: "/tmp/blast", - RaftStorageType: "boltdb", - } -} - -func (c *NodeConfig) ToMap() map[string]interface{} { - b, err := json.Marshal(c) - if err != nil { - return map[string]interface{}{} - } - - var m map[string]interface{} - err = json.Unmarshal(b, &m) - if err != nil { - return map[string]interface{}{} - } - - return m -} diff --git a/config/node_config_test.go b/config/node_config_test.go deleted file mode 100644 index da69880..0000000 --- a/config/node_config_test.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright (c) 2019 Minoru Osuka -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "fmt" - "testing" - - "github.com/mosuka/blast/strutils" -) - -func TestDefaultNodeConfig(t *testing.T) { - expConfig := &NodeConfig{ - NodeId: fmt.Sprintf("node-%s", strutils.RandStr(5)), - BindAddr: ":2000", - GRPCAddr: ":5000", - HTTPAddr: ":8000", - DataDir: "/tmp/blast", - RaftStorageType: "boltdb", - } - actConfig := DefaultNodeConfig() - - if expConfig.BindAddr != actConfig.BindAddr { - t.Fatalf("expected content to see %v, saw %v", expConfig.BindAddr, actConfig.BindAddr) - } - - if expConfig.GRPCAddr != actConfig.GRPCAddr { - t.Fatalf("expected content to see %v, saw %v", expConfig.GRPCAddr, actConfig.GRPCAddr) - } - - if expConfig.HTTPAddr != actConfig.HTTPAddr { - t.Fatalf("expected content to see %v, saw %v", expConfig.HTTPAddr, actConfig.HTTPAddr) - } - - if expConfig.DataDir != actConfig.DataDir { - t.Fatalf("expected content to see %v, saw %v", expConfig.DataDir, actConfig.DataDir) - } - - if expConfig.RaftStorageType != actConfig.RaftStorageType { - t.Fatalf("expected content to see %v, saw %v", expConfig.RaftStorageType, actConfig.RaftStorageType) - } -} diff --git a/dispatcher/grpc_service.go b/dispatcher/grpc_service.go index f51c94c..9ebf9e0 100644 --- a/dispatcher/grpc_service.go +++ b/dispatcher/grpc_service.go @@ -51,7 +51,6 @@ type GRPCService struct { updateManagersStopCh chan struct{} updateManagersDoneCh chan struct{} - //indexers map[string]interface{} indexers map[string]*index.Cluster indexerClients map[string]map[string]*indexer.GRPCClient updateIndexersStopCh chan struct{} @@ -66,7 +65,6 @@ func NewGRPCService(managerGrpcAddress string, logger *zap.Logger) (*GRPCService managers: &management.Cluster{Nodes: make(map[string]*management.Node, 0)}, managerClients: make(map[string]*manager.GRPCClient, 0), - //indexers: make(map[string]interface{}, 0), indexers: make(map[string]*index.Cluster, 0), indexerClients: make(map[string]map[string]*indexer.GRPCClient, 0), }, nil diff --git a/dispatcher/server.go b/dispatcher/server.go index d088d51..447c51a 100644 --- a/dispatcher/server.go +++ b/dispatcher/server.go @@ -16,16 +16,16 @@ package dispatcher import ( accesslog "github.com/mash/go-accesslog" - "github.com/mosuka/blast/config" "go.uber.org/zap" ) type Server struct { - clusterConfig *config.ClusterConfig - nodeConfig *config.NodeConfig - logger *zap.Logger - grpcLogger *zap.Logger - httpLogger accesslog.Logger + managerGrpcAddress string + grpcAddress string + httpAddress string + logger *zap.Logger + grpcLogger *zap.Logger + httpLogger accesslog.Logger grpcService *GRPCService grpcServer *GRPCServer @@ -33,13 +33,14 @@ type Server struct { httpServer *HTTPServer } -func NewServer(clusterConfig *config.ClusterConfig, nodeConfig *config.NodeConfig, logger *zap.Logger, grpcLogger *zap.Logger, httpLogger accesslog.Logger) (*Server, error) { +func NewServer(managerGrpcAddress string, grpcAddress string, httpAddress string, logger *zap.Logger, grpcLogger *zap.Logger, httpLogger accesslog.Logger) (*Server, error) { return &Server{ - clusterConfig: clusterConfig, - nodeConfig: nodeConfig, - logger: logger, - grpcLogger: grpcLogger, - httpLogger: httpLogger, + managerGrpcAddress: managerGrpcAddress, + grpcAddress: grpcAddress, + httpAddress: httpAddress, + logger: logger, + grpcLogger: grpcLogger, + httpLogger: httpLogger, }, nil } @@ -47,28 +48,28 @@ func (s *Server) Start() { var err error // create gRPC service - s.grpcService, err = NewGRPCService(s.clusterConfig.ManagerAddr, s.logger) + s.grpcService, err = NewGRPCService(s.managerGrpcAddress, s.logger) if err != nil { s.logger.Fatal(err.Error()) return } // create gRPC server - s.grpcServer, err = NewGRPCServer(s.nodeConfig.GRPCAddr, s.grpcService, s.grpcLogger) + s.grpcServer, err = NewGRPCServer(s.grpcAddress, s.grpcService, s.grpcLogger) if err != nil { s.logger.Fatal(err.Error()) return } // create HTTP router - s.httpRouter, err = NewRouter(s.nodeConfig.GRPCAddr, s.logger) + s.httpRouter, err = NewRouter(s.grpcAddress, s.logger) if err != nil { s.logger.Fatal(err.Error()) return } // create HTTP server - s.httpServer, err = NewHTTPServer(s.nodeConfig.HTTPAddr, s.httpRouter, s.logger, s.httpLogger) + s.httpServer, err = NewHTTPServer(s.httpAddress, s.httpRouter, s.logger, s.httpLogger) if err != nil { s.logger.Fatal(err.Error()) return diff --git a/dispatcher/server_test.go b/dispatcher/server_test.go index 29d0577..63922e1 100644 --- a/dispatcher/server_test.go +++ b/dispatcher/server_test.go @@ -22,7 +22,6 @@ import ( "testing" "time" - "github.com/mosuka/blast/config" "github.com/mosuka/blast/indexer" "github.com/mosuka/blast/logutils" "github.com/mosuka/blast/manager" @@ -534,27 +533,23 @@ func TestServer_Start(t *testing.T) { t.Fatalf("expected content to see %v, saw %v", expIndexerCluster2, actIndexerCluster2) } + //// + //// dispatcher + //// + //dispatcherManagerGrpcAddress := managerGrpcAddress1 + //dispatcherGrpcAddress := fmt.Sprintf(":%d", testutils.TmpPort()) + //dispatcherHttpAddress := fmt.Sprintf(":%d", testutils.TmpPort()) // - // dispatcher + //dispatcher1, err := NewServer(dispatcherManagerGrpcAddress, dispatcherGrpcAddress, dispatcherHttpAddress, logger.Named("dispatcher1"), grpcLogger.Named("dispatcher1"), httpAccessLogger) + //defer func() { + // dispatcher1.Stop() + //}() + //if err != nil { + // t.Fatalf("%v", err) + //} + //// start server + //dispatcher1.Start() // - // create cluster config - dispatcherClusterConfig1 := config.DefaultClusterConfig() - dispatcherClusterConfig1.ManagerAddr = managerGrpcAddress1 - // create node config - dispatcherNodeConfig := testutils.TmpNodeConfig() - defer func() { - _ = os.RemoveAll(dispatcherNodeConfig.DataDir) - }() - dispatcher1, err := NewServer(dispatcherClusterConfig1, dispatcherNodeConfig, logger.Named("dispatcher1"), grpcLogger.Named("dispatcher1"), httpAccessLogger) - defer func() { - dispatcher1.Stop() - }() - if err != nil { - t.Fatalf("%v", err) - } - // start server - dispatcher1.Start() - - // sleep - time.Sleep(5 * time.Second) + //// sleep + //time.Sleep(5 * time.Second) } diff --git a/indexer/grpc_service.go b/indexer/grpc_service.go index 18007ff..87b9ba3 100644 --- a/indexer/grpc_service.go +++ b/indexer/grpc_service.go @@ -763,11 +763,12 @@ func (s *GRPCService) GetDocument(ctx context.Context, req *index.GetDocumentReq fields, err := s.raftServer.GetDocument(req.Id) if err != nil { - s.logger.Error(err.Error()) switch err { case blasterrors.ErrNotFound: + s.logger.Debug(err.Error(), zap.String("id", req.Id)) return resp, status.Error(codes.NotFound, err.Error()) default: + s.logger.Error(err.Error(), zap.String("id", req.Id)) return resp, status.Error(codes.Internal, err.Error()) } } diff --git a/indexer/raft_fsm.go b/indexer/raft_fsm.go index 3d6bfc9..01d047e 100644 --- a/indexer/raft_fsm.go +++ b/indexer/raft_fsm.go @@ -113,7 +113,12 @@ func (f *RaftFSM) DeleteNode(nodeId string) error { func (f *RaftFSM) GetDocument(id string) (map[string]interface{}, error) { fields, err := f.index.Get(id) if err != nil { - f.logger.Error(err.Error()) + switch err { + case blasterrors.ErrNotFound: + f.logger.Debug(err.Error(), zap.String("id", id)) + default: + f.logger.Error(err.Error(), zap.String("id", id)) + } return nil, err } diff --git a/indexer/raft_server.go b/indexer/raft_server.go index c2fa628..60a9004 100644 --- a/indexer/raft_server.go +++ b/indexer/raft_server.go @@ -526,7 +526,12 @@ func (s *RaftServer) Snapshot() error { func (s *RaftServer) GetDocument(id string) (map[string]interface{}, error) { fields, err := s.fsm.GetDocument(id) if err != nil { - s.logger.Error(err.Error()) + switch err { + case blasterrors.ErrNotFound: + s.logger.Debug(err.Error(), zap.String("id", id)) + default: + s.logger.Error(err.Error(), zap.String("id", id)) + } return nil, err } diff --git a/manager/grpc_service.go b/manager/grpc_service.go index 47b65a2..6bccc35 100644 --- a/manager/grpc_service.go +++ b/manager/grpc_service.go @@ -504,11 +504,12 @@ func (s *GRPCService) Get(ctx context.Context, req *management.GetRequest) (*man value, err := s.raftServer.GetValue(req.Key) if err != nil { - s.logger.Error(err.Error()) switch err { case blasterrors.ErrNotFound: + s.logger.Debug(err.Error(), zap.String("key", req.Key)) return resp, status.Error(codes.NotFound, err.Error()) default: + s.logger.Error(err.Error(), zap.String("key", req.Key)) return resp, status.Error(codes.Internal, err.Error()) } } @@ -587,11 +588,12 @@ func (s *GRPCService) Delete(ctx context.Context, req *management.DeleteRequest) if s.raftServer.IsLeader() { err := s.raftServer.DeleteValue(req.Key) if err != nil { - s.logger.Error(err.Error()) switch err { case blasterrors.ErrNotFound: + s.logger.Debug(err.Error(), zap.String("key", req.Key)) return resp, status.Error(codes.NotFound, err.Error()) default: + s.logger.Error(err.Error(), zap.String("key", req.Key)) return resp, status.Error(codes.Internal, err.Error()) } } @@ -604,8 +606,14 @@ func (s *GRPCService) Delete(ctx context.Context, req *management.DeleteRequest) } err = client.Delete(req.Key) if err != nil { - s.logger.Error(err.Error()) - return resp, status.Error(codes.Internal, err.Error()) + switch err { + case blasterrors.ErrNotFound: + s.logger.Debug(err.Error(), zap.String("key", req.Key)) + return resp, status.Error(codes.NotFound, err.Error()) + default: + s.logger.Error(err.Error(), zap.String("key", req.Key)) + return resp, status.Error(codes.Internal, err.Error()) + } } } diff --git a/manager/raft_server.go b/manager/raft_server.go index 76e3324..5a75f47 100644 --- a/manager/raft_server.go +++ b/manager/raft_server.go @@ -534,7 +534,12 @@ func (s *RaftServer) Snapshot() error { func (s *RaftServer) GetValue(key string) (interface{}, error) { value, err := s.fsm.GetValue(key) if err != nil { - s.logger.Error(err.Error()) + switch err { + case blasterrors.ErrNotFound: + s.logger.Debug(err.Error(), zap.String("key", key)) + default: + s.logger.Error(err.Error(), zap.String("key", key)) + } return nil, err } @@ -611,7 +616,12 @@ func (s *RaftServer) DeleteValue(key string) error { } err = f.Response().(*fsmResponse).error if err != nil { - s.logger.Error(err.Error()) + switch err { + case blasterrors.ErrNotFound: + s.logger.Debug(err.Error(), zap.String("key", key)) + default: + s.logger.Error(err.Error(), zap.String("key", key)) + } return err } diff --git a/testutils/testutils.go b/testutils/testutils.go index ecae708..758e804 100644 --- a/testutils/testutils.go +++ b/testutils/testutils.go @@ -15,7 +15,6 @@ package testutils import ( - "fmt" "io/ioutil" "net" @@ -46,16 +45,16 @@ func TmpPort() int { return l.Addr().(*net.TCPAddr).Port } -func TmpNodeConfig() *config.NodeConfig { - c := config.DefaultNodeConfig() - - c.BindAddr = fmt.Sprintf(":%d", TmpPort()) - c.GRPCAddr = fmt.Sprintf(":%d", TmpPort()) - c.HTTPAddr = fmt.Sprintf(":%d", TmpPort()) - c.DataDir = TmpDir() - - return c -} +//func TmpNodeConfig() *config.NodeConfig { +// c := config.DefaultNodeConfig() +// +// c.BindAddr = fmt.Sprintf(":%d", TmpPort()) +// c.GRPCAddr = fmt.Sprintf(":%d", TmpPort()) +// c.HTTPAddr = fmt.Sprintf(":%d", TmpPort()) +// c.DataDir = TmpDir() +// +// return c +//} func TmpIndexConfig(indexMappingFile string, indexType string, indexStorageType string) (*config.IndexConfig, error) { indexMapping, err := indexutils.NewIndexMappingFromFile(indexMappingFile)