Skip to content

Commit

Permalink
etcdserver: add linearizable_read check to readyz.
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
siyuanfoundation committed Dec 15, 2023
1 parent c58ef8d commit 785c9cc
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 23 deletions.
18 changes: 10 additions & 8 deletions etcdserver/api/etcdhttp/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,19 @@ type CheckRegistry struct {

func installLivezEndpoints(mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", serializableReadCheck(server))
reg.Register("serializable_read", readCheck(server, true /* serializable */))
reg.InstallHttpEndpoints(mux)
}

func installReadyzEndpoints(mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)}
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
reg.Register("serializable_read", serializableReadCheck(server))
// serializable_read checks if local read is ok.
// linearizable_read checks if there is consensus in the cluster.
// Having both serializable_read and linearizable_read helps isolate the cause of problems if there is a read failure.
reg.Register("serializable_read", readCheck(server, true))
// linearizable_read check would be replaced by read_index check in 3.6
reg.Register("linearizable_read", readCheck(server, false))
reg.InstallHttpEndpoints(mux)
}

Expand Down Expand Up @@ -441,13 +446,10 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e
}
}

func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
func readCheck(srv ServerHealth, serializable bool) func(ctx context.Context) error {
return func(ctx context.Context) error {
ctx = srv.AuthStore().WithRoot(ctx)
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
if err != nil {
return fmt.Errorf("range error: %w", err)
}
return nil
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
return err
}
}
76 changes: 61 additions & 15 deletions etcdserver/api/etcdhttp/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ import (

type fakeHealthServer struct {
fakeServer
apiError error
missingLeader bool
authStore auth.AuthStore
serializableReadError error
linearizableReadError error
missingLeader bool
authStore auth.AuthStore
}

func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
return nil, s.apiError
func (s *fakeHealthServer) Range(_ context.Context, req *pb.RangeRequest) (*pb.RangeResponse, error) {
if req.Serializable {
return nil, s.serializableReadError
}
return nil, s.linearizableReadError
}

func (s *fakeHealthServer) Config() etcdserver.ServerConfig {
Expand Down Expand Up @@ -146,10 +150,11 @@ func TestHealthHandler(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend()
defer be.Close()
HandleHealth(mux, &fakeHealthServer{
fakeServer: fakeServer{alarms: tt.alarms},
apiError: tt.apiError,
missingLeader: tt.missingLeader,
authStore: auth.NewAuthStore(lg, be, nil, 0),
fakeServer: fakeServer{alarms: tt.alarms},
serializableReadError: tt.apiError,
linearizableReadError: tt.apiError,
missingLeader: tt.missingLeader,
authStore: auth.NewAuthStore(lg, be, nil, 0),
})
ts := httptest.NewServer(mux)
defer ts.Close()
Expand Down Expand Up @@ -185,8 +190,8 @@ func TestHttpSubPath(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, be, nil, 0),
serializableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, be, nil, 0),
}
HandleHealth(mux, s)
ts := httptest.NewServer(mux)
Expand Down Expand Up @@ -269,23 +274,23 @@ func TestSerializableReadCheck(t *testing.T) {
healthCheckURL: "/livez",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
inResult: []string{"[-]serializable_read failed: Unexpected error"},
},
{
name: "Not ready if range api is not available",
healthCheckURL: "/readyz",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
inResult: []string{"[-]serializable_read failed: Unexpected error"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, be, nil, 0),
serializableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, be, nil, 0),
}
HandleHealth(mux, s)
ts := httptest.NewServer(mux)
Expand All @@ -296,6 +301,47 @@ func TestSerializableReadCheck(t *testing.T) {
}
}

func TestLinearizableReadCheck(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend()
defer be.Close()
tests := []healthTestCase{
{
name: "Alive normal",
healthCheckURL: "/livez?verbose",
expectStatusCode: http.StatusOK,
inResult: []string{"[+]serializable_read ok"},
},
{
name: "Alive if lineariable range api is not available",
healthCheckURL: "/livez",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusOK,
},
{
name: "Not ready if range api is not available",
healthCheckURL: "/readyz",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[+]serializable_read ok", "[-]linearizable_read failed: Unexpected error"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
linearizableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, be, nil, 0),
}
HandleHealth(mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode)
})
}
}

func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})

Expand Down
8 changes: 8 additions & 0 deletions tests/e2e/ctl_v3_alarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ func alarmTest(cx ctlCtx) {
if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":"false"}`}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}
// '/livez' handler should return 'ok'
if err := cURLGet(cx.epc, cURLReq{endpoint: "/livez", expected: `ok`}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}
// '/readyz' handler should return 'ok'
if err := cURLGet(cx.epc, cURLReq{endpoint: "/readyz", expected: `ok`}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}

// check that Put is rejected when alarm is on
if err := ctlV3Put(cx, "3rd_test", smallbuf, ""); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions tests/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func metricsTest(cx ctlCtx) {
{"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))},
{"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)},
{"/health", `{"health":"true"}`},
{"/livez", `ok`},
{"/readyz", `ok`},
} {
i++
if err := ctlV3Put(cx, fmt.Sprintf("%d", i), "v", ""); err != nil {
Expand Down

0 comments on commit 785c9cc

Please sign in to comment.