-
Notifications
You must be signed in to change notification settings - Fork 0
/
bbq.sh
executable file
·202 lines (182 loc) · 6.14 KB
/
bbq.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#!/usr/bin/env bash
#
# bbq - Simple Bash message queues using Linux named pipes.
#
# bbq.sh provides a few convenient Bash functions for creating and working with
# Linux named pipes (FIFO). Its primary use case is to set up multiple worker
# processes getting work from a single job queue in your shell script.
#
# Please see the examples/basic script for an example usage of bbq.sh.
#
# Requirements:
# - Linux, Bash, GNU coreutils, and flock from util-linux.
#
# Include guard: when $bbq_SOURCED is already non-empty, stop reading this
# file right away and report success to whoever sourced it.
[[ -z ${bbq_SOURCED:-} ]] || return 0
# Print all arguments, joined by single spaces, as one line on stderr.
#
# printf is used rather than echo so that a message whose first word looks
# like an echo option (e.g. "-n" or "-e") is printed verbatim instead of
# being consumed as an option.
#
bbq_error () {
  local IFS=' '   # make "$*" join with a space regardless of the caller's IFS
  printf '%s\n' "$*" >&2
}
# Print all arguments, joined by single spaces, as one line on stderr, but
# only when $bbq_DEBUG is non-empty. Always returns 0 when debugging is off.
#
# printf is used rather than echo so that a message whose first word looks
# like an echo option (e.g. "-n" or "-e") is printed verbatim.
#
bbq_debug () {
  if [[ ${bbq_DEBUG:-} ]]; then
    local IFS=' '   # make "$*" join with a space regardless of the caller's IFS
    printf '%s\n' "$*" >&2
  fi
}
# Refuse to run as a program: when the file being read is the script that
# was invoked ($0), complain on stderr and exit with status 1.
[[ ${BASH_SOURCE[0]} != "$0" ]] || {
  bbq_error "Please source bbq.sh instead of running it!"
  exit 1
}
# Size, in bytes, of one queue message. Each "job" is a string of Bash
# commands that is padded to this length before being written to the FIFO
# used as the message queue. On Linux the limit is 4k.
bbq_CHUNK_SIZE=1024

# Path of the named pipe that serves as the message queue when no queue is
# given explicitly.
bbq_FIFO=''

# How many worker processes bbq-start runs by default.
bbq_WORKER_COUNT=4

# Assign any non-empty value to get some debugging output on stderr.
bbq_DEBUG=''
# Make a new named pipe (FIFO), readable and writable by its owner only, and
# remember it in $bbq_FIFO as the default queue.
#
# Arguments:
#
#   queue   - Optional. Where to create the FIFO. A relative path is taken
#             relative to the current directory. When left out (or empty),
#             a name of the form bbq-<$$>-<$RANDOM> in the current
#             directory is used.
#
# Returns mkfifo's exit status when the FIFO cannot be created; $bbq_FIFO is
# left untouched in that case.
#
bbq-new () { # [queue]
  local path=${1:-bbq-$$-$RANDOM}

  # $bbq_FIFO always holds an absolute path.
  case $path in
    /*) ;;
    *)  path=$PWD/$path ;;
  esac

  local rc=0
  mkfifo -m 0600 "$path" || rc=$?
  if (( rc != 0 )); then
    return "$rc"
  fi

  bbq_FIFO=$path
}
# Launch the queue workers from a background subshell and return right away.
#
# The subshell hands its arguments to _bbq_start_workers, which forks the
# workers and waits for them to end. When the subshell exits, every worker
# pid recorded in `workers` is sent a kill, and the queue is removed if
# $bbq_owns_the_pipe has been set.
#
# Arguments:
#
#   queue     - Optional. Path to the named pipe for the workers to read
#               messages/commands from. If omitted, the queue is taken to
#               be $bbq_FIFO, and in that case it is also deleted at the
#               end.
#
# Options:
#
#   -w COUNT  - Number of worker processes to create that work on the queue.
#               Defaults to $bbq_WORKER_COUNT (initially 4).
#
bbq-start () { # [-w COUNT] [queue]
  (
    # pid -> exit code, one entry per worker; filled in by
    # _bbq_start_workers.
    declare -A workers

    # Becomes non-empty when _bbq_start_workers is called without a queue
    # argument, i.e. when the default $bbq_FIFO is being served.
    bbq_owns_the_pipe=

    trap '
      kill ${!workers[*]} &>/dev/null || :
      [[ -z $bbq_owns_the_pipe ]] || rm -f "$bbq_FIFO"
    ' EXIT

    _bbq_start_workers "$@"
  ) &
}
# Enqueue arbitrary Bash commands as one fixed-length message on a queue.
#
# The command string is padded on the right with spaces to exactly
# $bbq_CHUNK_SIZE bytes and written to the queue in one printf call.
#
# Arguments:
#
#   command  - Arbitrary Bash command to add to the queue. It must not be
#              longer than $bbq_CHUNK_SIZE bytes.
#
#   queue    - Optional. If omitted, $bbq_FIFO is assumed; otherwise, it
#              should be a path to a message queue created by bbq-new.
#
# Returns 1, after printing a message to stderr, when the command is too
# long or the queue is not a named pipe.
#
# This command MUST ONLY be run after bbq-start.
#
# NOTE: This command may block on writing to the pipe if the pipe buffer is
#       full. Therefore, it's recommended that you run it as the last part
#       of your script. Alternatively, you can also run it in the background
#       (i.e., with '&') directly or indirectly to avoid blocking the flow
#       of your script.
#
bbq () { # <command> [queue]
  local code=${1-} queue=${2:-$bbq_FIFO}

  # The chunk size is a byte count, so measure the command in bytes. In a
  # multibyte locale ${#code} counts characters, which would let a command
  # with non-ASCII text pass the check and produce a frame longer than
  # $bbq_CHUNK_SIZE. The setting is local to this function.
  local LC_ALL=C

  if (( ${#code} > bbq_CHUNK_SIZE )); then
    bbq_error "Encoded message exceeds allowed chunk size ($bbq_CHUNK_SIZE): $code"
    return 1
  fi

  if [[ ! -p $queue ]]; then
    bbq_error "'$queue' must be a named pipe!"
    return 1
  fi

  # According to the docs, on Linux a write of at most PIPE_BUF (4k) bytes
  # to a FIFO is atomic, so no locking is done before writing.
  #
  # See also 'man fifo' and 'man 7 pipe'.
  #
  printf "%-${bbq_CHUNK_SIZE}s" "$code" >"$queue"
}
# Internal implementation for bbq-start().
#
# Parses the options, settles which FIFO to serve, forks the workers, keeps
# a write descriptor open on the FIFO and then waits for every worker.
#
# Reads and writes these variables of the caller:
#
#   workers            - Array indexed by worker pid; each entry is set to
#                        that worker's exit status once it has been waited
#                        for.
#   bbq_owns_the_pipe  - Set to 1 when no queue argument was given and
#                        $bbq_FIFO has been verified to be a named pipe.
#   bbq_WORKER_COUNT   - Overwritten by -w.
#   bbq_FIFO           - Overwritten by the queue argument (made absolute).
#
# Returns 0 only if every worker exited with status 0; returns non-zero on
# bad options, a bad worker count, or a queue that is not a named pipe.
#
_bbq_start_workers () { # [-w COUNT] [queue]
  # getopts state is kept local so the caller's OPTIND/OPTARG survive.
  local option OPTARG OPTIND=1
  while getopts ':w:' option "$@"; do
    case $option in
      w)  bbq_WORKER_COUNT=$OPTARG ;;
      :)  bbq_error "${FUNCNAME[0]}: Missing option argument for -$OPTARG"
          return 1 ;;
      \?) bbq_error "${FUNCNAME[0]}: Unknown option: -$OPTARG"
          return 1 ;;
    esac
  done
  shift $((OPTIND - 1))

  # printf '%d' fails on anything that is not an integer, which keeps
  # arbitrary text out of the arithmetic evaluation that follows.
  bbq_WORKER_COUNT=${bbq_WORKER_COUNT:-4}
  if ! printf '%d' "$bbq_WORKER_COUNT" >/dev/null 2>&1 \
      || (( bbq_WORKER_COUNT <= 0 )); then
    bbq_error "Worker count should be > 0"
    return 1
  fi

  local queue=${1:-}
  if [[ $queue ]]; then
    [[ $queue == /* ]] || queue=$PWD/$queue
    bbq_FIFO=$queue
  fi

  [[ -p ${bbq_FIFO:?required} ]] || {
    bbq_error "$bbq_FIFO must be a named pipe!"
    return 1
  }

  # Claim the default queue only after it is known to be a named pipe.
  # bbq-start's exit trap removes "$bbq_FIFO" whenever this flag is set, so
  # setting it before the check above would delete whatever non-FIFO path
  # $bbq_FIFO happened to name when start-up fails.
  if [[ -z $queue ]]; then
    bbq_owns_the_pipe=1
  fi

  # Fork the workers to do work.
  # This needs to be done before we can enqueue anything without blocking.
  #
  local i
  for ((i = 0; i < bbq_WORKER_COUNT; i++)); do
    _bbq_worker &
    workers[$!]=
  done

  # Keep a write FD open to the FIFO to prevent the read ends from
  # getting EOFs, which happens when all write FDs are closed.
  #
  local write_fd
  exec {write_fd}>"$bbq_FIFO" || return $?

  # Wait for the worker processes to exit, recording each exit status.
  local pid failed=
  for pid in "${!workers[@]}"; do
    if wait "$pid"; then
      workers[$pid]=0
    else
      workers[$pid]=$?
      bbq_error "Worker $pid exited! (rc=${workers[$pid]})"
      failed=1
    fi
  done

  # Every worker is gone, so the write end is no longer needed.
  exec {write_fd}>&-

  [[ ! $failed ]]
}
# Represents a background worker sub process that takes commands from the
# FIFO: it opens $bbq_FIFO for reading and then, forever, dequeues one
# $bbq_CHUNK_SIZE-byte message at a time and evaluates it as Bash code.
#
# The loop has no exit condition of its own; a worker ends when the code it
# evaluates exits the process or when it is killed.
#
# Returns non-zero only if $bbq_FIFO cannot be opened.
#
_bbq_worker () {
  local read_fd
  exec {read_fd}<"${bbq_FIFO:?required}" || return $?

  local code

  # Read the next message into $code while holding an flock on the read
  # descriptor, so that concurrent workers never interleave their reads.
  # Returns the status of the failing flock call, otherwise that of read.
  _bbq_pop () {
    # Messages are framed in bytes, but 'read -N' counts characters in the
    # current locale. Reading under LC_ALL=C keeps one read equal to one
    # frame even when a message contains multibyte text. rc starts at 0 so
    # the final return never depends on an empty value.
    local rc=0 LC_ALL=C
    flock "$read_fd" || return $?
    read -u "$read_fd" -rN "$bbq_CHUNK_SIZE" code || rc=$?
    flock -u "$read_fd" || return $?
    return "$rc"
  }

  while true; do
    code=
    # NOTE(review): a failed dequeue is retried immediately, with no delay
    # and no retry limit.
    _bbq_pop || {
      bbq_debug "Worker $BASHPID: Failed dequeuing! code=$code"
      continue
    }

    eval "$code"
    # NOTE:
    #  - If you 'set -e' then the worker could die due to a non-zero
    #    exit from evaluating $code.
    #  - If all your workers exited (i.e., no more FDs reading on the
    #    pipe) then the enqueue operation will either block on opening the
    #    write end, or you get a SIGPIPE when writing to the pipe.
  done

  # Only reached if the loop above is ever given a way to terminate.
  bbq_debug "Worker $BASHPID quit"
  exec {read_fd}<&-
}
# Mark the library as loaded: the guard at the top of this file returns
# early whenever this variable is non-empty, so sourcing the file again
# does not redefine the functions or reset the settings.
bbq_SOURCED=1