Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ctr-remote: allow analyzer waiting for a line from the container #933

Merged
merged 1 commit into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 50 additions & 11 deletions analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package analyzer

import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -161,6 +163,7 @@ func Analyze(ctx context.Context, client *containerd.Client, ref string, opts ..
defer container.Delete(ctx, containerd.WithSnapshotCleanup)
var ioCreator cio.Creator
var con console.Console
waitLine := newLineWaiter(aOpts.waitLineOut)
stdinC := newLazyReadCloser(os.Stdin)
if aOpts.terminal {
if !aOpts.stdin {
Expand All @@ -172,11 +175,11 @@ func Analyze(ctx context.Context, client *containerd.Client, ref string, opts ..
return "", err
}
// On terminal mode, the "stderr" field is unused.
ioCreator = cio.NewCreator(cio.WithStreams(con, con, nil), cio.WithTerminal)
ioCreator = cio.NewCreator(cio.WithStreams(con, waitLine.registerWriter(con), nil), cio.WithTerminal)
} else if aOpts.stdin {
ioCreator = cio.NewCreator(cio.WithStreams(stdinC, os.Stdout, os.Stderr))
ioCreator = cio.NewCreator(cio.WithStreams(stdinC, waitLine.registerWriter(os.Stdout), os.Stderr))
} else {
ioCreator = cio.NewCreator(cio.WithStreams(nil, os.Stdout, os.Stderr))
ioCreator = cio.NewCreator(cio.WithStreams(nil, waitLine.registerWriter(os.Stdout), os.Stderr))
}
task, err := container.NewTask(ctx, ioCreator)
if err != nil {
Expand Down Expand Up @@ -248,7 +251,7 @@ func Analyze(ctx context.Context, client *containerd.Client, ref string, opts ..
aOpts.period = defaultPeriod
}
log.G(ctx).Infof("waiting for %v ...", aOpts.period)
status, killOk, err = waitOnTimeout(ctx, container, task, aOpts.period)
status, killOk, err = waitOnTimeout(ctx, container, task, aOpts.period, waitLine)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -325,23 +328,25 @@ func waitOnSignal(ctx context.Context, container containerd.Container, task cont
}
}

func waitOnTimeout(ctx context.Context, container containerd.Container, task containerd.Task, period time.Duration) (containerd.ExitStatus, bool, error) {
func waitOnTimeout(ctx context.Context, container containerd.Container, task containerd.Task, period time.Duration, line *lineWaiter) (containerd.ExitStatus, bool, error) {
statusC, err := task.Wait(ctx)
if err != nil {
return containerd.ExitStatus{}, false, err
}
select {
case status := <-statusC:
return status, true, nil
case l := <-line.waitCh:
log.G(ctx).Infof("Waiting line detected %q; killing task", l)
case <-time.After(period):
log.G(ctx).Warnf("killing task. the time period to monitor access log (%s) has timed out", period.String())
status, err := killTask(ctx, container, task, statusC)
if err != nil {
log.G(ctx).WithError(err).Warnf("failed to kill container")
return containerd.ExitStatus{}, false, nil
}
return status, true, nil
}
status, err := killTask(ctx, container, task, statusC)
if err != nil {
log.G(ctx).WithError(err).Warnf("failed to kill container")
return containerd.ExitStatus{}, false, nil
}
return status, true, nil
}

func killTask(ctx context.Context, container containerd.Container, task containerd.Task, statusC <-chan containerd.ExitStatus) (containerd.ExitStatus, error) {
Expand Down Expand Up @@ -399,3 +404,37 @@ func (s *lazyReadCloser) Read(p []byte) (int, error) {
}
return n, err
}

func newLineWaiter(s string) *lineWaiter {
return &lineWaiter{
waitCh: make(chan string),
waitLine: s,
}
}

type lineWaiter struct {
waitCh chan string
waitLine string
}

func (lw *lineWaiter) registerWriter(w io.Writer) io.Writer {
if lw.waitLine == "" {
return w
}

pr, pw := io.Pipe()
go func() {
scanner := bufio.NewScanner(pr)
for scanner.Scan() {
if strings.Contains(scanner.Text(), lw.waitLine) {
lw.waitCh <- lw.waitLine
}
}
if _, err := io.Copy(io.Discard, pr); err != nil {
pr.CloseWithError(err)
return
}
}()

return io.MultiWriter(w, pw)
}
9 changes: 9 additions & 0 deletions analyzer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type analyzerOpts struct {
specOpts SpecOpts
terminal bool
stdin bool
waitLineOut string
}

// Option is runtime configuration of analyzer container
Expand Down Expand Up @@ -79,3 +80,11 @@ func WithSnapshotter(snapshotter string) Option {
opts.snapshotter = snapshotter
}
}

// WithWaitLineOut specifies a substring of a stdout line to be waited.
// When this line is detected, the container will be killed.
func WithWaitLineOut(s string) Option {
return func(opts *analyzerOpts) {
opts.waitLineOut = s
}
}
7 changes: 6 additions & 1 deletion cmd/ctr-remote/commands/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ var OptimizeCommand = cli.Command{
Name: "wait-on-signal",
Usage: "ignore context cancel and keep the container running until it receives SIGINT (Ctrl + C) sent manually",
},
cli.StringFlag{
Name: "wait-on-line",
Usage: "Substring of a stdout line to be waited. When this string is detected, the container will be killed.",
},
cli.BoolFlag{
Name: "no-optimize",
Usage: "convert image without optimization",
Expand Down Expand Up @@ -231,7 +235,8 @@ func analyze(ctx context.Context, clicontext *cli.Context, client *containerd.Cl
aOpts = append(aOpts, analyzer.WithWaitOnSignal())
} else {
aOpts = append(aOpts,
analyzer.WithPeriod(time.Duration(clicontext.Int("period"))*time.Second))
analyzer.WithPeriod(time.Duration(clicontext.Int("period"))*time.Second),
analyzer.WithWaitLineOut(clicontext.String("wait-on-line")))
}
if clicontext.Bool("terminal") {
if !clicontext.Bool("i") {
Expand Down