Skip to content

Commit

Permalink
refactor: cleanup rerouting and retry implementation (#1660)
Browse files Browse the repository at this point in the history
Cleanup rerouting and retry implementation.
  • Loading branch information
arthurschreiber authored Sep 11, 2024
1 parent e8c6228 commit aa0f183
Showing 1 changed file with 70 additions and 82 deletions.
152 changes: 70 additions & 82 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,6 @@ interface State {
socketError?(this: Connection, err: Error): void;
connectTimeout?(this: Connection): void;
message?(this: Connection, message: Message): void;
retry?(this: Connection): void;
reconnect?(this: Connection): void;
};
}

Expand Down Expand Up @@ -873,15 +871,6 @@ export interface ConnectionOptions {
workstationId?: string | undefined;
}

/**
* @private
*/
const CLEANUP_TYPE = {
NORMAL: 0,
REDIRECT: 1,
RETRY: 2
};

interface RoutingData {
server: string;
port: number;
Expand Down Expand Up @@ -1032,20 +1021,31 @@ class Connection extends EventEmitter {
* @private
*/
declare requestTimer: undefined | NodeJS.Timeout;

/**
* @private
*/
declare retryTimer: undefined | NodeJS.Timeout;
declare _cancelAfterRequestSent: () => void;

/**
* @private
*/
_cancelAfterRequestSent: () => void;
declare databaseCollation: Collation | undefined;

/**
* @private
*/
declare databaseCollation: Collation | undefined;
declare _onSocketClose: (hadError: boolean) => void;

/**
* @private
*/
declare _onSocketError: (err: Error) => void;

/**
* @private
*/
declare _onSocketEnd: () => void;

/**
* Note: be aware of the different options field:
Expand Down Expand Up @@ -1771,6 +1771,18 @@ class Connection extends EventEmitter {
this.messageIo.sendMessage(TYPE.ATTENTION);
this.createCancelTimer();
};

this._onSocketClose = () => {
this.socketClose();
};

this._onSocketEnd = () => {
this.socketEnd();
};

this._onSocketError = (error) => {
this.socketError(error);
};
}

connect(connectListener?: (err?: Error) => void) {
Expand Down Expand Up @@ -1972,19 +1984,15 @@ class Connection extends EventEmitter {
/**
* @private
*/
cleanupConnection(cleanupType: typeof CLEANUP_TYPE[keyof typeof CLEANUP_TYPE]) {
cleanupConnection() {
if (!this.closed) {
this.clearConnectTimer();
this.clearRequestTimer();
this.clearRetryTimer();
this.closeConnection();
if (cleanupType === CLEANUP_TYPE.REDIRECT) {
this.emit('rerouting');
} else if (cleanupType !== CLEANUP_TYPE.RETRY) {
process.nextTick(() => {
this.emit('end');
});
}

process.nextTick(() => {
this.emit('end');
});

const request = this.request;
if (request) {
Expand Down Expand Up @@ -2017,9 +2025,9 @@ class Connection extends EventEmitter {
}

socketHandlingForSendPreLogin(socket: net.Socket) {
socket.on('error', (error) => { this.socketError(error); });
socket.on('close', () => { this.socketClose(); });
socket.on('end', () => { this.socketEnd(); });
socket.on('error', this._onSocketError);
socket.on('close', this._onSocketClose);
socket.on('end', this._onSocketEnd);
socket.setKeepAlive(true, KEEP_ALIVE_INITIAL_DELAY);

this.messageIo = new MessageIO(socket, this.config.options.packetSize, this.debug);
Expand Down Expand Up @@ -2172,16 +2180,6 @@ class Connection extends EventEmitter {
}
}

/**
* @private
*/
createRetryTimer() {
this.clearRetryTimer();
this.retryTimer = setTimeout(() => {
this.retryTimeout();
}, this.config.options.connectionRetryInterval);
}

/**
* @private
*/
Expand Down Expand Up @@ -2221,15 +2219,6 @@ class Connection extends EventEmitter {
request.error = new RequestError(message, 'ETIMEOUT');
}

/**
* @private
*/
retryTimeout() {
this.retryTimer = undefined;
this.emit('retry');
this.transitionTo(this.STATE.CONNECTING);
}

/**
* @private
*/
Expand Down Expand Up @@ -2260,16 +2249,6 @@ class Connection extends EventEmitter {
}
}

/**
* @private
*/
clearRetryTimer() {
if (this.retryTimer) {
clearTimeout(this.retryTimer);
this.retryTimer = undefined;
}
}

/**
* @private
*/
Expand Down Expand Up @@ -2357,19 +2336,7 @@ class Connection extends EventEmitter {
*/
socketClose() {
this.debug.log('connection to ' + this.config.server + ':' + this.config.options.port + ' closed');
if (this.state === this.STATE.REROUTING) {
this.debug.log('Rerouting to ' + this.routingData!.server + ':' + this.routingData!.port);

this.dispatchEvent('reconnect');
} else if (this.state === this.STATE.TRANSIENT_FAILURE_RETRY) {
const server = this.routingData ? this.routingData.server : this.config.server;
const port = this.routingData ? this.routingData.port : this.config.options.port;
this.debug.log('Retry after transient failure connecting to ' + server + ':' + port);

this.dispatchEvent('retry');
} else {
this.transitionTo(this.STATE.FINAL);
}
this.transitionTo(this.STATE.FINAL);
}

/**
Expand Down Expand Up @@ -3356,39 +3323,61 @@ Connection.prototype.STATE = {
REROUTING: {
name: 'ReRouting',
enter: function() {
this.cleanupConnection(CLEANUP_TYPE.REDIRECT);
// Clear the existing connection timeout
this.clearConnectTimer();

this.socket!.removeListener('error', this._onSocketError);
this.socket!.removeListener('close', this._onSocketClose);
this.socket!.removeListener('end', this._onSocketEnd);
this.socket!.destroy();

this.debug.log('connection to ' + this.config.server + ':' + this.config.options.port + ' closed');

this.emit('rerouting');
this.debug.log('Rerouting to ' + this.routingData!.server + ':' + this.routingData!.port);

// Attempt connecting to the rerouting target
this.transitionTo(this.STATE.CONNECTING);
},
events: {
message: function() {
},
socketError: function() {
this.transitionTo(this.STATE.FINAL);
},
connectTimeout: function() {
this.transitionTo(this.STATE.FINAL);
},
reconnect: function() {
this.transitionTo(this.STATE.CONNECTING);
}
}
},
TRANSIENT_FAILURE_RETRY: {
name: 'TRANSIENT_FAILURE_RETRY',
enter: function() {
this.curTransientRetryCount++;
this.cleanupConnection(CLEANUP_TYPE.RETRY);

this.clearConnectTimer();
this.loginError = undefined;

this.socket!.removeListener('error', this._onSocketError);
this.socket!.removeListener('close', this._onSocketClose);
this.socket!.removeListener('end', this._onSocketEnd);
this.socket!.destroy();

this.debug.log('connection to ' + this.config.server + ':' + this.config.options.port + ' closed');

const server = this.routingData ? this.routingData.server : this.config.server;
const port = this.routingData ? this.routingData.port : this.config.options.port;
this.debug.log('Retry after transient failure connecting to ' + server + ':' + port);

setTimeout(() => {
this.emit('retry');
this.transitionTo(this.STATE.CONNECTING);
}, this.config.options.connectionRetryInterval);
},
events: {
message: function() {
},
socketError: function() {
this.transitionTo(this.STATE.FINAL);
},
connectTimeout: function() {
this.transitionTo(this.STATE.FINAL);
},
retry: function() {
this.createRetryTimer();
}
}
},
Expand Down Expand Up @@ -3803,7 +3792,6 @@ Connection.prototype.STATE = {
sqlRequest.callback(new RequestError('Canceled.', 'ECANCEL'));
}
}

})().catch((err) => {
process.nextTick(() => {
throw err;
Expand All @@ -3824,7 +3812,7 @@ Connection.prototype.STATE = {
FINAL: {
name: 'Final',
enter: function() {
this.cleanupConnection(CLEANUP_TYPE.NORMAL);
this.cleanupConnection();
},
events: {
connectTimeout: function() {
Expand Down

0 comments on commit aa0f183

Please sign in to comment.