Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor: rename disk_aio to aio_context #311

Merged
merged 8 commits into from
Sep 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions include/dsn/tool-api/aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,14 @@ class aio_provider

virtual error_code close(dsn_handle_t fh) = 0;
virtual error_code flush(dsn_handle_t fh) = 0;

// Submits the aio_task to the underlying disk-io executor.
// This task may not be executed immediately, call `aio_task::wait`
// to wait until it completes.
// TODO(wutao1): Call it aio_submit().
virtual void aio(aio_task *aio) = 0;
virtual disk_aio *prepare_aio_context(aio_task *) = 0;

virtual aio_context *prepare_aio_context(aio_task *) = 0;

virtual void start() = 0;

Expand All @@ -94,4 +100,4 @@ class aio_provider
};

/*@}*/
} // end namespace
} // namespace dsn
15 changes: 8 additions & 7 deletions include/dsn/tool-api/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ enum aio_type
};

class disk_engine;
class disk_aio
class aio_context : public ref_counter
{
public:
// filled by apps
Expand All @@ -557,9 +557,9 @@ class disk_aio
// filled by frameworks
aio_type type;
disk_engine *engine;
void *file_object;
void *file_object; // TODO(wutao1): make it disk_file*, and distinguish it from `file`

disk_aio()
aio_context()
: file(nullptr),
buffer(nullptr),
support_write_vec(false),
Expand All @@ -578,15 +578,16 @@ class aio_task : public task
public:
aio_task(task_code code, const aio_handler &cb, int hash = 0, service_node *node = nullptr);
aio_task(task_code code, aio_handler &&cb, int hash = 0, service_node *node = nullptr);
~aio_task();

// tell the compiler that we want both the enqueue from base task and ours
// to prevent the compiler complaining -Werror,-Woverloaded-virtual.
using task::enqueue;
void enqueue(error_code err, size_t transferred_size);

size_t get_transferred_size() const { return _transferred_size; }
disk_aio *aio() { return _aio; }

// The ownership of `aio_context` is held by `aio_task`.
aio_context *get_aio_context() { return _aio_ctx.get(); }

// merge buffers in _unmerged_write_buffers to a single merged buffer.
// and store it in _merged_write_buffer_holder.
Expand All @@ -606,8 +607,8 @@ class aio_task : public task
protected:
void clear_non_trivial_on_task_end() override { _cb = nullptr; }

protected:
disk_aio *_aio;
private:
dsn::ref_ptr<aio_context> _aio_ctx;
size_t _transferred_size;
aio_handler _cb;
};
Expand Down
26 changes: 13 additions & 13 deletions src/core/core/disk_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ aio_task *disk_write_queue::unlink_next_workload(void *plength)

aio_task *first = _hdr._first, *current = first, *last = first;
while (nullptr != current) {
auto io = current->aio();
auto io = current->get_aio_context();
if (sz == 0) {
sz = io->buffer_size;
next_offset = io->file_offset + sz;
Expand Down Expand Up @@ -78,7 +78,7 @@ disk_file::disk_file(dsn_handle_t handle) : _handle(handle) {}

aio_task *disk_file::read(aio_task *tsk)
{
tsk->add_ref(); // release on completion
tsk->add_ref(); // release on completion, see `on_read_completed`.
return _read_queue.add_work(tsk, nullptr);
}

Expand Down Expand Up @@ -107,7 +107,7 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err,
wk->next = nullptr;

if (err == ERR_OK) {
size_t this_size = (size_t)wk->aio()->buffer_size;
size_t this_size = (size_t)wk->get_aio_context()->buffer_size;
dassert(size >= this_size,
"written buffer size does not equal to input buffer's size: %d vs %d",
(int)size,
Expand Down Expand Up @@ -195,7 +195,7 @@ void disk_engine::read(aio_task *aio)
return;
}

auto dio = aio->aio();
auto dio = aio->get_aio_context();
auto df = (disk_file *)dio->file;
dio->file = df->native_handle();
dio->file_object = df;
Expand All @@ -218,12 +218,12 @@ class batch_write_io_task : public aio_task

virtual void exec() override
{
auto df = (disk_file *)_tasks->aio()->file_object;
auto df = (disk_file *)_tasks->get_aio_context()->file_object;
uint32_t sz;

auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), _transferred_size);
auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), get_transferred_size());
if (wk) {
wk->aio()->engine->process_write(wk, sz);
wk->get_aio_context()->engine->process_write(wk, sz);
}
}

Expand All @@ -243,7 +243,7 @@ void disk_engine::write(aio_task *aio)
return;
}

auto dio = aio->aio();
auto dio = aio->get_aio_context();
auto df = (disk_file *)dio->file;
dio->file = df->native_handle();
dio->file_object = df;
Expand All @@ -259,7 +259,7 @@ void disk_engine::write(aio_task *aio)

void disk_engine::process_write(aio_task *aio, uint32_t sz)
{
disk_aio *dio = aio->aio();
aio_context *dio = aio->get_aio_context();

// no batching
if (dio->buffer_size == sz) {
Expand All @@ -278,7 +278,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz)
else {
// setup io task
auto new_task = new batch_write_io_task(aio);
auto new_dio = new_task->aio();
auto new_dio = new_task->get_aio_context();
new_dio->buffer_size = sz;
new_dio->file_offset = dio->file_offset;
new_dio->file = dio->file;
Expand All @@ -288,7 +288,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz)

auto cur_task = aio;
do {
auto cur_dio = cur_task->aio();
auto cur_dio = cur_task->get_aio_context();
if (cur_dio->buffer) {
dsn_file_buffer_t buf;
buf.buffer = cur_dio->buffer;
Expand Down Expand Up @@ -324,8 +324,8 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes, int

// no batching
else {
auto df = (disk_file *)(aio->aio()->file_object);
if (aio->aio()->type == AIO_Read) {
auto df = (disk_file *)(aio->get_aio_context()->file_object);
if (aio->get_aio_context()->type == AIO_Read) {
auto wk = df->on_read_completed(aio, err, (size_t)bytes);
if (wk) {
_provider->aio(wk);
Expand Down
3 changes: 2 additions & 1 deletion src/core/core/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class disk_engine
void read(aio_task *aio);
void write(aio_task *aio);

disk_aio *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); }
aio_context *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); }

service_node *node() const { return _node; }

private:
Expand Down
28 changes: 14 additions & 14 deletions src/core/core/file_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ namespace file {
int hash /*= 0*/)
{
auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash);
cb->aio()->buffer = buffer;
cb->aio()->buffer_size = count;
cb->aio()->file = file;
cb->aio()->file_offset = offset;
cb->aio()->type = AIO_Read;
cb->get_aio_context()->buffer = buffer;
cb->get_aio_context()->buffer_size = count;
cb->get_aio_context()->file = file;
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Read;

task::get_current_disk()->read(cb);
return cb;
Expand All @@ -70,11 +70,11 @@ namespace file {
int hash /*= 0*/)
{
auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash);
cb->aio()->buffer = (char *)buffer;
cb->aio()->buffer_size = count;
cb->aio()->file = file;
cb->aio()->file_offset = offset;
cb->aio()->type = AIO_Write;
cb->get_aio_context()->buffer = (char *)buffer;
cb->get_aio_context()->buffer_size = count;
cb->get_aio_context()->file = file;
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Write;

task::get_current_disk()->write(cb);
return cb;
Expand All @@ -90,13 +90,13 @@ namespace file {
int hash /*= 0*/)
{
auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash);
cb->aio()->file = file;
cb->aio()->file_offset = offset;
cb->aio()->type = AIO_Write;
cb->get_aio_context()->file = file;
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Write;
for (int i = 0; i < buffer_count; i++) {
if (buffers[i].size > 0) {
cb->_unmerged_write_buffers.push_back(buffers[i]);
cb->aio()->buffer_size += buffers[i].size;
cb->get_aio_context()->buffer_size += buffers[i].size;
}
}

Expand Down
22 changes: 8 additions & 14 deletions src/core/core/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,34 +601,28 @@ aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node
spec().name.c_str());
set_error_code(ERR_IO_PENDING);

auto disk = get_current_disk();
_aio = disk->prepare_aio_context(this);
disk_engine *disk = task::get_current_disk();
_aio_ctx = disk->prepare_aio_context(this);
}

void aio_task::collapse()
{
if (!_unmerged_write_buffers.empty()) {
std::shared_ptr<char> buffer(dsn::utils::make_shared_array<char>(_aio->buffer_size));
std::shared_ptr<char> buffer(dsn::utils::make_shared_array<char>(_aio_ctx->buffer_size));
char *dest = buffer.get();
for (const dsn_file_buffer_t &b : _unmerged_write_buffers) {
::memcpy(dest, b.buffer, b.size);
dest += b.size;
}
dassert(dest - buffer.get() == _aio->buffer_size,
dassert(dest - buffer.get() == _aio_ctx->buffer_size,
"%u VS %u",
dest - buffer.get(),
_aio->buffer_size);
_aio->buffer = buffer.get();
_merged_write_buffer_holder.assign(std::move(buffer), 0, _aio->buffer_size);
_aio_ctx->buffer_size);
_aio_ctx->buffer = buffer.get();
_merged_write_buffer_holder.assign(std::move(buffer), 0, _aio_ctx->buffer_size);
}
}

aio_task::~aio_task()
{
delete _aio;
_aio = nullptr;
}

void aio_task::enqueue(error_code err, size_t transferred_size)
{
set_error_code(err);
Expand All @@ -639,4 +633,4 @@ void aio_task::enqueue(error_code err, size_t transferred_size)
task::enqueue(node()->computation()->get_pool(spec().pool_code));
}

} // end namespace
} // namespace dsn
7 changes: 3 additions & 4 deletions src/core/tests/task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,18 @@ class task_test : public ::testing::Test
{
disk_file *fp = file::open("config-test.ini", O_RDONLY | O_BINARY, 0);

// this aio task is enqueued into read_queue of disk_engine
// this aio task is enqueued into read-queue of disk_engine
char buffer[128];
// in simulator environment this task will be executed immediately,
// so we excluded config-test-sim.ini for this test.
auto t = file::read(fp, buffer, 128, 0, LPC_TASK_TEST, nullptr, nullptr);

t->wait(10000);
ASSERT_TRUE(t->_wait_event.load() != nullptr);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个为什么去掉?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

会导致单测失败

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个不是本次修改引入的单测失败吧?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的 @acelyc111 但是晚修不如早修

ASSERT_EQ(t->_state, task_state::TASK_STATE_FINISHED);

// signal a finished task won't cause failure
ASSERT_TRUE(t->signal_waiters());
ASSERT_TRUE(t->signal_waiters());
t->signal_waiters(); // signal_waiters may return false
t->signal_waiters();
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/common/fault_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ static void fault_on_task_cancel_post(task *caller, task *callee, bool succ) {}
// return true means continue, otherwise early terminate with task::set_error_code
static bool fault_on_aio_call(task *caller, aio_task *callee)
{
switch (callee->aio()->type) {
switch (callee->get_aio_context()->type) {
case AIO_Read:
if (rand::next_double01() < s_fj_opts[callee->spec().code].disk_read_fail_ratio) {
ddebug("fault inject %s at %s", callee->spec().name.c_str(), __FUNCTION__);
Expand Down
4 changes: 2 additions & 2 deletions src/core/tools/common/native_aio_provider.linux.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh)
}
}

disk_aio *native_linux_aio_provider::prepare_aio_context(aio_task *tsk)
aio_context *native_linux_aio_provider::prepare_aio_context(aio_task *tsk)
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
{
return new linux_disk_aio_context(tsk);
}
Expand Down Expand Up @@ -159,7 +159,7 @@ error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk,
linux_disk_aio_context *aio;
int ret;

aio = (linux_disk_aio_context *)aio_tsk->aio();
aio = (linux_disk_aio_context *)aio_tsk->get_aio_context();

memset(&aio->cb, 0, sizeof(aio->cb));

Expand Down
6 changes: 3 additions & 3 deletions src/core/tools/common/native_aio_provider.linux.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ class native_linux_aio_provider : public aio_provider
virtual error_code close(dsn_handle_t fh) override;
virtual error_code flush(dsn_handle_t fh) override;
virtual void aio(aio_task *aio) override;
virtual disk_aio *prepare_aio_context(aio_task *tsk) override;
virtual aio_context *prepare_aio_context(aio_task *tsk) override;

virtual void start() override;

class linux_disk_aio_context : public disk_aio
class linux_disk_aio_context : public aio_context
{
public:
struct iocb cb;
Expand All @@ -64,7 +64,7 @@ class native_linux_aio_provider : public aio_provider
uint32_t bytes;

explicit linux_disk_aio_context(aio_task *tsk_)
: disk_aio(), tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0)
: tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0)
{
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/core/tools/common/tracer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ static void tracer_on_aio_call(task *caller, aio_task *callee)
ddebug("%s AIO.CALL, task_id = %016" PRIx64 ", offset = %" PRIu64 ", size = %d",
callee->spec().name.c_str(),
callee->id(),
callee->aio()->file_offset,
callee->aio()->buffer_size);
callee->get_aio_context()->file_offset,
callee->get_aio_context()->buffer_size);
}

static void tracer_on_aio_enqueue(aio_task *this_)
Expand Down
7 changes: 5 additions & 2 deletions src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1816,11 +1816,12 @@ class log_file::file_streamer
// possible error_code:
// ERR_OK result would always size as expected
// ERR_HANDLE_EOF if there are not enough data in file. result would still be
// filled with possible data
// filled with possible data
// ERR_FILE_OPERATION_FAILED filesystem failure
error_code read_next(size_t size, /*out*/ blob &result)
{
binary_writer writer(size);

#define TRY(x) \
do { \
auto _x = (x); \
Expand All @@ -1829,6 +1830,7 @@ class log_file::file_streamer
return _x; \
} \
} while (0)

TRY(_current_buffer->wait_ongoing_task());
if (size < _current_buffer->length()) {
result.assign(_current_buffer->_buffer.get(), _current_buffer->_begin, size);
Expand Down Expand Up @@ -1884,7 +1886,8 @@ class log_file::file_streamer
}

// buffer size, in bytes
static const size_t block_size_bytes = 1024 * 1024;
// TODO(wutao1): call it BLOCK_BYTES_SIZE
static constexpr size_t block_size_bytes = 1024 * 1024; // 1MB
struct buffer_t
{
std::unique_ptr<char[]> _buffer; // with block_size
Expand Down
Loading