Skip to content

Commit

Permalink
Merge pull request #63 from RunOnFlux/development
Browse files Browse the repository at this point in the history
v1.3.0
  • Loading branch information
alihm authored Apr 29, 2024
2 parents 7c4c893 + ad7b077 commit 5f820d4
Show file tree
Hide file tree
Showing 42 changed files with 967 additions and 5,506 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ debug.txt
info.txt
errors.txt
warnings.txt
query.txt
dev/
36 changes: 1 addition & 35 deletions ClusterOperator/Backlog.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const log = require('../lib/log');
const Security = require('./Security');
const ConnectionPool = require('../lib/ConnectionPool');
const utill = require('../lib/utill');
const mysqldump = require('../modules/mysqldump');
const mysqldump = require('../lib/mysqldump');

class BackLog {
static buffer = [];
Expand Down Expand Up @@ -140,40 +140,6 @@ class BackLog {
}
return [result, seqForThis, timestamp];
}
/*
if (seq === 0 || this.sequenceNumber + 1 === seq) {
while (this.writeLock) await timer.setTimeout(10);
this.writeLock = true;
if (seq === 0) { this.sequenceNumber += 1; } else { this.sequenceNumber = seq; }
const seqForThis = this.sequenceNumber;
let result2 = null;
if (connId === false) {
result2 = await this.UserDBClient.query(query);
} else {
result2 = await ConnectionPool.getConnectionById(connId).query(query);
}
await this.BLClient.execute(
`INSERT INTO ${config.dbBacklogCollection} (seq, query, timestamp) VALUES (?,?,?)`,
[seqForThis, query, timestamp],
);
this.writeLock = false;
return [result2, seqForThis, timestamp];
} else if (this.bufferStartSequenceNumber === this.sequenceNumber + 1) {
await this.moveBufferToBacklog();
return await this.pushQuery(query, seq, timestamp, buffer, connId);
} else {
if (this.sequenceNumber + 1 < seq) {
log.error(`Wrong query order, ${this.sequenceNumber + 1} < ${seq}. pushing to buffer.`);
if (this.bufferStartSequenceNumber === 0) this.bufferStartSequenceNumber = seq;
this.bufferSequenceNumber = seq;
await this.BLClient.execute(
`INSERT INTO ${config.dbBacklogBuffer} (seq, query, timestamp) VALUES (?,?,?)`,
[seq, query, timestamp],
);
}
return [];
} */
}
} catch (e) {
this.writeLock = false;
Expand Down
111 changes: 0 additions & 111 deletions ClusterOperator/Operator.js
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,6 @@ class Operator {
*/
static handleAuthorize(param) {
try {
// log.debug(`DB auth from ${param.remoteIP}`);
// log.debug(JSON.stringify(param));
if (this.status !== 'OK' || this.operator.ghosted) {
// log.info(`status: ${this.status},${this.operator.status}, rejecting connection`);
return false;
Expand Down Expand Up @@ -332,19 +330,6 @@ class Operator {
});
}
}
/*
if (BackLog.writeLock) {
const myTicket = this.operator.getTicket();
log.info(`put into queue: ${myTicket}, in queue: ${this.operator.masterQueue.length}`, 'cyan');
this.operator.masterQueue.push(myTicket);
while (BackLog.writeLock || this.operator.masterQueue[0] !== myTicket) {
await timer.setTimeout(5);
}
BackLog.writeLock = true;
this.operator.masterQueue.shift();
log.info(`out of queue: ${myTicket}, in queue: ${this.operator.masterQueue.length}`, 'cyan');
}
*/
const result = await BackLog.pushQuery(query, 0, Date.now(), false, connId, fullQuery || query);
// log.info(`sending query to slaves: ${JSON.stringify(result)}`);
if (result) {
Expand Down Expand Up @@ -441,81 +426,34 @@ class Operator {
case mySQLConsts.COM_QUERY:
const query = extra.toString();
const analyzedQueries = sqlAnalyzer(query, 'mysql');
// if (analyzedQueries.length > 2) log.info(JSON.stringify(analyzedQueries));
for (const queryItem of analyzedQueries) {
// log.query(queryItem, 'white', id);
if (queryItem[1] === 'w' && this.isNotBacklogQuery(queryItem[0], this.BACKLOG_DB)) {
// forward it to the master node
// log.info(`${id},${queryItem[0]}`);
// log.info(`incoming write ${id}`);
if (this.operator.sessionQueries[id] !== undefined) {
await this.sendWriteQuery(this.operator.sessionQueries[id], -1);
this.operator.sessionQueries[id] = undefined;
}
await this.sendWriteQuery(queryItem[0], id, query);
// log.info(`finish write ${id}`);
// this.localDB.enableSocketWrite = false;
// let result = await this.localDB.query(queryItem[0], true);
// this.sendOK({ message: 'OK' });
} else if (queryItem[1] === 's') {
// eslint-disable-next-line prefer-destructuring
this.operator.sessionQueries[id] = queryItem[0];
// log.info(`incoming set session ${id}`);
await ConnectionPool.getConnectionById(id).query(queryItem[0], true);
// log.info(`finish set session ${id}`);
} else {
// forward it to the local DB
// eslint-disable-next-line prefer-const
// log.info(`incoming read ${id}`);
await ConnectionPool.getConnectionById(id).query(queryItem[0], true);
// log.info(`finish read ${id}`);
// log.info(`result: ${JSON.stringify(result)}`);
}
// log.info(result);
// Then send it back to the user in table format
/*
if(result[1]){
let fieldNames = [];
for (let definition of result[1]) fieldNames.push(definition.name);
this.sendDefinitions(result[1]);
let finalResult = [];
for (let row of result[0]){
let newRow =[];
for(let filed of fieldNames){
newRow.push(row[filed]);
}
finalResult.push(newRow);
}
this.sendRows(finalResult);
break;
} else if(result[0]){
this.sendOK({ message: 'OK' });
break;
}else if(result.warningStatus==0){
this.sendOK({ message: 'OK' });
break;
}else{
//this.sendError({ message: result[3] });
//break;
} */
}

break;
case mySQLConsts.COM_PING:
// console.log('got ping');
this.sendOK({ message: 'OK' });
break;
case null:
case undefined:
case mySQLConsts.COM_QUIT:
// log.info(`Disconnecting from ${id}`);
this.end();
break;
case mySQLConsts.COM_INIT_DB:
// this.localDB.setSocket(this.socket, id);
await ConnectionPool.getConnectionById(id).query(`use ${extra}`);
// this.sendOK({ message: 'OK' });
break;
default:
log.info(`Unknown Command: ${command}`);
Expand All @@ -536,7 +474,6 @@ class Operator {
try {
const response = await fluxAPI.getKeys(this.masterWSConn);
const keys = JSON.parse(Security.decryptComm(Buffer.from(response.keys, 'hex')));
// console.log(keys);
// eslint-disable-next-line guard-for-in
for (const key in keys) {
BackLog.pushKey(key, keys[key]);
Expand Down Expand Up @@ -630,12 +567,10 @@ class Operator {
if (appIPList[i].ip.includes(':')) appIPList[i].ip = appIPList[i].ip.split(':')[0];
this.AppNodes.push(appIPList[i].ip);
}
// log.info(`cluster ip's: ${JSON.stringify(this.OpNodes)}`);
let activeNodes = 1;
for (let i = 0; i < ipList.length; i += 1) {
// extraxt ip from upnp nodes
log.info(`asking my ip from: ${ipList[i].ip}:${config.containerApiPort}`);
// const myTempIp = await fluxAPI.getMyIp(ipList[i].ip, config.containerApiPort);
const status = await fluxAPI.getStatus(ipList[i].ip, config.containerApiPort);
log.info(`response was: ${JSON.stringify(status)}`);
if (status === null || status === 'null') {
Expand Down Expand Up @@ -826,52 +761,6 @@ class Operator {
return null;
}

/**
* [getMyIp]
*/
/*
static async getMyIp(retries=1) {
try{
if(this.myIP !== null){
return this.myIP
}else{
//let ipList = [];
for(let i=0; i < this.OpNodes.length && i < 5; i++){
log.info(`asking myip from ${this.OpNodes[i].ip}`);
let tempIp = await fluxAPI.getMyIp(this.OpNodes[i].ip, config.containerApiPort);
log.info(`response from ${this.OpNodes[i].ip} was ${tempIp}`);
let j=1;
if(tempIp!==null){
this.myIP = tempIp;
log.info(`My ip is ${JSON.stringify(tempIp)}`);
return tempIp;
}
}
log.info(`other nodes are not responding to api port ${config.containerApiPort}, retriying again ${retries}...`);
await this.updateAppInfo();
await timer.setTimeout(15000 * retries);
return this.getMyIp(retries+1);
log.info(`all response list: ${JSON.stringify(ipList)}`);
//find the highest occurrence in the array
if(ipList.length>=2){
const myIP = ipList.sort((a,b) =>ipList.filter(v => v===a).length - ipList.filter(v => v===b).length).pop();
this.myIP = myIP;
log.info(`My ip is ${JSON.stringify(myIP)}`);
return myIP;
}else{
log.info(`other nodes are not responding to api port ${config.containerApiPort}, retriying again ${retries}...`);
await this.updateAppInfo();
await timer.setTimeout(15000 * retries);
return this.getMyIp(retries+1);
}
}
}catch(err){
log.error(err);
}
}
*/

/**
* [ConnectLocalDB]
*/
Expand Down
6 changes: 3 additions & 3 deletions ClusterOperator/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ module.exports = {
debugUIPort: 8008,
containerDBPort: String(process.env.DB_PORT || 33949).trim(),
containerApiPort: String(process.env.API_PORT || 33950).trim(),
DBAppName: process.env.DB_APPNAME || 'wordpressonflux',
AppName: process.env.CLIENT_APPNAME || 'explorer',
version: '1.2.4',
DBAppName: process.env.DB_APPNAME || '',
AppName: process.env.CLIENT_APPNAME || '',
version: '1.3.0',
whiteListedIps: process.env.WHITELIST || '127.0.0.1',
debugMode: true,
authMasterOnly: process.env.AUTH_MASTER_ONLY || false,
Expand Down
28 changes: 4 additions & 24 deletions ClusterOperator/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const log = require('../lib/log');
const utill = require('../lib/utill');
const config = require('./config');
const Security = require('./Security');
const SqlImporter = require('../modules/mysql-import');
const SqlImporter = require('../lib/mysqlimport');

/**
* [auth]
Expand Down Expand Up @@ -193,6 +193,8 @@ function startUI() {
}
});
app.get('/getstatus', async (req, res) => {
res.status(403).send('Bad Request');
/*
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
Expand All @@ -207,6 +209,7 @@ function startUI() {
await timer.setTimeout(2000);
// console.log(count);
}
*/
});

app.get('/status', (req, res) => {
Expand Down Expand Up @@ -534,19 +537,6 @@ async function initServer() {
});
socket.on('writeQuery', async (query, connId, callback) => {
log.info(`writeQuery from ${utill.convertIP(socket.handshake.address)}:${connId}`);
/*
if (BackLog.writeLock) {
const myTicket = Operator.getTicket();
log.info(`put into queue: ${myTicket}, in queue: ${Operator.masterQueue.length}`, 'cyan');
Operator.masterQueue.push(myTicket);
while (BackLog.writeLock || Operator.masterQueue[0] !== myTicket) {
await timer.setTimeout(5);
}
BackLog.writeLock = true;
Operator.masterQueue.shift();
log.info(`out of queue: ${myTicket}, in queue: ${Operator.masterQueue.length}`, 'cyan');
}
*/
const result = await BackLog.pushQuery(query);
// log.info(`forwarding query to slaves: ${JSON.stringify(result)}`);
socket.broadcast.emit('query', query, result[1], result[2], false);
Expand Down Expand Up @@ -579,16 +569,6 @@ async function initServer() {
}
}
}
// if (record) {

// log.info(`record type: ${Array.isArray(record)}`, 'magenta');
// if (Array.isArray(record)) {
// socket.emit('query', record[0].query, record[0].seq, record[0].timestamp, false);
// log.warn(`query ${index} not in query cache`, 'red');
// } else {

// }
// }
callback({ status: Operator.status });
});
socket.on('shareKeys', async (pubKey, callback) => {
Expand Down
Loading

0 comments on commit 5f820d4

Please sign in to comment.