Skip to content
This repository has been archived by the owner on Jun 18, 2022. It is now read-only.

Commit

Permalink
Close logs when user cancels request
Browse files Browse the repository at this point in the history
  • Loading branch information
Craig Jellick authored and ibuildthecloud committed Jan 19, 2017
1 parent 4273587 commit ab67993
Showing 1 changed file with 31 additions and 29 deletions.
60 changes: 31 additions & 29 deletions service/hostapi/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ package logs

import (
"bufio"
"io"
"bytes"
"net/url"
"strconv"
"strings"

log "github.com/Sirupsen/logrus"

"github.com/rancher/websocket-proxy/backend"
"github.com/rancher/websocket-proxy/common"

"bytes"
"github.com/docker/distribution/context"
"github.com/docker/docker/api/types"
"github.com/rancher/agent/service/hostapi/auth"
"github.com/rancher/agent/service/hostapi/events"
"golang.org/x/net/context"
)

var (
Expand Down Expand Up @@ -73,49 +73,51 @@ func (l *Handler) Handle(key string, initialMessage string, incomingMessages <-c
Tail: tail,
}

stdoutReader, err := client.ContainerLogs(context.Background(), container, logOpts)
ctx, cancelFnc := context.WithCancel(context.Background())
stdout, err := client.ContainerLogs(ctx, container, logOpts)
if err != nil {
log.Error(err)
return
}
defer stdout.Close()

go func() {
for {
_, ok := <-incomingMessages
if !ok {
cancelFnc()
return
}
}
}()

go func(stdout io.ReadCloser) {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
body := ""
data := scanner.Bytes()
if bytes.Contains(data, stdoutHead) {
if len(data) > 8 {
body = stdoutPrefix + string(data[8:])
}
} else if bytes.Contains(data, stderrHead) {
if len(data) > 8 {
body = stderrPrefix + string(data[8:])
}
} else {
body = bothPrefix + string(data)
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
body := ""
data := scanner.Bytes()
if bytes.Contains(data, stdoutHead) {
if len(data) > 8 {
body = stdoutPrefix + string(data[8:])
}
message := common.Message{
Key: key,
Type: common.Body,
Body: body,
} else if bytes.Contains(data, stderrHead) {
if len(data) > 8 {
body = stderrPrefix + string(data[8:])
}
response <- message
} else {
body = bothPrefix + string(data)
}
message := common.Message{
Key: key,
Type: common.Body,
Body: body,
}
if err := scanner.Err(); err != nil {
response <- message
}
if err := scanner.Err(); err != nil {
// hacky, but can't do a type assertion on the cancellation error, which is the "normal" error received
// when the logs are closed properly
if !strings.Contains(err.Error(), "request canceled") {
log.WithFields(log.Fields{"error": err}).Error("Error with the container log scanner.")
}
stdout.Close()
}(stdoutReader)

select {}
}
}

0 comments on commit ab67993

Please sign in to comment.