Skip to content

Commit

Permalink
datastore: commit transactions after queueing modifcations
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Dec 8, 2014
1 parent dc4bf20 commit 203fa56
Show file tree
Hide file tree
Showing 9 changed files with 697 additions and 236 deletions.
10 changes: 7 additions & 3 deletions docs/components/docs/docs.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,24 +181,28 @@ angular

function getMixIns($sce, $q, $http, version, baseUrl) {
return function(data) {
var classMethodNames = data.map(function(method) {
return method.name;
});
var methodWithMixIns = data.filter(function(method) {
return method.mixes.length > 0;
})[0];
if (!methodWithMixIns) {
return data;
}
return $q
.all(getMixInMethods(methodWithMixIns))
.all(getMixInMethods(classMethodNames, methodWithMixIns))
.then(combineMixInMethods(data));
};
function getMixInMethods(method) {
function getMixInMethods(classMethodNames, method) {
return method.mixes.map(function (module) {
module = module.string.trim().replace('module:', '');
return $http.get(baseUrl + '/' + module + '.json')
.then(filterDocJson($sce, version))
.then(function(mixInData) {
return mixInData.filter(function(method) {
return !method.constructor;
return !method.constructor &&
classMethodNames.indexOf(method.name) === -1;
});
});
});
Expand Down
8 changes: 6 additions & 2 deletions lib/datastore/dataset.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ function Dataset(options) {
scopes: SCOPES
});

this.projectId = options.projectId;
this.namespace = options.namespace;
this.projectId = options.projectId;
}

nodeutil.inherits(Dataset, DatastoreRequest);
Expand Down Expand Up @@ -142,6 +142,7 @@ Dataset.prototype.key = function(options) {
namespace: this.namespace,
path: util.arrayize(options)
};

return new entity.Key(options);
};

Expand All @@ -163,6 +164,7 @@ Dataset.prototype.createQuery = function(namespace, kinds) {
kinds = util.arrayize(namespace);
namespace = this.namespace;
}

return new Query(namespace, util.arrayize(kinds));
};

Expand Down Expand Up @@ -191,12 +193,14 @@ Dataset.prototype.createQuery = function(namespace, kinds) {
*/
Dataset.prototype.runInTransaction = function(fn, callback) {
var newTransaction = this.createTransaction_();

newTransaction.begin(function(err) {
if (err) {
callback(err);
return;
}
fn(newTransaction, newTransaction.finalize.bind(newTransaction, callback));

fn(newTransaction, newTransaction.commit.bind(newTransaction, callback));
});
};

Expand Down
3 changes: 2 additions & 1 deletion lib/datastore/entity.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ function Key(options) {
},
path: {
enumerable: true,
value: options.path
value: options.path,
writable: true
}
});
}
Expand Down
136 changes: 86 additions & 50 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,41 +105,52 @@ function DatastoreRequest() {}
* ], function(err, entities) {});
*/
DatastoreRequest.prototype.get = function(keys, callback) {
var that = this;

var isMultipleRequest = Array.isArray(keys);
keys = isMultipleRequest ? keys : [keys];

callback = callback || util.noop;

var req = {
key: keys.map(entity.keyToKeyProto)
};

this.makeReq_('lookup', req, function(err, resp) {
if (err) {
callback(err);
return;
}

var found = entity.formatArray(resp.found);

if (isMultipleRequest && resp.deferred && resp.deferred.length) {
// There may be more results. Call `.get` again, and append the results.
this.get(
that.get(
resp.deferred.map(entity.keyFromKeyProto), function(err, entities) {
if (err) {
callback(err);
return;
}

if (resp) {
found = (found || []).concat(entities);
}

callback(null, found);
});

return;
}

callback(null, isMultipleRequest ? found : found[0]);
}.bind(this));
});
};

/**
* Insert or update the specified object(s) in the current transaction. If a key
* is incomplete, its associated object is inserted and its generated identifier
* is returned to the callback.
* Insert or update the specified object(s). If a key is incomplete, its
* associated object is inserted and the original Key object is updated to
* contain the generated ID.
*
* This method will determine the correct Datastore method to execute (`upsert`,
* `insert`, `update`, and `insertAutoId`) by using the key(s) provided. For
Expand All @@ -163,24 +174,26 @@ DatastoreRequest.prototype.get = function(keys, callback) {
*
* @example
* //-
* // Where you see `transaction`, assume this is the context that's relevant to
* // your use, whether that be a Dataset or a Transaction object.
* // Save a single entity.
* //
* // Notice that we are providing an incomplete key. After saving, the original
* // Key object used to save will be updated to contain the path with its
* // generated ID.
* //-
* var key = dataset.key('Company');
*
* // Save a single entity.
* transaction.save({
* key: dataset.key('Company'),
* dataset.save({
* key: key,
* data: {
* rating: '10'
* }
* }, function(err, key) {
* // Because we gave an incomplete key as an argument, `key` will be
* // populated with the complete, generated key.
* });
* }, function(err) {});
*
* //-
* // To specify an `excludeFromIndexes` value for a Datastore entity, pass in
* // an array for the key's data. The above example would then look like:
* transaction.save({
* //-
* dataset.save({
* key: dataset.key('Company'),
* data: [
* {
Expand All @@ -189,109 +202,128 @@ DatastoreRequest.prototype.get = function(keys, callback) {
* excludeFromIndexes: false
* }
* ]
* }, function(err, key) {});
* }, function(err) {});
*
* //-
* // Save multiple entities at once.
* transaction.save([
* //-
* var companyKey = dataset.key(['Company', 123]);
* var productKey = dataset.key(['Product', 'Computer']);
*
* dataset.save([
* {
* key: dataset.key(['Company', 123]),
* key: companyKey,
* data: {
* HQ: 'Dallas, TX'
* }
* },
* {
* key: dataset.key(['Product', 'Computer']),
* key: productKey,
* data: {
* vendor: 'Dell'
* }
* }
* ], function(err, keys) {});
* ], function(err) {});
*/
DatastoreRequest.prototype.save = function(entities, callback) {
var isMultipleRequest = Array.isArray(entities);
entities = isMultipleRequest ? entities : [entities];

var insertIndexes = [];
var keys = entities.map(function(entityObject) {
return entityObject.key;
});

var req = {
mutation: entities.reduce(function(acc, entityObject, index) {
var ent = {};

if (Array.isArray(entityObject.data)) {
ent.property = entityObject.data.map(function(data) {
data.value = entity.valueToProperty(data.value);

if (util.is(data.excludeFromIndexes, 'boolean')) {
data.value.indexed = !data.excludeFromIndexes;
delete data.excludeFromIndexes;
}

return data;
});
} else {
ent = entity.entityToEntityProto(entityObject.data);
}

ent.key = entity.keyToKeyProto(entityObject.key);

if (entity.isKeyComplete(entityObject.key)) {
acc.upsert.push(ent);
} else {
insertIndexes.push(index);
acc.insert_auto_id.push(ent);
}

return acc;
}.bind(this), { upsert: [], insert_auto_id: [] })
}, {
upsert: [],
insert_auto_id: []
})
};
this.makeReq_('commit', req, function(err, resp) {

if (this.id) {
this.requests_.push(req);
this.requestCallbacks_.push(onCommit);
return;
} else {
this.makeReq_('commit', req, onCommit);
}

function onCommit(err, resp) {
if (err || !resp) {
callback(err);
return;
}
if (this.id) {
this.isFinalized = true;
}

var autoInserted = (resp.mutation_result.insert_auto_id_key || []);
autoInserted.forEach(function(key, index) {
keys[insertIndexes[index]] = entity.keyFromKeyProto(key);
var path = entity.keyFromKeyProto(key).path;
entities[insertIndexes[index]].key.path = path;
});
callback(null, isMultipleRequest ? keys : keys[0]);
}.bind(this));

callback(null);
}
};

/**
* Delete all entities identified with the specified key(s) in the current
* transaction.
* Delete all entities identified with the specified key(s).
*
* @param {Key|Key[]} key - Datastore key object(s).
* @param {function} callback - The callback function.
*
* @example
* //-
* // Where you see `transaction`, assume this is the context that's relevant to
* // your use, whether that be a Dataset or a Transaction object.
* //-
*
* // Delete a single entity.
* transaction.delete(dataset.key(['Company', 123]), function(err) {});
* dataset.delete(dataset.key(['Company', 123]), function(err) {});
*
* // Delete multiple entities at once.
* transaction.delete([
* dataset.delete([
* dataset.key(['Company', 123]),
* dataset.key(['Product', 'Computer'])
* ], function(err) {});
*/
DatastoreRequest.prototype.delete = function(keys, callback) {
var isMultipleRequest = Array.isArray(keys);
keys = isMultipleRequest ? keys : [keys];

callback = callback || util.noop;

var req = {
mutation: {
delete: keys.map(entity.keyToKeyProto)
}
};
this.makeReq_('commit', req, function(err) {
if (!err && this.id) {
this.isFinalized = true;
}
callback.apply(null, util.toArray(arguments));
}.bind(this));

if (this.id) {
this.requests_.push(req);
return;
}

this.makeReq_('commit', req, callback);
};

/**
Expand Down Expand Up @@ -434,22 +466,24 @@ DatastoreRequest.prototype.allocateIds = function(incompleteKey, n, callback) {
if (entity.isKeyComplete(incompleteKey)) {
throw new Error('An incomplete key should be provided.');
}

var incompleteKeys = [];
for (var i = 0; i < n; i++) {
incompleteKeys.push(entity.keyToKeyProto(incompleteKey));
}

var req = {
key: incompleteKeys
};

this.makeReq_('allocateIds', req, function(err, resp) {
if (err) {
callback(err);
return;
}
var keys = [];
(resp.key || []).forEach(function(k) {
keys.push(entity.keyFromKeyProto(k));
});

var keys = (resp.key || []).map(entity.keyFromKeyProto);

callback(null, keys);
});
};
Expand Down Expand Up @@ -478,6 +512,7 @@ DatastoreRequest.prototype.makeReq_ = function(method, body, callback) {
callback = body;
body = {};
}

callback = callback || util.noop;

// Set properties to indicate if we're in a transaction or not.
Expand Down Expand Up @@ -514,6 +549,7 @@ DatastoreRequest.prototype.makeReq_ = function(method, body, callback) {
callback(err);
return;
}

var remoteStream = https.request(authorizedReqOpts, function(resp) {
var buffer = new Buffer('');
resp.on('data', function(chunk) {
Expand Down
Loading

0 comments on commit 203fa56

Please sign in to comment.