diff --git a/rabbus.go b/rabbus.go index a3fe8a7..8aeb0ab 100644 --- a/rabbus.go +++ b/rabbus.go @@ -157,7 +157,7 @@ func New(dsn string, options ...Option) (*Rabbus, error) { emit: make(chan Message), emitErr: make(chan error), emitOk: make(chan struct{}), - reconn: make(chan struct{}, 10), + reconn: make(chan struct{}), exDeclared: make(map[string]struct{}), } @@ -192,16 +192,28 @@ func New(dsn string, options ...Option) (*Rabbus, error) { // Run starts rabbus channels for emitting and listening for amqp connection close // returns ctx error in case of any. func (r *Rabbus) Run(ctx context.Context) error { + notifyClose := r.NotifyClose(make(chan *amqp.Error)) + for { select { case m, ok := <-r.emit: - if ok { - r.produce(m) + if !ok { + return errors.New("unexpected close of emit channel") } - case err, ok := <-r.NotifyClose(make(chan *amqp.Error)): - if ok { - r.handleAmqpClose(err) + + r.produce(m) + + case err := <-notifyClose: + if err == nil { + // "… on a graceful close, no error will be sent." + return nil } + + r.handleAmqpClose(err) + + // We have reconnected, so we need a new NotifyClose again. + notifyClose = r.NotifyClose(make(chan *amqp.Error)) + case <-ctx.Done(): return ctx.Err() } @@ -437,7 +449,6 @@ func (r *Rabbus) listenReconn(c ListenConfig, messages chan ConsumerMessage) { for range r.reconn { msgs, err := r.CreateConsumer(c.Exchange, c.Key, c.Kind, c.Queue, r.config.durable) if err != nil { - r.Close() continue }