Skip to content

Commit

Permalink
chore: fix master connection issue
Browse files Browse the repository at this point in the history
  • Loading branch information
alihm committed Oct 23, 2023
1 parent 16e4381 commit 5fdb83b
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 12 deletions.
18 changes: 11 additions & 7 deletions ClusterOperator/Operator.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ class Operator {
}
if (this.lastBufferSeqNo > BackLog.sequenceNumber + 1) {
let i = 1;
while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 5) {
while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 10) {
if (missingQueryBuffer.get(BackLog.sequenceNumber + i) !== true) {
log.info(`missing seqNo ${BackLog.sequenceNumber + i}, asking master to resend`, 'magenta');
missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 5000);
missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 10000);
await fluxAPI.askQuery(BackLog.sequenceNumber + 1, this.masterWSConn);
i += 1;
}
Expand All @@ -199,10 +199,10 @@ class Operator {
this.lastBufferSeqNo = sequenceNumber;
if (this.buffer[BackLog.sequenceNumber + 1] === undefined && missingQueryBuffer.get(BackLog.sequenceNumber + 1) !== true) {
let i = 1;
while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 5) {
while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 10) {
if (missingQueryBuffer.get(BackLog.sequenceNumber + i) !== true) {
log.info(`missing seqNo ${BackLog.sequenceNumber + i}, asking master to resend`, 'magenta');
missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 5000);
missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 10000);
await fluxAPI.askQuery(BackLog.sequenceNumber + i, this.masterWSConn);
i += 1;
}
Expand Down Expand Up @@ -321,7 +321,7 @@ class Operator {
* [sendWriteQuery]
* @param {string} query [description]
*/
static async sendWriteQuery(query, connId = false, fullQuery = null) {
static async sendWriteQuery(query, connId = false, fullQuery = null, masterSocket = null) {
if (this.masterNode !== null) {
// log.info(`master node: ${this.masterNode}`);
if (!this.IamMaster) {
Expand Down Expand Up @@ -349,9 +349,13 @@ class Operator {
*/
const result = await BackLog.pushQuery(query, 0, Date.now(), false, connId, fullQuery || query);
// log.info(`sending query to slaves: ${JSON.stringify(result)}`);
if (result && this.serverSocket) {
if (result) {
log.info(`emitting ${result[1]}`);
this.serverSocket.emit('query', query, result[1], result[2], false);
if (this.serverSocket) {
this.serverSocket.emit('query', query, result[1], result[2], false);
} else {
masterSocket.emit('query', query, result[1], result[2], false);
}
} else {
log.info(JSON.stringify(result));
}
Expand Down
1 change: 1 addition & 0 deletions ClusterOperator/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ function startUI() {
await timer.setTimeout(2000);
const importer = new SqlImporter({
callback: Operator.sendWriteQuery,
serverSocket: Operator.serverSocket,
});
importer.onProgress((progress) => {
const percent = Math.floor((progress.bytes_processed / progress.total_bytes) * 10000) / 100;
Expand Down
13 changes: 8 additions & 5 deletions modules/mysql-import/mysql-import.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class Importer{
*/
constructor(settings){
this._connection_settings = settings;
this. callback = (settings.callback) ? settings.callback : null;
this.callback = (settings.callback) ? settings.callback : null;
this.serverSocket = (settings.serverSocket) ? settings.serverSocket : null;
this._conn = null;
this._encoding = 'utf8';
this._imported = [];
Expand Down Expand Up @@ -142,7 +143,7 @@ class Importer{
next();
return;
}
this._importSingleFile(file, this.callback).then(()=>{
this._importSingleFile(file, this.callback, this.serverSocket).then(()=>{
next();
}).catch(err=>{
error = err;
Expand Down Expand Up @@ -196,10 +197,11 @@ class Importer{
* - size: The size of the file in bytes
* @returns {Promise}
*/
_importSingleFile(fileObj, callback){
_importSingleFile(fileObj, callback, serverSocket){
return new Promise((resolve, reject)=>{
var parser = new queryParser({
callback: callback,
callback,
serverSocket,
db_connection: this._conn,
encoding: this._encoding,
onProgress: (progress) => {
Expand Down Expand Up @@ -439,6 +441,7 @@ class queryParser extends stream.Writable{

this.executeCallback = options.callback;

this.serverSocket = options.serverSocket;
}

////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -473,7 +476,7 @@ class queryParser extends stream.Writable{
console.log (query);
console.log (this.executeCallback);
if (this.executeCallback){
return await this.executeCallback(query);
return await this.executeCallback(query, false, false, this.serverSocket);
}
return new Promise((resolve, reject)=>{
this.db_connection.query(query, err=>{
Expand Down

0 comments on commit 5fdb83b

Please sign in to comment.