Skip to content

Commit

Permalink
refactor(datasource-sql): refactor the proxy and sequelize management (
Browse files Browse the repository at this point in the history
  • Loading branch information
Scra3 authored Jun 15, 2023
1 parent 618d7ac commit 7f48cd1
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 166 deletions.
31 changes: 12 additions & 19 deletions packages/datasource-sql/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,31 @@ import { Sequelize } from 'sequelize';

import ConnectionOptions from './connection-options';
import handleErrors from './handle-errors';
import ReverseProxy from './reverse-proxy';
import ReverseProxy from './services/reverse-proxy';
import SequelizeFactory from './services/sequelize-factory';
import TcpServer from './services/tcp-server';

/** Attempt to connect to the database */
export default async function connect(options: ConnectionOptions): Promise<Sequelize> {
let proxy: ReverseProxy;
let tcpServer: TcpServer;
let sequelize: Sequelize;

try {
if (options.proxyOptions) {
// start the proxy
tcpServer = new TcpServer();
await tcpServer.start();
proxy = new ReverseProxy(options.proxyOptions, options.host, options.port);
await proxy.start();

tcpServer.onConnect(proxy.connectListener.bind(proxy));
tcpServer.onClose(proxy.closeListener.bind(proxy));
// swap database host and port with the ones from the proxy.
options.changeHostAndPort(proxy.host, proxy.port);
options.changeHostAndPort(tcpServer.host, tcpServer.port);
}

const sequelizeCtorOptions = await options.buildSequelizeCtorOptions();
sequelize =
sequelizeCtorOptions.length === 1
? new Sequelize(sequelizeCtorOptions[0])
: new Sequelize(sequelizeCtorOptions[0], sequelizeCtorOptions[1]);

// we want to stop the proxy when the sequelize connection is closed
sequelize.close = async function close() {
try {
await Sequelize.prototype.close.call(this);
} finally {
await proxy?.stop();
}
};
const sequelizeFactory = new SequelizeFactory();
if (tcpServer) sequelizeFactory.onClose(tcpServer.closeListener.bind(tcpServer));

sequelize = sequelizeFactory.build(await options.buildSequelizeCtorOptions());
await sequelize.authenticate(); // Test connection

return sequelize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,61 +2,35 @@ import net from 'net';
import { SocksClient } from 'socks';
import { SocksClientEstablishedEvent } from 'socks/typings/common/constants';

import { ProxyOptions } from '../types';
import Service from './service';
import { ProxyOptions } from '../../types';

export default class ReverseProxy {
export default class ReverseProxy extends Service {
private readonly errors: Error[] = [];
private readonly server: net.Server;
private readonly connectedClients: Set<net.Socket> = new Set();
private readonly options: ProxyOptions;
private readonly targetHost: string;
private readonly targetPort: number;

get host(): string {
const { address } = this.server.address() as net.AddressInfo;

return address;
}

get port(): number {
const { port } = this.server.address() as net.AddressInfo;

return port;
get error(): Error | null {
return this.errors.length > 0 ? this.errors[0] : null;
}

constructor(proxyOptions: ProxyOptions, targetHost: string, targetPort: number) {
super();
this.options = proxyOptions;
this.targetHost = targetHost;
this.targetPort = targetPort;
if (!this.targetHost) throw new Error('Host is required');
if (!this.targetPort) throw new Error('Port is required');

this.server = net.createServer(this.onConnection.bind(this));
}

start(): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.server.on('error', reject);
this.server.listen(0, '127.0.0.1', resolve);
});
override async closeListener(): Promise<void> {
await super.closeListener();
this.connectedClients.forEach(client => client.destroy());
}

stop(): Promise<void> {
return new Promise((resolve, reject) => {
this.server.close(e => {
if (e) reject(e);
else resolve();
});

this.connectedClients.forEach(client => client.destroy());
});
}

get error(): Error | null {
return this.errors.length > 0 ? this.errors[0] : null;
}

private async onConnection(socket: net.Socket): Promise<void> {
override async connectListener(socket: net.Socket): Promise<void> {
let socks5Proxy: SocksClientEstablishedEvent;
this.connectedClients.add(socket);

Expand All @@ -82,8 +56,9 @@ export default class ReverseProxy {
if (!socket.closed) socket.destroy();
});
socks5Proxy.socket.on('error', socket.destroy);

socks5Proxy.socket.pipe(socket).pipe(socks5Proxy.socket);

await super.connectListener(socks5Proxy.socket);
} catch (err) {
socket.destroy(err as Error);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Sequelize } from 'sequelize';
import { Options as SequelizeOptions } from 'sequelize/types/sequelize';

import Service from './service';

export default class SequelizeFactory extends Service {
build(sequelizeCtorOptions: [SequelizeOptions] | [string, SequelizeOptions]) {
const sequelize =
sequelizeCtorOptions.length === 1
? new Sequelize(sequelizeCtorOptions[0])
: new Sequelize(sequelizeCtorOptions[0], sequelizeCtorOptions[1]);

this.overrideCloseMethod(sequelize);

return sequelize;
}

private overrideCloseMethod(sequelize: Sequelize): void {
const closeListener = this.closeListener.bind(this);

// override close method to ensure to execute the closeListener
sequelize.close = async function close() {
try {
await Sequelize.prototype.close.call(this);
} finally {
await closeListener();
}
};
}
}
29 changes: 29 additions & 0 deletions packages/datasource-sql/src/connection/services/service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import net from 'net';

export type ConnectionCallback = (socket: net.Socket) => Promise<void>;
export type CloseCallback = () => Promise<void>;

export default abstract class Service {
private connectionCallback: ConnectionCallback;
private closeCallback: CloseCallback;

/** attach a callback when there is a new connection on the service. */
onConnect(callback: ConnectionCallback): void {
this.connectionCallback = callback;
}

/** callback to execute when there is a new connection. */
async connectListener(socket: net.Socket): Promise<void> {
await this.connectionCallback?.(socket);
}

/** attach a callback when a service is closing. */
onClose(callback: CloseCallback): void {
this.closeCallback = callback;
}

/** callback to execute when the service is closing. */
async closeListener(): Promise<void> {
await this.closeCallback?.();
}
}
47 changes: 47 additions & 0 deletions packages/datasource-sql/src/connection/services/tcp-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import net from 'net';

import Service from './service';

/** TcpServer is used as proxy to redirect all the database requests */
export default class TcpServer extends Service {
private readonly server: net.Server;

get host(): string {
return (this.server.address() as net.AddressInfo).address;
}

get port(): number {
return (this.server.address() as net.AddressInfo).port;
}

constructor() {
super();
this.server = net.createServer(this.connectListener.bind(this));
}

start(): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.server.on('error', reject);
// By using port 0, the operating system
// will assign an available port for the server to listen on.
this.server.listen(0, '127.0.0.1', resolve);
});
}

async stop(): Promise<void> {
try {
await super.closeListener();
} finally {
await new Promise<void>((resolve, reject) => {
this.server.close(e => {
if (e) reject(e);
else resolve();
});
});
}
}

override async closeListener(): Promise<void> {
await this.stop();
}
}
111 changes: 1 addition & 110 deletions packages/datasource-sql/test/connection/reverse-proxy.unit.test.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
import * as net from 'net';
import { SocksClient } from 'socks';
import { SocksClientEstablishedEvent } from 'socks/typings/common/constants';

import ReverseProxy from '../../src/connection/reverse-proxy';
import ReverseProxy from '../../src/connection/services/reverse-proxy';

beforeEach(() => jest.clearAllMocks());

const proxyOptions = { host: 'localhost', port: 1088 };

describe('ReverseProxy', () => {
let proxy: ReverseProxy;

afterEach(async () => {
try {
await proxy?.stop();
} catch (e) {
/* empty */
}
});

describe('when port is not provided', () => {
it('should throw an error', async () => {
const fn = () => new ReverseProxy(proxyOptions, 'localhost', null);
Expand All @@ -34,99 +20,4 @@ describe('ReverseProxy', () => {
expect(fn).toThrow();
});
});

describe('getError', () => {
describe('when a connection fails because of the proxy', () => {
it('should retrieve the error', async () => {
jest.spyOn(SocksClient, 'createConnection').mockImplementation(() => {
throw new Error('a proxy error');
});

proxy = new ReverseProxy(proxyOptions, 'localhost', 10);
await proxy.start();

const client = new net.Socket();
await new Promise<void>(resolve => {
client.on('close', () => {
client.destroy();
resolve();
});

client.connect({ port: proxy.port, host: proxy.host });
});

expect(proxy.error).toEqual(new Error('a proxy error'));
});
});
});

describe('when client open a connection', () => {
it('should branch the socks5 to the socket', async () => {
const socks5ProxyMock = { socket: { on: jest.fn(), pipe: jest.fn(), destroy: jest.fn() } };
jest
.spyOn(SocksClient, 'createConnection')
.mockResolvedValue(socks5ProxyMock as unknown as SocksClientEstablishedEvent);

proxy = new ReverseProxy(proxyOptions, 'localhost', 10);

await proxy.start();
const client = new net.Socket();
await new Promise<void>(resolve => {
client.on('close', () => {
client.destroy();
resolve();
});

client.connect({ port: proxy.port, host: proxy.host });
});

expect(socks5ProxyMock.socket.on).toHaveBeenCalledWith('error', expect.any(Function));
expect(socks5ProxyMock.socket.pipe).toHaveBeenCalledWith(expect.any(net.Socket));
});
});

describe('stop', () => {
describe('when the server is not started', () => {
it('should throw an error', async () => {
proxy = new ReverseProxy(proxyOptions, 'localhost', 10);

await expect(proxy.stop()).rejects.toThrow();
});
});

describe('when the server is started', () => {
it('should stop the proxy without error', async () => {
proxy = new ReverseProxy(proxyOptions, 'localhost', 10);

await proxy.start();

await expect(proxy.stop()).resolves.not.toThrow();
});
});

describe('when a connection is opened', () => {
it('should stop the proxy without throwing error', async () => {
const socks5ProxyMock = { socket: { on: jest.fn(), pipe: jest.fn(), destroy: jest.fn() } };
jest
.spyOn(SocksClient, 'createConnection')
.mockResolvedValue(socks5ProxyMock as unknown as SocksClientEstablishedEvent);

proxy = new ReverseProxy(proxyOptions, 'localhost', 10);

await proxy.start();

const client = new net.Socket();
await new Promise<void>(resolve => {
client.on('close', () => {
client.destroy();
resolve();
});

client.connect({ port: proxy.port, host: proxy.host });
});

await expect(proxy.stop()).resolves.not.toThrow();
});
});
});
});

0 comments on commit 7f48cd1

Please sign in to comment.