diff --git a/.github/workflows/update_blockfetcher_db.yml b/.github/workflows/update_blockfetcher_db.yml new file mode 100644 index 0000000000..594af32de9 --- /dev/null +++ b/.github/workflows/update_blockfetcher_db.yml @@ -0,0 +1,60 @@ +name: Build neo-go Binary and Run Dump Put Job + +on: + schedule: + - cron: "0 */3 * * *" + workflow_dispatch: + +jobs: + build_and_run: + name: Build and Run neo-go db dump-bin-put + runs-on: ubuntu-22.04 + strategy: + matrix: + network: + - { name: "MainNet", cid: "EPGuD26wYgQJbmDdVBoYoNZiMKHwFMJT3A5WqPjdUHxH", rpc: "https://rpc10.n3.nspcc.ru:10331" } + - { name: "TestNet", cid: "testnet-cid", rpc: "https://rpc.t5.n3.nspcc.ru:20331" } + fail-fast: false + + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.23' + + - name: Install dependencies + run: sudo apt-get install -y build-essential + + - name: Build CLI + run: | + git clone https://github.com/nspcc-dev/neo-go.git + cd neo-go + make build + cp ./bin/neo-go ../bin/neo-go # Copy the binary to a bin directory for later use + + - name: Create wallet JSON file + run: | + echo "${{ secrets.WALLET_JSON_CONTENT }}" > ./wallet.json + + - name: Create wallet-config.yml + run: | + echo "Path: \"$PWD/wallet.json\"" > ./wallet-config.yml + echo "Password: \"$WALLET_PASSWORD\"" >> ./wallet-config.yml + + - name: Run neo-go db dump-bin-put for ${{ matrix.network.name }} + run: | + echo "Running neo-go db dump-bin-put on network: ${{ matrix.network.name }} with CID: ${{ matrix.network.cid }} and RPC: ${{ matrix.network.rpc }}" + + ./bin/neo-go db dump-bin-put \ + --cid ${{ matrix.network.cid }} \ + --wallet-config ./wallet-config.yml \ + --block-attribute "block" \ + --index-attribute "index" \ + --rpc-endpoint ${{ matrix.network.rpc }} \ + -rn st1.t5.fs.neo.org:8080 + env: + WALLET_PASSWORD: ${{ secrets.WALLET_PASSWORD }} diff --git a/cli/server/dump_bin_put.go b/cli/server/dump_bin_put.go new file mode 100644 index 0000000000..78af36c119 --- /dev/null +++ b/cli/server/dump_bin_put.go @@ -0,0 +1,365 @@ +package server + +import ( + "bytes" + "context" + "fmt" + "log" + "strconv" + "sync" + "time" + + "github.com/google/uuid" + "github.com/nspcc-dev/neo-go/cli/cmdargs" + "github.com/nspcc-dev/neo-go/cli/options" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/object/slicer" + "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/urfave/cli/v2" +) + +const ( + searchBatchSize = 10000 // Number of objects to search in a batch for finding max block in container. + maxParallelSearches = 40 // Control the number of concurrent searches for index files generation. + indexFileSize = 128000 // Size of each index file. +) + +func downloadPut(ctx *cli.Context) error { + if err := cmdargs.EnsureNone(ctx); err != nil { + return err + } + + rpcNeoFS := ctx.String("rpc-neofs") + containerIDStr := ctx.String("container") + attribute := ctx.String("block-attribute") + indexFileAttribute := ctx.String("index-attribute") + acc, _, err := options.GetAccFromContext(ctx) + if err != nil { + return fmt.Errorf("failed to load wallet: %w", err) + } + + var containerID cid.ID + if err = containerID.DecodeString(containerIDStr); err != nil { + return fmt.Errorf("failed to decode container ID: %w", err) + } + + clientSDK, err := neofs.GetSDKClient(context.Background(), rpcNeoFS, 10*time.Minute) + if err != nil { + return fmt.Errorf("failed to create NeoFS client: %w", err) + } + defer clientSDK.Close() + + //sessionToken, err := createSessionToken(*acc, containerID) + //if err != nil { + // return fmt.Errorf("failed to create session token: %w", err) + //} + + endpoint := ctx.String(options.RPCEndpointFlag) + rpcClient, err := rpcclient.New(ctx.Context, endpoint, rpcclient.Options{ + DialTimeout: 50 * time.Second, + RequestTimeout: 50 * time.Second, + }) + if err != nil { + return fmt.Errorf("failed to create RPC client: %w", err) + } + err = rpcClient.Init() + if err != nil { + return fmt.Errorf("failed to initialize RPC client: %w", err) + } + + currentBlockHeight, err := rpcClient.GetBlockCount() + if err != nil { + return fmt.Errorf("failed to get current block height from RPC: %w", err) + } + log.Println("Current block height:", currentBlockHeight) + currentBlockHeight = 423894 + //maxBlockIndex, err := fetchMaxBlockIndex(ctx.Context, clientSDK, containerID, acc.PrivateKey(), uint(currentBlockHeight), attribute) + //if err != nil { + // return fmt.Errorf("failed to fetch max block index from container: %w", err) + //} + //log.Println("Max block index in NeoFS:", maxBlockIndex) + //if maxBlockIndex == 0 { + // maxBlockIndex = -1 + //} + // + //if maxBlockIndex > int(currentBlockHeight) { + // return fmt.Errorf("no new blocks to upload. Max index in NeoFS: %d, current height: %d", maxBlockIndex, currentBlockHeight) + //} + + //for blockIndex := maxBlockIndex + 1; blockIndex <= int(currentBlockHeight); blockIndex++ { + // blockData, err := fetchBlockData(rpcClient, uint(blockIndex)) + // if err != nil { + // return fmt.Errorf("failed to fetch block %d: %w", blockIndex, err) + // } + // + // err = uploadBObjWithSlicer(ctx.Context, clientSDK, *acc, containerID, blockData, attribute, strconv.Itoa(blockIndex), sessionToken) + // if err != nil { + // return fmt.Errorf("failed to upload block %d: %w", blockIndex, err) + // } + // + // if blockIndex%1000 == 0 { + // log.Printf("Successfully uploaded block: %d", blockIndex) + // } + //} + + err = updateIndexFiles(ctx, clientSDK, containerID, *acc, uint(currentBlockHeight), indexFileAttribute, attribute) + if err != nil { + return fmt.Errorf("failed to update index files after upload: %w", err) + } + + log.Println("Upload completed successfully.") + return nil +} + +// fetchMaxBlockIndex searches for the maximum block index in the container. +func fetchMaxBlockIndex(ctx context.Context, clientSDK *client.Client, containerID cid.ID, priv *keys.PrivateKey, currentHeight uint, attributeKey string) (int, error) { + height := int(currentHeight) + var ( + finalResult int + finalErr error + ) + + searchBatch := func(start, end int) (int, bool, error) { + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(attributeKey, fmt.Sprintf("%d", start), object.MatchNumGE) + filters.AddFilter(attributeKey, fmt.Sprintf("%d", end), object.MatchNumLE) + prm.SetFilters(filters) + + objectIDs, err := neofs.ObjectSearch(ctx, clientSDK, priv, containerID.String(), prm) + if err != nil { + return 0, false, fmt.Errorf("failed to search objects from %d to %d: %w", start, end, err) + } + + numOIDs := len(objectIDs) + log.Printf("Found %d blocks between %d and %d", numOIDs, start, end) + + if numOIDs == 0 { + return 0, false, nil // Keep searching. + } + + // Return the max block index found in this batch + maxInBatch := start + numOIDs - 1 + + // Stop immediately after finding the first non-empty batch + return maxInBatch, true, nil + } + + for height >= 0 { + startIndex := height - searchBatchSize + 1 + if startIndex < 0 { + startIndex = 0 + } + + maxInBatch, foundNonEmptyBatch, err := searchBatch(startIndex, height) + if err != nil { + finalErr = err + break + } + + if maxInBatch > finalResult { + finalResult = maxInBatch + } + + // Stop the search as soon as we find any valid blocks + if foundNonEmptyBatch { + break + } + + height -= searchBatchSize + } + + if finalErr != nil { + return 0, finalErr + } + + return finalResult, nil +} + +func fetchBlockData(rpcClient *rpcclient.Client, index uint) ([]byte, error) { + if index%1000 == 0 { + log.Println("Fetching block", index) + } + block, err := rpcClient.GetBlockByIndex(uint32(index)) + if err != nil { + return nil, fmt.Errorf("failed to fetch block %d: %w", index, err) + } + + var buf bytes.Buffer + bw := io.NewBinWriterFromIO(&buf) + + block.EncodeBinary(bw) + + if bw.Err != nil { + return nil, fmt.Errorf("failed to encode block %d: %w", index, bw.Err) + } + + return buf.Bytes(), nil +} + +func uploadBObjWithSlicer(ctx context.Context, clientSDK *client.Client, account wallet.Account, containerID cid.ID, objData []byte, attributeKey, attributeValue string, sessionToken *session.Object) error { + signer := user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey) + var ownerID user.ID + ownerID.SetScriptHash(account.PrivateKey().GetScriptHash()) + + slc, err := slicer.New(ctx, clientSDK, signer, containerID, ownerID, sessionToken) + if err != nil { + return fmt.Errorf("failed to create slicer: %w", err) + } + + attrs := []object.Attribute{ + *object.NewAttribute(attributeKey, attributeValue), + } + + _, err = slc.Put(ctx, bytes.NewReader(objData), attrs) + if err != nil { + return fmt.Errorf("failed to slice and upload block data: %w", err) + } + + return nil +} + +// updateIndexFiles updates the index files in the container. +func updateIndexFiles(ctx *cli.Context, clientSDK *client.Client, containerID cid.ID, account wallet.Account, currentHeight uint, attributeKey string, blockAttributeKey string) error { + log.Println("Updating index files...") + + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(attributeKey, fmt.Sprintf("%d", 0), object.MatchNumGE) + prm.SetFilters(filters) + + objectIDs, err := neofs.ObjectSearch(ctx.Context, clientSDK, account.PrivateKey(), containerID.String(), prm) + if err != nil { + return fmt.Errorf("no OIDs found for index files: %w", err) + } + + existingIndexCount := uint(len(objectIDs)) + expectedIndexCount := currentHeight / uint(indexFileSize) + + if existingIndexCount == expectedIndexCount { + return nil + } + + var buf bytes.Buffer + fileIndex := int(existingIndexCount) + + ctxWithCancel, cancel := context.WithCancel(ctx.Context) + defer cancel() + + workerPool := make(chan struct{}, maxParallelSearches) + errCh := make(chan error, 1) + wg := sync.WaitGroup{} + + for i := existingIndexCount; i < expectedIndexCount; i++ { + startIndex := i * indexFileSize + endIndex := startIndex + indexFileSize + if endIndex > currentHeight { + return fmt.Errorf("end index %d exceeds current height %d", endIndex, currentHeight) + } + + blockOids := make([]oid.ID, indexFileSize) + for j := startIndex; j < endIndex; j++ { + select { + case <-ctxWithCancel.Done(): + return <-errCh + default: + } + + wg.Add(1) + workerPool <- struct{}{} + + go func(index uint) { + defer wg.Done() + defer func() { <-workerPool }() + + oidBlock, err := searchBlockOID(ctx, clientSDK, account, containerID, blockAttributeKey, index) + if err != nil { + select { + case errCh <- fmt.Errorf("failed to search for block OID at index %d: %w", index, err): + cancel() + default: + } + return + } + + blockOids[index-startIndex] = oidBlock + if index%1000 == 0 { + log.Println("Found block OID", index) + } + }(j) + } + wg.Wait() + + select { + case err := <-errCh: + return err + default: + } + + for _, oidBlock := range blockOids { + oidBytes := make([]byte, 32) + oidBlock.Encode(oidBytes) + buf.Write(oidBytes) + } + err = uploadBObjWithSlicer(ctx.Context, clientSDK, account, containerID, buf.Bytes(), attributeKey, strconv.Itoa(fileIndex), nil) + if err != nil { + return fmt.Errorf("failed to upload index file %d: %w", fileIndex, err) + } + log.Printf("Uploaded index file %d", fileIndex) + + fileIndex++ + buf.Reset() + } + + return nil +} + +// searchBlockOID function to search for the OID of a specific block by index. +func searchBlockOID(ctx *cli.Context, clientSDK *client.Client, account wallet.Account, containerID cid.ID, blockAttributeKey string, blockIndex uint) (oid.ID, error) { + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", blockIndex), object.MatchStringEqual) + prm.SetFilters(filters) + + objectIDs, err := neofs.ObjectSearch(ctx.Context, clientSDK, account.PrivateKey(), containerID.String(), prm) + if err != nil || len(objectIDs) == 0 { + return oid.ID{}, fmt.Errorf("no OIDs found for block index %d", blockIndex) + } + if len(objectIDs) > 1 { + log.Println("Multiple OIDs found for block index", blockIndex) + } + return objectIDs[0], nil +} + +func createSessionToken(account wallet.Account, containerID cid.ID) (*session.Object, error) { + sessionToken := session.Object{} + + sessionID := uuid.New() + sessionToken.SetID(sessionID) + + pubKey := account.PublicKey() + authKey := neofsecdsa.PublicKey(*pubKey) + sessionToken.SetAuthKey(&authKey) + + sessionToken.BindContainer(containerID) + sessionToken.ForVerb(session.VerbObjectPut) + + sessionToken.SetExp(uint64(time.Now().Add(100 * time.Minute).Unix())) + + signer := user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey) + if err := sessionToken.Sign(signer); err != nil { + return nil, fmt.Errorf("failed to sign session token: %w", err) + } + + return &sessionToken, nil +} diff --git a/cli/server/server.go b/cli/server/server.go index b3f9ec22b9..c3251f3950 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -11,6 +11,7 @@ import ( "time" "github.com/nspcc-dev/neo-go/cli/cmdargs" + "github.com/nspcc-dev/neo-go/cli/flags" "github.com/nspcc-dev/neo-go/cli/options" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" @@ -83,6 +84,35 @@ func NewCommands() []*cli.Command { Usage: "Height of the state to reset DB to", Required: true, }) + var neofsFlags = []cli.Flag{ + &cli.StringFlag{ + Name: "rpc-neofs", + Aliases: []string{"rn"}, + Usage: "NeoFS RPC address", + Required: true, + }, + &cli.StringFlag{ + Name: "container", + Aliases: []string{"cid"}, + Usage: "NeoFS container ID", + Required: true, + }, + &cli.StringFlag{ + Name: "block-attribute", + Usage: "Attribute key of the block object", + Required: true, + }, + &cli.StringFlag{ + Name: "index-attribute", + Usage: "Attribute key of the index file object", + Required: true, + }, + &flags.AddressFlag{ + Name: "address", + }, + } + + putFlags := append(neofsFlags, options.RPC...) return []*cli.Command{ { Name: "node", @@ -109,6 +139,13 @@ func NewCommands() []*cli.Command { Action: dumpBin, Flags: cfgCountOutFlags, }, + { + Name: "dump-bin-put", + Usage: "Upload blocks from binary files in the specified directory to the NeoFS container", + UsageText: "neo-go db dump-bin-put --rpc-neofs address --container cid --attribute key [-a address] [-n] [-p/-m/-t] [--config-file file]", + Action: downloadPut, + Flags: append(putFlags, options.Wallet...), + }, { Name: "restore", Usage: "Restore blocks from the file", diff --git a/pkg/vm/testdata/neo-vm b/pkg/vm/testdata/neo-vm index 7e5996844a..524545f944 160000 --- a/pkg/vm/testdata/neo-vm +++ b/pkg/vm/testdata/neo-vm @@ -1 +1 @@ -Subproject commit 7e5996844a90b514739f879bc9f873f9a34c9a67 +Subproject commit 524545f944eacaf2ebbfdcf98152a6a933b09f24