Skip to content

Commit

Permalink
Refactored lzc_send_resume_redacted with a thread.
Browse files Browse the repository at this point in the history
After the changes documented in #11445, writing to certain files
errored out with an opaque error message. Unfortunately, the path
of least resistance to fixing this seemed to be sticking a pipe
into the path of zfs send, along with a thread to copy data to
stdout.

Signed-off-by: Rich Ercolani <rincebrain@gmail.com>
  • Loading branch information
rincebrain committed Jun 5, 2021
1 parent e5e76bd commit 033f8fd
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 68 deletions.
1 change: 0 additions & 1 deletion lib/libzfs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ if BUILD_LINUX
USER_C += \
os/linux/libzfs_mount_os.c \
os/linux/libzfs_pool_os.c \
os/linux/libzfs_sendrecv_os.c \
os/linux/libzfs_util_os.c
endif

Expand Down
1 change: 0 additions & 1 deletion lib/libzfs/libzfs_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ extern int libzfs_load_module(void);
extern int zpool_relabel_disk(libzfs_handle_t *hdl, const char *path,
const char *msg);
extern int find_shares_object(differ_info_t *di);
extern void libzfs_set_pipe_max(int infd);
extern void zfs_commit_proto(zfs_share_proto_t *);

#ifdef __cplusplus
Expand Down
7 changes: 0 additions & 7 deletions lib/libzfs/libzfs_sendrecv.c
Original file line number Diff line number Diff line change
Expand Up @@ -5142,13 +5142,6 @@ zfs_receive(libzfs_handle_t *hdl, const char *tosnap, nvlist_t *props,
return (-2);
}

/*
* It is not uncommon for gigabytes to be processed in zfs receive.
* Speculatively increase the buffer size if supported by the platform.
*/
if (S_ISFIFO(sb.st_mode))
libzfs_set_pipe_max(infd);

if (props) {
err = nvlist_lookup_string(props, "origin", &originsnap);
if (err && err != ENOENT)
Expand Down
6 changes: 0 additions & 6 deletions lib/libzfs/os/freebsd/libzfs_compat.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@
#define ZFS_KMOD "openzfs"
#endif

void
libzfs_set_pipe_max(int infd)
{
/* FreeBSD automatically resizes */
}

static int
execvPe(const char *name, const char *path, char * const *argv,
char * const *envp)
Expand Down
52 changes: 0 additions & 52 deletions lib/libzfs/os/linux/libzfs_sendrecv_os.c

This file was deleted.

151 changes: 150 additions & 1 deletion lib/libzfs_core/libzfs_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* Copyright 2017 RackTop Systems.
* Copyright (c) 2017 Open-E, Inc. All Rights Reserved.
* Copyright (c) 2019, 2020 by Christian Schwarz. All rights reserved.
* Copyright (c) 2021 Rich Ercolani.
*/

/*
Expand Down Expand Up @@ -96,6 +97,19 @@ static int g_fd = -1;
static pthread_mutex_t g_lock = PTHREAD_MUTEX_INITIALIZER;
static int g_refcount;

#ifdef __linux__
#ifndef F_SETPIPE_SZ
#define F_SETPIPE_SZ (F_SETLEASE + 7)
#endif /* F_SETPIPE_SZ */

#ifndef F_GETPIPE_SZ
#define F_GETPIPE_SZ (F_GETLEASE + 7)
#endif /* F_GETPIPE_SZ */
#endif

static unsigned long lzc_get_pipe_max(void);
static void lzc_set_pipe_max(int infd);

#ifdef ZFS_DEBUG
static zfs_ioc_t fail_ioc_cmd = ZFS_IOC_LAST;
static zfs_errno_t fail_ioc_err;
Expand Down Expand Up @@ -645,6 +659,92 @@ lzc_send_resume(const char *snapname, const char *from, int fd,
resumeoff, NULL));
}

static unsigned long
lzc_get_pipe_max()
{
/* FreeBSD automatically grows to 64k */
unsigned long max_psize = 65536;
#ifdef __linux__
FILE *procf = fopen("/proc/sys/fs/pipe-max-size", "re");

if (procf != NULL) {
if (fscanf(procf, "%lu", &max_psize) <= 0) {
max_psize = max_psize;
}
fclose(procf);
}
#endif
return (max_psize);
}

static void
lzc_set_pipe_max(int infd)
{
#ifdef __linux__
unsigned long max_psize = lzc_get_pipe_max();
long cur_psize;
cur_psize = fcntl(infd, F_GETPIPE_SZ);
if (cur_psize > 0 &&
max_psize > (unsigned long) cur_psize)
fcntl(infd, F_SETPIPE_SZ,
max_psize);
#endif
}


struct sendargs {
int ioctlfd;
int inputfd;
int outputfd;
};
typedef struct sendargs sendargs_t;

static void *
do_send_output(void *voidargs)
{
sendargs_t *args = (sendargs_t *)voidargs;
sigset_t sigs;
int buflen = lzc_get_pipe_max();

/*
* See the comment above the close() call for why
* we can't just die from SIGPIPE.
*/
sigemptyset(&sigs);
sigaddset(&sigs, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &sigs, NULL);


int err = 1;
#ifdef __linux__
while (err > 0) {
err = splice(args->inputfd, NULL, args->outputfd, NULL, buflen,
SPLICE_F_MORE);
}
#else
void* buf = calloc(1, buflen);
while (err > 0) {
err = read(args->inputfd, buf, buflen);
if (err <= 0) {
break;
}
err = write(args->outputfd, buf, err);
}
free(buf);
#endif
if (err < 0) {
err = errno;
}
/*
* If we just return here, the other thread often blocks
* indefinitely on the ioctl completing, which won't happen
* because we stopped consuming the data. So we close the pipe
* here, and the other thread exits in a timely fashion.
*/
close(args->inputfd);
return ((void *)(uintptr_t)err);
}

/*
* snapname: The name of the "tosnap", or the snapshot whose contents we are
* sending.
Expand All @@ -664,9 +764,18 @@ lzc_send_resume_redacted(const char *snapname, const char *from, int fd,
{
nvlist_t *args;
int err;
int pipefd[2];
pthread_t mythread;
sendargs_t sendargs;
int threadstatus;


err = pipe2(pipefd, O_CLOEXEC);

lzc_set_pipe_max(pipefd[0]);

args = fnvlist_alloc();
fnvlist_add_int32(args, "fd", fd);
fnvlist_add_int32(args, "fd", pipefd[1]);
if (from != NULL)
fnvlist_add_string(args, "fromsnap", from);
if (flags & LZC_SEND_FLAG_LARGE_BLOCK)
Expand All @@ -686,8 +795,32 @@ lzc_send_resume_redacted(const char *snapname, const char *from, int fd,
if (redactbook != NULL)
fnvlist_add_string(args, "redactbook", redactbook);

sendargs.inputfd = pipefd[0];
sendargs.outputfd = fd;
sendargs.ioctlfd = pipefd[1];

pthread_create(&mythread, NULL, do_send_output, (void *)&sendargs);

err = lzc_ioctl(ZFS_IOC_SEND_NEW, snapname, args, NULL);

close(pipefd[1]);

pthread_join(mythread, (void *)&threadstatus);

nvlist_free(args);


if (threadstatus != 0) {
err = threadstatus;
/*
* if we don't set errno here, there are some edge cases
* where we wind up dying unexpectedly with
* "internal error: [normal warning msg]: Success"
*/
errno = threadstatus;
}


return (err);
}

Expand Down Expand Up @@ -792,6 +925,7 @@ recv_impl(const char *snapname, nvlist_t *recvdprops, nvlist_t *localprops,
char *atp;
int error;
boolean_t payload = B_FALSE;
struct stat sb;

ASSERT3S(g_refcount, >, 0);
VERIFY3S(g_fd, !=, -1);
Expand All @@ -811,6 +945,21 @@ recv_impl(const char *snapname, nvlist_t *recvdprops, nvlist_t *localprops,
*slashp = '\0';
}

/*
* The only way fstat can fail is if we do not have a valid file
* descriptor.
*/
if (fstat(input_fd, &sb) == -1) {
return (-errno);
}

/*
* It is not uncommon for gigabytes to be processed in zfs receive.
* Speculatively increase the buffer size if supported by the platform.
*/
if (S_ISFIFO(sb.st_mode))
lzc_set_pipe_max(input_fd);

/*
* The begin_record is normally a non-byteswapped BEGIN record.
* For resumable streams it may be set to any non-byteswapped
Expand Down

0 comments on commit 033f8fd

Please sign in to comment.