Skip to content
This repository has been archived by the owner on Jul 17, 2024. It is now read-only.

Commit

Permalink
feat: support follow the k8s log (#131)
Browse files Browse the repository at this point in the history
Signed-off-by: Keming <kemingyang@tensorchord.ai>
  • Loading branch information
kemingy authored Aug 8, 2023
1 parent 5cadbb7 commit c23067f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 29 deletions.
41 changes: 26 additions & 15 deletions agent/client/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import (
"net/url"
"strings"

"github.com/sirupsen/logrus"
"github.com/tensorchord/openmodelz/agent/api/types"
)

const LogBufferSize = 128

// DeploymentLogGet gets the deployment logs.
func (cli *Client) DeploymentLogGet(ctx context.Context, namespace, name string,
since string, tail int, end string) (
[]types.Message, error) {
since string, tail int, end string, follow bool) (
<-chan types.Message, error) {
urlValues := url.Values{}
urlValues.Add("namespace", namespace)
urlValues.Add("name", name)
Expand All @@ -35,26 +38,34 @@ func (cli *Client) DeploymentLogGet(ctx context.Context, namespace, name string,
urlValues.Add("tail", fmt.Sprintf("%d", tail))
}

resp, err := cli.get(ctx, "/system/logs/inference", urlValues, nil)
defer ensureReaderClosed(resp)
if follow {
urlValues.Add("follow", "true")
}

resp, err := cli.get(ctx, "/system/logs/inference", urlValues, nil)

if err != nil {
return nil,
wrapResponseError(err, resp, "deployment logs", name)
return nil, wrapResponseError(err, resp, "deployment logs", name)
}


stream := make(chan types.Message, LogBufferSize)
var log types.Message
logs := []types.Message{}
scanner := bufio.NewScanner(resp.body)
for scanner.Scan() {
err = json.NewDecoder(strings.NewReader(scanner.Text())).Decode(&log)
if err != nil {
return nil, wrapResponseError(err, resp, "deployment logs", name)
go func () {
defer ensureReaderClosed(resp)
defer close(stream)
for scanner.Scan() {
err = json.Unmarshal(scanner.Bytes(), &log)
if err != nil {
logrus.Warnf("failed to decode %s log: %v | %s | [%s]", name, err, scanner.Text(), scanner.Err())
return
// continue
}
stream <- log
}
logs = append(logs, log)
}
}()

return logs, err
return stream, err
}

func (cli *Client) BuildLogGet(ctx context.Context, namespace, name, since string,
Expand Down
23 changes: 13 additions & 10 deletions agent/pkg/log/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package log

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -63,6 +62,9 @@ func (k *K8sAPIRequestor) Query(ctx context.Context,
if err != nil {
return nil, errdefs.InvalidParameter(err)
}
} else if r.Follow {
// avoid truncate
endTime = time.Now().Add(time.Hour)
} else {
endTime = time.Now()
}
Expand Down Expand Up @@ -152,22 +154,17 @@ func podLogs(ctx context.Context, i v1.PodInterface, pod, container,
opts.SinceSeconds = parseSince(since)
}

stream, err := i.GetLogs(pod, opts).Stream(context.TODO())
stream, err := i.GetLogs(pod, opts).Stream(ctx)
if err != nil {
return err
}
defer stream.Close()

done := make(chan error)
go func() {
reader := bufio.NewReader(stream)
for {
line, err := reader.ReadBytes('\n')
if err != nil {
done <- err
return
}
msg, ts := extractTimestampAndMsg(string(bytes.Trim(line, "\x00")))
scanner := bufio.NewScanner(stream)
for scanner.Scan() {
msg, ts := extractTimestampAndMsg(scanner.Text())
dst <- types.Message{
Timestamp: ts,
Text: msg,
Expand All @@ -176,13 +173,19 @@ func podLogs(ctx context.Context, i v1.PodInterface, pod, container,
Namespace: namespace,
}
}
if err := scanner.Err(); err != nil {
done <- err
return
}
}()

select {
case <-ctx.Done():
logrus.Debug("get-log context cancelled")
return ctx.Err()
case err := <-done:
if err != io.EOF {
logrus.Debugf("failed to read from pod log: %v", err)
return err
}
return nil
Expand Down
10 changes: 8 additions & 2 deletions agent/pkg/server/handler_inference_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"net/http"
"time"

"github.com/cockroachdb/errors"
"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -46,8 +47,13 @@ func (s Server) getLogsFromRequester(c *gin.Context, requester log.Requester) er
}
_ = cn

ctx, cancelQuery := context.WithTimeout(c.Request.Context(),
s.config.Inference.LogTimeout)
timeout := s.config.Inference.LogTimeout
if req.Follow {
// use a much larger timeout for streaming log
timeout = time.Hour
}

ctx, cancelQuery := context.WithTimeout(c.Request.Context(), timeout)
defer cancelQuery()

messages, err := requester.Query(ctx, req)
Expand Down
6 changes: 4 additions & 2 deletions mdz/pkg/cmd/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var (
tail int
since string
end string
follow bool
)

// logCmd represents the log command
Expand Down Expand Up @@ -36,15 +37,16 @@ func init() {
logsCmd.Flags().IntVarP(&tail, "tail", "t", 0, "Number of lines to show from the end of the logs")
logsCmd.Flags().StringVarP(&since, "since", "s", "2006-01-02T15:04:05Z", "Show logs since timestamp (e.g. 2013-01-02T13:23:37Z) or relative (e.g. 42m for 42 minutes)")
logsCmd.Flags().StringVarP(&end, "end", "e", "", "Only return logs before this timestamp (e.g. 2013-01-02T13:23:37Z) or relative (e.g. 42m for 42 minutes)")
logsCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Follow log output")
}

func commandLogs(cmd *cobra.Command, args []string) error {
logs, err := agentClient.DeploymentLogGet(cmd.Context(), namespace, args[0], since, tail, end)
logStream, err := agentClient.DeploymentLogGet(cmd.Context(), namespace, args[0], since, tail, end, follow)
if err != nil {
cmd.PrintErrf("Failed to get logs: %s\n", err)
return err
}
for _, log := range logs {
for log := range logStream {
cmd.Printf("%s: %s\n", log.Instance, log.Text)
}
return nil
Expand Down

0 comments on commit c23067f

Please sign in to comment.