Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
shimv2: convert vc errors to grpc errors
Browse files Browse the repository at this point in the history
containerd checks for the grpc error code to determine
correct recover action upon grpc errors. We need to provide
them properly.

Unfortunately ttrpc doesn't support grpc interceptor so we have
to modify every service function for it.

Fixes: #1527

Signed-off-by: Peng Tao <bergwolf@hyper.sh>
  • Loading branch information
bergwolf committed Apr 12, 2019
1 parent cf90751 commit 8215a3c
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 17 deletions.
62 changes: 62 additions & 0 deletions containerd-shim-v2/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2019 hyper.sh
//
// SPDX-License-Identifier: Apache-2.0
//

package containerdshim

import (
"strings"
"syscall"

"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

vc "github.com/kata-containers/runtime/virtcontainers/pkg/types"
)

// toGRPC maps the virtcontainers error into a grpc error,
// using the original error message as a description.
func toGRPC(err error) error {
if err == nil {
return nil
}

if isGRPCError(err) {
// error has already been mapped to grpc
return err
}

err = errors.Cause(err)
switch {
case isInvalidArgument(err):
return status.Errorf(codes.InvalidArgument, err.Error())
case isNotFound(err):
return status.Errorf(codes.NotFound, err.Error())
}

return err
}

// toGRPCf maps the error to grpc error codes, assembling the formatting string
// and combining it with the target error string.
func toGRPCf(err error, format string, args ...interface{}) error {
return toGRPC(errors.Wrapf(err, format, args...))
}

func isGRPCError(err error) bool {
_, ok := status.FromError(err)
return ok
}

func isInvalidArgument(err error) bool {
return err == vc.ErrNeedSandbox || err == vc.ErrNeedSandboxID ||
err == vc.ErrNeedContainerID || err == vc.ErrNeedState ||
err == syscall.EINVAL
}

func isNotFound(err error) bool {
return err == vc.ErrNoSuchContainer || err == syscall.ENOENT ||
strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "not exist")
}
29 changes: 29 additions & 0 deletions containerd-shim-v2/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2019 hyper.sh
//
// SPDX-License-Identifier: Apache-2.0
//

package containerdshim

import (
"syscall"
"testing"

vc "github.com/kata-containers/runtime/virtcontainers/pkg/types"
"github.com/stretchr/testify/assert"
)

func TestToGRPC(t *testing.T) {
assert := assert.New(t)

for _, err := range []error{vc.ErrNeedSandbox, vc.ErrNeedSandboxID,
vc.ErrNeedContainerID, vc.ErrNeedState, syscall.EINVAL, vc.ErrNoSuchContainer, syscall.ENOENT} {
assert.False(isGRPCError(err))
err = toGRPC(err)
assert.True(isGRPCError(err))
err = toGRPC(err)
assert.True(isGRPCError(err))
err = toGRPCf(err, "appending")
assert.True(isGRPCError(err))
}
}
106 changes: 89 additions & 17 deletions containerd-shim-v2/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,17 @@ func getTopic(ctx context.Context, e interface{}) string {
return cdruntime.TaskUnknownTopic
}

func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err error) {
//Since the binary cleanup will return the DeleteResponse from stdout to
//containerd, thus we must make sure there is no any outputs in stdout except
//the returned response, thus here redirect the log to stderr in case there's
//any log output to stdout.
logrus.SetOutput(os.Stderr)

defer func() {
err = toGRPC(err)
}()

if s.id == "" {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "the container id is empty, please specify the container id")
}
Expand Down Expand Up @@ -310,6 +314,10 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)

// Create a new sandbox or container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -351,7 +359,11 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
}

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAPI.StartResponse, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -393,7 +405,11 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
}

// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *taskAPI.DeleteResponse, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -453,7 +469,11 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
}

// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *ptypes.Empty, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -482,7 +502,11 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
}

// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_ *ptypes.Empty, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -512,7 +536,11 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
}

// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAPI.StateResponse, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -556,7 +584,11 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
}

// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes.Empty, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -586,7 +618,11 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
}

// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptypes.Empty, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -614,7 +650,11 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
}

// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.Empty, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -640,9 +680,13 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Emp
// Pids returns all pids inside the container
// Since for kata, it cannot get the process's pid from VM,
// thus only return the Shim's pid directly.
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.PidsResponse, err error) {
var processes []*task.ProcessInfo

defer func() {
err = toGRPC(err)
}()

pInfo := task.ProcessInfo{
Pid: s.pid,
}
Expand All @@ -654,7 +698,11 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
}

// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *ptypes.Empty, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -682,12 +730,20 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
}

// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (_ *ptypes.Empty, err error) {
defer func() {
err = toGRPC(err)
}()

return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "service Checkpoint")
}

// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *taskAPI.ConnectResponse, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -698,7 +754,11 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
}, nil
}

func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *ptypes.Empty, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
if len(s.containers) != 0 {
s.mu.Unlock()
Expand All @@ -713,7 +773,11 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
return empty, nil
}

func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAPI.StatsResponse, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -733,7 +797,11 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
}

// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ *ptypes.Empty, err error) {
defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -756,9 +824,13 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
}

// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (_ *taskAPI.WaitResponse, err error) {
var ret uint32

defer func() {
err = toGRPC(err)
}()

s.mu.Lock()
c, err := s.getContainer(r.ID)
s.mu.Unlock()
Expand Down

0 comments on commit 8215a3c

Please sign in to comment.