From 5a0093b5c5c1c1d6897918b46a6959e32179aaba Mon Sep 17 00:00:00 2001 From: "Hanzhang Zeng (Roger)" Date: Mon, 2 Nov 2020 17:45:39 -0800 Subject: [PATCH 1/8] Move thread count log to function invocation --- azure_functions_worker/dispatcher.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index fc1850cc..2f53a039 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -16,7 +16,7 @@ import threading from asyncio import BaseEventLoop from logging import LogRecord -from typing import Optional +from typing import Optional, List import grpc @@ -97,9 +97,7 @@ async def connect(cls, host: str, port: int, worker_id: str, disp = cls(loop, host, port, worker_id, request_id, connect_timeout) disp._grpc_thread.start() await disp._grpc_connected_fut - logger.info('Successfully opened gRPC channel to %s:%s ' - 'with sync threadpool max workers set to %s', - host, port, disp._sync_tp_max_workers) + logger.info('Successfully opened gRPC channel to %s:%s ', host, port) return disp async def dispatch_forever(self): @@ -318,11 +316,19 @@ async def _handle__invocation_request(self, req): fi: functions.FunctionInfo = self._functions.get_function( function_id) - logger.info(f'Received FunctionInvocationRequest, ' - f'request ID: {self.request_id}, ' - f'function ID: {function_id}, ' - f'invocation ID: {invocation_id}, ' - f'function type: {"async" if fi.is_async else "sync"}') + function_invocation_logs: List[str] = [ + f'Received FunctionInvocationRequest', + f'request ID: {self.request_id}', + f'function ID: {function_id}', + f'invocation ID: {invocation_id}', + f'function type: {"async" if fi.is_async else "sync"}' + ] + if not fi.is_async: + function_invocation_logs.append( + f'sync threadpool max workers: {self._sync_tp_max_workers}' + ) + logger.info(', '.join(function_invocation_logs)) + args = {} for pb in invoc_request.input_data: pb_type_info = fi.input_types[pb.name] From 8d91bdc79f587b50803273b6fe84fe662d5e1057 Mon Sep 17 00:00:00 2001 From: "Hanzhang Zeng (Roger)" Date: Tue, 3 Nov 2020 09:19:11 -0800 Subject: [PATCH 2/8] Also recreate sync_call_tp in Linux Consumption specialization --- azure_functions_worker/dispatcher.py | 29 ++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 2f53a039..c737d6e1 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -78,9 +78,8 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int, # We allow the customer to change synchronous thread pool count by # PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1. self._sync_tp_max_workers: int = self._get_sync_tp_max_workers() - self._sync_call_tp: concurrent.futures.Executor = ( - concurrent.futures.ThreadPoolExecutor( - max_workers=self._sync_tp_max_workers)) + self._sync_call_tp: Optional[concurrent.futures.Executor] = None + self._create_or_update_sync_call_tp(self._sync_tp_max_workers) self._grpc_connect_timeout: float = grpc_connect_timeout # This is set to -1 by default to remove the limitation on msg size @@ -159,9 +158,7 @@ def stop(self) -> None: self._grpc_thread.join() self._grpc_thread = None - if self._sync_call_tp is not None: - self._sync_call_tp.shutdown() - self._sync_call_tp = None + self._stop_sync_call_tp() def on_logging(self, record: logging.LogRecord, formatted_msg: str) -> None: if record.levelno >= logging.CRITICAL: @@ -432,6 +429,10 @@ async def _handle__function_environment_reload_request(self, req): for var in env_vars: os.environ[var] = env_vars[var] + # Apply PYTHON_THREADPOOL_THREAD_COUNT + sync_tp_max_worker: int = self._get_sync_tp_max_workers() + self._create_or_update_sync_call_tp(sync_tp_max_worker) + # Reload package namespaces for customer's libraries packages_to_reload = ['azure', 'google'] for p in packages_to_reload: @@ -485,6 +486,14 @@ def _change_cwd(self, new_cwd: str): else: logger.warning('Directory %s is not found when reloading', new_cwd) + def _stop_sync_call_tp(self): + """Deallocate the current synchronous thread pool. If the thread pool + does not exist, this will be a no op. + """ + if self._sync_call_tp is not None: + self._sync_call_tp.shutdown() + self._sync_call_tp = None + def _get_sync_tp_max_workers(self) -> int: def tp_max_workers_validator(value: str) -> bool: try: @@ -507,6 +516,14 @@ def tp_max_workers_validator(value: str) -> bool: default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}', validator=tp_max_workers_validator)) + def _create_or_update_sync_call_tp(self, max_worker: int): + """Update the self._sync_call_tp executor with the proper max_worker. + """ + self._stop_sync_call_tp() + self._sync_call_tp = concurrent.futures.ThreadPoolExecutor( + max_workers=max_worker + ) + def __run_sync_func(self, invocation_id, func, params): # This helper exists because we need to access the current # invocation_id from ThreadPoolExecutor's threads. From cc76962aef341a7c63c1cd10cec3bec489efafad Mon Sep 17 00:00:00 2001 From: "Hanzhang Zeng (Roger)" Date: Tue, 3 Nov 2020 10:45:46 -0800 Subject: [PATCH 3/8] Add unit tests for Linux Consumption scenario --- azure_functions_worker/dispatcher.py | 10 +- azure_functions_worker/testutils.py | 32 ++++- tests/unittests/test_dispatcher.py | 207 ++++++++++++++++++++++----- 3 files changed, 206 insertions(+), 43 deletions(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index c737d6e1..2029298c 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -77,9 +77,11 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int, # We allow the customer to change synchronous thread pool count by # PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1. - self._sync_tp_max_workers: int = self._get_sync_tp_max_workers() + self._sync_tp_max_workers: int = PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT self._sync_call_tp: Optional[concurrent.futures.Executor] = None - self._create_or_update_sync_call_tp(self._sync_tp_max_workers) + + sync_tp_max_worker: int = self._get_sync_tp_max_workers() + self._create_or_update_sync_call_tp(sync_tp_max_worker) self._grpc_connect_timeout: float = grpc_connect_timeout # This is set to -1 by default to remove the limitation on msg size @@ -518,8 +520,12 @@ def tp_max_workers_validator(value: str) -> bool: def _create_or_update_sync_call_tp(self, max_worker: int): """Update the self._sync_call_tp executor with the proper max_worker. + Side effects: + - Update self._sync_tp_max_workers to max_worker + - Update self._sync_call_tp to have max_worker number of threads """ self._stop_sync_call_tp() + self._sync_tp_max_workers = max_worker self._sync_call_tp = concurrent.futures.ThreadPoolExecutor( max_workers=max_worker ) diff --git a/azure_functions_worker/testutils.py b/azure_functions_worker/testutils.py index 5a903bf8..a965b5cf 100644 --- a/azure_functions_worker/testutils.py +++ b/azure_functions_worker/testutils.py @@ -408,6 +408,28 @@ async def invoke_function( return invocation_id, r + async def reload_environment( + self, + environment: typing.Dict[str, str], + function_project_path: str = '/home/site/wwwroot' + ) -> protos.FunctionEnvironmentReloadResponse: + + request_content = protos.FunctionEnvironmentReloadRequest( + function_app_directory=function_project_path, + environment_variables={ + k.encode(): v.encode() for k, v in environment.items() + } + ) + + r = await self.communicate( + protos.StreamingMessage( + function_environment_reload_request=request_content + ), + wait_for='function_environment_reload_response' + ) + + return r + async def send(self, message): self._in_queue.put_nowait((message, None)) @@ -453,12 +475,12 @@ def _read_available_functions(self): class _MockWebHostController: - def __init__(self, scripts_dir): - self._host = None - self._scripts_dir = scripts_dir - self._worker = None + def __init__(self, scripts_dir: pathlib.PurePath): + self._host: typing.Optional[_MockWebHost] = None + self._scripts_dir: pathlib.PurePath = scripts_dir + self._worker: typing.Optional[dispatcher.Dispatcher] = None - async def __aenter__(self): + async def __aenter__(self) -> _MockWebHost: loop = aio_compat.get_running_loop() self._host = _MockWebHost(loop, self._scripts_dir) diff --git a/tests/unittests/test_dispatcher.py b/tests/unittests/test_dispatcher.py index f3893d17..16769536 100644 --- a/tests/unittests/test_dispatcher.py +++ b/tests/unittests/test_dispatcher.py @@ -26,6 +26,10 @@ async def test_dispatcher_sync_threadpool_default_worker(self): async with ctrl as host: await self._check_if_function_is_ok(host) self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) async def test_dispatcher_sync_threadpool_set_worker(self): """Test if the sync threadpool maximum worker can be set @@ -37,61 +41,192 @@ async def test_dispatcher_sync_threadpool_set_worker(self): async with ctrl as host: await self._check_if_function_is_ok(host) self.assertEqual(ctrl._worker._sync_tp_max_workers, 5) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) - @patch('azure_functions_worker.dispatcher.logger') - async def test_dispatcher_sync_threadpool_invalid_worker_count(self, - mock_logger): + async def test_dispatcher_sync_threadpool_invalid_worker_count(self): """Test when sync threadpool maximum worker is set to an invalid value, the host should fallback to default value 1 """ - # Configure thread pool max worker to an invalid value - os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: 'invalid'}) + # The @patch decorator does not work as expected and will suppress + # any assertion failures in the async test cases. + # Thus we're moving the patch() method to use the with syntax + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + # Configure thread pool max worker to an invalid value + os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: 'invalid'}) + ctrl = testutils.start_mockhost( + script_root=self.dispatcher_funcs_dir) + + async with ctrl as host: + await self._check_if_function_is_ok(host) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an integer') + + async def test_dispatcher_sync_threadpool_below_min_setting(self): + """Test if the sync threadpool will pick up default value when the + setting is below minimum + """ + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + # Configure thread pool max worker to an invalid value + os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '0'}) + ctrl = testutils.start_mockhost( + script_root=self.dispatcher_funcs_dir) + + async with ctrl as host: + await self._check_if_function_is_ok(host) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' + 'between 1 and 32') + + async def test_dispatcher_sync_threadpool_exceed_max_setting(self): + """Test if the sync threadpool will pick up default value when the + setting is above maximum + """ + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + # Configure thread pool max worker to an invalid value + os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '33'}) + ctrl = testutils.start_mockhost( + script_root=self.dispatcher_funcs_dir) + + async with ctrl as host: + await self._check_if_function_is_ok(host) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' + 'between 1 and 32') + + async def test_dispatcher_sync_threadpool_in_placeholder(self): + """Test if the sync threadpool will pick up app setting in placeholder + mode (Linux Consumption) + """ + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) async with ctrl as host: await self._check_if_function_is_ok(host) - # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + # Reload environment variable on specialization + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: '3' + }) - mock_logger.warning.assert_any_call( - f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an integer') + # Ensure the dispatcher sync threadpool should update to 3 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 3) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + async def test_dispatcher_sync_threadpool_in_placeholder_invalid(self): + """Test if the sync threadpool will use the default setting when the + app setting is invalid + """ - @patch('azure_functions_worker.dispatcher.logger') - async def test_dispatcher_sync_threadpool_below_min_setting(self, - mock_logger): - # Configure thread pool max worker to an invalid value - os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '0'}) ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - async with ctrl as host: - await self._check_if_function_is_ok(host) + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + await self._check_if_function_is_ok(host) - # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + # Reload environment variable on specialization + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: 'invalid' + }) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + # Check warning message + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an integer') + + async def test_dispatcher_sync_threadpool_in_placeholder_above_max(self): + """Test if the sync threadpool will use the default setting when the + app setting is above maximum + """ - mock_logger.warning.assert_any_call( - f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value between ' - '1 and 32') - - @patch('azure_functions_worker.dispatcher.logger') - async def test_dispatcher_sync_threadpool_exceed_max_setting( - self, - mock_logger - ): - # Configure thread pool max worker to an invalid value - os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '33'}) ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - async with ctrl as host: - await self._check_if_function_is_ok(host) + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + await self._check_if_function_is_ok(host) - # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + # Reload environment variable on specialization + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: '33' + }) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + # Check warning message + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' + 'between 1 and 32') + + async def test_dispatcher_sync_threadpool_in_placeholder_below_min(self): + """Test if the sync threadpool will use the default setting when the + app setting is below minimum + """ + + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + await self._check_if_function_is_ok(host) + + # Reload environment variable on specialization + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: '0' + }) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) - mock_logger.warning.assert_any_call( - f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value between ' - '1 and 32') + # Check warning message + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' + 'between 1 and 32') async def _check_if_function_is_ok(self, host): # Ensure the function can be properly loaded From 746b0893a45e0b3e0dedd5908a0f9e022c38e29c Mon Sep 17 00:00:00 2001 From: "Hanzhang Zeng (Roger)" Date: Tue, 3 Nov 2020 10:49:58 -0800 Subject: [PATCH 4/8] Fix flake8 --- azure_functions_worker/dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 2029298c..448bf031 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -316,7 +316,7 @@ async def _handle__invocation_request(self, req): function_id) function_invocation_logs: List[str] = [ - f'Received FunctionInvocationRequest', + 'Received FunctionInvocationRequest', f'request ID: {self.request_id}', f'function ID: {function_id}', f'invocation ID: {invocation_id}', From 8d2aca87ce8eb690e2940d8535c2bd5d040171f7 Mon Sep 17 00:00:00 2001 From: "Hanzhang Zeng (Roger)" Date: Thu, 5 Nov 2020 13:45:24 -0800 Subject: [PATCH 5/8] Add unittests for invocation logs --- .../show_context_async/__init__.py | 14 ++ .../show_context_async/function.json | 15 ++ tests/unittests/test_dispatcher.py | 145 +++++++++++++++++- 3 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 tests/unittests/dispatcher_functions/show_context_async/__init__.py create mode 100644 tests/unittests/dispatcher_functions/show_context_async/function.json diff --git a/tests/unittests/dispatcher_functions/show_context_async/__init__.py b/tests/unittests/dispatcher_functions/show_context_async/__init__.py new file mode 100644 index 00000000..da61ce16 --- /dev/null +++ b/tests/unittests/dispatcher_functions/show_context_async/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import json +import azure.functions as func + + +async def main(req: func.HttpRequest, + context: func.Context) -> func.HttpResponse: + result = { + 'function_directory': context.function_directory, + 'function_name': context.function_name + } + return func.HttpResponse(body=json.dumps(result), + mimetype='application/json') diff --git a/tests/unittests/dispatcher_functions/show_context_async/function.json b/tests/unittests/dispatcher_functions/show_context_async/function.json new file mode 100644 index 00000000..7239e0fc --- /dev/null +++ b/tests/unittests/dispatcher_functions/show_context_async/function.json @@ -0,0 +1,15 @@ +{ + "scriptFile": "__init__.py", + "bindings": [ + { + "type": "httpTrigger", + "direction": "in", + "name": "req" + }, + { + "type": "http", + "direction": "out", + "name": "$return" + } + ] +} \ No newline at end of file diff --git a/tests/unittests/test_dispatcher.py b/tests/unittests/test_dispatcher.py index 16769536..df259353 100644 --- a/tests/unittests/test_dispatcher.py +++ b/tests/unittests/test_dispatcher.py @@ -1,5 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +from typing import Tuple import os from unittest.mock import patch from azure_functions_worker import protos @@ -228,7 +229,122 @@ async def test_dispatcher_sync_threadpool_in_placeholder_below_min(self): f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' 'between 1 and 32') - async def _check_if_function_is_ok(self, host): + async def test_sync_invocation_request_log(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + request_id: str = ctrl._worker._request_id + func_id, invoke_id = await self._check_if_function_is_ok(host) + + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: sync, ' + 'sync threadpool max workers: 1' + ) + + async def test_async_invocation_request_log(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + request_id: str = ctrl._worker._request_id + func_id, invoke_id = ( + await self._check_if_async_function_is_ok(host) + ) + + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: async' + ) + + async def test_sync_invocation_request_log_threads(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '5'}) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + request_id: str = ctrl._worker._request_id + func_id, invoke_id = await self._check_if_function_is_ok(host) + + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: sync, ' + 'sync threadpool max workers: 5' + ) + + async def test_async_invocation_request_log_threads(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '4'}) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + request_id: str = ctrl._worker._request_id + func_id, invoke_id = ( + await self._check_if_async_function_is_ok(host) + ) + + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: async' + ) + + async def test_sync_invocation_request_log_in_placeholder_threads(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: '5' + }) + + request_id: str = ctrl._worker._request_id + func_id, invoke_id = await self._check_if_function_is_ok(host) + + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: sync, ' + 'sync threadpool max workers: 5' + ) + + async def test_async_invocation_request_log_in_placeholder_threads(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: '5' + }) + + request_id: str = ctrl._worker._request_id + func_id, invoke_id = ( + await self._check_if_async_function_is_ok(host) + ) + + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: async' + ) + + async def _check_if_function_is_ok(self, host) -> Tuple[str, str]: # Ensure the function can be properly loaded func_id, load_r = await host.load_function('show_context') self.assertEqual(load_r.response.function_id, func_id) @@ -250,3 +366,30 @@ async def _check_if_function_is_ok(self, host): self.assertIsNotNone(invoke_id) self.assertEqual(call_r.response.result.status, protos.StatusResult.Success) + + return (func_id, invoke_id) + + async def _check_if_async_function_is_ok(self, host) -> Tuple[str, str]: + # Ensure the function can be properly loaded + func_id, load_r = await host.load_function('show_context_async') + self.assertEqual(load_r.response.function_id, func_id) + self.assertEqual(load_r.response.result.status, + protos.StatusResult.Success) + + # Ensure the function can be properly invoked + invoke_id, call_r = await host.invoke_function( + 'show_context_async', [ + protos.ParameterBinding( + name='req', + data=protos.TypedData( + http=protos.RpcHttp( + method='GET' + ) + ) + ) + ]) + self.assertIsNotNone(invoke_id) + self.assertEqual(call_r.response.result.status, + protos.StatusResult.Success) + + return (func_id, invoke_id) From 9c87d38ab769bb943db412cafbad045e9aabc6f3 Mon Sep 17 00:00:00 2001 From: "Hanzhang Zeng (Roger)" Date: Thu, 5 Nov 2020 16:52:03 -0800 Subject: [PATCH 6/8] Make sync call tp allocation explicit --- azure_functions_worker/dispatcher.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 448bf031..14b9e707 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -77,11 +77,10 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int, # We allow the customer to change synchronous thread pool count by # PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1. - self._sync_tp_max_workers: int = PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT - self._sync_call_tp: Optional[concurrent.futures.Executor] = None - - sync_tp_max_worker: int = self._get_sync_tp_max_workers() - self._create_or_update_sync_call_tp(sync_tp_max_worker) + self._sync_tp_max_workers: int = self._get_sync_tp_max_workers() + self._sync_call_tp: concurrent.futures.Executor = ( + self._create_sync_call_tp(self._sync_tp_max_workers) + ) self._grpc_connect_timeout: float = grpc_connect_timeout # This is set to -1 by default to remove the limitation on msg size @@ -432,8 +431,10 @@ async def _handle__function_environment_reload_request(self, req): os.environ[var] = env_vars[var] # Apply PYTHON_THREADPOOL_THREAD_COUNT - sync_tp_max_worker: int = self._get_sync_tp_max_workers() - self._create_or_update_sync_call_tp(sync_tp_max_worker) + self._sync_tp_max_workers = self._get_sync_tp_max_workers() + self._sync_call_tp = ( + self._create_sync_call_tp(self._sync_tp_max_workers) + ) # Reload package namespaces for customer's libraries packages_to_reload = ['azure', 'google'] @@ -518,15 +519,12 @@ def tp_max_workers_validator(value: str) -> bool: default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}', validator=tp_max_workers_validator)) - def _create_or_update_sync_call_tp(self, max_worker: int): + def _create_sync_call_tp( + self, max_worker: int) -> concurrent.futures.Executor: """Update the self._sync_call_tp executor with the proper max_worker. - Side effects: - - Update self._sync_tp_max_workers to max_worker - - Update self._sync_call_tp to have max_worker number of threads """ self._stop_sync_call_tp() - self._sync_tp_max_workers = max_worker - self._sync_call_tp = concurrent.futures.ThreadPoolExecutor( + return concurrent.futures.ThreadPoolExecutor( max_workers=max_worker ) From 736ed4f4fc1f734da7a3d09dd3fb9f994d43e16e Mon Sep 17 00:00:00 2001 From: "Hanzhang Zeng (Roger)" Date: Thu, 5 Nov 2020 17:11:56 -0800 Subject: [PATCH 7/8] Fix deallocation issue --- azure_functions_worker/dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 14b9e707..8d6f2921 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -493,7 +493,7 @@ def _stop_sync_call_tp(self): """Deallocate the current synchronous thread pool. If the thread pool does not exist, this will be a no op. """ - if self._sync_call_tp is not None: + if getattr(self, '_sync_call_tp', None): self._sync_call_tp.shutdown() self._sync_call_tp = None From a358a406c8a692bd6917be6c8276be45892cf48e Mon Sep 17 00:00:00 2001 From: "Hanzhang Zeng (Roger)" Date: Fri, 6 Nov 2020 11:48:17 -0800 Subject: [PATCH 8/8] Fix PR issues --- azure_functions_worker/dispatcher.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 8d6f2921..04b144eb 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -431,6 +431,7 @@ async def _handle__function_environment_reload_request(self, req): os.environ[var] = env_vars[var] # Apply PYTHON_THREADPOOL_THREAD_COUNT + self._stop_sync_call_tp() self._sync_tp_max_workers = self._get_sync_tp_max_workers() self._sync_call_tp = ( self._create_sync_call_tp(self._sync_tp_max_workers) @@ -490,8 +491,9 @@ def _change_cwd(self, new_cwd: str): logger.warning('Directory %s is not found when reloading', new_cwd) def _stop_sync_call_tp(self): - """Deallocate the current synchronous thread pool. If the thread pool - does not exist, this will be a no op. + """Deallocate the current synchronous thread pool and assign + self._sync_call_tp to None. If the thread pool does not exist, + this will be a no op. """ if getattr(self, '_sync_call_tp', None): self._sync_call_tp.shutdown() @@ -521,9 +523,11 @@ def tp_max_workers_validator(value: str) -> bool: def _create_sync_call_tp( self, max_worker: int) -> concurrent.futures.Executor: - """Update the self._sync_call_tp executor with the proper max_worker. + """Create a thread pool executor with max_worker. This is a wrapper + over ThreadPoolExecutor constructor. Consider calling this method after + _stop_sync_call_tp() to ensure only 1 synchronous thread pool is + running. """ - self._stop_sync_call_tp() return concurrent.futures.ThreadPoolExecutor( max_workers=max_worker )