Skip to content

Commit

Permalink
fix(cluster): subscriber connection leaks
Browse files Browse the repository at this point in the history
Closes #1325
  • Loading branch information
luin committed Apr 8, 2021
1 parent 8f9a72e commit dce5f45
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 1 deletion.
9 changes: 8 additions & 1 deletion lib/cluster/ClusterSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ export default class ClusterSubscriber {
lastActiveSubscriber.disconnect();
}

if (this.subscriber) {
this.subscriber.disconnect();
}

const sampleNode = sample(this.connectionPool.getNodes());
if (!sampleNode) {
debug(
Expand Down Expand Up @@ -113,7 +117,10 @@ export default class ClusterSubscriber {
this.lastActiveSubscriber = this.subscriber;
}
})
.catch(noop);
.catch(() => {
// TODO: should probably disconnect the subscriber and try again.
debug("failed to %s %d channels", type, channels.length);
});
}
}
} else {
Expand Down
36 changes: 36 additions & 0 deletions test/functional/cluster/ClusterSubscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import ConnectionPool from "../../../lib/cluster/ConnectionPool";
import ClusterSubscriber from "../../../lib/cluster/ClusterSubscriber";
import { EventEmitter } from "events";
import MockServer from "../../helpers/mock_server";
import { expect } from "chai";

describe("ClusterSubscriber", () => {
it("cleans up subscribers when selecting a new one", async () => {
const pool = new ConnectionPool({});
const subscriber = new ClusterSubscriber(pool, new EventEmitter());

let rejectSubscribes = false;
const server = new MockServer(30000, (argv) => {
if (rejectSubscribes && argv[0] === "subscribe") {
return new Error("Failed to subscribe");
}
return "OK";
});

pool.findOrCreate({ host: "127.0.0.1", port: 30000 });

subscriber.start();
await subscriber.getInstance().subscribe("foo");
rejectSubscribes = true;

subscriber.start();
await subscriber.getInstance().echo("hello");

subscriber.start();
await subscriber.getInstance().echo("hello");

expect(server.getAllClients()).to.have.lengthOf(1);
subscriber.stop();
pool.reset([]);
});
});
4 changes: 4 additions & 0 deletions test/helpers/mock_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,8 @@ export default class MockServer extends EventEmitter {
return getConnectionName(client) === name;
});
}

getAllClients(): Socket[] {
return this.clients.filter(Boolean);
}
}

0 comments on commit dce5f45

Please sign in to comment.