Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
defer bulk channel init for mongo node reuse (#211)
Browse files Browse the repository at this point in the history
* defer bulk channel creation until Listen func

* update CHANGELOG
  • Loading branch information
jipperinbham committed Jan 27, 2017
1 parent 6c81b7b commit 1196e9d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v0.1.2 [UNRELEASED]

### Bugfixes
- [#211](https://github.com/compose/transporter/pull/211): defer bulk channel init for mongo node reuse
- [#213](https://github.com/compose/transporter/pull/213): track mongodb \_id field so we can attempt to reissue queries


Expand Down
27 changes: 14 additions & 13 deletions pkg/adaptor/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,16 @@ func init() {
}

m := &MongoDB{
restartable: true, // assume for that we're able to restart the process
oplogTimeout: 5 * time.Second, // timeout the oplog iterator
pipe: p,
uri: conf.URI,
tail: conf.Tail,
debug: conf.Debug,
path: path,
opsBuffer: make(map[string][]message.Msg),
bulkWriteChannel: make(chan syncDoc),
bulkQuitChannel: make(chan chan bool),
bulk: conf.Bulk,
conf: conf,
restartable: true, // assume for that we're able to restart the process
oplogTimeout: 5 * time.Second, // timeout the oplog iterator
pipe: p,
uri: conf.URI,
tail: conf.Tail,
debug: conf.Debug,
path: path,
opsBuffer: make(map[string][]message.Msg),
bulk: conf.Bulk,
conf: conf,
}
// opsBuffer: make([]*SyncDoc, 0, MONGO_BUFFER_LEN),

Expand Down Expand Up @@ -218,6 +216,8 @@ func (m *MongoDB) Listen() (err error) {
}()

if m.bulk {
m.bulkWriteChannel = make(chan syncDoc)
m.bulkQuitChannel = make(chan chan bool)
go m.bulkWriter()
}
return m.pipe.Listen(m.writeMessage, m.collectionMatch)
Expand All @@ -228,7 +228,7 @@ func (m *MongoDB) Stop() error {
m.pipe.Stop()

// if we're bulk writing, ask our writer to exit here
if m.bulk {
if m.bulkWriteChannel != nil {
q := make(chan bool)
m.bulkQuitChannel <- q
<-q
Expand Down Expand Up @@ -391,6 +391,7 @@ func (m *MongoDB) catData() error {
break
}
errSleep = time.Second
iter.Close()
sess.Close()
break
}
Expand Down

0 comments on commit 1196e9d

Please sign in to comment.