Skip to content

Commit

Permalink
Fix ANSI control sequences in Jupyter notebooks
Browse files Browse the repository at this point in the history
Fixes #176

- It checks if stdout is a TTY before printing private ANSI control sequences (DECTCEM 1&2)
- Corrected the EL sequence, which was missing the 0 for its canonical form
- Added a CR before the EL (which was already partly the case, but cluttered around the code)
- Some clean-up in the code calling the foregoing functions
  • Loading branch information
nschrader authored May 11, 2022
1 parent 6514f80 commit de08a47
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 25 deletions.
14 changes: 7 additions & 7 deletions tests/test_in_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def test_write(capsys, text):

out, _ = capsys.readouterr()
# cleans stdout from _clear_line and \r
out = out.replace("\r\033[K", "")
out = out.replace("\r\033[0K", "")

assert isinstance(out, (str, bytes))
assert out[-1] == "\n"
Expand Down Expand Up @@ -110,15 +110,15 @@ def teardown():
out, _ = capsys.readouterr()

# ensure that text was cleared with the hide method
assert out[-4:] == "\r\033[K"
assert out[-5:] == "\r\033[0K"

# ``\n`` is required to flush stdout during
# the hidden state of the spinner
sys.stdout.write("{0}\n".format(text))
out, _ = capsys.readouterr()

# cleans stdout from _clear_line and \r
out = out.replace("\r\033[K", "")
out = out.replace("\r\033[0K", "")

assert isinstance(out, (str, bytes))
assert out[-1] == "\n"
Expand All @@ -132,7 +132,7 @@ def teardown():
out, _ = capsys.readouterr()

# ensure that text was cleared before resuming the spinner
assert out[:4] == "\r\033[K"
assert out[:5] == "\r\033[0K"


def test_spinner_write_race_condition(capsys):
Expand Down Expand Up @@ -174,7 +174,7 @@ def test_spinner_hiding_with_context_manager(capsys):

# make sure no spinner text was printed while the spinner was hidden
out, _ = capsys.readouterr()
out = out.replace("\r\033[K", "")
out = out.replace("\r\033[0K", "")
assert "{}\n{}".format(HIDDEN_START, HIDDEN_END) in out


Expand Down Expand Up @@ -202,7 +202,7 @@ def test_spinner_nested_hiding_with_context_manager(capsys):

# make sure no spinner text was printed while the spinner was hidden
out, _ = capsys.readouterr()
out = out.replace("\r\033[K", "")
out = out.replace("\r\033[0K", "")
assert "{}\n{}".format(HIDDEN_START, HIDDEN_END) in out


Expand Down Expand Up @@ -239,4 +239,4 @@ def test_write_non_str_objects(capsys, obj, obj_str):
capsys.readouterr()
sp.write(obj)
out, _ = capsys.readouterr()
assert out == "\r\033[K{}\n".format(obj_str)
assert out == "\r\033[0K{}\n".format(obj_str)
29 changes: 11 additions & 18 deletions yaspin/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,7 @@ def start(self):
if self._sigmap:
self._register_signal_handlers()

if sys.stdout.isatty():
self._hide_cursor()

self._hide_cursor()
self._start_time = time.time()
self._stop_time = None # Reset value to properly calculate subsequent spinner starts (if any) # pylint: disable=line-too-long
self._stop_spin = threading.Event()
Expand All @@ -251,11 +249,8 @@ def stop(self):
self._stop_spin.set()
self._spin_thread.join()

sys.stdout.write("\r")
self._clear_line()

if sys.stdout.isatty():
self._show_cursor()
self._show_cursor()

def hide(self):
"""Hide the spinner to allow for custom writing to the terminal."""
Expand All @@ -265,9 +260,6 @@ def hide(self):
with self._stdout_lock:
# set the hidden spinner flag
self._hide_spin.set()

# clear the current line
sys.stdout.write("\r")
self._clear_line()

# flush the stdout buffer so the current line
Expand Down Expand Up @@ -298,15 +290,13 @@ def show(self):
self._hide_spin.clear()

# clear the current line so the spinner is not appended to it
sys.stdout.write("\r")
self._clear_line()

def write(self, text):
"""Write text in the terminal without breaking the spinner."""
# similar to tqdm.write()
# https://pypi.python.org/pypi/tqdm#writing-messages
with self._stdout_lock:
sys.stdout.write("\r")
self._clear_line()

if isinstance(text, (str, bytes)):
Expand Down Expand Up @@ -357,8 +347,8 @@ def _spin(self):

# Write
with self._stdout_lock:
sys.stdout.write(out)
self._clear_line()
sys.stdout.write(out)
sys.stdout.flush()

# Wait
Expand Down Expand Up @@ -533,14 +523,17 @@ def _set_cycle(frames):

@staticmethod
def _hide_cursor():
sys.stdout.write("\033[?25l")
sys.stdout.flush()
if sys.stdout.isatty():
sys.stdout.write("\033[?25l")
sys.stdout.flush()

@staticmethod
def _show_cursor():
sys.stdout.write("\033[?25h")
sys.stdout.flush()
if sys.stdout.isatty():
sys.stdout.write("\033[?25h")
sys.stdout.flush()

@staticmethod
def _clear_line():
sys.stdout.write("\033[K")
sys.stdout.write("\r")
sys.stdout.write("\033[0K")

0 comments on commit de08a47

Please sign in to comment.