Skip to content

Commit

Permalink
Merge branch 'master' of github.com:ceph-dovecot/dovecot-ceph-plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jrse committed Apr 3, 2022
2 parents f3674a3 + 559b5cc commit f4af18c
Show file tree
Hide file tree
Showing 15 changed files with 297 additions and 67 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
# Change Log
## [0.0.31](https://github.com/ceph-dovecot/dovecot-ceph-plugin/tree/0.0.30) (2022-03-14)
- #304: force-resync: preserve mail flags
- #306: force-resync: restore all mail objects to inbox in case they have no reference to existing mailboxes
- #310: save-mail: check ceph osd_max_object size option, if mail size is bigger abort save.

## [0.0.30](https://github.com/ceph-dovecot/dovecot-ceph-plugin/tree/0.0.30) (2022-03-14)

- bugfix: retry ceph operation in case of connection timeout

## [0.0.29](https://github.com/ceph-dovecot/dovecot-ceph-plugin/tree/0.0.29) (2022-03-10)

- bugfix: force-resync
Expand Down
3 changes: 2 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

AC_PREREQ([2.59])

AC_INIT([dovecot-ceph-plugin], [0.0.29], [https://github.com/ceph-dovecot/dovecot-ceph-plugin/issues/new], ,[https://github.com/ceph-dovecot/dovecot-ceph-plugin])
AC_INIT([dovecot-ceph-plugin], [0.0.31], [https://github.com/ceph-dovecot/dovecot-ceph-plugin/issues/new], ,[https://github.com/ceph-dovecot/dovecot-ceph-plugin])


AC_CONFIG_AUX_DIR([.])
AC_CONFIG_SRCDIR([src])
Expand Down
2 changes: 1 addition & 1 deletion rpm/dovecot-ceph-plugin.spec
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
Name: dovecot-ceph-plugin
Summary: Dovecot Ceph RADOS plugins

Version: 0.0.29
Version: 0.0.31

Release: 0%{?dist}
URL: https://github.com/ceph-dovecot/dovecot-ceph-plugin
Expand Down
19 changes: 16 additions & 3 deletions src/librmb/rados-storage-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ using librmb::RadosStorageImpl;

#define DICT_USERNAME_SEPARATOR '/'
const char *RadosStorageImpl::CFG_OSD_MAX_WRITE_SIZE = "osd_max_write_size";
const char *RadosStorageImpl::CFG_OSD_MAX_OBJECT_SIZE= "osd_max_object_size";

RadosStorageImpl::RadosStorageImpl(RadosCluster *_cluster) {
cluster = _cluster;
max_write_size = 10;
max_object_size = 134217728; //ceph default 128MB
io_ctx_created = false;
wait_method = WAIT_FOR_COMPLETE_AND_CB;
}
Expand All @@ -46,7 +48,7 @@ int RadosStorageImpl::split_buffer_and_exec_op(RadosMail *current_object,
if (!cluster->is_connected() || !io_ctx_created) {
return -1;
}
std::cout << "splitting buffer and exec op " << std::endl;

current_object->set_completion(librados::Rados::aio_create_completion());

/* librados::ObjectWriteOperation *op =
Expand All @@ -58,10 +60,11 @@ int RadosStorageImpl::split_buffer_and_exec_op(RadosMail *current_object,
ceph::bufferlist tmp_buffer;
assert(max_write > 0);

if (write_buffer_size <= 0 || max_write <= 0) {
if (write_buffer_size <= 0 ||
max_write <= 0) {
ret_val = -1;
return ret_val;
}

uint64_t rest = write_buffer_size % max_write;
int div = write_buffer_size / max_write + (rest > 0 ? 1 : 0);
for (int i = 0; i < div; ++i) {
Expand Down Expand Up @@ -197,9 +200,18 @@ int RadosStorageImpl::create_connection(const std::string &poolname) {
return err;
}
max_write_size = std::stoi(max_write_size_str);

string max_object_size_str;
err = cluster->get_config_option(RadosStorageImpl::CFG_OSD_MAX_OBJECT_SIZE, &max_object_size_str);
if (err < 0) {
return err;
}
max_object_size = std::stoi(max_object_size_str);

if (err == 0) {
io_ctx_created = true;
}

// set the poolname
pool_name = poolname;
return 0;
Expand Down Expand Up @@ -384,6 +396,7 @@ bool RadosStorageImpl::save_mail(librados::ObjectWriteOperation *write_op_xattr,
}
time_t save_date = mail->get_rados_save_date();
write_op_xattr->mtime(&save_date);

int ret = split_buffer_and_exec_op(mail, write_op_xattr, get_max_write_size_bytes());
if (ret != 0) {
write_op_xattr->remove();
Expand Down
5 changes: 5 additions & 0 deletions src/librmb/rados-storage-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class RadosStorageImpl : public RadosStorage {
void set_ceph_wait_method(enum rbox_ceph_aio_wait_method wait_method_) { this->wait_method = wait_method_; }
int get_max_write_size() override { return max_write_size; }
int get_max_write_size_bytes() override { return max_write_size * 1024 * 1024; }
int get_max_object_size() override {return max_object_size;}

int split_buffer_and_exec_op(RadosMail *current_object, librados::ObjectWriteOperation *write_op_xattr,
const uint64_t &max_write) override;
Expand Down Expand Up @@ -77,14 +78,18 @@ class RadosStorageImpl : public RadosStorage {
private:
RadosCluster *cluster;
int max_write_size;
int max_object_size;
std::string nspace;
librados::IoCtx io_ctx;
bool io_ctx_created;
std::string pool_name;
enum rbox_ceph_aio_wait_method wait_method;

static const char *CFG_OSD_MAX_WRITE_SIZE;
static const char *CFG_OSD_MAX_OBJECT_SIZE;
};



} // namespace librmb

Expand Down
8 changes: 6 additions & 2 deletions src/librmb/rados-storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ class RadosStorage {
/* set the wait method for async operations */
virtual void set_ceph_wait_method(enum rbox_ceph_aio_wait_method wait_method) = 0;

/*! get the max object size in mb
/*! get the max operation size in mb
* @return the maximal number of mb to write in a single write operation*/
virtual int get_max_write_size() = 0;
/*! get the max object size in bytes
/*! get the max operation size in bytes
* @return max number of bytes to write in a single write operation*/
virtual int get_max_write_size_bytes() = 0;

/*! get the max ceph object size
*/
virtual int get_max_object_size() = 0;

/*! In case the current object size exceeds the max_write (bytes), object should be split into
* max smaller operations and executed separately.
*
Expand Down
1 change: 0 additions & 1 deletion src/librmb/tools/rmb/mailbox_tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ int MailboxTools::save_mail(librmb::RadosMail* mail_obj) {
}

std::string file_path = mailbox_path + "/" + filename;
std::cout << " writing mail to " << file_path << std::endl;
std::ofstream myfile(file_path, std::ofstream::binary | std::ofstream::out);
if (!myfile.is_open()) {
return -1;
Expand Down
4 changes: 2 additions & 2 deletions src/storage-rbox/rbox-copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ static int copy_mail(struct mail_save_context *ctx, librmb::RadosStorage *rados_
int ret_val = rados_storage->copy(src_oid, ns_src->c_str(), dest_oid, ns_dest->c_str(), metadata_update);
if (ret_val < 0) {
if (ret_val == -ENOENT) {
i_warning(
i_debug(
"copy mail failed from namespace: %s to namespace %s: src_oid: %s, des_oid: %s, error_code: %d, "
"storage_pool: %s , most likely concurrency issue => marking mail as expunged",
ns_src->c_str(), ns_dest->c_str(), src_oid.c_str(), dest_oid.c_str(), ret_val,
Expand Down Expand Up @@ -241,7 +241,7 @@ static int move_mail(struct mail_save_context *ctx, librmb::RadosStorage *rados_
rados_storage->move(src_oid, ns_src->c_str(), dest_oid, ns_dest->c_str(), metadata_update, delete_source);
if (ret_val < 0) {
if (ret_val == -ENOENT) {
i_warning(
i_debug(
"move mail failed: from namespace: %s to namespace %s: src_oid: %s, des_oid: %s, error_code : %d, "
"pool_name: %s. most likely due to concurency issues => marking mail as expunged",
ns_src->c_str(), ns_dest->c_str(), src_oid.c_str(), dest_oid.c_str(), ret_val,
Expand Down
91 changes: 72 additions & 19 deletions src/storage-rbox/rbox-mail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,20 @@ static int rbox_mail_metadata_get(struct rbox_mail *rmail, enum rbox_metadata_ke
// metadata_key.c_str(), key, rmail->rados_mail != NULL ? rmail->rados_mail->to_string(" ").c_str() : " no rados_mail");
rbox_mail_set_expunged(rmail);
} else if(ret_load_metadata == -ETIMEDOUT) {
i_warning("READ TIMEOUT %d reading mail object %s ", ret_load_metadata,rmail->rados_mail != NULL ? rmail->rados_mail->to_string(" ").c_str() : " no rados_mail");
FUNC_END();
return -1;
int max_retry = 10;
for(int i=0;i<max_retry;i++){
ret_load_metadata = r_storage->ms->get_storage()->load_metadata(rmail->rados_mail);
if(ret_load_metadata>=0){
i_error("READ TIMEOUT %d reading mail object %s ", ret_load_metadata,rmail->rados_mail != NULL ? rmail->rados_mail->to_string(" ").c_str() : " no rados_mail");
break;
}
i_warning("READ TIMEOUT retry(%d) %d reading mail object %s ",i, ret_load_metadata,rmail->rados_mail != NULL ? rmail->rados_mail->to_string(" ").c_str() : " no rados_mail");
}
if(ret_load_metadata<0){
FUNC_END();
return -1;
}

}
else {
i_error("Errorcode: process %d returned with %d cannot get x_attr(%s,%c) from rados_object: %s",getpid(), ret_load_metadata,
Expand Down Expand Up @@ -441,6 +452,30 @@ static int get_mail_stream(struct rbox_mail *mail, librados::bufferlist *buffer,
return ret;
}

static int read_mail_from_storage(librmb::RadosStorage *rados_storage,
struct rbox_mail *rmail,
uint64_t *psize,
time_t *save_date) {

int stat_err = 0;
int read_err = 0;

/* duplicate code: get_attribute */
librados::ObjectReadOperation *read_mail = new librados::ObjectReadOperation();
read_mail->read(0, INT_MAX, rmail->rados_mail->get_mail_buffer(), &read_err);
read_mail->stat(psize, save_date, &stat_err);

librados::AioCompletion *completion = librados::Rados::aio_create_completion();
int ret = rados_storage->get_io_ctx().aio_operate(*rmail->rados_mail->get_oid(), completion, read_mail,
rmail->rados_mail->get_mail_buffer());
completion->wait_for_complete_and_cb();
ret = completion->get_return_value();
completion->release();
delete read_mail;

return ret;
}

static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, struct message_size *hdr_size,
struct message_size *body_size, struct istream **stream_r) {
FUNC_START();
Expand Down Expand Up @@ -480,21 +515,21 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s
uint64_t psize;
time_t save_date;

int stat_err = 0;
int read_err = 0;

librados::ObjectReadOperation *read_mail = new librados::ObjectReadOperation();
read_mail->read(0, INT_MAX, rmail->rados_mail->get_mail_buffer(), &read_err);
read_mail->stat(&psize, &save_date, &stat_err);

librados::AioCompletion *completion = librados::Rados::aio_create_completion();
ret = rados_storage->get_io_ctx().aio_operate(*rmail->rados_mail->get_oid(), completion, read_mail,
rmail->rados_mail->get_mail_buffer());
completion->wait_for_complete_and_cb();
ret = completion->get_return_value();
completion->release();
delete read_mail;

ret = read_mail_from_storage(rados_storage, rmail,&psize,&save_date);

/* duplicate code: get_attribute
librados::ObjectReadOperation *read_mail = new librados::ObjectReadOperation();
read_mail->read(0, INT_MAX, rmail->rados_mail->get_mail_buffer(), &read_err);
read_mail->stat(&psize, &save_date, &stat_err);
librados::AioCompletion *completion = librados::Rados::aio_create_completion();
ret = rados_storage->get_io_ctx().aio_operate(*rmail->rados_mail->get_oid(), completion, read_mail,
rmail->rados_mail->get_mail_buffer());
completion->wait_for_complete_and_cb();
ret = completion->get_return_value();
completion->release();
delete read_mail;
*/
if (ret < 0) {
if (ret == -ENOENT) {
// This can happen, if we have more then 2 processes running at the same time.
Expand All @@ -507,7 +542,25 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s
FUNC_END_RET("ret == -1");
delete rmail->rados_mail->get_mail_buffer();
return -1;
} else {
}
else if(ret == -ETIMEDOUT) {
int max_retry = 10;
for(int i=0;i<max_retry;i++){
ret = read_mail_from_storage(rados_storage, rmail,&psize,&save_date);
if(ret >= 0){
i_error("READ TIMEOUT %d reading mail object %s ", ret,rmail->rados_mail != NULL ? rmail->rados_mail->to_string(" ").c_str() : " no rados_mail");
break;
}
i_warning("READ TIMEOUT retry(%d) %d reading mail object %s ",i, ret,rmail->rados_mail != NULL ? rmail->rados_mail->to_string(" ").c_str() : " no rados_mail");
}

if(ret <0){
delete rmail->rados_mail->get_mail_buffer();
FUNC_END();
return -1;
}
}
else {
i_error("reading mail return code(%d), oid(%s),namespace(%s), alt_storage(%d)", ret,
rmail->rados_mail->get_oid()->c_str(), rados_storage->get_namespace().c_str(), alt_storage);
FUNC_END_RET("ret == -1");
Expand Down
4 changes: 4 additions & 0 deletions src/storage-rbox/rbox-mail.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,8 @@ extern int rbox_mail_get_virtual_size(struct mail *_mail, uoff_t *size_r);

extern int rbox_get_guid_metadata(struct rbox_mail *mail, const char **value_r);

extern int read_mail_from_storage(librmb::RadosStorage *rados_storage,
struct rbox_mail *rmail,
uint64_t *psize,
time_t *save_date);
#endif // SRC_STORAGE_RBOX_RBOX_MAIL_H_
26 changes: 16 additions & 10 deletions src/storage-rbox/rbox-save.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,19 +413,16 @@ static void clean_up_failed(struct rbox_save_context *r_ctx, bool wait_for_opera
}
}
// try to clean up!
int delete_ret = 0;
for (std::list<RadosMail *>::iterator it_cur_obj = r_ctx->rados_mails.begin(); it_cur_obj != r_ctx->rados_mails.end();
++it_cur_obj) {
delete_ret = r_storage->s->delete_mail(*it_cur_obj);
int delete_ret = r_storage->s->delete_mail(*it_cur_obj);
if (delete_ret < 0 && delete_ret != -ENOENT) {
i_error("Librados obj: %s, could not be removed", (*it_cur_obj)->get_oid()->c_str());
}
}
// clean up index
if (r_ctx->seq > 0) {
mail_index_expunge(r_ctx->trans, r_ctx->seq);
} else {
i_warning("clean_up_failed, index entry for seq %d, not removed r_ctx->seq <= 0", r_ctx->seq);
}

if (r_ctx->ctx.transaction != NULL) {
Expand Down Expand Up @@ -518,7 +515,6 @@ int rbox_save_finish(struct mail_save_context *_ctx) {
r_ctx->failed = true;
i_error("ERROR, mailsize is <= 0 ");
} else {
bool async_write = true;

if (!zlib_plugin_active) {
// write \0 to ceph (length()+1) if stream is not binary
Expand All @@ -536,11 +532,21 @@ int rbox_save_finish(struct mail_save_context *_ctx) {

r_storage->ms->get_storage()->save_metadata(&write_op, r_ctx->rados_mail);

if (!r_storage->config->is_write_chunks()) {
r_ctx->failed = !r_storage->s->save_mail(&write_op, r_ctx->rados_mail, async_write);
} else {
r_ctx->failed = r_storage->s->aio_operate(&r_storage->s->get_io_ctx(), *r_ctx->rados_mail->get_oid(),
r_ctx->rados_mail->get_completion(), &write_op) < 0;
int max_object_size = r_storage->s->get_max_object_size();
i_debug("max_object_size %d mail_size %d",max_object_size, r_ctx->rados_mail->get_mail_size() );
if(max_object_size < r_ctx->rados_mail->get_mail_size()) {
i_error("configured CEPH Object size %d < then mail size %d ", r_storage->s->get_max_object_size(), r_ctx->rados_mail->get_mail_size() );
r_ctx->failed = true;
}else {
if (!r_storage->config->is_write_chunks()) {
bool async_write = true;
i_info("write chunks enabled %d ", r_storage->s->get_max_write_size_bytes() );
r_ctx->failed = !r_storage->s->save_mail(&write_op, r_ctx->rados_mail, async_write);
i_info("write failed? %d",r_ctx->failed);
} else {
r_ctx->failed = r_storage->s->aio_operate(&r_storage->s->get_io_ctx(), *r_ctx->rados_mail->get_oid(),
r_ctx->rados_mail->get_completion(), &write_op) < 0;
}
}
if (r_ctx->failed) {
i_error("saved mail: %s failed metadata_count %ld, mail_size (%d)", r_ctx->rados_mail->get_oid()->c_str(),
Expand Down
Loading

0 comments on commit f4af18c

Please sign in to comment.