From 5fdb83b3d277bdb1adcd7b35d03e7e2bff803e28 Mon Sep 17 00:00:00 2001 From: Ali Mahdavi Date: Mon, 23 Oct 2023 18:48:19 +0300 Subject: [PATCH] chore: fix master connection issue --- ClusterOperator/Operator.js | 18 +++++++++++------- ClusterOperator/server.js | 1 + modules/mysql-import/mysql-import.js | 13 ++++++++----- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/ClusterOperator/Operator.js b/ClusterOperator/Operator.js index 0fb50fa..918d7bc 100644 --- a/ClusterOperator/Operator.js +++ b/ClusterOperator/Operator.js @@ -181,10 +181,10 @@ class Operator { } if (this.lastBufferSeqNo > BackLog.sequenceNumber + 1) { let i = 1; - while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 5) { + while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 10) { if (missingQueryBuffer.get(BackLog.sequenceNumber + i) !== true) { log.info(`missing seqNo ${BackLog.sequenceNumber + i}, asking master to resend`, 'magenta'); - missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 5000); + missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 10000); await fluxAPI.askQuery(BackLog.sequenceNumber + 1, this.masterWSConn); i += 1; } @@ -199,10 +199,10 @@ class Operator { this.lastBufferSeqNo = sequenceNumber; if (this.buffer[BackLog.sequenceNumber + 1] === undefined && missingQueryBuffer.get(BackLog.sequenceNumber + 1) !== true) { let i = 1; - while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 5) { + while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 10) { if (missingQueryBuffer.get(BackLog.sequenceNumber + i) !== true) { log.info(`missing seqNo ${BackLog.sequenceNumber + i}, asking master to resend`, 'magenta'); - missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 5000); + missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 10000); await fluxAPI.askQuery(BackLog.sequenceNumber + i, this.masterWSConn); i += 1; } @@ -321,7 +321,7 @@ class Operator { * [sendWriteQuery] * @param {string} query [description] */ - static async sendWriteQuery(query, connId = false, fullQuery = null) { + static async sendWriteQuery(query, connId = false, fullQuery = null, masterSocket = null) { if (this.masterNode !== null) { // log.info(`master node: ${this.masterNode}`); if (!this.IamMaster) { @@ -349,9 +349,13 @@ class Operator { */ const result = await BackLog.pushQuery(query, 0, Date.now(), false, connId, fullQuery || query); // log.info(`sending query to slaves: ${JSON.stringify(result)}`); - if (result && this.serverSocket) { + if (result) { log.info(`emitting ${result[1]}`); - this.serverSocket.emit('query', query, result[1], result[2], false); + if (this.serverSocket) { + this.serverSocket.emit('query', query, result[1], result[2], false); + } else { + masterSocket.emit('query', query, result[1], result[2], false); + } } else { log.info(JSON.stringify(result)); } diff --git a/ClusterOperator/server.js b/ClusterOperator/server.js index 9ae928f..f4ec77d 100644 --- a/ClusterOperator/server.js +++ b/ClusterOperator/server.js @@ -337,6 +337,7 @@ function startUI() { await timer.setTimeout(2000); const importer = new SqlImporter({ callback: Operator.sendWriteQuery, + serverSocket: Operator.serverSocket, }); importer.onProgress((progress) => { const percent = Math.floor((progress.bytes_processed / progress.total_bytes) * 10000) / 100; diff --git a/modules/mysql-import/mysql-import.js b/modules/mysql-import/mysql-import.js index ae5abb9..6ea662b 100644 --- a/modules/mysql-import/mysql-import.js +++ b/modules/mysql-import/mysql-import.js @@ -30,7 +30,8 @@ class Importer{ */ constructor(settings){ this._connection_settings = settings; - this. callback = (settings.callback) ? settings.callback : null; + this.callback = (settings.callback) ? settings.callback : null; + this.serverSocket = (settings.serverSocket) ? settings.serverSocket : null; this._conn = null; this._encoding = 'utf8'; this._imported = []; @@ -142,7 +143,7 @@ class Importer{ next(); return; } - this._importSingleFile(file, this.callback).then(()=>{ + this._importSingleFile(file, this.callback, this.serverSocket).then(()=>{ next(); }).catch(err=>{ error = err; @@ -196,10 +197,11 @@ class Importer{ * - size: The size of the file in bytes * @returns {Promise} */ - _importSingleFile(fileObj, callback){ + _importSingleFile(fileObj, callback, serverSocket){ return new Promise((resolve, reject)=>{ var parser = new queryParser({ - callback: callback, + callback, + serverSocket, db_connection: this._conn, encoding: this._encoding, onProgress: (progress) => { @@ -439,6 +441,7 @@ class queryParser extends stream.Writable{ this.executeCallback = options.callback; + this.serverSocket = options.serverSocket; } //////////////////////////////////////////////////////////////////////////// @@ -473,7 +476,7 @@ class queryParser extends stream.Writable{ console.log (query); console.log (this.executeCallback); if (this.executeCallback){ - return await this.executeCallback(query); + return await this.executeCallback(query, false, false, this.serverSocket); } return new Promise((resolve, reject)=>{ this.db_connection.query(query, err=>{