Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(datasource-sql): refactor the proxy and sequelize management #728

Merged
merged 6 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 12 additions & 19 deletions packages/datasource-sql/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,31 @@ import { Sequelize } from 'sequelize';

import ConnectionOptions from './connection-options';
import handleErrors from './handle-errors';
import ReverseProxy from './reverse-proxy';
import ReverseProxy from './services/reverse-proxy';
import SequelizeFactory from './services/sequelize-factory';
import TcpServer from './services/tcp-server';

/** Attempt to connect to the database */
export default async function connect(options: ConnectionOptions): Promise<Sequelize> {
let proxy: ReverseProxy;
let tcpServer: TcpServer;
let sequelize: Sequelize;

try {
if (options.proxyOptions) {
// start the proxy
tcpServer = new TcpServer();
await tcpServer.start();
proxy = new ReverseProxy(options.proxyOptions, options.host, options.port);
await proxy.start();

tcpServer.onConnect(proxy.connectListener.bind(proxy));
tcpServer.onClose(proxy.closeListener.bind(proxy));
// swap database host and port with the ones from the proxy.
options.changeHostAndPort(proxy.host, proxy.port);
options.changeHostAndPort(tcpServer.host, tcpServer.port);
}

const sequelizeCtorOptions = await options.buildSequelizeCtorOptions();
sequelize =
sequelizeCtorOptions.length === 1
? new Sequelize(sequelizeCtorOptions[0])
: new Sequelize(sequelizeCtorOptions[0], sequelizeCtorOptions[1]);

// we want to stop the proxy when the sequelize connection is closed
sequelize.close = async function close() {
try {
await Sequelize.prototype.close.call(this);
} finally {
await proxy?.stop();
}
};
const sequelizeFactory = new SequelizeFactory();
if (tcpServer) sequelizeFactory.onClose(tcpServer.closeListener.bind(tcpServer));

sequelize = sequelizeFactory.build(await options.buildSequelizeCtorOptions());
await sequelize.authenticate(); // Test connection

return sequelize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,61 +2,35 @@ import net from 'net';
import { SocksClient } from 'socks';
import { SocksClientEstablishedEvent } from 'socks/typings/common/constants';

import { ProxyOptions } from '../types';
import Service from './service';
import { ProxyOptions } from '../../types';

export default class ReverseProxy {
export default class ReverseProxy extends Service {
private readonly errors: Error[] = [];
private readonly server: net.Server;
private readonly connectedClients: Set<net.Socket> = new Set();
private readonly options: ProxyOptions;
private readonly targetHost: string;
private readonly targetPort: number;

get host(): string {
const { address } = this.server.address() as net.AddressInfo;

return address;
}

get port(): number {
const { port } = this.server.address() as net.AddressInfo;

return port;
get error(): Error | null {
return this.errors.length > 0 ? this.errors[0] : null;
}

constructor(proxyOptions: ProxyOptions, targetHost: string, targetPort: number) {
super();
this.options = proxyOptions;
this.targetHost = targetHost;
this.targetPort = targetPort;
if (!this.targetHost) throw new Error('Host is required');
if (!this.targetPort) throw new Error('Port is required');

this.server = net.createServer(this.onConnection.bind(this));
}

start(): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.server.on('error', reject);
this.server.listen(0, '127.0.0.1', resolve);
});
override async closeListener(): Promise<void> {
await super.closeListener();
this.connectedClients.forEach(client => client.destroy());
}

stop(): Promise<void> {
return new Promise((resolve, reject) => {
this.server.close(e => {
if (e) reject(e);
else resolve();
});

this.connectedClients.forEach(client => client.destroy());
});
}

get error(): Error | null {
return this.errors.length > 0 ? this.errors[0] : null;
}

private async onConnection(socket: net.Socket): Promise<void> {
override async connectListener(socket: net.Socket): Promise<void> {
let socks5Proxy: SocksClientEstablishedEvent;
this.connectedClients.add(socket);

Expand All @@ -82,8 +56,9 @@ export default class ReverseProxy {
if (!socket.closed) socket.destroy();
});
socks5Proxy.socket.on('error', socket.destroy);

socks5Proxy.socket.pipe(socket).pipe(socks5Proxy.socket);

await super.connectListener(socks5Proxy.socket);
} catch (err) {
socket.destroy(err as Error);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Sequelize } from 'sequelize';
import { Options as SequelizeOptions } from 'sequelize/types/sequelize';

import Service from './service';

export default class SequelizeFactory extends Service {
build(sequelizeCtorOptions: [SequelizeOptions] | [string, SequelizeOptions]) {
const sequelize =
sequelizeCtorOptions.length === 1
? new Sequelize(sequelizeCtorOptions[0])
: new Sequelize(sequelizeCtorOptions[0], sequelizeCtorOptions[1]);

this.overrideCloseMethod(sequelize);

return sequelize;
}

private overrideCloseMethod(sequelize: Sequelize): void {
const closeListener = this.closeListener.bind(this);

// override close method to ensure to execute the closeListener
sequelize.close = async function close() {
try {
await Sequelize.prototype.close.call(this);
} finally {
await closeListener();
}
};
}
}
29 changes: 29 additions & 0 deletions packages/datasource-sql/src/connection/services/service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import net from 'net';

export type ConnectionCallback = (socket: net.Socket) => Promise<void>;
export type CloseCallback = () => Promise<void>;

export default abstract class Service {
private connectionCallback: ConnectionCallback;
private closeCallback: CloseCallback;

/** attach a callback when there is a new connection on the service. */
onConnect(callback: ConnectionCallback): void {
this.connectionCallback = callback;
}

/** callback to execute when there is a new connection. */
async connectListener(socket: net.Socket): Promise<void> {
await this.connectionCallback?.(socket);
}

/** attach a callback when a service is closing. */
onClose(callback: CloseCallback): void {
this.closeCallback = callback;
}

/** callback to execute when the service is closing. */
async closeListener(): Promise<void> {
await this.closeCallback?.();
}
}
47 changes: 47 additions & 0 deletions packages/datasource-sql/src/connection/services/tcp-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import net from 'net';

import Service from './service';

/** TcpServer is used as proxy to redirect all the database requests */
export default class TcpServer extends Service {
private readonly server: net.Server;

get host(): string {
return (this.server.address() as net.AddressInfo).address;
}

get port(): number {
return (this.server.address() as net.AddressInfo).port;
}

constructor() {
super();
this.server = net.createServer(this.connectListener.bind(this));
}

start(): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.server.on('error', reject);
// By using port 0, the operating system
// will assign an available port for the server to listen on.
this.server.listen(0, '127.0.0.1', resolve);
});
}

async stop(): Promise<void> {
try {
await super.closeListener();
} finally {
await new Promise<void>((resolve, reject) => {
this.server.close(e => {
if (e) reject(e);
else resolve();
});
});
}
}

override async closeListener(): Promise<void> {
await this.stop();
}
}
111 changes: 1 addition & 110 deletions packages/datasource-sql/test/connection/reverse-proxy.unit.test.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
import * as net from 'net';
import { SocksClient } from 'socks';
import { SocksClientEstablishedEvent } from 'socks/typings/common/constants';

import ReverseProxy from '../../src/connection/reverse-proxy';
import ReverseProxy from '../../src/connection/services/reverse-proxy';

beforeEach(() => jest.clearAllMocks());

const proxyOptions = { host: 'localhost', port: 1088 };

describe('ReverseProxy', () => {
let proxy: ReverseProxy;

afterEach(async () => {
try {
await proxy?.stop();
} catch (e) {
/* empty */
}
});

describe('when port is not provided', () => {
it('should throw an error', async () => {
const fn = () => new ReverseProxy(proxyOptions, 'localhost', null);
Expand All @@ -34,99 +20,4 @@ describe('ReverseProxy', () => {
expect(fn).toThrow();
});
});

describe('getError', () => {
describe('when a connection fails because of the proxy', () => {
it('should retrieve the error', async () => {
jest.spyOn(SocksClient, 'createConnection').mockImplementation(() => {
throw new Error('a proxy error');
});

proxy = new ReverseProxy(proxyOptions, 'localhost', 10);
await proxy.start();

const client = new net.Socket();
await new Promise<void>(resolve => {
client.on('close', () => {
client.destroy();
resolve();
});

client.connect({ port: proxy.port, host: proxy.host });
});

expect(proxy.error).toEqual(new Error('a proxy error'));
});
});
});

describe('when client open a connection', () => {
it('should branch the socks5 to the socket', async () => {
const socks5ProxyMock = { socket: { on: jest.fn(), pipe: jest.fn(), destroy: jest.fn() } };
jest
.spyOn(SocksClient, 'createConnection')
.mockResolvedValue(socks5ProxyMock as unknown as SocksClientEstablishedEvent);

proxy = new ReverseProxy(proxyOptions, 'localhost', 10);

await proxy.start();
const client = new net.Socket();
await new Promise<void>(resolve => {
client.on('close', () => {
client.destroy();
resolve();
});

client.connect({ port: proxy.port, host: proxy.host });
});

expect(socks5ProxyMock.socket.on).toHaveBeenCalledWith('error', expect.any(Function));
expect(socks5ProxyMock.socket.pipe).toHaveBeenCalledWith(expect.any(net.Socket));
});
});

describe('stop', () => {
describe('when the server is not started', () => {
it('should throw an error', async () => {
proxy = new ReverseProxy(proxyOptions, 'localhost', 10);

await expect(proxy.stop()).rejects.toThrow();
});
});

describe('when the server is started', () => {
it('should stop the proxy without error', async () => {
proxy = new ReverseProxy(proxyOptions, 'localhost', 10);

await proxy.start();

await expect(proxy.stop()).resolves.not.toThrow();
});
});

describe('when a connection is opened', () => {
it('should stop the proxy without throwing error', async () => {
const socks5ProxyMock = { socket: { on: jest.fn(), pipe: jest.fn(), destroy: jest.fn() } };
jest
.spyOn(SocksClient, 'createConnection')
.mockResolvedValue(socks5ProxyMock as unknown as SocksClientEstablishedEvent);

proxy = new ReverseProxy(proxyOptions, 'localhost', 10);

await proxy.start();

const client = new net.Socket();
await new Promise<void>(resolve => {
client.on('close', () => {
client.destroy();
resolve();
});

client.connect({ port: proxy.port, host: proxy.host });
});

await expect(proxy.stop()).resolves.not.toThrow();
});
});
});
});