-
Notifications
You must be signed in to change notification settings - Fork 78
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
cli: add
dump-bin-put
and dump-generate-index-file
These commands can be useful for interaction with NeoFS. Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
- Loading branch information
1 parent
0b31a29
commit 8e1d6b3
Showing
3 changed files
with
384 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
package server | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"log" | ||
"os" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
"github.com/nspcc-dev/neo-go/cli/cmdargs" | ||
"github.com/nspcc-dev/neo-go/cli/options" | ||
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" | ||
"github.com/nspcc-dev/neo-go/pkg/wallet" | ||
"github.com/nspcc-dev/neofs-sdk-go/client" | ||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id" | ||
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" | ||
"github.com/nspcc-dev/neofs-sdk-go/object" | ||
"github.com/nspcc-dev/neofs-sdk-go/object/slicer" | ||
"github.com/nspcc-dev/neofs-sdk-go/session" | ||
"github.com/nspcc-dev/neofs-sdk-go/user" | ||
"github.com/urfave/cli/v2" | ||
) | ||
|
||
const batchSize = 50 | ||
|
||
func dumpPut(ctx *cli.Context) error { | ||
if err := cmdargs.EnsureNone(ctx); err != nil { | ||
return err | ||
} | ||
dir := ctx.String("dir") | ||
batch := ctx.Uint("batch") | ||
if batch == 0 { | ||
batch = batchSize | ||
} | ||
rpc := ctx.String("rpc-endpoint") | ||
containerIDStr := ctx.String("container") | ||
attribute := ctx.String("attribute") | ||
start := ctx.Uint("start") | ||
count := ctx.Uint("count") | ||
acc, _, err := options.GetAccFromContext(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to load wallet: %w", err) | ||
} | ||
|
||
var containerID cid.ID | ||
err = containerID.DecodeString(containerIDStr) | ||
if err != nil { | ||
return fmt.Errorf("failed to decode container ID: %w", err) | ||
} | ||
|
||
clientSDK, err := neofs.GetSDKClient(context.Background(), rpc, 10*time.Minute) | ||
if err != nil { | ||
return fmt.Errorf("failed to create NeoFS client: %w", err) | ||
} | ||
defer clientSDK.Close() | ||
|
||
sessionToken, err := createSessionToken(*acc, containerID) | ||
if err != nil { | ||
return fmt.Errorf("failed to create session token: %w", err) | ||
} | ||
|
||
ctxCancel, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
for i := start; i < count; i += batch { | ||
end := i + batch | ||
if end > count { | ||
end = count | ||
} | ||
|
||
var wg sync.WaitGroup | ||
errChan := make(chan error, batch) | ||
|
||
for j := i; j < end; j++ { | ||
wg.Add(1) | ||
go func(index uint) { | ||
defer wg.Done() | ||
path := fmt.Sprintf("%s/block-%d.bin", dir, index) | ||
if err := uploadFileWithSlicer(ctxCancel, clientSDK, *acc, containerID, path, attribute, strconv.Itoa(int(index)), sessionToken); err != nil { | ||
log.Printf("Failed to upload %s: %v", path, err) | ||
errChan <- err | ||
cancel() | ||
} else { | ||
log.Printf("Successfully uploaded: %s", path) | ||
} | ||
}(j) | ||
} | ||
|
||
wg.Wait() | ||
select { | ||
case err := <-errChan: | ||
return fmt.Errorf("batch upload failed: %w", err) | ||
default: | ||
} | ||
} | ||
|
||
log.Println("Upload completed.") | ||
return nil | ||
} | ||
|
||
// uploadFileWithSlicer uploads a file to NeoFS using the Slicer with a session token. | ||
func uploadFileWithSlicer(ctx context.Context, clientSDK *client.Client, account wallet.Account, containerID cid.ID, filePath, attributeKey, attributeValue string, sessionToken *session.Object) error { | ||
file, err := os.Open(filePath) | ||
if err != nil { | ||
return fmt.Errorf("failed to open file: %w", err) | ||
} | ||
defer file.Close() | ||
|
||
signer := user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey) | ||
var ownerID user.ID | ||
ownerID.SetScriptHash(account.PrivateKey().GetScriptHash()) | ||
|
||
slc, err := slicer.New(ctx, clientSDK, signer, containerID, ownerID, sessionToken) | ||
if err != nil { | ||
return fmt.Errorf("failed to create slicer: %w", err) | ||
} | ||
|
||
attrs := []object.Attribute{ | ||
*object.NewAttribute(attributeKey, attributeValue), | ||
} | ||
|
||
_, err = slc.Put(ctx, file, attrs) | ||
if err != nil { | ||
return fmt.Errorf("failed to slice and upload file: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func createSessionToken(account wallet.Account, containerID cid.ID) (*session.Object, error) { | ||
sessionToken := session.Object{} | ||
|
||
sessionID := uuid.New() | ||
sessionToken.SetID(sessionID) | ||
|
||
pubKey := account.PublicKey() | ||
authKey := neofsecdsa.PublicKey(*pubKey) | ||
sessionToken.SetAuthKey(&authKey) | ||
|
||
sessionToken.BindContainer(containerID) | ||
sessionToken.ForVerb(session.VerbObjectPut) | ||
|
||
sessionToken.SetExp(uint64(time.Now().Add(100 * time.Minute).Unix())) | ||
|
||
signer := user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey) | ||
if err := sessionToken.Sign(signer); err != nil { | ||
return nil, fmt.Errorf("failed to sign session token: %w", err) | ||
} | ||
|
||
return &sessionToken, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
package server | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"log" | ||
"os" | ||
"sync" | ||
"time" | ||
|
||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" | ||
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" | ||
"github.com/nspcc-dev/neofs-sdk-go/client" | ||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id" | ||
"github.com/nspcc-dev/neofs-sdk-go/object" | ||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id" | ||
"github.com/urfave/cli/v2" | ||
) | ||
|
||
func generateOIDs(ctx *cli.Context) error { | ||
rpc := ctx.String("rpc-endpoint") | ||
containerIDStr := ctx.String("container") | ||
attribute := ctx.String("attribute") | ||
startIndex := uint32(ctx.Uint("start")) | ||
count := uint32(ctx.Uint("count")) | ||
outputDir := ctx.String("out") | ||
fileSize := uint32(ctx.Uint("index-file-size")) | ||
|
||
if _, err := os.Stat(outputDir); os.IsNotExist(err) { | ||
if err := os.MkdirAll(outputDir, os.ModePerm); err != nil { | ||
return fmt.Errorf("failed to create directory %s: %w", outputDir, err) | ||
} | ||
} | ||
|
||
var containerID cid.ID | ||
err := containerID.DecodeString(containerIDStr) | ||
if err != nil { | ||
return fmt.Errorf("failed to decode container ID: %w", err) | ||
} | ||
pk, err := keys.NewPrivateKey() | ||
if err != nil { | ||
return fmt.Errorf("failed to create private key: %w", err) | ||
} | ||
|
||
clientSDK, err := neofs.GetSDKClient(context.Background(), rpc, 10*time.Minute) | ||
if err != nil { | ||
return fmt.Errorf("failed to create NeoFS client: %w", err) | ||
} | ||
defer clientSDK.Close() | ||
|
||
fileIndex := 1 + startIndex/fileSize | ||
|
||
type result struct { | ||
index uint32 | ||
oidsBatch []oid.ID | ||
err error | ||
} | ||
|
||
for i := startIndex; i < startIndex+count; i += fileSize { | ||
var wg sync.WaitGroup | ||
results := make(chan result, fileSize) | ||
workerPool := make(chan struct{}, 100) | ||
|
||
oids := make([]oid.ID, 0, fileSize) | ||
|
||
for j := i; j < i+fileSize && j < startIndex+count; j++ { | ||
wg.Add(1) | ||
workerPool <- struct{}{} | ||
|
||
go func(index uint32) { | ||
defer wg.Done() | ||
defer func() { <-workerPool }() | ||
|
||
prm := client.PrmObjectSearch{} | ||
filters := object.NewSearchFilters() | ||
filters.AddFilter(attribute, fmt.Sprintf("%d", index), object.MatchStringEqual) | ||
prm.SetFilters(filters) | ||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) | ||
defer cancel() | ||
|
||
oidsBatch, err := neofs.ObjectSearch(ctx, clientSDK, pk, containerID.String(), prm) | ||
if err != nil || len(oidsBatch) == 0 { | ||
results <- result{index, nil, fmt.Errorf("no OIDs found or failed for index %d: %w", index, err)} | ||
return | ||
} | ||
if len(oidsBatch) == 0 { | ||
fmt.Println("Found OIDs:", oidsBatch, "for index", index, "total", len(oidsBatch)) | ||
} | ||
if index%2000 == 0 { | ||
fmt.Println("Index:", index) | ||
} | ||
results <- result{index, oidsBatch[:1], nil} | ||
}(j) | ||
} | ||
|
||
go func() { | ||
wg.Wait() | ||
close(results) | ||
}() | ||
|
||
pendingResults := make(map[uint32][]oid.ID) | ||
expectedIndex := i | ||
|
||
for res := range results { | ||
if res.err != nil { | ||
return res.err | ||
} | ||
|
||
pendingResults[res.index] = res.oidsBatch | ||
|
||
for { | ||
if batch, ok := pendingResults[expectedIndex]; ok { | ||
delete(pendingResults, expectedIndex) | ||
oids = append(oids, batch...) | ||
|
||
if len(oids) >= int(fileSize) { | ||
if err := writeOIDFile(oids[:fileSize], outputDir, int(fileIndex)); err != nil { | ||
return err | ||
} | ||
oids = oids[fileSize:] | ||
fileIndex++ | ||
} | ||
expectedIndex++ | ||
} else { | ||
break | ||
} | ||
} | ||
} | ||
|
||
if len(oids) > 0 { | ||
if err := writeOIDFile(oids, outputDir, int(fileIndex)); err != nil { | ||
return err | ||
} | ||
fileIndex++ | ||
} | ||
} | ||
|
||
log.Println("OID generation completed.") | ||
return nil | ||
} | ||
|
||
// writeOIDFile writes a list of OIDs to a file in the specified directory. | ||
func writeOIDFile(oids []oid.ID, outputDir string, fileIndex int) error { | ||
fileName := fmt.Sprintf("%s/oid-%d.bin", outputDir, fileIndex) | ||
file, err := os.Create(fileName) | ||
if err != nil { | ||
return fmt.Errorf("failed to create file %s: %w", fileName, err) | ||
} | ||
defer file.Close() | ||
|
||
for _, oid := range oids { | ||
oidBytes := make([]byte, 32) | ||
oid.Encode(oidBytes) | ||
if _, err := file.Write(oidBytes); err != nil { | ||
return fmt.Errorf("failed to write OID to file %s: %w", fileName, err) | ||
} | ||
} | ||
|
||
log.Printf("Successfully wrote %d OIDs to %s", len(oids), fileName) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters