Skip to content

Commit

Permalink
Merge pull request #13 from trailbehind/fix-write-failures
Browse files Browse the repository at this point in the history
Fix write failures
  • Loading branch information
JesseCrocker authored Feb 28, 2018
2 parents f469d99 + a6d3701 commit 4de57a3
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 12 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ Usage: node add-tile-list.js [sources]... [options]
sources source names to queue
Options:
-f FILE, --file FILE Read list from file. By default list is read from STDIN
-v, --version Show version info
-d, --dryRun Dry run. Don't actually add messages to queue.
-v, --version Show version info
Read a list of tiles to queue for rendering.
```
Expand Down
10 changes: 5 additions & 5 deletions bin/add-tile-list.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ var nomnom = require("nomnom")
help: "source names to queue",
list: true
},
file: {
abbr: "f",
metavar: "FILE",
help: "Read list from file. By default list is read from STDIN"
dryRun: {
abbr: "d",
flag: true,
help: "Dry run. Don't actually add messages to queue."
},
version: {
abbr: "v",
Expand All @@ -34,7 +34,7 @@ if (!opts.sources || opts.sources.length == 0) {
process.exit(-1);
}

new QueueWriter(opts.sources, {}, function(err, queueWriter) {
new QueueWriter(opts.sources, { dryRun: opts.dryRun }, function(err, queueWriter) {
var pipe = process.stdin.pipe(require("split")());
pipe.on("data", function(line) {
if (line.length) {
Expand Down
23 changes: 19 additions & 4 deletions lib/queueWriter.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,42 @@ function QueueWriter(sources, opts, callback) {
var tileStream = Writable();
this.tileStream = tileStream;
this.dryRun = opts.dryRun || false;
var retryDelay = opts.retryDelay || 1000;
var qw = this;

debug("Connecting to " + server + "/" + ampqTopic);

context.on("ready", function() {
debug("Connected to context");
var pub = context.socket("PUSH");
pub.connect(ampqTopic, function() {
debug("Connected to socket");
tileStream._write = function(chunk, enc, next) {
var tilestreamWrite = function(chunk, enc, next, retries) {
var success = true;
for (var sourceIndex = 0; sourceIndex < sources.length; sourceIndex++) {
var message = sources[sourceIndex] + "+" + chunk;
pub.write(message, "utf8");
debug("Queueing " + message);
if (!pub.write(message, "utf8")) {
success = false;
if (retries === undefined) {
retries = 0;
}
console.log("Error writing to queue, retrying after delay. retry count:" + retries);
setTimeout(function() {
tilestreamWrite(chunk, enc, next, retries + 1);
}, retryDelay);
break;
}
}
if (success) {
next();
}
next();
};
tileStream._write = tilestreamWrite;
//Finish needs to be setup here, if it is setup before the socket is connected,
// and end is called before the connection happens then writes will never happen
tileStream.on("finish", function() {
debug("tileStream finished");
pub.close();
setTimeout(function() {
debug("Exiting");
process.exit(0);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "tile-squirrel",
"version": "1.4.0",
"version": "1.5.0",
"description": "Tile Squirrel reads from a rabbit queue and cache tiles from a tilelive source to a tilelive sink",
"main": "lib/cacher.js",
"bin": "bin/squirrel.js",
Expand Down

0 comments on commit 4de57a3

Please sign in to comment.