diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index fc1850cc..04b144eb 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -16,7 +16,7 @@ import threading from asyncio import BaseEventLoop from logging import LogRecord -from typing import Optional +from typing import Optional, List import grpc @@ -79,8 +79,8 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int, # PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1. self._sync_tp_max_workers: int = self._get_sync_tp_max_workers() self._sync_call_tp: concurrent.futures.Executor = ( - concurrent.futures.ThreadPoolExecutor( - max_workers=self._sync_tp_max_workers)) + self._create_sync_call_tp(self._sync_tp_max_workers) + ) self._grpc_connect_timeout: float = grpc_connect_timeout # This is set to -1 by default to remove the limitation on msg size @@ -97,9 +97,7 @@ async def connect(cls, host: str, port: int, worker_id: str, disp = cls(loop, host, port, worker_id, request_id, connect_timeout) disp._grpc_thread.start() await disp._grpc_connected_fut - logger.info('Successfully opened gRPC channel to %s:%s ' - 'with sync threadpool max workers set to %s', - host, port, disp._sync_tp_max_workers) + logger.info('Successfully opened gRPC channel to %s:%s ', host, port) return disp async def dispatch_forever(self): @@ -161,9 +159,7 @@ def stop(self) -> None: self._grpc_thread.join() self._grpc_thread = None - if self._sync_call_tp is not None: - self._sync_call_tp.shutdown() - self._sync_call_tp = None + self._stop_sync_call_tp() def on_logging(self, record: logging.LogRecord, formatted_msg: str) -> None: if record.levelno >= logging.CRITICAL: @@ -318,11 +314,19 @@ async def _handle__invocation_request(self, req): fi: functions.FunctionInfo = self._functions.get_function( function_id) - logger.info(f'Received FunctionInvocationRequest, ' - f'request ID: {self.request_id}, ' - f'function ID: {function_id}, ' - f'invocation ID: {invocation_id}, ' - f'function type: {"async" if fi.is_async else "sync"}') + function_invocation_logs: List[str] = [ + 'Received FunctionInvocationRequest', + f'request ID: {self.request_id}', + f'function ID: {function_id}', + f'invocation ID: {invocation_id}', + f'function type: {"async" if fi.is_async else "sync"}' + ] + if not fi.is_async: + function_invocation_logs.append( + f'sync threadpool max workers: {self._sync_tp_max_workers}' + ) + logger.info(', '.join(function_invocation_logs)) + args = {} for pb in invoc_request.input_data: pb_type_info = fi.input_types[pb.name] @@ -426,6 +430,13 @@ async def _handle__function_environment_reload_request(self, req): for var in env_vars: os.environ[var] = env_vars[var] + # Apply PYTHON_THREADPOOL_THREAD_COUNT + self._stop_sync_call_tp() + self._sync_tp_max_workers = self._get_sync_tp_max_workers() + self._sync_call_tp = ( + self._create_sync_call_tp(self._sync_tp_max_workers) + ) + # Reload package namespaces for customer's libraries packages_to_reload = ['azure', 'google'] for p in packages_to_reload: @@ -479,6 +490,15 @@ def _change_cwd(self, new_cwd: str): else: logger.warning('Directory %s is not found when reloading', new_cwd) + def _stop_sync_call_tp(self): + """Deallocate the current synchronous thread pool and assign + self._sync_call_tp to None. If the thread pool does not exist, + this will be a no op. + """ + if getattr(self, '_sync_call_tp', None): + self._sync_call_tp.shutdown() + self._sync_call_tp = None + def _get_sync_tp_max_workers(self) -> int: def tp_max_workers_validator(value: str) -> bool: try: @@ -501,6 +521,17 @@ def tp_max_workers_validator(value: str) -> bool: default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}', validator=tp_max_workers_validator)) + def _create_sync_call_tp( + self, max_worker: int) -> concurrent.futures.Executor: + """Create a thread pool executor with max_worker. This is a wrapper + over ThreadPoolExecutor constructor. Consider calling this method after + _stop_sync_call_tp() to ensure only 1 synchronous thread pool is + running. + """ + return concurrent.futures.ThreadPoolExecutor( + max_workers=max_worker + ) + def __run_sync_func(self, invocation_id, func, params): # This helper exists because we need to access the current # invocation_id from ThreadPoolExecutor's threads. diff --git a/azure_functions_worker/testutils.py b/azure_functions_worker/testutils.py index 5a903bf8..a965b5cf 100644 --- a/azure_functions_worker/testutils.py +++ b/azure_functions_worker/testutils.py @@ -408,6 +408,28 @@ async def invoke_function( return invocation_id, r + async def reload_environment( + self, + environment: typing.Dict[str, str], + function_project_path: str = '/home/site/wwwroot' + ) -> protos.FunctionEnvironmentReloadResponse: + + request_content = protos.FunctionEnvironmentReloadRequest( + function_app_directory=function_project_path, + environment_variables={ + k.encode(): v.encode() for k, v in environment.items() + } + ) + + r = await self.communicate( + protos.StreamingMessage( + function_environment_reload_request=request_content + ), + wait_for='function_environment_reload_response' + ) + + return r + async def send(self, message): self._in_queue.put_nowait((message, None)) @@ -453,12 +475,12 @@ def _read_available_functions(self): class _MockWebHostController: - def __init__(self, scripts_dir): - self._host = None - self._scripts_dir = scripts_dir - self._worker = None + def __init__(self, scripts_dir: pathlib.PurePath): + self._host: typing.Optional[_MockWebHost] = None + self._scripts_dir: pathlib.PurePath = scripts_dir + self._worker: typing.Optional[dispatcher.Dispatcher] = None - async def __aenter__(self): + async def __aenter__(self) -> _MockWebHost: loop = aio_compat.get_running_loop() self._host = _MockWebHost(loop, self._scripts_dir) diff --git a/tests/unittests/dispatcher_functions/show_context_async/__init__.py b/tests/unittests/dispatcher_functions/show_context_async/__init__.py new file mode 100644 index 00000000..da61ce16 --- /dev/null +++ b/tests/unittests/dispatcher_functions/show_context_async/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import json +import azure.functions as func + + +async def main(req: func.HttpRequest, + context: func.Context) -> func.HttpResponse: + result = { + 'function_directory': context.function_directory, + 'function_name': context.function_name + } + return func.HttpResponse(body=json.dumps(result), + mimetype='application/json') diff --git a/tests/unittests/dispatcher_functions/show_context_async/function.json b/tests/unittests/dispatcher_functions/show_context_async/function.json new file mode 100644 index 00000000..7239e0fc --- /dev/null +++ b/tests/unittests/dispatcher_functions/show_context_async/function.json @@ -0,0 +1,15 @@ +{ + "scriptFile": "__init__.py", + "bindings": [ + { + "type": "httpTrigger", + "direction": "in", + "name": "req" + }, + { + "type": "http", + "direction": "out", + "name": "$return" + } + ] +} \ No newline at end of file diff --git a/tests/unittests/test_dispatcher.py b/tests/unittests/test_dispatcher.py index f3893d17..df259353 100644 --- a/tests/unittests/test_dispatcher.py +++ b/tests/unittests/test_dispatcher.py @@ -1,5 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +from typing import Tuple import os from unittest.mock import patch from azure_functions_worker import protos @@ -26,6 +27,10 @@ async def test_dispatcher_sync_threadpool_default_worker(self): async with ctrl as host: await self._check_if_function_is_ok(host) self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) async def test_dispatcher_sync_threadpool_set_worker(self): """Test if the sync threadpool maximum worker can be set @@ -37,63 +42,309 @@ async def test_dispatcher_sync_threadpool_set_worker(self): async with ctrl as host: await self._check_if_function_is_ok(host) self.assertEqual(ctrl._worker._sync_tp_max_workers, 5) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) - @patch('azure_functions_worker.dispatcher.logger') - async def test_dispatcher_sync_threadpool_invalid_worker_count(self, - mock_logger): + async def test_dispatcher_sync_threadpool_invalid_worker_count(self): """Test when sync threadpool maximum worker is set to an invalid value, the host should fallback to default value 1 """ - # Configure thread pool max worker to an invalid value - os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: 'invalid'}) + # The @patch decorator does not work as expected and will suppress + # any assertion failures in the async test cases. + # Thus we're moving the patch() method to use the with syntax + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + # Configure thread pool max worker to an invalid value + os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: 'invalid'}) + ctrl = testutils.start_mockhost( + script_root=self.dispatcher_funcs_dir) + + async with ctrl as host: + await self._check_if_function_is_ok(host) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an integer') + + async def test_dispatcher_sync_threadpool_below_min_setting(self): + """Test if the sync threadpool will pick up default value when the + setting is below minimum + """ + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + # Configure thread pool max worker to an invalid value + os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '0'}) + ctrl = testutils.start_mockhost( + script_root=self.dispatcher_funcs_dir) + + async with ctrl as host: + await self._check_if_function_is_ok(host) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' + 'between 1 and 32') + + async def test_dispatcher_sync_threadpool_exceed_max_setting(self): + """Test if the sync threadpool will pick up default value when the + setting is above maximum + """ + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + # Configure thread pool max worker to an invalid value + os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '33'}) + ctrl = testutils.start_mockhost( + script_root=self.dispatcher_funcs_dir) + + async with ctrl as host: + await self._check_if_function_is_ok(host) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' + 'between 1 and 32') + + async def test_dispatcher_sync_threadpool_in_placeholder(self): + """Test if the sync threadpool will pick up app setting in placeholder + mode (Linux Consumption) + """ + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) async with ctrl as host: await self._check_if_function_is_ok(host) - # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + # Reload environment variable on specialization + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: '3' + }) + + # Ensure the dispatcher sync threadpool should update to 3 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 3) + self.assertIsNotNone(ctrl._worker._sync_call_tp) - mock_logger.warning.assert_any_call( - f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an integer') + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + async def test_dispatcher_sync_threadpool_in_placeholder_invalid(self): + """Test if the sync threadpool will use the default setting when the + app setting is invalid + """ - @patch('azure_functions_worker.dispatcher.logger') - async def test_dispatcher_sync_threadpool_below_min_setting(self, - mock_logger): - # Configure thread pool max worker to an invalid value - os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '0'}) ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - async with ctrl as host: - await self._check_if_function_is_ok(host) + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + await self._check_if_function_is_ok(host) - # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + # Reload environment variable on specialization + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: 'invalid' + }) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + # Check warning message + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an integer') + + async def test_dispatcher_sync_threadpool_in_placeholder_above_max(self): + """Test if the sync threadpool will use the default setting when the + app setting is above maximum + """ - mock_logger.warning.assert_any_call( - f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value between ' - '1 and 32') - - @patch('azure_functions_worker.dispatcher.logger') - async def test_dispatcher_sync_threadpool_exceed_max_setting( - self, - mock_logger - ): - # Configure thread pool max worker to an invalid value - os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '33'}) ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - async with ctrl as host: - await self._check_if_function_is_ok(host) + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + await self._check_if_function_is_ok(host) - # Ensure the dispatcher sync threadpool should fallback to 1 - self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + # Reload environment variable on specialization + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: '33' + }) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + # Check warning message + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' + 'between 1 and 32') + + async def test_dispatcher_sync_threadpool_in_placeholder_below_min(self): + """Test if the sync threadpool will use the default setting when the + app setting is below minimum + """ + + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + await self._check_if_function_is_ok(host) + + # Reload environment variable on specialization + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: '0' + }) + + # Ensure the dispatcher sync threadpool should fallback to 1 + self.assertEqual(ctrl._worker._sync_tp_max_workers, 1) + self.assertIsNotNone(ctrl._worker._sync_call_tp) + + # Check if the dispatcher still function + await self._check_if_function_is_ok(host) + + # Check warning message + mock_logger.warning.assert_any_call( + f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value ' + 'between 1 and 32') + + async def test_sync_invocation_request_log(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) - mock_logger.warning.assert_any_call( - f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value between ' - '1 and 32') + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + request_id: str = ctrl._worker._request_id + func_id, invoke_id = await self._check_if_function_is_ok(host) - async def _check_if_function_is_ok(self, host): + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: sync, ' + 'sync threadpool max workers: 1' + ) + + async def test_async_invocation_request_log(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + request_id: str = ctrl._worker._request_id + func_id, invoke_id = ( + await self._check_if_async_function_is_ok(host) + ) + + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: async' + ) + + async def test_sync_invocation_request_log_threads(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '5'}) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + request_id: str = ctrl._worker._request_id + func_id, invoke_id = await self._check_if_function_is_ok(host) + + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: sync, ' + 'sync threadpool max workers: 5' + ) + + async def test_async_invocation_request_log_threads(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '4'}) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + request_id: str = ctrl._worker._request_id + func_id, invoke_id = ( + await self._check_if_async_function_is_ok(host) + ) + + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: async' + ) + + async def test_sync_invocation_request_log_in_placeholder_threads(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: '5' + }) + + request_id: str = ctrl._worker._request_id + func_id, invoke_id = await self._check_if_function_is_ok(host) + + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: sync, ' + 'sync threadpool max workers: 5' + ) + + async def test_async_invocation_request_log_in_placeholder_threads(self): + ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir) + + with patch('azure_functions_worker.dispatcher.logger') as mock_logger: + async with ctrl as host: + await host.reload_environment(environment={ + PYTHON_THREADPOOL_THREAD_COUNT: '5' + }) + + request_id: str = ctrl._worker._request_id + func_id, invoke_id = ( + await self._check_if_async_function_is_ok(host) + ) + + mock_logger.info.assert_any_call( + 'Received FunctionInvocationRequest, ' + f'request ID: {request_id}, ' + f'function ID: {func_id}, ' + f'invocation ID: {invoke_id}, ' + 'function type: async' + ) + + async def _check_if_function_is_ok(self, host) -> Tuple[str, str]: # Ensure the function can be properly loaded func_id, load_r = await host.load_function('show_context') self.assertEqual(load_r.response.function_id, func_id) @@ -115,3 +366,30 @@ async def _check_if_function_is_ok(self, host): self.assertIsNotNone(invoke_id) self.assertEqual(call_r.response.result.status, protos.StatusResult.Success) + + return (func_id, invoke_id) + + async def _check_if_async_function_is_ok(self, host) -> Tuple[str, str]: + # Ensure the function can be properly loaded + func_id, load_r = await host.load_function('show_context_async') + self.assertEqual(load_r.response.function_id, func_id) + self.assertEqual(load_r.response.result.status, + protos.StatusResult.Success) + + # Ensure the function can be properly invoked + invoke_id, call_r = await host.invoke_function( + 'show_context_async', [ + protos.ParameterBinding( + name='req', + data=protos.TypedData( + http=protos.RpcHttp( + method='GET' + ) + ) + ) + ]) + self.assertIsNotNone(invoke_id) + self.assertEqual(call_r.response.result.status, + protos.StatusResult.Success) + + return (func_id, invoke_id)