Skip to content

Commit

Permalink
introduce ServiceLogs API
Browse files Browse the repository at this point in the history
* cmd: support filtering for multiple dependencies of service log command.

a part of #337
fixes a  part of #426
  • Loading branch information
ilgooz committed Sep 9, 2018
1 parent 9531a5c commit c4fb031
Show file tree
Hide file tree
Showing 11 changed files with 971 additions and 102 deletions.
18 changes: 18 additions & 0 deletions api/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package api

import "github.com/mesg-foundation/core/service"

// ServiceLogsFilter is a filter func for filtering service logs.
type ServiceLogsFilter func(*logLogger)

// ServiceLogsDependenciesFilter returns a dependency filter.
func ServiceLogsDependenciesFilter(dependencies ...string) ServiceLogsFilter {
return func(s *logLogger) {
s.dependencies = dependencies
}
}

// ServiceLogs gives logs for all dependencies or one when specified with filters of service serviceID.
func (a *API) ServiceLogs(serviceID string, filters ...ServiceLogsFilter) ([]*service.Logs, error) {
return newLogLogger(a, filters...).logs(serviceID)
}
34 changes: 34 additions & 0 deletions api/log_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package api

import (
"github.com/mesg-foundation/core/database/services"
"github.com/mesg-foundation/core/service"
)

// logLogger provides functionalities to get service logs.
type logLogger struct {
// dependencies used to get only logs from requested dependencies.
dependencies []string

api *API
}

// newLogLogger creates a new logLogger with given api and dependency filters.
func newLogLogger(api *API, filters ...ServiceLogsFilter) *logLogger {
l := &logLogger{
api: api,
}
for _, filter := range filters {
filter(l)
}
return l
}

// logs gives logs of service serviceID and applies dependency filters to filter logs.
func (l *logLogger) logs(serviceID string) ([]*service.Logs, error) {
s, err := services.Get(serviceID)
if err != nil {
return nil, err
}
return s.Logs(l.dependencies...)
}
2 changes: 1 addition & 1 deletion cmd/service/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func devHandler(cmd *cobra.Command, args []string) {
go listenEvents(serviceID, cmd.Flag("event-filter").Value.String())
go listenResults(serviceID, cmd.Flag("task-filter").Value.String(), cmd.Flag("output-filter").Value.String())

closeReaders := showLogs(serviceID, "*")
closeReaders := showLogs(serviceID)
defer closeReaders()

<-xsignal.WaitForInterrupt()
Expand Down
157 changes: 142 additions & 15 deletions cmd/service/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package service

import (
"context"
"fmt"
"io"
"math/rand"
"os"
"time"

"github.com/docker/docker/pkg/stdcopy"
"github.com/fatih/color"
"github.com/mesg-foundation/core/cmd/utils"
"github.com/mesg-foundation/core/database/services"
"github.com/mesg-foundation/core/interface/grpc/core"
"github.com/mesg-foundation/core/x/xsignal"
"github.com/mesg-foundation/prefixer"
"github.com/spf13/cobra"
)

Expand All @@ -23,34 +27,157 @@ mesg-core service logs SERVICE_ID --dependency DEPENDENCY_NAME`,
DisableAutoGenTag: true,
}

var dependencies []string

func init() {
Logs.Flags().StringP("dependency", "d", "*", "Name of the dependency to only show the logs from")
Logs.Flags().StringArrayVarP(&dependencies, "dependency", "d", nil, "Name of the dependency to only show the logs from")
}

func logsHandler(cmd *cobra.Command, args []string) {
closeReaders := showLogs(args[0], cmd.Flag("dependency").Value.String())
closeReaders := showLogs(args[0], dependencies...)
defer closeReaders()
<-xsignal.WaitForInterrupt()
}

func showLogs(serviceID string, dependency string) func() {
reply, err := cli().GetService(context.Background(), &core.GetServiceRequest{
ServiceID: serviceID,
// dependencyLogs keeps dependency info and corresponding std & err log streams.
type dependencyLogs struct {
Dependency string
Standard, Error *logReader
}

func showLogs(serviceID string, dependencies ...string) func() {
stream, err := cli().ServiceLogs(context.Background(), &core.ServiceLogsRequest{
ServiceID: serviceID,
Dependencies: dependencies,
})
utils.HandleError(err)

// TODO(ilgooz) rm this when we stop using internal methods of service in cmd.
s, err := services.Get(reply.Service.ID)
// first received dependency list.
data, err := stream.Recv()
utils.HandleError(err)

readers, err := s.Logs(dependency)
utils.HandleError(err)
for _, reader := range readers {
go stdcopy.StdCopy(os.Stdout, os.Stderr, reader)
var (
logs []*dependencyLogs
readers []io.Reader
closers []io.Closer
)

for _, dep := range data.Depedencies {
var (
rstd = newLogReader(dep, core.LogData_Data_Standard)
rerr = newLogReader(dep, core.LogData_Data_Error)
prefix = color.New(randColor()).Sprintf("%s |", dep)
)

readers = append(readers, newPrefixedReader(rstd, prefix), newPrefixedReader(rerr, prefix))
closers = append(closers, rstd, rerr)

logs = append(logs, &dependencyLogs{
Dependency: dep,
Standard: rstd,
Error: rerr,
})
}

for _, r := range readers {
go io.Copy(os.Stdout, r)
}

for {
data, err := stream.Recv()
if err != nil {
break
}
for _, l := range logs {
l.Standard.process(data)
l.Error.process(data)
}
}

return func() {
for _, reader := range readers {
reader.Close()
for _, c := range closers {
c.Close()
}
}
}

// logReader implements io.Reader to combine log data chunks being received
// from gRPC stream.
type logReader struct {
dependency string
typ core.LogData_Data_Type

recv chan []byte
done chan struct{}

data []byte
i int64
}

// newLogReader returns a new log reader.
func newLogReader(dependency string, typ core.LogData_Data_Type) *logReader {
return &logReader{
dependency: dependency,
typ: typ,
recv: make(chan []byte, 0),
done: make(chan struct{}, 0),
}
}

// process processes log data received from gRPC stream and checks if it belongs
// to this log stream.
func (r *logReader) process(data *core.LogData) {
if r.dependency == data.Data.Dependency &&
r.typ == data.Data.Type {
r.recv <- data.Data.Data
}
}

// Read implements io.Reader.
func (r *logReader) Read(p []byte) (n int, err error) {
if r.i >= int64(len(r.data)) {
for {
select {
case <-r.done:
return 0, io.EOF

case data := <-r.recv:
if err != nil {
return 0, err
}
r.data = data
r.i = 0
return r.Read(p)
}
}
}
n = copy(p, r.data[r.i:])
r.i += int64(n)
return n, nil
}

// Close closes log reader.
func (r *logReader) Close() error {
close(r.done)
return nil
}

// newPrefixedReader wraps io.Reader by adding a prefix for each new line
// in the stream.
func newPrefixedReader(r io.Reader, prefix string) io.Reader {
return prefixer.New(r, fmt.Sprintf("%s ", prefix))
}

// randColor returns a random color.
func randColor() color.Attribute {
attrs := []color.Attribute{
color.FgRed,
color.FgGreen,
color.FgYellow,
color.FgBlue,
color.FgMagenta,
color.FgCyan,
}
rand.Seed(time.Now().UnixNano())
return attrs[rand.Intn(len(attrs))]
}
Loading

0 comments on commit c4fb031

Please sign in to comment.