Skip to content

Commit

Permalink
convert reference to pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
stdpain committed Dec 12, 2020
1 parent 751bc92 commit 6e81403
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 18 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/broker_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ Status BrokerReader::open() {
}

//not support
Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) {
Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
return Status::NotSupported("Not support");
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/broker_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class BrokerReader : public FileReader {
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Status BufferedReader::open() {
}

//not support
Status BufferedReader::read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) {
Status BufferedReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
return Status::NotSupported("Not support");

}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BufferedReader : public FileReader {
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class FileReader {
* if read eof then return Status::OK and length is set 0 and buf is set NULL,
* other return readed bytes.
*/
virtual Status read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) = 0;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) = 0;
virtual int64_t size() = 0;
virtual Status seek(int64_t position) = 0;
virtual Status tell(int64_t* position) = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void JsonReader::_close() {
Status JsonReader::_parse_json_doc(bool* eof) {
std::unique_ptr<uint8_t[]> json_str;
size_t length = 0;
RETURN_IF_ERROR(_file_reader->read_one_message(json_str, &length));
RETURN_IF_ERROR(_file_reader->read_one_message(&json_str, &length));
if (length == 0) {
*eof = true;
return Status::OK();
Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/local_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@ bool LocalFileReader::closed() {
}

// Read all bytes
Status LocalFileReader::read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) {
Status LocalFileReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
bool eof;
int64_t file_size = size() - _current_offset;
if (file_size <= 0) {
buf.reset();
buf->reset();
*length = 0;
return Status::OK();
}
*length = file_size;
buf.reset(new uint8_t[file_size]);
read(buf.get(), length, &eof);
buf->reset(new uint8_t[file_size]);
read(buf->get(), length, &eof);
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/local_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LocalFileReader : public FileReader {
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>& buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand Down
14 changes: 7 additions & 7 deletions be/src/runtime/stream_load/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
// If _total_length == -1, this should be a Kafka routine load task,
// just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
// Otherwise, this should be a stream load task that needs to read the specified amount of data.
Status read_one_message(std::unique_ptr<uint8_t[]>& data, size_t* length) override {
Status read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) override {
if (_total_length < -1) {
std::stringstream ss;
ss << "invalid, _total_length is: " << _total_length;
Expand All @@ -102,10 +102,10 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
}

// _total_length > 0, read the entire data
data.reset(new uint8_t[_total_length]);
data->reset(new uint8_t[_total_length]);
*length = _total_length;
bool eof = false;
Status st = read(data.get(), length, &eof);
Status st = read(data->get(), length, &eof);
if (eof) {
*length = 0;
}
Expand Down Expand Up @@ -188,7 +188,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {

private:
// read the next buffer from _buf_queue
Status _read_next_buffer(std::unique_ptr<uint8_t[]>& data, size_t* length) {
Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length) {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
Expand All @@ -200,14 +200,14 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
// finished
if (_buf_queue.empty()) {
DCHECK(_finished);
data.reset();
data->reset();
*length = 0;
return Status::OK();
}
auto buf = _buf_queue.front();
*length = buf->remaining();
data.reset(new uint8_t[*length]);
buf->get_bytes((char*)(data.get()), *length);
data->reset(new uint8_t[*length]);
buf->get_bytes((char*)(data->get()), *length);

_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
Expand Down

0 comments on commit 6e81403

Please sign in to comment.