Unstopped samples in TimedScheduler #3844

Closed
rethab opened this issue Jul 10, 2024 · 5 comments

@rethab

rethab commented Jul 10, 2024

Decorating the Reactor schedulers with TimedScheduler leads to very high CPU usage in some of our production services.
I'm not entirely sure what the problem is, but I noticed that samples in the TimedScheduler are sometimes not cleaned up.

Expected Behavior

When a sample is started for the metrics, I expect it to be stopped again.

Actual Behavior

Sometimes, this does not seem to happen.

In particular, I'm looking at the TimedScheduler's pendingTasks timer.

When a TimedRunnable is instantiated, it starts a sample: this.pendingSample = parent.pendingTasks.start();
Both its run method and its dispose method stop that sample again.

Internally, calling stop on SampleImpl removes the sample from the timer's list of active tasks.
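To make this lifecycle concrete, here is a minimal, JDK-only model of the bookkeeping described above. `ActiveTasks` and `TrackedRunnable` are hypothetical stand-ins for Micrometer's `LongTaskTimer` and Reactor's `TimedRunnable`, not the real implementations. The point it illustrates: a sample leaves the active set only if the runnable is either run or disposed, so a runnable that is dropped without either keeps its sample active forever.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a long task timer: it only tracks which samples are active.
final class ActiveTasks {
    private final Set<Sample> active = ConcurrentHashMap.newKeySet();

    final class Sample {
        private boolean stopped;

        // Idempotent: stopping twice (e.g. run() and then dispose()) removes it only once.
        synchronized void stop() {
            if (!stopped) {
                stopped = true;
                active.remove(this);
            }
        }
    }

    Sample start() {
        Sample s = new Sample();
        active.add(s);
        return s;
    }

    int activeTasks() {
        return active.size();
    }
}

// Hypothetical stand-in for TimedRunnable: starts a sample on creation.
final class TrackedRunnable implements Runnable {
    private final Runnable task;
    private final ActiveTasks.Sample pendingSample;

    TrackedRunnable(ActiveTasks pendingTasks, Runnable task) {
        this.task = task;
        this.pendingSample = pendingTasks.start(); // pending from the moment it is created
    }

    @Override
    public void run() {
        pendingSample.stop(); // in this model, no longer pending once it executes
        task.run();
    }

    void dispose() {
        pendingSample.stop(); // cancelled before running: also no longer pending
    }
}
```

For example, creating three runnables, running one, disposing one and silently dropping the third leaves `activeTasks()` at 1: only the dropped runnable's sample is leaked.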

In my test runs, I noticed that the pendingTasks timer sometimes still has active tasks after running a batch of Monos.

In other words, this fails:

//
// run some Monos and subscribe to them (see the reproducer below)
//
Field field = scheduler.getClass().getDeclaredField("pendingTasks");
field.setAccessible(true);
DefaultLongTaskTimer pendingTasks = (DefaultLongTaskTimer) field.get(scheduler);
assertEquals(0, pendingTasks.activeTasks());

Additionally, I used my own implementation of TimedScheduler and overrode the finalize method of TimedRunnable to print a message whenever a runnable was garbage collected while its pendingSample had not been stopped:

@Override
protected void finalize() throws Throwable {
		super.finalize();
		if (this.pendingSample != null) {
			var duration = this.pendingSample.duration(TimeUnit.MICROSECONDS);
			if (duration != -1) {
				System.out.println("Pending task was not stopped");
			}
		}
}

(I'm not sure whether this is a reliable way to detect it, though.)
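As an aside on the detection technique: `Object.finalize` has been deprecated since Java 9, and `java.lang.ref.Cleaner` is the JDK's suggested replacement. The sketch below is an alternative to the approach above, not what was used in this issue; `LeakDetectingTask` and its `LEAKS` counter are hypothetical. Like a finalizer, a Cleaner action only runs once the GC finds the object unreachable, so it is still a diagnostic aid rather than a reliable assertion.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical leak detector built on java.lang.ref.Cleaner instead of finalize().
final class LeakDetectingTask {
    private static final Cleaner CLEANER = Cleaner.create();
    static final AtomicInteger LEAKS = new AtomicInteger();

    private final AtomicBoolean stopped = new AtomicBoolean();
    private final Cleaner.Cleanable cleanable;

    LeakDetectingTask() {
        // The cleaning action must not capture `this`, otherwise the task could
        // never become unreachable. It only captures the shared flag.
        AtomicBoolean flag = stopped;
        this.cleanable = CLEANER.register(this, () -> {
            if (!flag.get()) {
                LEAKS.incrementAndGet();
                System.out.println("Pending task was not stopped");
            }
        });
    }

    // Equivalent of stopping the pending sample.
    void stop() {
        stopped.set(true);
    }

    // Runs the cleaning action immediately; Cleanable.clean() invokes it at most once.
    void cleanNow() {
        cleanable.clean();
    }
}
```

In normal use you would never call `cleanNow()`; the Cleaner's thread runs the action after the task is collected. Calling it explicitly is only a deterministic way to exercise the check.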

Steps to Reproduce

package org.example;

import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class Main {
    public static void main(String[] args) throws Exception {
        var scheduler = Micrometer.timedScheduler(
                Schedulers.newBoundedElastic(10, 10000, "boundedElastic"),
                new SimpleMeterRegistry(),
                "test");

        var totalCalls = new AtomicLong();
        var iterations = 50_000;
        new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                Mono.delay(Duration.ofMillis(5))
                        .subscribeOn(scheduler)
                        .timeout(Duration.ofMillis(20), scheduler)
                        .subscribe(__ -> totalCalls.incrementAndGet(), __ -> totalCalls.incrementAndGet());
            }
        }).start();

        while (totalCalls.get() < iterations) {
            Thread.sleep(1000);
            System.out.printf("Progress: %.1f\n", 100d / iterations * totalCalls.get());
        }

        scheduler.dispose();

        Field field = scheduler.getClass().getDeclaredField("pendingTasks");
        field.setAccessible(true);
        DefaultLongTaskTimer pendingTasks = (DefaultLongTaskTimer) field.get(scheduler);
        System.out.println("Pending Tasks: " + pendingTasks.activeTasks());
        assertEquals(0, pendingTasks.activeTasks());
    }

}

Possible Solution

???

Your Environment

  • io.projectreactor:reactor-core:3.6.8
  • io.projectreactor:reactor-core-micrometer:1.1.8
  • io.micrometer:micrometer-core:1.13.2
  • M1 MacBook Pro
  • Java 17
@chemicL
Member

chemicL commented Jul 17, 2024

Hi @rethab! Thanks for describing the issue and providing a reproducer.

I was able to reproduce the issue more directly, without relying on the cancellation performed by the timeout operator:

@Test
void disposeClearsPendingTasks() throws Exception {
	TimedScheduler scheduler = (TimedScheduler) Micrometer.timedScheduler(
			Schedulers.newSingle("ttt"),
			new SimpleMeterRegistry(),
			"test");

	scheduler.schedule(() -> {
		try {
			System.out.println("First task run");
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			System.out.println("First task interrupted");
		}
	});

	scheduler.schedule(() -> {
		try {
			System.out.println("Second task run");
			Thread.sleep(500);
		} catch (InterruptedException e) {
			System.out.println("Second task interrupted");
		}
	});

	// One problem here is that dispose() is asynchronous.
	// disposeGracefully doesn't help either: it either times out or lets both tasks finish.
	scheduler.dispose();

	assertThat(scheduler.pendingTasks.activeTasks()).isEqualTo(0);
}

This is indeed a bug. I haven't yet figured out whether we have the means to intercept such scenarios and properly clean up the pending timer. I'll look into it further.

@chemicL chemicL added the type/bug A general bug label Jul 17, 2024
chemicL added a commit that referenced this issue Jul 24, 2024
`TimedRunnable` instances created by a `TimedWorker` were not
disposed when the `Worker` was shut down. This left the pending tasks
timers running forever, causing leaks.

With this change, the `TimedWorker` keeps track of the `TimedRunnable`
instances it creates, so their resources can be disposed of properly.

Resolves #3844
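The idea behind the fix can be sketched with a small JDK-only model. `TrackingWorker` and its inner `Task` are hypothetical names, not Reactor's `TimedWorker` code, and the pending timer is reduced to an `AtomicInteger`. The worker remembers every task it creates, so `dispose()` can stop the pending samples of tasks that never ran. Tasks are not executed on threads here; the model only covers the bookkeeping.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: a worker that tracks its tasks so disposal can clean them up.
final class TrackingWorker {
    // Stand-in for the pending tasks timer's active-task count.
    final AtomicInteger pendingTasks = new AtomicInteger();
    private final Set<Task> tracked = ConcurrentHashMap.newKeySet();
    private volatile boolean disposed;

    final class Task implements Runnable {
        private final Runnable delegate;
        private final AtomicBoolean stopped = new AtomicBoolean();

        Task(Runnable delegate) {
            this.delegate = delegate;
            pendingTasks.incrementAndGet(); // "start" the pending sample
        }

        // Stops the pending sample at most once and forgets the task.
        private void stopPending() {
            if (stopped.compareAndSet(false, true)) {
                pendingTasks.decrementAndGet();
            }
            tracked.remove(this);
        }

        @Override
        public void run() {
            stopPending();
            delegate.run();
        }

        void dispose() {
            stopPending();
        }
    }

    Task schedule(Runnable r) {
        Task t = new Task(r);
        tracked.add(t);
        if (disposed) {
            t.dispose(); // worker already disposed: never leave the sample running
        }
        return t;
    }

    void dispose() {
        disposed = true;
        // Concurrent set iterators tolerate removal during iteration.
        for (Task t : tracked) {
            t.dispose(); // stop samples of tasks that never ran
        }
    }
}
```

Without the `tracked` set, `dispose()` would have no handle on the tasks that never ran, which is the leak described in the commit message.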
@chemicL chemicL self-assigned this Jul 24, 2024
@chemicL chemicL added this to the 3.5.20 milestone Jul 24, 2024
@chemicL
Member

chemicL commented Jul 25, 2024

@rethab as a matter of fact, the example above has no way of actually working. However, when the Scheduler is replaced with a Scheduler.Worker, proper cancellation was expected (as Workers track the tasks submitted to them), and it did not happen. This has now been fixed. You can use 3.6.9-SNAPSHOT to validate the behaviour. Please get in touch if you spot any issues. Thanks.

@rethab
Author

rethab commented Jul 25, 2024

Hi @chemicL, thanks a lot for the fix!! ❤️

Unfortunately, the behavior only showed up in production after a few hours, and I can't use snapshots in production, but I'll surely take it for a spin once the release is out :)

Thanks again!

@rethab
Author

rethab commented Aug 2, 2024

Hi @chemicL, I have since had the chance to run this in production and the results look very good.

We've been running a monkey-patched version of the TimedScheduler for a while. Previously, I had removed the pending tasks metric, and CPU was stable. A little more than 24 hours ago, we deployed your fix, including the pending tasks metric, to production, and CPU has been stable since.

So I'm pretty confident your fix did the trick. Thanks again for fixing this so quickly 💪

@chemicL
Member

chemicL commented Aug 2, 2024

Thanks for the feedback, it's awesome to hear the good news @rethab ❤️
This fix will ship August 13th in 3.5.20, 3.6.9 and the latest milestone, 3.7.0-M5.
