From 1196e9d352ff4f2ff09d5c02e73f844ee64be833 Mon Sep 17 00:00:00 2001 From: JP Phillips Date: Tue, 13 Dec 2016 10:10:53 -0600 Subject: [PATCH] defer bulk channel init for mongo node reuse (#211) * defer bulk channel creation until Listen func * update CHANGELOG --- CHANGELOG.md | 1 + pkg/adaptor/mongodb/mongodb.go | 27 ++++++++++++++------------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e2db5b275..9ed91d436 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## v0.1.2 [UNRELEASED] ### Bugfixes +- [#211](https://github.com/compose/transporter/pull/211): defer bulk channel init for mongo node reuse - [#213](https://github.com/compose/transporter/pull/213): track mongodb \_id field so we can attempt to reissue queries diff --git a/pkg/adaptor/mongodb/mongodb.go b/pkg/adaptor/mongodb/mongodb.go index 957b9eede..c1853204d 100644 --- a/pkg/adaptor/mongodb/mongodb.go +++ b/pkg/adaptor/mongodb/mongodb.go @@ -104,18 +104,16 @@ func init() { } m := &MongoDB{ - restartable: true, // assume for that we're able to restart the process - oplogTimeout: 5 * time.Second, // timeout the oplog iterator - pipe: p, - uri: conf.URI, - tail: conf.Tail, - debug: conf.Debug, - path: path, - opsBuffer: make(map[string][]message.Msg), - bulkWriteChannel: make(chan syncDoc), - bulkQuitChannel: make(chan chan bool), - bulk: conf.Bulk, - conf: conf, + restartable: true, // assume for that we're able to restart the process + oplogTimeout: 5 * time.Second, // timeout the oplog iterator + pipe: p, + uri: conf.URI, + tail: conf.Tail, + debug: conf.Debug, + path: path, + opsBuffer: make(map[string][]message.Msg), + bulk: conf.Bulk, + conf: conf, } // opsBuffer: make([]*SyncDoc, 0, MONGO_BUFFER_LEN), @@ -218,6 +216,8 @@ func (m *MongoDB) Listen() (err error) { }() if m.bulk { + m.bulkWriteChannel = make(chan syncDoc) + m.bulkQuitChannel = make(chan chan bool) go m.bulkWriter() } return m.pipe.Listen(m.writeMessage, m.collectionMatch) @@ -228,7 +228,7 @@ func (m *MongoDB) Stop() error { m.pipe.Stop() // if we're bulk writing, ask our writer to exit here - if m.bulk { + if m.bulkWriteChannel != nil { q := make(chan bool) m.bulkQuitChannel <- q <-q @@ -391,6 +391,7 @@ func (m *MongoDB) catData() error { break } errSleep = time.Second + iter.Close() sess.Close() break }