fix: fix PT AutoBatchSize OOM bug and merge execute_all into base #4047

Merged · 4 commits · Aug 6, 2024
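This PR removes the PyTorch-specific `execute_all` (deleted below) and merges it into the base `AutoBatchSize` in `deepmd/utils/batch_size.py`, using `array-api-compat` so the same loop can slice and concatenate both NumPy arrays and torch tensors, and it fixes the out-of-memory handling reported in #4036. As the new tests show, `execute()` reports `(0, None)` when the attempted batch hit OOM and the batch size was halved; the fix is to skip that placeholder iteration before inspecting the result type. A minimal, self-contained sketch of that loop pattern (not the DeePMD implementation) follows:

```python
# A minimal sketch (not the DeePMD code) of the control-flow fix.
# `execute` is assumed to return (n_batch, result) and to report (0, None)
# when the attempted batch ran out of memory.
from typing import Any, Callable, Tuple


def gather(execute: Callable[[int], Tuple[int, Any]], total_size: int) -> list:
    index = 0
    collected = []
    returned_dict = None
    while index < total_size:
        n_batch, result = execute(index)
        if n_batch == 0:
            # the fix: an OOM retry produced no data, so skip it entirely instead
            # of letting `result is None` decide whether the callable returns dicts
            continue
        if returned_dict is None:
            returned_dict = isinstance(result, dict)
        collected.append(result)
        index += n_batch
    return collected
```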
100 changes: 0 additions & 100 deletions deepmd/pt/utils/auto_batch_size.py
@@ -1,11 +1,5 @@
# SPDX-License-Identifier: LGPL-3.0-or-later
from typing import (
Callable,
Tuple,
Union,
)

import numpy as np
import torch

from deepmd.utils.batch_size import AutoBatchSize as AutoBatchSizeBase
@@ -64,97 +58,3 @@ def is_oom_error(self, e: Exception) -> bool:
torch.cuda.empty_cache()
return True
return False

def execute_all(
self, callable: Callable, total_size: int, natoms: int, *args, **kwargs
) -> Tuple[Union[np.ndarray, torch.Tensor]]:
"""Excuate a method with all given data.

Parameters
----------
callable : Callable
The method should accept *args and **kwargs as input and return the similiar array.
total_size : int
Total size
natoms : int
The number of atoms
*args
Variable length argument list.
**kwargs
If 2D np.ndarray or torch.Tensor, assume the first axis is batch; otherwise do nothing.
"""

def execute_with_batch_size(
batch_size: int, start_index: int
) -> Tuple[int, Tuple[torch.Tensor]]:
end_index = start_index + batch_size
end_index = min(end_index, total_size)
return (end_index - start_index), callable(
*[
(
vv[start_index:end_index]
if (isinstance(vv, np.ndarray) or isinstance(vv, torch.Tensor))
and vv.ndim > 1
else vv
)
for vv in args
],
**{
kk: (
vv[start_index:end_index]
if (isinstance(vv, np.ndarray) or isinstance(vv, torch.Tensor))
and vv.ndim > 1
else vv
)
for kk, vv in kwargs.items()
},
)

index = 0
results = None
returned_dict = None
while index < total_size:
n_batch, result = self.execute(execute_with_batch_size, index, natoms)
returned_dict = (
isinstance(result, dict) if returned_dict is None else returned_dict
)
if not returned_dict:
result = (result,) if not isinstance(result, tuple) else result
index += n_batch

def append_to_list(res_list, res):
if n_batch:
res_list.append(res)
return res_list

if not returned_dict:
results = [] if results is None else results
results = append_to_list(results, result)
else:
results = (
{kk: [] for kk in result.keys()} if results is None else results
)
results = {
kk: append_to_list(results[kk], result[kk]) for kk in result.keys()
}
assert results is not None
assert returned_dict is not None

def concate_result(r):
if isinstance(r[0], np.ndarray):
ret = np.concatenate(r, axis=0)
elif isinstance(r[0], torch.Tensor):
ret = torch.cat(r, dim=0)
else:
raise RuntimeError(f"Unexpected result type {type(r[0])}")
return ret

if not returned_dict:
r_list = [concate_result(r) for r in zip(*results)]
r = tuple(r_list)
if len(r) == 1:
# avoid returning tuple if callable doesn't return tuple
r = r[0]
else:
r = {kk: concate_result(vv) for kk, vv in results.items()}
return r
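With the PyTorch-specific implementation removed, the PT `AutoBatchSize` subclass inherits `execute_all` from the base class, whose slicing and concatenation now go through `array-api-compat` and therefore work on torch tensors as well. A usage sketch under that assumption (not code from the PR; the constructor arguments `initial_batch_size` and `factor` are assumed unchanged):

```python
import torch

from deepmd.pt.utils.auto_batch_size import AutoBatchSize

coords = torch.zeros((10_000, 3))  # the first axis is treated as the batch axis

auto_batch = AutoBatchSize(initial_batch_size=1024, factor=2.0)

# the callable receives a slice along the first axis and returns an array whose
# first axis matches the batch; execute_all concatenates the pieces back together
per_frame = auto_batch.execute_all(lambda x: x.sum(dim=-1), 10_000, 1, coords)
assert per_frame.shape == (10_000,)
```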
63 changes: 47 additions & 16 deletions deepmd/utils/batch_size.py
@@ -10,6 +10,7 @@
Tuple,
)

import array_api_compat
import numpy as np

from deepmd.utils.errors import (
@@ -59,7 +60,7 @@
self.minimal_not_working_batch_size = self.maximum_working_batch_size + 1
else:
self.maximum_working_batch_size = initial_batch_size
if self.is_gpu_available():

[Code scanning / CodeQL warning: `__init__` method calls overridden method — the call to `self.is_gpu_available` in `__init__` is overridden by `AutoBatchSize.is_gpu_available`, `CustomizedAutoBatchSizeCPU.is_gpu_available`, and `CustomizedAutoBatchSizeGPU.is_gpu_available`.]
self.minimal_not_working_batch_size = 2**31
log.info(
"If you encounter the error 'an illegal memory access was encountered', this may be due to a TensorFlow issue. "
@@ -155,6 +156,8 @@
) -> Tuple[np.ndarray]:
"""Excuate a method with all given data.

This method is compatible with Array API.

Parameters
----------
callable : Callable
@@ -177,38 +180,66 @@
return (end_index - start_index), callable(
*[
(
vv[start_index:end_index]
if isinstance(vv, np.ndarray) and vv.ndim > 1
vv[start_index:end_index, ...]
if array_api_compat.is_array_api_obj(vv) and vv.ndim > 1
else vv
)
for vv in args
],
**{
kk: (
vv[start_index:end_index]
if isinstance(vv, np.ndarray) and vv.ndim > 1
vv[start_index:end_index, ...]
if array_api_compat.is_array_api_obj(vv) and vv.ndim > 1
else vv
)
for kk, vv in kwargs.items()
},
)

index = 0
results = []
results = None
returned_dict = None
while index < total_size:
n_batch, result = self.execute(execute_with_batch_size, index, natoms)
if not isinstance(result, tuple):
result = (result,)
if n_batch == 0:
continue
returned_dict = (
isinstance(result, dict) if returned_dict is None else returned_dict
)
if not returned_dict:
result = (result,) if not isinstance(result, tuple) else result
index += n_batch
if n_batch:
for rr in result:
rr.reshape((n_batch, -1))
results.append(result)

r = tuple([np.concatenate(r, axis=0) for r in zip(*results)])
if len(r) == 1:
# avoid returning tuple if callable doesn't return tuple
r = r[0]

def append_to_list(res_list, res):
if n_batch:
res_list.append(res)
return res_list

if not returned_dict:
results = [] if results is None else results
results = append_to_list(results, result)
else:
results = {kk: [] for kk in result} if results is None else results
results = {kk: append_to_list(results[kk], result[kk]) for kk in result}
assert results is not None
assert returned_dict is not None

def concate_result(r):
if array_api_compat.is_array_api_obj(r[0]):
xp = array_api_compat.array_namespace(r[0])
ret = xp.concat(r, axis=0)
else:
raise RuntimeError(f"Unexpected result type {type(r[0])}")

[Codecov / codecov/patch warning on deepmd/utils/batch_size.py#L232: added line was not covered by tests.]
return ret

if not returned_dict:
r_list = [concate_result(r) for r in zip(*results)]
r = tuple(r_list)
if len(r) == 1:
# avoid returning tuple if callable doesn't return tuple
r = r[0]
else:
r = {kk: concate_result(vv) for kk, vv in results.items()}
return r

@abstractmethod
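The `isinstance(vv, np.ndarray)` checks in the old code are replaced above with `array_api_compat.is_array_api_obj`, and the final concatenation is dispatched through `array_api_compat.array_namespace`. A small illustration of that dispatch (assuming `array-api-compat` and `numpy>=1.21` are installed; not part of the PR):

```python
import array_api_compat
import numpy as np

batch_a = np.zeros((2, 3))
batch_b = np.ones((3, 3))

assert array_api_compat.is_array_api_obj(batch_a)

# array_namespace returns a namespace matching the array's library;
# the Array API spells concatenation `concat`, not `concatenate`
xp = array_api_compat.array_namespace(batch_a, batch_b)
merged = xp.concat([batch_a, batch_b], axis=0)
assert merged.shape == (5, 3)
```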
2 changes: 1 addition & 1 deletion doc/backend.md
@@ -12,7 +12,7 @@ In the documentation, TensorFlow {{ tensorflow_icon }} and PyTorch {{ pytorch_ic
- Model filename extension: `.pb`
- Checkpoint filename extension: `.meta`, `.index`, `.data-00000-of-00001`

[TensorFlow](https://tensorflow.org) 2.2 or above is required.
[TensorFlow](https://tensorflow.org) 2.7 or above is required, since NumPy 1.21 or above is required.
DeePMD-kit does not use the TensorFlow v2 API but uses the TensorFlow v1 API (`tf.compat.v1`) in the graph mode.

### PyTorch {{ pytorch_icon }}
2 changes: 1 addition & 1 deletion doc/install/install-from-source.md
@@ -40,7 +40,7 @@ pip install --upgrade pip

:::{tab-item} TensorFlow {{ tensorflow_icon }}

The full instruction to install TensorFlow can be found on the official [TensorFlow website](https://www.tensorflow.org/install/pip). TensorFlow 2.2 or later is supported.
The full instruction to install TensorFlow can be found on the official [TensorFlow website](https://www.tensorflow.org/install/pip). TensorFlow 2.7 or later is supported.

```bash
pip install --upgrade tensorflow
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -36,7 +36,8 @@ classifiers = [
"Environment :: Console",
]
dependencies = [
'numpy',
# array-api-compat requires numpy>=1.21
'numpy>=1.21',
'scipy',
'pyyaml',
'dargs >= 0.4.7',
141 changes: 141 additions & 0 deletions source/tests/common/test_auto_batch_size.py
@@ -0,0 +1,141 @@
# SPDX-License-Identifier: LGPL-3.0-or-later
import os
import sys
import unittest

from deepmd.utils.batch_size import (
AutoBatchSize,
)
from deepmd.utils.errors import (
OutOfMemoryError,
)

if sys.version_info >= (3, 9):
import array_api_strict as xp
else:
raise unittest.SkipTest("array_api_strict doesn't support Python<=3.8")


class CustomizedAutoBatchSizeCPU(AutoBatchSize):
def is_gpu_available(self):
return False

def is_oom_error(self, e):
return isinstance(e, OutOfMemoryError)


class CustomizedAutoBatchSizeGPU(AutoBatchSize):
def is_gpu_available(self):
return True

def is_oom_error(self, e):
return isinstance(e, OutOfMemoryError)


class TestAutoBatchSize(unittest.TestCase):
def oom(self, batch_size, start_index):
if batch_size >= 512:
raise OutOfMemoryError
return batch_size, xp.zeros((batch_size, 2))

def test_execute_oom_gpu(self):
# initial batch size 256 = 128 * 2
auto_batch_size = CustomizedAutoBatchSizeGPU(256, 2.0)
# no error - 128
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 128)
self.assertEqual(result.shape, (128, 2))
# no error - 256
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 256)
self.assertEqual(result.shape, (256, 2))
# error - 512 return 0, None
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 0)
self.assertIsNone(result)
# 256 again
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 256)
self.assertEqual(result.shape, (256, 2))
# 256 again
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 256)
self.assertEqual(result.shape, (256, 2))

def test_execute_oom_cpu(self):
# initial batch size 256 = 128 * 2, nb is always 128
auto_batch_size = CustomizedAutoBatchSizeCPU(256, 2.0)
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 128)
self.assertEqual(result.shape, (128, 2))
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 128)
self.assertEqual(result.shape, (128, 2))
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 128)
self.assertEqual(result.shape, (128, 2))
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 128)
self.assertEqual(result.shape, (128, 2))
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 128)
self.assertEqual(result.shape, (128, 2))

@unittest.mock.patch.dict(os.environ, {"DP_INFER_BATCH_SIZE": "256"}, clear=True)
def test_execute_oom_environment_variables(self):
# DP_INFER_BATCH_SIZE = 256 = 128 * 2, nb is always 128
auto_batch_size = CustomizedAutoBatchSizeGPU(999, 2.0)
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 128)
self.assertEqual(result.shape, (128, 2))
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 128)
self.assertEqual(result.shape, (128, 2))
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 128)
self.assertEqual(result.shape, (128, 2))
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 128)
self.assertEqual(result.shape, (128, 2))
nb, result = auto_batch_size.execute(self.oom, 1, 2)
self.assertEqual(nb, 128)
self.assertEqual(result.shape, (128, 2))

def test_execute_all(self):
dd1 = xp.zeros((10000, 2, 1))
auto_batch_size = CustomizedAutoBatchSizeGPU(256, 2.0)
dd2 = auto_batch_size.execute_all(xp.asarray, 10000, 2, dd1)
assert xp.all(dd1 == dd2)

def test_execute_all_dict(self):
dd0 = xp.zeros((10000, 2, 1, 3, 4))
dd1 = xp.ones((10000, 2, 1, 3, 4))
auto_batch_size = CustomizedAutoBatchSizeGPU(256, 2.0)

def func(dd1):
return {
"foo": xp.zeros_like(dd1),
"bar": xp.ones_like(dd1),
}

dd2 = auto_batch_size.execute_all(func, 10000, 2, dd1)
assert xp.all(dd0 == dd2["foo"])
assert xp.all(dd1 == dd2["bar"])

def test_execute_all_dict_oom(self):
# to reproduce #4036 when commenting "if n_batch == 0: continue"
dd0 = xp.zeros((10, 2, 1, 3, 4))
dd1 = xp.ones((10, 2, 1, 3, 4))
auto_batch_size = CustomizedAutoBatchSizeGPU(4, 2.0)

def func(dd1):
if dd1.shape[0] >= 2:
raise OutOfMemoryError
return {
"foo": xp.zeros_like(dd1),
"bar": xp.ones_like(dd1),
}

dd2 = auto_batch_size.execute_all(func, 10, 2, dd1)
assert xp.all(dd0 == dd2["foo"])
assert xp.all(dd1 == dd2["bar"])