Skip to content

Commit

Permalink
Write bytestream to OSS (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
andydiwenzhu authored Dec 2, 2020
1 parent 2f48fd0 commit adfbfbe
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 92 deletions.
1 change: 1 addition & 0 deletions modules/io/adaptors/read_oss_dataframe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ int main(int argc, const char** argv) {

std::shared_ptr<arrow::Table> table;
VINEYARD_CHECK_OK(oss_io_adaptor->ReadTable(&table));
VLOG(2) << "Read Table Done, part: " << proc;

if (table) {
auto st = writer->WriteTable(table);
Expand Down
1 change: 1 addition & 0 deletions modules/io/adaptors/write_hdfs_bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def write_hdfs_bytes(vineyard_socket, stream_id, path, proc_num, proc_index):
hdfs = HDFileSystem(host=host, port=int(port))
path = urlparse(path).path

path += f'_{proc_index}'
with hdfs.open(path, 'wb') as f:
while True:
try:
Expand Down
1 change: 1 addition & 0 deletions modules/io/adaptors/write_hdfs_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def write_hdfs_orc(vineyard_socket, stream_id, path, proc_num, proc_index):
path = urlparse(path).path

writer = None
path += f'_{proc_index}'
with hdfs.open(path, 'wb') as f:
while True:
try:
Expand Down
4 changes: 3 additions & 1 deletion modules/io/adaptors/write_local_bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ def write_local_bytes(vineyard_socket, stream_id, path, proc_num, proc_index):
instream = streams[proc_index]
reader = instream.open_reader(client)

with open(urlparse(path).path, 'wb') as f:
path = urlparse(path).path + f'_{proc_index}'

with open(path, 'wb') as f:
while True:
try:
buf = reader.next()
Expand Down
78 changes: 0 additions & 78 deletions modules/io/adaptors/write_local_dataframe.cc

This file was deleted.

1 change: 1 addition & 0 deletions modules/io/adaptors/write_local_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def write_local_orc(vineyard_socket, stream_id, path, proc_num, proc_index):
reader = instream.open_reader(client)

writer = None
path += f'_{proc_index}'
with open(path, 'wb') as f:
while True:
try:
Expand Down
65 changes: 65 additions & 0 deletions modules/io/adaptors/write_oss_bytes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import sys
import oss2
from urllib.parse import urlparse

import vineyard
from vineyard.io.byte import ByteStreamBuilder


def write_oss_bytes(vineyard_socket, stream_id, path, proc_num, proc_index):
    """Drain one partition of a vineyard byte stream into an OSS object.

    Every chunk read from the partition's stream is appended, in order, to
    the object ``<object>_<proc_index>`` of the bucket named in ``path``.

    Args:
        vineyard_socket: IPC socket handed to ``vineyard.connect``.
        stream_id: Id of the vineyard object holding one stream per worker.
        path: URL whose user info carries the credentials given to
            ``oss2.Auth``, whose host is the OSS endpoint and whose path is
            ``/<bucket>/<object>``.
        proc_num: Number of streams expected behind ``stream_id``.
        proc_index: Index of the stream this process is responsible for.

    Raises:
        ValueError: If the fetched streams do not match ``proc_num`` and
            ``proc_index``, or if ``path`` lacks a bucket/object component.
    """
    client = vineyard.connect(vineyard_socket)
    streams = client.get(stream_id)
    # Bound-check the index before using it: an out-of-range value would
    # raise a bare IndexError and a negative one would silently select a
    # stream that belongs to another worker.
    if len(streams) != proc_num or not 0 <= proc_index < proc_num or streams[proc_index] is None:
        raise ValueError(f'Fetch stream error with proc_num={proc_num},proc_index={proc_index}')
    reader = streams[proc_index].open_reader(client)

    parsed = urlparse(path)
    # parsed.path looks like '/<bucket>/<object>'; the object name itself may
    # contain further slashes, hence maxsplit=2.
    parts = parsed.path.split('/', 2)
    if len(parts) != 3:
        raise ValueError(f'Invalid OSS path, expected /<bucket>/<object> but got {parsed.path!r}')
    _, bucket_name, object_name = parts

    auth = oss2.Auth(parsed.username, parsed.password)
    bucket = oss2.Bucket(auth, parsed.hostname, bucket_name)

    # Each worker writes its own object so parallel writers never collide.
    object_name += f'_{proc_index}'

    # The first append starts at position 0; every later append continues at
    # the position reported by the previous one.
    offset = 0
    while True:
        try:
            buf = reader.next()
        except vineyard.StreamDrainedException:
            break
        result = bucket.append_object(object_name, offset, bytes(memoryview(buf)))
        offset = result.next_position


if __name__ == '__main__':
    # Command-line entry point: all five positional arguments are required.
    if len(sys.argv) < 6:
        print('usage: ./write_oss_bytes <ipc_socket> <stream_id> <file_path> <proc_num> <proc_index>')
        # Use sys.exit: the bare exit() helper is injected by the site module
        # for interactive sessions and is not guaranteed to exist in scripts.
        sys.exit(1)
    ipc_socket = sys.argv[1]
    stream_id = sys.argv[2]
    file_path = sys.argv[3]
    # The partition count and index arrive as strings on the command line.
    proc_num = int(sys.argv[4])
    proc_index = int(sys.argv[5])
    write_oss_bytes(ipc_socket, stream_id, file_path, proc_num, proc_index)
21 changes: 14 additions & 7 deletions modules/io/python/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def wait_all(self, func=None, **kwargs):
logger.debug('partial_id_matrix = %s', partial_id_matrix)
if func is None:
return self.create_global_dataframe(partial_id_matrix, **kwargs)
return func(partial_id_matrix, kwargs)
return func(partial_id_matrix, **kwargs)

def create_objectset(self, partial_id_matrix):
meta = vineyard.ObjectMeta()
Expand Down Expand Up @@ -304,12 +304,6 @@ def read_oss_dataframe(path, vineyard_socket, *args, **kwargs):
vineyard.io.read.register('oss', read_oss_dataframe)


def parse_dataframe_to_bytes(vineyard_socket, dataframe_stream, *args, **kwargs):
launcher = ParallelStreamLauncher()
launcher.run(get_executable('parse_dataframe_to_bytes'), *((vineyard_socket, dataframe_stream) + args), **kwargs)
return launcher.wait()


def parse_dataframe_to_bytes(vineyard_socket, dataframe_stream, *args, **kwargs):
launcher = ParallelStreamLauncher()
launcher.run(get_executable('parse_dataframe_to_bytes'), *((vineyard_socket, dataframe_stream) + args), **kwargs)
Expand Down Expand Up @@ -377,8 +371,21 @@ def write_vineyard_dataframe(path, dataframe_stream, vineyard_socket, *args, **k
return launcher.wait_all(name=path)


def write_oss_bytes(path, byte_stream, vineyard_socket, *args, **kwargs):
    """Run the ``write_oss_bytes`` executable on ``byte_stream`` and wait for it.

    ``path`` is prefixed with ``oss://`` and JSON-encoded before being handed
    to the launcher together with the socket, the stream and any extra
    positional/keyword arguments. Nothing is returned.
    """
    oss_url = json.dumps('oss://' + path)
    launcher = ParallelStreamLauncher()
    executable = get_executable('write_oss_bytes')
    launcher.run(executable, vineyard_socket, byte_stream, oss_url, *args, **kwargs)
    launcher.join()


def write_oss_dataframe(path, dataframe_stream, vineyard_socket, *args, **kwargs):
    """Write a dataframe stream to OSS by first converting it to a byte stream.

    The dataframe stream is passed through ``parse_dataframe_to_bytes`` and
    the result is forwarded to ``write_oss_bytes``; the same extra arguments
    are given to both steps.
    """
    byte_stream = parse_dataframe_to_bytes(vineyard_socket, dataframe_stream, *args, **kwargs)
    write_oss_bytes(path, byte_stream, vineyard_socket, *args, **kwargs)


# Register the writer callables with vineyard.io.write under a string key;
# presumably the key is the URL scheme used to pick a writer -- TODO confirm.
vineyard.io.write.register('file', write_local_dataframe)
# NOTE(review): 'kafka' is registered twice (bytes and dataframe writers);
# whether the second call replaces or complements the first depends on
# register(), which is not visible here -- confirm this is intended.
vineyard.io.write.register('kafka', write_kafka_bytes)
vineyard.io.write.register('kafka', write_kafka_dataframe)
vineyard.io.write.register('hdfs', write_hdfs_dataframe)
vineyard.io.write.register('vineyard', write_vineyard_dataframe)
vineyard.io.write.register('oss', write_oss_dataframe)
33 changes: 27 additions & 6 deletions modules/io/python/tests/test_open.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_local_with_header(vineyard_ipc_socket, vineyard_endpoint, test_dataset,
mode='w',
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint)
assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out' % test_dataset_tmp)
assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out_0' % test_dataset_tmp)


def test_local_without_header(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp):
Expand All @@ -68,7 +68,7 @@ def test_local_without_header(vineyard_ipc_socket, vineyard_endpoint, test_datas
mode='w',
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint)
assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out' % test_dataset_tmp)
assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out_0' % test_dataset_tmp)


def test_local_orc(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp):
Expand All @@ -80,7 +80,7 @@ def test_local_orc(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_da
mode='w',
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint)
assert filecmp.cmp('%s/p2p-31.e.orc' % test_dataset, '%s/testout.orc' % test_dataset_tmp)
assert filecmp.cmp('%s/p2p-31.e.orc' % test_dataset, '%s/testout.orc_0' % test_dataset_tmp)


@pytest.mark.skip_without_hdfs()
Expand Down Expand Up @@ -134,7 +134,7 @@ def test_hdfs_bytes(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_d
mode='w',
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint)
assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out' % test_dataset_tmp)
assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out_0' % test_dataset_tmp)


def test_vineyard_dataframe(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp):
Expand All @@ -154,11 +154,11 @@ def test_vineyard_dataframe(vineyard_ipc_socket, vineyard_endpoint, test_dataset
mode='w',
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint)
assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out' % test_dataset_tmp)
assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out_0' % test_dataset_tmp)


@pytest.mark.skip('oss not available at github ci')
def test_oss(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp, oss_config):
def test_oss_read(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp, oss_config):
oss_config_file = oss_config + '/.ossutilconfig'
oss_config = configparser.ConfigParser()
oss_config.read(oss_config_file)
Expand All @@ -175,3 +175,24 @@ def test_oss(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint)
assert filecmp.cmp('%s/p2p-31.e' % test_dataset, '%s/p2p-31.out' % test_dataset_tmp)


@pytest.mark.skip('oss not available at github ci')
def test_oss_io(vineyard_ipc_socket, vineyard_endpoint, test_dataset, test_dataset_tmp, oss_config):
    """Open an OSS object as a stream and write that stream back to OSS.

    Credentials and endpoint are read from the ``.ossutilconfig`` file found
    in the ``oss_config`` directory; both opens use two workers. The test
    makes no assertion on the written data.
    """
    parser = configparser.ConfigParser()
    parser.read(oss_config + '/.ossutilconfig')
    credentials = parser['Credentials']
    accessKeyID = credentials['accessKeyID']
    accessKeySecret = credentials['accessKeySecret']
    oss_endpoint = credentials['endpoint']

    # Options shared by the read and the write side.
    common = {
        'vineyard_ipc_socket': vineyard_ipc_socket,
        'vineyard_endpoint': vineyard_endpoint,
        'num_workers': 2,
    }
    source = f'oss://{accessKeyID}:{accessKeySecret}@{oss_endpoint}/grape-uk/p2p-31.e#header_row=false&delimiter= '
    sink = f'oss://{accessKeyID}:{accessKeySecret}@{oss_endpoint}/grape-uk/p2p-31.out'

    stream = vineyard.io.open(source, **common)
    vineyard.io.open(sink, stream, mode='w', **common)

0 comments on commit adfbfbe

Please sign in to comment.