Skip to content
This repository has been archived by the owner on Dec 10, 2021. It is now read-only.

Update protobuf #86

Merged
merged 1 commit into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cmd/blast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,18 @@ func main() {
Action: managerNodeInfo,
},
{
Name: "health",
Name: "healthcheck",
Usage: "Health check the node",
Flags: []cli.Flag{
cli.StringFlag{
Name: "grpc-address",
Value: "",
Value: ":5100",
Usage: "The gRPC listen address",
},
cli.BoolFlag{
Name: "healthiness",
Usage: "healthiness probe",
},
cli.BoolFlag{
Name: "liveness",
Usage: "Liveness probe",
Expand All @@ -251,7 +255,7 @@ func main() {
Usage: "Readiness probe",
},
},
Action: managerNodeHealth,
Action: managerNodeHealthCheck,
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func managerClusterInfo(c *cli.Context) error {
}
}()

cluster, err := client.GetCluster()
cluster, err := client.ClusterInfo()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_cluster_leave.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func managerClusterLeave(c *cli.Context) error {
}
}()

err = client.DeleteNode(nodeId)
err = client.ClusterLeave(nodeId)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_cluster_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func managerClusterWatch(c *cli.Context) error {
return err
}

watchClient, err := client.WatchCluster()
watchClient, err := client.ClusterWatch()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func managerDelete(c *cli.Context) error {
}
}()

err = client.DeleteValue(key)
err = client.Delete(key)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func managerGet(c *cli.Context) error {
}
}()

value, err := client.GetValue(key)
value, err := client.Get(key)
if err != nil {
return err
}
Expand Down
42 changes: 20 additions & 22 deletions cmd/blast/manager_node_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"os"

"github.com/mosuka/blast/manager"
"github.com/mosuka/blast/protobuf/management"
"github.com/urfave/cli"
)

func managerNodeHealth(c *cli.Context) error {
func managerNodeHealthCheck(c *cli.Context) error {
grpcAddr := c.String("grpc-address")
healthiness := c.Bool("healthiness")
liveness := c.Bool("liveness")
readiness := c.Bool("readiness")

Expand All @@ -38,34 +40,30 @@ func managerNodeHealth(c *cli.Context) error {
}
}()

if !liveness && !readiness {
LivenessState, err := client.LivenessProbe()
var state string
if healthiness {
state, err = client.NodeHealthCheck(management.NodeHealthCheckRequest_HEALTHINESS.String())
if err != nil {
return err
state = management.NodeHealthCheckResponse_UNHEALTHY.String()
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", LivenessState))

readinessState, err := client.ReadinessProbe()
} else if liveness {
state, err = client.NodeHealthCheck(management.NodeHealthCheckRequest_LIVENESS.String())
if err != nil {
return err
state = management.NodeHealthCheckResponse_DEAD.String()
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", readinessState))
} else {
if liveness {
state, err := client.LivenessProbe()
if err != nil {
return err
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))
} else if readiness {
state, err = client.NodeHealthCheck(management.NodeHealthCheckRequest_READINESS.String())
if err != nil {
state = management.NodeHealthCheckResponse_NOT_READY.String()
}
if readiness {
state, err := client.ReadinessProbe()
if err != nil {
return err
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))
} else {
state, err = client.NodeHealthCheck(management.NodeHealthCheckRequest_HEALTHINESS.String())
if err != nil {
state = management.NodeHealthCheckResponse_UNHEALTHY.String()
}
}

_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))

return nil
}
2 changes: 1 addition & 1 deletion cmd/blast/manager_node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func managerNodeInfo(c *cli.Context) error {
}
}()

metadata, err := client.GetNode(nodeId)
metadata, err := client.NodeInfo(nodeId)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func managerSet(c *cli.Context) error {
}
}()

err = client.SetValue(key, value)
err = client.Set(key, value)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func managerWatch(c *cli.Context) error {
}
}()

watchClient, err := client.WatchStore(key)
watchClient, err := client.Watch(key)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions dispatcher/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *GRPCService) getInitialManagers(managerAddr string) (map[string]interfa
return nil, err
}

managers, err := client.GetCluster()
managers, err := client.ClusterInfo()
if err != nil {
s.logger.Error(err.Error())
return nil, err
Expand Down Expand Up @@ -207,7 +207,7 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) {
}

// create stream
stream, err := client.WatchCluster()
stream, err := client.ClusterWatch()
if err != nil {
s.logger.Error(err.Error())
continue
Expand Down Expand Up @@ -360,7 +360,7 @@ func (s *GRPCService) startUpdateIndexers(checkInterval time.Duration) {
}

// get initial indexers
clusters, err := client.GetValue("/cluster_config/clusters/")
clusters, err := client.Get("/cluster_config/clusters/")
if err != nil {
s.logger.Error(err.Error())
}
Expand Down Expand Up @@ -426,7 +426,7 @@ func (s *GRPCService) startUpdateIndexers(checkInterval time.Duration) {
continue
}

stream, err := client.WatchStore("/cluster_config/clusters/")
stream, err := client.Watch("/cluster_config/clusters/")
if err != nil {
s.logger.Error(err.Error())
continue
Expand All @@ -443,7 +443,7 @@ func (s *GRPCService) startUpdateIndexers(checkInterval time.Duration) {
}
s.logger.Debug("data has changed", zap.String("key", resp.Key))

cluster, err := client.GetValue("/cluster_config/clusters/")
cluster, err := client.Get("/cluster_config/clusters/")
if err != nil {
s.logger.Error(err.Error())
continue
Expand Down
2 changes: 1 addition & 1 deletion dispatcher/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestServer_Start(t *testing.T) {
t.Fatalf("%v", err)
}
// get cluster info from manager1
managerCluster1, err := managerClient1.GetCluster()
managerCluster1, err := managerClient1.ClusterInfo()
if err != nil {
t.Fatalf("%v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions indexer/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *GRPCService) getInitialManagers(managerAddr string) (map[string]interfa
return nil, err
}

managers, err := client.GetCluster()
managers, err := client.ClusterInfo()
if err != nil {
s.logger.Error(err.Error())
return nil, err
Expand Down Expand Up @@ -216,7 +216,7 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) {
continue
}

stream, err := client.WatchCluster()
stream, err := client.ClusterWatch()
if err != nil {
s.logger.Error(err.Error())
continue
Expand Down Expand Up @@ -504,7 +504,7 @@ func (s *GRPCService) startUpdateCluster(checkInterval time.Duration) {
if err != nil {
s.logger.Error(err.Error())
}
err = client.SetValue(fmt.Sprintf("cluster_config/clusters/%s/nodes", s.clusterConfig.ClusterId), cluster)
err = client.Set(fmt.Sprintf("cluster_config/clusters/%s/nodes", s.clusterConfig.ClusterId), cluster)
if err != nil {
s.logger.Error(err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions indexer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *Server) Start() {
return
}

clusterIntr, err := mc.GetValue(fmt.Sprintf("cluster_config/clusters/%s/nodes", s.clusterConfig.ClusterId))
clusterIntr, err := mc.Get(fmt.Sprintf("cluster_config/clusters/%s/nodes", s.clusterConfig.ClusterId))
if err != nil && err != errors.ErrNotFound {
s.logger.Fatal(err.Error())
return
Expand Down Expand Up @@ -122,7 +122,7 @@ func (s *Server) Start() {
}

s.logger.Debug("pull index config from manager", zap.String("address", mc.GetAddress()))
value, err := mc.GetValue("/index_config")
value, err := mc.Get("/index_config")
if err != nil {
s.logger.Fatal(err.Error())
return
Expand Down
Loading