diff --git a/modules/io/adaptors/read_oss_dataframe.cc b/modules/io/adaptors/read_oss_dataframe.cc index a00ca55f..2d84dedc 100644 --- a/modules/io/adaptors/read_oss_dataframe.cc +++ b/modules/io/adaptors/read_oss_dataframe.cc @@ -66,6 +66,7 @@ int main(int argc, const char** argv) { std::shared_ptr table; VINEYARD_CHECK_OK(oss_io_adaptor->ReadTable(&table)); + VLOG(2) << "Read Table Done, part: " << proc; if (table) { auto st = writer->WriteTable(table); diff --git a/modules/io/adaptors/write_hdfs_bytes.py b/modules/io/adaptors/write_hdfs_bytes.py index 968a4b7b..2f470192 100755 --- a/modules/io/adaptors/write_hdfs_bytes.py +++ b/modules/io/adaptors/write_hdfs_bytes.py @@ -40,6 +40,7 @@ def write_hdfs_bytes(vineyard_socket, stream_id, path, proc_num, proc_index): hdfs = HDFileSystem(host=host, port=int(port)) path = urlparse(path).path + path += f'_{proc_index}' with hdfs.open(path, 'wb') as f: while True: try: diff --git a/modules/io/adaptors/write_hdfs_orc.py b/modules/io/adaptors/write_hdfs_orc.py index 0065e69b..a18479b2 100755 --- a/modules/io/adaptors/write_hdfs_orc.py +++ b/modules/io/adaptors/write_hdfs_orc.py @@ -73,6 +73,7 @@ def write_hdfs_orc(vineyard_socket, stream_id, path, proc_num, proc_index): path = urlparse(path).path writer = None + path += f'_{proc_index}' with hdfs.open(path, 'wb') as f: while True: try: diff --git a/modules/io/adaptors/write_local_bytes.py b/modules/io/adaptors/write_local_bytes.py index e346a9c9..eb2a613b 100755 --- a/modules/io/adaptors/write_local_bytes.py +++ b/modules/io/adaptors/write_local_bytes.py @@ -32,7 +32,9 @@ def write_local_bytes(vineyard_socket, stream_id, path, proc_num, proc_index): instream = streams[proc_index] reader = instream.open_reader(client) - with open(urlparse(path).path, 'wb') as f: + path = urlparse(path).path + f'_{proc_index}' + + with open(path, 'wb') as f: while True: try: buf = reader.next() diff --git a/modules/io/adaptors/write_local_dataframe.cc b/modules/io/adaptors/write_local_dataframe.cc deleted file mode 100644 index 92d91c9a..00000000 --- a/modules/io/adaptors/write_local_dataframe.cc +++ /dev/null @@ -1,78 +0,0 @@ -/** Copyright 2020 Alibaba Group Holding Limited. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#include -#include - -#include "basic/stream/dataframe_stream.h" -#include "basic/stream/parallel_stream.h" -#include "client/client.h" -#include "io/io/local_io_adaptor.h" - -using namespace vineyard; // NOLINT(build/namespaces) - -int main(int argc, const char** argv) { - if (argc < 6) { - printf( - "usage ./write_local_dataframe " - " "); - return 1; - } - - std::string ipc_socket = std::string(argv[1]); - ObjectID stream_id = VYObjectIDFromString(argv[2]); - std::string ofile = std::string(argv[3]); - int proc_num = std::stoi(argv[4]); - int proc_index = std::stoi(argv[5]); - - Client client; - VINEYARD_CHECK_OK(client.Connect(ipc_socket)); - LOG(INFO) << "Connected to IPCServer: " << ipc_socket; - - auto s = - std::dynamic_pointer_cast(client.GetObject(stream_id)); - LOG(INFO) << "Got parallel stream " << s->id(); - - VINEYARD_ASSERT(static_cast(proc_num) == s->GetStreamSize(), - "Different ProcNum(" + std::to_string(proc_num) + - ") from StreamSize(" + - std::to_string(s->GetStreamSize()) + ")"); - - auto ls = s->GetStream(proc_index); - LOG(INFO) << "Got dataframe stream " << ls->id() << " at " << proc_index; - - auto reader = ls->OpenReader(client); - - std::unique_ptr local_io_adaptor( - new LocalIOAdaptor(ofile.c_str())); - VINEYARD_CHECK_OK(local_io_adaptor->Open("w")); - - bool header_row = false; - std::string header_line = ""; - VINEYARD_CHECK_OK(reader->GetHeaderLine(header_row, header_line)); - if (header_row) { - VINEYARD_CHECK_OK(local_io_adaptor->WriteLine(header_line + "\n")); - } - - std::string line; - while (reader->ReadLine(line).ok()) { - VINEYARD_CHECK_OK(local_io_adaptor->WriteLine(line)); - } - - VINEYARD_CHECK_OK(local_io_adaptor->Close()); - local_io_adaptor->Finalize(); - - return 0; -} diff --git a/modules/io/adaptors/write_local_orc.py b/modules/io/adaptors/write_local_orc.py index 7fcbc18a..efd46458 100755 --- a/modules/io/adaptors/write_local_orc.py +++ b/modules/io/adaptors/write_local_orc.py @@ -65,6 +65,7 @@ def write_local_orc(vineyard_socket, stream_id, path, proc_num, proc_index): reader = instream.open_reader(client) writer = None + path += f'_{proc_index}' with open(path, 'wb') as f: while True: try: diff --git a/modules/io/adaptors/write_oss_bytes.py b/modules/io/adaptors/write_oss_bytes.py new file mode 100755 index 00000000..32058887 --- /dev/null +++ b/modules/io/adaptors/write_oss_bytes.py @@ -0,0 +1,65 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +import sys +import oss2 +from urllib.parse import urlparse + +import vineyard +from vineyard.io.byte import ByteStreamBuilder + + +def write_oss_bytes(vineyard_socket, stream_id, path, proc_num, proc_index): + client = vineyard.connect(vineyard_socket) + streams = client.get(stream_id) + if len(streams) != proc_num or streams[proc_index] is None: + raise ValueError(f'Fetch stream error with proc_num={proc_num},proc_index={proc_index}') + instream = streams[proc_index] + reader = instream.open_reader(client) + + parsed = urlparse(path) + auth = oss2.Auth(parsed.username, parsed.password) + _, bucket_name, object_name = parsed.path.split('/', 2) + bucket = oss2.Bucket(auth, parsed.hostname, bucket_name) + + result = None + object_name += f'_{proc_index}' + + while True: + try: + buf = reader.next() + except vineyard.StreamDrainedException: + break + if result is None: + offset = 0 + else: + offset = result.next_position + result = bucket.append_object(object_name, offset, bytes(memoryview(buf))) + + +if __name__ == '__main__': + if len(sys.argv) < 6: + print('usage: ./write_oss_bytes ') + exit(1) + ipc_socket = sys.argv[1] + stream_id = sys.argv[2] + file_path = sys.argv[3] + proc_num = int(sys.argv[4]) + proc_index = int(sys.argv[5]) + write_oss_bytes(ipc_socket, stream_id, file_path, proc_num, proc_index) diff --git a/modules/io/python/stream.py b/modules/io/python/stream.py index f824dade..4b936a39 100644 --- a/modules/io/python/stream.py +++ b/modules/io/python/stream.py @@ -168,7 +168,7 @@ def wait_all(self, func=None, **kwargs): logger.debug('partial_id_matrix = %s', partial_id_matrix) if func is None: return self.create_global_dataframe(partial_id_matrix, **kwargs) - return func(partial_id_matrix, kwargs) + return func(partial_id_matrix, **kwargs) def create_objectset(self, partial_id_matrix): meta = vineyard.ObjectMeta() @@ -304,12 +304,6 @@ def read_oss_dataframe(path, vineyard_socket, *args, **kwargs): vineyard.io.read.register('oss', read_oss_dataframe) -def parse_dataframe_to_bytes(vineyard_socket, dataframe_stream, *args, **kwargs): - launcher = ParallelStreamLauncher() - launcher.run(get_executable('parse_dataframe_to_bytes'), *((vineyard_socket, dataframe_stream) + args), **kwargs) - return launcher.wait() - - def parse_dataframe_to_bytes(vineyard_socket, dataframe_stream, *args, **kwargs): launcher = ParallelStreamLauncher() launcher.run(get_executable('parse_dataframe_to_bytes'), *((vineyard_socket, dataframe_stream) + args), **kwargs) @@ -377,8 +371,21 @@ def write_vineyard_dataframe(path, dataframe_stream, vineyard_socket, *args, **k return launcher.wait_all(name=path) +def write_oss_bytes(path, byte_stream, vineyard_socket, *args, **kwargs): + path = json.dumps('oss://' + path) + launcher = ParallelStreamLauncher() + launcher.run(get_executable('write_oss_bytes'), *((vineyard_socket, byte_stream, path) + args), **kwargs) + launcher.join() + + +def write_oss_dataframe(path, dataframe_stream, vineyard_socket, *args, **kwargs): + write_oss_bytes(path, parse_dataframe_to_bytes(vineyard_socket, dataframe_stream, *args, **kwargs), vineyard_socket, + *args, **kwargs) + + vineyard.io.write.register('file', write_local_dataframe) vineyard.io.write.register('kafka', write_kafka_bytes) vineyard.io.write.register('kafka', write_kafka_dataframe) vineyard.io.write.register('hdfs', write_hdfs_dataframe) vineyard.io.write.register('vineyard', write_vineyard_dataframe) +vineyard.io.write.register('oss', write_oss_dataframe) diff --git a/modules/io/python/tests/test_open.py b/modules/io/python/tests/test_open.py index 91e98009..2cba718b 100644 --- a/modules/io/python/tests/test_open.py +++ b/modules/io/python/tests/test_open.py @@ -56,7 +56,7 @@ def test_local_with_header(vineyard_ipc_socket, vineyard_endpoint, test_dataset, mode='w', vineyard_ipc_socket=vineyard_ipc_socket, vineyard_endpoint=vineyard_endpoint) - assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out' % test_dataset_tmp) + assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out_0' % test_dataset_tmp) def test_local_without_header(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp): @@ -68,7 +68,7 @@ def test_local_without_header(vineyard_ipc_socket, vineyard_endpoint, test_datas mode='w', vineyard_ipc_socket=vineyard_ipc_socket, vineyard_endpoint=vineyard_endpoint) - assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out' % test_dataset_tmp) + assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out_0' % test_dataset_tmp) def test_local_orc(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp): @@ -80,7 +80,7 @@ def test_local_orc(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_da mode='w', vineyard_ipc_socket=vineyard_ipc_socket, vineyard_endpoint=vineyard_endpoint) - assert filecmp.cmp('%s/p2p-31.e.orc' % test_dataset, '%s/testout.orc' % test_dataset_tmp) + assert filecmp.cmp('%s/p2p-31.e.orc' % test_dataset, '%s/testout.orc_0' % test_dataset_tmp) @pytest.mark.skip_without_hdfs() @@ -134,7 +134,7 @@ def test_hdfs_bytes(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_d mode='w', vineyard_ipc_socket=vineyard_ipc_socket, vineyard_endpoint=vineyard_endpoint) - assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out' % test_dataset_tmp) + assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out_0' % test_dataset_tmp) def test_vineyard_dataframe(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp): @@ -154,11 +154,11 @@ def test_vineyard_dataframe(vineyard_ipc_socket, vineyard_endpoint, test_dataset mode='w', vineyard_ipc_socket=vineyard_ipc_socket, vineyard_endpoint=vineyard_endpoint) - assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out' % test_dataset_tmp) + assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out_0' % test_dataset_tmp) @pytest.mark.skip('oss not available at github ci') -def test_oss(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp, oss_config): +def test_oss_read(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp, oss_config): oss_config_file = oss_config + '/.ossutilconfig' oss_config = configparser.ConfigParser() oss_config.read(oss_config_file) @@ -175,3 +175,24 @@ def test_oss(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_ vineyard_ipc_socket=vineyard_ipc_socket, vineyard_endpoint=vineyard_endpoint) assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out' % test_dataset_tmp) + + +@pytest.mark.skip('oss not available at github ci') +def test_oss_io(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp, oss_config): + oss_config_file = oss_config + '/.ossutilconfig' + oss_config = configparser.ConfigParser() + oss_config.read(oss_config_file) + accessKeyID = oss_config['Credentials']['accessKeyID'] + accessKeySecret = oss_config['Credentials']['accessKeySecret'] + oss_endpoint = oss_config['Credentials']['endpoint'] + stream = vineyard.io.open( + f'oss://{accessKeyID}:{accessKeySecret}@{oss_endpoint}/grape-uk/p2p-31.e#header_row=false&delimiter= ', + vineyard_ipc_socket=vineyard_ipc_socket, + vineyard_endpoint=vineyard_endpoint, + num_workers=2) + vineyard.io.open(f'oss://{accessKeyID}:{accessKeySecret}@{oss_endpoint}/grape-uk/p2p-31.out', + stream, + mode='w', + vineyard_ipc_socket=vineyard_ipc_socket, + vineyard_endpoint=vineyard_endpoint, + num_workers=2)