
Memory leak AsyncRabbitTemplate #2696

Closed
tetrade opened this issue May 6, 2024 · 8 comments

Comments

@tetrade

tetrade commented May 6, 2024

Version 3.1.4.

I recently reported a bug with a memory leak that occurred because the internal pending map was not cleaned up after a message was received.

Now everything works better, but you can see that for some reason some Messages still remain in memory indefinitely.

[attached image: profiler object tree]

@artembilan
Member

@tetrade ,

thank you for the report!

I'd be happy to fix the problem, but I don't understand how to read that object tree.
Yes, I do see RabbitMessageFuture and its TimeoutTask reference, but it is not clear to me what requestMessage is and why that RabbitMessageFuture is a part of it.

Any chance you could describe what you are doing in your code with this AsyncRabbitTemplate?

@tetrade
Author

tetrade commented May 6, 2024

@artembilan
requestMessage is the name of a field in RabbitFuture. I guess that for some reason the TimeoutTask was not removed from memory and is still referenced from the ScheduledFuture created here: org.springframework.amqp.rabbit.AsyncRabbitTemplate#timeoutTask

@tetrade
Author

tetrade commented May 6, 2024

@artembilan

In short, I just use Rabbit to send photos to a third-party service for processing and wait for the responses via Direct Reply-to. When all the Futures are completed (many photos have been analyzed), I consider the project processed and upload everything to the user.

I initially thought the mistake was mine, because I have an internal ConcurrentMap in my code that is keyed by the photo Id and stores the RabbitFuture as the value, so that when the photo is requested again I know which response to wait for.

However, given my implementation, a leak there is unlikely, and the profiler shows that the leak is in scheduled Java tasks.

[attached image: profiler view of scheduled tasks]
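
The usage pattern looks roughly like this (a sketch only; names such as photoId, photo.exchange, and uploadResultsToUser() are placeholders, not my actual code):

	// Sketch (placeholder names): one request per photo over Direct Reply-to, futures tracked
	// in a ConcurrentMap keyed by the photo Id; results are uploaded once every future completes.
	private final ConcurrentMap<String, RabbitConverterFuture<byte[]>> inFlight = new ConcurrentHashMap<>();

	void analyze(String photoId, byte[] photo) {
		RabbitConverterFuture<byte[]> future =
				this.asyncRabbitTemplate.convertSendAndReceive("photo.exchange", "photo.analyze", photo);
		this.inFlight.put(photoId, future);
	}

	void awaitProject() {
		// In 3.x the Rabbit futures extend CompletableFuture, so allOf() can be used directly.
		CompletableFuture.allOf(this.inFlight.values().toArray(new CompletableFuture<?>[0]))
				.thenRun(this::uploadResultsToUser);
	}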

@artembilan
Member

As you can see, the logic there is like this:

	@Nullable
	private ScheduledFuture<?> timeoutTask(RabbitFuture<?> future) {
		if (this.receiveTimeout > 0) {
			this.lock.lock();
			try {
				if (!this.running) {
					this.pending.remove(future.getCorrelationId());
					throw new IllegalStateException("'AsyncRabbitTemplate' must be started.");
				}
				return this.taskScheduler.schedule(
						new TimeoutTask(future, this.pending, this.directReplyToContainer),
						Instant.now().plusMillis(this.receiveTimeout));
			}
			finally {
				this.lock.unlock();
			}
		}
		return null;
	}

So, such a timeout task is scheduled to be executed after this.receiveTimeout (30 seconds by default).
Let's see if the problem is with that ThreadPoolTaskScheduler and its default private volatile int poolSize = 1;.
This way, it feels like "too many messages" would exhaust such a pool, and therefore we would see some kind of memory leak for a period of time until the queue of those TimeoutTask instances is drained.

You can set an external ThreadPoolTaskScheduler with a bigger thread pool to mitigate the workload (see the sketch below).

Again: this is just an assumption, since I don't see how else those TimeoutTask instances could be held in memory if scheduling is done properly.
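
For example, a configuration sketch of that suggestion (the bean names and the pool size of 10 are arbitrary; AsyncRabbitTemplate exposes a setTaskScheduler(TaskScheduler) setter for this):

	// Sketch: supply an external scheduler with more threads instead of relying on the
	// internal single-threaded default; the pool size here is illustrative only.
	@Bean
	public ThreadPoolTaskScheduler rabbitTimeoutScheduler() {
		ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
		scheduler.setPoolSize(10);
		scheduler.setThreadNamePrefix("async-rabbit-timeout-");
		return scheduler;
	}

	@Bean
	public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate,
			ThreadPoolTaskScheduler rabbitTimeoutScheduler) {
		AsyncRabbitTemplate asyncTemplate = new AsyncRabbitTemplate(rabbitTemplate);
		asyncTemplate.setTaskScheduler(rabbitTimeoutScheduler);
		return asyncTemplate;
	}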

@mailingfeng

I also had a memory leak in the same scenario (AsyncRabbitTemplate.sendAndReceive()), but with version 2.4.17.

Maybe the bug also exists in 3.1.x @tetrade

Causes of the memory leak:

  1. When AsyncRabbitTemplate.sendAndReceive() is called and receiveTimeout > 0, it calls startTimer() and creates a timeoutTask with the TaskScheduler.

  2. When sendAndReceive() has completed, the timeoutTask is not cleared immediately; it remains in the TaskScheduler until receiveTimeout elapses.

  3. In high-throughput scenarios over a short period of time, a large number of timeoutTask instances can cause a memory leak!

	// Class: AsyncRabbitTemplate
	@Override
	public RabbitMessageFuture sendAndReceive(String exchange, String routingKey, Message message) {
		String correlationId = getOrSetCorrelationIdAndSetReplyTo(message, null);
		RabbitMessageFuture future = new RabbitMessageFuture(correlationId, message);
		CorrelationData correlationData = null;
		if (this.enableConfirms) {
			correlationData = new CorrelationData(correlationId);
			future.setConfirm(new SettableListenableFuture<>());
		}
		this.pending.put(correlationId, future);
		if (this.container != null) {
			this.template.send(exchange, routingKey, message, correlationData);
		}
		else {
			ChannelHolder channelHolder = this.directReplyToContainer.getChannelHolder();
			future.setChannelHolder(channelHolder);
			sendDirect(channelHolder.getChannel(), exchange, routingKey, message, correlationData);
		}

		// !!! Problem spot: a timeout task is scheduled for every request
		future.startTimer();
		return future;
	}

	// Class: RabbitFuture<T>
	void startTimer() {
		if (AsyncRabbitTemplate.this.receiveTimeout > 0) {
			synchronized (AsyncRabbitTemplate.this) {
				if (!AsyncRabbitTemplate.this.running) {
					AsyncRabbitTemplate.this.pending.remove(this.correlationId);
					throw new IllegalStateException("'AsyncRabbitTemplate' must be started.");
				}

				// !!! Problem spot: the scheduled TimeoutTask is not cancelled when the reply
				// completes the future, so it stays in the TaskScheduler until receiveTimeout
				this.timeoutTask = AsyncRabbitTemplate.this.taskScheduler.schedule(new TimeoutTask(),
						new Date(System.currentTimeMillis() + AsyncRabbitTemplate.this.receiveTimeout));
			}
		}
		else {
			this.timeoutTask = null;
		}
	}

I hope it can be fixed in version 2.4.x. Looking forward to your reply @artembilan
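
To illustrate point 2 above in plain JDK terms, here is a small standalone sketch (not Spring AMQP code; all names illustrative) of why a completed request keeps its timeout task queued, and how cancelling the task eagerly would release it:

	import java.util.concurrent.CompletableFuture;
	import java.util.concurrent.ScheduledFuture;
	import java.util.concurrent.ScheduledThreadPoolExecutor;
	import java.util.concurrent.TimeUnit;

	public class TimeoutTaskSketch {

		public static void main(String[] args) {
			ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
			// Without this, even cancelled tasks remain in the work queue until their deadline.
			scheduler.setRemoveOnCancelPolicy(true);

			CompletableFuture<String> reply = new CompletableFuture<>();
			Runnable onTimeout = () -> reply.completeExceptionally(new IllegalStateException("reply timed out"));
			// Schedule the "receive timeout" (30 seconds by default in AsyncRabbitTemplate).
			ScheduledFuture<?> timeoutTask = scheduler.schedule(onTimeout, 30, TimeUnit.SECONDS);

			// Cancel the pending timeout as soon as the reply (or an error) arrives, so the
			// task does not sit in the scheduler queue for the remaining 30 seconds.
			reply.whenComplete((result, ex) -> timeoutTask.cancel(false));

			reply.complete("reply payload");                                    // simulate a fast reply
			System.out.println("queued tasks: " + scheduler.getQueue().size()); // prints 0
			scheduler.shutdown();
		}
	}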

@artembilan
Member

@mailingfeng ,

Thank you for the investigation!

I will look into that code today.

However, we are not going to back-port the fix into the 2.4.x branch. That version is out of open source support: https://spring.io/projects/spring-amqp#support

@artembilan
Member

I think this was fixed via #2724 and released as part of 3.1.6.

So, I am treating this as a duplicate and closing it accordingly.
As I said, this is not going to be back-ported to 2.4.x because there is no support for that branch any more.

@mailingfeng

> @mailingfeng ,
>
> Thank you for the investigation!
>
> I will look into that code today.
>
> However, we are not going to back-port the fix into the 2.4.x branch. That version is out of open source support: https://spring.io/projects/spring-amqp#support

OK, Thanks
