Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

defer bulk channel init for mongo node reuse #211

Merged
merged 3 commits into from
Dec 13, 2016
Merged

Conversation

jipperinbham
Copy link
Contributor

@jipperinbham jipperinbham commented Dec 8, 2016

  • CHANGELOG.md updated (feel free to wait until changes have been reviewed by a maintainer)

When a mongodb adaptor is defined in the config.yaml with bulk=true and is used for the Source and Sink in the application.js, transporter will never shutdown properly.

The following change adds a new field to the adaptor and sets it when the Listen func is called and checked with Stop is called.

fixes #152

@trinchan
Copy link
Contributor

trinchan commented Dec 9, 2016

I'm having a hard time figuring out how the bulkWriter close blocks, causing the app to hang. From what I understand, the only exit from that bulkWriter is through the quit channel and we're skipping over that here because it presumably blocks. Is the final data flush hanging? If so, I think it's more natural to leave the quit/cleanup in and attacking the bug from the bulk writer end.

If you could a quick execution path summary of the blocking state, that'd be awesome.

I don't doubt this works, I just want to make sure I understand the code path properly.

@jipperinbham
Copy link
Contributor Author

jipperinbham commented Dec 12, 2016

transporter_config.yaml:

nodes:
  my_mongo:
    type: mongo
    uri: mongodb://127.0.0.1:27017/my_database
    bulk: true

transporter_app.js:

Source({name:"my_mongo", namespace: 'my_database.SomeCollection'})
  .transform({filename: 'changenamespace_and_log.js', namespace: 'my_database.SomeCollection'})
  .save({name:"my_mongo", namespace: 'my_database.OtherCollection'});\

changenamespace_and_log.js:

module.exports = function(doc) {
  console.log("transformer: " + JSON.stringify(doc));
  doc.ns = "my_database.OtherCollection";
  return doc
}

load up the my_database.SomeCollection using the following:

use my_database
for (var i=0; i<1000; i++){db.SomeCollection.insert({i: i});}

then run transporter:

./transporter run --config transporter_config.yaml transporter_app.js

you'll see the documents get transferred properly to the new collection but transporter never completes. if you add some print statements in the Listen and Stop functions of the mongodb adaptor, you'll see the sink node stop completely but the source node will just hang because the bulk flag is true.

@jipperinbham jipperinbham changed the title use a listening flag for mongo node reuse defer bulk channel init for mongo node reuse Dec 13, 2016
Copy link
Contributor

@trinchan trinchan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for this @jipperinbham !

@jipperinbham jipperinbham merged commit 1905640 into experimental Dec 13, 2016
@jipperinbham jipperinbham deleted the 152-fix branch December 13, 2016 16:10
jipperinbham added a commit that referenced this pull request Jan 27, 2017
* defer bulk channel creation until Listen func

* update CHANGELOG
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants