Skip to content

Commit

Permalink
CLI: --num-workers option ('ais put', 'ais archive', and more)
Browse files Browse the repository at this point in the history
* add; amend
* an option to execute serially (consistent with aistore)
* limit not to exceed 2 * num-CPUs
* remove `--conc` flag (obsolete)
* fix helps

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
  • Loading branch information
alex-aizman committed Oct 1, 2024
1 parent 16edff7 commit d5e6fbc
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 53 deletions.
6 changes: 3 additions & 3 deletions cmd/cli/cli/arch_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var (
archAppendOrPutFlag,
archAppendOnlyFlag,
archpathFlag,
concurrencyFlag,
numPutWorkersFlag,
dryRunFlag,
recursFlag,
verboseFlag,
Expand All @@ -104,7 +104,7 @@ var (
),
cmdGenShards: {
cleanupFlag,
concurrencyFlag,
numGenShardWorkersFlag,
fsizeFlag,
fcountFlag,
fextsFlag,
Expand Down Expand Up @@ -519,7 +519,7 @@ func genShardsHandler(c *cli.Context) error {
var (
shardNum int
progress = mpb.New(mpb.WithWidth(barWidth))
concLimit = parseIntFlag(c, concurrencyFlag)
concLimit = parseIntFlag(c, numGenShardWorkersFlag)
concSemaphore = make(chan struct{}, concLimit)
group, ctx = errgroup.WithContext(context.Background())
text = "Shards created: "
Expand Down
24 changes: 17 additions & 7 deletions cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,11 +683,6 @@ var (
Name: "cleanup",
Usage: "remove old bucket and create it again (warning: removes the entire content of the old bucket)",
}
concurrencyFlag = cli.IntFlag{
Name: "conc",
Value: 10,
Usage: "limits number of concurrent put requests and number of concurrent shards created",
}

// waiting
waitPodReadyTimeoutFlag = DurationFlag{
Expand Down Expand Up @@ -845,12 +840,27 @@ var (
Name: "num-workers",
Usage: "number of concurrent blob-downloading workers (readers); system default when omitted or zero",
}

noWorkers = indent4 + "\tuse (-1) to indicate single-threaded serial execution (ie., no workers);\n"

numListRangeWorkersFlag = cli.IntFlag{
Name: "num-workers",
Name: numBlobWorkersFlag.Name,
Usage: "number of concurrent workers (readers); defaults to a number of target mountpaths if omitted or zero;\n" +
indent4 + "\t(-1) is a special value indicating no workers at all (ie., single-threaded execution);\n" +
noWorkers +
indent4 + "\tany positive value will be adjusted _not_ to exceed the number of target CPUs",
}
numGenShardWorkersFlag = cli.IntFlag{
Name: numBlobWorkersFlag.Name,
Value: 10,
Usage: "limits the number of shards created concurrently",
}
numPutWorkersFlag = cli.IntFlag{
Name: numBlobWorkersFlag.Name,
Value: 10,
Usage: "number of concurrent client-side workers (to execute PUT or append requests);\n" +
noWorkers +
indent4 + "\tany positive value will be adjusted _not_ to exceed twice the number of client CPUs",
}

// validate
cksumFlag = cli.BoolFlag{Name: "checksum", Usage: "validate checksum"}
Expand Down
15 changes: 15 additions & 0 deletions cmd/cli/cli/flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"time"

"github.com/NVIDIA/aistore/cmd/cli/teb"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -172,6 +174,9 @@ func parseRetriesFlag(c *cli.Context, flag cli.IntFlag, warn bool) (retries int)
maxr = 5
efmt = "invalid option '%s=%d' (expecting 1..5 range)"
)
if !flagIsSet(c, flag) {
return 0
}
retries = parseIntFlag(c, flag)
if retries < 0 {
if warn {
Expand All @@ -188,6 +193,16 @@ func parseRetriesFlag(c *cli.Context, flag cli.IntFlag, warn bool) (retries int)
return retries
}

//nolint:gocritic // ignoring hugeParam - following the orig. github.com/urfave style
func parseNumWorkersFlag(c *cli.Context, flag cli.IntFlag) int {
	// Resolve the user-specified (or default) number of concurrent workers:
	// any positive value is capped at twice the number of CPUs,
	// while a negative value (-1) selects single-threaded (serial) execution.
	numWorkers := parseIntFlag(c, flag)
	if numWorkers <= 0 {
		// zero indicates a misconfigured flag (the default must be positive)
		debug.Assert(numWorkers < 0, "flag '", flag.Name, "' must have positive default value")
		return 1
	}
	return min(2*cmn.MaxParallelism(), numWorkers)
}

func rmFlags(flags []cli.Flag, fs ...cli.Flag) (out []cli.Flag) {
out = make([]cli.Flag, 0, len(flags))
loop:
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli/object_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var (
commandPut: append(
listRangeProgressWaitFlags,
chunkSizeFlag,
concurrencyFlag,
numPutWorkersFlag,
dryRunFlag,
recursFlag,
putSrcDirNameFlag,
Expand Down
56 changes: 26 additions & 30 deletions cmd/cli/cli/verbfobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ import (

type (
uparams struct {
wop wop
bck cmn.Bck
fobjs []fobj
workerCnt int
refresh time.Duration
cksum *cos.Cksum
cptn string
totalSize int64
dryRun bool
wop wop
bck cmn.Bck
fobjs []fobj
numWorkers int
refresh time.Duration
cksum *cos.Cksum
cptn string
totalSize int64
dryRun bool
}
uctx struct {
wg cos.WG
Expand Down Expand Up @@ -102,18 +102,18 @@ func verbFobjs(c *cli.Context, wop wop, fobjs []fobj, bck cmn.Bck, ndir int, rec
}
}
refresh := calcPutRefresh(c)
numWorkers := parseIntFlag(c, concurrencyFlag)
debug.Assert(numWorkers > 0)
numWorkers := parseNumWorkersFlag(c, numPutWorkersFlag)

uparams := &uparams{
wop: wop,
bck: bck,
fobjs: fobjs,
workerCnt: numWorkers,
refresh: refresh,
cksum: cksum,
cptn: cptn,
totalSize: totalSize,
dryRun: flagIsSet(c, dryRunFlag),
wop: wop,
bck: bck,
fobjs: fobjs,
numWorkers: numWorkers,
refresh: refresh,
cksum: cksum,
cptn: cptn,
totalSize: totalSize,
dryRun: flagIsSet(c, dryRunFlag),
}
return uparams.do(c)
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func (p *uparams) do(c *cli.Context) error {
u := &uctx{
verbose: flagIsSet(c, verboseFlag),
showProgress: flagIsSet(c, progressFlag),
wg: cos.NewLimitedWaitGroup(p.workerCnt, 0),
wg: cos.NewLimitedWaitGroup(p.numWorkers, 0),
lastReport: time.Now(),
reportEvery: p.refresh,
}
Expand All @@ -167,9 +167,7 @@ func (p *uparams) do(c *cli.Context) error {
u.barSize = totalBars[1]
}

if flagIsSet(c, putRetriesFlag) {
_ = parseRetriesFlag(c, putRetriesFlag, true)
}
_ = parseRetriesFlag(c, putRetriesFlag, true) // to warn once, if need be

u.errCh = make(chan string, len(p.fobjs))
for _, fobj := range p.fobjs {
Expand Down Expand Up @@ -329,9 +327,8 @@ func (u *uctx) do(c *cli.Context, p *uparams, fobj fobj, fh *cos.FileHandle, upd
iters = 1
isTout bool
)
if flagIsSet(c, putRetriesFlag) {
iters += parseRetriesFlag(c, putRetriesFlag, false /*warn*/)
}
iters += parseRetriesFlag(c, putRetriesFlag, false /*warn*/)

switch p.wop.verb() {
case "PUT":
for i := range iters {
Expand Down Expand Up @@ -442,9 +439,8 @@ func putRegular(c *cli.Context, bck cmn.Bck, objName, path string, finfo os.File
SkipVC: flagIsSet(c, skipVerCksumFlag),
}
iters := 1
if flagIsSet(c, putRetriesFlag) {
iters += parseRetriesFlag(c, putRetriesFlag, true /*warn*/)
}
iters += parseRetriesFlag(c, putRetriesFlag, true /*warn*/)

for i := range iters {
_, err = api.PutObject(&putArgs)
if err == nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/test/bucket_summary.in
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ ais bucket create ais://$BUCKET_2 | awk '{print $1 " " $2}'
ais bucket summary ais://$BUCKET_1
ais bucket summary aiss://$BUCKET_1 // FAIL "invalid backend provider "aiss""

ais archive gen-shards "ais://$BUCKET_1/tmp/shard-{0..149}.tar" --fcount 1 --fsize 1KB --conc 1 --cleanup // IGNORE
ais archive gen-shards "ais://$BUCKET_2/tmp/shard-{0..19}.tar" --fcount 10 --fsize 1KB --conc 1 --cleanup // IGNORE
ais archive gen-shards "ais://$BUCKET_1/tmp/shard-{0..149}.tar" --fcount 1 --fsize 1KB --num-workers 1 --cleanup // IGNORE
ais archive gen-shards "ais://$BUCKET_2/tmp/shard-{0..19}.tar" --fcount 10 --fsize 1KB --num-workers 1 --cleanup // IGNORE
ais bucket summary ais://$BUCKET_1
sleep 1
ais bucket summary ais://$BUCKET_2
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/test/list_bucket.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ais archive gen-shards "ais://$BUCKET_1/tmp/shard-{0..4}.tar" --fcount 1 --fsize 1KB --conc 1 --cleanup // IGNORE
ais archive gen-shards "ais://$BUCKET_1/tmp/shard-{0..4}.tar" --fcount 1 --fsize 1KB --num-workers 1 --cleanup // IGNORE
ais ls ais://$BUCKET_1
ais ls ais://$BUCKET_1/
ais ls ais://$BUCKET_1
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/test/mv_bucket.in
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
ais wait rebalance // IGNORE
ais archive gen-shards "ais://$BUCKET_1/tmp/shard-{0..9}.tar" --fcount 1 --fsize 1KB --conc 1 --cleanup // IGNORE
ais archive gen-shards "ais://$BUCKET_1/tmp/shard-{0..9}.tar" --fcount 1 --fsize 1KB --num-workers 1 --cleanup // IGNORE
ais bucket mv ais://$BUCKET_1/ ais://$BUCKET_2/ --wait --timeout 3m
ais bucket mv ais://$BUCKET_2/ ais://$BUCKET_1/
ais archive gen-shards "ais://$BUCKET_3/tmp/shard-{0..9}.tar" --fcount 1 --fsize 1KB --conc 1 --cleanup // IGNORE
ais archive gen-shards "ais://$BUCKET_3/tmp/shard-{0..9}.tar" --fcount 1 --fsize 1KB --num-workers 1 --cleanup // IGNORE
ais bucket mv ais://$BUCKET_1/ ais://$BUCKET_3/ // FAIL "bucket "ais://$BUCKET_3" already exists"
ais bucket mv ais://$BUCKET_1/ ais://$BUCKET_1/ // FAIL "cannot mv bucket "ais://$BUCKET_1" onto itself"

Expand Down
54 changes: 48 additions & 6 deletions docs/cli/archive.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,60 @@ NAME:
(.tar, .tgz or .tar.gz, .zip, .tar.lz4)-formatted object - aka "shard".
Both APPEND (to an existing shard) and PUT (a new version of the shard) are supported.
Examples:
- 'local-filename bucket/shard-00123.tar.lz4 --append --archpath name-in-archive' - append file to a given shard,
- 'local-file s3://q/shard-00123.tar.lz4 --append --archpath name-in-archive' - append file to a given shard,
optionally, rename it (inside archive) as specified;
- 'local-filename bucket/shard-00123.tar.lz4 --append-or-put --archpath name-in-archive' - append file to a given shard if exists,
- 'local-file s3://q/shard-00123.tar.lz4 --append-or-put --archpath name-in-archive' - append file to a given shard if exists,
otherwise, create a new shard (and name it shard-00123.tar.lz4, as specified);
- 'src-dir bucket/shard-99999.zip -put' - one directory; iff the destination .zip doesn't exist create a new one;
- 'src-dir gs://w/shard-999.zip --append' - archive entire 'src-dir' directory; iff the destination .zip doesn't exist create a new one;
- '"sys, docs" ais://dst/CCC.tar --dry-run -y -r --archpath ggg/' - dry-run to recursively archive two directories.
Tips:
- use '--dry-run' option if in doubt;
- to archive objects from a ais:// or remote bucket, run 'ais archive bucket', see --help for details.
- use '--dry-run' if in doubt;
- to archive objects from a ais:// or remote bucket, run 'ais archive bucket' (see --help for details).

USAGE:
ais archive put [command options] [-|FILE|DIRECTORY[/PATTERN]] BUCKET/SHARD_NAME

OPTIONS:
--list value comma-separated list of object or file names, e.g.:
--list 'o1,o2,o3'
--list "abc/1.tar, abc/1.cls, abc/1.jpeg"
or, when listing files and/or directories:
--list "/home/docs, /home/abc/1.tar, /home/abc/1.jpeg"
--template value template to match object or file names; may contain prefix (that could be empty) with zero or more ranges
(with optional steps and gaps), e.g.:
            --template "" # (an empty or '*' template matches everything)
--template 'dir/subdir/'
--template 'shard-{1000..9999}.tar'
--template "prefix-{0010..0013..2}-gap-{1..2}-suffix"
and similarly, when specifying files and directories:
--template '/home/dir/subdir/'
--template "/abc/prefix-{0010..9999..2}-suffix"
--wait wait for an asynchronous operation to finish (optionally, use '--timeout' to limit the waiting time)
--timeout value maximum time to wait for a job to finish; if omitted: wait forever or until Ctrl-C;
valid time units: ns, us (or µs), ms, s (default), m, h
--progress show progress bar(s) and progress of execution in real time
--refresh value time interval for continuous monitoring; can be also used to update progress bar (at a given interval);
valid time units: ns, us (or µs), ms, s (default), m, h
--append-or-put append to an existing destination object ("archive", "shard") iff exists; otherwise PUT a new archive (shard);
note that PUT (with subsequent overwrite if the destination exists) is the default behavior when the flag is omitted
--append add newly archived content to the destination object ("archive", "shard") that must exist
--archpath value filename in an object ("shard") formatted as: .tar, .tgz or .tar.gz, .zip, .tar.lz4
--num-workers value number of concurrent client-side workers (to execute PUT or append requests);
use (-1) to indicate single-threaded serial execution (ie., no workers);
any positive value will be adjusted _not_ to exceed twice the number of client CPUs (default: 10)

--dry-run preview the results without really running the action
--recursive, -r recursive operation
--verbose, -v verbose output
--yes, -y assume 'yes' to all questions
--units value show statistics and/or parse command-line specified sizes using one of the following _units of measurement_:
iec - IEC format, e.g.: KiB, MiB, GiB (default)
si - SI (metric) format, e.g.: KB, MB, GB
raw - do not convert to (or from) human-readable format
--include-src-dir prefix the names of archived files with the (root) source directory
--skip-vc skip loading object metadata (and the associated checksum & version related processing)
   --cont-on-err keep running archiving xaction (job) in presence of errors in any given multi-object transaction
--help, -h show help
```

The operation accepts either an explicitly defined *list* or template-defined *range* of file names (to archive).
Expand Down Expand Up @@ -679,7 +721,7 @@ The `TEMPLATE` must be bash-like brace expansion (see examples) and `.EXT` must
| `--fcount` | `int` | Number of files inside single shard | `5` |
| `--fext` | `string` | Comma-separated list of file extensions (default ".test"), e.g.: --fext '.mp3,.json,.cls' | `.test` |
| `--cleanup` | `bool` | When set, the old bucket will be deleted and created again | `false` |
| `--conc` | `int` | Limits number of concurrent `PUT` requests and number of concurrent shards created | `10` |
| `--num-workers` | `int` | Limits the number of shards created concurrently | `10` |

### Examples

Expand Down
5 changes: 4 additions & 1 deletion docs/cli/object.md
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,10 @@ OPTIONS:
--refresh value interval for continuous monitoring;
valid time units: ns, us (or µs), ms, s (default), m, h
--chunk-size value chunk size in IEC or SI units, or "raw" bytes (e.g.: 1MiB or 1048576; see '--units')
--conc value limits number of concurrent put requests and number of concurrent shards created (default: 10)
--num-workers value number of concurrent client-side workers (to execute PUT or append requests);
use (-1) to indicate single-threaded serial execution (ie., no workers);
any positive value will be adjusted _not_ to exceed twice the number of client CPUs (default: 10)

--dry-run preview the results without really running the action
--recursive, -r recursive operation
--include-src-dir prefix destination object names with the source directory
Expand Down

0 comments on commit d5e6fbc

Please sign in to comment.