Skip to content

Commit

Permalink
Free olap scanner out of lock (#1733)
Browse files Browse the repository at this point in the history
Close scanner out of OlapScanner's batch lock,
which will lead all scanners wait for one scanner to finish.
  • Loading branch information
imay authored Sep 2, 2019
1 parent ba170aa commit 81ca3e3
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 34 deletions.
70 changes: 38 additions & 32 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
if (state->is_cancelled()) {
boost::unique_lock<boost::mutex> l(_row_batches_lock);
_transfer_done = true;
boost::lock_guard<boost::mutex> guard(_status_mutex);
boost::lock_guard<SpinLock> guard(_status_mutex);
if (LIKELY(_status.ok())) {
_status = Status::Cancelled("Cancelled");
}
Expand Down Expand Up @@ -303,7 +303,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo

// all scanner done, change *eos to true
*eos = true;
boost::lock_guard<boost::mutex> guard(_status_mutex);
boost::lock_guard<SpinLock> guard(_status_mutex);
return _status;
}

Expand Down Expand Up @@ -348,7 +348,7 @@ Status OlapScanNode::close(RuntimeState* state) {

// OlapScanNode terminate by exception
// so that initiative close the Scanner
for (auto scanner : _all_olap_scanners) {
for (auto scanner : _olap_scanners) {
scanner->close(state);
}

Expand Down Expand Up @@ -662,7 +662,6 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {

_scanner_pool->add(scanner);
_olap_scanners.push_back(scanner);
_all_olap_scanners = _olap_scanners;
}

// init progress
Expand Down Expand Up @@ -1036,7 +1035,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
for (auto scanner : _olap_scanners) {
status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs());
if (!status.ok()) {
boost::lock_guard<boost::mutex> guard(_status_mutex);
boost::lock_guard<SpinLock> guard(_status_mutex);
_status = status;
break;
}
Expand Down Expand Up @@ -1179,7 +1178,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
if (!scanner->is_open()) {
status = scanner->open();
if (!status.ok()) {
boost::lock_guard<boost::mutex> guard(_status_mutex);
boost::lock_guard<SpinLock> guard(_status_mutex);
_status = status;
eos = true;
}
Expand Down Expand Up @@ -1230,43 +1229,50 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
raw_rows_read = scanner->raw_rows_read();
}

boost::unique_lock<boost::mutex> l(_scan_batches_lock);
// if we failed, check status.
if (UNLIKELY(!status.ok())) {
_transfer_done = true;
boost::lock_guard<boost::mutex> guard(_status_mutex);
if (LIKELY(_status.ok())) {
_status = status;
{
boost::unique_lock<boost::mutex> l(_scan_batches_lock);
// if we failed, check status.
if (UNLIKELY(!status.ok())) {
_transfer_done = true;
boost::lock_guard<SpinLock> guard(_status_mutex);
if (LIKELY(_status.ok())) {
_status = status;
}
}
}

bool global_status_ok = false;
{
boost::lock_guard<boost::mutex> guard(_status_mutex);
global_status_ok = _status.ok();
}
if (UNLIKELY(!global_status_ok)) {
eos = true;
for (auto rb : row_batchs) {
delete rb;
bool global_status_ok = false;
{
boost::lock_guard<SpinLock> guard(_status_mutex);
global_status_ok = _status.ok();
}
} else {
for (auto rb : row_batchs) {
_scan_row_batches.push_back(rb);
if (UNLIKELY(!global_status_ok)) {
eos = true;
for (auto rb : row_batchs) {
delete rb;
}
} else {
for (auto rb : row_batchs) {
_scan_row_batches.push_back(rb);
}
}
// If eos is true, we will process out of this lock block.
if (!eos) {
_olap_scanners.push_front(scanner);
}
_running_thread--;
}
// Scanner thread completed. Take a look and update the status
if (UNLIKELY(eos)) {
if (eos) {
// close out of batches lock. we do this before _progress update
// that can assure this object can keep live before we finish.
scanner->close(_runtime_state);

boost::unique_lock<boost::mutex> l(_scan_batches_lock);
_progress.update(1);
if (_progress.done()) {
// this is the right out
_scanner_done = true;
}
scanner->close(_runtime_state);
} else {
_olap_scanners.push_front(scanner);
}
_running_thread--;
_scan_batch_added_cv.notify_one();
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "runtime/row_batch_interface.hpp"
#include "runtime/vectorized_row_batch.h"
#include "util/progress_updater.h"
#include "util/spinlock.h"

namespace doris {

Expand Down Expand Up @@ -226,7 +227,6 @@ class OlapScanNode : public ScanNode {

std::list<RowBatchInterface*> _scan_row_batches;

std::list<OlapScanner*> _all_olap_scanners;
std::list<OlapScanner*> _olap_scanners;

int _max_materialized_row_batches;
Expand All @@ -240,7 +240,7 @@ class OlapScanNode : public ScanNode {
int _nice;

// protect _status, for many thread may change _status
boost::mutex _status_mutex;
SpinLock _status_mutex;
Status _status;
RuntimeState* _runtime_state;
RuntimeProfile::Counter* _scan_timer;
Expand Down

0 comments on commit 81ca3e3

Please sign in to comment.