Skip to content

Commit

Permalink
binding: add batch thread safety
Browse files Browse the repository at this point in the history
  • Loading branch information
braydonf committed Oct 2, 2019
1 parent bad4a96 commit 5abd296
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 14 deletions.
68 changes: 54 additions & 14 deletions src/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1677,7 +1677,8 @@ struct Batch {
Batch (Database* database)
: database_(database),
batch_(new leveldb::WriteBatch()),
hasData_(false) {}
hasData_(false),
isShared_(false) {}

~Batch () {
delete batch_;
Expand All @@ -1704,9 +1705,22 @@ struct Batch {
return database_->WriteBatch(options, batch_);
}

bool IsShared () {
return isShared_;
}

void Share () {
isShared_ = true;
}

void Unshare () {
isShared_ = false;
}

Database* database_;
leveldb::WriteBatch* batch_;
bool hasData_;
bool isShared_;
};

/**
Expand Down Expand Up @@ -1741,11 +1755,15 @@ NAPI_METHOD(batch_put) {
NAPI_ARGV(3);
NAPI_BATCH_CONTEXT();

leveldb::Slice key = ToSlice(env, argv[1]);
leveldb::Slice value = ToSlice(env, argv[2]);
batch->Put(key, value);
DisposeSliceBuffer(key);
DisposeSliceBuffer(value);
if (!batch->IsShared()) {
leveldb::Slice key = ToSlice(env, argv[1]);
leveldb::Slice value = ToSlice(env, argv[2]);
batch->Put(key, value);
DisposeSliceBuffer(key);
DisposeSliceBuffer(value);
} else {
napi_throw_error(env, 0, "Unsafe batch put.");
}

NAPI_RETURN_UNDEFINED();
}
Expand All @@ -1757,9 +1775,13 @@ NAPI_METHOD(batch_del) {
NAPI_ARGV(2);
NAPI_BATCH_CONTEXT();

leveldb::Slice key = ToSlice(env, argv[1]);
batch->Del(key);
DisposeSliceBuffer(key);
if (!batch->IsShared()) {
leveldb::Slice key = ToSlice(env, argv[1]);
batch->Del(key);
DisposeSliceBuffer(key);
} else {
napi_throw_error(env, 0, "Unsafe batch del.");
}

NAPI_RETURN_UNDEFINED();
}
Expand All @@ -1771,7 +1793,11 @@ NAPI_METHOD(batch_clear) {
NAPI_ARGV(1);
NAPI_BATCH_CONTEXT();

batch->Clear();
if (!batch->IsShared()) {
batch->Clear();
} else {
napi_throw_error(env, 0, "Unsafe batch clear.");
}

NAPI_RETURN_UNDEFINED();
}
Expand All @@ -1788,6 +1814,9 @@ struct BatchWriteWorker final : public PriorityWorker {
: PriorityWorker(env, batch->database_, callback, "leveldown.batch.write"),
batch_(batch),
sync_(sync) {
// For thread saftey, consider BatchWrite as shared.
batch->Share();

// Prevent GC of batch object before we execute
NAPI_STATUS_THROWS_VOID(napi_create_reference(env_, context, 1, &contextRef_));
}
Expand All @@ -1802,6 +1831,11 @@ struct BatchWriteWorker final : public PriorityWorker {
}
}

void DoFinally () override {
database_->DecrementPriorityWork();
batch_->Unshare();
}

Batch* batch_;
bool sync_;

Expand All @@ -1816,12 +1850,18 @@ NAPI_METHOD(batch_write) {
NAPI_ARGV(3);
NAPI_BATCH_CONTEXT();

napi_value options = argv[1];
bool sync = BooleanProperty(env, options, "sync", false);
napi_value callback = argv[2];

BatchWriteWorker* worker = new BatchWriteWorker(env, argv[0], batch, callback, sync);
worker->Queue();
if (!batch->IsShared()) {
napi_value options = argv[1];
bool sync = BooleanProperty(env, options, "sync", false);

BatchWriteWorker* worker = new BatchWriteWorker(env, argv[0], batch, callback, sync);
worker->Queue();
} else {
napi_value argv = CreateError(env, "Unsafe batch write.");
CallFunction(env, callback, 1, &argv);
}

NAPI_RETURN_UNDEFINED();
}
Expand Down
52 changes: 52 additions & 0 deletions test/bdb-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,56 @@ describe('BDB', function() {
}
});
});

describe('thread safety', function() {
async function checkError(method, message) {
const batch = db.batch();
const hash = Buffer.alloc(20, 0x11);

const value = Buffer.alloc(1024 * 1024);
const key = tkey.encode(hash, 12);

batch.put(key, value);
batch.write();

let err = null;

try {
switch (method) {
case 'clear':
batch.clear();
break;
case 'put':
batch.put(key, value);
break;
case 'del':
batch.del(key);
break;
case 'write':
await batch.write();
break;
}

} catch (e) {
err = e;
}

assert(err);
assert.equal(err.message, message);
await new Promise((r) => setTimeout(r, 200));
}

const methods = {
'clear': 'Unsafe batch clear.',
'put': 'Unsafe batch put.',
'del': 'Unsafe batch del.',
'write': 'Unsafe batch write.',
};

for (const [method, message] of Object.entries(methods)) {
it(`will check safety of ${method}`, async () => {
await checkError(method, message);
});
}
});
});

0 comments on commit 5abd296

Please sign in to comment.