diff --git a/go/pkg/electron/electron_test.go b/go/pkg/electron/electron_test.go index d76e10ad32..ddbce35ec1 100644 --- a/go/pkg/electron/electron_test.go +++ b/go/pkg/electron/electron_test.go @@ -331,3 +331,18 @@ func TestHeartbeat(t *testing.T) { t.Error("expected server side time-out or connection abort error") } } + +func TestReceiverCloseBeforeAcknowledge(t *testing.T) { + p := newPipe(t, nil, nil) + defer func() { p.close() }() + r, s := p.receiver(Source("foo")) + go func() { + out := s.SendSync(amqp.NewMessageWith(0)) + _ = test.ErrorIf(t, test.Differ(Closed, out.Error)) + }() + rm, err := r.Receive() + test.FatalIf(t, err) + r.Close(nil) + <-r.Done() + _ = test.ErrorIf(t, test.Differ(Closed, rm.Accept())) +} diff --git a/go/pkg/electron/receiver.go b/go/pkg/electron/receiver.go index d412ac1eca..af49c5351d 100644 --- a/go/pkg/electron/receiver.go +++ b/go/pkg/electron/receiver.go @@ -28,7 +28,6 @@ import ( ) // Receiver is a Link that receives messages. -// type Receiver interface { Endpoint LinkSettings @@ -201,10 +200,19 @@ type ReceivedMessage struct { // Acknowledge a ReceivedMessage with the given delivery status. func (rm *ReceivedMessage) acknowledge(status uint64) error { - return rm.receiver.(*receiver).engine().Inject(func() { - // Deliveries are valid as long as the connection is, unless settled. - rm.pDelivery.SettleAs(uint64(status)) + receiverError := make(chan error) + err := rm.receiver.(*receiver).engine().Inject(func() { + // Deliveries are valid as long as the receiver is, unless settled. + err := rm.receiver.Error() + receiverError <- err + if err == nil { + rm.pDelivery.SettleAs(status) + } }) + if err != nil { + return err + } + return <-receiverError } // Accept tells the sender that we take responsibility for processing the message.