Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.2.1 #52

Merged
merged 10 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 46 additions & 28 deletions ClusterOperator/Operator.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ class Operator {
}
if (this.lastBufferSeqNo > BackLog.sequenceNumber + 1) {
let i = 1;
while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 5) {
while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 10) {
if (missingQueryBuffer.get(BackLog.sequenceNumber + i) !== true) {
log.info(`missing seqNo ${BackLog.sequenceNumber + i}, asking master to resend`, 'magenta');
missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 5000);
fluxAPI.askQuery(BackLog.sequenceNumber + 1, this.masterWSConn);
missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 10000);
await fluxAPI.askQuery(BackLog.sequenceNumber + 1, this.masterWSConn);
i += 1;
}
}
Expand All @@ -199,11 +199,11 @@ class Operator {
this.lastBufferSeqNo = sequenceNumber;
if (this.buffer[BackLog.sequenceNumber + 1] === undefined && missingQueryBuffer.get(BackLog.sequenceNumber + 1) !== true) {
let i = 1;
while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 5) {
while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 10) {
if (missingQueryBuffer.get(BackLog.sequenceNumber + i) !== true) {
log.info(`missing seqNo ${BackLog.sequenceNumber + i}, asking master to resend`, 'magenta');
missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 5000);
fluxAPI.askQuery(BackLog.sequenceNumber + i, this.masterWSConn);
missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 10000);
await fluxAPI.askQuery(BackLog.sequenceNumber + i, this.masterWSConn);
i += 1;
}
}
Expand Down Expand Up @@ -321,16 +321,18 @@ class Operator {
* [sendWriteQuery]
* @param {string} query [description]
*/
static async sendWriteQuery(query, connId, fullQuery) {
static async sendWriteQuery(query, connId = false, fullQuery = null, masterSocket = null) {
if (this.masterNode !== null) {
// log.info(`master node: ${this.masterNode}`);
if (!this.IamMaster) {
const { masterWSConn } = this;
return new Promise((resolve) => {
masterWSConn.emit('writeQuery', query, connId, (response) => {
resolve(response.result);
if (masterWSConn) {
return new Promise((resolve) => {
masterWSConn.emit('writeQuery', query, connId, (response) => {
resolve(response.result);
});
});
});
}
}
/*
if (BackLog.writeLock) {
Expand All @@ -345,9 +347,18 @@ class Operator {
log.info(`out of queue: ${myTicket}, in queue: ${this.operator.masterQueue.length}`, 'cyan');
}
*/
const result = await BackLog.pushQuery(query, 0, Date.now(), false, connId, fullQuery);
const result = await BackLog.pushQuery(query, 0, Date.now(), false, connId, fullQuery || query);
// log.info(`sending query to slaves: ${JSON.stringify(result)}`);
if (result) this.serverSocket.emit('query', query, result[1], result[2], false);
if (result) {
log.info(`emitting ${result[1]}`);
if (this.serverSocket) {
this.serverSocket.emit('query', query, result[1], result[2], false);
} else {
masterSocket.emit('query', query, result[1], result[2], false);
}
} else {
log.info(JSON.stringify(result));
}
return result;
}
return null;
Expand All @@ -358,23 +369,30 @@ class Operator {
* @param {int} seq [description]
*/
static async rollBack(seqNo) {
if (this.status !== 'ROLLBACK') {
if (this.IamMaster) {
this.status = 'ROLLBACK';
log.info(`rolling back to ${seqNo}`);
this.serverSocket.emit('rollBack', seqNo);
await BackLog.rebuildDatabase(seqNo);
this.status = 'OK';
} else {
const { masterWSConn } = this;
return new Promise((resolve) => {
masterWSConn.emit('rollBack', seqNo, (response) => {
resolve(response.result);
});
});
try {
if (this.status !== 'ROLLBACK') {
if (this.IamMaster) {
this.status = 'ROLLBACK';
log.info(`rolling back to ${seqNo}`);
this.serverSocket.emit('rollBack', seqNo);
await BackLog.rebuildDatabase(seqNo);
this.status = 'OK';
} else {
const { masterWSConn } = this;
if (masterWSConn) {
return new Promise((resolve) => {
masterWSConn.emit('rollBack', seqNo, (response) => {
resolve(response.result);
});
});
}
}
}
return null;
} catch (e) {
log.error(JSON.stringify(e));
return null;
}
return null;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion ClusterOperator/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module.exports = {
containerApiPort: process.env.API_PORT.trim() || 33950,
DBAppName: process.env.DB_APPNAME || 'wordpressonflux',
AppName: process.env.CLIENT_APPNAME || 'explorer',
version: '1.2.0',
version: '1.2.1',
whiteListedIps: process.env.WHITELIST || '127.0.0.1',
debugMode: true,
authMasterOnly: process.env.AUTH_MASTER_ONLY || false,
Expand Down
57 changes: 46 additions & 11 deletions ClusterOperator/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const utill = require('../lib/utill');
const config = require('./config');
const Security = require('./Security');
const fluxAPI = require('../lib/fluxAPI');
const SqlImporter = require('../modules/mysql-import');

/**
* [auth]
Expand Down Expand Up @@ -162,7 +163,7 @@ function startUI() {
res.send(Operator.OpNodes);
res.end();
} else {
res.status(403).render();
res.status(403).send('Bad Request');
}
});
app.get('/status', (req, res) => {
Expand All @@ -181,7 +182,7 @@ function startUI() {
res.send(await BackLog.getDateRange());
res.end();
} else {
res.status(403).render();
res.status(403).send('Bad Request');
}
});

Expand All @@ -192,7 +193,7 @@ function startUI() {
res.send(await BackLog.getLogsByTime(starttime, length));
res.end();
} else {
res.status(403).render();
res.status(403).send('Bad Request');
}
});

Expand Down Expand Up @@ -270,20 +271,20 @@ function startUI() {
res.send(await BackLog.listSqlFiles());
res.end();
} else {
res.status(403).render();
res.status(403).send('Bad Request');
}
});
app.get('/getbackupfile/:filename', async (req, res) => {
if (authUser(req)) {
const { filename } = req.params;
res.download(path.join(__dirname, `../dumps/${filename}.sql`), `${filename}.sql`, (err) => {
res.download(path.join(__dirname, `../dumps/${sanitize(filename)}.sql`), `${sanitize(filename)}.sql`, (err) => {
if (err) {
// Handle errors, such as file not found
res.status(404).send('File not found');
}
});
} else {
res.status(403).render();
res.status(403).send('Bad Request');
}
});
app.post('/upload-sql', async (req, res) => {
Expand All @@ -301,27 +302,60 @@ function startUI() {
res.send('File uploaded successfully.');
});
} else {
res.status(403).render();
res.status(403).send('Bad Request');
}
});
app.post('/generatebackup', async (req, res) => {
if (authUser(req)) {
res.send(await BackLog.dumpBackup());
res.end();
} else {
res.status(403).render();
res.status(403).send('Bad Request');
}
});
app.post('/deletebackup', async (req, res) => {
if (authUser(req)) {
const { body } = req;
if (body) {
const { filename } = body;
res.send(await BackLog.deleteBackupFile(filename));
res.send(await BackLog.deleteBackupFile(sanitize(filename)));
res.end();
}
} else {
res.status(403).render();
res.status(403).send('Bad Request');
}
});
app.post('/executebackup', async (req, res) => {
if (authUser(req)) {
const { body } = req;
if (body) {
const { filename } = body;
// create a snapshot
await BackLog.dumpBackup();
// removing old db + resetting secuence numbers:
await Operator.rollBack(0);
await timer.setTimeout(2000);
const importer = new SqlImporter({
callback: Operator.sendWriteQuery,
serverSocket: Operator.serverSocket,
});
importer.onProgress((progress) => {
const percent = Math.floor((progress.bytes_processed / progress.total_bytes) * 10000) / 100;
log.info(`${percent}% Completed`, 'cyan');
});
importer.setEncoding('utf8');
await importer.import(`./dumps/${sanitize(filename)}.sql`).then(async () => {
const filesImported = importer.getImported();
log.info(`${filesImported.length} SQL file(s) imported.`);
res.send('OK');
}).catch((err) => {
res.status(500).send(JSON.stringify(err));

Check warning

Code scanning / CodeQL

Information exposure through a stack trace

This information exposed to the user depends on [stack trace information](1). This information exposed to the user depends on [stack trace information](2).
log.error(err);
});
res.end();
}
} else {
res.status(403).send('Bad Request');
}
});
app.post('/verifylogin/', (req, res) => {
Expand Down Expand Up @@ -376,7 +410,7 @@ function startUI() {
Operator.emitUserSession('remove', req.cookies.loginphrase, '');
res.send('OK');
} else {
res.status(403).render();
res.status(403).send('Bad Request');
}
});

Expand Down Expand Up @@ -494,6 +528,7 @@ async function initServer() {
if (!BLRecord) {
BLRecord = await BackLog.getLog(index);
log.info(`from DB : ${JSON.stringify(BLRecord)}`, 'red');
socket.emit('query', BLRecord[0].query, BLRecord[0].seq, BLRecord[0].timestamp, connId);
}
}
// if (record) {
Expand Down
24 changes: 18 additions & 6 deletions modules/mysql-import/mysql-import.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable */
/**
* mysql-import - v5.0.26
* Import .sql into a MySQL database with Node.
Expand Down Expand Up @@ -29,6 +30,8 @@ class Importer{
*/
constructor(settings){
this._connection_settings = settings;
this.callback = (settings.callback) ? settings.callback : null;
this.serverSocket = (settings.serverSocket) ? settings.serverSocket : null;
this._conn = null;
this._encoding = 'utf8';
this._imported = [];
Expand Down Expand Up @@ -128,7 +131,7 @@ class Importer{
import(...input){
return new Promise(async (resolve, reject)=>{
try{
await this._connect();
// await this._connect();
var files = await this._getSQLFilePaths(...input);
this._total_files = files.length;
this._current_file_no = 0;
Expand All @@ -140,15 +143,15 @@ class Importer{
next();
return;
}
this._importSingleFile(file).then(()=>{
this._importSingleFile(file, this.callback, this.serverSocket).then(()=>{
next();
}).catch(err=>{
error = err;
next();
});
});
if(error) throw error;
await this.disconnect();
// await this.disconnect();
resolve();
}catch(err){
reject(err);
Expand Down Expand Up @@ -194,10 +197,11 @@ class Importer{
* - size: The size of the file in bytes
* @returns {Promise}
*/
_importSingleFile(fileObj){
_importSingleFile(fileObj, callback, serverSocket){
return new Promise((resolve, reject)=>{

var parser = new queryParser({
callback,
serverSocket,
db_connection: this._conn,
encoding: this._encoding,
onProgress: (progress) => {
Expand Down Expand Up @@ -434,7 +438,10 @@ class queryParser extends stream.Writable{

// Are we currently seeking new delimiter
this.seekingDelimiter = false;

this.executeCallback = options.callback;

this.serverSocket = options.serverSocket;
}

////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -465,7 +472,12 @@ class queryParser extends stream.Writable{
}

// Execute a query, return a Promise
executeQuery(query){
async executeQuery(query){
console.log (query);
console.log (this.executeCallback);
if (this.executeCallback){
return await this.executeCallback(query, false, false, this.serverSocket);
}
return new Promise((resolve, reject)=>{
this.db_connection.query(query, err=>{
if (err){
Expand Down
6 changes: 3 additions & 3 deletions modules/mysqldump/dist/cjs.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ function getSchemaDump(connection, options, tables) {
s.schema += ';';
// pad the sql with a header
s.schema = [
'# ------------------------------------------------------------',
`# SCHEMA DUMP FOR TABLE: ${s.name}`,
'# ------------------------------------------------------------',
//'# ------------------------------------------------------------',
//`# SCHEMA DUMP FOR TABLE: ${s.name}`,
//'# ------------------------------------------------------------',
'',
s.schema,
'',
Expand Down
6 changes: 3 additions & 3 deletions modules/mysqldump/dist/es.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ function getSchemaDump(connection, options, tables) {
s.schema += ';';
// pad the sql with a header
s.schema = [
'# ------------------------------------------------------------',
`# SCHEMA DUMP FOR TABLE: ${s.name}`,
'# ------------------------------------------------------------',
//'# ------------------------------------------------------------',
//`# SCHEMA DUMP FOR TABLE: ${s.name}`,
//'# ------------------------------------------------------------',
'',
s.schema,
'',
Expand Down
9 changes: 5 additions & 4 deletions tests/mysqlimpot.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
const Importer = require('../modules/mysql-import');

function callback(query) {
console.log(query);
}

const importer = new Importer({
host: 'localhost',
user: 'root',
password: 'secret',
database: 'test_db2',
callback,
});

importer.onProgress((progress) => {
Expand Down
Loading