Skip to content
This repository has been archived by the owner on Dec 10, 2021. It is now read-only.

Commit

Permalink
Update protobuf (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
mosuka authored Jul 29, 2019
1 parent ae406b9 commit dafa34b
Show file tree
Hide file tree
Showing 21 changed files with 807 additions and 933 deletions.
10 changes: 7 additions & 3 deletions cmd/blast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,18 @@ func main() {
Action: managerNodeInfo,
},
{
Name: "health",
Name: "healthcheck",
Usage: "Health check the node",
Flags: []cli.Flag{
cli.StringFlag{
Name: "grpc-address",
Value: "",
Value: ":5100",
Usage: "The gRPC listen address",
},
cli.BoolFlag{
Name: "healthiness",
Usage: "healthiness probe",
},
cli.BoolFlag{
Name: "liveness",
Usage: "Liveness probe",
Expand All @@ -251,7 +255,7 @@ func main() {
Usage: "Readiness probe",
},
},
Action: managerNodeHealth,
Action: managerNodeHealthCheck,
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func managerClusterInfo(c *cli.Context) error {
}
}()

cluster, err := client.GetCluster()
cluster, err := client.ClusterInfo()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_cluster_leave.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func managerClusterLeave(c *cli.Context) error {
}
}()

err = client.DeleteNode(nodeId)
err = client.ClusterLeave(nodeId)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_cluster_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func managerClusterWatch(c *cli.Context) error {
return err
}

watchClient, err := client.WatchCluster()
watchClient, err := client.ClusterWatch()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func managerDelete(c *cli.Context) error {
}
}()

err = client.DeleteValue(key)
err = client.Delete(key)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func managerGet(c *cli.Context) error {
}
}()

value, err := client.GetValue(key)
value, err := client.Get(key)
if err != nil {
return err
}
Expand Down
42 changes: 20 additions & 22 deletions cmd/blast/manager_node_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"os"

"github.com/mosuka/blast/manager"
"github.com/mosuka/blast/protobuf/management"
"github.com/urfave/cli"
)

func managerNodeHealth(c *cli.Context) error {
func managerNodeHealthCheck(c *cli.Context) error {
grpcAddr := c.String("grpc-address")
healthiness := c.Bool("healthiness")
liveness := c.Bool("liveness")
readiness := c.Bool("readiness")

Expand All @@ -38,34 +40,30 @@ func managerNodeHealth(c *cli.Context) error {
}
}()

if !liveness && !readiness {
LivenessState, err := client.LivenessProbe()
var state string
if healthiness {
state, err = client.NodeHealthCheck(management.NodeHealthCheckRequest_HEALTHINESS.String())
if err != nil {
return err
state = management.NodeHealthCheckResponse_UNHEALTHY.String()
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", LivenessState))

readinessState, err := client.ReadinessProbe()
} else if liveness {
state, err = client.NodeHealthCheck(management.NodeHealthCheckRequest_LIVENESS.String())
if err != nil {
return err
state = management.NodeHealthCheckResponse_DEAD.String()
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", readinessState))
} else {
if liveness {
state, err := client.LivenessProbe()
if err != nil {
return err
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))
} else if readiness {
state, err = client.NodeHealthCheck(management.NodeHealthCheckRequest_READINESS.String())
if err != nil {
state = management.NodeHealthCheckResponse_NOT_READY.String()
}
if readiness {
state, err := client.ReadinessProbe()
if err != nil {
return err
}
_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))
} else {
state, err = client.NodeHealthCheck(management.NodeHealthCheckRequest_HEALTHINESS.String())
if err != nil {
state = management.NodeHealthCheckResponse_UNHEALTHY.String()
}
}

_, _ = fmt.Fprintln(os.Stdout, fmt.Sprintf("%v", state))

return nil
}
2 changes: 1 addition & 1 deletion cmd/blast/manager_node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func managerNodeInfo(c *cli.Context) error {
}
}()

metadata, err := client.GetNode(nodeId)
metadata, err := client.NodeInfo(nodeId)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func managerSet(c *cli.Context) error {
}
}()

err = client.SetValue(key, value)
err = client.Set(key, value)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blast/manager_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func managerWatch(c *cli.Context) error {
}
}()

watchClient, err := client.WatchStore(key)
watchClient, err := client.Watch(key)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions dispatcher/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *GRPCService) getInitialManagers(managerAddr string) (map[string]interfa
return nil, err
}

managers, err := client.GetCluster()
managers, err := client.ClusterInfo()
if err != nil {
s.logger.Error(err.Error())
return nil, err
Expand Down Expand Up @@ -207,7 +207,7 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) {
}

// create stream
stream, err := client.WatchCluster()
stream, err := client.ClusterWatch()
if err != nil {
s.logger.Error(err.Error())
continue
Expand Down Expand Up @@ -360,7 +360,7 @@ func (s *GRPCService) startUpdateIndexers(checkInterval time.Duration) {
}

// get initial indexers
clusters, err := client.GetValue("/cluster_config/clusters/")
clusters, err := client.Get("/cluster_config/clusters/")
if err != nil {
s.logger.Error(err.Error())
}
Expand Down Expand Up @@ -426,7 +426,7 @@ func (s *GRPCService) startUpdateIndexers(checkInterval time.Duration) {
continue
}

stream, err := client.WatchStore("/cluster_config/clusters/")
stream, err := client.Watch("/cluster_config/clusters/")
if err != nil {
s.logger.Error(err.Error())
continue
Expand All @@ -443,7 +443,7 @@ func (s *GRPCService) startUpdateIndexers(checkInterval time.Duration) {
}
s.logger.Debug("data has changed", zap.String("key", resp.Key))

cluster, err := client.GetValue("/cluster_config/clusters/")
cluster, err := client.Get("/cluster_config/clusters/")
if err != nil {
s.logger.Error(err.Error())
continue
Expand Down
2 changes: 1 addition & 1 deletion dispatcher/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestServer_Start(t *testing.T) {
t.Fatalf("%v", err)
}
// get cluster info from manager1
managerCluster1, err := managerClient1.GetCluster()
managerCluster1, err := managerClient1.ClusterInfo()
if err != nil {
t.Fatalf("%v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions indexer/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *GRPCService) getInitialManagers(managerAddr string) (map[string]interfa
return nil, err
}

managers, err := client.GetCluster()
managers, err := client.ClusterInfo()
if err != nil {
s.logger.Error(err.Error())
return nil, err
Expand Down Expand Up @@ -216,7 +216,7 @@ func (s *GRPCService) startUpdateManagers(checkInterval time.Duration) {
continue
}

stream, err := client.WatchCluster()
stream, err := client.ClusterWatch()
if err != nil {
s.logger.Error(err.Error())
continue
Expand Down Expand Up @@ -504,7 +504,7 @@ func (s *GRPCService) startUpdateCluster(checkInterval time.Duration) {
if err != nil {
s.logger.Error(err.Error())
}
err = client.SetValue(fmt.Sprintf("cluster_config/clusters/%s/nodes", s.clusterConfig.ClusterId), cluster)
err = client.Set(fmt.Sprintf("cluster_config/clusters/%s/nodes", s.clusterConfig.ClusterId), cluster)
if err != nil {
s.logger.Error(err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions indexer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *Server) Start() {
return
}

clusterIntr, err := mc.GetValue(fmt.Sprintf("cluster_config/clusters/%s/nodes", s.clusterConfig.ClusterId))
clusterIntr, err := mc.Get(fmt.Sprintf("cluster_config/clusters/%s/nodes", s.clusterConfig.ClusterId))
if err != nil && err != errors.ErrNotFound {
s.logger.Fatal(err.Error())
return
Expand Down Expand Up @@ -122,7 +122,7 @@ func (s *Server) Start() {
}

s.logger.Debug("pull index config from manager", zap.String("address", mc.GetAddress()))
value, err := mc.GetValue("/index_config")
value, err := mc.Get("/index_config")
if err != nil {
s.logger.Fatal(err.Error())
return
Expand Down
Loading

0 comments on commit dafa34b

Please sign in to comment.