Skip to content

Commit

Permalink
convert reference to pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
stdpain committed Dec 12, 2020
1 parent f0958b4 commit 7c82af8
Show file tree
Hide file tree
Showing 9 changed files with 16 additions and 16 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/broker_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ Status BrokerReader::open() {
}

//not support
Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) {
Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
return Status::NotSupported("Not support");
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/broker_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class BrokerReader : public FileReader {
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Status BufferedReader::open() {
}

//not support
Status BufferedReader::read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) {
Status BufferedReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
return Status::NotSupported("Not support");

}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BufferedReader : public FileReader {
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class FileReader {
* if read eof then return Status::OK and length is set 0 and buf is set NULL,
* other return readed bytes.
*/
virtual Status read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) = 0;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) = 0;
virtual int64_t size() = 0;
virtual Status seek(int64_t position) = 0;
virtual Status tell(int64_t* position) = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void JsonReader::_close() {
Status JsonReader::_parse_json_doc(bool* eof) {
std::unique_ptr<uint8_t[]> json_str;
size_t length = 0;
RETURN_IF_ERROR(_file_reader->read_one_message(json_str, &length));
RETURN_IF_ERROR(_file_reader->read_one_message(&json_str, &length));
if (length == 0) {
*eof = true;
return Status::OK();
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/local_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ Status LocalFileReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t
bool eof;
int64_t file_size = size() - _current_offset;
if (file_size <= 0) {
buf.reset();
buf->reset();
*length = 0;
return Status::OK();
}
*length = file_size;
buf.reset(new uint8_t[file_size]);
read(buf.get(), length, &eof);
buf->reset(new uint8_t[file_size]);
read(buf->get(), length, &eof);
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/local_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LocalFileReader : public FileReader {
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand Down
12 changes: 6 additions & 6 deletions be/src/runtime/stream_load/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
}

// _total_length > 0, read the entire data
data.reset(new uint8_t[_total_length]);
data->reset(new uint8_t[_total_length]);
*length = _total_length;
bool eof = false;
Status st = read(data.get(), length, &eof);
Status st = read(data->get(), length, &eof);
if (eof) {
*length = 0;
}
Expand Down Expand Up @@ -188,7 +188,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {

private:
// read the next buffer from _buf_queue
Status _read_next_buffer(std::unique_ptr<uint8_t[]>& data, size_t* length) {
Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length) {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
Expand All @@ -200,14 +200,14 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
// finished
if (_buf_queue.empty()) {
DCHECK(_finished);
data.reset();
data->reset();
*length = 0;
return Status::OK();
}
auto buf = _buf_queue.front();
*length = buf->remaining();
data.reset(new uint8_t[*length]);
buf->get_bytes((char*)(data.get()), *length);
data->reset(new uint8_t[*length]);
buf->get_bytes((char*)(data->get()), *length);

_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
Expand Down

0 comments on commit 7c82af8

Please sign in to comment.