Fix pending tasks not properly stopped during tests #27

Closed
imranariffin opened this issue Aug 29, 2022 · 0 comments · Fixed by #31
@imranariffin (Owner)

Since #20, we see this warning when running the tests:

Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<Connection.disconnect() done, defined at /home/in-gote/workspace/aiotaskq/.venv/lib/python3.10/site-packages/aioredis/connection.py:794> wait_for=<Future pending cb=[Task.task_wakeup()]>>
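
asyncio typically emits this warning when a task is garbage collected or its event loop is closed while the task is still pending. The following is a minimal sketch of one way to avoid it, not aiotaskq's actual fix: wait for outstanding tasks before shutdown and cancel whatever is left. `drain_pending`, `slow_cleanup`, and `main` are hypothetical names used only for illustration.

```python
import asyncio


async def drain_pending(timeout: float = 1.0) -> int:
    """Wait for all other tasks on this loop; cancel any that exceed the timeout.

    Returns the number of tasks that finished on their own.
    """
    current = asyncio.current_task()
    pending = [t for t in asyncio.all_tasks() if t is not current]
    if not pending:
        return 0  # asyncio.wait() rejects an empty collection
    done, still_pending = await asyncio.wait(pending, timeout=timeout)
    for task in still_pending:
        task.cancel()
    if still_pending:
        # return_exceptions=True absorbs the CancelledError raised by each task
        await asyncio.gather(*still_pending, return_exceptions=True)
    return len(done)


async def slow_cleanup() -> None:
    # Hypothetical stand-in for a cleanup coroutine such as a disconnect
    await asyncio.sleep(0.01)


async def main() -> int:
    # Hold a reference so the task cannot be garbage collected while pending
    task = asyncio.create_task(slow_cleanup())
    drained = await drain_pending()
    assert task.done()
    return drained
```

Calling `asyncio.run(main())` lets the cleanup task finish before the loop closes, so no task is destroyed while pending.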
imranariffin added a commit that referenced this issue Sep 18, 2022
…bility

* Refactor: Define WorkerManager & GruntWorker

WorkerManager is the one that receives tasks and delegates them to its
GruntWorkers running in the background. GruntWorker executes the tasks
and publishes the results back to the queue. Both of these should
implement the IWorker interface. The main logic for the worker is in
`_main_loop`.

* Refactor: pubsub client & concurrency manager

In the future we want to enable users to configure and switch to
a different pubsub client & concurrency manager if need be. We start
with Redis as the default pubsub and multiprocessing as the default
concurrency manager. Users should be able to configure these using
environment variables.
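
A rough sketch of what environment-based selection could look like. The variable names `EXAMPLE_PUBSUB` and `EXAMPLE_CONCURRENCY`, the enums, and `load_config` are all assumptions made for illustration and are not taken from the project:

```python
import os
from enum import Enum
from typing import Mapping, Optional, Tuple


class PubSubBackend(str, Enum):
    REDIS = "redis"


class ConcurrencyBackend(str, Enum):
    MULTIPROCESSING = "multiprocessing"


def load_config(
    env: Optional[Mapping[str, str]] = None,
) -> Tuple[PubSubBackend, ConcurrencyBackend]:
    """Read the backends from the environment, falling back to the defaults."""
    env = os.environ if env is None else env
    # Enum lookup by value raises ValueError for an unsupported backend name
    pubsub = PubSubBackend(env.get("EXAMPLE_PUBSUB", "redis"))
    concurrency = ConcurrencyBackend(env.get("EXAMPLE_CONCURRENCY", "multiprocessing"))
    return pubsub, concurrency
```

Validating against an enum means a typo in the variable fails at startup rather than when the first task is published.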

* Use better interface

We use Protocols as interfaces for the worker, pubsub, and concurrency manager.
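
As an illustration of the Protocol approach, here is a minimal sketch. Only the name `IWorker` comes from the commit message; the method names `run_forever` and `term`, the `EchoWorker` class, and the `shutdown` helper are hypothetical.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class IWorker(Protocol):
    """Structural interface: any object with these methods counts as a worker."""

    def run_forever(self) -> None: ...

    def term(self) -> None: ...


class EchoWorker:
    """Satisfies IWorker without inheriting from it."""

    def __init__(self) -> None:
        self.running = False

    def run_forever(self) -> None:
        self.running = True

    def term(self) -> None:
        self.running = False


def shutdown(worker: IWorker) -> None:
    # Type checkers accept any object that structurally matches IWorker
    worker.term()
```

Because Protocols are structural, a new worker implementation does not need to import or subclass the interface, which suits the goal of swappable components.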

* Refactor: Use pytest fixture for worker

* Fix warning & zombie child procs on sigterm/sigkill

Before, running `./test.sh && pidof $(which python)` would print a
list of PIDs, which means we were not properly killing child
processes.

With this change, we do not see zombie child processes
anymore. We still see the "Task was destroyed but it is pending"
warning. This is considered progress.

We fix this by making sure the worker manager handles TERM & INT
signals and propagates them to the workers.

Some helpful references regarding handling signals:
* https://stackoverflow.com/questions/42628795/indirectly-stopping-a-python-asyncio-event-loop-through-sigterm-has-no-effect
* https://stackoverflow.com/questions/67823770/how-to-propagate-sigterm-to-children-created-via-subprocess
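
The propagation idea can be sketched as follows. This is an illustration under assumptions, not the project's implementation: `propagate` and `install_handlers` are hypothetical helpers, and the `kill` parameter exists only so the logic can be exercised without real processes.

```python
import os
import signal
from typing import Callable, Iterable, List


def propagate(
    signum: int,
    child_pids: Iterable[int],
    kill: Callable[[int, int], None] = os.kill,
) -> List[int]:
    """Forward signum to each child and return the pids that were signalled."""
    delivered = []
    for pid in child_pids:
        try:
            kill(pid, signum)
        except ProcessLookupError:
            continue  # the child has already exited
        delivered.append(pid)
    return delivered


def install_handlers(child_pids: List[int]) -> None:
    """Make the manager forward TERM and INT to its children, then exit."""

    def handler(signum, _frame):
        propagate(signum, child_pids)
        raise SystemExit(128 + signum)

    # signal.signal() must be called from the main thread
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, handler)
```

Forwarding the signal is only half of avoiding zombies: the parent still has to wait on each child (for example with `multiprocessing.Process.join()`) so the exited process is reaped.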

* Split worker.py into interfaces, pubsub & concurrency_manager

worker.py is now split into:

├── interfaces.py
├── pubsub.py
└── concurrency_manager.py

all of which should be importable from any worker.py or main.py.
Hopefully this will make the code more organized and well-abstracted.

* Attempt to ensure child processes are covered in unit tests

The WorkerManager processes seem to be included in coverage but
GruntWorker processes are still not (I guess because they are
grandchild processes and coverage doesn't handle that?)

* Make main.py more DRY: Re-use PubSub facade from pubsub.py

* As a positive side effect, also closes #27

Small extras:
* Re-organize test files
* Split test_cli.py to test_cli.py and test_worker.py
* Ignore __main__.py from coverage since it is not coverable anyway
imranariffin added a commit that referenced this issue Sep 18, 2022
Closes #30

imranariffin added a commit that referenced this issue Sep 19, 2022
Closes #30

* Refactor: Define WorkerManager & GruntWorker

WorkerManager is the one that receives tasks and delegates them to its
GruntWorkers running in the background. GruntWorker executes the tasks
and publishes the results back to the queue. Both of these should
implement the IWorker interface. The main logic for the worker is in
`_main_loop`.

* Refactor: pubsub client & concurrency manager

In the future we want to enable users to configure and switch to
a different pubsub client & concurrency manager if need be. We start
with Redis as the default pubsub and multiprocessing as the default
concurrency manager. Users should be able to configure these using
environment variables.

* Use better interface

We use Protocols as interfaces for the worker, pubsub, and concurrency manager.

* Refactor: Use pytest fixture for worker

* Fix warning & zombie child procs on sigterm/sigkill

Before, running `./test.sh && pidof $(which python)` would print a
list of PIDs, which means we were not properly killing child
processes.

With this change, we do not see zombie child processes
anymore.

We fix this by making sure the worker manager handles TERM & INT
signals and propagates them to the workers.

Some helpful references regarding handling signals:
* https://stackoverflow.com/questions/42628795/indirectly-stopping-a-python-asyncio-event-loop-through-sigterm-has-no-effect
* https://stackoverflow.com/questions/67823770/how-to-propagate-sigterm-to-children-created-via-subprocess

* Split worker.py into interfaces, pubsub & concurrency_manager

worker.py is now split into:

├── interfaces.py
├── pubsub.py
└── concurrency_manager.py

all of which should be importable from any worker.py or main.py.
Hopefully this will make the code more organized and well-abstracted.

* Attempt to ensure child processes are covered in unit tests

The WorkerManager processes seem to be included in coverage but
GruntWorker processes are still not (I guess because they are
grandchild processes and coverage doesn't handle that?)

* Make main.py more DRY: Re-use PubSub facade from pubsub.py

* As a positive side effect, also closes #27

Small extras:
* Re-organize test files
* Split test_cli.py to test_cli.py and test_worker.py
* Ignore __main__.py from coverage since it is not coverable anyway
imranariffin added a commit that referenced this issue Sep 21, 2022
#31)

* (#30): Refactor worker for readability, extensibility & configurability

Closes #30
