Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor #63

Merged
merged 9 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ debug.txt
info.txt
errors.txt
warnings.txt
query.txt
dev/
36 changes: 1 addition & 35 deletions ClusterOperator/Backlog.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const log = require('../lib/log');
const Security = require('./Security');
const ConnectionPool = require('../lib/ConnectionPool');
const utill = require('../lib/utill');
const mysqldump = require('../modules/mysqldump');
const mysqldump = require('../lib/mysqldump');

class BackLog {
static buffer = [];
Expand Down Expand Up @@ -140,40 +140,6 @@ class BackLog {
}
return [result, seqForThis, timestamp];
}
/*
if (seq === 0 || this.sequenceNumber + 1 === seq) {

while (this.writeLock) await timer.setTimeout(10);
this.writeLock = true;
if (seq === 0) { this.sequenceNumber += 1; } else { this.sequenceNumber = seq; }
const seqForThis = this.sequenceNumber;
let result2 = null;
if (connId === false) {
result2 = await this.UserDBClient.query(query);
} else {
result2 = await ConnectionPool.getConnectionById(connId).query(query);
}
await this.BLClient.execute(
`INSERT INTO ${config.dbBacklogCollection} (seq, query, timestamp) VALUES (?,?,?)`,
[seqForThis, query, timestamp],
);
this.writeLock = false;
return [result2, seqForThis, timestamp];
} else if (this.bufferStartSequenceNumber === this.sequenceNumber + 1) {
await this.moveBufferToBacklog();
return await this.pushQuery(query, seq, timestamp, buffer, connId);
} else {
if (this.sequenceNumber + 1 < seq) {
log.error(`Wrong query order, ${this.sequenceNumber + 1} < ${seq}. pushing to buffer.`);
if (this.bufferStartSequenceNumber === 0) this.bufferStartSequenceNumber = seq;
this.bufferSequenceNumber = seq;
await this.BLClient.execute(
`INSERT INTO ${config.dbBacklogBuffer} (seq, query, timestamp) VALUES (?,?,?)`,
[seq, query, timestamp],
);
}
return [];
} */
}
} catch (e) {
this.writeLock = false;
Expand Down
111 changes: 0 additions & 111 deletions ClusterOperator/Operator.js
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,6 @@ class Operator {
*/
static handleAuthorize(param) {
try {
// log.debug(`DB auth from ${param.remoteIP}`);
// log.debug(JSON.stringify(param));
if (this.status !== 'OK' || this.operator.ghosted) {
// log.info(`status: ${this.status},${this.operator.status}, rejecting connection`);
return false;
Expand Down Expand Up @@ -332,19 +330,6 @@ class Operator {
});
}
}
/*
if (BackLog.writeLock) {
const myTicket = this.operator.getTicket();
log.info(`put into queue: ${myTicket}, in queue: ${this.operator.masterQueue.length}`, 'cyan');
this.operator.masterQueue.push(myTicket);
while (BackLog.writeLock || this.operator.masterQueue[0] !== myTicket) {
await timer.setTimeout(5);
}
BackLog.writeLock = true;
this.operator.masterQueue.shift();
log.info(`out of queue: ${myTicket}, in queue: ${this.operator.masterQueue.length}`, 'cyan');
}
*/
const result = await BackLog.pushQuery(query, 0, Date.now(), false, connId, fullQuery || query);
// log.info(`sending query to slaves: ${JSON.stringify(result)}`);
if (result) {
Expand Down Expand Up @@ -441,81 +426,34 @@ class Operator {
case mySQLConsts.COM_QUERY:
const query = extra.toString();
const analyzedQueries = sqlAnalyzer(query, 'mysql');
// if (analyzedQueries.length > 2) log.info(JSON.stringify(analyzedQueries));
for (const queryItem of analyzedQueries) {
// log.query(queryItem, 'white', id);
if (queryItem[1] === 'w' && this.isNotBacklogQuery(queryItem[0], this.BACKLOG_DB)) {
// forward it to the master node
// log.info(`${id},${queryItem[0]}`);
// log.info(`incoming write ${id}`);
if (this.operator.sessionQueries[id] !== undefined) {
await this.sendWriteQuery(this.operator.sessionQueries[id], -1);
this.operator.sessionQueries[id] = undefined;
}
await this.sendWriteQuery(queryItem[0], id, query);
// log.info(`finish write ${id}`);
// this.localDB.enableSocketWrite = false;
// let result = await this.localDB.query(queryItem[0], true);
// this.sendOK({ message: 'OK' });
} else if (queryItem[1] === 's') {
// eslint-disable-next-line prefer-destructuring
this.operator.sessionQueries[id] = queryItem[0];
// log.info(`incoming set session ${id}`);
await ConnectionPool.getConnectionById(id).query(queryItem[0], true);
// log.info(`finish set session ${id}`);
} else {
// forward it to the local DB
// eslint-disable-next-line prefer-const
// log.info(`incoming read ${id}`);
await ConnectionPool.getConnectionById(id).query(queryItem[0], true);
// log.info(`finish read ${id}`);
// log.info(`result: ${JSON.stringify(result)}`);
}
// log.info(result);
// Then send it back to the user in table format
/*
if(result[1]){
let fieldNames = [];
for (let definition of result[1]) fieldNames.push(definition.name);
this.sendDefinitions(result[1]);
let finalResult = [];
for (let row of result[0]){
let newRow =[];
for(let filed of fieldNames){
newRow.push(row[filed]);
}
finalResult.push(newRow);
}

this.sendRows(finalResult);
break;
} else if(result[0]){
this.sendOK({ message: 'OK' });
break;
}else if(result.warningStatus==0){
this.sendOK({ message: 'OK' });
break;
}else{
//this.sendError({ message: result[3] });
//break;
} */
}

break;
case mySQLConsts.COM_PING:
// console.log('got ping');
this.sendOK({ message: 'OK' });
break;
case null:
case undefined:
case mySQLConsts.COM_QUIT:
// log.info(`Disconnecting from ${id}`);
this.end();
break;
case mySQLConsts.COM_INIT_DB:
// this.localDB.setSocket(this.socket, id);
await ConnectionPool.getConnectionById(id).query(`use ${extra}`);
// this.sendOK({ message: 'OK' });
break;
default:
log.info(`Unknown Command: ${command}`);
Expand All @@ -536,7 +474,6 @@ class Operator {
try {
const response = await fluxAPI.getKeys(this.masterWSConn);
const keys = JSON.parse(Security.decryptComm(Buffer.from(response.keys, 'hex')));
// console.log(keys);
// eslint-disable-next-line guard-for-in
for (const key in keys) {
BackLog.pushKey(key, keys[key]);
Expand Down Expand Up @@ -630,12 +567,10 @@ class Operator {
if (appIPList[i].ip.includes(':')) appIPList[i].ip = appIPList[i].ip.split(':')[0];
this.AppNodes.push(appIPList[i].ip);
}
// log.info(`cluster ip's: ${JSON.stringify(this.OpNodes)}`);
let activeNodes = 1;
for (let i = 0; i < ipList.length; i += 1) {
// extraxt ip from upnp nodes
log.info(`asking my ip from: ${ipList[i].ip}:${config.containerApiPort}`);
// const myTempIp = await fluxAPI.getMyIp(ipList[i].ip, config.containerApiPort);
const status = await fluxAPI.getStatus(ipList[i].ip, config.containerApiPort);
log.info(`response was: ${JSON.stringify(status)}`);
if (status === null || status === 'null') {
Expand Down Expand Up @@ -826,52 +761,6 @@ class Operator {
return null;
}

/**
* [getMyIp]
*/
/*
static async getMyIp(retries=1) {
try{
if(this.myIP !== null){
return this.myIP
}else{
//let ipList = [];
for(let i=0; i < this.OpNodes.length && i < 5; i++){
log.info(`asking myip from ${this.OpNodes[i].ip}`);
let tempIp = await fluxAPI.getMyIp(this.OpNodes[i].ip, config.containerApiPort);
log.info(`response from ${this.OpNodes[i].ip} was ${tempIp}`);
let j=1;

if(tempIp!==null){
this.myIP = tempIp;
log.info(`My ip is ${JSON.stringify(tempIp)}`);
return tempIp;
}
}
log.info(`other nodes are not responding to api port ${config.containerApiPort}, retriying again ${retries}...`);
await this.updateAppInfo();
await timer.setTimeout(15000 * retries);
return this.getMyIp(retries+1);
log.info(`all response list: ${JSON.stringify(ipList)}`);
//find the highest occurrence in the array
if(ipList.length>=2){
const myIP = ipList.sort((a,b) =>ipList.filter(v => v===a).length - ipList.filter(v => v===b).length).pop();
this.myIP = myIP;
log.info(`My ip is ${JSON.stringify(myIP)}`);
return myIP;
}else{
log.info(`other nodes are not responding to api port ${config.containerApiPort}, retriying again ${retries}...`);
await this.updateAppInfo();
await timer.setTimeout(15000 * retries);
return this.getMyIp(retries+1);
}
}
}catch(err){
log.error(err);
}
}
*/

/**
* [ConnectLocalDB]
*/
Expand Down
6 changes: 3 additions & 3 deletions ClusterOperator/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ module.exports = {
debugUIPort: 8008,
containerDBPort: String(process.env.DB_PORT || 33949).trim(),
containerApiPort: String(process.env.API_PORT || 33950).trim(),
DBAppName: process.env.DB_APPNAME || 'wordpressonflux',
AppName: process.env.CLIENT_APPNAME || 'explorer',
version: '1.2.4',
DBAppName: process.env.DB_APPNAME || '',
AppName: process.env.CLIENT_APPNAME || '',
version: '1.3.0',
whiteListedIps: process.env.WHITELIST || '127.0.0.1',
debugMode: true,
authMasterOnly: process.env.AUTH_MASTER_ONLY || false,
Expand Down
28 changes: 4 additions & 24 deletions ClusterOperator/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const log = require('../lib/log');
const utill = require('../lib/utill');
const config = require('./config');
const Security = require('./Security');
const SqlImporter = require('../modules/mysql-import');
const SqlImporter = require('../lib/mysqlimport');

/**
* [auth]
Expand Down Expand Up @@ -193,6 +193,8 @@ function startUI() {
}
});
app.get('/getstatus', async (req, res) => {
res.status(403).send('Bad Request');
/*
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
Expand All @@ -207,6 +209,7 @@ function startUI() {
await timer.setTimeout(2000);
// console.log(count);
}
*/
});

app.get('/status', (req, res) => {
Expand Down Expand Up @@ -534,19 +537,6 @@ async function initServer() {
});
socket.on('writeQuery', async (query, connId, callback) => {
log.info(`writeQuery from ${utill.convertIP(socket.handshake.address)}:${connId}`);
/*
if (BackLog.writeLock) {
const myTicket = Operator.getTicket();
log.info(`put into queue: ${myTicket}, in queue: ${Operator.masterQueue.length}`, 'cyan');
Operator.masterQueue.push(myTicket);
while (BackLog.writeLock || Operator.masterQueue[0] !== myTicket) {
await timer.setTimeout(5);
}
BackLog.writeLock = true;
Operator.masterQueue.shift();
log.info(`out of queue: ${myTicket}, in queue: ${Operator.masterQueue.length}`, 'cyan');
}
*/
const result = await BackLog.pushQuery(query);
// log.info(`forwarding query to slaves: ${JSON.stringify(result)}`);
socket.broadcast.emit('query', query, result[1], result[2], false);
Expand Down Expand Up @@ -579,16 +569,6 @@ async function initServer() {
}
}
}
// if (record) {

// log.info(`record type: ${Array.isArray(record)}`, 'magenta');
// if (Array.isArray(record)) {
// socket.emit('query', record[0].query, record[0].seq, record[0].timestamp, false);
// log.warn(`query ${index} not in query cache`, 'red');
// } else {

// }
// }
callback({ status: Operator.status });
});
socket.on('shareKeys', async (pubKey, callback) => {
Expand Down
Loading
Loading