Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PYTHON_THREADPOOL_THREAD_COUNT not apply to Linux Conusmption #774

Merged
merged 9 commits into from
Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 41 additions & 14 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import threading
from asyncio import BaseEventLoop
from logging import LogRecord
from typing import Optional
from typing import Optional, List

import grpc

Expand Down Expand Up @@ -79,8 +79,8 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
# PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1.
self._sync_tp_max_workers: int = self._get_sync_tp_max_workers()
self._sync_call_tp: concurrent.futures.Executor = (
concurrent.futures.ThreadPoolExecutor(
max_workers=self._sync_tp_max_workers))
self._create_sync_call_tp(self._sync_tp_max_workers)
)

self._grpc_connect_timeout: float = grpc_connect_timeout
# This is set to -1 by default to remove the limitation on msg size
Expand All @@ -97,9 +97,7 @@ async def connect(cls, host: str, port: int, worker_id: str,
disp = cls(loop, host, port, worker_id, request_id, connect_timeout)
disp._grpc_thread.start()
await disp._grpc_connected_fut
logger.info('Successfully opened gRPC channel to %s:%s '
'with sync threadpool max workers set to %s',
host, port, disp._sync_tp_max_workers)
logger.info('Successfully opened gRPC channel to %s:%s ', host, port)
return disp

async def dispatch_forever(self):
Expand Down Expand Up @@ -161,9 +159,7 @@ def stop(self) -> None:
self._grpc_thread.join()
self._grpc_thread = None

if self._sync_call_tp is not None:
self._sync_call_tp.shutdown()
self._sync_call_tp = None
self._stop_sync_call_tp()

def on_logging(self, record: logging.LogRecord, formatted_msg: str) -> None:
if record.levelno >= logging.CRITICAL:
Expand Down Expand Up @@ -318,11 +314,19 @@ async def _handle__invocation_request(self, req):
fi: functions.FunctionInfo = self._functions.get_function(
function_id)

logger.info(f'Received FunctionInvocationRequest, '
f'request ID: {self.request_id}, '
f'function ID: {function_id}, '
f'invocation ID: {invocation_id}, '
f'function type: {"async" if fi.is_async else "sync"}')
function_invocation_logs: List[str] = [
'Received FunctionInvocationRequest',
f'request ID: {self.request_id}',
f'function ID: {function_id}',
f'invocation ID: {invocation_id}',
f'function type: {"async" if fi.is_async else "sync"}'
]
if not fi.is_async:
function_invocation_logs.append(
f'sync threadpool max workers: {self._sync_tp_max_workers}'
)
logger.info(', '.join(function_invocation_logs))

args = {}
for pb in invoc_request.input_data:
pb_type_info = fi.input_types[pb.name]
Expand Down Expand Up @@ -426,6 +430,12 @@ async def _handle__function_environment_reload_request(self, req):
for var in env_vars:
os.environ[var] = env_vars[var]

# Apply PYTHON_THREADPOOL_THREAD_COUNT
self._sync_tp_max_workers = self._get_sync_tp_max_workers()
self._sync_call_tp = (
self._create_sync_call_tp(self._sync_tp_max_workers)
)

# Reload package namespaces for customer's libraries
packages_to_reload = ['azure', 'google']
for p in packages_to_reload:
Expand Down Expand Up @@ -479,6 +489,14 @@ def _change_cwd(self, new_cwd: str):
else:
logger.warning('Directory %s is not found when reloading', new_cwd)

def _stop_sync_call_tp(self):
"""Deallocate the current synchronous thread pool. If the thread pool
does not exist, this will be a no op.
"""
if getattr(self, '_sync_call_tp', None):
self._sync_call_tp.shutdown()
self._sync_call_tp = None

def _get_sync_tp_max_workers(self) -> int:
def tp_max_workers_validator(value: str) -> bool:
try:
Expand All @@ -501,6 +519,15 @@ def tp_max_workers_validator(value: str) -> bool:
default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}',
validator=tp_max_workers_validator))

def _create_sync_call_tp(
self, max_worker: int) -> concurrent.futures.Executor:
"""Update the self._sync_call_tp executor with the proper max_worker.
Hazhzeng marked this conversation as resolved.
Show resolved Hide resolved
"""
self._stop_sync_call_tp()
Hazhzeng marked this conversation as resolved.
Show resolved Hide resolved
return concurrent.futures.ThreadPoolExecutor(
max_workers=max_worker
)

def __run_sync_func(self, invocation_id, func, params):
# This helper exists because we need to access the current
# invocation_id from ThreadPoolExecutor's threads.
Expand Down
32 changes: 27 additions & 5 deletions azure_functions_worker/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,28 @@ async def invoke_function(

return invocation_id, r

async def reload_environment(
self,
environment: typing.Dict[str, str],
function_project_path: str = '/home/site/wwwroot'
) -> protos.FunctionEnvironmentReloadResponse:

request_content = protos.FunctionEnvironmentReloadRequest(
function_app_directory=function_project_path,
environment_variables={
k.encode(): v.encode() for k, v in environment.items()
}
)

r = await self.communicate(
protos.StreamingMessage(
function_environment_reload_request=request_content
),
wait_for='function_environment_reload_response'
)

return r

async def send(self, message):
self._in_queue.put_nowait((message, None))

Expand Down Expand Up @@ -453,12 +475,12 @@ def _read_available_functions(self):

class _MockWebHostController:

def __init__(self, scripts_dir):
self._host = None
self._scripts_dir = scripts_dir
self._worker = None
def __init__(self, scripts_dir: pathlib.PurePath):
self._host: typing.Optional[_MockWebHost] = None
self._scripts_dir: pathlib.PurePath = scripts_dir
self._worker: typing.Optional[dispatcher.Dispatcher] = None

async def __aenter__(self):
async def __aenter__(self) -> _MockWebHost:
loop = aio_compat.get_running_loop()
self._host = _MockWebHost(loop, self._scripts_dir)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import azure.functions as func


async def main(req: func.HttpRequest,
context: func.Context) -> func.HttpResponse:
result = {
'function_directory': context.function_directory,
'function_name': context.function_name
}
return func.HttpResponse(body=json.dumps(result),
mimetype='application/json')
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
Loading