Skip to content

Commit

Permalink
feat(storagenode): add gRPC error codes to the log server
Browse files Browse the repository at this point in the history
This patch adds gRPC error codes to the RPCs of the log server in the storage node. Previously the
storage node had no use of gRPC error codes. Instead, it used the `verrors` package that serializes
and deserializes the error messages into and from strings.

```
+-------------+     +-------------------------+
|             |     |  Client                 |
|             |     |  Library +------------+ |                +------------+
|             |     |          |            | |                |            |
|    User     |   sentinel     |            | |  encode error  |            |
| Application |<----error------+ RPC Client |<+--message into -| RPC Server |
|             |     |          |            | |    Details     |            |
|             |     |          |            | |                |            |
|             |     |          +------------+ |                +------------+
|             |     |                         |
+-------------+     +-------------------------+
```

However, comparing strings to identify errors is an anti-pattern in Go. Moreover, the `verrors`
package has not been maintained for a long time and has no straightforward interface. So, we are
trying to adopt gRPC error codes.

```
+-------------+     +-------------------------+
|             |     |  Client                 |
|             |     |  Library +------------+ |                +------------+
|             |     |          |            | |                |            |
|    User     |   sentinel     |            | |  gRPC status   |            |
| Application |<----error------+ RPC Client |<+-----code-------| RPC Server |
|             |     |          |            | |                |            |
|             |     |          |            | |                |            |
|             |     |          +------------+ |                +------------+
|             |     |                         |
+-------------+     +-------------------------+
```

For further information, I add some links for the Detail fields of gRPC.

- https://pkg.go.dev/google.golang.org/genproto/googleapis/rpc/status#Status
- https://cloud.google.com/apis/design/errors#error_details

Updates #312
  • Loading branch information
ijsong committed Jan 30, 2023
1 parent 5646866 commit 5e813fc
Show file tree
Hide file tree
Showing 7 changed files with 1,083 additions and 13 deletions.
7 changes: 7 additions & 0 deletions internal/storagenode/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package errors

import stderrors "errors"

var (
ErrNotPrimary = stderrors.New("not primary replica")
)
80 changes: 69 additions & 11 deletions internal/storagenode/log_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (

pbtypes "github.com/gogo/protobuf/types"
"go.uber.org/multierr"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

snerrors "github.com/kakao/varlog/internal/storagenode/errors"
"github.com/kakao/varlog/internal/storagenode/logstream"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/verrors"
Expand All @@ -20,34 +23,62 @@ type logServer struct {
var _ snpb.LogIOServer = (*logServer)(nil)

func (ls logServer) Append(ctx context.Context, req *snpb.AppendRequest) (*snpb.AppendResponse, error) {
err := snpb.ValidateTopicLogStream(req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

payload := req.GetPayload()
req.Payload = nil
lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return nil, errors.New("storage node: no such logstream")
return nil, status.Error(codes.NotFound, "no such log stream")
}

res, err := lse.Append(ctx, payload)
if err != nil {
return nil, err
var code codes.Code
switch err {
case verrors.ErrSealed:
code = codes.FailedPrecondition
case snerrors.ErrNotPrimary:
code = codes.Unavailable
default:
code = status.FromContextError(err).Code()
}
return nil, status.Error(code, err.Error())
}
return &snpb.AppendResponse{Results: res}, nil
}

func (ls logServer) Read(context.Context, *snpb.ReadRequest) (*snpb.ReadResponse, error) {
panic("not implemented")
return nil, status.Error(codes.Unimplemented, "deprecated")
}

func (ls logServer) Subscribe(req *snpb.SubscribeRequest, stream snpb.LogIO_SubscribeServer) error {
if err := snpb.ValidateTopicLogStream(req); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return errors.New("storage: no such logstream")
return status.Error(codes.NotFound, "no such log stream")
}

ctx := stream.Context()
sr, err := lse.SubscribeWithGLSN(req.GLSNBegin, req.GLSNEnd)
if err != nil {
// FIXME: error propagation via gRPC is awkward.
return verrors.ToStatusError(err)
var code codes.Code
if errors.Is(err, verrors.ErrClosed) {
code = codes.Unavailable
} else if errors.Is(err, verrors.ErrInvalid) {
code = codes.InvalidArgument
} else if errors.Is(err, verrors.ErrTrimmed) {
code = codes.OutOfRange
} else {
code = status.FromContextError(err).Code()
}
return verrors.ToStatusErrorWithCode(err, code)
}

rsp := &snpb.SubscribeResponse{}
Expand All @@ -72,19 +103,40 @@ Loop:
}
sr.Stop()
// FIXME: error propagation via gRPC is awkward.
return verrors.ToStatusError(multierr.Append(err, sr.Err()))
if err == nil && sr.Err() == nil {
return nil
}
if err != nil {
return status.Error(status.FromContextError(err).Code(), multierr.Append(err, sr.Err()).Error())
}
return status.Error(status.FromContextError(sr.Err()).Code(), sr.Err().Error())
}

func (ls logServer) SubscribeTo(req *snpb.SubscribeToRequest, stream snpb.LogIO_SubscribeToServer) (err error) {
err = snpb.ValidateTopicLogStream(req)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return errors.New("storage: no such logstream")
return status.Error(codes.NotFound, "no such log stream")
}

ctx := stream.Context()
sr, err := lse.SubscribeWithLLSN(req.LLSNBegin, req.LLSNEnd)
if err != nil {
return err
var code codes.Code
if errors.Is(err, verrors.ErrClosed) {
code = codes.Unavailable
} else if errors.Is(err, verrors.ErrInvalid) {
code = codes.InvalidArgument
} else if errors.Is(err, verrors.ErrTrimmed) {
code = codes.OutOfRange
} else {
code = status.FromContextError(err).Code()
}
return verrors.ToStatusErrorWithCode(err, code)
}

rsp := &snpb.SubscribeToResponse{}
Expand Down Expand Up @@ -121,14 +173,20 @@ func (ls logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecated
}

func (ls logServer) LogStreamReplicaMetadata(_ context.Context, req *snpb.LogStreamReplicaMetadataRequest) (*snpb.LogStreamReplicaMetadataResponse, error) {
if err := snpb.ValidateTopicLogStream(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return nil, errors.New("storage: no such logstream")
return nil, status.Error(codes.NotFound, "no such log stream")
}

lsrmd, err := lse.Metadata()
if err != nil {
return nil, err
if err == verrors.ErrClosed {
return nil, status.Error(codes.Unavailable, err.Error())
}
return nil, status.Error(status.FromContextError(err).Code(), err.Error())
}
return &snpb.LogStreamReplicaMetadataResponse{LogStreamReplica: lsrmd}, nil
}
4 changes: 2 additions & 2 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package logstream

import (
"context"
"errors"
"sync/atomic"
"time"

"github.com/kakao/varlog/internal/batchlet"
snerrors "github.com/kakao/varlog/internal/storagenode/errors"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/snpb"
)
Expand Down Expand Up @@ -36,7 +36,7 @@ func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.App
}

if !lse.isPrimary() {
return nil, errors.New("log stream: not primary")
return nil, snerrors.ErrNotPrimary
}

startTime := time.Now()
Expand Down
Loading

0 comments on commit 5e813fc

Please sign in to comment.