Skip to content

Commit

Permalink
Add TensorFlow dataset (mars-project#2383)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chong Yuan authored Aug 31, 2021
1 parent 4a53c48 commit 4b83198
Show file tree
Hide file tree
Showing 11 changed files with 544 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ msgid ""
msgstr ""
"Project-Id-Version: mars 0.5.0a2\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2020-12-18 14:49+0800\n"
"POT-Creation-Date: 2021-08-29 20:21+0800\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.9.0\n"
"Generated-By: Babel 2.9.1\n"

#: ../../source/user_guide/learn/tensorflow.rst:5
msgid "Integrate with TensorFlow"
Expand Down Expand Up @@ -137,3 +137,34 @@ msgstr ""
"一旦一个集群存在,你可以要么设置默认 session,训练和预测就会自动提交到"
"集群,要么你可以通过 ``session=***`` 显式指定运行的 session。"

#: ../../source/user_guide/learn/tensorflow.rst:172
msgid "Use ``gen_tensorflow_dataset``"
msgstr "使用 ``gen_tensorflow_dataset``"

#: ../../source/user_guide/learn/tensorflow.rst:174
msgid ""
"You can convert Mars data(:class:`mars.tensor.Tensor`, "
":class:`mars.dataframe.DataFrame`, :class:`mars.dataframe.Series`) to "
"`tf.data.Dataset <https://tensorflow.google. "
"cn/api_docs/python/tf/data/Dataset>`_ by :meth:`gen_tensorflow_dataset`. "
"It also support :class:`numpy.ndarray`, :class:`pandas.DataFrame`, "
":class:`pandas.Series`."
msgstr ""
"你可以利用 :meth:`gen_tensorflow_dataset` 将Mars类型数据"
"(:class:`mars.tensor.Tensor`,:class:`mars.dataframe.DataFrame`,"
":class:`mars.dataframe.Series`)转换为 `tf.data.Dataset <https://tensorflow."
"google.cn/api_docs/python/tf/data/Dataset>`_。该接口也支持 "
":class:`numpy.ndarray`,:class:`pandas.DataFrame`,:class:`pandas.Series`。"

#: ../../source/user_guide/learn/tensorflow.rst:191
msgid "Now, you can preprocess the data via mars, and pass data to script."
msgstr "现在,你可以用Mars预处理数据,然后将数据传到脚本中。"

#: ../../source/user_guide/learn/tensorflow.rst:214
msgid "``tf_demo.py``"
msgstr ""

#: ../../source/user_guide/learn/tensorflow.rst:269
msgid "result:"
msgstr "结果:"

1 change: 1 addition & 0 deletions docs/source/reference/learn/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ TensorFlow Integration
:toctree: generated/

contrib.tensorflow.run_tensorflow_script
contrib.tensorflow.gen_tensorflow_dataset

.. _xgboost_ref:

Expand Down
129 changes: 129 additions & 0 deletions docs/source/user_guide/learn/tensorflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,133 @@ and prediction shown above will be submitted to the cluster, or you can specify
# Or, session could be specified as well
run_tensorflow_script('tf_demo.py', n_workers=1, session=sess)
Use ``gen_tensorflow_dataset``
---------------------------------

You can convert Mars data (:class:`mars.tensor.Tensor`, :class:`mars.dataframe.DataFrame`,
:class:`mars.dataframe.Series`) to a
`tf.data.Dataset <https://tensorflow.google.cn/api_docs/python/tf/data/Dataset>`_
with :meth:`gen_tensorflow_dataset`. It also supports :class:`numpy.ndarray`,
:class:`pandas.DataFrame` and :class:`pandas.Series`.

.. code-block:: python
In [1]: data = mt.tensor([[1, 2], [3, 4]])
In [2]: dataset = gen_tensorflow_dataset(data)
In [3]: list(dataset.as_numpy_iterator())
Out[3]: [array([1, 2]), array([3, 4])]
In [1]: data1 = mt.tensor([1, 2]); data2 = mt.tensor([3, 4]); data3 = mt.tensor([5, 6])
In [2]: dataset = gen_tensorflow_dataset((data1, data2, data3))
In [3]: list(dataset.as_numpy_iterator())
Out[3]: [(1, 3, 5), (2, 4, 6)]
Now you can preprocess the data with Mars and pass it to the script.

.. code-block:: python
import mars.dataframe as md
from sklearn.preprocessing import LabelEncoder
from mars.learn.contrib.tensorflow import run_tensorflow_script
# train_test_split was used below without being imported
from mars.learn.model_selection import train_test_split

# load the raw data as a Mars DataFrame; the last column holds the label
df = md.read_csv('ionosphere.data', header=None)
X = df.iloc[:, :-1].astype('float32')
y = df.iloc[:, -1]
# the labels are fetched locally so that scikit-learn's LabelEncoder can encode them
y = LabelEncoder().fit_transform(y.execute().fetch())
X_train, X_test, y_train, y_test = train_test_split(X.execute(), y, test_size=0.33)

# the keys of ``data`` are the names that ``tf_demo.py`` reads back;
# ``sess`` is the same session object used in the previous snippet
run_tensorflow_script(
    "tf_demo.py", n_workers=2,
    data={'X_train': X_train, 'y_train': y_train,
          'X_test': X_test, 'y_test': y_test},
    session=sess)
``tf_demo.py``

.. code-block:: python
import os
from mars.learn.contrib.tensorflow import gen_tensorflow_dataset
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
def get_model(n_features):
    """Build and compile a small binary classifier taking ``n_features`` inputs."""
    # two ReLU hidden layers followed by a single sigmoid output unit
    layers = [
        Dense(10, activation='relu', kernel_initializer='he_normal',
              input_shape=(n_features,)),
        Dense(8, activation='relu', kernel_initializer='he_normal'),
        Dense(1, activation='sigmoid'),
    ]
    model = Sequential(layers)
    # compile the model
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model
def train(X_train, X_test, y_train, y_test):
    """Fit the model on the training split and print accuracy on the test split."""
    model = get_model(X_train.shape[1])

    # wrap each (features, labels) pair as a tf.data.Dataset, in batches of 32
    train_batches = gen_tensorflow_dataset((X_train, y_train)).batch(32)
    test_batches = gen_tensorflow_dataset((X_test, y_test)).batch(32)

    # fit model
    model.fit(train_batches, epochs=150)

    # evaluate; only the accuracy metric is reported
    _, acc = model.evaluate(test_batches)
    print('Test accuracy: %.3f' % acc)
if __name__ == '__main__':
    # NOTE(review): presumably these names are placed into the script's globals
    # from the ``data`` dict given to run_tensorflow_script -- confirm
    X_train, y_train, X_test, y_test = (
        globals()[name] for name in ('X_train', 'y_train', 'X_test', 'y_test'))

    if 'TF_CONFIG' not in os.environ:
        # no cluster spec in the environment: plain single-process training
        train(X_train, X_test, y_train, y_test)
    else:
        # distributed TensorFlow
        multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
        with multiworker_strategy.scope():
            train(X_train, X_test, y_train, y_test)
result:

.. code-block:: ipython
Epoch 1/150
Epoch 1/150
1/Unknown - 1s 996ms/step - loss: 0.7825 - accuracy: 0.2500 1/Unknown - 1s 996ms/step - loss: 0.7825 - accura
6/Unknown - 3s 362ms/step - loss: 0.7388 - accuracy: 0.3438 6/Unknown - 3s 363ms/step - loss: 0.7388 - accura
7/Unknown - 3s 358ms/step - loss: 0.7404 - accuracy: 0.3259 7/Unknown - 3s 358ms/step - loss: 0.7404 - accura
8/Unknown - 3s 324ms/step - loss: 0.7368 - accuracy: 0.3277 8/Unknown - 3s 324ms/step - loss: 0.7368 - accura
8/8 [==============================] - 3s 324ms/step - loss: 0.7368 - accuracy: 0.3277
8/8 [==============================] - 3s 324ms/step - loss: 0.7368 - accuracy: 0.3277
Epoch 2/150
Epoch 2/150
8/8 [==============================] - ETA: 0s - loss: 0.6775 - accuracy: 0.49798/8 [==============================] - E
8/8 [==============================] - 3s 314ms/step - loss: 0.6775 - accuracy: 0.4979
8/8 [==============================] - 3s 314ms/step - loss: 0.6775 - accuracy: 0.4979
Epoch 3/150
Epoch 3/150
...
Epoch 150/150
Epoch 150/150
2/8 [======>.......................] - ETA: 2s - loss: 0.0210 - accuracy: 1.00002/8 [======>.......................] - E
3/8 [==========>...................] - ETA: 1s - loss: 0.0220 - accuracy: 1.00003/8 [==========>...................] - E
8/8 [==============================] - ETA: 0s - loss: 0.0319 - accuracy: 0.99578/8 [==============================] - E
8/8 [==============================] - 3s 351ms/step - loss: 0.0319 - accuracy: 0.9957
8/8 [==============================] - 3s 351ms/step - loss: 0.0319 - accuracy: 0.9957
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do
this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = Aut
oShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
4/Unknown - 3s 380ms/step - loss: 0.2354 - accuracy: 0.9138 4/Unknown - 3s 380ms/step - loss: 0.2354 - accura
4/4 [==============================] - 3s 381ms/step - loss: 0.2354 - accuracy: 0.9138
4/4 [==============================] - 3s 381ms/step - loss: 0.2354 - accuracy: 0.9138
Test accuracy: 0.914
Test accuracy: 0.914
1 change: 1 addition & 0 deletions mars/learn/contrib/tensorflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from .run_script import run_tensorflow_script
from .dataset import gen_tensorflow_dataset # noqa: F401 # pylint: disable=unused-import


def register_op():
Expand Down
166 changes: 166 additions & 0 deletions mars/learn/contrib/tensorflow/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import copy
from typing import List, Tuple

import pandas as pd
import numpy as np

from .... import execute
from ....core.context import get_context
from ....tensor.core import TENSOR_TYPE
from ....dataframe.core import DATAFRAME_TYPE, SERIES_TYPE
from ....utils import require_not_none, lazy_import

tf = lazy_import("tensorflow")


# Types accepted as a dataset component. The Mars types occupy the first three
# slots, followed by their NumPy / pandas counterparts and plain lists; keep the
# order stable, since code in this module may select entries by position.
# The builtin ``list`` is used instead of ``typing.List`` because this tuple is a
# runtime ``isinstance`` target, not an annotation.
ACCEPT_TYPE = (TENSOR_TYPE, DATAFRAME_TYPE, SERIES_TYPE,
               np.ndarray, pd.DataFrame, pd.Series, list)


@require_not_none(tf)
class MarsDataset:
    """Expose Mars, pandas, NumPy or list data as a ``tf.data.Dataset``.

    Parameters
    ----------
    tensors
        A single data object, or a tuple of data objects that are iterated in
        lockstep. Every object must be an instance of ``ACCEPT_TYPE``.
    output_shapes
        Shape of one yielded element (a tuple of shapes when ``tensors`` is a
        tuple). Inferred from the data when omitted or empty.
    output_types
        Dtype of one yielded element (a tuple of dtypes when ``tensors`` is a
        tuple). Inferred from the data when omitted or empty.
    fetch_kwargs : dict, optional
        Keyword arguments forwarded to every ``fetch()`` call on Mars objects.

    Raises
    ------
    TypeError
        If any input is not an instance of ``ACCEPT_TYPE``.
    """

    def __init__(self, tensors,
                 output_shapes=None,
                 output_types=None,
                 fetch_kwargs=None):
        # the context is captured at construction time; nothing else in this
        # class reads it
        self._context = get_context()
        self._tensors = tensors
        self._output_shapes = output_shapes
        self._output_types = output_types
        self._fetch_kwargs = fetch_kwargs or dict()
        self._executed = False
        self._check_and_convert()

    @staticmethod
    def _infer_shape(t):
        """Return the shape of one element of ``t`` (its shape minus the first axis)."""
        if isinstance(t, (list, SERIES_TYPE, pd.Series)):
            # elements of a list or of a series are treated as scalars
            return ()
        return t.shape[1:]

    @staticmethod
    def _infer_dtype(t):
        """Return the dtype of one element of ``t``."""
        if isinstance(t, list):
            return type(t[0])
        if isinstance(t, (DATAFRAME_TYPE, pd.DataFrame)):
            # Take the dtype of the first column by *position*. Indexing with
            # ``t[0]`` would look up a column labelled 0 and fail with KeyError
            # on any frame whose columns are named.
            # NOTE(review): assumes all columns share one dtype -- confirm
            return t.dtypes.iloc[0]
        return t.dtype

    def _ensure_executed(self):
        """Run the pending Mars computation once; later calls are no-ops."""
        if not self._executed:
            self._execute()
            self._executed = True

    def _check_and_convert(self):
        """Validate the inputs, execute Mars objects and infer missing specs."""
        # a lone input is normalised to a 1-tuple so the rest of the class can
        # always iterate over ``self._tensors``
        if not isinstance(self._tensors, tuple):
            self._tensors = (self._tensors,)
        for t in self._tensors:
            if not isinstance(t, ACCEPT_TYPE):
                raise TypeError(f"Unexpected dataset type: {type(t)}")

        self._ensure_executed()

        single = len(self._tensors) == 1

        if not self._output_shapes:
            shapes = tuple(self._infer_shape(t) for t in self._tensors)
            self._output_shapes = shapes[0] if single else shapes

        if not self._output_types:
            # convert to tf dtypes on both paths (the single-input path used to
            # hand the raw numpy / python type through unconverted)
            types = tuple(tf.as_dtype(self._infer_dtype(t)) for t in self._tensors)
            self._output_types = types[0] if single else types

    def _execute(self):  # pragma: no cover
        """Execute every Mars object among the inputs in one ``execute`` call."""
        mars_types = (TENSOR_TYPE, DATAFRAME_TYPE, SERIES_TYPE)
        execute_data = [t for t in self._tensors if isinstance(t, mars_types)]

        if len(execute_data) > 0:
            execute(execute_data)

    def get_data(self, t, index):  # pragma: no cover
        """Return element ``index`` of ``t``, fetching it first for Mars objects."""
        # coverage not included as now there is no solution to cover tensorflow methods
        # see https://github.com/tensorflow/tensorflow/issues/33759 for more details.

        def fetch(obj):
            # every fetch gets its own deep copy of the user-supplied kwargs
            return obj.fetch(**copy.deepcopy(self._fetch_kwargs))

        if isinstance(t, TENSOR_TYPE):
            return fetch(t[index])
        if isinstance(t, np.ndarray):
            return t[index]
        if isinstance(t, DATAFRAME_TYPE):
            return fetch(t.iloc[index]).values
        if isinstance(t, SERIES_TYPE):
            return fetch(t.iloc[index])
        if isinstance(t, pd.DataFrame):
            return t.iloc[index].values
        if isinstance(t, pd.Series):
            return t.iloc[index]
        # plain lists
        return t[index]

    def to_tf(self) -> "tf.data.Dataset":
        """Get TF Dataset.

        Convert into a ``tensorflow.data.Dataset`` backed by a generator that
        yields one element (or one tuple of elements) per index of the first
        input.
        """
        def make_generator():  # pragma: no cover
            self._ensure_executed()

            single = len(self._tensors) == 1
            # the length of the first input drives the iteration
            for i in range(len(self._tensors[0])):
                if single:
                    yield self.get_data(self._tensors[0], i)
                else:
                    yield tuple(self.get_data(t, i) for t in self._tensors)

        return tf.data.Dataset.from_generator(
            make_generator,
            output_types=self._output_types,
            output_shapes=self._output_shapes
        )


def gen_tensorflow_dataset(tensors,
                           output_shapes=None,
                           output_types=None,
                           fetch_kwargs=None):
    """
    Convert Mars data into a ``tf.data.Dataset``.

    Note that this is based on TensorFlow 2.0.

    Parameters
    ----------
    tensors: Mars data type or a tuple consisting of Mars data type
        The data to convert into a ``tf.data.Dataset``.
    output_shapes:
        A (nested) structure of `tf.TensorShape` objects corresponding to
        each component of an element yielded from mars object.
    output_types:
        A (nested) structure of `tf.DType` objects corresponding to each
        component of an element yielded from mars object.
    fetch_kwargs:
        The parameters passed to the ``fetch()`` operation of mars objects.

    Returns
    -------
    tf.data.Dataset

    Examples
    --------
    >>> # convert a tensor to tf.data.Dataset.
    >>> data = mt.tensor([[1, 2], [3, 4]])
    >>> dataset = gen_tensorflow_dataset(data)
    >>> list(dataset.as_numpy_iterator())
    [array([1, 2]), array([3, 4])]
    >>> dataset.element_spec
    TensorSpec(shape=(2,), dtype=tf.int64, name=None)

    >>> # convert a tuple of tensors to tf.data.Dataset.
    >>> data1 = mt.tensor([1, 2]); data2 = mt.tensor([3, 4]); data3 = mt.tensor([5, 6])
    >>> dataset = gen_tensorflow_dataset((data1, data2, data3))
    >>> list(dataset.as_numpy_iterator())
    [(1, 3, 5), (2, 4, 6)]
    """
    return MarsDataset(
        tensors,
        output_shapes=output_shapes,
        output_types=output_types,
        fetch_kwargs=fetch_kwargs,
    ).to_tf()
7 changes: 2 additions & 5 deletions mars/learn/contrib/tensorflow/run_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ def run_tensorflow_script(script: Union[bytes, str, BinaryIO, TextIO],
command_argv: List[str] = None,
retry_when_fail: bool = False,
session: SessionType = None,
run_kwargs: Dict[str, Any] = None,
port: int = None):
run_kwargs: Dict[str, Any] = None):
"""
Run TensorFlow script in Mars cluster.
Expand All @@ -170,8 +169,6 @@ def run_tensorflow_script(script: Union[bytes, str, BinaryIO, TextIO],
Mars session, if not provided, will use default one.
run_kwargs : dict
Extra kwargs for `session.run`.
port : int
Port of TensorFlow worker or ps, will automatically increase for the same worker
Returns
-------
Expand All @@ -192,5 +189,5 @@ def run_tensorflow_script(script: Union[bytes, str, BinaryIO, TextIO],
op = RunTensorFlow(data=data, code=to_binary(code),
n_workers=int(n_workers), n_ps=int(n_ps),
retry_when_fail=retry_when_fail, gpu=gpu,
port=port, command_args=command_argv)
command_args=command_argv)
return op(inputs).execute(session=session, **(run_kwargs or {}))
Loading

0 comments on commit 4b83198

Please sign in to comment.