Skip to content

Commit

Permalink
Add smart retries to table.mutate (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolodny authored and stephenplusplus committed Dec 13, 2017
1 parent b85679b commit e4770e6
Show file tree
Hide file tree
Showing 7 changed files with 422 additions and 32 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@
"lint": "repo-tools lint --cmd eslint -- src/ samples/ system-test/ test/",
"prettier": "repo-tools exec -- prettier --write src/*.js src/*/*.js samples/*.js samples/*/*.js test/*.js test/*/*.js system-test/*.js system-test/*/*.js",
"publish-module": "node ../../scripts/publish.js bigtable",
"presystem-test": "git apply patches/patch-for-v4.patch || git apply patches/patch-for-v6-and-up.patch || true",
"system-test": "repo-tools test run --cmd mocha -- system-test/*.js --no-timeouts"
},
"dependencies": {
"@google-cloud/common": "^0.15.1",
"@google-cloud/common-grpc": "^0.4.0",
"@google-cloud/common-grpc": "^0.5.1",
"arrify": "^1.0.0",
"concat-stream": "^1.5.0",
"create-error-class": "^3.0.2",
Expand Down
13 changes: 13 additions & 0 deletions patches/patch-for-v4.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
--- a/node_modules/through2/node_modules/readable-stream/node_modules/process-nextick-args/index.js
+++ b/node_modules/through2/node_modules/readable-stream/node_modules/process-nextick-args/index.js
@@ -5,7 +5,9 @@ if (!process.version ||
process.version.indexOf('v1.') === 0 && process.version.indexOf('v1.8.') !== 0) {
module.exports = nextTick;
} else {
- module.exports = process.nextTick;
+ module.exports = function() {
+ return process.nextTick.apply(this, arguments);
+ };
}

function nextTick(fn, arg1, arg2, arg3) {
13 changes: 13 additions & 0 deletions patches/patch-for-v6-and-up.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
--- a/node_modules/process-nextick-args/index.js
+++ b/node_modules/process-nextick-args/index.js
@@ -5,7 +5,9 @@ if (!process.version ||
process.version.indexOf('v1.') === 0 && process.version.indexOf('v1.8.') !== 0) {
module.exports = nextTick;
} else {
- module.exports = process.nextTick;
+ module.exports = function() {
+ return process.nextTick.apply(this, arguments);
+ };
}

function nextTick(fn, arg1, arg2, arg3) {
101 changes: 70 additions & 31 deletions src/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ var Filter = require('./filter.js');
var Mutation = require('./mutation.js');
var Row = require('./row.js');

// See protos/google/rpc/code.proto
// (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE)
const RETRY_STATUS_CODES = new Set([4, 10, 14]);

/**
* Create a Table object to interact with a Cloud Bigtable table.
*
Expand Down Expand Up @@ -925,47 +929,82 @@ Table.prototype.insert = function(entries, callback) {
* });
*/
Table.prototype.mutate = function(entries, callback) {
var self = this;

entries = flatten(arrify(entries));

var grpcOpts = {
service: 'Bigtable',
method: 'mutateRows',
};
var numRequestsMade = 0;

var reqOpts = {
objectMode: true,
tableName: this.id,
entries: entries.map(Mutation.parse),
};
var maxRetries = is.number(this.maxRetries) ? this.maxRetries : 3;
var pendingEntryIndices = new Set(entries.map((entry, index) => index));
var entryToIndex = new Map(entries.map((entry, index) => [entry, index]));
var mutationErrorsByEntryIndex = new Map();

var mutationErrors = [];
function onBatchResponse(err) {
if (pendingEntryIndices.size !== 0 && numRequestsMade <= maxRetries) {
makeNextBatchRequest();
return;
}

this.requestStream(grpcOpts, reqOpts)
.on('error', callback)
.on('data', function(obj) {
obj.entries.forEach(function(entry) {
// Mutation was successful.
if (entry.status.code === 0) {
return;
}
if (mutationErrorsByEntryIndex.size !== 0) {
var mutationErrors = Array.from(mutationErrorsByEntryIndex.values());
err = new common.util.PartialFailureError({
errors: mutationErrors,
});
}

var status = commonGrpc.Service.decorateStatus_(entry.status);
status.entry = entries[entry.index];
callback(err);
}

mutationErrors.push(status);
});
})
.on('end', function() {
var err = null;
function makeNextBatchRequest() {
var grpcOpts = {
service: 'Bigtable',
method: 'mutateRows',
retryOpts: {
currentRetryAttempt: numRequestsMade,
},
};

if (mutationErrors.length > 0) {
err = new common.util.PartialFailureError({
errors: mutationErrors,
var entryBatch = entries.filter((entry, index) => {
return pendingEntryIndices.has(index);
});

var reqOpts = {
objectMode: true,
tableName: self.id,
entries: entryBatch.map(Mutation.parse),
};

self
.requestStream(grpcOpts, reqOpts)
.on('error', onBatchResponse)
.on('request', () => numRequestsMade++)
.on('data', function(obj) {
obj.entries.forEach(function(entry) {
var originalEntry = entryBatch[entry.index];
var originalEntriesIndex = entryToIndex.get(originalEntry);

// Mutation was successful.
if (entry.status.code === 0) {
pendingEntryIndices.delete(originalEntriesIndex);
mutationErrorsByEntryIndex.delete(originalEntriesIndex);
return;
}

if (!RETRY_STATUS_CODES.has(entry.status.code)) {
pendingEntryIndices.delete(originalEntriesIndex);
}

var status = commonGrpc.Service.decorateStatus_(entry.status);
status.entry = originalEntry;

mutationErrorsByEntryIndex.set(originalEntriesIndex, status);
});
}
})
.on('end', onBatchResponse);
}

callback(err);
});
makeNextBatchRequest();
};

/**
Expand Down
117 changes: 117 additions & 0 deletions system-test/data/mutate-rows-retry-test.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
{
"tests": [
{
"name": "valid mutation",
"max_retries": 3,
"mutations_request": [
{ "method": "insert", "key": "foo", "data": {} },
{ "method": "insert", "key": "bar", "data": {} },
{ "method": "insert", "key": "baz", "data": {} }
],
"responses": [
{ "code": 200, "entry_codes": [ 0, 0, 0 ] }
],
"mutation_batches_invoked": [
[ "foo", "bar", "baz" ]
]
}, {
"name": "retries the failed mutations",
"max_retries": 3,
"mutations_request": [
{ "method": "insert", "key": "foo", "data": {} },
{ "method": "insert", "key": "bar", "data": {} },
{ "method": "insert", "key": "baz", "data": {} }
],
"responses": [
{ "code": 200, "entry_codes": [ 0, 4, 4 ] },
{ "code": 200, "entry_codes": [ 4, 0 ] },
{ "code": 200, "entry_codes": [ 4 ] },
{ "code": 200, "entry_codes": [ 0 ] }
],
"mutation_batches_invoked": [
[ "foo", "bar", "baz" ],
[ "bar", "baz" ],
[ "bar" ],
[ "bar" ]
]
}, {
"name": "has a `PartialFailureError` error when an entry fails after the retries",
"max_retries": 3,
"mutations_request": [
{ "method": "insert", "key": "foo", "data": {} },
{ "method": "insert", "key": "bar", "data": {} },
{ "method": "insert", "key": "baz", "data": {} }
],
"responses": [
{ "code": 200, "entry_codes": [ 0, 4, 0 ] },
{ "code": 200, "entry_codes": [ 4 ] },
{ "code": 200, "entry_codes": [ 4 ] },
{ "code": 200, "entry_codes": [ 4 ] }
],
"mutation_batches_invoked": [
[ "foo", "bar", "baz" ],
[ "bar" ],
[ "bar" ],
[ "bar" ]
],
"errors": [
{ "index_in_mutations_request": 1 }
]
}, {
"name": "does not retry unretryable mutations",
"max_retries": 5,
"mutations_request": [
{ "method": "insert", "key": "foo", "data": {} },
{ "method": "insert", "key": "bar", "data": {} },
{ "method": "insert", "key": "baz", "data": {} },
{ "method": "insert", "key": "qux", "data": {} },
{ "method": "insert", "key": "quux", "data": {} },
{ "method": "insert", "key": "corge", "data": {} }
],
"responses": [
{ "code": 200, "entry_codes": [ 4, 4, 4, 4, 4, 1 ] },
{ "code": 200, "entry_codes": [ 10, 14, 10, 14, 0 ] },
{ "code": 200, "entry_codes": [ 1, 4, 4, 0 ] },
{ "code": 200, "entry_codes": [ 0, 4 ] },
{ "code": 200, "entry_codes": [ 4 ] },
{ "code": 200, "entry_codes": [ 1 ] }
],
"mutation_batches_invoked": [
[ "foo", "bar", "baz", "qux", "quux", "corge" ],
[ "foo", "bar", "baz", "qux", "quux" ],
[ "foo", "bar", "baz", "qux" ],
[ "bar", "baz" ],
[ "baz" ],
[ "baz" ]
],
"errors": [
{ "index_in_mutations_request": 0 },
{ "index_in_mutations_request": 2 },
{ "index_in_mutations_request": 5 }
]
}, {
"name": "considers network errors towards the retry count",
"max_retries": 3,
"mutations_request": [
{ "method": "insert", "key": "foo", "data": {} },
{ "method": "insert", "key": "bar", "data": {} },
{ "method": "insert", "key": "baz", "data": {} }
],
"responses": [
{ "code": 200, "entry_codes": [ 0, 4, 0 ] },
{ "code": 429 },
{ "code": 200, "entry_codes": [ 4 ] },
{ "code": 200, "entry_codes": [ 4 ] }
],
"mutation_batches_invoked": [
[ "foo", "bar", "baz" ],
[ "bar" ],
[ "bar" ],
[ "bar" ]
],
"errors": [
{ "index_in_mutations_request": 1 }
]
}
]
}
Loading

0 comments on commit e4770e6

Please sign in to comment.