Skip to content

Commit

Permalink
fix(rebuild_issue): scanner thread exit issue (#223) (#224)
Browse files Browse the repository at this point in the history
Signed-off-by: Vitta <vitta@mayadata.io>
  • Loading branch information
vishnuitta authored May 3, 2019
1 parent 955f8bb commit ff7e226
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 45 deletions.
26 changes: 18 additions & 8 deletions include/zrepl_mgmt.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ typedef struct inject_delay_s {
int io_receiver_exit;
int helping_replica_rebuild_complete;
int rebuild_complete;
int helping_replica_ack_sender;
} inject_delay_t;

typedef struct inject_rebuild_error_s {
Expand Down Expand Up @@ -144,8 +145,6 @@ typedef struct zvol_info_s {
uint64_t degraded_checkpointed_ionum;

time_t checkpointed_time; /* time of the last chkpoint */
uint64_t rebuild_cmd_queued_cnt;
uint64_t rebuild_cmd_acked_cnt;
/*
* time of the last stored checkpointed io sequence number
* when ZVOL was in degraded state
Expand All @@ -169,6 +168,9 @@ typedef struct zvol_info_s {
/* fds related to this zinfo on which threads are waiting */
STAILQ_HEAD(, zinfo_fd_s) fd_list;

/* rebuild scanner info related to this zinfo */
STAILQ_HEAD(, zvol_rebuild_scanner_info_s) rebuild_scanner_list;

uint8_t io_ack_waiting;

/* Will be used to signal ack-sender to exit */
Expand Down Expand Up @@ -202,14 +204,27 @@ typedef struct zvol_info_s {
uint64_t inflight_io_cnt; // ongoing IOs count
uint64_t dispatched_io_cnt; // total received but incomplete IOs


// histogram of IOs
zfs_histogram_t uzfs_rio_histogram[ZFS_HISTOGRAM_IO_SIZE /
ZFS_HISTOGRAM_IO_BLOCK + 1];
zfs_histogram_t uzfs_wio_histogram[ZFS_HISTOGRAM_IO_SIZE /
ZFS_HISTOGRAM_IO_BLOCK + 1];
} zvol_info_t;

/*
 * Bookkeeping for one rebuild scanner connection.
 * Entries are linked into zvol_info_t's rebuild_scanner_list and are
 * looked up there by their fd.
 */
typedef struct zvol_rebuild_scanner_info_s {
	/* linkage in zinfo->rebuild_scanner_list */
	STAILQ_ENTRY(zvol_rebuild_scanner_info_s) link;
	/* zvol this scanner belongs to */
	zvol_info_t *zinfo;
	/* rebuild cmds queued by the scanner callback */
	uint64_t rebuild_cmd_queued_cnt;
	/* rebuild cmds picked up by the ack sender */
	uint64_t rebuild_cmd_acked_cnt;
	union {
		struct {
			/*
			 * Set when a write on this scanner's fd failed.
			 * Must be unsigned: a signed 1-bit field can only
			 * hold 0 and -1, so storing 1 in it is
			 * implementation-defined.
			 */
			unsigned int is_fd_errored: 1;
		};
		/* all flag bits viewed as one word */
		uint32_t flags;
	};
	/* socket of the rebuild connection; key used for list lookups */
	int fd;
} zvol_rebuild_scanner_info_t;

typedef struct thread_args_s {
char zvol_name[MAXNAMELEN];
zvol_info_t *zinfo;
Expand All @@ -235,11 +250,6 @@ typedef struct zvol_io_cmd_s {
int conn;
} zvol_io_cmd_t;

typedef struct zvol_rebuild_s {
zvol_info_t *zinfo;
int fd;
} zvol_rebuild_t;

extern int uzfs_zinfo_init(zvol_state_t *zv, const char *ds_name,
nvlist_t *create_props);
extern zvol_info_t *uzfs_zinfo_lookup(const char *name);
Expand Down
1 change: 1 addition & 0 deletions lib/libzpool/zrepl_mgmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ uzfs_zinfo_init(zvol_state_t *zv, const char *ds_name, nvlist_t *create_props)

STAILQ_INIT(&zinfo->complete_queue);
STAILQ_INIT(&zinfo->fd_list);
STAILQ_INIT(&zinfo->rebuild_scanner_list);
uzfs_zinfo_init_mutex(zinfo);

strlcpy(zinfo->name, ds_name, MAXNAMELEN);
Expand Down
183 changes: 160 additions & 23 deletions lib/libzrepl/data_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ uint64_t zvol_rebuild_step_size = ZVOL_REBUILD_STEP_SIZE;
#define REBUILD_CMD_QUEUE_MAX_LIMIT (100)
uint64_t zvol_rebuild_cmd_queue_limit = REBUILD_CMD_QUEUE_MAX_LIMIT;

/*
 * Evaluates to true when the number of rebuild commands queued but not yet
 * acked for the given scanner exceeds zvol_rebuild_cmd_queue_limit.
 *
 * scanner_info: pointer expression to a structure carrying the
 * rebuild_cmd_queued_cnt and rebuild_cmd_acked_cnt counters. The argument is
 * parenthesized so that any pointer expression (e.g. `&obj`) expands
 * correctly; note that it is evaluated twice, so it must be free of
 * side effects.
 */
#define	IS_REBUILD_HIT_MAX_CMD_LIMIT(scanner_info)		\
	(((scanner_info)->rebuild_cmd_queued_cnt -		\
	(scanner_info)->rebuild_cmd_acked_cnt) >		\
	zvol_rebuild_cmd_queue_limit)

uint16_t io_server_port = IO_SERVER_PORT;
Expand Down Expand Up @@ -444,6 +444,95 @@ uzfs_zvol_worker(void *arg)
uzfs_zinfo_drop_refcnt(zinfo);
}

/*
 * Bump the acked-command counter of the rebuild scanner registered on
 * `fd`. Nothing happens when no scanner in zinfo's list uses that fd.
 * The list is walked without taking any lock here, so the caller has to
 * hold zinfo_mutex already.
 */
static void zvol_rebuild_scanner_inc_ack_cnt(zvol_info_t *zinfo, int fd)
{
	zvol_rebuild_scanner_info_t *scanner;

	STAILQ_FOREACH(scanner, &zinfo->rebuild_scanner_list, link) {
		if (scanner->fd != fd)
			continue;
		/* only the first matching entry is updated */
		scanner->rebuild_cmd_acked_cnt++;
		return;
	}
}

/*
 * Mark the rebuild scanner registered on `fd` as errored by setting its
 * is_fd_errored flag. Nothing happens when no scanner in zinfo's list
 * uses that fd. zinfo_mutex is taken and released inside this function.
 */
static void zvol_rebuild_scanner_set_errored_fd(zvol_info_t *zinfo, int fd)
{
	zvol_rebuild_scanner_info_t *scanner;

	(void) pthread_mutex_lock(&zinfo->zinfo_mutex);
	STAILQ_FOREACH(scanner, &zinfo->rebuild_scanner_list, link) {
		if (scanner->fd == fd) {
			/* only the first matching entry is flagged */
			scanner->is_fd_errored = 1;
			break;
		}
	}
	(void) pthread_mutex_unlock(&zinfo->zinfo_mutex);
}

/*
 * Register `scanner_info` at the tail of zinfo's rebuild scanner list.
 * The insertion is done under zinfo_mutex. In DEBUG builds the list is
 * first checked for an entry that already uses the same fd, which trips
 * an assertion.
 */
static void
uzfs_zvol_append_to_rebuild_scanner(zvol_info_t *zinfo,
    zvol_rebuild_scanner_info_t *scanner_info)
{
	(void) pthread_mutex_lock(&zinfo->zinfo_mutex);
#ifdef DEBUG
	zvol_rebuild_scanner_info_t *cur;
	for (cur = STAILQ_FIRST(&zinfo->rebuild_scanner_list); cur != NULL;
	    cur = STAILQ_NEXT(cur, link)) {
		/* the same fd must never be registered twice */
		if (cur->fd == scanner_info->fd) {
			ASSERT(1 == 0);
		}
	}
#endif
	STAILQ_INSERT_TAIL(&zinfo->rebuild_scanner_list, scanner_info, link);
	(void) pthread_mutex_unlock(&zinfo->zinfo_mutex);
}

/*
 * Unlink the scanner entry registered on `fd` from zinfo's rebuild
 * scanner list. The work is done under zinfo_mutex. The entry is only
 * unlinked here, not freed. In DEBUG builds it is asserted that exactly
 * one entry uses the given fd.
 */
static void
uzfs_zvol_remove_from_rebuild_scanner(zvol_info_t *zinfo, int fd)
{
	zvol_rebuild_scanner_info_t *scanner;

	(void) pthread_mutex_lock(&zinfo->zinfo_mutex);
#ifdef DEBUG
	int count = 0;
	for (scanner = STAILQ_FIRST(&zinfo->rebuild_scanner_list);
	    scanner != NULL; scanner = STAILQ_NEXT(scanner, link)) {
		if (scanner->fd == fd)
			count++;
	}
	ASSERT(count == 1);
#endif
	STAILQ_FOREACH(scanner, &zinfo->rebuild_scanner_list, link) {
		if (scanner->fd != fd)
			continue;
		/* leave the loop right after unlinking the entry */
		STAILQ_REMOVE(&zinfo->rebuild_scanner_list, scanner,
		    zvol_rebuild_scanner_info_s, link);
		break;
	}
	(void) pthread_mutex_unlock(&zinfo->zinfo_mutex);
}

/*
* This fn appends zinfo related fd to zinfo fd list
*/
static void
uzfs_zvol_append_to_fd_list(zvol_info_t *zinfo, int fd)
{
Expand All @@ -464,6 +553,9 @@ uzfs_zvol_append_to_fd_list(zvol_info_t *zinfo, int fd)
(void) pthread_mutex_unlock(&zinfo->zinfo_mutex);
}

/*
* This fn removes given fd from zinfo fd list
*/
static void
uzfs_zvol_remove_from_fd_list(zvol_info_t *zinfo, int fd)
{
Expand Down Expand Up @@ -1293,10 +1385,11 @@ uzfs_zvol_rebuild_scanner_callback(off_t offset, size_t len,
{
zvol_io_hdr_t hdr;
zvol_io_cmd_t *zio_cmd;
zvol_rebuild_t *warg;
zvol_info_t *zinfo;
zvol_rebuild_scanner_info_t *warg;
int count_to_print = 0;

warg = (zvol_rebuild_t *)args;
warg = (zvol_rebuild_scanner_info_t *)args;
zinfo = warg->zinfo;

hdr.version = REPLICA_VERSION;
Expand All @@ -1307,18 +1400,33 @@ uzfs_zvol_rebuild_scanner_callback(off_t offset, size_t len,
hdr.flags = ZVOL_OP_FLAG_REBUILD;
hdr.status = ZVOL_OP_STATUS_OK;

LOG_DEBUG("IO number for rebuild %ld %ld %s", metadata->io_num, offset,
zinfo->name);
while (1) {
if ((zinfo->state == ZVOL_INFO_STATE_OFFLINE) ||
(!zinfo->is_io_ack_sender_created))
(!zinfo->is_io_ack_sender_created)) {
LOG_ERR("[%s:%d] in err state", zinfo->name, warg->fd);
return (-1);
}
if (warg->is_fd_errored) {
LOG_ERR("[%s:%d] errored at ack_sender", zinfo->name,
warg->fd);
return (-1);
if (IS_REBUILD_HIT_MAX_CMD_LIMIT(zinfo))
usleep(100);
}
if (IS_REBUILD_HIT_MAX_CMD_LIMIT(warg)) {
count_to_print++;
if (count_to_print >= 1000) {
LOG_ERR("[%s:%d] waiting for ack to send",
zinfo->name, warg->fd);
count_to_print = 0;
}
sleep(1);
}
else
break;
}

zinfo->rebuild_cmd_queued_cnt++;
LOG_DEBUG("IO number for rebuild %ld", metadata->io_num);
warg->rebuild_cmd_queued_cnt++;
zio_cmd = zio_cmd_alloc(&hdr, warg->fd);
/* Take refcount for uzfs_zvol_worker to work on it */
uzfs_zinfo_take_refcnt(zinfo);
Expand Down Expand Up @@ -1401,7 +1509,6 @@ uzfs_zvol_rebuild_scanner(void *arg)
zvol_state_t *snap_zv = NULL;
zvol_io_hdr_t hdr;
int rc = 0;
zvol_rebuild_t warg;
char *name;
blk_metadata_t metadata;
uint64_t rebuild_req_offset;
Expand All @@ -1412,6 +1519,7 @@ uzfs_zvol_rebuild_scanner(void *arg)
uint64_t checkpointed_io_seq = 0;
uint64_t payload_size = 0;
char *snap_name;
zvol_rebuild_scanner_info_t *warg = NULL;

if ((rc = setsockopt(fd, SOL_SOCKET, SO_LINGER, &lo, sizeof (lo)))
!= 0) {
Expand Down Expand Up @@ -1477,12 +1585,14 @@ uzfs_zvol_rebuild_scanner(void *arg)
"sock(%d)", name, fd);

uzfs_zvol_append_to_fd_list(zinfo, fd);
zinfo->rebuild_cmd_queued_cnt =
zinfo->rebuild_cmd_acked_cnt = 0;

warg = kmem_zalloc(sizeof (zvol_rebuild_scanner_info_t),
KM_SLEEP);
warg->zinfo = zinfo;
warg->fd = fd;
uzfs_zvol_append_to_rebuild_scanner(zinfo, warg);

kmem_free(name, hdr.len);
warg.zinfo = zinfo;
warg.fd = fd;
goto read_socket;

case ZVOL_OPCODE_REBUILD_STEP:
Expand Down Expand Up @@ -1521,9 +1631,16 @@ uzfs_zvol_rebuild_scanner(void *arg)
/*
* which means there is no user snapshot of given
* io_num, but, TODO: we need to make sure that there
* are no ongoing snapshots.
* are no ongoing snapshots. This is made sure by tgt?
*/
if (all_snap_done == B_FALSE) {
if (rebuild_req_offset != 0) {
	/*
	 * ALL_SNAP_DONE is only sent for a request that starts
	 * at offset 0; any other offset is rejected here.
	 * The arguments follow the "[name:fd]" order of the
	 * format, and the 64-bit offset is printed with a
	 * matching conversion.
	 */
	LOG_ERR("[%s:%d]invalid offset"
	    " %llu to send ALL_SNAP_DONE",
	    zinfo->name, fd,
	    (unsigned long long)rebuild_req_offset);
	goto exit;
}
uzfs_zvol_send_zio_cmd(zinfo, &hdr,
ZVOL_OPCODE_REBUILD_ALL_SNAP_DONE,
fd, NULL, 0, 0);
Expand All @@ -1542,10 +1659,10 @@ uzfs_zvol_rebuild_scanner(void *arg)

rc = uzfs_get_io_diff(zv, &metadata,
snap_zv, uzfs_zvol_rebuild_scanner_callback,
rebuild_req_offset, rebuild_req_len, &warg);
rebuild_req_offset, rebuild_req_len, warg);
if (rc != 0) {
LOG_ERR("Rebuild scanning failed on zvol %s ",
"err(%d)", zinfo->name, rc);
LOG_ERR("[%s:%d]Rebuild scanning failed err:%d",
zinfo->name, fd, rc);
goto exit;
}

Expand Down Expand Up @@ -1642,6 +1759,10 @@ uzfs_zvol_rebuild_scanner(void *arg)
LOG_INFO("Closing rebuild connection for zvol %s from sock(%d)",
zinfo->name, fd);
remove_pending_cmds_to_ack(fd, zinfo);

uzfs_zvol_remove_from_rebuild_scanner(zinfo, fd);
kmem_free(warg, sizeof (zvol_rebuild_scanner_info_t));

uzfs_zvol_remove_from_fd_list(zinfo, fd);

uzfs_zinfo_drop_refcnt(zinfo);
Expand Down Expand Up @@ -1746,6 +1867,7 @@ uzfs_zvol_io_ack_sender(void *arg)
thread_args_t *thrd_arg;
zvol_io_cmd_t *zio_cmd = NULL;
uint64_t len, latency;
int s = 0;

thrd_arg = (thread_args_t *)arg;
fd = thrd_arg->fd;
Expand Down Expand Up @@ -1780,11 +1902,19 @@ uzfs_zvol_io_ack_sender(void *arg)
zio_cmd = STAILQ_FIRST(&zinfo->complete_queue);
STAILQ_REMOVE_HEAD(&zinfo->complete_queue, cmd_link);
zinfo->zio_cmd_in_ack = zio_cmd;
if (zio_cmd->hdr.flags & ZVOL_OP_FLAG_REBUILD)
zinfo->rebuild_cmd_acked_cnt++;

if (zio_cmd->hdr.flags & ZVOL_OP_FLAG_REBUILD) {
s = 1;
zvol_rebuild_scanner_inc_ack_cnt(zinfo, zio_cmd->conn);
}

(void) pthread_mutex_unlock(&zinfo->zinfo_mutex);

#if DEBUG
if (s == 1)
if (inject_error.delay.helping_replica_ack_sender == 1)
sleep(2);
#endif
LOG_DEBUG("ACK for op: %d, seq-id: %ld",
zio_cmd->hdr.opcode, zio_cmd->hdr.io_seq);

Expand Down Expand Up @@ -1814,7 +1944,8 @@ uzfs_zvol_io_ack_sender(void *arg)
zio_cmd->buf, zio_cmd->hdr.len);
error_check:
if (rc == -1) {
LOG_ERRNO("socket write err");
LOG_ERRNO("[fd:%d]socket write err",
zio_cmd->conn);
zinfo->zio_cmd_in_ack = NULL;
/*
* exit due to network errors on fd related
Expand All @@ -1823,6 +1954,9 @@ uzfs_zvol_io_ack_sender(void *arg)
if (zio_cmd->conn == fd) {
zio_cmd_free(&zio_cmd);
goto exit;
} else {
zvol_rebuild_scanner_set_errored_fd(
zinfo, zio_cmd->conn);
}
zio_cmd_free(&zio_cmd);
continue;
Expand All @@ -1836,11 +1970,14 @@ uzfs_zvol_io_ack_sender(void *arg)
rc = uzfs_send_reads(zio_cmd->conn, zio_cmd);
if (rc == -1) {
zinfo->zio_cmd_in_ack = NULL;
LOG_ERRNO("socket write err");
LOG_ERRNO("[fd:%d]socket write err",
zio_cmd->conn);
if (zio_cmd->conn == fd) {
zio_cmd_free(&zio_cmd);
goto exit;
}
zvol_rebuild_scanner_set_errored_fd(
zinfo, zio_cmd->conn);
}
}
latency = gethrtime() - zio_cmd->io_start_time;
Expand Down
Loading

0 comments on commit ff7e226

Please sign in to comment.