Skip to content

Commit

Permalink
fixup! fixup! fixup! fixup! [SQUASH] src: fixup lint issues
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Dec 19, 2022
1 parent c36be6e commit 9b037f1
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 20 deletions.
46 changes: 37 additions & 9 deletions src/dataqueue/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class DataQueueImpl final : public DataQueue,
tracker->TrackField("entries", entries_);
}

std::unique_ptr<Reader> getReader() override;
std::shared_ptr<Reader> getReader() override;
SET_MEMORY_INFO_NAME(DataQueue);
SET_SELF_SIZE(DataQueueImpl);

Expand All @@ -197,7 +197,9 @@ class DataQueueImpl final : public DataQueue,
// DataQueue with which it is associated, and always from the beginning.
// Reads are non-destructive, meaning that the state of the DataQueue
// will not and cannot be changed.
class IdempotentDataQueueReader final : public DataQueue::Reader {
class IdempotentDataQueueReader final
: public DataQueue::Reader,
public std::enable_shared_from_this<DataQueue::Reader> {
public:
IdempotentDataQueueReader(std::shared_ptr<DataQueueImpl> data_queue)
: data_queue_(std::move(data_queue)) {
Expand All @@ -216,6 +218,8 @@ class IdempotentDataQueueReader final : public DataQueue::Reader {
DataQueue::Vec* data,
size_t count,
size_t max_count_hint = bob::kMaxCountHint) override {
std::shared_ptr<DataQueue::Reader> self = shared_from_this();

// If ended is true, this reader has already reached the end and cannot
// provide any more data.
if (ended_) {
Expand Down Expand Up @@ -360,7 +364,7 @@ class IdempotentDataQueueReader final : public DataQueue::Reader {
private:
std::shared_ptr<DataQueueImpl> data_queue_;
Maybe<uint32_t> current_index_ = Nothing<uint32_t>();
std::unique_ptr<DataQueue::Reader> current_reader_ = nullptr;
std::shared_ptr<DataQueue::Reader> current_reader_ = nullptr;
bool ended_ = false;
bool pull_pending_ = false;
int last_status_ = 0;
Expand All @@ -370,7 +374,9 @@ class IdempotentDataQueueReader final : public DataQueue::Reader {
// and removes those entries from the queue as they are fully consumed.
// This means that reads are destructive and the state of the DataQueue
// is mutated as the read proceeds.
class NonIdempotentDataQueueReader final : public DataQueue::Reader {
class NonIdempotentDataQueueReader final
: public DataQueue::Reader,
public std::enable_shared_from_this<NonIdempotentDataQueueReader> {
public:
NonIdempotentDataQueueReader(std::shared_ptr<DataQueueImpl> data_queue)
: data_queue_(std::move(data_queue)) {
Expand All @@ -390,6 +396,8 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader {
DataQueue::Vec* data,
size_t count,
size_t max_count_hint = bob::kMaxCountHint) override {
std::shared_ptr<DataQueue::Reader> self = shared_from_this();

// If ended is true, this reader has already reached the end and cannot
// provide any more data.
if (ended_) {
Expand Down Expand Up @@ -543,21 +551,21 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader {

private:
std::shared_ptr<DataQueueImpl> data_queue_;
std::unique_ptr<DataQueue::Reader> current_reader_ = nullptr;
std::shared_ptr<DataQueue::Reader> current_reader_ = nullptr;
bool ended_ = false;
bool pull_pending_ = false;
int last_status_ = 0;
};

std::unique_ptr<DataQueue::Reader> DataQueueImpl::getReader() {
std::shared_ptr<DataQueue::Reader> DataQueueImpl::getReader() {
if (isIdempotent()) {
return std::make_unique<IdempotentDataQueueReader>(shared_from_this());
return std::make_shared<IdempotentDataQueueReader>(shared_from_this());
}

if (lockedToReader_) return nullptr;
lockedToReader_ = true;

return std::make_unique<NonIdempotentDataQueueReader>(shared_from_this());
return std::make_shared<NonIdempotentDataQueueReader>(shared_from_this());
}

// ============================================================================
Expand Down Expand Up @@ -755,7 +763,7 @@ class DataQueueEntry : public EntryBase {
DataQueueEntry& operator=(DataQueueEntry&&) = delete;

std::unique_ptr<DataQueue::Reader> getReader() override {
return data_queue_->getReader();
return std::make_unique<ReaderImpl>(data_queue_->getReader());
}

std::unique_ptr<Entry> slice(
Expand Down Expand Up @@ -794,6 +802,26 @@ class DataQueueEntry : public EntryBase {

private:
std::shared_ptr<DataQueue> data_queue_;

class ReaderImpl : public DataQueue::Reader {
public:
explicit ReaderImpl(std::shared_ptr<DataQueue::Reader> inner) : inner_(std::move(inner)) {}

int Pull(DataQueue::Reader::Next next,
int options,
DataQueue::Vec* data,
size_t count,
size_t max_count_hint) override {
return inner_->Pull(std::move(next), options, data, count, max_count_hint);
}

SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(ReaderImpl)
SET_SELF_SIZE(ReaderImpl)

private:
std::shared_ptr<DataQueue::Reader> inner_;
};
};

// ============================================================================
Expand Down
2 changes: 1 addition & 1 deletion src/dataqueue/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class DataQueue : public MemoryRetainer {
// any number of readers can be created, all of which are guaranteed
// to provide the same data. Otherwise, only a single reader is
// permitted.
virtual std::unique_ptr<Reader> getReader() = 0;
virtual std::shared_ptr<Reader> getReader() = 0;

// Append a single new entry to the queue. Appending is only allowed
// when isIdempotent() is false. v8::Nothing<bool>() will be returned
Expand Down
2 changes: 1 addition & 1 deletion src/node_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class Blob : public BaseObject {
SET_SELF_SIZE(Reader)

private:
std::unique_ptr<DataQueue::Reader> inner_;
std::shared_ptr<DataQueue::Reader> inner_;
BaseObjectPtr<Blob> strong_ptr_;
bool eos_ = false;
};
Expand Down
18 changes: 9 additions & 9 deletions test/cctest/test_dataqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ TEST(DataQueue, IdempotentDataQueue) {
CHECK_EQ(data_queue->size().ToChecked(), len1 + len2);

// We can acquire multiple readers from the data_queue.
std::unique_ptr<DataQueue::Reader> reader1 = data_queue->getReader();
std::unique_ptr<DataQueue::Reader> reader2 = data_queue->getReader();
std::shared_ptr<DataQueue::Reader> reader1 = data_queue->getReader();
std::shared_ptr<DataQueue::Reader> reader2 = data_queue->getReader();

CHECK_NOT_NULL(reader1);
CHECK_NOT_NULL(reader2);
Expand Down Expand Up @@ -289,7 +289,7 @@ TEST(DataQueue, IdempotentDataQueue) {
};

// We can read the expected slice data.
std::unique_ptr<DataQueue::Reader> reader3 = slice1->getReader();
std::shared_ptr<DataQueue::Reader> reader3 = slice1->getReader();
testSlice(reader3);

// We can slice correctly across boundaries.
Expand Down Expand Up @@ -364,7 +364,7 @@ TEST(DataQueue, IdempotentDataQueue) {
};

// We can read the expected slice data.
std::unique_ptr<DataQueue::Reader> reader4 = slice2->getReader();
std::shared_ptr<DataQueue::Reader> reader4 = slice2->getReader();
testSlice2(reader4);
}

Expand Down Expand Up @@ -426,8 +426,8 @@ TEST(DataQueue, NonIdempotentDataQueue) {
CHECK_NULL(slice1);

// We can acquire only a single reader for a non-idempotent data queue
std::unique_ptr<DataQueue::Reader> reader1 = data_queue->getReader();
std::unique_ptr<DataQueue::Reader> reader2 = data_queue->getReader();
std::shared_ptr<DataQueue::Reader> reader1 = data_queue->getReader();
std::shared_ptr<DataQueue::Reader> reader2 = data_queue->getReader();

CHECK_NOT_NULL(reader1);
CHECK_NULL(reader2);
Expand Down Expand Up @@ -499,7 +499,7 @@ TEST(DataQueue, NonIdempotentDataQueue) {
testRead(reader1);

// We still cannot acquire another reader.
std::unique_ptr<DataQueue::Reader> reader3 = data_queue->getReader();
std::shared_ptr<DataQueue::Reader> reader3 = data_queue->getReader();
CHECK_NULL(reader3);

CHECK_NOT_NULL(data_queue);
Expand Down Expand Up @@ -555,7 +555,7 @@ TEST(DataQueue, DataQueueEntry) {
// Our original data queue should have a use count of 2.
CHECK_EQ(data_queue.use_count(), 2);

std::unique_ptr<DataQueue::Reader> reader = data_queue2->getReader();
std::shared_ptr<DataQueue::Reader> reader = data_queue2->getReader();

bool pullIsPending = true;

Expand Down Expand Up @@ -584,7 +584,7 @@ TEST(DataQueue, DataQueueEntry) {
// even though we have already consumed the non-idempotent data queue that
// contained it.

std::unique_ptr<DataQueue::Reader> reader2 = data_queue->getReader();
std::shared_ptr<DataQueue::Reader> reader2 = data_queue->getReader();
CHECK_NOT_NULL(reader2);

pullIsPending = true;
Expand Down

0 comments on commit 9b037f1

Please sign in to comment.