I/O operation on closed file with lots of threads #43

Open
alexrudy opened this issue Apr 20, 2016 · 5 comments

@alexrudy

This is similar to #41 but suggests something perhaps a bit more nefarious. The code below spawns lots of threads (on my machine, ~100 was enough to do it, but I see the problem intermittently with ~8 threads) and then logs from each thread intermittently.

This causes lots of "I/O operation on closed file" errors in the worker threads, which appear to be logging after the handler has been removed and closed.

It appears to be a race condition, as the error is intermittent with smaller numbers of threads.

I discovered this problem when running a bunch of pytest tests which spun up pools of worker threads for each test, then shut them all down again.

Here is the code to cause the problem:

# -*- coding: utf-8 -*-

import threading
import logging
import time
import pytest_catchlog
import pytest_catchlog.common
import pytest_catchlog.plugin

def worker(n, shutdown):
    """A worker thread."""
    logger = logging.getLogger("worker.{0:d}".format(n))

    while not shutdown.isSet():
        logger.debug("Working")
        shutdown.wait(0.01)


def log_from_many_threads(n=10):
    """docstring for log_from_many_threads"""
    threads = {}

    for i in range(n):
        event = threading.Event()
        thread = threading.Thread(target=worker, args=(i, event))
        thread.start()
        threads[i] = (thread, event)

    handler = pytest_catchlog.plugin.LogCaptureHandler()
    with pytest_catchlog.common.catching_logs(handler):
        print("Running")
        time.sleep(1.0)

    print("Shutting down...")
    for thread, event in threads.values():
        event.set()
        thread.join()


if __name__ == '__main__':
    log_from_many_threads(100)

The errors are all of the following variety:

Traceback (most recent call last):
  File ".../python2.7/logging/__init__.py", line 874, in emit
    stream.write(fs % msg)
  File ".../site-packages/py/_io/capture.py", line 16, in write
    StringIO.write(self, data)
ValueError: I/O operation on closed file
Logged from file break_me.py, line 15
@abusalimov
Collaborator

@alexrudy, thank you for reporting this!

If I understand correctly, the error should go away if you move the thread-joining logic inside the with catching_logs(...) block:

    handler = pytest_catchlog.plugin.LogCaptureHandler()
    with pytest_catchlog.common.catching_logs(handler):
        print("Running")
        time.sleep(1.0)

        print("Shutting down...")
        for thread, event in threads.values():
            event.set()
            thread.join()

Am I missing something?

Is this the original code that causes the error? If not, could you paste the particular test case that shows the error, please? Pseudo-code would be OK too.

@alexrudy
Author

@abusalimov I agree, moving the thread join inside the context manager does make this problem go away. However, my particular case arises because I start threads during test setup or the test body, then shut them down during test teardown. The test case itself is quite a bit more complicated, but you can find the thread pool here:
https://github.com/alexrudy/Cauldron/blob/feature/async-dispatch/Cauldron/zmq/responder.py#L79

However, the gist of what I'm going for can be reproduced by e.g.

    threads = {}  # reusing worker() and n from the script above

    handler_setup = pytest_catchlog.plugin.LogCaptureHandler()
    handler_body = pytest_catchlog.plugin.LogCaptureHandler()
    handler_teardown = pytest_catchlog.plugin.LogCaptureHandler()

    with pytest_catchlog.common.catching_logs(handler_setup):
        for i in range(n):
            event = threading.Event()
            thread = threading.Thread(target=worker, args=(i, event))
            thread.start()
            threads[i] = (thread, event)

    with pytest_catchlog.common.catching_logs(handler_body):
        print("Running")
        time.sleep(1.0)

    with pytest_catchlog.common.catching_logs(handler_teardown):
        print("Shutting down...")
        for thread, event in threads.values():
            event.set()
            thread.join()

With a bit more thought, I think I could make an actual test case which might reproduce this, instead of just a script.

@abusalimov
Collaborator

abusalimov commented Apr 20, 2016

I think we could add a check to ensure the stream is still alive. Effectively, this would make such errors be swallowed silently. I don't like that kind of approach, but I guess in this case it would be OK to do it that way.

The thing is that the standard logging module does not synchronize addHandler/removeHandler with actual access to a handler, so a worker thread can indeed call into an already detached and closed handler.
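
For illustration, here is a minimal sketch of such a guard; this is not the actual LogCaptureHandler code, and the SafeCaptureHandler name and stream attribute are assumptions for the example:

import logging

class SafeCaptureHandler(logging.Handler):
    """Hypothetical capture handler that drops records arriving
    after its underlying stream has been closed."""

    def __init__(self, stream):
        logging.Handler.__init__(self)
        self.stream = stream  # e.g. a StringIO instance

    def emit(self, record):
        # A late-logging worker thread can reach this point after the
        # main thread has already detached and closed the handler.
        if getattr(self.stream, 'closed', False):
            return
        try:
            self.stream.write(self.format(record) + '\n')
        except ValueError:
            # The stream was closed between the check and the write;
            # swallow it, per the approach discussed above.
            pass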

There is an inevitable race condition anyway:

  • worker thread sees the handler attached to the logger (for handler in logger.handlers: ...)
  • main thread removes the handler and closes it (logger.handlers.remove(handler); handler.close())
  • worker thread calls handler.handle(...), which throws the error in question

Hence, there is no fundamental difference between the handler being invoked due to a race condition just after it has been closed, and it not being invoked at all because it has already been removed. Let me show that with your example:

with pytest_catchlog.common.catching_logs(handler_setup):
    ...
    thread.start()
    # <--- if a worker thread logs now, the `handler_setup` captures the record
# <------- if a worker logs somewhere around catching_logs(...).__exit__, you get the "I/O operation on closed file" error

# <------- if a worker logs in between these blocks, a record is lost anyway

with pytest_catchlog.common.catching_logs(handler_body):
    # <--- a record is captured by the `handler_body`
    ...
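
The same window can be reproduced with nothing but the standard library; the following is a self-contained sketch (illustrative only, not code from the plugin), and intermittent by nature just like the original report:

import io
import logging
import threading
import time

def demo_stdlib_race(n=100):
    """Reproduce the race with plain stdlib logging: worker threads
    may grab the handler from logger.handlers just before the main
    thread detaches and closes it."""
    logger = logging.getLogger("race-demo")
    logger.setLevel(logging.DEBUG)
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    logger.addHandler(handler)

    stop = threading.Event()

    def spin():
        while not stop.is_set():
            logger.debug("tick")

    threads = [threading.Thread(target=spin) for _ in range(n)]
    for t in threads:
        t.start()
    time.sleep(0.5)

    # Same order as catching_logs(...).__exit__: detach, then close.
    # A worker that has already read logger.handlers can still call
    # handler.handle(...) after this, hitting the closed stream.
    logger.removeHandler(handler)
    stream.close()

    stop.set()
    for t in threads:
        t.join()

if __name__ == '__main__':
    # May need several runs to print
    # "ValueError: I/O operation on closed file" to stderr.
    demo_stdlib_race()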

Does this make sense for your case?

@abusalimov
Collaborator

I think it's also worth removing the closing(...) context from catching_logs(...) to let the caller close the handler explicitly.
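
Roughly, the proposed shape would look like this; a minimal sketch only, assuming the current implementation wraps the handler in contextlib.closing(...), and not the actual pytest_catchlog.common code:

import logging
from contextlib import contextmanager

@contextmanager
def catching_logs(handler, logger=None):
    # Attach the handler for the duration of the block, but leave
    # closing it to the caller instead of doing it here.
    root = logger or logging.getLogger()
    root.addHandler(handler)
    try:
        yield handler
    finally:
        root.removeHandler(handler)
        # Note: no handler.close() here -- the caller decides
        # when (and whether) to close the handler.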

@alexrudy
Author

I had to take a long leave of absence from this project, so sorry about the delayed response.

Yes, I think both of these suggestions make sense.

  • Swallowing errors when the stream isn't alive is fine in my case. That's the best that can be done given the current state of the logging module's thread safety.
  • Removing closing(...) and having the caller do that explicitly would also be good.

Thanks for your incredibly clear explanation of the race condition that I'm chasing. I think you've nailed it. Perhaps there is an upstream bug here, but I'll have to check the Python logging PEP to see whether it guarantees any thread safety to begin with.
