Skip to content

Commit

Permalink
Support exec after LM. Support kill after LM
Browse files Browse the repository at this point in the history
  • Loading branch information
kevpar committed Nov 16, 2024
1 parent 84c2a4d commit 1d7d1de
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 437 deletions.
310 changes: 0 additions & 310 deletions cmd/shimdiag/lm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,279 +4,23 @@ package main

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"time"

"github.com/Microsoft/go-winio"
runhcsopts "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options"
"github.com/Microsoft/hcsshim/internal/appargs"
lmproto "github.com/Microsoft/hcsshim/internal/lm/proto"
statepkg "github.com/Microsoft/hcsshim/internal/state"
eventtypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/runtime/task/v2"
"github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl/v2"
"github.com/urfave/cli"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/emptypb"
)

var createCommand = cli.Command{
Name: "create",
Usage: "Creates a task",
ArgsUsage: "[flags] <address> <id> <bundle dir>",
Flags: []cli.Flag{
cli.StringFlag{
Name: "stdin",
Usage: "Named pipe path",
},
cli.StringFlag{
Name: "stdout",
Usage: "Named pipe path",
},
cli.StringFlag{
Name: "stderr",
Usage: "Named pipe path",
},
cli.BoolFlag{
Name: "tty",
Usage: "Enable terminal mode for task IO",
},
cli.StringFlag{
Name: "rootfs",
Usage: "JSON file to read rootfs from",
},
cli.StringFlag{
Name: "options",
Usage: "jsonpb file to read shim options from",
},
},
SkipArgReorder: true,
Before: appargs.Validate(appargs.String, appargs.String, appargs.String),
Action: func(clictx *cli.Context) error {
args := clictx.Args()
address := args[0]
id := args[1]

bundle, err := filepath.Abs(args[2])
if err != nil {
return err
}

var rootfs []*types.Mount
if clictx.IsSet("rootfs") {
data, err := os.ReadFile(clictx.String("rootfs"))
if err != nil {
return err
}
if err := json.Unmarshal(data, &rootfs); err != nil {
return err
}
}

var options *anypb.Any
if clictx.IsSet("options") {
data, err := os.ReadFile(clictx.String("options"))
if err != nil {
return err
}
var opts runhcsopts.Options
if err := (protojson.UnmarshalOptions{}).Unmarshal(data, &opts); err != nil {
return err
}
any, err := typeurl.MarshalAny(&opts)
if err != nil {
return err
}
options = &anypb.Any{TypeUrl: any.GetTypeUrl(), Value: any.GetValue()}
}

conn, err := winio.DialPipe(address, nil)
if err != nil {
return fmt.Errorf("dial %s: %w", address, err)
}

client := ttrpc.NewClient(conn)
svc := task.NewTaskClient(client)

ctx := context.Background()

{
resp, err := svc.Create(ctx, &task.CreateTaskRequest{
ID: id,
Bundle: bundle,
Rootfs: rootfs,
Terminal: clictx.Bool("tty"),
Stdin: clictx.String("stdin"),
Stdout: clictx.String("stdout"),
Stderr: clictx.String("stderr"),
Options: options,
})
if err != nil {
return fmt.Errorf("task.Create: %w", err)
}
fmt.Printf("task pid is %d\n", resp.Pid)
}
return nil
},
}

var startCommand = cli.Command{
Name: "start",
Usage: "",
ArgsUsage: "[flags] <address> <id>",
Flags: []cli.Flag{},
SkipArgReorder: true,
Before: appargs.Validate(appargs.String, appargs.String),
Action: func(clictx *cli.Context) error {
args := clictx.Args()
address := args[0]
id := args[1]

conn, err := winio.DialPipe(address, nil)
if err != nil {
return fmt.Errorf("dial %s: %w", address, err)
}

client := ttrpc.NewClient(conn)
svc := task.NewTaskClient(client)

ctx := context.Background()

{
resp, err := svc.Start(ctx, &task.StartRequest{
ID: id,
})
if err != nil {
return fmt.Errorf("task.Start: %w", err)
}
fmt.Printf("task pid is %d\n", resp.Pid)
}
return nil
},
}

var pipeCommand = cli.Command{
Name: "pipe",
Usage: "",
ArgsUsage: "[flags] <stdin pipe> <stdout pipe> <stderr pipe>",
Flags: []cli.Flag{},
SkipArgReorder: true,
Action: func(clictx *cli.Context) error {
args := clictx.Args()

f := func(name, pipe string, wg *sync.WaitGroup, copy func(c net.Conn) (int64, error)) {
defer wg.Done()
l, err := winio.ListenPipe(pipe, nil)
if err != nil {
panic(err)
}
fmt.Printf("%s: listening on %s\n", name, pipe)
c, err := l.Accept()
if err != nil {
panic(err)
}
fmt.Printf("%s: received connection\n", name)
n, err := copy(c)
fmt.Printf("%s: copy completed after %d bytes", name, n)
if err != nil {
fmt.Printf(" and error: %s", err)
}
fmt.Printf("\n")
}

var wg sync.WaitGroup
if len(args) > 0 {
wg.Add(1)
go f("stdin", args[0], &wg, func(c net.Conn) (int64, error) { return io.Copy(c, os.Stdin) })
}
if len(args) > 1 {
wg.Add(1)
go f("stdout", args[1], &wg, func(c net.Conn) (int64, error) { return io.Copy(os.Stdout, c) })
}
if len(args) > 2 {
wg.Add(1)
go f("stderr", args[2], &wg, func(c net.Conn) (int64, error) { return io.Copy(os.Stderr, c) })
}
wg.Wait()

return nil
},
}

type eventsSvc struct {
m sync.Mutex
}

func (e *eventsSvc) Forward(ctx context.Context, req *events.ForwardRequest) (*emptypb.Empty, error) {
e.m.Lock()
defer e.m.Unlock()

fmt.Printf("[%s][%s]: %s\n", req.Envelope.Timestamp.AsTime().Format(time.RFC3339), req.Envelope.Namespace, req.Envelope.Topic)
v, err := typeurl.UnmarshalAny(req.Envelope.Event)
if err != nil {
fmt.Printf("\tunmarshal failed: %s\n", err)
}
switch v := v.(type) {
case *eventtypes.TaskCreate:
fmt.Printf("\tContainerID: %s\n", v.ContainerID)
fmt.Printf("\tBundle: %s\n", v.Bundle)
fmt.Printf("\tPID: %d\n", v.Pid)
case *eventtypes.TaskStart:
fmt.Printf("\tContainerID: %s\n", v.ContainerID)
fmt.Printf("\tPID: %d\n", v.Pid)
case *eventtypes.TaskExit:
fmt.Printf("\tID: %s\n", v.ID)
fmt.Printf("\tContainerID: %s\n", v.ContainerID)
fmt.Printf("\tPID: %d\n", v.Pid)
fmt.Printf("\tExitStatus: %d\n", v.ExitStatus)
fmt.Printf("\tExitedAt: %v\n", v.ExitedAt.AsTime().Format(time.RFC3339))
default:
fmt.Printf("\tunrecognized event type: %T\n", v)
}
return &emptypb.Empty{}, nil
}

var eventsCommand = cli.Command{
Name: "events",
Usage: "",
ArgsUsage: "[flags] <address>",
Flags: []cli.Flag{},
SkipArgReorder: true,
Before: appargs.Validate(appargs.String),
Action: func(clictx *cli.Context) error {
args := clictx.Args()
address := args[0]

l, err := winio.ListenPipe(address, nil)
if err != nil {
return err
}

server, err := ttrpc.NewServer()
if err != nil {
return err
}
events.RegisterEventsService(server, &eventsSvc{})
if err := server.Serve(context.Background(), l); err != nil {
return err
}
return nil
},
}

var lmPrepareCommand = cli.Command{
Name: "lmprepare",
Usage: "Prepares the sandbox for migration",
Expand Down Expand Up @@ -572,57 +316,3 @@ var pb2jsonCommand = cli.Command{
return nil
},
}

var deleteCommand = cli.Command{
Name: "delete",
ArgsUsage: "<pipe> <task id>",
SkipArgReorder: true,
Before: appargs.Validate(appargs.String, appargs.String),
Action: func(clictx *cli.Context) error {
args := clictx.Args()
address := args[0]
id := args[1]

conn, err := winio.DialPipe(address, nil)
if err != nil {
return fmt.Errorf("dial %s: %w", address, err)
}

client := ttrpc.NewClient(conn)
svc := task.NewTaskClient(client)

ctx := context.Background()

if _, err := svc.Delete(ctx, &task.DeleteRequest{ID: id}); err != nil {
return err
}
return nil
},
}

var shutdownCommand = cli.Command{
Name: "shutdown",
ArgsUsage: "<pipe> <task id>",
SkipArgReorder: true,
Before: appargs.Validate(appargs.String, appargs.String),
Action: func(clictx *cli.Context) error {
args := clictx.Args()
address := args[0]
id := args[1]

conn, err := winio.DialPipe(address, nil)
if err != nil {
return fmt.Errorf("dial %s: %w", address, err)
}

client := ttrpc.NewClient(conn)
svc := task.NewTaskClient(client)

ctx := context.Background()

if _, err := svc.Shutdown(ctx, &task.ShutdownRequest{ID: id}); err != nil {
return err
}
return nil
},
}
6 changes: 0 additions & 6 deletions cmd/shimdiag/shimdiag.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,8 @@ func main() {
lmDialCommand,
lmTransferCommand,
lmFinalizeCommand,
createCommand,
pipeCommand,
startCommand,
eventsCommand,
json2pbCommand,
pb2jsonCommand,
deleteCommand,
shutdownCommand,
}
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
Expand Down
13 changes: 10 additions & 3 deletions cmd/task/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,22 @@ var startCommand = cli.Command{
}

var deleteCommand = cli.Command{
Name: "delete",
ArgsUsage: "<pipe> <task id>",
Name: "delete",
ArgsUsage: "<pipe> <task id>",
Flags: []cli.Flag{
cli.StringFlag{
Name: "execid",
},
},
SkipArgReorder: true,
Before: appargs.Validate(appargs.String, appargs.String),
Action: func(clictx *cli.Context) error {
args := clictx.Args()
address := args[0]
id := args[1]

execID := clictx.String("execid")

conn, err := winio.DialPipe(address, nil)
if err != nil {
return fmt.Errorf("dial %s: %w", address, err)
Expand All @@ -219,7 +226,7 @@ var deleteCommand = cli.Command{

ctx := context.Background()

if _, err := svc.Delete(ctx, &task.DeleteRequest{ID: id}); err != nil {
if _, err := svc.Delete(ctx, &task.DeleteRequest{ID: id, ExecID: execID}); err != nil {
return err
}
return nil
Expand Down
Loading

0 comments on commit 1d7d1de

Please sign in to comment.