diff --git a/examples/connect/index.js b/examples/connect/index.js new file mode 100644 index 0000000..0d119cd --- /dev/null +++ b/examples/connect/index.js @@ -0,0 +1,78 @@ +/* eslint-disable no-console */ +"use strict"; + +/** + * It's an example to test the connect/disconnect logic of adapters. + */ + +const { ServiceBroker } = require("moleculer"); +const { inspect } = require("util"); +const DbService = require("../../index").Service; + +// Create broker +const broker = new ServiceBroker({ + logger: { + type: "Console", + options: { + level: { + POSTS: "debug", + "*": "info" + }, + objectPrinter: obj => + inspect(obj, { + breakLength: 50, + colors: true, + depth: 3 + }) + } + } +}); + +// Create a service +broker.createService({ + name: "posts", + mixins: [ + DbService({ + adapter: { + type: "MongoDB" + } + }) + ], + + settings: { + fields: { + id: { type: "string", primaryKey: true, columnName: "_id" }, + title: { + type: "string", + max: 255, + trim: true, + required: true + }, + content: { type: "string" }, + votes: { type: "number", integer: true, min: 0, default: 0, columnType: "int" }, + status: { type: "boolean", default: true } + } + }, + + async started() { + this.logger.info("Creating multiple adapters..."); + const adapters = await Promise.all([ + this.getAdapter(), + this.getAdapter(), + this.getAdapter() + ]); + this.logger.info( + "Adapters created.", + adapters.map(a => a.constructor.name) + ); + } +}); + +// Start server +broker + .start() + .then(() => broker.repl()) + .catch(err => { + broker.logger.error(err); + process.exit(1); + }); diff --git a/src/adapters/mongodb.js b/src/adapters/mongodb.js index 94ae2c8..211fc84 100644 --- a/src/adapters/mongodb.js +++ b/src/adapters/mongodb.js @@ -136,8 +136,7 @@ class MongoDBAdapter extends BaseAdapter { * @returns {ObjectId} */ stringToObjectID(id) { - if (typeof id == "string" && ObjectId.isValid(id)) - return ObjectId.createFromHexString(id); + if (typeof id == "string" && ObjectId.isValid(id)) return ObjectId.createFromHexString(id); return id; } @@ -335,7 +334,7 @@ class MongoDBAdapter extends BaseAdapter { * */ async removeById(id) { - const res = await this.collection.findOneAndDelete({ _id: this.stringToObjectID(id) }); + await this.collection.findOneAndDelete({ _id: this.stringToObjectID(id) }); return id; } diff --git a/src/methods.js b/src/methods.js index bbc2630..f6f60d1 100644 --- a/src/methods.js +++ b/src/methods.js @@ -29,19 +29,30 @@ module.exports = function (mixinOpts) { const item = this.adapters.get(hash); if (item) { item.touched = Date.now(); + if (!item.connectPromise) return item.adapter; + // This adapter is connecting now. Wait for it. + this.logger.debug(`Adapter '${hash}' is connecting, right now. Wait for it...`); + await item.connectPromise; return item.adapter; } this.logger.debug(`Adapter not found for '${hash}'. Create a new adapter instance...`); const adapter = Adapters.resolve(adapterOpts); adapter.init(this); - this.adapters.set(hash, { hash, adapter, touched: Date.now() }); + // We store the promise of connect, because we don't want to call the connect method twice. + const connectPromise = this._connect(adapter, hash, adapterOpts); + const storedAdapterItem = { hash, adapter, connectPromise, touched: Date.now() }; + // Store the adapter + this.adapters.set(hash, storedAdapterItem); await this.maintenanceAdapters(); - await this._connect(adapter, hash, adapterOpts); + // Wait for real connect + await connectPromise; this.logger.info( `Adapter '${hash}' connected. Number of adapters:`, this.adapters.size ); + // Clean the connect promise + delete storedAdapterItem.connectPromise; return adapter; }, @@ -126,7 +137,7 @@ module.exports = function (mixinOpts) { // Close the connection if (_.isFunction(adapter.disconnect)) await adapter.disconnect(); this.logger.info( - `Adapter '${item ? item.hash : "unknown"}' diconnected. Number of adapters:`, + `Adapter '${hash || "unknown"}' disconnected. Number of adapters:`, this.adapters.size );