Skip to content

Commit

Permalink
fix(invoke): Write in UTF-8 string instead of bytes (#5427)
Browse files Browse the repository at this point in the history
* Revert "Revert "fix(invoke): Write in UTF-8 string instead of bytes. (#5232)" (#5401)"

This reverts commit 7b7c54c.

* Add typing and fix issues found in the reverted commit

* Update of comments

* handle pr feedback

---------

Co-authored-by: Jacob Fuss <jfuss@users.noreply.github.com>
  • Loading branch information
jfuss and jfuss committed Jul 10, 2023
1 parent cb5e46b commit d5ce6d5
Show file tree
Hide file tree
Showing 16 changed files with 162 additions and 89 deletions.
4 changes: 2 additions & 2 deletions samcli/commands/remote/remote_invoke_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ class DefaultRemoteInvokeResponseConsumer(RemoteInvokeConsumer[RemoteInvokeRespo
_stream_writer: StreamWriter

def consume(self, remote_invoke_response: RemoteInvokeResponse) -> None:
self._stream_writer.write(cast(str, remote_invoke_response.response).encode())
self._stream_writer.write_bytes(cast(str, remote_invoke_response.response).encode())


@dataclass
Expand All @@ -254,4 +254,4 @@ class DefaultRemoteInvokeLogConsumer(RemoteInvokeConsumer[RemoteInvokeLogOutput]
_stream_writer: StreamWriter

def consume(self, remote_invoke_response: RemoteInvokeLogOutput) -> None:
self._stream_writer.write(remote_invoke_response.log_output.encode())
self._stream_writer.write_bytes(remote_invoke_response.log_output.encode())
28 changes: 13 additions & 15 deletions samcli/lib/docker/log_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,21 @@ def stream_progress(self, logs: docker.APIClient.logs):
else:
curr_log_line_id = ids[_id]
change_cursor_count = len(ids) - curr_log_line_id
self._stream.write(
self._stream.write_str(
self._cursor_up_formatter.cursor_format(change_cursor_count)
+ self._cursor_left_formatter.cursor_format(),
encode=True,
+ self._cursor_left_formatter.cursor_format()
)

self._stream_write(_id, status, stream, progress, error)

if _id:
self._stream.write(
self._stream.write_str(
self._cursor_down_formatter.cursor_format(change_cursor_count)
+ self._cursor_left_formatter.cursor_format(),
encode=True,
+ self._cursor_left_formatter.cursor_format()
)
self._stream.write(os.linesep, encode=True)
self._stream.write_str(os.linesep)

def _stream_write(self, _id: str, status: str, stream: bytes, progress: str, error: str):
def _stream_write(self, _id: str, status: str, stream: str, progress: str, error: str):
"""
Write stream information to stderr, if the stream information contains a log id,
use the carriage return character to rewrite that particular line.
Expand All @@ -80,14 +78,14 @@ def _stream_write(self, _id: str, status: str, stream: bytes, progress: str, err

# NOTE(sriram-mv): Required for the purposes of when the cursor overflows existing terminal buffer.
if not stream:
self._stream.write(os.linesep, encode=True)
self._stream.write(
self._cursor_up_formatter.cursor_format() + self._cursor_left_formatter.cursor_format(), encode=True
self._stream.write_str(os.linesep)
self._stream.write_str(
self._cursor_up_formatter.cursor_format() + self._cursor_left_formatter.cursor_format()
)
self._stream.write(self._cursor_clear_formatter.cursor_format(), encode=True)
self._stream.write_str(self._cursor_clear_formatter.cursor_format())

if not _id:
self._stream.write(stream, encode=True)
self._stream.write(status, encode=True)
self._stream.write_str(stream)
self._stream.write_str(status)
else:
self._stream.write(f"\r{_id}: {status} {progress}", encode=True)
self._stream.write_str(f"\r{_id}: {status} {progress}")
2 changes: 1 addition & 1 deletion samcli/lib/package/s3_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,4 @@ def on_progress(self, bytes_transferred, **kwargs):
)
sys.stderr.flush()
if int(percentage) == 100: # noqa: PLR2004
sys.stderr.write("\n")
sys.stderr.write(os.linesep)
13 changes: 9 additions & 4 deletions samcli/lib/utils/osutils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Common OS utilities
"""
import io
import logging
import os
import shutil
Expand Down Expand Up @@ -78,7 +79,7 @@ def rmtree_if_exists(path: Union[str, Path]):
shutil.rmtree(path_obj)


def stdout():
def stdout() -> io.TextIOWrapper:
"""
Returns the stdout as a text stream in a Py2/PY3 compatible manner
Expand All @@ -87,10 +88,12 @@ def stdout():
io.TextIOWrapper
Text stream of stdout
"""
return sys.stdout.buffer
# Note(jfuss): sys.stdout is annotated as typing.TextIO but is initialized as an
# io.TextIOWrapper. To make mypy and typing play well, tell mypy to ignore.
return sys.stdout # type:ignore[return-value]


def stderr():
def stderr() -> io.TextIOWrapper:
"""
Returns the stderr as a text stream in a Py2/PY3 compatible manner
Expand All @@ -99,7 +102,9 @@ def stderr():
io.TextIOWrapper
Text stream of stderr
"""
return sys.stderr.buffer
# Note(jfuss): sys.stderr is annotated as typing.TextIO but is initialized as an
# io.TextIOWrapper. To make mypy and typing play well, tell mypy to ignore.
return sys.stderr # type:ignore[return-value]


def remove(path):
Expand Down
21 changes: 18 additions & 3 deletions samcli/lib/utils/stream_writer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""
This class acts like a wrapper around output streams to provide any flexibility with output we need
"""
from typing import Union


class StreamWriter:
def __init__(self, stream, auto_flush=False):
def __init__(self, stream, auto_flush: bool = False):
"""
Instantiates a new StreamWriter for the specified stream
Expand All @@ -22,7 +23,7 @@ def __init__(self, stream, auto_flush=False):
def stream(self):
return self._stream

def write(self, output, encode=False):
def write_bytes(self, output: Union[bytes, bytearray]):
"""
Writes specified text to the underlying stream
Expand All @@ -31,7 +32,21 @@ def write(self, output, encode=False):
output bytes-like object
Bytes to write
"""
self._stream.write(output.encode() if encode else output)
self._stream.buffer.write(output)

if self._auto_flush:
self._stream.flush()

def write_str(self, output: str):
    """
    Writes the specified text to the underlying stream

    Parameters
    ----------
    output str
        Text to write
    """
    target = self._stream
    target.write(output)

    # Push the text out right away when auto-flush is enabled on this writer
    if self._auto_flush:
        target.flush()
Expand Down
4 changes: 2 additions & 2 deletions samcli/lib/utils/subprocess_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def default_loading_pattern(stream_writer: Optional[StreamWriter] = None, loadin
How frequently to generate the pattern
"""
stream_writer = stream_writer or StreamWriter(sys.stderr)
stream_writer.write(".")
stream_writer.write_str(".")
stream_writer.flush()
sleep(loading_pattern_rate)

Expand Down Expand Up @@ -96,7 +96,7 @@ def _print_loading_pattern():
return_code = process.wait()
keep_printing = False

stream_writer.write(os.linesep)
stream_writer.write_str(os.linesep)
stream_writer.flush()
process_stderr = _check_and_convert_stream_to_string(process.stderr)

Expand Down
47 changes: 38 additions & 9 deletions samcli/local/docker/container.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""
Representation of a generic Docker container
"""
import io
import json
import logging
import os
import pathlib
Expand All @@ -9,14 +11,15 @@
import tempfile
import threading
import time
from typing import Optional
from typing import Iterator, Optional, Tuple, Union

import docker
import requests
from docker.errors import NotFound as DockerNetworkNotFound

from samcli.lib.constants import DOCKER_MIN_API_VERSION
from samcli.lib.utils.retry import retry
from samcli.lib.utils.stream_writer import StreamWriter
from samcli.lib.utils.tar import extract_tarfile
from samcli.local.docker.effective_user import ROOT_USER_ID, EffectiveUser

Expand Down Expand Up @@ -314,7 +317,7 @@ def start(self, input_data=None):
real_container.start()

@retry(exc=requests.exceptions.RequestException, exc_raise=ContainerResponseException)
def wait_for_http_response(self, name, event, stdout):
def wait_for_http_response(self, name, event, stdout) -> str:
# TODO(sriram-mv): `aws-lambda-rie` is in a mode where the function_name is always "function"
# NOTE(sriram-mv): There is a connection timeout set on the http call to `aws-lambda-rie`, however there is not
# a read time out for the response received from the server.
Expand All @@ -324,7 +327,7 @@ def wait_for_http_response(self, name, event, stdout):
data=event.encode("utf-8"),
timeout=(self.RAPID_CONNECTION_TIMEOUT, None),
)
stdout.write(resp.content)
return json.dumps(json.loads(resp.content), ensure_ascii=False)

def wait_for_result(self, full_path, event, stdout, stderr, start_timer=None):
# NOTE(sriram-mv): Let logging happen in its own thread, so that a http request can be sent.
Expand All @@ -344,11 +347,21 @@ def wait_for_result(self, full_path, event, stdout, stderr, start_timer=None):
# start the timer for function timeout right before executing the function, as waiting for the socket
# can take some time
timer = start_timer() if start_timer else None
self.wait_for_http_response(full_path, event, stdout)
response = self.wait_for_http_response(full_path, event, stdout)
if timer:
timer.cancel()

def wait_for_logs(self, stdout=None, stderr=None):
# NOTE(jfuss): Adding a sleep after we get a response from the container but before we
# write the response to ensure the last thing written to stdout is the container response
time.sleep(1)
stdout.write_str(response)
stdout.flush()

def wait_for_logs(
self,
stdout: Optional[Union[StreamWriter, io.BytesIO, io.TextIOWrapper]] = None,
stderr: Optional[Union[StreamWriter, io.BytesIO, io.TextIOWrapper]] = None,
):
# Return instantly if we don't have to fetch any logs
if not stdout and not stderr:
return
Expand All @@ -360,7 +373,6 @@ def wait_for_logs(self, stdout=None, stderr=None):

# Fetch both stdout and stderr streams from Docker as a single iterator.
logs_itr = real_container.attach(stream=True, logs=True, demux=True)

self._write_container_output(logs_itr, stdout=stdout, stderr=stderr)

def _wait_for_socket_connection(self) -> None:
Expand Down Expand Up @@ -411,7 +423,11 @@ def copy(self, from_container_path, to_host_path) -> None:
extract_tarfile(file_obj=fp, unpack_dir=to_host_path)

@staticmethod
def _write_container_output(output_itr, stdout=None, stderr=None):
def _write_container_output(
output_itr: Iterator[Tuple[bytes, bytes]],
stdout: Optional[Union[StreamWriter, io.BytesIO, io.TextIOWrapper]] = None,
stderr: Optional[Union[StreamWriter, io.BytesIO, io.TextIOWrapper]] = None,
):
"""
Based on the data returned from the Container output, via the iterator, write it to the appropriate streams
Expand All @@ -430,13 +446,26 @@ def _write_container_output(output_itr, stdout=None, stderr=None):
# Iterator returns a tuple of (stdout, stderr)
for stdout_data, stderr_data in output_itr:
if stdout_data and stdout:
stdout.write(stdout_data)
Container._handle_data_writing(stdout, stdout_data)

if stderr_data and stderr:
stderr.write(stderr_data)
Container._handle_data_writing(stderr, stderr_data)

except Exception as ex:
LOG.debug("Failed to get the logs from the container", exc_info=ex)

@staticmethod
def _handle_data_writing(output_stream: Union[StreamWriter, io.BytesIO, io.TextIOWrapper], output_data: bytes):
    """
    Write raw container output to the given stream, using the write call that fits the stream's type.

    Parameters
    ----------
    output_stream : Union[StreamWriter, io.BytesIO, io.TextIOWrapper]
        Destination for the data. A stream of any other type receives nothing.
    output_data : bytes
        Bytes to write
    """
    if isinstance(output_stream, StreamWriter):
        # StreamWriter has a dedicated bytes API; flush after every write
        output_stream.write_bytes(output_data)
        output_stream.flush()
    elif isinstance(output_stream, io.BytesIO):
        output_stream.write(output_data)
    elif isinstance(output_stream, io.TextIOWrapper):
        # A text wrapper only accepts str, so the bytes go to its underlying binary buffer
        output_stream.buffer.write(output_data)

@property
def network_id(self):
"""
Expand Down
11 changes: 6 additions & 5 deletions samcli/local/docker/lambda_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
import hashlib
import logging
import os
import platform
import re
import sys
Expand Down Expand Up @@ -226,7 +227,7 @@ def build(self, runtime, packagetype, image, layers, architecture, stream=None,
or not runtime
):
stream_writer = stream or StreamWriter(sys.stderr)
stream_writer.write("Building image...")
stream_writer.write_str("Building image...")
stream_writer.flush()
self._build_image(
image if image else base_image, rapid_image, downloaded_layers, architecture, stream=stream_writer
Expand Down Expand Up @@ -337,15 +338,15 @@ def set_item_permission(tar_info):
platform=get_docker_platform(architecture),
)
for log in resp_stream:
stream_writer.write(".")
stream_writer.write_str(".")
stream_writer.flush()
if "error" in log:
stream_writer.write("\n")
stream_writer.write_str(os.linesep)
LOG.exception("Failed to build Docker Image")
raise ImageBuildException("Error building docker image: {}".format(log["error"]))
stream_writer.write("\n")
stream_writer.write_str(os.linesep)
except (docker.errors.BuildError, docker.errors.APIError) as ex:
stream_writer.write("\n")
stream_writer.write_str(os.linesep)
LOG.exception("Failed to build Docker Image")
raise ImageBuildException("Building Image failed.") from ex
finally:
Expand Down
6 changes: 3 additions & 3 deletions samcli/local/docker/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,16 +168,16 @@ def pull_image(self, image_name, tag=None, stream=None):
raise DockerImagePullFailedException(str(ex)) from ex

# io streams, especially StringIO, work only with unicode strings
stream_writer.write("\nFetching {}:{} Docker container image...".format(image_name, tag))
stream_writer.write_str("\nFetching {}:{} Docker container image...".format(image_name, tag))

# Each line contains information on progress of the pull. Each line is a JSON string
for _ in result_itr:
# For every line, print a dot to show progress
stream_writer.write(".")
stream_writer.write_str(".")
stream_writer.flush()

# We are done. Go to the next line
stream_writer.write("\n")
stream_writer.write_str("\n")

def has_image(self, image_name):
"""
Expand Down
21 changes: 21 additions & 0 deletions tests/integration/local/invoke/test_integrations_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,27 @@ def test_invoke_returns_expected_result_when_no_event_given(self):
self.assertEqual(process.returncode, 0)
self.assertEqual("{}", process_stdout.decode("utf-8"))

@pytest.mark.flaky(reruns=3)
def test_invoke_returns_utf8(self):
    """
    Invoke EchoEventFunction with the UTF-8 event file and verify that stdout carries the same JSON
    document, serialized with non-ASCII characters left unescaped.
    """
    command_list = InvokeIntegBase.get_command_list(
        "EchoEventFunction", template_path=self.template_path, event_path=self.event_utf8_path
    )

    process = Popen(command_list, stdout=PIPE)
    try:
        stdout, _ = process.communicate(timeout=TIMEOUT)
    except TimeoutExpired:
        # Do not leave the child process running when the invoke hangs
        process.kill()
        raise

    process_stdout = stdout.strip()

    # Read the event explicitly as UTF-8: the platform default encoding (e.g. cp1252 on Windows)
    # would mis-decode the non-ASCII characters and break the comparison below.
    with open(self.event_utf8_path, encoding="utf-8") as f:
        expected_output = json.dumps(json.load(f), ensure_ascii=False)

    self.assertEqual(process.returncode, 0)
    self.assertEqual(expected_output, process_stdout.decode("utf-8"))

@pytest.mark.flaky(reruns=3)
def test_invoke_with_env_using_parameters(self):
command_list = InvokeIntegBase.get_command_list(
Expand Down
Loading

0 comments on commit d5ce6d5

Please sign in to comment.