Possible memory leak in a very simple example #356
Thanks for reporting. Do you have a standalone code sample?
Thanks for the reply. Sorry, here is the same snippet without external requirements. The behavior is the same for v5 and v6, no matter if zeromq or zeromq-ng.

'use strict';
process.title = 'event_forwarder';
const zmq = require('zeromq');
const run = async (pull_host, pull_port, pub_host, pub_port) => {
const pull_sock = new zmq.Pull;
const pub_sock = new zmq.Publisher;
const pull_sock_address = `tcp://${pull_host}:${pull_port}`;
const pub_sock_address = `tcp://${pub_host}:${pub_port}`;
await pull_sock.bind(pull_sock_address);
await pub_sock.bind(pub_sock_address);
while (!pull_sock.closed) {
let [topic, event] = await pull_sock.receive();
await pub_sock.send([topic, event]);
}
};
run('127.0.0.1', 3000, '127.0.0.1', 3001)
.catch(console.error);
I feel like I'm missing one half of the example (the code shown only waits for incoming messages; nothing sends them), but I think I have a reproduction of what you're describing and will get back to you after I have investigated further.
Hi again, this is a sample we are using to send events and monitor event reception at the same time:

'use strict';
process.title = 'loopback_test';
const zmq = require('zeromq');
const push = (push_host, push_port, connection_callback) => {
const push_sock = new zmq.Push;
push_sock.connect(`tcp://${push_host}:${push_port}`);
push_sock.events.on('handshake', ({address}) => {
console.info('Push connected to', address);
connection_callback(push_sock);
});
};
const subscriber = (sub_host, sub_port, topic, connection_callback) => {
const sub_sock = new zmq.Subscriber;
sub_sock.connect(`tcp://${sub_host}:${sub_port}`);
sub_sock.subscribe(topic);
sub_sock.events.on('handshake', ({address}) => {
console.info('Subscriber connected to', address);
connection_callback(sub_sock);
});
};
const SECONDS_TO_MILLIS = 1000;
const TOPIC = 'loopback';
const STATE_NONE = 'none';
const STATE_CONNECTING_SUBSCRIBER = 'connecting_subscriber';
const STATE_CONNECTING_PUSH = 'connecting_push';
const STATE_SENDING_EVENTS = 'sending_events';
const STATE_GRACE_PERIOD = 'grace_period';
const STATE_AFTER_GRACE_PERIOD = 'after_grace_period';
const run_time_seconds = 500;
const grace_period_seconds = 2;
const events_frequency = 2000; // events per second
const events_period = SECONDS_TO_MILLIS / events_frequency;
let state = STATE_NONE;
let push_sock;
let subscriber_sock;
let sent = 0;
let received = 0;
let send_rejected = 0;
let reject_receive;
let timer;
const create_event = () => ({
key: Math.floor(Math.random() * 100000),
timestamp: Date.now(),
});
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));
const log_info = () => console.info(`State: ${state} sent: ${sent} received: ${received} send_rejected: ${send_rejected}`);
const run = async function (push_host, push_port, sub_host, sub_port) {
state = STATE_CONNECTING_SUBSCRIBER;
const log_interval = setInterval(log_info, 1000);
const on_push_connected = (_push_sock) => {
push_sock = _push_sock;
const send_event = () => {
push_sock.send([TOPIC, JSON.stringify(create_event())])
.then(() => sent++)
.catch(() => send_rejected++)
};
// Schedule send events
timer = setInterval(send_event, events_period);
state = STATE_SENDING_EVENTS;
// Schedule stop sending events and start grace period
setTimeout(async () => {
clearInterval(timer);
state = STATE_GRACE_PERIOD;
setTimeout(() => {
state = STATE_AFTER_GRACE_PERIOD;
console.log('reject to exit loop');
reject_receive();
}, grace_period_seconds * SECONDS_TO_MILLIS)
}, run_time_seconds * SECONDS_TO_MILLIS);
};
const on_subscriber_connected = (_subscriber_sock) => {
subscriber_sock = _subscriber_sock;
state = STATE_CONNECTING_PUSH;
push(push_host, push_port, on_push_connected)
};
subscriber(sub_host, sub_port, TOPIC, on_subscriber_connected);
while ([
STATE_NONE,
STATE_CONNECTING_SUBSCRIBER,
STATE_CONNECTING_PUSH
].includes(state)) {
await delay(500);
}
const receive = () => new Promise((resolve, reject) => {
reject_receive = reject;
subscriber_sock.receive().then(resolve);
received++;
});
while (state == STATE_SENDING_EVENTS) {
await receive().catch(() => {});
}
while (state == STATE_GRACE_PERIOD) {
await receive().catch(() => {});
}
clearInterval(log_interval);
log_info();
};
run('localhost', 3000, 'localhost', 3001);
I am running both programs side by side and I receive output like the following:
Is that the correct behaviour? The memory usage of both programs stabilizes around 75-125 MB, but sometimes spikes to 500 MB or beyond before dropping back. Is that what you're observing, too? Your spikes might be higher or lower; that probably depends on the system you're running this on. I think I understand what's going on and will explain in a new comment.
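For reference, this kind of memory observation can be scripted with plain Node.js. A minimal sketch (the `samples`/`sample` names are illustrative, not from the programs above):

```javascript
// Minimal sketch of periodic memory sampling with plain Node.js.
const samples = [];

const sample = () => {
  const { rss, heapUsed } = process.memoryUsage();
  const entry = { t: Date.now(), rss, heapUsed };
  samples.push(entry);
  return entry;
};

// Sampling every 500 ms makes spikes like the ones described visible:
// setInterval(() => console.info(sample()), 500);
```

Logging `rss` alongside `heapUsed` matters here, because externally allocated message buffers do not show up in the V8 heap.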
I will attempt to explain why this might be happening with the following two example programs that are intended to run side by side.
const zmq = require("zeromq")
const run = async () => {
const push_sock = new zmq.Push
push_sock.connect("tcp://127.0.0.1:3000")
while (true) {
await push_sock.send(["topic", Buffer.alloc(2000, "x")])
}
}
run()
const zmq = require("zeromq")
const run = async () => {
const pull_sock = new zmq.Pull
const pub_sock = new zmq.Publisher
await pull_sock.bind("tcp://127.0.0.1:3000")
await pub_sock.bind("tcp://127.0.0.1:3001")
while (!pull_sock.closed) {
let [topic, event] = await pull_sock.receive()
await pub_sock.send([topic, event])
}
}
run()

The first program pushes 2 KB messages to the second as fast as possible. The second program pulls each message and republishes it.

When incoming messages are bigger than a certain amount of bytes, the buffer will not be copied, but instead be shared between the ZeroMQ I/O thread and the Node.js buffer. This is to avoid needlessly copying large amounts of memory. To release memory from the underlying socket, Node.js needs to notify ZeroMQ.js that the buffer can be released. To do this, ZeroMQ.js attaches a release callback to the buffer that is invoked when the buffer is garbage collected.

A long running process should not be in a busy loop, because it will not be able to handle any Node.js events at all. So it's an undesirable state to be in. To observe what happens if we forcefully break out of the busy loop every 10K messages, change the receive loop in the second program as follows:

let i = 0
while (!pull_sock.closed) {
  let [topic, event] = await pull_sock.receive()
  await pub_sock.send([topic, event])
  if (i++ % 10000 === 0) await new Promise(setImmediate)
}

This probably happens a lot in example programs, benchmarks and demo code, but might not happen as much in reality. If you do see this in production I would love to hear more about your particular use case, because it may help me consider how we can solve this in a better way. I will give it some more thought to see if it is something that should be and can be dealt with in this library.

TL;DR: Memory "leaks" during a Node.js busy loop are a current limitation of Node.js/N-API but may not be a problem in production.
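The starvation effect itself can be demonstrated without ZeroMQ at all. In this standalone sketch (plain Node.js, illustrative only), a loop that only awaits already-resolved promises never leaves the microtask queue, so scheduled immediates never get a chance to run; an occasional await of setImmediate lets the event loop turn:

```javascript
// Demonstrates event loop starvation by a promise-only busy loop.
// Awaiting a resolved promise only queues a microtask; microtasks are
// drained before the event loop advances to the check (setImmediate) phase.
const demo = async (yieldToEventLoop) => {
  let immediateRan = false;
  setImmediate(() => { immediateRan = true; });
  for (let i = 0; i < 50000; i++) {
    await Promise.resolve(); // microtask only: the event loop never turns
    if (yieldToEventLoop && i % 10000 === 0) {
      await new Promise(setImmediate); // macrotask: lets the event loop turn
    }
  }
  return immediateRan; // false when starved, true when yielding
};
```

The same mechanism explains the buffer behaviour above: release callbacks run via the garbage collector, which also needs the event loop to turn.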
Hi, and thank you again; I didn't expect such a fast reply. We also attempted to add CPU load to the process and some delay in each loop to give the garbage collector a chance. It frees memory periodically but seems to leave some data behind. After a few days working on this, we suspected the N-API issue, and now you're confirming it.
If you're seeing memory increasing in production then it might be a different issue. Please share more details!
It's not an error in N-API, just unfortunate behaviour when using ZeroMQ.js in a busy loop. A busy loop in production is both unlikely and a bigger problem, because it will starve all I/O. N-API is the stable way to build binaries that work across Node.js versions. It sounds like your observations in production are different from what I described above. Could you share more details?
I have thought about this some more and I have come to the conclusion that it is the responsibility of ZeroMQ.js to avoid event loop starvation. The reason I feel this way is that Node.js does I/O on the same thread, whereas ZMQ has background I/O threads. So it could happen that ZMQ will (temporarily) starve the Node.js event loop if ZMQ can process messages faster than Node.js can. Even if memory weren't an issue, it can cause all sorts of other problems, such as reduced I/O responsiveness apart from ZMQ. I'll work on a fix for this particular issue. @melvin-gutierrez-sbz This does not necessarily mean that the issue you experience in production is identical to the one I described above, so please add more details about your observations in your production environments if you can!
Hi @rolftimmermans, and thank you for your reply. We added if (i++ % 10000 === 0) await new Promise(setImmediate) and the result is the same. The first snippet we sent you (event_forwarder) is production code; we just need to relay messages between two types of sockets (receive from PULL and send with PUB to broadcast to multiple agents). Can we help you in any way? Please tell us. Thank you in advance.
@melvin-gutierrez-sbz Thanks for the response. It might be that what you are seeing in production is a different thing. It would help to have a program like the loopback test that mimics your production usage so we can attempt to reproduce. A performance test typically has very different behaviour than actual production load. Additionally it would help to see the memory usage you observe over time.
Hi again, @rolftimmermans. We have seen the same behavior in production, but with a slower memory increase. That is why we created the loopback test: to avoid waiting so long to see the app crash. Just to clarify, what exactly do you mean by differences in production? We are doing the same in dev and prod environments. Our production code is just as simple as the snippet we sent; it is just an event forwarder. We also tried subscribers with IO-bound operations and the memory issue is still there. We will prepare a sample for you to test.
I realise the production code is an event forwarder, but I'm looking for code to generate production-like load. Not as quickly as possible, but as realistic as possible.
Sorry, I misunderstood you. No matter how events are generated, the memory leak is always there. We have tried generating as few as 50 events/second and, with enough time, it crashes. The same happens regardless of event size: of course, our production events are way bigger than the ones in the test we sent you, but even with very small ones, sooner or later we cannot allocate more memory. We have spent several days doing all kinds of tests to generate the event flow (while loop, nanotimer, the current loopback test, etc.), and we have used different rates of events (from 50 events/second to 10000 events/second).
Right. So can you please send the code to reproduce what you describe? And also your observation of the memory usage (including which platform you observed it on)? At the moment I have no way of reproducing your problem, so I can't investigate further.
Hi again. Plotting the output of the following snippet:

'use strict';
process.title = 'sample_subscriber';
const zmq = require('zeromq-ng');
const topic = 'my_topic';
const data = {
timestamp: Date.now(),
foo: 'bar'
};
setInterval(() => {
const { heapUsed } = process.memoryUsage();
console.info(heapUsed);
}, 500);
const subscriber = async () => {
const sock = new zmq.Subscriber;
await sock.connect(`tcp://127.0.0.1:3001`);
await sock.subscribe('');
sock.events.on('handshake', console.info);
while (!sock.closed) {
let [topic, event] = await sock.receive();
await new Promise(resolve => setTimeout(resolve, 10));
}
};
const publisher = async () => {
const sock = new zmq.Publisher();
await sock.bind(`tcp://127.0.0.1:3001`);
setInterval(() => sock.send([topic, JSON.stringify(data)]));
};
publisher().then(subscriber);

We get the following chart:
Thanks for your response, makes it very clear to me what you're observing! I'll investigate further and report back when I find something.
Alright, here's what I've found. I'm not entirely sure how to interpret all of it, but here are the results.

The memory seems to be stable and does not show any increasing trend. This is around 15 minutes of data. Since this code sends about 40K messages per second, even a single-byte leak per message would have to lead to quite a visible trend after a few minutes. Tested on macOS with Node.js 12.12.

My conclusion for now is that there is no memory leak manifesting in this usage pattern of ZeroMQ, as long as the event loop can run normally (which it does automatically with average usage patterns, or forcibly in the code above with the setImmediate workaround).
Isn't @melvin-gutierrez-sbz's example leaking buffer space because they are sending messages at 1 ms intervals (the setInterval default delay) and consuming with a 10 ms (setTimeout) throttle?
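As a back-of-the-envelope check of that rate mismatch (illustrative numbers; Node.js clamps an omitted setInterval delay to 1 ms):

```javascript
// Publisher: setInterval with no delay fires roughly every 1 ms.
// Subscriber: a 10 ms setTimeout pause caps consumption at ~100 msg/s.
const producedPerSec = 1000 / 1;   // ~1000 messages/second sent
const consumedPerSec = 1000 / 10;  // ~100 messages/second received
const backlogGrowthPerSec = producedPerSec - consumedPerSec;
// The subscriber falls behind by roughly 900 messages every second,
// so unread messages pile up somewhere between the two sockets.
```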
Experimenting some more, it seems memory pressure is not high enough for the GC to kick in; that's why the memory is not reclaimed. If I add an explicit garbage collection:
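For context, an explicit collection can only be triggered from JavaScript when Node.js is started with the --expose-gc flag. A small defensive sketch (the maybeGc helper name is hypothetical, not from the thread):

```javascript
// Triggers a full garbage collection when available.
// global.gc only exists when node is run with --expose-gc.
const maybeGc = () => {
  if (typeof global.gc === 'function') {
    global.gc();
    return true;  // a collection was forced
  }
  return false;   // running without --expose-gc; nothing to do
};
```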
It shouldn't matter, because at some point ZMQ will just drop the messages. You can set high water marks from the application, but it does have internal default settings. It should not be possible to leak memory with a fixed number of subscribers with ZMQ. |
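The drop behaviour can be pictured with a toy bounded queue (a model only, not the zeromq API; real high water marks are configured via socket options):

```javascript
// Toy model of a PUB-side high water mark: once the per-subscriber queue
// is full, new messages are dropped instead of buffered without bound,
// which is why memory cannot grow indefinitely on the sending side.
class BoundedQueue {
  constructor(highWaterMark) {
    this.hwm = highWaterMark;
    this.items = [];
    this.dropped = 0;
  }

  push(msg) {
    if (this.items.length >= this.hwm) {
      this.dropped++;   // at the HWM, PUB sockets drop new messages
      return false;
    }
    this.items.push(msg);
    return true;
  }
}
```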
I have pushed beta 4, which includes two fixes that might improve memory usage; see the changelog here. The issues that were fixed would manifest as:
Please test the new release if you can. I dug into all areas of the code and did a number of tests, but haven't been able to find any other leaks. Doesn't mean that they can't be there, of course, and I'll seriously investigate any reports. Of course it helps to have example code and/or the results of memory measurement that demonstrate the issue. |
Thank you again, |
Hi again, this last beta release has solved the problem. Thank you very much. |
Perfect, thanks for the update! I'll close this now. Feel free to open a new issue if you encounter any problems. |
Here is a little code snippet that just receives from a pull socket and forwards to a publisher socket.
After some time the garbage collector seems unable to free enough memory, and finally the process crashes.
The same behavior implemented in C++ keeps memory stable.