Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve command logs errors #772

Merged
merged 5 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion commands/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type ServiceExecutor interface {
ServicePublishDefinitionFile(path string) (string, error)
ServiceListenEvents(id, eventFilter string) (chan *coreapi.EventData, chan error, error)
ServiceListenResults(id, taskFilter, outputFilter string, tagFilters []string) (chan *coreapi.ResultData, chan error, error)
ServiceLogs(id string, dependencies ...string) (logs []*provider.Log, closer func(), err error)
ServiceLogs(id string, dependencies ...string) (logs []*provider.Log, closer func(), errC chan error, err error)
ServiceExecuteTask(id, taskKey, inputData string, tags []string) error
ServiceStart(id string) error
ServiceStop(id string) error
Expand Down
19 changes: 14 additions & 5 deletions commands/mocks/Executor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 7 additions & 10 deletions commands/provider/service_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ type Log struct {
}

// ServiceLogs returns logs reader for all service dependencies.
func (p *ServiceProvider) ServiceLogs(id string, dependencies ...string) (logs []*Log, close func(), err error) {
func (p *ServiceProvider) ServiceLogs(id string, dependencies ...string) (logs []*Log, close func(), errC chan error, err error) {
if len(dependencies) == 0 {
resp, err := p.client.GetService(context.Background(), &coreapi.GetServiceRequest{
ServiceID: id,
})
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
for _, dep := range resp.Service.Dependencies {
dependencies = append(dependencies, dep.Key)
Expand All @@ -180,7 +180,7 @@ func (p *ServiceProvider) ServiceLogs(id string, dependencies ...string) (logs [
})
if err != nil {
cancel()
return nil, nil, err
return nil, nil, nil, err
}

for _, key := range dependencies {
Expand All @@ -200,22 +200,19 @@ func (p *ServiceProvider) ServiceLogs(id string, dependencies ...string) (logs [
}
}

errC := make(chan error, len(logs))
errC = make(chan error)
go func() {
<-stream.Context().Done()
errC <- stream.Context().Err()
}()
go p.listenServiceLogs(stream, logs, errC)
go func() {
<-errC
closer()
}()

if err := acknowledgement.WaitForStreamToBeReady(stream); err != nil {
return nil, nil, err
closer()
return nil, nil, nil, err
}

return logs, closer, nil
return logs, closer, errC, nil
}

// listenServiceLogs listen gRPC stream to get service logs.
Expand Down
5 changes: 4 additions & 1 deletion commands/service_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *serviceDevCmd) runE(cmd *cobra.Command, args []string) error {
return err
}

closer, err := showLogs(c.e, id)
closer, logsErrC, err := showLogs(c.e, id)
if err != nil {
return err
}
Expand All @@ -121,6 +121,9 @@ func (c *serviceDevCmd) runE(cmd *cobra.Command, args []string) error {
case <-abort:
return nil

case err := <-logsErrC:
return err

case e := <-listenEventsC:
fmt.Printf("Receive event %s: %s\n",
pretty.Success(e.EventKey),
Expand Down
18 changes: 11 additions & 7 deletions commands/service_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,29 @@ mesg-core service logs SERVICE --dependencies DEPENDENCY_NAME,DEPENDENCY_NAME,..
}

func (c *serviceLogsCmd) runE(cmd *cobra.Command, args []string) error {
closer, err := showLogs(c.e, args[0], c.dependencies...)
closer, errC, err := showLogs(c.e, args[0], c.dependencies...)
if err != nil {
return err
}
defer closer()

<-xsignal.WaitForInterrupt()
return nil
select {
case err := <-errC:
return err
case <-xsignal.WaitForInterrupt():
return nil
}
}

func showLogs(e ServiceExecutor, serviceID string, dependencies ...string) (closer func(), err error) {
func showLogs(e ServiceExecutor, serviceID string, dependencies ...string) (closer func(), errC chan error, err error) {
var (
logs []*provider.Log
)
pretty.Progress("Loading logs...", func() {
logs, closer, err = e.ServiceLogs(serviceID, dependencies...)
logs, closer, errC, err = e.ServiceLogs(serviceID, dependencies...)
})
if err != nil {
return nil, err
return nil, nil, err
}

// if there was no dependencies copy all returned
Expand All @@ -77,7 +81,7 @@ func showLogs(e ServiceExecutor, serviceID string, dependencies ...string) (clos
go prefixedCopy(os.Stderr, log.Error, prefixes[log.Dependency])
}

return closer, nil
return closer, errC, nil
}

// dependencyPrefixes returns colored dependency name prefixes.
Expand Down
3 changes: 1 addition & 2 deletions utils/chunker/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Stream struct {
func NewStream() *Stream {
return &Stream{
recv: make(chan []byte),
done: make(chan struct{}),
done: make(chan struct{}, 1),
}
}

Expand Down Expand Up @@ -60,6 +60,5 @@ func (s *Stream) Close() error {
return nil
}
s.done <- struct{}{}
s.done = nil
return nil
}