Skip to content

Commit

Permalink
Re-enable the from/to vineyard test cases, and set meta for tensor/da…
Browse files Browse the repository at this point in the history
…taframe properly. (#1967)

Co-authored-by: qiaozi.zwb <qiaozi.zwb@alibaba-inc.com>
  • Loading branch information
sighingnow and acezen authored Feb 5, 2021
1 parent 311d81e commit 84e4614
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 62 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ jobs:
conda install -n test --quiet --yes -c conda-forge python=$PYTHON skein conda-pack
fi
if [ -n "$WITH_VINEYARD" ]; then
pip install vineyard==0.1.9
sudo docker pull libvineyard/vineyardd:v0.1.9
pip install vineyard==0.1.10
sudo docker pull libvineyard/vineyardd:v0.1.10
mkdir -p /tmp/etcd-download-test
export ETCD_VER=v3.4.13
Expand Down Expand Up @@ -101,7 +101,7 @@ jobs:
source ./.github/workflows/reload-env.sh
# launch vineyardd
sudo docker run --rm -d --name vineyard --shm-size=3072m -v /tmp/vineyard:/var/run libvineyard/vineyardd:v0.1.9
sudo docker run --rm -d --name vineyard --shm-size=3072m -v /tmp/vineyard:/var/run libvineyard/vineyardd:v0.1.10
until [ -S /tmp/vineyard/vineyard.sock ]
do
Expand Down Expand Up @@ -138,7 +138,7 @@ jobs:
export VINEYARD_IPC_SOCKET=/tmp/vineyard/vineyard.sock
mkdir -p build
pytest $PYTEST_CONFIG --cov-config .coveragerc-threaded mars/storage/tests/test_libs.py
pytest $PYTEST_CONFIG --cov-config .coveragerc mars/storage/tests/test_libs.py
mv .coverage build/.coverage.test_lib.file
pytest $PYTEST_CONFIG --cov-config .coveragerc-threaded mars/dataframe/datastore/tests/test_datastore_execute.py \
Expand Down
27 changes: 20 additions & 7 deletions mars/dataframe/datasource/from_vineyard.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

try:
import vineyard
from vineyard.data.utils import normalize_dtype
except ImportError:
vineyard = None

Expand Down Expand Up @@ -86,14 +87,23 @@ def tile(cls, op):
workers = {client.instance_id: '127.0.0.1'}

df_meta = client.get_meta(vineyard.ObjectID(op.object_id))
chunks_meta = df_meta['objects_']

chunk_map = {}
for idx in range(int(chunks_meta['num_of_objects'])):
chunk_meta = chunks_meta['object_%d' % idx]
df_columns, df_dtypes = [], []
for idx in range(int(df_meta['partitions_-size'])):
chunk_meta = df_meta['partitions_-%d' % idx]
chunk_location = int(chunk_meta['instance_id'])
columns = json.loads(chunk_meta['columns_'])
shape = (np.nan, len(columns))
if not columns:
# note that in vineyard dataframe are splitted along the index axis.
df_columns = columns
if not df_dtypes:
for column_idx in range(len(columns)):
column_meta = chunk_meta['__values_-value-%d' % column_idx]
dtype = normalize_dtype(column_meta['value_type_'],
column_meta.get('value_type_meta_', None))
df_dtypes.append(dtype)
chunk_index = (int(chunk_meta['partition_index_row_']), int(chunk_meta['partition_index_column_']))
chunk_map[chunk_index] = (chunk_location, chunk_meta['id'], shape, columns)

Expand All @@ -106,14 +116,17 @@ def tile(cls, op):
chunk_op._object_id = chunk_id
chunk_op._expect_worker = workers[instance_id]
out_chunks.append(chunk_op.new_chunk([], shape=shape, index=chunk_index,
index_value=parse_index(pd.Index([])),
# use the same value as `read_csv`
index_value=parse_index(pd.RangeIndex(0, -1)),
columns_value=parse_index(pd.Index(columns))))

new_op = op.copy()
return new_op.new_dataframes(op.inputs, shape=(np.nan, np.nan), dtypes=pd.Series([]),
# n.b.: the `shape` will be filled by `_update_tileable_and_chunk_shape`.
return new_op.new_dataframes(op.inputs, shape=(np.nan, np.nan), dtypes=df_dtypes,
chunks=out_chunks, nsplits=nsplits,
index_value=parse_index(pd.Index([])),
columns_value=parse_index(pd.Index([])))
# use the same value as `read_csv`
index_value=parse_index(pd.RangeIndex(0, -1)),
columns_value=parse_index(pd.Index(df_columns)))

@classmethod
def execute(cls, ctx, op):
Expand Down
13 changes: 5 additions & 8 deletions mars/dataframe/datastore/tests/test_datastore_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from mars.dataframe import DataFrame
from mars.deploy.local.core import new_cluster
from mars.session import new_session
from mars.tests.core import mock, TestBase
from mars.tests.core import TestBase

try:
import vineyard
Expand Down Expand Up @@ -159,11 +159,11 @@ def testToSQL(self):
.sort_index(ascending=False)
pd.testing.assert_frame_equal(raw.col1.to_frame(), written)

@unittest.skip('the test is broken.')
@mock.patch('webbrowser.open_new_tab', new=lambda *_, **__: True)
@unittest.skipIf(vineyard is None, 'vineyard not installed')
def testToVineyard(self):
def testWithGivenSession(session):
with option_context({'vineyard.socket': '/tmp/vineyard/vineyard.sock'}):
ipc_socket = os.environ.get('VINEYARD_IPC_SOCKET', '/tmp/vineyard/vineyard.sock')
with option_context({'vineyard.socket': ipc_socket}):
df1 = DataFrame(pd.DataFrame(np.arange(12).reshape(3, 4), columns=['a', 'b', 'c', 'd']),
chunk_size=2)
object_id = df1.to_vineyard().execute(session=session).fetch()
Expand All @@ -177,13 +177,10 @@ def testWithGivenSession(session):
testWithGivenSession(session)

with new_cluster(scheduler_n_process=2, worker_n_process=2,
shared_memory='20M', web=True) as cluster:
shared_memory='20M', web=False) as cluster:
with new_session(cluster.endpoint).as_default() as session:
testWithGivenSession(session)

with new_session('http://' + cluster._web_endpoint).as_default() as web_session:
testWithGivenSession(web_session)

@unittest.skipIf(pa is None, 'pyarrow not installed')
def testToParquetArrowExecution(self):
raw = pd.DataFrame({
Expand Down
21 changes: 7 additions & 14 deletions mars/dataframe/datastore/to_vineyard.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,23 +177,16 @@ def execute(cls, ctx, op):
client = vineyard.connect(op.vineyard_socket)

meta = vineyard.ObjectMeta()
instances = set()
chunks = set()
for idx, in_chunk in enumerate(op.inputs):
instance_id, chunk_id = ctx[in_chunk.key]
instances.add(instance_id)
chunks.add(chunk_id)
meta.add_member('object_%d' % idx, vineyard.ObjectID(chunk_id))
meta['typename'] = 'vineyard::ObjectSet'
meta['num_of_instances'] = len(instances)
meta['num_of_objects'] = len(chunks)
object_set_id = client.create_metadata(meta)

meta = vineyard.ObjectMeta()
meta.set_global(True)
meta['typename'] = 'vineyard::GlobalDataFrame'
meta['partition_shape_row_'] = op.shape[0]
meta['partition_shape_column_'] = op.shape[1]
meta.add_member('objects_', object_set_id)

for idx, in_chunk in enumerate(op.inputs):
_, chunk_id = ctx[in_chunk.key]
meta.add_member('partitions_-%d' % idx, vineyard.ObjectID(chunk_id))
meta['partitions_-size'] = len(op.inputs)

global_dataframe_id = client.create_metadata(meta)
client.persist(global_dataframe_id)

Expand Down
6 changes: 4 additions & 2 deletions mars/deploy/local/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1301,11 +1301,13 @@ def tearDown(self):

@property
def _extra_worker_options(self):
return ['-Dvineyard.socket=/tmp/vineyard/vineyard.sock']
ipc_socket = os.environ.get('VINEYARD_IPC_SOCKET', '/tmp/vineyard/vineyard.sock')
return ['-Dvineyard.socket=%s' % ipc_socket]

@property
def _extra_scheduler_options(self):
return ['-Dvineyard.socket=/tmp/vineyard/vineyard.sock']
ipc_socket = os.environ.get('VINEYARD_IPC_SOCKET', '/tmp/vineyard/vineyard.sock')
return ['-Dvineyard.socket=%s' % ipc_socket]

def new_cluster(self, *args, **kwargs):
return IntegrationTestCluster('127.0.0.1:%s' % self.scheduler_port,
Expand Down
12 changes: 8 additions & 4 deletions mars/tensor/datasource/from_vineyard.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

try:
import vineyard
from vineyard.data.utils import normalize_dtype
except ImportError:
vineyard = None

Expand Down Expand Up @@ -69,11 +70,14 @@ def tile(cls, op):
workers = {client.instance_id: '127.0.0.1'}

tensor_meta = client.get_meta(vineyard.ObjectID(op.object_id))
chunks_meta = tensor_meta['chunks_']

chunk_map = {}
for idx in range(int(chunks_meta['num_of_objects'])):
chunk_meta = chunks_meta['object_%d' % idx]
dtype = None
for idx in range(int(tensor_meta['partitions_-size'])):
chunk_meta = tensor_meta['partitions_-%d' % idx]
if dtype is None:
dtype = normalize_dtype(chunk_meta['value_type_'],
chunk_meta.get('value_type_meta_', None))
chunk_location = int(chunk_meta['instance_id'])
shape = tuple(json.loads(chunk_meta['shape_']))
chunk_index = tuple(json.loads(chunk_meta['partition_index_']))
Expand All @@ -90,7 +94,7 @@ def tile(cls, op):
out_chunks.append(chunk_op.new_chunk([], shape=shape, index=chunk_index))

new_op = op.copy()
return new_op.new_tileables(op.inputs, chunks=out_chunks, nsplits=nsplits)
return new_op.new_tileables(op.inputs, dtype=dtype, chunks=out_chunks, nsplits=nsplits)

@classmethod
def execute(cls, ctx, op):
Expand Down
13 changes: 5 additions & 8 deletions mars/tensor/datastore/tests/test_datastore_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from mars.session import new_session
from mars.tensor import tensor, arange, totiledb, tohdf5, tozarr, \
from_vineyard, tovineyard
from mars.tests.core import mock, TestBase, ExecutorForTest
from mars.tests.core import TestBase, ExecutorForTest
from mars.tiles import get_tiled

try:
Expand Down Expand Up @@ -222,11 +222,11 @@ def testStoreZarrExecution(self):
result = zarr.open_array(path)
np.testing.assert_array_equal(result, raw + 1)

@unittest.skip('the test is broken, need to fix.')
@mock.patch('webbrowser.open_new_tab', new=lambda *_, **__: True)
@unittest.skipIf(vineyard is None, 'vineyard not installed')
def testToVineyard(self):
def testWithGivenSession(session):
with option_context({'vineyard.socket': '/tmp/vineyard/vineyard.sock'}):
ipc_socket = os.environ.get('VINEYARD_IPC_SOCKET', '/tmp/vineyard/vineyard.sock')
with option_context({'vineyard.socket': ipc_socket}):
tensor1 = tensor(np.arange(12).reshape(3, 4), chunk_size=2)
object_id = tovineyard(tensor1).execute(session=session).fetch()
tensor2 = from_vineyard(object_id)
Expand All @@ -239,9 +239,6 @@ def testWithGivenSession(session):
testWithGivenSession(session)

with new_cluster(scheduler_n_process=2, worker_n_process=2,
shared_memory='20M', web=True) as cluster:
shared_memory='20M', web=False) as cluster:
with new_session(cluster.endpoint).as_default() as session:
testWithGivenSession(session)

with new_session('http://' + cluster._web_endpoint).as_default() as web_session:
testWithGivenSession(web_session)
23 changes: 8 additions & 15 deletions mars/tensor/datastore/to_vineyard.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,23 +167,16 @@ def execute(cls, ctx, op):
client = vineyard.connect(op.vineyard_socket)

meta = vineyard.ObjectMeta()
instances = set()
chunks = set()
meta.set_global(True)
meta['typename'] = 'vineyard::GlobalTensor'
meta['shape_'] = json.dumps(op.shape)
meta['partition_shape_'] = json.dumps(op.chunk_shape)

for idx, in_chunk in enumerate(op.inputs):
instance_id, chunk_id = ctx[in_chunk.key]
instances.add(instance_id)
chunks.add(chunk_id)
meta.add_member('object_%d' % idx, vineyard.ObjectID(chunk_id))
meta['typename'] = 'vineyard::ObjectSet'
meta['num_of_instances'] = len(instances)
meta['num_of_objects'] = len(chunks)
object_set_id = client.create_metadata(meta)
_, chunk_id = ctx[in_chunk.key]
meta.add_member('partitions_-%d' % idx, vineyard.ObjectID(chunk_id))
meta['partitions_-size'] = len(op.inputs)

meta = vineyard.ObjectMeta()
meta['typename'] = 'vineyard::GlobalTensor<%s>' % op.dtype.name
meta['shape_'] = json.dumps(op.shape)
meta['chunk_shape_'] = json.dumps(op.chunk_shape)
meta.add_member('chunks_', object_set_id)
global_tensor_id = client.create_metadata(meta)
client.persist(global_tensor_id)

Expand Down
1 change: 1 addition & 0 deletions requirements-vineyard.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
vineyard==0.1.10; sys.platform != 'win32'
5 changes: 5 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def execfile(fname, globs, locs=None):
with open(os.path.join(repo_root, 'requirements-dev.txt'), 'r') as f:
dev_requirements.extend(f.read().splitlines())

vineyard_requirements = []
with open(os.path.join(repo_root, 'requirements-vineyard.txt'), 'r') as f:
vineyard_requirements.extend(f.read().splitlines())

long_description = None
if os.path.exists(os.path.join(repo_root, 'README.rst')):
with open(os.path.join(repo_root, 'README.rst'), encoding='utf-8') as f:
Expand Down Expand Up @@ -206,6 +210,7 @@ def run(self):
extras_require={
'distributed': extra_requirements,
'dev': extra_requirements + dev_requirements,
'vineyard': vineyard_requirements,
}
)
setup(**setup_options)

0 comments on commit 84e4614

Please sign in to comment.