From 936b3a0452c81f8dec21e1b007dc5bbf8f6a9131 Mon Sep 17 00:00:00 2001 From: Dominic Della Valle Date: Tue, 4 May 2021 11:09:35 -0400 Subject: [PATCH] Fix exec deadlock when emitter is not Typer intf --- executor.go | 54 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/executor.go b/executor.go index f25f5e68..5bcfb7a5 100644 --- a/executor.go +++ b/executor.go @@ -50,32 +50,46 @@ func (x *executor) Execute(req *Request, re ResponseEmitter, env Environment) er return err } } + maybeStartPostRun := func(formatters PostRunMap) <-chan error { + var ( + postRun func(Response, ResponseEmitter) error + postRunCh = make(chan error) + ) - // contains the error returned by PostRun - errCh := make(chan error, 1) - if cmd.PostRun != nil { - if typer, ok := re.(interface { - Type() PostRunType - }); ok && cmd.PostRun[typer.Type()] != nil { - var ( - res Response - lower = re - ) - - re, res = NewChanResponsePair(req) + if postRun == nil { + close(postRunCh) + return postRunCh + } - go func() { - defer close(errCh) - errCh <- lower.CloseWithError(cmd.PostRun[typer.Type()](res, lower)) - }() + // check if we have a formatter for this emitter type + typer, isTyper := re.(interface { + Type() PostRunType + }) + if isTyper && + formatters[typer.Type()] != nil { + postRun = formatters[typer.Type()] + } else { + close(postRunCh) + return postRunCh } - } else { - // not using this channel today - close(errCh) + + // redirect emitter to us + // and start waiting for emissions + var ( + postRes Response + postEmitter = re + ) + re, postRes = NewChanResponsePair(req) + go func() { + defer close(postRunCh) + postRunCh <- postEmitter.CloseWithError(postRun(postRes, postEmitter)) + }() + return postRunCh } + postRunCh := maybeStartPostRun(cmd.PostRun) runCloseErr := re.CloseWithError(cmd.Run(req, re, env)) - postCloseErr := <-errCh + postCloseErr := <-postRunCh switch runCloseErr { case ErrClosingClosedEmitter, nil: default: