Skip to content

Commit

Permalink
Merge pull request #3643 from nspcc-dev/uploader-workers
Browse files Browse the repository at this point in the history
cli: add flag workers to `upload-bin` command
  • Loading branch information
AnnaShaleva authored Oct 24, 2024
2 parents 4b10f23 + 7b3eeb9 commit b8a65d3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
12 changes: 11 additions & 1 deletion cli/util/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ func NewCommands() []*cli.Command {
Usage: "Size of index file",
Value: 128000,
},
&cli.UintFlag{
Name: "workers",
Usage: "Number of workers to fetch and upload blocks concurrently",
Value: 50,
},
&cli.UintFlag{
Name: "searchers",
Usage: "Number of concurrent searches for blocks",
Value: 20,
},
}, options.RPC...)
uploadBinFlags = append(uploadBinFlags, options.Wallet...)
return []*cli.Command{
Expand Down Expand Up @@ -158,7 +168,7 @@ func NewCommands() []*cli.Command {
{
Name: "upload-bin",
Usage: "Fetch blocks from RPC node and upload them to the NeoFS container",
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>]",
UsageText: "neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>]",
Action: uploadBin,
Flags: uploadBinFlags,
},
Expand Down
15 changes: 6 additions & 9 deletions cli/util/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,8 @@ import (
const (
// Number of objects to search in a batch for finding max block in container.
searchBatchSize = 10000
// Control the number of concurrent searches.
maxParallelSearches = 40
// Size of object ID.
oidSize = sha256.Size

// Number of workers to fetch and upload blocks concurrently.
numWorkers = 100
)

// Constants related to retry mechanism.
Expand Down Expand Up @@ -69,6 +64,8 @@ func uploadBin(ctx *cli.Context) error {
rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
containerIDStr := ctx.String("container")
attr := ctx.String("block-attribute")
numWorkers := ctx.Int("workers")
maxParallelSearches := ctx.Int("searchers")
acc, _, err := options.GetAccFromContext(ctx)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
Expand Down Expand Up @@ -137,7 +134,7 @@ func uploadBin(ctx *cli.Context) error {
}
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)

lastMissingBlockIndex, err := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight))
lastMissingBlockIndex, err := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches)
if err != nil {
return cli.Exit(fmt.Errorf("failed to fetch the latest missing block index from container: %w", err), 1)
}
Expand Down Expand Up @@ -222,7 +219,7 @@ func uploadBin(ctx *cli.Context) error {
fmt.Fprintf(ctx.App.Writer, "Successfully uploaded batch of blocks: from %d to %d\n", batchStart, batchEnd-1)
}

err = updateIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled)
err = updateIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
if err != nil {
return cli.Exit(fmt.Errorf("failed to update index files after upload: %w", err), 1)
}
Expand Down Expand Up @@ -255,7 +252,7 @@ type searchResult struct {

// fetchLatestMissingBlockIndex searches the container for the last full block batch,
// starting from the currentHeight and going backwards.
func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int) (int, error) {
func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches int) (int, error) {
var (
wg sync.WaitGroup
numBatches = currentHeight/searchBatchSize + 1
Expand Down Expand Up @@ -314,7 +311,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
}

// updateIndexFiles updates the index files in the container.
func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool) error {
func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
attributeKey := ctx.String("index-attribute")
indexFileSize := ctx.Uint("index-file-size")
fmt.Fprintln(ctx.App.Writer, "Updating index files...")
Expand Down
4 changes: 3 additions & 1 deletion docs/neofs-blockstorage.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ NAME:
neo-go util upload-bin - Fetch blocks from RPC node and upload them to the NeoFS container
USAGE:
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>]
neo-go util upload-bin --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --block-attribute block --index-attribute index --rpc-endpoint <node> [--timeout <time>] --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>] [--index-file-size <size>]
OPTIONS:
--fs-rpc-endpoint value, --fsr value [ --fs-rpc-endpoint value, --fsr value ] List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)
Expand All @@ -96,6 +96,8 @@ OPTIONS:
--index-attribute value Attribute key of the index file object
--address value Address to use for signing the uploading and searching transactions in NeoFS
--index-file-size value Size of index file (default: 128000)
--workers value Number of workers to fetch, upload and search blocks concurrently (default: 50)
--searchers value Number of concurrent searches for blocks (default: 20)
--rpc-endpoint value, -r value RPC node address
--timeout value, -s value Timeout for the operation (default: 10s)
--wallet value, -w value Wallet to use to get the key for transaction signing; conflicts with --wallet-config flag
Expand Down

0 comments on commit b8a65d3

Please sign in to comment.