Skip to content

Commit

Permalink
fix(ds-sql): close the proxy when sequelize instance closes (#694)
Browse files Browse the repository at this point in the history
  • Loading branch information
Scra3 authored May 16, 2023
1 parent 65dc023 commit d31276c
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 27 deletions.
14 changes: 9 additions & 5 deletions packages/datasource-sql/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export default async function connect(
logger?: Logger,
): Promise<Sequelize> {
let proxy: ReverseProxy | undefined;
let sequelize: Sequelize | undefined;

try {
let options = await preprocessOptions(uriOrOptions);
Expand All @@ -77,22 +78,25 @@ export default async function connect(
...getSslConfiguration(opts.dialect, sslMode, logger),
};

const sequelize = uri
sequelize = uri
? new Sequelize(uri, { ...opts, schema, logging })
: new Sequelize({ ...opts, schema, logging });

// we want to stop the proxy when the sequelize connection is closed
sequelize.close = async function close() {
await Sequelize.prototype.close.call(this);
await proxy?.stop();
try {
await Sequelize.prototype.close.call(this);
} finally {
await proxy?.stop();
}
};

await sequelize.authenticate(); // Test connection

return sequelize;
} catch (e) {
await proxy?.stop();
await sequelize?.close();
// if proxy encountered an error, we want to throw it instead of the sequelize error
handleSequelizeError(proxy?.getError() || (e as Error));
handleSequelizeError(proxy?.error || (e as Error));
}
}
44 changes: 38 additions & 6 deletions packages/datasource-sql/src/connection/reverse-proxy.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import net from 'net';
import { SocksClient } from 'socks';
import { SocksClientEstablishedEvent } from 'socks/typings/common/constants';

import { ConnectionOptionsObj } from '../types';

export default class ReverseProxy {
private readonly errors: Error[] = [];
private readonly server: net.Server;
private readonly destination: ConnectionOptionsObj;
private readonly connectedClients: Set<net.Socket> = new Set();

constructor(destination: ConnectionOptionsObj) {
this.destination = destination;
Expand All @@ -24,12 +26,17 @@ export default class ReverseProxy {
}

stop(): Promise<void> {
return new Promise(resolve => {
this.server.close(() => resolve());
return new Promise((resolve, reject) => {
this.server.close(e => {
if (e) reject(e);
else resolve();
});

this.connectedClients.forEach(client => client.destroy());
});
}

public get connectionOptions(): ConnectionOptionsObj {
get connectionOptions(): ConnectionOptionsObj {
const { address, port } = this.server.address() as net.AddressInfo;
const connection = { ...this.destination };

Expand All @@ -46,7 +53,7 @@ export default class ReverseProxy {
return connection;
}

public getError(): Error | null {
get error(): Error | null {
return this.errors.length > 0 ? this.errors[0] : null;
}

Expand All @@ -61,13 +68,38 @@ export default class ReverseProxy {
}

private async onConnection(socket: net.Socket): Promise<void> {
socket.on('error', error => this.errors.push(error));
let socks5Proxy: SocksClientEstablishedEvent;
this.connectedClients.add(socket);

socket.on('error', error => {
this.errors.push(error);

if (!socket.closed) {
socket.destroy(error);
}
});
socket.on('close', () => {
this.connectedClients.delete(socket);

if (!socks5Proxy?.socket.closed) {
socks5Proxy?.socket.destroy();
}
});

try {
const socks5Proxy = await SocksClient.createConnection({
socks5Proxy = await SocksClient.createConnection({
proxy: { ...this.destination.proxySocks, type: 5 },
command: 'connect',
destination: { host: this.destinationHost, port: this.destinationPort },
timeout: 4000,
});

socks5Proxy.socket.on('close', () => {
this.connectedClients.delete(socks5Proxy.socket);

if (!socket.closed) {
socket.destroy();
}
});

socks5Proxy.socket.on('error', socket.destroy);
Expand Down
28 changes: 19 additions & 9 deletions packages/datasource-sql/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,33 @@ export async function introspect(
uriOrOptions: ConnectionOptions,
logger?: Logger,
): Promise<Table[]> {
const sequelize = await connect(uriOrOptions, logger);
const tables = await Introspector.introspect(sequelize, logger);
await sequelize.close();
let sequelize: Sequelize;

return tables;
try {
sequelize = await connect(uriOrOptions, logger);

return await Introspector.introspect(sequelize, logger);
} finally {
await sequelize?.close();
}
}

export async function buildSequelizeInstance(
uriOrOptions: ConnectionOptions,
logger: Logger,
introspection?: Table[],
): Promise<Sequelize> {
const sequelize = await connect(uriOrOptions, logger);
const tables = introspection ?? (await Introspector.introspect(sequelize, logger));

ModelBuilder.defineModels(sequelize, logger, tables);
RelationBuilder.defineRelations(sequelize, logger, tables);
let sequelize: Sequelize;

try {
sequelize = await connect(uriOrOptions, logger);
const tables = introspection ?? (await Introspector.introspect(sequelize, logger));
ModelBuilder.defineModels(sequelize, logger, tables);
RelationBuilder.defineRelations(sequelize, logger, tables);
} catch (error) {
await sequelize?.close();
throw error;
}

return sequelize;
}
Expand Down
22 changes: 22 additions & 0 deletions packages/datasource-sql/test/connection/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,28 @@ describe('Connect', () => {
});
});

describe('when proxy socks configuration is provided', () => {
describe('when the password the uri is wrong', () => {
it('should not blocked the promise', async () => {
const baseUri = 'postgres://test:password@localhost:5443';
await setupDatabaseWithTypes(baseUri, 'postgres', 'test_connection');

const badUri = `postgres://BADUSER:password@postgres:5432/test_connection`;
await expect(() =>
connect({
uri: badUri,
proxySocks: {
host: 'localhost',
port: 1080,
password: 'password',
userId: 'username',
},
}),
).rejects.toThrow('password authentication failed for user "BADUSER"');
});
});
});

describe.each([
['postgres' as Dialect, 'test', 'password', 'localhost', 5443, 5432, 'postgres'],
['mysql' as Dialect, 'root', 'password', 'localhost', 3307, 3306, 'mysql'],
Expand Down
58 changes: 54 additions & 4 deletions packages/datasource-sql/test/connection/reverse-proxy.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import ReverseProxy from '../../src/connection/reverse-proxy';

beforeEach(() => jest.clearAllMocks());

describe('proxy utils', () => {
describe('ReverseProxy', () => {
const makeProxyOptions = () => ({
host: 'localhost',
port: 10,
Expand All @@ -16,7 +16,13 @@ describe('proxy utils', () => {
},
});
let proxy: ReverseProxy;
afterEach(() => proxy?.stop());
afterEach(async () => {
try {
await proxy?.stop();
} catch (e) {
/* empty */
}
});

describe('when port is not provided', () => {
it('should throw an error', async () => {
Expand Down Expand Up @@ -52,14 +58,14 @@ describe('proxy utils', () => {
client.connect({ port: Number(port), host });
});

expect(proxy.getError()).toEqual(new Error('a proxy error'));
expect(proxy.error).toEqual(new Error('a proxy error'));
});
});
});

describe('when client open a connection', () => {
it('should branch the socks5 to the socket', async () => {
const socks5ProxyMock = { socket: { on: jest.fn(), pipe: jest.fn() } };
const socks5ProxyMock = { socket: { on: jest.fn(), pipe: jest.fn(), destroy: jest.fn() } };
jest.spyOn(SocksClient, 'createConnection').mockResolvedValue(socks5ProxyMock as any);
proxy = new ReverseProxy(makeProxyOptions());

Expand Down Expand Up @@ -125,4 +131,48 @@ describe('proxy utils', () => {
});
});
});

describe('stop', () => {
describe('when the server is not started', () => {
it('should throw an error', async () => {
proxy = new ReverseProxy(makeProxyOptions());

await expect(proxy.stop()).rejects.toThrow();
});
});

describe('when the server is started', () => {
it('should stop the proxy without error', async () => {
proxy = new ReverseProxy(makeProxyOptions());

await proxy.start();

await expect(proxy.stop()).resolves.not.toThrow();
});
});

describe('when a connection is opened', () => {
it('should stop the proxy without throwing error', async () => {
const socks5ProxyMock = { socket: { on: jest.fn(), pipe: jest.fn(), destroy: jest.fn() } };
jest.spyOn(SocksClient, 'createConnection').mockResolvedValue(socks5ProxyMock as any);

proxy = new ReverseProxy(makeProxyOptions());

await proxy.start();

const client = new net.Socket();
await new Promise<void>(resolve => {
client.on('close', () => {
client.destroy();
resolve();
});

const { port, host } = proxy.connectionOptions;
client.connect({ port: Number(port), host });
});

await expect(proxy.stop()).resolves.not.toThrow();
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ describe('SqlDataSourceFactory > Integration', () => {

const sequelize = await buildSequelizeInstance(`${baseUri}/${database}`, logger);

sequelize.close();
await sequelize.close();
expect(sequelize).toBeInstanceOf(Sequelize);
});

Expand All @@ -52,7 +52,7 @@ describe('SqlDataSourceFactory > Integration', () => {

const sequelize = await buildSequelizeInstance({ uri: `${baseUri}/${database}` }, logger);

sequelize.close();
await sequelize.close();
expect(sequelize).toBeInstanceOf(Sequelize);
});

Expand All @@ -65,7 +65,7 @@ describe('SqlDataSourceFactory > Integration', () => {
logger,
);

sequelize.close();
await sequelize.close();
expect(sequelize).toBeInstanceOf(Sequelize);
});
});
Expand Down

0 comments on commit d31276c

Please sign in to comment.