NeoFS block storage: add uploading commands #3582

Open
wants to merge 3 commits into
base: master

Conversation

Contributor

@AliceInHunterland AliceInHunterland commented Sep 11, 2024

Example usage:

./bin/neo-go util upload-bin --cid 9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG --wallet-config ./wallet-config.yml --block-attribute "test_final2345678901234" --index-attribute "oid-index" --rpc-endpoint https://rpc.t5.n3.nspcc.ru:20331 -fsr st1.t5.fs.neo.org:8080


codecov bot commented Sep 11, 2024

Codecov Report

Attention: Patch coverage is 4.56140% with 272 lines in your changes missing coverage. Please review.

Project coverage is 84.62%. Comparing base (9a38360) to head (0a4e1d1).
Report is 9 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| cli/util/uploader.go | 0.00% | 265 Missing ⚠️ |
| cli/util/convert.go | 72.22% | 4 Missing and 1 partial ⚠️ |
| pkg/services/oracle/neofs/neofs.go | 0.00% | 2 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3582      +/-   ##
==========================================
- Coverage   85.26%   84.62%   -0.65%     
==========================================
  Files         333      334       +1     
  Lines       39005    39292     +287     
==========================================
- Hits        33256    33249       -7     
- Misses       4177     4475     +298     
+ Partials     1572     1568       -4     


@AliceInHunterland
Contributor Author

@AnnaShaleva what do you think about adding more commands to neo-go cli?
The current pipeline of updating blocks in the container is as follows:

dump-bin
dump-bin-put
dump-generate-index-file
uploading index files (not a command)

indexFileAttribute := ctx.String("index-attribute")
acc, _, err := options.GetAccFromContext(ctx)
if err != nil {
return fmt.Errorf("failed to load account: %w", err)
Member

It should be a proper CLI exit error.

Member

We need unit tests, at least for the error cases of this command. As many cases as possible should be covered without access to NeoFS nodes (NeoGo RPC is included in Executor and hence may be used for tests).

expectedIndexCount := currentHeight / uint(indexFileSize)

if existingIndexCount == expectedIndexCount {
return nil
Member

Write that index files are up-to-date.

Member

...Existing: %d, expected:...
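A minimal sketch of what the two comments above ask for: report that the index files are up-to-date together with the existing and expected counts. The function name `indexStatus`, its parameters, and the exact message wording are illustrative assumptions, not code from this PR; the 128000 index file size in `main` is only an example value.

```go
package main

import "fmt"

// indexStatus compares the number of index files already stored with the
// number that can be built from currentHeight blocks. It returns a status
// message and whether the index files are up-to-date.
// indexFileSize must be non-zero (hypothetical helper, not part of the PR).
func indexStatus(existing, currentHeight, indexFileSize uint) (string, bool) {
	expected := currentHeight / indexFileSize // only complete index files count
	if existing == expected {
		return fmt.Sprintf("Index files are up-to-date. Existing: %d, expected: %d", existing, expected), true
	}
	return fmt.Sprintf("Index files are outdated. Existing: %d, expected: %d", existing, expected), false
}

func main() {
	msg, _ := indexStatus(3, 400000, 128000)
	fmt.Println(msg)
}
```

In the real command the message would presumably go to `ctx.App.Writer` rather than stdout.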

defer wg.Done()
defer func() { <-workerPool }()

oidBlock, err := searchBlockOID(ctx, clientSDK, account, containerID, blockAttributeKey, index)
Member

It's still there; we can't use a Search request for every block, it's too time-consuming. Imagine having to perform 128K Search requests.

Contributor Author

Time taken for index file generation with searches: 17m3.919291917s
Time taken for index file generation with getHeader: 14m57.008241s

Member

I expected a bigger difference, BTW. Still, that's it.

@AnnaShaleva
Member

BTW, the linter is failing.

@AliceInHunterland
Contributor Author

AliceInHunterland commented Oct 5, 2024

706 seconds / 1000 blocks = 0.706 seconds/block. 11m46s for 1k blocks with slicer version

394 seconds / 1000 blocks = 0.394 seconds/block. 6m34s for 1k blocks with pool (it's still a month)

with something like:

numWorkers := 5
blockChan := make(chan int, numWorkers)
errChan := make(chan error, numWorkers)
var wg sync.WaitGroup

for i := 0; i < numWorkers; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for blockIndex := range blockChan {
			block, err := rpc.GetBlockByIndex(uint32(blockIndex))
			// ... (rest of the worker loop elided in the original comment)
		}
	}()
}
// Producer: feed block indexes to the workers, then close the channel.
go func() {
	for blockIndex := maxBlockIndex + 1; blockIndex <= int(currentBlockHeight); blockIndex++ {
		blockChan <- blockIndex
	}
	close(blockChan)
}()

// Close errChan once all workers have finished.
go func() {
	wg.Wait()
	close(errChan)
}()
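For reference, a self-contained version of the same worker-pool pattern might look like the sketch below. `fetchBlock` is a hypothetical stand-in for the RPC call, and `processRange`, the context-based cancellation, and the counter are assumptions added to make the fragment runnable; they are not taken from the PR.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// fetchBlock is a hypothetical stand-in for an RPC call such as GetBlockByIndex.
func fetchBlock(index int) (string, error) {
	if index < 0 {
		return "", fmt.Errorf("invalid block index %d", index)
	}
	return fmt.Sprintf("block-%d", index), nil
}

// processRange fetches blocks [from, to] with numWorkers goroutines. It returns
// the number of blocks fetched and the first error received, if any.
func processRange(from, to, numWorkers int) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	blockCh := make(chan int)
	errCh := make(chan error, numWorkers) // each worker sends at most one error
	var (
		wg        sync.WaitGroup
		processed atomic.Int64
	)

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for idx := range blockCh {
				if _, err := fetchBlock(idx); err != nil {
					errCh <- err
					cancel() // tell the producer to stop
					return
				}
				processed.Add(1)
			}
		}()
	}

	// Producer: stops early if any worker has failed.
	go func() {
		defer close(blockCh)
		for idx := from; idx <= to; idx++ {
			select {
			case blockCh <- idx:
			case <-ctx.Done():
				return
			}
		}
	}()

	wg.Wait()
	close(errCh)
	return int(processed.Load()), <-errCh // nil if no worker reported an error
}

func main() {
	fmt.Println(processRange(0, 999, 5))
}
```

The cancellation is there so the producer does not block forever on `blockCh` once every worker has exited with an error.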

74 seconds / 1000 blocks = 0.074 seconds/block. 1m14s for 1k blocks (~5 days)
producer(4)-consumer(6) 52 seconds
producer(10)-consumer(20) 15 seconds, but:

[2024-10-06T01:47:40+03:00] Successfully uploaded block: 442000
panic: runtime error: index out of range [-1]

goroutine 247 [running]:
math/rand.(*rngSource).Uint64(...)
        math/rand/rng.go:249
math/rand.(*rngSource).Int63(0x1400048d6a8?)
        math/rand/rng.go:234 +0x98
math/rand.(*Rand).Int63(...)
        math/rand/rand.go:96
math/rand.(*Rand).Int31(...)
        math/rand/rand.go:110
math/rand.(*Rand).Int31n(0x14000040750?, 0x48d7a0?)
        math/rand/rand.go:142 +0x8c
math/rand.(*Rand).Intn(0x10?, 0x10?)
        math/rand/rand.go:183 +0x30
github.com/nspcc-dev/neofs-sdk-go/pool.(*sampler).Next(0x140001cf3c0)
        github.com/nspcc-dev/neofs-sdk-go@v1.0.0-rc.12/pool/sampler.go:63 +0x2c
github.com/nspcc-dev/neofs-sdk-go/pool.(*innerPool).connection(0x140001cf400)
        github.com/nspcc-dev/neofs-sdk-go@v1.0.0-rc.12/pool/pool.go:988 +0x1a0
github.com/nspcc-dev/neofs-sdk-go/pool.(*Pool).connection(0x60?)
        github.com/nspcc-dev/neofs-sdk-go@v1.0.0-rc.12/pool/pool.go:967 +0x58
github.com/nspcc-dev/neofs-sdk-go/pool.(*Pool).sdkClient(0xad8f60c33a130b3?)
        github.com/nspcc-dev/neofs-sdk-go@v1.0.0-rc.12/pool/pool.go:1031 +0x1c
github.com/nspcc-dev/neofs-sdk-go/pool.(*Pool).ObjectPutInit(0x1400048da88, {0x101e0dec0, 0x1027af240}, {0x0, 0x0, 0x1400422a2a0, {0x0, 0x0, 0x0}}, {0x101e0e400, ...}, ...)
        github.com/nspcc-dev/neofs-sdk-go@v1.0.0-rc.12/pool/object.go:35 +0xa4
github.com/nspcc-dev/neo-go/cli/util.uploadBlock({0x101e0dec0, 0x1027af240}, {{0x14000192180, 0x1, 0x1}, {0x101e0e400, 0x140003f53b0}, 0x140002fb150, 0x14000436a80, 0x1400004b910, ...}, ...)
        github.com/nspcc-dev/neo-go/cli/util/uploader.go:401 +0x11c
github.com/nspcc-dev/neo-go/cli/util.uploadBin.func3()
        github.com/nspcc-dev/neo-go/cli/util/uploader.go:162 +0x6bc
created by github.com/nspcc-dev/neo-go/cli/util.uploadBin in goroutine 1
        github.com/nspcc-dev/neo-go/cli/util/uploader.go:145 +0xb48
(base) ekaterinapavlova@MacBook-Air-4 neo-go % 

./bin/neo-go util upload-bin --cid 9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG --wallet-config ./wallet-config.yml --block-attribute "ekt_block_pool_pr_cons" --index-attribute "oid-index" --rpc-endpoint https://rpc.t5.n3.nspcc.ru:20331 -fsr st1.t5.fs.neo.org:8080 -fsr st2.t5.fs.neo.org:8080 -fsr st3.t5.fs.neo.org:8080 -fsr st4.t5.fs.neo.org:8080

I haven't managed to reproduce the panic. Successfully uploaded 1447000 blocks to testnet.
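A note on the trace above: it ends inside `math/rand.(*rngSource)`, and a `*rand.Rand` created with `rand.New(rand.NewSource(...))` is documented as not safe for concurrent use. So one possible explanation, not confirmed anywhere in this thread, is that the pool's sampler gets called from several uploading goroutines at once. The sketch below only illustrates the general fix for that class of bug; `lockedRand` is a hypothetical type, not part of neofs-sdk-go.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// lockedRand is a hypothetical wrapper that serializes access to a *rand.Rand,
// which on its own is not safe for concurrent use.
type lockedRand struct {
	mu sync.Mutex
	r  *rand.Rand
}

func newLockedRand(seed int64) *lockedRand {
	return &lockedRand{r: rand.New(rand.NewSource(seed))}
}

// Intn returns a value in [0, n) and may be called from many goroutines.
func (l *lockedRand) Intn(n int) int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.r.Intn(n)
}

func main() {
	lr := newLockedRand(1)
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				_ = lr.Intn(4) // e.g. pick one of four storage nodes
			}
		}()
	}
	wg.Wait()
	fmt.Println("done")
}
```

A race of this kind is timing-dependent, which would be consistent with the panic not showing up again on later runs.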

}

attrs := []object.Attribute{
*object.NewAttribute("block", strconv.Itoa(blockIndex)), // Block index
Contributor Author

Change the hard-coded "block" to attr.

Member

Yep, we have to use the user-provided value.

@@ -109,6 +147,13 @@ func NewCommands() []*cli.Command {
},
},
},
{
Name: "upload-bin",
Usage: "Fetch blocks from RPC node and upload it to the NeoFS container",
Member

s/upload it/upload them?

{
Name: "upload-bin",
Usage: "Fetch blocks from RPC node and upload it to the NeoFS container",
UsageText: "neo-go util upload-bin --fs-rpc-endpoint address --container cid --block-attribute string --index-attribute string --rpc-endpoint address --wallet wallet [--wallet-config config] [--address address]",
Member

--fs-rpc-endpoint <address1>[,<address2>[...]]

Member

--block-attribute block
--index-attribute index
--rpc-endpoint <node> [--timeout <time>]

Member

And while we don't yet have mainnet uploads, let's rename the index file object attribute from oid to index (documentation and node config should be fixed).

Usage: "Fetch blocks from RPC node and upload it to the NeoFS container",
UsageText: "neo-go util upload-bin --fs-rpc-endpoint address --container cid --block-attribute string --index-attribute string --rpc-endpoint address --wallet wallet [--wallet-config config] [--address address]",
Action: uploadBin,
Flags: putFlags,
Member

Rename putFlags to uploadBinFlags.

With ObjectSearchInitter, ObjectSearch can be done with either the NeoFS SDK
client or the pool.

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This command is used to keep the container with blocks for
blockfetcher up to date.

Close #3578

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
README: add archival notice (#520)

People are likely to find this repo and we better have some directions for them.

Signed-off-by: Roman Khimov <roman@nspcc.ru>
Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
producerWg.Add(numProducers)

// Retry function with exponential backoff
retry := func(action func() error) error {
Contributor Author

@AliceInHunterland AliceInHunterland Oct 9, 2024

Should we keep it? Maybe yes? I like it. With it, timeouts are rare.

Member

Let's keep it. Could you please point to the part of the code that has a lot of timeout failures?
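For context, a retry helper with exponential backoff of the kind discussed here could be sketched as follows. The body of the PR's `retry` closure is not shown in this thread, so the signature, `maxRetries`, `initialBackoff`, and `errTimeout` below are illustrative assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a placeholder error used only by this example.
var errTimeout = errors.New("timeout")

// retry calls action up to maxRetries times (maxRetries must be at least 1),
// doubling the delay after each failed attempt.
func retry(action func() error, maxRetries int, initialBackoff time.Duration) error {
	var err error
	backoff := initialBackoff
	for attempt := 0; attempt < maxRetries; attempt++ {
		if err = action(); err == nil {
			return nil
		}
		if attempt < maxRetries-1 {
			time.Sleep(backoff)
			backoff *= 2
		}
	}
	return fmt.Errorf("all %d attempts failed: %w", maxRetries, err)
}

func main() {
	calls := 0
	err := retry(func() error {
		calls++
		if calls < 3 {
			return errTimeout // simulate two transient failures
		}
		return nil
	}, 5, 10*time.Millisecond)
	fmt.Println(calls, err)
}
```

Wrapping the last error with `%w` keeps it available to `errors.Is` in the caller.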

*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
*object.NewAttribute("size", strconv.Itoa(int(indexFileSize))),
*object.NewAttribute("timestamp", strconv.FormatInt(time.Now().Unix(), 10)),
*object.NewAttribute("block-attribute", blockAttributeKey),
Contributor Author

@AliceInHunterland AliceInHunterland Oct 9, 2024

@AnnaShaleva what do you think about adding two more attributes to index files? timestamp now seems useless to me, since NeoFS will set it by itself. But block-attribute could be helpful for debugging and manual search.

Member

@AnnaShaleva AnnaShaleva Oct 9, 2024

For timestamp - OK, remove it from this list. For block-attribute: I'd say it's a bit excessive for the real usage because we always have block-attribute in the node settings.

@@ -6,6 +6,7 @@ import (
"fmt"
Contributor Author

neo-vm will be removed from this commit

Contributor Author

and commit message should be adjusted

@@ -16,12 +16,12 @@ attributes:
- primary node index (`primary:0`)
- block hash in the LE form (`hash:5412a781caf278c0736556c0e544c7cfdbb6e3c62ae221ef53646be89364566b`)
- previous block hash in the LE form (`prevHash:3654a054d82a8178c7dfacecc2c57282e23468a42ee407f14506368afe22d929`)
- millisecond-precision block timestamp (`time:1627894840919`)
- millisecond-precision block timestamp (`timestamp:1627894840919`)
Contributor Author

commit message

Member

@AnnaShaleva AnnaShaleva left a comment

Very good indeed! A couple of logic fixes are still needed, but overall looks much better.

// ObjectSearch returns a list of object IDs from the provided container.
-func ObjectSearch(ctx context.Context, c *client.Client, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) {
+func ObjectSearch(ctx context.Context, initter ObjectSearchInitter, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) {
Member

The code itself is perfect, but regarding the commit message: "neofs: add pool for searching" does not seem to match the commit content, so please rephrase.

fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)

signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey)

Member

Useless blank line.

Comment on lines +218 to +221
// fetchMaxBlockIndex searches the container for the maximum block index.
func fetchMaxBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string) (int, error) {
var wg sync.WaitGroup
height := 0
Member

Good, but why not search starting from the current height? The uploader can (and should) guarantee that only the single latest batch may be incomplete; older batches are guaranteed to be complete. If the uploader currently can't guarantee that, then the uploader algorithm should be fixed.

Comment on lines +249 to +254
if res.numOIDs < searchBatchSize {
if res.startIndex == 0 && res.numOIDs == 0 {
return -1, nil
}
// Return the start index of the first incomplete batch
return res.startIndex, nil
Member

Please check this and fix the code if it's true: we'll try to re-upload the incomplete batch starting from the block with index res.startIndex, so we should not return -1 because there's no block with index -1. The genesis block is not special anymore, so we should remove the following part:

				if res.startIndex == 0 && res.numOIDs == 0 {
					return -1, nil
				}

}

// fetchMaxBlockIndex searches the container for the maximum block index.
func fetchMaxBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string) (int, error) {
Member

And since the algorithm was changed, let's rename this function to fetchLatestMissingBlockIndex. And adjust documentation, specify that it's the index of the first block in the latest incomplete batch.
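To make the suggested behaviour concrete, here is a rough sketch of a backward scan from the current height that returns the index of the first block in the latest incomplete batch and never returns -1. It assumes that every batch older than the incomplete one is complete, as the review requires. `countFunc`, `uploadedUpTo`, and `latestMissingBlockIndex` are hypothetical names; the real function would run NeoFS searches instead of the in-memory stub.

```go
package main

import "fmt"

// countFunc is a hypothetical stand-in for a NeoFS search that returns how many
// block objects exist with indexes in [start, start+batchSize).
type countFunc func(start, batchSize int) (int, error)

// uploadedUpTo simulates a container holding blocks 0..n-1 (example only).
func uploadedUpTo(n int) countFunc {
	return func(start, batchSize int) (int, error) {
		switch {
		case n <= start:
			return 0, nil
		case n >= start+batchSize:
			return batchSize, nil
		default:
			return n - start, nil
		}
	}
}

// latestMissingBlockIndex scans batches from the one containing currentHeight
// downwards. batchSize must be positive.
func latestMissingBlockIndex(currentHeight, batchSize int, count countFunc) (int, error) {
	for start := (currentHeight / batchSize) * batchSize; start >= 0; start -= batchSize {
		n, err := count(start, batchSize)
		if err != nil {
			return 0, fmt.Errorf("search batch starting at %d: %w", start, err)
		}
		switch {
		case n == batchSize:
			// Newest complete batch: uploading resumes right after it.
			return start + batchSize, nil
		case n > 0:
			// Partially uploaded batch: re-upload it from its first block.
			return start, nil
		}
		// Empty batch: keep looking at older ones.
	}
	return 0, nil // nothing uploaded yet, start from the genesis block
}

func main() {
	fmt.Println(latestMissingBlockIndex(5000, 1000, uploadedUpTo(2500)))
}
```

With an empty container the scan falls through to index 0, so the genesis block needs no special case.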


var (
processedCounter atomic.Int32
errCh = make(chan error, 1)
Member

Make it non-buffered.

}
fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
}
close(oidCh)
Member

Move this close to a deferred statement.
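A small illustration of why the deferred close is safer: it runs on every return path of the producer, so a consumer ranging over the channel cannot hang after an early exit. `produceOIDs` and its `failAt` parameter are made up for this example and do not come from the PR.

```go
package main

import "fmt"

// produceOIDs sends up to n placeholder object IDs on the returned channel and
// stops early at index failAt (pass -1 to never stop early).
func produceOIDs(n, failAt int) <-chan string {
	oidCh := make(chan string)
	go func() {
		defer close(oidCh) // closed on both the normal and the early return
		for i := 0; i < n; i++ {
			if i == failAt {
				return // early exit; the consumer's range loop still ends
			}
			oidCh <- fmt.Sprintf("oid-%d", i)
		}
	}()
	return oidCh
}

func main() {
	got := 0
	for range produceOIDs(5, 3) {
		got++
	}
	fmt.Println(got)
}
```

With an explicit `close` only at the end of the loop, the early return would leave the channel open and the consumer blocked.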

hdr.SetVersion(v)

checksum.Calculate(&ch, checksum.TZ, objData)
hdr.SetPayloadHomomorphicHash(ch)
Member

We need to calculate/set it only if it's enabled in the network.

Comment on lines +425 to +428
func getBlockIndex(header object.Object, attribute string) (int, error) {
for _, attr := range header.UserAttributes() {
if attr.Key() == attribute {
return strconv.Atoi(attr.Value())
Member

Very nice, much better way.

github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8=
github.com/Microsoft/hcsshim v0.11.4/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w=
Member

Something has happened with the Go modules and the VM git submodule; please remove these changes from the PR.

3 participants