diff --git a/commands/commands.go b/commands/commands.go index b20e48ac7..f119e246f 100644 --- a/commands/commands.go +++ b/commands/commands.go @@ -28,7 +28,7 @@ type ServiceExecutor interface { ServicePublishDefinitionFile(path string) (string, error) ServiceListenEvents(id, eventFilter string) (chan *coreapi.EventData, chan error, error) ServiceListenResults(id, taskFilter, outputFilter string, tagFilters []string) (chan *coreapi.ResultData, chan error, error) - ServiceLogs(id string, dependencies ...string) (logs []*provider.Log, closer func(), err error) + ServiceLogs(id string, dependencies ...string) (logs []*provider.Log, closer func(), errC chan error, err error) ServiceExecuteTask(id, taskKey, inputData string, tags []string) error ServiceStart(id string) error ServiceStop(id string) error diff --git a/commands/mocks/Executor.go b/commands/mocks/Executor.go index 1a6f116a0..c20a668e6 100644 --- a/commands/mocks/Executor.go +++ b/commands/mocks/Executor.go @@ -276,7 +276,7 @@ func (_m *Executor) ServiceListenResults(id string, taskFilter string, outputFil } // ServiceLogs provides a mock function with given fields: id, dependencies -func (_m *Executor) ServiceLogs(id string, dependencies ...string) ([]*provider.Log, func(), error) { +func (_m *Executor) ServiceLogs(id string, dependencies ...string) ([]*provider.Log, func(), chan error, error) { _va := make([]interface{}, len(dependencies)) for _i := range dependencies { _va[_i] = dependencies[_i] @@ -304,14 +304,23 @@ func (_m *Executor) ServiceLogs(id string, dependencies ...string) ([]*provider. } } - var r2 error - if rf, ok := ret.Get(2).(func(string, ...string) error); ok { + var r2 chan error + if rf, ok := ret.Get(2).(func(string, ...string) chan error); ok { r2 = rf(id, dependencies...) } else { - r2 = ret.Error(2) + if ret.Get(2) != nil { + r2 = ret.Get(2).(chan error) + } } - return r0, r1, r2 + var r3 error + if rf, ok := ret.Get(3).(func(string, ...string) error); ok { + r3 = rf(id, dependencies...) + } else { + r3 = ret.Error(3) + } + + return r0, r1, r2, r3 } // ServicePublishDefinitionFile provides a mock function with given fields: path diff --git a/commands/provider/service_provider.go b/commands/provider/service_provider.go index dd26d8eb3..2ad50afe3 100644 --- a/commands/provider/service_provider.go +++ b/commands/provider/service_provider.go @@ -159,13 +159,13 @@ type Log struct { } // ServiceLogs returns logs reader for all service dependencies. -func (p *ServiceProvider) ServiceLogs(id string, dependencies ...string) (logs []*Log, close func(), err error) { +func (p *ServiceProvider) ServiceLogs(id string, dependencies ...string) (logs []*Log, close func(), errC chan error, err error) { if len(dependencies) == 0 { resp, err := p.client.GetService(context.Background(), &coreapi.GetServiceRequest{ ServiceID: id, }) if err != nil { - return nil, nil, err + return nil, nil, nil, err } for _, dep := range resp.Service.Dependencies { dependencies = append(dependencies, dep.Key) @@ -180,7 +180,7 @@ func (p *ServiceProvider) ServiceLogs(id string, dependencies ...string) (logs [ }) if err != nil { cancel() - return nil, nil, err + return nil, nil, nil, err } for _, key := range dependencies { @@ -200,22 +200,19 @@ func (p *ServiceProvider) ServiceLogs(id string, dependencies ...string) (logs [ } } - errC := make(chan error, len(logs)) + errC = make(chan error) go func() { <-stream.Context().Done() errC <- stream.Context().Err() }() go p.listenServiceLogs(stream, logs, errC) - go func() { - <-errC - closer() - }() if err := acknowledgement.WaitForStreamToBeReady(stream); err != nil { - return nil, nil, err + closer() + return nil, nil, nil, err } - return logs, closer, nil + return logs, closer, errC, nil } // listenServiceLogs listen gRPC stream to get service logs. diff --git a/commands/service_dev.go b/commands/service_dev.go index dc816f5ba..0effda916 100644 --- a/commands/service_dev.go +++ b/commands/service_dev.go @@ -107,7 +107,7 @@ func (c *serviceDevCmd) runE(cmd *cobra.Command, args []string) error { return err } - closer, err := showLogs(c.e, id) + closer, logsErrC, err := showLogs(c.e, id) if err != nil { return err } @@ -121,6 +121,9 @@ func (c *serviceDevCmd) runE(cmd *cobra.Command, args []string) error { case <-abort: return nil + case err := <-logsErrC: + return err + case e := <-listenEventsC: fmt.Printf("Receive event %s: %s\n", pretty.Success(e.EventKey), diff --git a/commands/service_logs.go b/commands/service_logs.go index 63758c6d2..4b517df10 100644 --- a/commands/service_logs.go +++ b/commands/service_logs.go @@ -41,25 +41,29 @@ mesg-core service logs SERVICE --dependencies DEPENDENCY_NAME,DEPENDENCY_NAME,.. } func (c *serviceLogsCmd) runE(cmd *cobra.Command, args []string) error { - closer, err := showLogs(c.e, args[0], c.dependencies...) + closer, errC, err := showLogs(c.e, args[0], c.dependencies...) if err != nil { return err } defer closer() - <-xsignal.WaitForInterrupt() - return nil + select { + case err := <-errC: + return err + case <-xsignal.WaitForInterrupt(): + return nil + } } -func showLogs(e ServiceExecutor, serviceID string, dependencies ...string) (closer func(), err error) { +func showLogs(e ServiceExecutor, serviceID string, dependencies ...string) (closer func(), errC chan error, err error) { var ( logs []*provider.Log ) pretty.Progress("Loading logs...", func() { - logs, closer, err = e.ServiceLogs(serviceID, dependencies...) + logs, closer, errC, err = e.ServiceLogs(serviceID, dependencies...) }) if err != nil { - return nil, err + return nil, nil, err } // if there was no dependencies copy all returned @@ -77,7 +81,7 @@ func showLogs(e ServiceExecutor, serviceID string, dependencies ...string) (clos go prefixedCopy(os.Stderr, log.Error, prefixes[log.Dependency]) } - return closer, nil + return closer, errC, nil } // dependencyPrefixes returns colored dependency name prefixes. diff --git a/utils/chunker/stream.go b/utils/chunker/stream.go index ed0a6d7d4..01a9e9081 100644 --- a/utils/chunker/stream.go +++ b/utils/chunker/stream.go @@ -20,7 +20,7 @@ type Stream struct { func NewStream() *Stream { return &Stream{ recv: make(chan []byte), - done: make(chan struct{}), + done: make(chan struct{}, 1), } } @@ -60,6 +60,5 @@ func (s *Stream) Close() error { return nil } s.done <- struct{}{} - s.done = nil return nil }