Skip to content
This repository has been archived by the owner on Dec 10, 2021. It is now read-only.

Change protobuf #87

Merged
merged 1 commit into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/blast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func main() {
//},
cli.StringFlag{
Name: "grpc-address",
Value: "",
Value: ":5100",
Usage: "The gRPC address of the node for which to retrieve the node information",
},
},
Expand Down Expand Up @@ -321,7 +321,7 @@ func main() {
//},
cli.StringFlag{
Name: "grpc-address",
Value: "",
Value: ":5100",
Usage: "The gRPC address of the node for which to retrieve the node information",
},
},
Expand Down
14 changes: 1 addition & 13 deletions cmd/blast/manager_cluster_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ package main

import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"

"github.com/mosuka/blast/manager"
"github.com/mosuka/blast/protobuf"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -61,17 +59,7 @@ func managerClusterWatch(c *cli.Context) error {
break
}

cluster, err := protobuf.MarshalAny(resp.Cluster)
if err != nil {
return err
}
if cluster == nil {
return errors.New("nil")
}

var clusterBytes []byte
clusterMap := *cluster.(*map[string]interface{})
clusterBytes, err = json.MarshalIndent(clusterMap, "", " ")
clusterBytes, err := json.MarshalIndent(resp.Cluster, "", " ")
if err != nil {
return err
}
Expand Down
23 changes: 8 additions & 15 deletions cmd/blast/manager_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/mosuka/blast/indexutils"
"github.com/mosuka/blast/logutils"
"github.com/mosuka/blast/manager"
"github.com/mosuka/blast/protobuf/management"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -91,20 +92,12 @@ func managerStart(c *cli.Context) error {
httpLogCompress,
)

// create cluster config
clusterConfig := config.DefaultClusterConfig()
if peerGrpcAddr != "" {
clusterConfig.PeerAddr = peerGrpcAddr
}

// create node config
nodeConfig := &config.NodeConfig{
NodeId: nodeId,
BindAddr: nodeAddr,
GRPCAddr: grpcAddr,
HTTPAddr: httpAddr,
DataDir: dataDir,
RaftStorageType: raftStorageType,
node := &management.Node{
BindAddress: nodeAddr,
Metadata: &management.Metadata{
GrpcAddress: grpcAddr,
HttpAddress: httpAddr,
},
}

var err error
Expand All @@ -127,7 +120,7 @@ func managerStart(c *cli.Context) error {
IndexStorageType: indexStorageType,
}

svr, err := manager.NewServer(clusterConfig, nodeConfig, indexConfig, logger.Named(nodeId), grpcLogger.Named(nodeId), httpLogger)
svr, err := manager.NewServer(peerGrpcAddr, nodeId, node, dataDir, raftStorageType, indexConfig, logger.Named(nodeId), grpcLogger.Named(nodeId), httpLogger)
if err != nil {
return err
}
Expand Down
94 changes: 34 additions & 60 deletions dispatcher/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/mosuka/blast/manager"
"github.com/mosuka/blast/protobuf"
"github.com/mosuka/blast/protobuf/distribute"
"github.com/mosuka/blast/protobuf/management"
"github.com/mosuka/blast/sortutils"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand All @@ -45,7 +46,7 @@ type GRPCService struct {
managerAddr string
logger *zap.Logger

managers map[string]interface{}
managers *management.Cluster
managerClients map[string]*manager.GRPCClient
updateManagersStopCh chan struct{}
updateManagersDoneCh chan struct{}
Expand All @@ -61,7 +62,7 @@ func NewGRPCService(managerAddr string, logger *zap.Logger) (*GRPCService, error
managerAddr: managerAddr,
logger: logger,

managers: make(map[string]interface{}, 0),
managers: &management.Cluster{Nodes: make(map[string]*management.Node, 0)},
managerClients: make(map[string]*manager.GRPCClient, 0),

indexers: make(map[string]interface{}, 0),
Expand Down Expand Up @@ -92,28 +93,22 @@ func (s *GRPCService) Stop() error {
func (s *GRPCService) getManagerClient() (*manager.GRPCClient, error) {
var client *manager.GRPCClient

for id, node := range s.managers {
nm, ok := node.(map[string]interface{})
if !ok {
s.logger.Warn("assertion failed", zap.String("id", id))
continue
}

state, ok := nm["state"].(string)
if !ok {
s.logger.Warn("missing state", zap.String("id", id), zap.String("state", state))
for id, node := range s.managers.Nodes {
if node.Metadata == nil {
s.logger.Warn("missing metadata", zap.String("id", id))
continue
}

if state == raft.Leader.String() || state == raft.Follower.String() {
if node.Status == raft.Leader.String() || node.Status == raft.Follower.String() {
var ok bool
client, ok = s.managerClients[id]
if ok {
return client, nil
} else {
s.logger.Error("node does not exist", zap.String("id", id))
}
} else {
s.logger.Debug("node has not available", zap.String("id", id), zap.String("state", state))
s.logger.Debug("node has not available", zap.String("id", id), zap.String("state", node.Status))
}
}

Expand All @@ -123,7 +118,7 @@ func (s *GRPCService) getManagerClient() (*manager.GRPCClient, error) {
return nil, err
}

func (s *GRPCService) getInitialManagers(managerAddr string) (map[string]interface{}, error) {
func (s *GRPCService) getInitialManagers(managerAddr string) (*management.Cluster, error) {
client, err := manager.NewGRPCClient(s.managerAddr)
defer func() {
err := client.Close()
Expand Down Expand Up @@ -165,29 +160,21 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) {
s.logger.Debug("initialize manager list", zap.Any("managers", s.managers))

// create clients for managers
for nodeId, node := range s.managers {
nm, ok := node.(map[string]interface{})
if !ok {
s.logger.Warn("assertion failed", zap.String("node_id", nodeId))
for nodeId, node := range s.managers.Nodes {
if node.Metadata == nil {
s.logger.Warn("missing metadata", zap.String("node_id", nodeId))
continue
}

nodeConfig, ok := nm["node_config"].(map[string]interface{})
if !ok {
s.logger.Warn("missing metadata", zap.String("node_id", nodeId), zap.Any("node_config", nodeConfig))
continue
}

grpcAddr, ok := nodeConfig["grpc_addr"].(string)
if !ok {
s.logger.Warn("missing gRPC address", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr))
if node.Metadata.GrpcAddress == "" {
s.logger.Warn("missing gRPC address", zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress))
continue
}

s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr))
client, err := manager.NewGRPCClient(grpcAddr)
s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress))
client, err := manager.NewGRPCClient(node.Metadata.GrpcAddress)
if err != nil {
s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr))
s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress))
}
if client != nil {
s.managerClients[nodeId] = client
Expand Down Expand Up @@ -223,41 +210,28 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) {
s.logger.Error(err.Error())
continue
}

// get current manager cluster
managersIntr, err := protobuf.MarshalAny(resp.Cluster)
if err != nil {
s.logger.Error(err.Error())
continue
}
if managersIntr == nil {
s.logger.Error(err.Error())
continue
}
managers := *managersIntr.(*map[string]interface{})
managers := resp.Cluster

if !reflect.DeepEqual(s.managers, managers) {
// open clients
for nodeId, metadata := range managers {
mm, ok := metadata.(map[string]interface{})
if !ok {
s.logger.Warn("assertion failed", zap.String("node_id", nodeId))
for nodeId, node := range managers.Nodes {
if node.Metadata == nil {
s.logger.Warn("missing metadata", zap.String("node_id", nodeId))
continue
}

grpcAddr, ok := mm["grpc_addr"].(string)
if !ok {
s.logger.Warn("missing metadata", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr))
if node.Metadata.GrpcAddress == "" {
s.logger.Warn("missing gRPC address", zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress))
continue
}

client, exist := s.managerClients[nodeId]
if exist {
s.logger.Debug("client has already exist in manager list", zap.String("node_id", nodeId))

if client.GetAddress() != grpcAddr {
s.logger.Debug("gRPC address has been changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", grpcAddr))
s.logger.Debug("recreate gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr))
if client.GetAddress() != node.Metadata.GrpcAddress {
s.logger.Debug("gRPC address has been changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", node.Metadata.GrpcAddress))
s.logger.Debug("recreate gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress))

delete(s.managerClients, nodeId)

Expand All @@ -266,24 +240,24 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) {
s.logger.Error(err.Error(), zap.String("node_id", nodeId))
}

newClient, err := manager.NewGRPCClient(grpcAddr)
newClient, err := manager.NewGRPCClient(node.Metadata.GrpcAddress)
if err != nil {
s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr))
s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress))
}

if newClient != nil {
s.managerClients[nodeId] = newClient
}
} else {
s.logger.Debug("gRPC address has not changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", grpcAddr))
s.logger.Debug("gRPC address has not changed", zap.String("node_id", nodeId), zap.String("client_grpc_addr", client.GetAddress()), zap.String("grpc_addr", node.Metadata.GrpcAddress))
}
} else {
s.logger.Debug("client does not exist in peer list", zap.String("node_id", nodeId))

s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr))
newClient, err := manager.NewGRPCClient(grpcAddr)
s.logger.Debug("create gRPC client", zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress))
newClient, err := manager.NewGRPCClient(node.Metadata.GrpcAddress)
if err != nil {
s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", grpcAddr))
s.logger.Error(err.Error(), zap.String("node_id", nodeId), zap.String("grpc_addr", node.Metadata.GrpcAddress))
}
if newClient != nil {
s.managerClients[nodeId] = newClient
Expand All @@ -293,7 +267,7 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) {

// close nonexistent clients
for nodeId, client := range s.managerClients {
if nodeConfig, exist := managers[nodeId]; !exist {
if nodeConfig, exist := managers.Nodes[nodeId]; !exist {
s.logger.Info("this client is no longer in use", zap.String("node_id", nodeId), zap.Any("node_config", nodeConfig))

s.logger.Debug("close client", zap.String("node_id", nodeId), zap.String("grpc_addr", client.GetAddress()))
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/mosuka/blast
go 1.12

require (
github.com/armon/gomdb v0.0.0-20180202201627-75f545a47e89 // indirect
github.com/blevesearch/bleve v0.7.0
github.com/blevesearch/blevex v0.0.0-20180227211930-4b158bb555a3 // indirect
github.com/blevesearch/cld2 v0.0.0-20150916130542-10f17c049ec9 // indirect
Expand All @@ -18,13 +17,13 @@ require (
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870 // indirect
github.com/golang/protobuf v1.3.1
github.com/google/go-cmp v0.3.0
github.com/gorilla/mux v1.7.0
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/hashicorp/raft v1.1.0
github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477
github.com/hashicorp/raft-mdb v0.0.0-20180824152511-9ee9663b6ffa
github.com/ikawaha/kagome.ipadic v1.0.1 // indirect
github.com/imdario/mergo v0.3.7
github.com/jmhodges/levigo v1.0.0 // indirect
Expand All @@ -38,7 +37,6 @@ require (
github.com/prometheus/procfs v0.0.0-20190322151404-55ae3d9d5573 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001 // indirect
github.com/stretchr/objx v0.1.1
github.com/stretchr/testify v1.3.0
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/tebeka/snowball v0.0.0-20130405174319-16e884df4e19 // indirect
github.com/tecbot/gorocksdb v0.0.0-20181010114359-8752a9433481 // indirect
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZ
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/gomdb v0.0.0-20180202201627-75f545a47e89 h1:A1SPjPcl2LdF2Skv9Zt41jWu4XYQAyvBDzrveQjlkhQ=
github.com/armon/gomdb v0.0.0-20180202201627-75f545a47e89/go.mod h1:wSblbytRgcqD+U+gGCKz5145DyjUYPh5fqh2uyXxfZw=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/blevesearch/bleve v0.7.0 h1:znyZ3zjsh2Scr60vszs7rbF29TU6i1q9bfnZf1vh0Ac=
Expand Down Expand Up @@ -84,6 +82,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.0 h1:tOSd0UKHQd6urX6ApfOn4XdBMY6Sh1MfxV3kmaazO+U=
Expand Down Expand Up @@ -113,8 +113,6 @@ github.com/hashicorp/raft v1.1.0 h1:qPMePEczgbkiQsqCsRfuHRqvDUO+zmAInDaD5ptXlq0=
github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477 h1:bLsrEmB2NUwkHH18FOJBIa04wOV2RQalJrcafTYu6Lg=
github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477/go.mod h1:aUF6HQr8+t3FC/ZHAC+pZreUBhTaxumuu3L+d37uRxk=
github.com/hashicorp/raft-mdb v0.0.0-20180824152511-9ee9663b6ffa h1:ccwcWyXHTaonH6yzx+t/3p9aNm/ogSTfd6YobZOtHmE=
github.com/hashicorp/raft-mdb v0.0.0-20180824152511-9ee9663b6ffa/go.mod h1:ooP3NrrH0GG/sVjF9pbRvhF6nVHRR4mkkwscLqReN1o=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ikawaha/kagome.ipadic v1.0.1 h1:4c/tx3Rga6LvtTouEdvodcfeWWTttATZg8XIH8lRHG4=
Expand Down Expand Up @@ -231,8 +229,6 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190316082340-a2f829d7f35f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190329044733-9eb1bfa1ce65 h1:hOY+O8MxdkPV10pNf7/XEHaySCiPKxixMKUshfHsGn0=
golang.org/x/sys v0.0.0-20190329044733-9eb1bfa1ce65/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:uPxWBzB3+mlnjy9W58qY1j/cjyFjutgw/Vhan2zLy/A=
golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
Expand Down
32 changes: 32 additions & 0 deletions hashutils/hashutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) 2019 Minoru Osuka
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package hashutils

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
)

func Hash(v interface{}) (string, error) {
b, err := json.Marshal(v)
if err != nil {
return "", err
}

hb := sha256.Sum256(b)

return hex.EncodeToString(hb[:]), nil
}
Loading