Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory usage increases with subject and threadpool scheduler #582

Open
damianoct opened this issue Sep 1, 2021 · 3 comments
Open

Memory usage increases with subject and threadpool scheduler #582

damianoct opened this issue Sep 1, 2021 · 3 comments
Labels

Comments

@damianoct
Copy link

Hi,

I'm using rxpy for an event-driven application and after some stress tests I noticed a strange behaviour on memory usage.
I've created a simple script to reproduce the issue.


from rx.scheduler import ThreadPoolScheduler
from rx.subject import Subject
import rx.operators as ops
from threading import current_thread
import multiprocessing
import rx
import time

pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

pipeline = [ops.map(lambda x: x**2)]

pipeline.append(ops.observe_on(pool_scheduler))


s = Subject()
s.pipe(*pipeline).subscribe(on_next=lambda x: print("{}, thread: {}".format(x, current_thread().name)))

for i in range(100000000000):
    s.on_next(i)

input()

I'm using memory-profile library for tracing the memory usage of this script, after 30 seconds of run the result is the following:

Screen Shot 2021-09-01 at 11 02 59 AM

The memory is growing linearly.

If I place a time.sleep(0.1) inside the for loop the memory usage is steady.


from rx.scheduler import ThreadPoolScheduler
from rx.subject import Subject
import rx.operators as ops
from threading import current_thread
import multiprocessing
import rx
import time

pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

pipeline = [ops.map(lambda x: x**2)]

pipeline.append(ops.observe_on(pool_scheduler))


s = Subject()
s.pipe(*pipeline).subscribe(on_next=lambda x: print("{}, thread: {}".format(x, current_thread().name)))

for i in range(100000000000):
    s.on_next(i)
    time.sleep(0.1)

input()

Screen Shot 2021-09-01 at 11 04 59 AM

I don't know if the problem is related to python / rxpy or if I'm using the library in a wrong way.

PS: the memory usage is steady if I get rid of scheduler and everything is executed on the main thread.

@damianoct damianoct changed the title Memory increases with subject and threadpool scheduler Memory usage increases with subject and threadpool scheduler Sep 1, 2021
@MainRo
Copy link
Collaborator

MainRo commented Sep 9, 2021

If you wait until the program completes, does the memory go down?

It looks like the code generates the items faster than they are processed in the thread pool. There is an unbounded queue to schedule the items between the source and the threads in the ThreadSchedulers. So the memory grows up until the source completes and the threads can catch up.
When you add the sleep statement, you slow down the production of the source items and they can be processed faster than they are produced.

@MainRo MainRo added the question label Sep 9, 2021
@damianoct
Copy link
Author

If you wait until the program completes, does the memory go down?

Yes, I ran a script that processes 1 million source items without the time sleep and the memory is releasing over the time.

image

My question is:

I'm using the same setup (subject + thread pool) in a python application and the frequency of source creation and process could be very high.
How can I avoid this memory usage if I have the production of the source items is faster than the process time?

Maybe should I use multiple observable (multiple subjects)?

Thanks. @MainRo

@MainRo
Copy link
Collaborator

MainRo commented Nov 12, 2021

You need to handle backpressure in your application. Unfortunately, there is no built-in solution for this in RxPY.
Depending on the structure of your application, you can use different techniques.

You can try this library:
https://github.com/MichaelSchneeberger/rxbackpressure

I do not know if @MichaelSchneeberger still maintains it.
Also some time ago I wrote an article on how to handle backpressure with a feedback loop.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants