Enhance the implementation of stream, and those readers. (#93)
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
sighingnow authored Dec 7, 2020
1 parent f7b22b4 commit bef75d5
Showing 9 changed files with 20 additions and 15 deletions.
2 changes: 1 addition & 1 deletion modules/io/adaptors/parse_bytes_to_dataframe.cc
@@ -210,7 +210,7 @@ int main(int argc, const char** argv) {
std::unique_ptr<arrow::Buffer> buffer;
auto status = reader->GetNext(buffer);
if (status.ok()) {
-LOG(INFO) << "consumer: buffer size = " << buffer->size();
+VLOG(10) << "consumer: buffer size = " << buffer->size();
std::shared_ptr<arrow::Table> table;
Status st =
ParseTable(&table, buffer, delimiter[0], header_row, columns,
2 changes: 1 addition & 1 deletion modules/io/adaptors/parse_dataframe_to_bytes.py
@@ -44,7 +44,7 @@ def parse_dataframe(vineyard_socket, stream_id, proc_num, proc_index):
client.persist(stream)
ret = {'type': 'return'}
ret['content'] = repr(stream.id)
-print(json.dumps(ret))
+print(json.dumps(ret), flush=True)

stream_writer = stream.open_writer(client)
first_write = header_row
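The flush=True additions across these adaptors matter because each script runs as a child process and reports the id of the stream it produced as a JSON line on stdout, which the launcher (see python/vineyard/launcher/script.py further down) reads line by line. A pipe-backed stdout is block-buffered in Python, so without an explicit flush the 'return' message can sit in the buffer and the launcher stalls. A minimal sketch of the reporting step, using a hypothetical helper name:

    import json

    def report_stream_id(stream_id):
        # Emit the 'return' control message immediately. stdout is a pipe when
        # launched from the vineyard script launcher, hence block-buffered;
        # flush=True pushes the JSON line out right away.
        ret = {'type': 'return', 'content': repr(stream_id)}
        print(json.dumps(ret), flush=True)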
6 changes: 3 additions & 3 deletions modules/io/adaptors/read_hdfs_bytes.py
@@ -62,7 +62,7 @@ def read_hdfs_bytes(vineyard_socket, path, proc_num, proc_index):
builder[k] = v

offset = 0
-length = 1024 * 1024
+chunk_size = 1024 * 1024 * 4

header_line = hdfs.read_block(path, 0, 1, b'\n')
builder['header_line'] = header_line.decode('unicode_escape')
@@ -74,7 +74,7 @@ def read_hdfs_bytes(vineyard_socket, path, proc_num, proc_index):

ret = {'type': 'return'}
ret['content'] = repr(stream.id)
-print(json.dumps(ret))
+print(json.dumps(ret), flush=True)

writer = stream.open_writer(client)

@@ -90,7 +90,7 @@ def read_hdfs_bytes(vineyard_socket, path, proc_num, proc_index):

offset = begin
while offset < end:
-buf = hdfs.read_block(path, offset, min(length, end - offset), b'\n')
+buf = hdfs.read_block(path, offset, min(chunk_size, end - offset), b'\n')
size = len(buf)
if not size:
break
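Besides the flush fix, this reader now pulls its assigned byte range in 4 MiB requests (chunk_size) instead of 1 MiB, and the rename from length to chunk_size makes the intent explicit. A rough sketch of the per-worker copy loop, assuming an hdfs3-style HDFileSystem.read_block(path, offset, length, delimiter) and a hypothetical handle_chunk callback standing in for the vineyard stream writer:

    def copy_byte_range(hdfs, path, begin, end, handle_chunk,
                        chunk_size=1024 * 1024 * 4):
        # The b'\n' delimiter snaps each block to a newline boundary, so a
        # request never ends in the middle of a CSV line.
        offset = begin
        while offset < end:
            buf = hdfs.read_block(path, offset, min(chunk_size, end - offset), b'\n')
            if not buf:
                break
            handle_chunk(buf)       # e.g. copy into the vineyard byte stream
            offset += len(buf)      # assumption: advance by the bytes returned
        return offset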
6 changes: 4 additions & 2 deletions modules/io/adaptors/read_hdfs_orc.py
@@ -71,7 +71,9 @@ def read_hdfs_orc(vineyard_socket, path, proc_num, proc_index):
client.persist(stream)
ret = {'type': 'return'}
ret['content'] = repr(stream.id)
-print(json.dumps(ret))
+print(json.dumps(ret), flush=True)

+chunk_rows = 1024 * 256

writer = stream.open_writer(client)

@@ -87,7 +89,7 @@ def read_hdfs_orc(vineyard_socket, path, proc_num, proc_index):
schema.append((c, arrow_type(fields[c])))
pa_struct = pa.struct(schema)
while True:
-rows = reader.read(num=1024)
+rows = reader.read(num=chunk_rows)
if not rows:
break
rb = pa.RecordBatch.from_struct_array(pa.array(rows, type=pa_struct))
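The ORC readers (HDFS here, Hive and local below) now share a 256K-row batch size instead of the previous hard-coded values (1024 rows for the HDFS/Hive readers, 1024 * 1024 for the local one). A sketch of the batching loop, assuming reader is a pyorc.Reader and pa_struct is the pa.struct(...) type built from the ORC schema as in the diff:

    import pyarrow as pa

    def orc_record_batches(reader, pa_struct, chunk_rows=1024 * 256):
        # Pull chunk_rows rows at a time and yield them as Arrow
        # RecordBatches, mirroring the loop used by the readers.
        while True:
            rows = reader.read(num=chunk_rows)   # list of row tuples; empty at EOF
            if not rows:
                break
            yield pa.RecordBatch.from_struct_array(pa.array(rows, type=pa_struct))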
6 changes: 4 additions & 2 deletions modules/io/adaptors/read_hive_orc.py
@@ -63,6 +63,8 @@ def arrow_type(field):


def read_hdfs_orc(path, hdfs, writer):
+chunk_rows = 1024 * 256

with hdfs.open(path, 'rb') as f:
reader = pyorc.Reader(f)
fields = reader.schema.fields
@@ -71,7 +73,7 @@ def read_hdfs_orc(path, hdfs, writer):
schema.append((c, arrow_type(fields[c])))
pa_struct = pa.struct(schema)
while True:
-rows = reader.read(num=1024)
+rows = reader.read(num=chunk_rows)
if not rows:
break
rb = pa.RecordBatch.from_struct_array(pa.array(rows, type=pa_struct))
@@ -117,7 +119,7 @@ def read_hive_orc(vineyard_socket, path, proc_num, proc_index):
client.persist(stream)
ret = {'type': 'return'}
ret['content'] = repr(stream.id)
-print(json.dumps(ret))
+print(json.dumps(ret), flush=True)

writer = stream.open_writer(client)
host, port = urlparse(path).netloc.split(':')
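read_hive_orc derives the HDFS namenode endpoint from the table location it receives by splitting the netloc of the URL into host and port. A tiny illustration of that parsing step (the hdfs:// location below is made up):

    from urllib.parse import urlparse

    path = 'hdfs://namenode:9000/user/hive/warehouse/mytable'
    host, port = urlparse(path).netloc.split(':')
    print(host, int(port))   # -> namenode 9000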
6 changes: 4 additions & 2 deletions modules/io/adaptors/read_local_orc.py
@@ -70,7 +70,9 @@ def read_local_orc(vineyard_socket, path, proc_num, proc_index):
client.persist(stream)
ret = {'type': 'return'}
ret['content'] = repr(stream.id)
-print(json.dumps(ret))
+print(json.dumps(ret), flush=True)

+chunk_rows = 1024 * 256

writer = stream.open_writer(client)

@@ -82,7 +84,7 @@ def read_local_orc(vineyard_socket, path, proc_num, proc_index):
schema.append((c, arrow_type(fields[c])))
pa_struct = pa.struct(schema)
while True:
-rows = reader.read(num=1024 * 1024)
+rows = reader.read(num=chunk_rows)
if not rows:
break
rb = pa.RecordBatch.from_struct_array(pa.array(rows, type=pa_struct))
2 changes: 1 addition & 1 deletion modules/io/adaptors/read_vineyard_dataframe.py
@@ -55,7 +55,7 @@ def read_vineyard_dataframe(vineyard_socket, path, proc_num, proc_index):
client.persist(stream)
ret = {'type': 'return'}
ret['content'] = repr(stream.id)
-print(json.dumps(ret))
+print(json.dumps(ret), flush=True)

writer = stream.open_writer(client)

4 changes: 2 additions & 2 deletions python/vineyard/launcher/script.py
@@ -106,12 +106,12 @@ def read_output(self, stream):
while self._proc.poll() is None:
line = stream.readline()
self.parse(line)
-logger.info(line)
+logger.debug(line)

# consume all extra lines if the proc exits.
for line in stream.readlines():
self.parse(line)
-logger.info(line)
+logger.debug(line)

def join(self):
if self._proc.wait():
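Demoting the launcher's per-line logging from info to debug makes sense because read_output already parses every line of the child's stdout for control messages (the JSON 'return' payload printed by the adaptors above); the raw lines are only useful when debugging. A sketch of that pattern as a standalone function (hypothetical name; the 'error' message type alongside 'return' is an assumption):

    import json
    import logging

    logger = logging.getLogger(__name__)

    def read_output(proc, stream, on_message):
        # Parse every stdout line for a JSON control message and log the
        # raw line at DEBUG so the child's chatter stays out of INFO logs.
        def parse(line):
            try:
                msg = json.loads(line)
            except ValueError:
                return
            if isinstance(msg, dict) and msg.get('type') in ('return', 'error'):
                on_message(msg)

        while proc.poll() is None:
            line = stream.readline()
            parse(line)
            logger.debug(line)

        # drain any remaining lines once the process has exited
        for line in stream.readlines():
            parse(line)
            logger.debug(line)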
1 change: 0 additions & 1 deletion src/server/memory/stream_store.cc
@@ -76,7 +76,6 @@ Status StreamStore::Get(ObjectID const stream_id, size_t const size,
}
}

-// if (true /* FIXME: allocatable */) {
if (allocatable(stream, size)) {
// do allocation
ObjectID chunk;
