Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
fix: crash when huge write (#1099)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhongChaoqiang authored Apr 28, 2022
1 parent 47fd85c commit 3f0c632
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 29 deletions.
2 changes: 1 addition & 1 deletion include/dsn/tool-api/aio_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class aio_context : public ref_counter
// filled by apps
dsn_handle_t file;
void *buffer;
uint32_t buffer_size;
uint64_t buffer_size;
uint64_t file_offset;

// filled by frameworks
Expand Down
2 changes: 1 addition & 1 deletion src/aio/aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace dsn {

aio_provider::aio_provider(disk_engine *disk) : _engine(disk) {}

void aio_provider::complete_io(aio_task *aio, error_code err, uint32_t bytes)
void aio_provider::complete_io(aio_task *aio, error_code err, uint64_t bytes)
{
_engine->complete_io(aio, err, bytes);
}
Expand Down
6 changes: 3 additions & 3 deletions src/aio/aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class aio_provider

virtual error_code close(dsn_handle_t fh) = 0;
virtual error_code flush(dsn_handle_t fh) = 0;
virtual error_code write(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) = 0;
virtual error_code read(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) = 0;
virtual error_code write(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0;
virtual error_code read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0;

// Submits the aio_task to the underlying disk-io executor.
// This task may not be executed immediately, call `aio_task::wait`
Expand All @@ -69,7 +69,7 @@ class aio_provider

virtual aio_context *prepare_aio_context(aio_task *) = 0;

void complete_io(aio_task *aio, error_code err, uint32_t bytes);
void complete_io(aio_task *aio, error_code err, uint64_t bytes);

private:
disk_engine *_engine;
Expand Down
18 changes: 7 additions & 11 deletions src/aio/disk_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static disk_engine_initializer disk_engine_init;
aio_task *disk_write_queue::unlink_next_workload(void *plength)
{
uint64_t next_offset = 0;
uint32_t &sz = *(uint32_t *)plength;
uint64_t &sz = *(uint64_t *)plength;
sz = 0;

aio_task *first = _hdr._first, *current = first, *last = first;
Expand Down Expand Up @@ -125,11 +125,7 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err,

if (err == ERR_OK) {
size_t this_size = (size_t)wk->get_aio_context()->buffer_size;
dassert(size >= this_size,
"written buffer size does not equal to input buffer's size: %d vs %d",
(int)size,
(int)this_size);

dcheck_ge(size, this_size);
wk->enqueue(err, this_size);
size -= this_size;
} else {
Expand Down Expand Up @@ -167,7 +163,7 @@ class batch_write_io_task : public aio_task
virtual void exec() override
{
auto df = (disk_file *)_tasks->get_aio_context()->file_object;
uint32_t sz;
uint64_t sz;

auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), get_transferred_size());
if (wk) {
Expand All @@ -193,14 +189,14 @@ void disk_engine::write(aio_task *aio)
dio->engine = this;
dio->type = AIO_Write;

uint32_t sz;
uint64_t sz;
auto wk = df->write(aio, &sz);
if (wk) {
process_write(wk, sz);
}
}

void disk_engine::process_write(aio_task *aio, uint32_t sz)
void disk_engine::process_write(aio_task *aio, uint64_t sz)
{
aio_context *dio = aio->get_aio_context();

Expand Down Expand Up @@ -243,7 +239,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz)
}
}

void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes)
void disk_engine::complete_io(aio_task *aio, error_code err, uint64_t bytes)
{
if (err != ERR_OK) {
dinfo("disk operation failure with code %s, err = %s, aio_task_id = %016" PRIx64,
Expand All @@ -270,7 +266,7 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes)

// write
else {
uint32_t sz;
uint64_t sz;
auto wk = df->on_write_completed(aio, (void *)&sz, err, (size_t)bytes);
if (wk) {
process_write(wk, sz);
Expand Down
4 changes: 2 additions & 2 deletions src/aio/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ class disk_engine : public utils::singleton<disk_engine>
disk_engine();
~disk_engine() = default;

void process_write(aio_task *wk, uint32_t sz);
void complete_io(aio_task *aio, error_code err, uint32_t bytes);
void process_write(aio_task *wk, uint64_t sz);
void complete_io(aio_task *aio, error_code err, uint64_t bytes);

std::unique_ptr<aio_provider> _provider;

Expand Down
18 changes: 9 additions & 9 deletions src/aio/native_linux_aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,16 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh)
}

error_code native_linux_aio_provider::write(const aio_context &aio_ctx,
/*out*/ uint32_t *processed_bytes)
/*out*/ uint64_t *processed_bytes)
{
dsn::error_code resp = ERR_OK;
uint32_t buffer_offset = 0;
uint64_t buffer_offset = 0;
do {
// ret is the written data size
uint32_t ret = pwrite(static_cast<int>((ssize_t)aio_ctx.file),
(char *)aio_ctx.buffer + buffer_offset,
aio_ctx.buffer_size - buffer_offset,
aio_ctx.file_offset + buffer_offset);
auto ret = pwrite(static_cast<int>((ssize_t)aio_ctx.file),
(char *)aio_ctx.buffer + buffer_offset,
aio_ctx.buffer_size - buffer_offset,
aio_ctx.file_offset + buffer_offset);
if (dsn_unlikely(ret < 0)) {
if (errno == EINTR) {
dwarn_f("write failed with errno={} and will retry it.", strerror(errno));
Expand Down Expand Up @@ -114,7 +114,7 @@ error_code native_linux_aio_provider::write(const aio_context &aio_ctx,
}

error_code native_linux_aio_provider::read(const aio_context &aio_ctx,
/*out*/ uint32_t *processed_bytes)
/*out*/ uint64_t *processed_bytes)
{
ssize_t ret = pread(static_cast<int>((ssize_t)aio_ctx.file),
aio_ctx.buffer,
Expand All @@ -126,7 +126,7 @@ error_code native_linux_aio_provider::read(const aio_context &aio_ctx,
if (ret == 0) {
return ERR_HANDLE_EOF;
}
*processed_bytes = static_cast<uint32_t>(ret);
*processed_bytes = static_cast<uint64_t>(ret);
return ERR_OK;
}

Expand All @@ -148,7 +148,7 @@ error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk)
ADD_POINT(aio_tsk->_tracer);
aio_context *aio_ctx = aio_tsk->get_aio_context();
error_code err = ERR_UNKNOWN;
uint32_t processed_bytes = 0;
uint64_t processed_bytes = 0;
switch (aio_ctx->type) {
case AIO_Read:
err = read(*aio_ctx, &processed_bytes);
Expand Down
4 changes: 2 additions & 2 deletions src/aio/native_linux_aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class native_linux_aio_provider : public aio_provider
dsn_handle_t open(const char *file_name, int flag, int pmode) override;
error_code close(dsn_handle_t fh) override;
error_code flush(dsn_handle_t fh) override;
error_code write(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) override;
error_code read(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) override;
error_code write(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) override;
error_code read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) override;

void submit_aio_task(aio_task *aio) override;
aio_context *prepare_aio_context(aio_task *tsk) override { return new aio_context; }
Expand Down

0 comments on commit 3f0c632

Please sign in to comment.