From 8e1d6b31f00efbe77888c6ecacdf4e6733a29463 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Wed, 11 Sep 2024 09:59:42 +0400 Subject: [PATCH] cli: add `dump-bin-put` and `dump-generate-index-file` These commands can be useful for interaction with NeoFS. Signed-off-by: Ekaterina Pavlova --- cli/server/dump_bin_put.go | 154 +++++++++++++++++++++++++++++++++++ cli/server/generate_oid.go | 161 +++++++++++++++++++++++++++++++++++++ cli/server/server.go | 69 ++++++++++++++++ 3 files changed, 384 insertions(+) create mode 100644 cli/server/dump_bin_put.go create mode 100644 cli/server/generate_oid.go diff --git a/cli/server/dump_bin_put.go b/cli/server/dump_bin_put.go new file mode 100644 index 0000000000..37ef627359 --- /dev/null +++ b/cli/server/dump_bin_put.go @@ -0,0 +1,154 @@ +package server + +import ( + "context" + "fmt" + "log" + "os" + "strconv" + "sync" + "time" + + "github.com/google/uuid" + "github.com/nspcc-dev/neo-go/cli/cmdargs" + "github.com/nspcc-dev/neo-go/cli/options" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" + "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/nspcc-dev/neofs-sdk-go/object/slicer" + "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/urfave/cli/v2" +) + +const batchSize = 50 + +func dumpPut(ctx *cli.Context) error { + if err := cmdargs.EnsureNone(ctx); err != nil { + return err + } + dir := ctx.String("dir") + batch := ctx.Uint("batch") + if batch == 0 { + batch = batchSize + } + rpc := ctx.String("rpc-endpoint") + containerIDStr := ctx.String("container") + attribute := ctx.String("attribute") + start := ctx.Uint("start") + count := ctx.Uint("count") + acc, _, err := options.GetAccFromContext(ctx) + if err != nil { + return fmt.Errorf("failed to load wallet: %w", err) + } + + var containerID cid.ID + err = containerID.DecodeString(containerIDStr) + if err != nil { + return fmt.Errorf("failed to decode container ID: %w", err) + } + + clientSDK, err := neofs.GetSDKClient(context.Background(), rpc, 10*time.Minute) + if err != nil { + return fmt.Errorf("failed to create NeoFS client: %w", err) + } + defer clientSDK.Close() + + sessionToken, err := createSessionToken(*acc, containerID) + if err != nil { + return fmt.Errorf("failed to create session token: %w", err) + } + + ctxCancel, cancel := context.WithCancel(context.Background()) + defer cancel() + + for i := start; i < count; i += batch { + end := i + batch + if end > count { + end = count + } + + var wg sync.WaitGroup + errChan := make(chan error, batch) + + for j := i; j < end; j++ { + wg.Add(1) + go func(index uint) { + defer wg.Done() + path := fmt.Sprintf("%s/block-%d.bin", dir, index) + if err := uploadFileWithSlicer(ctxCancel, clientSDK, *acc, containerID, path, attribute, strconv.Itoa(int(index)), sessionToken); err != nil { + log.Printf("Failed to upload %s: %v", path, err) + errChan <- err + cancel() + } else { + log.Printf("Successfully uploaded: %s", path) + } + }(j) + } + + wg.Wait() + select { + case err := <-errChan: + return fmt.Errorf("batch upload failed: %w", err) + default: + } + } + + log.Println("Upload completed.") + return nil +} + +// uploadFileWithSlicer uploads a file to NeoFS using the Slicer with a session token. +func uploadFileWithSlicer(ctx context.Context, clientSDK *client.Client, account wallet.Account, containerID cid.ID, filePath, attributeKey, attributeValue string, sessionToken *session.Object) error { + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + signer := user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey) + var ownerID user.ID + ownerID.SetScriptHash(account.PrivateKey().GetScriptHash()) + + slc, err := slicer.New(ctx, clientSDK, signer, containerID, ownerID, sessionToken) + if err != nil { + return fmt.Errorf("failed to create slicer: %w", err) + } + + attrs := []object.Attribute{ + *object.NewAttribute(attributeKey, attributeValue), + } + + _, err = slc.Put(ctx, file, attrs) + if err != nil { + return fmt.Errorf("failed to slice and upload file: %w", err) + } + + return nil +} + +func createSessionToken(account wallet.Account, containerID cid.ID) (*session.Object, error) { + sessionToken := session.Object{} + + sessionID := uuid.New() + sessionToken.SetID(sessionID) + + pubKey := account.PublicKey() + authKey := neofsecdsa.PublicKey(*pubKey) + sessionToken.SetAuthKey(&authKey) + + sessionToken.BindContainer(containerID) + sessionToken.ForVerb(session.VerbObjectPut) + + sessionToken.SetExp(uint64(time.Now().Add(100 * time.Minute).Unix())) + + signer := user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey) + if err := sessionToken.Sign(signer); err != nil { + return nil, fmt.Errorf("failed to sign session token: %w", err) + } + + return &sessionToken, nil +} diff --git a/cli/server/generate_oid.go b/cli/server/generate_oid.go new file mode 100644 index 0000000000..2ea6fe3a19 --- /dev/null +++ b/cli/server/generate_oid.go @@ -0,0 +1,161 @@ +package server + +import ( + "context" + "fmt" + "log" + "os" + "sync" + "time" + + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/urfave/cli/v2" +) + +func generateOIDs(ctx *cli.Context) error { + rpc := ctx.String("rpc-endpoint") + containerIDStr := ctx.String("container") + attribute := ctx.String("attribute") + startIndex := uint32(ctx.Uint("start")) + count := uint32(ctx.Uint("count")) + outputDir := ctx.String("out") + fileSize := uint32(ctx.Uint("index-file-size")) + + if _, err := os.Stat(outputDir); os.IsNotExist(err) { + if err := os.MkdirAll(outputDir, os.ModePerm); err != nil { + return fmt.Errorf("failed to create directory %s: %w", outputDir, err) + } + } + + var containerID cid.ID + err := containerID.DecodeString(containerIDStr) + if err != nil { + return fmt.Errorf("failed to decode container ID: %w", err) + } + pk, err := keys.NewPrivateKey() + if err != nil { + return fmt.Errorf("failed to create private key: %w", err) + } + + clientSDK, err := neofs.GetSDKClient(context.Background(), rpc, 10*time.Minute) + if err != nil { + return fmt.Errorf("failed to create NeoFS client: %w", err) + } + defer clientSDK.Close() + + fileIndex := 1 + startIndex/fileSize + + type result struct { + index uint32 + oidsBatch []oid.ID + err error + } + + for i := startIndex; i < startIndex+count; i += fileSize { + var wg sync.WaitGroup + results := make(chan result, fileSize) + workerPool := make(chan struct{}, 100) + + oids := make([]oid.ID, 0, fileSize) + + for j := i; j < i+fileSize && j < startIndex+count; j++ { + wg.Add(1) + workerPool <- struct{}{} + + go func(index uint32) { + defer wg.Done() + defer func() { <-workerPool }() + + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(attribute, fmt.Sprintf("%d", index), object.MatchStringEqual) + prm.SetFilters(filters) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + oidsBatch, err := neofs.ObjectSearch(ctx, clientSDK, pk, containerID.String(), prm) + if err != nil || len(oidsBatch) == 0 { + results <- result{index, nil, fmt.Errorf("no OIDs found or failed for index %d: %w", index, err)} + return + } + if len(oidsBatch) == 0 { + fmt.Println("Found OIDs:", oidsBatch, "for index", index, "total", len(oidsBatch)) + } + if index%2000 == 0 { + fmt.Println("Index:", index) + } + results <- result{index, oidsBatch[:1], nil} + }(j) + } + + go func() { + wg.Wait() + close(results) + }() + + pendingResults := make(map[uint32][]oid.ID) + expectedIndex := i + + for res := range results { + if res.err != nil { + return res.err + } + + pendingResults[res.index] = res.oidsBatch + + for { + if batch, ok := pendingResults[expectedIndex]; ok { + delete(pendingResults, expectedIndex) + oids = append(oids, batch...) + + if len(oids) >= int(fileSize) { + if err := writeOIDFile(oids[:fileSize], outputDir, int(fileIndex)); err != nil { + return err + } + oids = oids[fileSize:] + fileIndex++ + } + expectedIndex++ + } else { + break + } + } + } + + if len(oids) > 0 { + if err := writeOIDFile(oids, outputDir, int(fileIndex)); err != nil { + return err + } + fileIndex++ + } + } + + log.Println("OID generation completed.") + return nil +} + +// writeOIDFile writes a list of OIDs to a file in the specified directory. +func writeOIDFile(oids []oid.ID, outputDir string, fileIndex int) error { + fileName := fmt.Sprintf("%s/oid-%d.bin", outputDir, fileIndex) + file, err := os.Create(fileName) + if err != nil { + return fmt.Errorf("failed to create file %s: %w", fileName, err) + } + defer file.Close() + + for _, oid := range oids { + oidBytes := make([]byte, 32) + oid.Encode(oidBytes) + if _, err := file.Write(oidBytes); err != nil { + return fmt.Errorf("failed to write OID to file %s: %w", fileName, err) + } + } + + log.Printf("Successfully wrote %d OIDs to %s", len(oids), fileName) + return nil +} diff --git a/cli/server/server.go b/cli/server/server.go index b3f9ec22b9..1318e86460 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -83,6 +83,49 @@ func NewCommands() []*cli.Command { Usage: "Height of the state to reset DB to", Required: true, }) + var neofsFlags = []cli.Flag{ + &cli.StringFlag{ + Name: "rpc-endpoint", + Aliases: []string{"r"}, + Usage: "NeoFS RPC address", + Required: true, + }, + &cli.StringFlag{ + Name: "container", + Aliases: []string{"cid"}, + Usage: "NeoFS container ID", + Required: true, + }, + &cli.StringFlag{ + Name: "attribute", + Aliases: []string{"a"}, + Usage: "Attribute key of the object", + Required: true, + }, + &cli.UintFlag{ + Name: "start", + Aliases: []string{"s"}, + Usage: "Starting block index", + Required: true, + }, + &cli.UintFlag{ + Name: "count", + Aliases: []string{"n"}, + Usage: "Number of blocks to be processed", + Required: true, + }, + } + var putFlags = append(neofsFlags, + &cli.StringFlag{ + Name: "dir", + Usage: "Directory containing the binary files", + Aliases: []string{"d"}, + Required: true, + }, + &cli.UintFlag{ + Name: "batch", + Usage: "Number of blocks to be processed in parallel", + }) return []*cli.Command{ { Name: "node", @@ -109,6 +152,32 @@ func NewCommands() []*cli.Command { Action: dumpBin, Flags: cfgCountOutFlags, }, + { + Name: "dump-bin-put", + Usage: "Upload blocks from binary files in the specified directory to the NeoFS container", + UsageText: "neo-go db dump-bin-put --dir --rpc-endpoint --container --attribute --start --count --wallet ", + Action: dumpPut, + Flags: append(putFlags, options.Wallet...), + }, + { + Name: "dump-generate-index-file", + Usage: "Generate index files of oids blocks from NeoFS container with the specified attribute", + UsageText: "neo-go db dump-generate-oid --start --count --out , --container , --index-file-size , --rpc-endpoint , --attribute ", + Action: generateOIDs, + Flags: append([]cli.Flag{ + &cli.StringFlag{ + Name: "out", + Usage: "Output directory where OID files will be saved", + Aliases: []string{"o"}, + Required: true, + }, + &cli.UintFlag{ + Name: "index-file-size", + Usage: "Number of blocks in oid file", + Required: true, + }, + }, neofsFlags...), + }, { Name: "restore", Usage: "Restore blocks from the file",