diff --git a/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/learn/tensorflow.po b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/learn/tensorflow.po
index 2c9623771c..dc7ee2fca4 100644
--- a/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/learn/tensorflow.po
+++ b/docs/source/locale/zh_CN/LC_MESSAGES/user_guide/learn/tensorflow.po
@@ -8,14 +8,14 @@ msgid ""
 msgstr ""
 "Project-Id-Version: mars 0.5.0a2\n"
 "Report-Msgid-Bugs-To: \n"
-"POT-Creation-Date: 2020-12-18 14:49+0800\n"
+"POT-Creation-Date: 2021-08-29 20:21+0800\n"
 "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
 "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
 "Language-Team: LANGUAGE <LL@li.org>\n"
 "MIME-Version: 1.0\n"
 "Content-Type: text/plain; charset=utf-8\n"
 "Content-Transfer-Encoding: 8bit\n"
-"Generated-By: Babel 2.9.0\n"
+"Generated-By: Babel 2.9.1\n"
 
 #: ../../source/user_guide/learn/tensorflow.rst:5
 msgid "Integrate with TensorFlow"
@@ -137,3 +137,34 @@ msgstr ""
 "一旦一个集群存在,你可以要么设置默认 session,训练和预测就会自动提交到"
 "集群,要么你可以通过 ``session=***`` 显示指定运行的 session。"
 
+#: ../../source/user_guide/learn/tensorflow.rst:172
+msgid "Use ``gen_tensorflow_dataset``"
+msgstr "使用 ``gen_tensorflow_dataset``"
+
+#: ../../source/user_guide/learn/tensorflow.rst:174
+msgid ""
+"You can convert Mars data(:class:`mars.tensor.Tensor`, "
+":class:`mars.dataframe.DataFrame`, :class:`mars.dataframe.Series`) to "
+"`tf.data.Dataset "
+"<https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_ by "
+":meth:`gen_tensorflow_dataset`. It also supports :class:`numpy.ndarray`, "
+":class:`pandas.DataFrame`, :class:`pandas.Series`."
+msgstr ""
+"你可以利用 :meth:`gen_tensorflow_dataset` 将Mars类型数据"
+"(:class:`mars.tensor.Tensor`,:class:`mars.dataframe.DataFrame`,"
+":class:`mars.dataframe.Series`)转换为 `tf.data.Dataset "
+"<https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_。该接口也支持 "
+":class:`numpy.ndarray`,:class:`pandas.DataFrame`,:class:`pandas.Series`。"
+
+#: ../../source/user_guide/learn/tensorflow.rst:191
+msgid "Now, you can preprocess the data via Mars, and pass data to script."
+msgstr "现在,你可以用Mars预处理数据,然后将数据传到脚本中。"
+
+#: ../../source/user_guide/learn/tensorflow.rst:214
+msgid "``tf_demo.py``"
+msgstr ""
+
+#: ../../source/user_guide/learn/tensorflow.rst:269
+msgid "result:"
+msgstr "结果:"
+
diff --git a/docs/source/reference/learn/reference.rst b/docs/source/reference/learn/reference.rst
index b0ef90f8c6..cda324cf71 100644
--- a/docs/source/reference/learn/reference.rst
+++ b/docs/source/reference/learn/reference.rst
@@ -313,6 +313,7 @@ TensorFlow Integration
    :toctree: generated/
 
    contrib.tensorflow.run_tensorflow_script
+   contrib.tensorflow.gen_tensorflow_dataset
 
 .. _xgboost_ref:
 
diff --git a/docs/source/user_guide/learn/tensorflow.rst b/docs/source/user_guide/learn/tensorflow.rst
index 678b17f1ea..02724a2d6f 100644
--- a/docs/source/user_guide/learn/tensorflow.rst
+++ b/docs/source/user_guide/learn/tensorflow.rst
@@ -168,4 +168,133 @@ and prediction shown above will be submitted to the cluster, or you can specify
 
     # Or, session could be specified as well
     run_tensorflow_script('tf_demo.py', n_workers=1, session=sess)
 
+Use ``gen_tensorflow_dataset``
+---------------------------------
+
+You can convert Mars data(:class:`mars.tensor.Tensor`, :class:`mars.dataframe.DataFrame`,
+:class:`mars.dataframe.Series`) to `tf.data.Dataset <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_ by :meth:`gen_tensorflow_dataset`. It also
+supports :class:`numpy.ndarray`, :class:`pandas.DataFrame`, :class:`pandas.Series`.
+
+.. code-block:: python
+
+    In [1]: data = mt.tensor([[1, 2], [3, 4]])
+    In [2]: dataset = gen_tensorflow_dataset(data)
+    In [3]: list(dataset.as_numpy_iterator())
+    Out[3]: [array([1, 2]), array([3, 4])]
+
+    In [1]: data1 = mt.tensor([1, 2]); data2 = mt.tensor([3, 4]); data3 = mt.tensor([5, 6])
+    In [2]: dataset = gen_tensorflow_dataset((data1, data2, data3))
+    In [3]: list(dataset.as_numpy_iterator())
+    Out[3]: [(1, 3, 5), (2, 4, 6)]
+
+Now, you can preprocess the data via Mars, and pass data to script.
+
+.. 
code-block:: python + + import mars.dataframe as md + from sklearn.preprocessing import LabelEncoder + from mars.learn.contrib.tensorflow import run_tensorflow_script + + + df = md.read_csv('ionosphere.data', header=None) + X = df.iloc[:, :-1].astype('float32') + y = df.iloc[:, -1] + y = LabelEncoder().fit_transform(y.execute().fetch()) + X_train, X_test, y_train, y_test = train_test_split(X.execute(), y, test_size=0.33) + + run_tensorflow_script( + "tf_demo.py", n_workers=2, data={'X_train': X_train, 'y_train': y_train, + 'X_test':X_test, 'y_test': y_test}, session=sess) + +``tf_demo.py`` + +.. code-block:: python + + import os + + from mars.learn.contrib.tensorflow import gen_tensorflow_dataset + import tensorflow as tf + from tensorflow.keras import Sequential + from tensorflow.keras.layers import Dense + + + def get_model(n_features): + model = Sequential() + model.add(Dense(10, activation='relu', kernel_initializer='he_normal', + input_shape=(n_features,))) + model.add(Dense(8, activation='relu', kernel_initializer='he_normal')) + model.add(Dense(1, activation='sigmoid')) + + # compile the model + model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy']) + + return model + + + def train(X_train, X_test, y_train, y_test): + model = get_model(X_train.shape[1]) + + db_train = gen_tensorflow_dataset((X_train, y_train)) + db_train = db_train.batch(32) + db_test = gen_tensorflow_dataset((X_test, y_test)) + db_test = db_test.batch(32) + + # fit model + model.fit(db_train, epochs=150) + # evaluate + loss, acc = model.evaluate(db_test) + print('Test accuracy: %.3f' % acc) + + + if __name__ == '__main__': + X_train = globals()['X_train'] + y_train = globals()['y_train'] + X_test = globals()['X_test'] + y_test = globals()['y_test'] + + if 'TF_CONFIG' in os.environ: + # distributed TensorFlow + multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() + + with multiworker_strategy.scope(): + train(X_train, X_test, y_train, 
y_test)
+        else:
+            train(X_train, X_test, y_train, y_test)
+
+result:
+
+.. code-block:: ipython
+
+    Epoch 1/150
+    Epoch 1/150
+    1/Unknown - 1s 996ms/step - loss: 0.7825 - accuracy: 0.2500 1/Unknown - 1s 996ms/step - loss: 0.7825 - accura
+    6/Unknown - 3s 362ms/step - loss: 0.7388 - accuracy: 0.3438 6/Unknown - 3s 363ms/step - loss: 0.7388 - accura
+    7/Unknown - 3s 358ms/step - loss: 0.7404 - accuracy: 0.3259 7/Unknown - 3s 358ms/step - loss: 0.7404 - accura
+    8/Unknown - 3s 324ms/step - loss: 0.7368 - accuracy: 0.3277 8/Unknown - 3s 324ms/step - loss: 0.7368 - accura
+    8/8 [==============================] - 3s 324ms/step - loss: 0.7368 - accuracy: 0.3277
+    8/8 [==============================] - 3s 324ms/step - loss: 0.7368 - accuracy: 0.3277
+    Epoch 2/150
+    Epoch 2/150
+    8/8 [==============================] - ETA: 0s - loss: 0.6775 - accuracy: 0.49798/8 [==============================] - E
+    8/8 [==============================] - 3s 314ms/step - loss: 0.6775 - accuracy: 0.4979
+    8/8 [==============================] - 3s 314ms/step - loss: 0.6775 - accuracy: 0.4979
+    Epoch 3/150
+    Epoch 3/150
+    ...
+    Epoch 150/150
+    Epoch 150/150
+    2/8 [======>.......................] - ETA: 2s - loss: 0.0210 - accuracy: 1.00002/8 [======>.......................] - E
+    3/8 [==========>...................] - ETA: 1s - loss: 0.0220 - accuracy: 1.00003/8 [==========>...................] - E
+    8/8 [==============================] - ETA: 0s - loss: 0.0319 - accuracy: 0.99578/8 [==============================] - E
+    8/8 [==============================] - 3s 351ms/step - loss: 0.0319 - accuracy: 0.9957
+    8/8 [==============================] - 3s 351ms/step - loss: 0.0319 - accuracy: 0.9957
+
+    . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. 
You can do + this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = Aut + oShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`. + 4/Unknown - 3s 380ms/step - loss: 0.2354 - accuracy: 0.9138 4/Unknown - 3s 380ms/step - loss: 0.2354 - accura + 4/4 [==============================] - 3s 381ms/step - loss: 0.2354 - accuracy: 0.9138 + 4/4 [==============================] - 3s 381ms/step - loss: 0.2354 - accuracy: 0.9138 + Test accuracy: 0.914 + Test accuracy: 0.914 diff --git a/mars/learn/contrib/tensorflow/__init__.py b/mars/learn/contrib/tensorflow/__init__.py index ef79c47feb..d1510bfbe6 100644 --- a/mars/learn/contrib/tensorflow/__init__.py +++ b/mars/learn/contrib/tensorflow/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. from .run_script import run_tensorflow_script +from .dataset import gen_tensorflow_dataset # noqa: F401 # pylint: disable=unused-import def register_op(): diff --git a/mars/learn/contrib/tensorflow/dataset.py b/mars/learn/contrib/tensorflow/dataset.py new file mode 100644 index 0000000000..5d681cc4cc --- /dev/null +++ b/mars/learn/contrib/tensorflow/dataset.py @@ -0,0 +1,166 @@ +# Copyright 1999-2020 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import copy +from typing import List, Tuple + +import pandas as pd +import numpy as np + +from .... 
import execute
+from ....core.context import get_context
+from ....tensor.core import TENSOR_TYPE
+from ....dataframe.core import DATAFRAME_TYPE, SERIES_TYPE
+from ....utils import require_not_none, lazy_import
+
+tf = lazy_import("tensorflow")
+
+
+ACCEPT_TYPE = (TENSOR_TYPE, DATAFRAME_TYPE, SERIES_TYPE,
+               np.ndarray, pd.DataFrame, pd.Series, List)
+
+
+@require_not_none(tf)
+class MarsDataset:
+    """Wrap Mars objects or in-memory data so they can feed a ``tf.data.Dataset``."""
+
+    def __init__(self, tensors, output_shapes=None, output_types=None,
+                 fetch_kwargs=None):
+        self._context = get_context()
+        self._tensors = tensors
+        self._output_shapes = output_shapes
+        self._output_types = output_types
+        self._fetch_kwargs = fetch_kwargs or dict()
+        self._executed = False
+        self._check_and_convert()
+
+    @staticmethod
+    def _element_type(t):
+        """Python type of a list's first item, else a dtype (column ``0`` for frames)."""
+        if isinstance(t, List):
+            return type(t[0])
+        is_frame = isinstance(t, (DATAFRAME_TYPE, pd.DataFrame))
+        return t[0].dtype if is_frame else t.dtype
+
+    def _check_and_convert(self):
+        """Validate the inputs, execute Mars objects and infer missing shapes/types."""
+        if not isinstance(self._tensors, Tuple):
+            self._tensors = (self._tensors,)
+        for t in self._tensors:
+            if not isinstance(t, ACCEPT_TYPE):
+                raise TypeError(f"Unexpected dataset type: {type(t)}")
+
+        if not self._executed:
+            self._execute()
+            self._executed = True
+
+        single = len(self._tensors) == 1
+        if not self._output_shapes:
+            # lists and series yield scalars, anything else drops its leading axis
+            shapes = tuple(() if isinstance(t, (List, SERIES_TYPE, pd.Series)) else t.shape[1:]
+                           for t in self._tensors)
+            self._output_shapes = shapes[0] if single else shapes
+
+        if not self._output_types:
+            # convert with tf.as_dtype for a single input as well as for a tuple
+            types = tuple(tf.as_dtype(self._element_type(t)) for t in self._tensors)
+            self._output_types = types[0] if single else types
+
+    def _execute(self):  # pragma: no cover
+        """Execute the Mars objects among the inputs; other inputs are left alone."""
+        execute_data = [t for t in self._tensors if isinstance(t, ACCEPT_TYPE[:3])]
+        if execute_data:
+            execute(execute_data)
+
+    def get_data(self, t, index):  # pragma: no cover
+        """Return item ``index`` of ``t``, fetching it first when ``t`` is a Mars object."""
+        # coverage not included as now there is no solution to cover tensorflow methods
+        # see https://github.com/tensorflow/tensorflow/issues/33759 for more details.
+        # each fetch receives its own deep copy of the user supplied kwargs
+        fetch_kwargs = copy.deepcopy(self._fetch_kwargs)
+
+        if isinstance(t, TENSOR_TYPE):
+            return t[index].fetch(**fetch_kwargs)
+        if isinstance(t, DATAFRAME_TYPE):
+            return t.iloc[index].fetch(**fetch_kwargs).values
+        if isinstance(t, SERIES_TYPE):
+            return t.iloc[index].fetch(**fetch_kwargs)
+        if isinstance(t, pd.DataFrame):
+            return t.iloc[index].values
+        if isinstance(t, pd.Series):
+            return t.iloc[index]
+        # numpy arrays and plain lists are indexed directly
+        return t[index]
+
+    def to_tf(self) -> "tf.data.Dataset":
+        """Return a ``tf.data.Dataset`` that reads the inputs through a Python generator."""
+        def make_generator():  # pragma: no cover
+            if not self._executed:
+                self._execute()
+                self._executed = True
+
+            for i in range(len(self._tensors[0])):
+                if len(self._tensors) == 1:
+                    yield self.get_data(self._tensors[0], i)
+                else:
+                    yield tuple(self.get_data(t, i) for t in self._tensors)
+
+        return tf.data.Dataset.from_generator(
+            make_generator, output_types=self._output_types,
+            output_shapes=self._output_shapes)
+
+
+def gen_tensorflow_dataset(tensors, output_shapes=None, output_types=None,
+                           fetch_kwargs=None):
+    """
+    Convert Mars data to a ``tf.data.Dataset``. Note this is based on TensorFlow 2.0.
+
+    Parameters
+    ----------
+    tensors : Mars data type or a tuple consisting of Mars data type
+        The data to convert; numpy arrays, pandas objects and lists are accepted too.
+    output_shapes :
+        A (nested) structure of `tf.TensorShape` objects corresponding to
+        each component of an element yielded from mars object.
+    output_types :
+        A (nested) structure of `tf.DType` objects corresponding to each
+        component of an element yielded from mars object.
+    fetch_kwargs :
+        Keyword arguments passed to ``fetch()`` when Mars objects are read.
+
+    Returns
+    -------
+    tf.data.Dataset
+
+    Examples
+    --------
+    >>> # convert a tensor to tf.data.Dataset.
+    >>> data = mt.tensor([[1, 2], [3, 4]])
+    >>> dataset = gen_tensorflow_dataset(data)
+    >>> list(dataset.as_numpy_iterator())
+    [array([1, 2]), array([3, 4])]
+    >>> dataset.element_spec
+    TensorSpec(shape=(2,), dtype=tf.int64, name=None)
+
+    >>> # convert a tuple of tensors to tf.data.Dataset.
+    >>> data1 = mt.tensor([1, 2]); data2 = mt.tensor([3, 4]); data3 = mt.tensor([5, 6])
+    >>> dataset = gen_tensorflow_dataset((data1, data2, data3))
+    >>> list(dataset.as_numpy_iterator())
+    [(1, 3, 5), (2, 4, 6)]
+    """
+    mars_dataset = MarsDataset(tensors, output_shapes=output_shapes,
+                               output_types=output_types, fetch_kwargs=fetch_kwargs)
+
+    return mars_dataset.to_tf()
diff --git a/mars/learn/contrib/tensorflow/run_script.py b/mars/learn/contrib/tensorflow/run_script.py
index c5ed285888..27aed31609 100644
--- a/mars/learn/contrib/tensorflow/run_script.py
+++ b/mars/learn/contrib/tensorflow/run_script.py
@@ -145,8 +145,7 @@ def run_tensorflow_script(script: Union[bytes, str, BinaryIO, TextIO],
                           command_argv: List[str] = None,
                           retry_when_fail: bool = False,
                           session: SessionType = None,
-                          run_kwargs: Dict[str, Any] = None,
-                          port: int = None):
+                          run_kwargs: Dict[str, Any] = None):
     """
     Run TensorFlow script in Mars cluster.
 
@@ -170,8 +169,6 @@ def run_tensorflow_script(script: Union[bytes, str, BinaryIO, TextIO],
         Mars session, if not provided, will use default one.
     run_kwargs : dict
         Extra kwargs for `session.run`.
- port : int - Port of TensorFlow worker or ps, will automatically increase for the same worker Returns ------- @@ -192,5 +189,5 @@ def run_tensorflow_script(script: Union[bytes, str, BinaryIO, TextIO], op = RunTensorFlow(data=data, code=to_binary(code), n_workers=int(n_workers), n_ps=int(n_ps), retry_when_fail=retry_when_fail, gpu=gpu, - port=port, command_args=command_argv) + command_args=command_argv) return op(inputs).execute(session=session, **(run_kwargs or {})) diff --git a/mars/learn/contrib/tensorflow/tests/test_dataset.py b/mars/learn/contrib/tensorflow/tests/test_dataset.py new file mode 100644 index 0000000000..08e6523afb --- /dev/null +++ b/mars/learn/contrib/tensorflow/tests/test_dataset.py @@ -0,0 +1,149 @@ +# Copyright 1999-2020 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import os
+
+import pytest
+
+import mars.tensor as mt
+import mars.dataframe as md
+from mars.utils import lazy_import
+from mars.learn.contrib.tensorflow import gen_tensorflow_dataset, run_tensorflow_script
+
+tf_installed = lazy_import('tensorflow', globals=globals()) is not None
+
+
+@pytest.mark.skipif(not tf_installed, reason='tensorflow not installed')
+def test_mars_dataset(setup_cluster):
+    """Check ``gen_tensorflow_dataset`` for each supported input type and a rejected one."""
+    import numpy as np
+    import pandas as pd
+    tf_dataset_ops = lazy_import("tensorflow.python.data.ops.dataset_ops")
+
+    # Mars tensor
+    data = mt.random.rand(1000, 32, dtype='f4')
+    data_verify = data[:10].execute().fetch()
+
+    dataset = gen_tensorflow_dataset(data)
+    assert isinstance(dataset, tf_dataset_ops.DatasetV2)
+    for data_1batch in dataset.repeat().batch(10).take(1):
+        np.testing.assert_array_equal(data_1batch, data_verify)
+
+    # Mars tensors
+    data = mt.random.rand(1000, 32, dtype='f4')
+    labels = mt.random.randint(0, 2, (1000, 10), dtype='f4')
+
+    data_verify = data[:10].execute().fetch()
+    labels_verify = labels[:10].execute().fetch()
+
+    dataset = gen_tensorflow_dataset((data, labels))
+    assert isinstance(dataset, tf_dataset_ops.DatasetV2)
+    for data_1batch, label_1batch in dataset.repeat().batch(10).take(1):
+        np.testing.assert_array_equal(data_1batch, data_verify)
+        np.testing.assert_array_equal(label_1batch, labels_verify)
+
+    # np ndarray
+    data = np.random.rand(1000, 32)
+    labels = np.random.randint(0, 2, (1000, 10))
+
+    data_verify = data[:10]
+    labels_verify = labels[:10]
+
+    dataset = gen_tensorflow_dataset((data, labels))
+    assert isinstance(dataset, tf_dataset_ops.DatasetV2)
+    for data_1batch, label_1batch in dataset.repeat().batch(10).take(1):
+        np.testing.assert_array_equal(data_1batch, data_verify)
+        np.testing.assert_array_equal(label_1batch, labels_verify)
+
+    # Mars dataframe
+    data = md.DataFrame(data)
+    labels = md.DataFrame(labels)
+
+    data_verify = data.iloc[:10].execute().fetch().values
+    labels_verify = labels.iloc[:10].execute().fetch().values
+
+    dataset = gen_tensorflow_dataset((data, labels), fetch_kwargs={
+        'extra_config': {'check_series_name': False}})
+    assert isinstance(dataset, tf_dataset_ops.DatasetV2)
+    for data_1batch, label_1batch in dataset.repeat().batch(10).take(1):
+        np.testing.assert_array_equal(data_1batch, data_verify)
+        np.testing.assert_array_equal(label_1batch, labels_verify)
+
+    # Mars series
+    label = labels[1]
+
+    label_verify = label[:10].execute().fetch()
+
+    dataset = gen_tensorflow_dataset((data, label), fetch_kwargs={
+        'extra_config': {'check_series_name': False}})
+    assert isinstance(dataset, tf_dataset_ops.DatasetV2)
+    for data_1batch, label_1batch in dataset.repeat().batch(10).take(1):
+        np.testing.assert_array_equal(data_1batch, data_verify)
+        np.testing.assert_array_equal(label_1batch, label_verify)
+
+    # pandas dataframe
+    data = pd.DataFrame(np.random.rand(1000, 32))
+    labels = pd.DataFrame(np.random.randint(0, 2, (1000, 10)), dtype="float32")
+
+    data_verify = data.iloc[:10].values
+    labels_verify = labels.iloc[:10].values
+    dataset = gen_tensorflow_dataset((data, labels))
+    assert isinstance(dataset, tf_dataset_ops.DatasetV2)
+    for data_1batch, label_1batch in dataset.repeat().batch(10).take(1):
+        np.testing.assert_array_equal(data_1batch, data_verify)
+        np.testing.assert_array_equal(label_1batch, labels_verify)
+
+    # pandas series
+    label = labels[1]
+
+    label_verify = label[:10]
+
+    dataset = gen_tensorflow_dataset((data, label))
+    assert isinstance(dataset, tf_dataset_ops.DatasetV2)
+    for data_1batch, label_1batch in dataset.repeat().batch(10).take(1):
+        np.testing.assert_array_equal(data_1batch, data_verify)
+        np.testing.assert_array_equal(label_1batch, label_verify)
+
+    # list
+    label = label.tolist()
+
+    label_verify = label[:10]
+
+    dataset = gen_tensorflow_dataset((data, label))
+    assert isinstance(dataset, tf_dataset_ops.DatasetV2)
+    for data_1batch, label_1batch in dataset.repeat().batch(10).take(1):
+        np.testing.assert_array_equal(data_1batch, data_verify)
+        np.testing.assert_array_equal(label_1batch, label_verify)
+
+    # test TypeError
+    label = tuple(range(1000))
+
+    with pytest.raises(TypeError) as e:
+        gen_tensorflow_dataset((data, label))
+    exec_msg = e.value.args[0]
+    assert exec_msg == "Unexpected dataset type: <class 'tuple'>"
+
+
+@pytest.mark.skipif(not tf_installed, reason='tensorflow not installed')
+def test_mars_dataset_script(setup_cluster):
+    sess = setup_cluster
+    path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
+                        'tf_dataset.py')
+
+    data = mt.random.rand(1000, 32, dtype='f4')
+    labels = mt.random.randint(0, 2, (1000, 10), dtype='f4')
+
+    assert run_tensorflow_script(
+        path, n_workers=2, data={'feature_data': data, 'labels': labels},
+        command_argv=['multiple'], session=sess).fetch()['status'] == 'ok'
diff --git a/mars/learn/contrib/tensorflow/tests/test_run_script.py b/mars/learn/contrib/tensorflow/tests/test_run_script.py
index 770c2279b3..6cd645f7de 100644
--- a/mars/learn/contrib/tensorflow/tests/test_run_script.py
+++ b/mars/learn/contrib/tensorflow/tests/test_run_script.py
@@ -28,8 +28,8 @@ def test_local_run_tensor_flow_script(setup_cluster):
     path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                         'tf_distributed_sample.py')
     assert run_tensorflow_script(
-        path, n_workers=2, command_argv=['multiple'],
-        port=2222).fetch()['status'] == 'ok'
+        path, n_workers=2, command_argv=['multiple']
+    ).fetch()['status'] == 'ok'
 
     with pytest.raises(ValueError):
         run_tensorflow_script(path, n_workers=0)
diff --git a/mars/learn/contrib/tensorflow/tests/tf_dataset.py b/mars/learn/contrib/tensorflow/tests/tf_dataset.py
new file mode 100644
index 0000000000..31c744198c
--- /dev/null
+++ b/mars/learn/contrib/tensorflow/tests/tf_dataset.py
@@ -0,0 +1,59 @@
+# Copyright 1999-2021 Alibaba Group Holding Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import json
+import sys
+
+import tensorflow as tf
+from tensorflow.keras import layers
+from mars.learn.contrib.tensorflow import gen_tensorflow_dataset
+from tensorflow.python.data.ops.dataset_ops import DatasetV2
+
+
+def get_model():
+    """Build and compile a small dense classifier with a 10-unit softmax output."""
+    model = tf.keras.Sequential()
+    model.add(layers.Dense(64, activation='relu'))
+    model.add(layers.Dense(64, activation='relu'))
+    model.add(layers.Dense(10, activation='softmax'))
+    model.compile(optimizer=tf.keras.optimizers.Adam(0.01),
+                  loss='categorical_crossentropy', metrics=['accuracy'])
+    return model
+
+
+def train(feature_data, labels):
+    """Fit the model for two epochs on batches of 32 built from the given data."""
+    db_train = gen_tensorflow_dataset((feature_data, labels))
+    assert isinstance(db_train, DatasetV2)
+    db_train = db_train.batch(32)
+
+    model = get_model()
+    model.fit(db_train, epochs=2)
+
+
+if __name__ == "__main__":
+    # sanity-check the environment and arguments the script is launched with
+    assert json.loads(os.environ['TF_CONFIG'])['task']['index'] in {0, 1}
+    assert len(sys.argv) == 2
+    assert sys.argv[1] == 'multiple'
+
+    # ``feature_data`` and ``labels`` are looked up in the module globals; the test
+    # supplies them through the ``data`` argument of ``run_tensorflow_script``
+    feature_data = globals()['feature_data']
+    labels = globals()['labels']
+    multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
+
+    with multiworker_strategy.scope():
+        train(feature_data, labels)
diff --git a/mars/services/task/analyzer/assigner.py b/mars/services/task/analyzer/assigner.py
index a38519dbe4..a138abc12c 100644
--- a/mars/services/task/analyzer/assigner.py
+++ b/mars/services/task/analyzer/assigner.py
@@ -173,7 +173,7 @@ def assign(self, cur_assigns: Dict[str, str] = None) -> Dict[ChunkData, BandType # calculate the number of chunks to be assigned to each band # given number of bands and existing assignments band_quotas = self._calc_band_assign_limits( - len(chunk_to_assign), assigned_counts) + len(chunk_to_assign) + sum(assigned_counts.values()), assigned_counts) # calculate expected descendant count (spread range) of # every band and subtract assigned number from it diff --git a/setup.cfg b/setup.cfg index 06a2d1d42f..c06e5ece09 100644 --- a/setup.cfg +++ b/setup.cfg @@ -63,6 +63,7 @@ dev = pytest-forked>=1.0 pytest-asyncio>=0.14.0 mock>=4.0.0; python_version<"3.8" + flake8>=3.8.0 extra = pyarrow>=0.11.0,!=0.16.* lz4>=1.0.0