The auto conversion feature of the Python UDF decorator has a performance problem. #5112
I have done quite a bit of digging and playing around (starting from looking for obvious leaky code, to disabling auto conversion, then simplifying the UDF, and then bypassing the UDF decorator completely), and now believe the 'memory leak' could have something to do with the default liveness scope mishandling tables created in the global scope. This is of course only my speculation without diving into the actual implementation. @niloc132 , @rcaudy are the resident experts/creators of the liveness scope, and should know if I am talking nonsense here after a quick look at the simple code examples/results below.
1. Wrap @stanbrub 's benchmarking script in a function and call it multiple times: no memory leak.
2. Run @stanbrub 's script as is multiple times manually in the IDE: a nearly constant amount of memory is leaked each time. In fact, we don't even need to involve the Py UDF at all; for example, replacing the formula with "Y = 1" in the select op renders exactly the same result.
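For reference, a minimal sketch of the two patterns (assuming the deephaven.liveness_scope API; the one-line select stands in for @stanbrub's benchmark body):

from deephaven import empty_table
# Assumption: the liveness_scope context manager from deephaven.liveness_scope.
from deephaven.liveness_scope import liveness_scope

# Pattern 1: tables are created inside a function under an explicit liveness scope,
# so they are released when the scope closes; repeated calls showed no leak.
def run_benchmark():
    with liveness_scope():
        t = empty_table(1_000_000).select("Y = 1")

run_benchmark()

# Pattern 2: the same work run at top level in the IDE; the table lands in the
# default (global) liveness scope, and re-running this repeatedly leaked memory.
t = empty_table(1_000_000).select("Y = 1")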
|
Just to be clear, there are multiple issues that can be seen in testing between 0.24.0 and 0.32.0. (Though the regression happens earlier than 0.32.0 and later than 0.24.0, these are the versions similar to what HH is seeing.) Performance:
Memory:
|
@stanbrub Can you confirm that the performance degradation for scalars is on the same scale as for arrays?
|
The title lists a "memory leak" and a "performance problem". The example in the thread clearly shows a memory leak, but it looks like the rows/sec remains constant. Is there another reproducer of the "performance problem", or is the performance problem just a slowdown that happens as the process runs out of memory?
|
Here are some results that show UDF performance regression between 0.24.0 and 0.32.0 for both scalar and array values
|
Here's some more supporting info on the performance regression. I ran some Benchmark UDF tests on versions >= 0.28.0.
|
No hints, no vectorization
Return hints, vectorization
Numpy type hints for input, vectorization
Python built-in type for input, vectorization
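For concreteness, hypothetical minimal UDFs matching the four configurations above (illustrative only, not the exact benchmark code):

import numpy as np

# No hints: no vectorization, and a cast is needed for a typed result column.
def f_none(v):
    return v + 1

# Return hint only: enables vectorization and a typed result column.
def f_ret(v) -> np.int32:
    return v + 1

# Numpy type hint for input, vectorization.
def f_np_in(v: np.int64) -> np.int32:
    return v + 1

# Python built-in type hint for input, vectorization.
def f_py_in(v: int) -> np.int32:
    return v + 1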
|
TL;DR: auto-conversion of the args and return value of a Py UDF really kills performance.
Auto-conversion disabled in Python, vectorization with type hints; cast needed, no vectorization with no type hints, formula is "Y = why(X)"
Auto-conversion disabled in Python, no vectorization at all, formula is "Y = why(X + 1)"
Auto-conversion enabled in Python, vectorization with type hints; cast needed, no vectorization with no type hints, formula is "Y = why(X)"
Auto-conversion enabled in Python, no vectorization at all, formula is "Y = why(X + 1)"
Auto-conversion disabled in Java, vectorization with type hints; cast needed, no vectorization with no type hints, formula is "Y = why(X)"
Auto-conversion disabled in Java, no vectorization; better performance than when it is disabled in Python only, because it doesn't go through the UDF decorator
import time
import numpy as np
from deephaven import empty_table, garbage_collect

row_count = 1_000_000
source = empty_table(row_count).update(["X = (int)(ii % 250)"])

def run_test():
    # with type-hints
    def why(v: int) -> np.int32:
        return v

    begin_time = time.perf_counter_ns()
    for i in range(5):
        result = source.select('Y=why(X + 1)')
    print("type hints,", 'Rows / Sec:', row_count * 5 / ((time.perf_counter_ns() - begin_time) / 1_000_000_000))
    print(result.columns[0].data_type)

    # try to restore the worker to the same state
    result = None
    for i in range(5):
        garbage_collect()
        time.sleep(0.1)
    time.sleep(5)

    # without type-hints
    def why(v):
        return v

    begin_time = time.perf_counter_ns()
    for i in range(5):
        # result = source.select('Y=why(X)').select("Y = (int)Y")
        result = source.select("Y = (int)why(X + 1)")
    print("no type hints,", 'Rows / Sec:', row_count * 5 / ((time.perf_counter_ns() - begin_time) / 1_000_000_000))
    print(result.columns[0].data_type)

    result = None
    for i in range(5):
        garbage_collect()

run_test()
|
I'm trying to get my head around the relevant changes that led to the problem. Am I correct that these are the key files?
|
Array input - vectorization for type-hints, no vectorization for no-type-hints
Array input - no vectorization for either
|
Experiments to speed up the auto-conversion
Scalar input/output, auto-conversion on, removed: null/Optional check
Scalar input/output, auto-conversion on, removed: null/Optional check, numpy scalar support in input
Array input, auto-conversion on, removed: null/Optional check
Array input, auto-conversion on, removed: null/Optional check, lookup function call replaced with the use of a map
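A hypothetical sketch of that last variation, with illustrative names rather than the actual deephaven._udf internals: replace a branching lookup function called per value with a module-level dict built once.

import numpy as np

# Before: a lookup function that branches on every converted value.
def _lookup_converter(np_type_char: str):
    if np_type_char in "bhil":   # numpy integer kinds
        return int
    if np_type_char in "efd":    # numpy floating kinds
        return float
    return None

# After: a map built once at import time; per-value work is one dict lookup.
_CONVERTER_MAP = {c: int for c in "bhil"}
_CONVERTER_MAP.update({c: float for c in "efd"})

def _convert(np_type_char: str, value):
    conv = _CONVERTER_MAP.get(np_type_char)
    return conv(value) if conv is not None else value

The test script used for the array-input experiments: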
import time
import numpy as np
from deephaven import empty_table, garbage_collect

row_count = 10_000_000
source = empty_table(row_count).update(["X = (int)(ii % 1_000_000)", "Y = ii"]).group_by("X")

def run_test():
    # with type-hints
    def why(v: np.ndarray[np.int64]) -> np.float64:
        return np.average(v)

    begin_time = time.perf_counter_ns()
    for i in range(5):
        result = source.select("Z = why(Y)")
    print("type hints,", 'Rows / Sec:', row_count * 5 / 10 / ((time.perf_counter_ns() - begin_time) / 1_000_000_000))
    print(result.columns[0].data_type)

    # try to restore the worker to the same state
    result = None
    for i in range(5):
        garbage_collect()
        time.sleep(0.1)
    time.sleep(5)

    # without type-hints
    def why(v):
        v = np.frombuffer(v)
        return np.average(v)

    begin_time = time.perf_counter_ns()
    for i in range(5):
        result = source.select("Z = (double) why(Y)")
    print("no type hints,", 'Rows / Sec:', row_count * 5 / ((time.perf_counter_ns() - begin_time) / 1_000_000_000))
    print(result.columns[0].data_type)

    # try to restore the worker to the same state
    result = None
    for i in range(5):
        garbage_collect()
        time.sleep(0.1)
    time.sleep(5)

run_test()

Some rough numbers, UDF vs. pre-UDF
|
I'm looking at the perf data above.
I have other specific questions about what is going on. These questions revolve around the performance of built-in python types vs numpy types (e.g. int vs np.int64).

New benchmarks

import time
import numpy as np
from deephaven import empty_table, garbage_collect

row_count = 1_000_000
source = empty_table(row_count).update(["Y = ii"])

# both type-hints numpy
def why_n_n(v: np.int64) -> np.float64:
    return v + 1.2

# both type-hints native
def why_p_p(v: int) -> float:
    return v + 1.2

# return type-hint numpy
def why_x_n(v) -> np.float64:
    return v + 1.2

# return type-hint native
def why_x_p(v) -> float:
    return v + 1.2

# param type-hint numpy
def why_n_x(v: np.int64):
    return v + 1.2

# param type-hint native
def why_p_x(v: int):
    return v + 1.2

# no type-hints
def why_x_x(v):
    return v + 1.2

# no type-hints, convert the input
def why_c_x(v):
    v = np.int64(v)
    return v + 1.2

# no type-hints, convert the output
def why_x_c(v):
    return np.float64(v + 1.2)

# no type-hints, convert both
def why_c_c(v):
    v = np.int64(v)
    return np.float64(v + 1.2)

jobs = [
    ["Z = why_n_n(Y)", "numpy/numpy"],
    ["Z = why_p_p(Y)", "python/python"],
    ["Z = why_x_n(Y)", "none/numpy"],
    ["Z = why_x_p(Y)", "none/python"],
    ["Z = (double) why_n_x(Y)", "numpy/none"],
    ["Z = (double) why_p_x(Y)", "python/none"],
    ["Z = (double) why_x_x(Y)", "none/none"],
    ["Z = (double) why_c_x(Y)", "convert/none"],
    ["Z = (double) why_x_c(Y)", "none/convert"],
    ["Z = (double) why_c_c(Y)", "convert/convert"],
]

def run_test():
    for query, msg in jobs:
        begin_time = time.perf_counter_ns()
        for i in range(5):
            result = source.select(query)
        print(f"{msg} hints: {row_count * 5 / ((time.perf_counter_ns() - begin_time) / 1_000_000_000)} rows/sec ({result.columns[0].data_type})")
        # try to restore the worker to the same state
        result = None
        for i in range(5):
            garbage_collect()
            time.sleep(0.1)
        time.sleep(5)

run_test()

Running this code on the demo system with:
ALL of the rows/sec seem low. I do not know what hardware they are on...

Observation 1: Numpy scalar conversions are ultra slow (not a DH problem)
Numpy scalar types are much slower than python scalar types. Converting both inputs and outputs to numpy scalars is 31% slower than using the native python types.
Floating point return conversion seems comparable.
Integer inputs are 27% slower.
Converting inputs and outputs to numpy scalars by hand is slow. Here the functions have no type hints and convert by hand. Converting both values is 37% slower than the unconverted case. The float conversion is around 6%, while the bulk of the slowdown is from the integer conversion.
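This is easy to reproduce outside of Deephaven; a standalone micro-benchmark of the same arithmetic on native vs numpy scalars:

import timeit
import numpy as np

# Identical arithmetic; the only difference is the scalar type of v.
t_py = timeit.timeit("v + 1.2", setup="v = 12345", number=5_000_000)
t_np = timeit.timeit("v + 1.2", setup="import numpy as np; v = np.int64(12345)", number=5_000_000)
print(f"python int: {t_py:.2f}s, np.int64: {t_np:.2f}s ({t_np / t_py:.1f}x slower)")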
Observation 2: DH wrapper is way too slow (DH problem)
Similarly, converting the input to numpy by hand is 36% slower, vs 60% slower when using the annotation.
This all indicates that the annotation has too much overhead.

Considerations
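One direction this could take (a hypothetical sketch, not code from this thread): resolve each parameter's converter once at decoration time, so the per-row hot path is a plain call with no type dispatch, and drop the wrapper entirely when no parameter needs conversion.

from typing import Any, Callable, Optional, Sequence
import numpy as np

def _resolve_converter(annotation: Any) -> Optional[Callable[[Any], Any]]:
    # Runs once per parameter at decoration time, not once per row.
    if isinstance(annotation, type) and issubclass(annotation, np.generic):
        return annotation  # e.g. np.int64 itself converts the value
    return None  # built-in int/float or no hint: pass the value through

def make_wrapper(fn: Callable, annotations: Sequence[Any]) -> Callable:
    converters = [_resolve_converter(a) for a in annotations]
    if not any(converters):
        return fn  # fast path: no per-row conversion overhead at all
    def wrapper(*args):
        return fn(*(c(a) if c else a for c, a in zip(converters, args)))
    return wrapper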
|
It is a low-level math mistake. The row count after the group-by should be 10 times smaller.
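Concretely: the array benchmark groups 10,000,000 rows by "X = (int)(ii % 1_000_000)", so each select only processes 1,000,000 rows, and the no-type-hints branch should divide by 10 just as the type-hints branch does:

# Corrected throughput line for the no-type-hints branch of the array benchmark above:
print("no type hints,", 'Rows / Sec:',
      row_count * 5 / 10 / ((time.perf_counter_ns() - begin_time) / 1_000_000_000))
|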
Summary:
Test code (Chip's, but with only two targeted UDFs):

import time
import numpy as np
from deephaven import empty_table, garbage_collect

row_count = 1_000_000
source = empty_table(row_count).update(["Y = ii"])

# both type-hints numpy
def why_n_n(v: np.int64) -> np.float64:
    return v + 1.2

# both type-hints native
def why_p_p(v: int) -> float:
    return v + 1.2

# return type-hint numpy
def why_x_n(v) -> np.float64:
    return v + 1.2

# return type-hint native
def why_x_p(v) -> float:
    return v + 1.2

# param type-hint numpy
def why_n_x(v: np.int64):
    return v + 1.2

# param type-hint native
def why_p_x(v: int):
    return v + 1.2

# no type-hints
def why_x_x(v):
    return v + 1.2

# no type-hints, convert the input
def why_c_x(v):
    v = np.int64(v)
    return v + 1.2

# no type-hints, convert the output
def why_x_c(v):
    return np.float64(v + 1.2)

# no type-hints, convert both
def why_c_c(v):
    v = np.int64(v)
    return np.float64(v + 1.2)

jobs = [
    ["Z = why_n_n(Y + 1)", "numpy/numpy"],
    ["Z = why_p_p(Y + 1)", "python/python"],
    # ["Z = why_x_n(Y)", "none/numpy"],
    # ["Z = why_x_p(Y)", "none/python"],
    # ["Z = (double) why_n_x(Y)", "numpy/none"],
    # ["Z = (double) why_p_x(Y)", "python/none"],
    # ["Z = (double) why_x_x(Y)", "none/none"],
    # ["Z = (double) why_c_x(Y)", "convert/none"],
    # ["Z = (double) why_x_c(Y)", "none/convert"],
    # ["Z = (double) why_c_c(Y)", "convert/convert"],
]

def run_test():
    global source
    for query, msg in jobs:
        begin_time = time.perf_counter_ns()
        for i in range(5):
            result = source.select(query)
        print(f"{msg} hints: {row_count * 5 / ((time.perf_counter_ns() - begin_time) / 1_000_000_000)} rows/sec ({result.columns[0].data_type})")
        # try to restore the worker to the same state
        result = None
        for i in range(5):
            garbage_collect()
            time.sleep(0.1)
        time.sleep(5)
    source = None

run_test()

1. Py UDF decorator bypassed entirely
2. Py UDF decorator, non-optimized (no typed converter)
3. Hard-coded long/float conversion for input/output (null check, NumPy type check)
np_long_type = np.dtype("l")

def _convert_long_arg(param: _ParsedParamAnnotation, arg: int) -> Any:
    """ Convert an integer argument to the type specified by the annotation """
    if arg == NULL_LONG:
        if param.none_allowed:
            return None
        else:
            raise DHError(f"Argument {arg} is not compatible with annotation {param.orig_types}")
    else:
        for t in param.orig_types:
            if issubclass(t, np.generic):
                return np_long_type.type(arg)
            else:
                return arg
    # return arg

def _convert_args(p_sig: _ParsedSignature, args: Tuple[Any, ...]) -> List[Any]:
    """ Convert all arguments to the types specified by the annotations.
    Given that the number of arguments and the number of parameters may not match (in the presence of keyword,
    var-positional, or var-keyword parameters), we have the following rules:
    If the number of arguments is less than the number of parameters, the remaining parameters are left as is.
    If the number of arguments is greater than the number of parameters, the extra arguments are left as is.
    Python's function call mechanism will raise an exception if it can't resolve the parameters with the arguments.
    """
    converted_args = [_convert_long_arg(param, arg) for param, arg in zip(p_sig.params, args)]  ####### call the typed converter
    # converted_args = [_convert_arg(param, arg) for param, arg in zip(p_sig.params, args)]
    converted_args.extend(args[len(converted_args):])
    return converted_args

def _scalar(x: Any, dtype: DType) -> Any:
    """Converts a Python value to a Java scalar value. It converts the numpy primitive types and strings to
    their Python equivalents so that JPY can handle them. For datetime values, it converts them to Java Instant.
    Otherwise, it returns the value as is."""
    # NULL_BOOL will appear in Java as a byte value which causes a cast error. We just let JPY convert it to Java null
    # and the engine has casting logic to handle it.
    # if (dt := _PRIMITIVE_DTYPE_NULL_MAP.get(dtype)) and _is_py_null(x) and dtype not in (bool_, char):
    #     return dt
    return NULL_DOUBLE if _is_py_null(x) else float(x)  ######### skip all the branching
    # try:
    #     if hasattr(x, "dtype"):
    #         if x.dtype.char == 'H':  # np.uint16 maps to Java char
    #             return Character(int(x))
    #         elif x.dtype.char in _NUMPY_INT_TYPE_CODES:
    #             return int(x)
    #         elif x.dtype.char in _NUMPY_FLOATING_TYPE_CODES:
    #             return float(x)
    #         elif x.dtype.char == '?':
    #             return bool(x)
    #         elif x.dtype.char == 'U':
    #             return str(x)
    #         elif x.dtype.char == 'O':
    #             return x
    #         elif x.dtype.char == 'M':
    #             from deephaven.time import to_j_instant
    #             return to_j_instant(x)
    #     elif isinstance(x, (datetime.datetime, pd.Timestamp)):
    #         from deephaven.time import to_j_instant
    #         return to_j_instant(x)
    #     return x
    # except:
    #     return x

4. Hard-coded long/float conversion for input/output (null check, no NumPy type check)
def _convert_long_arg(param: _ParsedParamAnnotation, arg: int) -> Any:
    """ Convert an integer argument to the type specified by the annotation """
    if arg == NULL_LONG:
        if param.none_allowed:
            return None
        else:
            raise DHError(f"Argument {arg} is not compatible with annotation {param.orig_types}")
    else:
        return arg
    # else:
    #     for t in param.orig_types:
    #         if issubclass(t, np.generic):
    #             return np_long_type.type(arg)
    #         else:
    #             return arg
    # return arg

5. Hard-coded long/float conversion for input/output (no null check, no NumPy type check)
def _convert_long_arg(param: _ParsedParamAnnotation, arg: int) -> Any:
    """ Convert an integer argument to the type specified by the annotation """
    # if arg == NULL_LONG:
    #     if param.none_allowed:
    #         return None
    #     else:
    #         raise DHError(f"Argument {arg} is not compatible with annotation {param.orig_types}")
    # else:
    #     for t in param.orig_types:
    #         if issubclass(t, np.generic):
    #             return np_long_type.type(arg)
    #         else:
    #             return arg
    return arg  ###### skip null check/numpy scalar check

def _scalar(x: Any, dtype: DType) -> Any:
    """Converts a Python value to a Java scalar value. It converts the numpy primitive types and strings to
    their Python equivalents so that JPY can handle them. For datetime values, it converts them to Java Instant.
    Otherwise, it returns the value as is."""
    # NULL_BOOL will appear in Java as a byte value which causes a cast error. We just let JPY convert it to Java null
    # and the engine has casting logic to handle it.
    # if (dt := _PRIMITIVE_DTYPE_NULL_MAP.get(dtype)) and _is_py_null(x) and dtype not in (bool_, char):
    #     return dt
    return float(x)
    # try:
    #     if hasattr(x, "dtype"):
    #         if x.dtype.char == 'H':  # np.uint16 maps to Java char
    #             return Character(int(x))
    #         elif x.dtype.char in _NUMPY_INT_TYPE_CODES:
    #             return int(x)
    #         elif x.dtype.char in _NUMPY_FLOATING_TYPE_CODES:
    #             return float(x)
    #         elif x.dtype.char == '?':
    #             return bool(x)
    #         elif x.dtype.char == 'U':
    #             return str(x)
    #         elif x.dtype.char == 'O':
    #             return x
    #         elif x.dtype.char == 'M':
    #             from deephaven.time import to_j_instant
    #             return to_j_instant(x)
    #     elif isinstance(x, (datetime.datetime, pd.Timestamp)):
    #         from deephaven.time import to_j_instant
    #         return to_j_instant(x)
    #     return x
    # except:
    #     return x

6. Hard-coded long/float conversion for input/output (no null check, no NumPy type check, _scalar() skipped)
def _py_udf(fn: Callable):
    """A decorator that acts as a transparent translator for Python UDFs used in Deephaven query formulas between
    Python and Java. This decorator is intended for use by the Deephaven query engine and should not be used by
    users.

    It carries out two conversions:
    1. convert Python function return values to Java values.
    For properly annotated functions, including numba vectorized and guvectorized ones, this decorator inspects the
    signature of the function and determines its return type, including supported primitive types and arrays of
    the supported primitive types. It then converts the return value of the function to the corresponding Java value
    of the same type. For unsupported types, the decorator returns the original Python value which appears as
    org.jpy.PyObject in Java.
    2. convert Java function arguments to Python values based on the signature of the function.
    """
    if hasattr(fn, "return_type"):
        return fn
    p_sig = _parse_signature(fn)
    # build a signature string for vectorization by removing NoneType, array char '[', and comma from the encoded types
    # since vectorization only supports UDFs with a single signature and enforces an exact match, any non-compliant
    # signature (e.g. Union with more than 1 non-NoneType) will be rejected by the vectorizer.
    sig_str_vectorization = re.sub(r"[\[N,]", "", p_sig.encoded)
    return_array = p_sig.ret_annotation.has_array
    ret_dtype = dtypes.from_np_dtype(np.dtype(p_sig.ret_annotation.encoded_type[-1]))

    @wraps(fn)
    def wrapper(*args, **kwargs):
        converted_args = _convert_args(p_sig, args)
        # converted_args = args
        # kwargs are not converted because they are not used in the UDFs
        ret = fn(*converted_args, **kwargs)
        if return_array:
            return dtypes.array(ret_dtype, ret)
        elif ret_dtype == dtypes.PyObject:
            return ret
        else:
            return ret  ####### skip _scalar() call
            # return _scalar(ret, ret_dtype)

    wrapper.j_name = ret_dtype.j_name
    real_ret_dtype = _BUILDABLE_ARRAY_DTYPE_MAP.get(ret_dtype, dtypes.PyObject) if return_array else ret_dtype
    if hasattr(ret_dtype.j_type, 'jclass'):
        j_class = real_ret_dtype.j_type.jclass
    else:
        j_class = real_ret_dtype.qst_type.clazz()
    wrapper.return_type = j_class
    wrapper.signature = sig_str_vectorization
    return wrapper

7. _convert_args() skipped, _scalar() skipped
def _py_udf(fn: Callable):
    """A decorator that acts as a transparent translator for Python UDFs used in Deephaven query formulas between
    Python and Java. This decorator is intended for use by the Deephaven query engine and should not be used by
    users.

    It carries out two conversions:
    1. convert Python function return values to Java values.
    For properly annotated functions, including numba vectorized and guvectorized ones, this decorator inspects the
    signature of the function and determines its return type, including supported primitive types and arrays of
    the supported primitive types. It then converts the return value of the function to the corresponding Java value
    of the same type. For unsupported types, the decorator returns the original Python value which appears as
    org.jpy.PyObject in Java.
    2. convert Java function arguments to Python values based on the signature of the function.
    """
    if hasattr(fn, "return_type"):
        return fn
    p_sig = _parse_signature(fn)
    # build a signature string for vectorization by removing NoneType, array char '[', and comma from the encoded types
    # since vectorization only supports UDFs with a single signature and enforces an exact match, any non-compliant
    # signature (e.g. Union with more than 1 non-NoneType) will be rejected by the vectorizer.
    sig_str_vectorization = re.sub(r"[\[N,]", "", p_sig.encoded)
    return_array = p_sig.ret_annotation.has_array
    ret_dtype = dtypes.from_np_dtype(np.dtype(p_sig.ret_annotation.encoded_type[-1]))

    @wraps(fn)
    def wrapper(*args, **kwargs):
        # converted_args = _convert_args(p_sig, args)  ###### skip _convert_args entirely
        converted_args = args
        # kwargs are not converted because they are not used in the UDFs
        ret = fn(*converted_args, **kwargs)
        if return_array:
            return dtypes.array(ret_dtype, ret)
        elif ret_dtype == dtypes.PyObject:
            return ret
        else:
            return ret  ###### skip _scalar() entirely
            # return _scalar(ret, ret_dtype)

    wrapper.j_name = ret_dtype.j_name
    real_ret_dtype = _BUILDABLE_ARRAY_DTYPE_MAP.get(ret_dtype, dtypes.PyObject) if return_array else ret_dtype
    if hasattr(ret_dtype.j_type, 'jclass'):
        j_class = real_ret_dtype.j_type.jclass
    else:
        j_class = real_ret_dtype.qst_type.clazz()
    wrapper.return_type = j_class
    wrapper.signature = sig_str_vectorization
    return wrapper
|
This was found by the latest benchmarking effort to measure the performance impact of the usability improvements to the Python UDF decorator.