diff --git a/broker.go b/broker.go index 5523fd4e9..c4596f909 100644 --- a/broker.go +++ b/broker.go @@ -45,6 +45,10 @@ func NewBroker(addr string) *Broker { // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or // AlreadyConnected. If conf is nil, the result of NewConfig() is used. func (b *Broker) Open(conf *Config) error { + if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) { + return ErrAlreadyConnected + } + if conf == nil { conf = NewConfig() } @@ -54,18 +58,8 @@ func (b *Broker) Open(conf *Config) error { return err } - if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) { - return ErrAlreadyConnected - } - b.lock.Lock() - if b.conn != nil { - b.lock.Unlock() - Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, ErrAlreadyConnected) - return ErrAlreadyConnected - } - go withRecover(func() { defer b.lock.Unlock() @@ -80,9 +74,9 @@ func (b *Broker) Open(conf *Config) error { b.conn, b.connErr = dialer.Dial("tcp", b.addr) } if b.connErr != nil { + Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr) b.conn = nil atomic.StoreInt32(&b.opened, 0) - Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr) return } b.conn = newBufConn(b.conn) @@ -129,14 +123,14 @@ func (b *Broker) Close() error { b.done = nil b.responses = nil - atomic.StoreInt32(&b.opened, 0) - if err == nil { Logger.Printf("Closed connection to broker %s\n", b.addr) } else { Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) } + atomic.StoreInt32(&b.opened, 0) + return err }