diff --git a/build.sh b/build.sh index d1139ce8b..7ddc93d0a 100644 --- a/build.sh +++ b/build.sh @@ -21,17 +21,7 @@ go build -ldflags "\ -o ./build/gnfd-sp cmd/storage_provider/*.go if [ $? -ne 0 ]; then - echo "build failed Ooh!!!" + echo "build failed Ooooooh!!!" else echo "build succeed!" -fi - -#go build -o ./build/test-gnfd-sp test/e2e/services/case_driver.go -#if [ $? -ne 0 ]; then -# echo "build test-storage-provider failed Ooh!!!" -#fi -# -#go build -o ./build/setup-test-env test/e2e/onebox/setup_onebox.go -#if [ $? -ne 0 ]; then -# echo "build setup-test-env failed Ooh!!!" -#fi +fi \ No newline at end of file diff --git a/cmd/storage_provider/main.go b/cmd/storage_provider/main.go index b8d22cd01..46a8a131e 100644 --- a/cmd/storage_provider/main.go +++ b/cmd/storage_provider/main.go @@ -10,6 +10,7 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/config" "github.com/bnb-chain/greenfield-storage-provider/model" "github.com/bnb-chain/greenfield-storage-provider/pkg/lifecycle" + "github.com/bnb-chain/greenfield-storage-provider/pkg/log" "github.com/bnb-chain/greenfield-storage-provider/service/blocksyncer" "github.com/bnb-chain/greenfield-storage-provider/service/challenge" "github.com/bnb-chain/greenfield-storage-provider/service/downloader" @@ -19,12 +20,11 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/service/stonenode" "github.com/bnb-chain/greenfield-storage-provider/service/syncer" "github.com/bnb-chain/greenfield-storage-provider/service/uploader" - "github.com/bnb-chain/greenfield-storage-provider/pkg/log" ) var ( version = flag.Bool("version", false, "print version") - configFile = flag.String("config", "../../config/config.toml", "config file path") + configFile = flag.String("config", "./config.toml", "config file path") ) // initService init service instance by name and config. @@ -119,10 +119,10 @@ func main() { // 1. init service instance. 
service, err := initService(serviceName, cfg) if err != nil { - log.Errorw("init service failed", "service_name", serviceName, "error", err) + log.Errorw("failed to init service", "service", serviceName, "error", err) os.Exit(1) } - log.Debugw("init service success", "service_name", serviceName) + log.Debugw("success to init service ", "service", serviceName) // 2. register service to lifecycle. slc.RegisterServices(service) } diff --git a/cmd/storage_provider/version.go b/cmd/storage_provider/version.go index 283d85177..1fef8f2ad 100644 --- a/cmd/storage_provider/version.go +++ b/cmd/storage_provider/version.go @@ -7,15 +7,16 @@ import ( const ( StorageProviderLogo = `Greenfield Storage Provider - __ _ __ - _____/ /_____ _________ _____ ____ ____ _________ _ __(_)___/ /__ _____ - / ___/ __/ __ \/ ___/ __ / __ / _ \ / __ \/ ___/ __ \ | / / / __ / _ \/ ___/ - (__ ) /_/ /_/ / / / /_/ / /_/ / __/ / /_/ / / / /_/ / |/ / / /_/ / __/ / -/____/\__/\____/_/ \__,_/\__, /\___/ / .___/_/ \____/|___/_/\__,_/\___/_/ - /____/ /_/ -` + __ _ __ + _____/ /_____ _________ _____ ____ ____ _________ _ __(_)___/ /__ _____ + / ___/ __/ __ \/ ___/ __ / __ / _ \ / __ \/ ___/ __ \ | / / / __ / _ \/ ___/ + (__ ) /_/ /_/ / / / /_/ / /_/ / __/ / /_/ / / / /_/ / |/ / / /_/ / __/ / + /____/\__/\____/_/ \__,_/\__, /\___/ / .___/_/ \____/|___/_/\__,_/\___/_/ + /____/ /_/ + ` ) +// DumpLogo output greenfield storage provider logo func DumpLogo() string { return StorageProviderLogo } @@ -27,6 +28,7 @@ var ( BuildTime string ) +// DumpVersion output the storage provider version information func DumpVersion() string { return fmt.Sprintf("Version : %s\n"+ "Branch : %s\n"+ diff --git a/config/config.go b/config/config.go index 509b696f4..fbd76e536 100644 --- a/config/config.go +++ b/config/config.go @@ -4,7 +4,10 @@ import ( "bufio" "os" - "github.com/bnb-chain/greenfield-storage-provider/service/blocksyncer" + "github.com/bnb-chain/greenfield-storage-provider/model" + tomlconfig 
"github.com/forbole/juno/v4/cmd/migrate/toml" + "github.com/naoina/toml" + "github.com/bnb-chain/greenfield-storage-provider/service/challenge" "github.com/bnb-chain/greenfield-storage-provider/service/downloader" "github.com/bnb-chain/greenfield-storage-provider/service/gateway" @@ -14,10 +17,9 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/service/syncer" "github.com/bnb-chain/greenfield-storage-provider/service/uploader" "github.com/bnb-chain/greenfield-storage-provider/util" - tomlconfig "github.com/forbole/juno/v4/cmd/migrate/toml" - "github.com/naoina/toml" ) +// StorageProviderConfig defines the configuration of storage provider type StorageProviderConfig struct { Service []string GatewayCfg *gateway.GatewayConfig @@ -31,21 +33,29 @@ type StorageProviderConfig struct { BlockSyncerCfg *tomlconfig.TomlConfig } +// DefaultStorageProviderConfig defines the default configuration of storage provider services var DefaultStorageProviderConfig = &StorageProviderConfig{ - GatewayCfg: gateway.DefaultGatewayConfig, - UploaderCfg: uploader.DefaultUploaderConfig, - DownloaderCfg: downloader.DefaultDownloaderConfig, - ChallengeCfg: challenge.DefaultChallengeConfig, - StoneNodeCfg: stonenode.DefaultStoneNodeConfig, - SyncerCfg: syncer.DefaultSyncerConfig, - SignerCfg: signer.DefaultSignerChainConfig, - MetadataCfg: metadata.DefaultMetadataConfig, - BlockSyncerCfg: blocksyncer.DefaultBlockSyncerConfig, + GatewayCfg: gateway.DefaultGatewayConfig, + UploaderCfg: uploader.DefaultUploaderConfig, + DownloaderCfg: downloader.DefaultDownloaderConfig, + ChallengeCfg: challenge.DefaultChallengeConfig, + StoneNodeCfg: stonenode.DefaultStoneNodeConfig, + SyncerCfg: syncer.DefaultSyncerConfig, + SignerCfg: signer.DefaultSignerChainConfig, + Service: []string{ + model.GatewayService, + model.UploaderService, + model.DownloaderGrpcAddress, + model.ChallengeService, + model.StoneNodeService, + model.SyncerService, + model.SignerService, + }, } -// LoadConfig loads the config 
file -func LoadConfig(file string) *StorageProviderConfig { - f, err := os.Open(file) +// LoadConfig loads the config file from path +func LoadConfig(path string) *StorageProviderConfig { + f, err := os.Open(path) if err != nil { panic(err) } diff --git a/model/const.go b/model/const.go index 1337f17ae..03b7610ce 100644 --- a/model/const.go +++ b/model/const.go @@ -2,61 +2,73 @@ package model // define storage provider include service const ( - GatewayService = "Gateway" - UploaderService = "Uploader" - DownloaderService = "Downloader" - ChallengeService = "Challenge" - StoneNodeService = "StoneNode" - SyncerService = "Syncer" - SignerService = "Signer" - MetadataService = "Metadata" + // GatewayService defines the name of gateway service + GatewayService = "Gateway" + // UploaderService defines the name of uploader service + UploaderService = "Uploader" + // DownloaderService defines the name of downloader service + DownloaderService = "Downloader" + // ChallengeService defines the name of challenge service + ChallengeService = "Challenge" + // StoneNodeService defines the name of stone node service + StoneNodeService = "StoneNode" + // SyncerService defines the name of syncer service + SyncerService = "Syncer" + // SignerService defines the name of signer service + SignerService = "Signer" + // MetadataService defines the name of metadata service + MetadataService = "Metadata" + // BlockSyncerService defines the name of block sync service BlockSyncerService = "BlockSyncer" ) // define storage provider service gGRPC default address const ( - UploaderGrpcAddress = "localhost:9133" + // UploaderGrpcAddress default gGRPC address of uploader + UploaderGrpcAddress = "localhost:9133" + // DownloaderGrpcAddress default gGRPC address of downloader DownloaderGrpcAddress = "localhost:9233" - ChallengeGrpcAddress = "localhost:9333" - StoneNodeGrpcAddress = "localhost:9433" - SyncerGrpcAddress = "localhost:9533" - SignerGrpcAddress = "localhost:9633" + // ChallengeGrpcAddress 
default gRPC address of challenge + ChallengeGrpcAddress = "localhost:9333" + // StoneNodeGrpcAddress default gRPC address of stone node + StoneNodeGrpcAddress = "localhost:9433" + // SyncerGrpcAddress default gRPC address of syncer + SyncerGrpcAddress = "localhost:9533" + // SignerGrpcAddress default gRPC address of signer + SignerGrpcAddress = "localhost:9633" ) // environment constants const ( - // Piece Store constants + // BucketURL defines env variable name for bucket url BucketURL = "BUCKET_URL" - - // AWS environment constants - AWSAccessKey = "AWS_ACCESS_KEY" - AWSSecretKey = "AWS_SECRET_KEY" + // AWSAccessKey defines env variable name for aws access key + AWSAccessKey = "AWS_ACCESS_KEY" + // AWSSecretKey defines env variable name for aws secret key + AWSSecretKey = "AWS_SECRET_KEY" + // AWSSessionToken defines env variable name for aws session token AWSSessionToken = "AWS_SESSION_TOKEN" - - // MetaDB environment constants - MetaDBUser = "META_DB_USER" - MetaDBPassword = "META_DB_PASSWORD" - - // JobDB environment constants - JobDBUser = "JOB_DB_USER" - JobDBPassword = "JOB_DB_PASSWORD" + // SpOperatorAddress defines env variable name for sp operator address + SpOperatorAddress = "SP_OPERATOR_ADDRESS" ) // define cache size const ( - // the maximum number of cached items in service trace queue + // LruCacheLimit defines maximum number of cached items in service trace queue LruCacheLimit = 8192 ) // define piece store constants. 
const ( - BufPoolSize = 32 << 10 + // BufPoolSize defines buffer pool size + BufPoolSize = 32 << 10 + // ChecksumAlgo defines validation algorithm name ChecksumAlgo = "Crc32c" ) // RPC config const ( - // server and client max send or recv msg size + // MaxCallMsgSize defines gRPC max send or recv msg size MaxCallMsgSize = 25 * 1024 * 1024 ) diff --git a/model/errors/error_code.go b/model/errors/error_code.go index 0c10234a1..9519d6898 100644 --- a/model/errors/error_code.go +++ b/model/errors/error_code.go @@ -1,10 +1,12 @@ package errors const ( + // RPCErrCode defines storage provider rpc error code RPCErrCode = 10000 - + // ErrorCodeBadRequest defines bad request error code ErrorCodeBadRequest = 40001 - ErrorCodeNotFound = 40004 - + // ErrorCodeNotFound defines not found error code + ErrorCodeNotFound = 40004 + // ErrorCodeInternalError defines internal error code ErrorCodeInternalError = 50001 ) diff --git a/model/errors/rpc_error.go b/model/errors/rpc_error.go index 1d2fa67ac..bca6f98c9 100644 --- a/model/errors/rpc_error.go +++ b/model/errors/rpc_error.go @@ -6,47 +6,61 @@ import ( // common error var ( - ErrCacheMiss = errors.New("cache missing") + // ErrCacheMiss defines cache missing error + ErrCacheMiss = errors.New("cache missing") + // ErrSealTimeout defines seal object timeout error ErrSealTimeout = errors.New("seal object timeout") ) // piece store errors var ( - NotSupportedMethod = errors.New("unsupported method") - NotSupportedDelimiter = errors.New("unsupported delimiter") - EmptyObjectKey = errors.New("invalid object key") - EmptyMemoryObject = errors.New("memory object not exist") - BucketNotExisted = errors.New("bucket not exist") + // ErrUnsupportMethod defines unsupported method error + ErrUnsupportMethod = errors.New("unsupported method") + // ErrUnsupportDelimiter defines invalid key with delimiter error + ErrUnsupportDelimiter = errors.New("unsupported delimiter") + // ErrInvalidObjectKey defines invalid object key error + 
ErrInvalidObjectKey = errors.New("invalid object key") + // ErrNotExitObject defines not exist object in memory error + ErrNotExitObject = errors.New("object not exist") + //ErrNotExistBucket defines not exist bucket error + ErrNotExistBucket = errors.New("bucket not exist") + // ErrNoPermissionAccessBucket defines deny access bucket error ErrNoPermissionAccessBucket = errors.New("deny access bucket") ) // gateway errors var ( - ErrInternalError = errors.New("internal error") - ErrDuplicateBucket = errors.New("duplicate bucket") - ErrDuplicateObject = errors.New("duplicate object") - ErrObjectTxNotExist = errors.New("object tx not exist") - ErrObjectNotExist = errors.New("object not exist") - ErrObjectIsEmpty = errors.New("object payload is empty") + // ErrInternalError defines storage provider internal error + ErrInternalError = errors.New("internal error") + // ErrDuplicateBucket defines duplicate bucket error + ErrDuplicateBucket = errors.New("duplicate bucket") + // ErrDuplicateObject defines duplicate object error + ErrDuplicateObject = errors.New("duplicate object") + // ErrPayloadZero defines payload size is zero error + ErrPayloadZero = errors.New("object payload is zero") - // signature error + // ErrAuthorizationFormat defines the invalid authorization format error ErrAuthorizationFormat = errors.New("authorization format error") - ErrRequestConsistent = errors.New("failed to check request consistent") + // ErrRequestConsistent defines the invalid request checksum error + ErrRequestConsistent = errors.New("failed to check request consistent") + // ErrSignatureConsistent defines the invalid signature error ErrSignatureConsistent = errors.New("failed to check signature consistent") - ErrUnsupportedSignType = errors.New("unsupported signature type") - ErrEmptyReqHeader = errors.New("request header is empty") - ErrReqHeader = errors.New("invalid request header") + // ErrUnsupportSignType defines the unsupported signature type error + ErrUnsupportSignType = 
errors.New("unsupported signature type") + // ErrEmptyReqHeader defines the empty header error + ErrEmptyReqHeader = errors.New("request header is empty") + // ErrInvalidHeader defines the invalid header error + ErrInvalidHeader = errors.New("invalid request header") ) // signer service error var ( - ErrIPBlocked = errors.New("ip blocked") - ErrAPIKey = errors.New("invalid api key") - ErrSignMsg = errors.New("sign message with private key failed") + // ErrIPBlocked defines deny request by ip error + ErrIPBlocked = errors.New("ip blocked") + // ErrAPIKey defines invalid signer api key + ErrAPIKey = errors.New("invalid api key") + // ErrSignMsg defines sign msg error by private key + ErrSignMsg = errors.New("sign message with private key failed") + // ErrSealObjectOnChain defines send seal object tx to chain error ErrSealObjectOnChain = errors.New("send sealObject msg failed") ) - -// block syncer service errors -var ( - ErrSyncerStopped = errors.New("syncer service has already stopped") -) diff --git a/pkg/greenfield/auth_service.go b/pkg/greenfield/auth_service.go index f059c1a6f..2daeb3096 100644 --- a/pkg/greenfield/auth_service.go +++ b/pkg/greenfield/auth_service.go @@ -13,14 +13,16 @@ func (greenfield *Greenfield) AuthUploadObjectWithAccount(ctx context.Context, b isSpBucket bool, ownerObject bool, err error) { accountExist, err = greenfield.HasAccount(ctx, account) if err != nil || !accountExist { - log.Errorw("failed to query account", "error", err, "is_account_exist", accountExist) + log.Errorw("failed to query account", "bucket", bucket, + "object", object, "account_exist", accountExist, "error", err) return } var bucketInfo *storagetypes.BucketInfo bucketInfo, err = greenfield.QueryBucketInfo(ctx, bucket) if err != nil || bucketInfo == nil { bucketExist = false - log.Errorw("failed to query bucket info", "error", err, "bucket_info", bucketInfo) + log.Errorw("failed to query bucket info", "bucket", bucket, + "object", object, "error", err) return } 
bucketExist = true @@ -29,18 +31,21 @@ func (greenfield *Greenfield) AuthUploadObjectWithAccount(ctx context.Context, b objectInfo, err = greenfield.QueryObjectInfo(ctx, bucket, object) if err != nil || objectInfo == nil { isInitStatus = false - log.Errorw("failed to query object info", "error", err, "object_info", objectInfo) + log.Errorw("failed to query object info", "bucket", bucket, + "object", object, "error", err) return } if objectInfo.GetObjectStatus() != storagetypes.OBJECT_STATUS_INIT { isInitStatus = false - log.Errorw("object status is not equal to status_init", "status", objectInfo.GetObjectStatus()) + log.Errorw("object status is not equal to status_init", + "status", objectInfo.GetObjectStatus()) return } isInitStatus = true if objectInfo.GetOwner() != account { ownerObject = false - log.Errorw("object owner is not equal to account", "owner", objectInfo.GetOwner(), "account", account) + log.Errorw("object owner is not equal to account", + "owner", objectInfo.GetOwner(), "account", account) return } ownerObject = true @@ -67,14 +72,16 @@ func (greenfield *Greenfield) AuthDownloadObjectWithAccount(ctx context.Context, accountExist, err = greenfield.HasAccount(ctx, account) if err != nil || !accountExist { - log.Errorw("failed to query account", "error", err, "is_account_exist", accountExist) + log.Errorw("failed to query account", "bucket", bucket, + "object", object, "account_exist", accountExist, "error", err) return } var bucketInfo *storagetypes.BucketInfo bucketInfo, err = greenfield.QueryBucketInfo(ctx, bucket) if err != nil || bucketInfo == nil { bucketExist = false - log.Errorw("failed to query bucket info", "error", err, "bucket_info", bucketInfo) + log.Errorw("failed to query bucket info", + "bucket", bucket, "object", object, "error", err) return } bucketExist = true @@ -83,18 +90,20 @@ func (greenfield *Greenfield) AuthDownloadObjectWithAccount(ctx context.Context, objectInfo, err = greenfield.QueryObjectInfo(ctx, bucket, object) if err != 
nil || objectInfo == nil { isServiceStatus = false - log.Errorw("failed to query object info", "error", err, "object_info", objectInfo) + log.Errorw("failed to query object info", + "bucket", bucket, "object", object, "error", err) return } if objectInfo.GetObjectStatus() != storagetypes.OBJECT_STATUS_IN_SERVICE { isServiceStatus = false - log.Errorw("object status is not equal to status_in_service", "status", objectInfo.GetObjectStatus()) + log.Errorw("object not in status_in_service", + "status", objectInfo.GetObjectStatus()) return } isServiceStatus = true if objectInfo.GetOwner() != account { ownerObject = false - log.Errorw("object owner is not equal to account", "owner", objectInfo.GetOwner(), "account", account) + log.Errorw("object owner mismatch", "owner", objectInfo.GetOwner(), "account", account) return } ownerObject = true diff --git a/pkg/greenfield/greenfield.go b/pkg/greenfield/greenfield.go index 16a52d528..fa1e9664d 100644 --- a/pkg/greenfield/greenfield.go +++ b/pkg/greenfield/greenfield.go @@ -27,11 +27,12 @@ type GreenfieldClient struct { Provider []string } +// GnfdCompositeClient return the greenfield chain client func (client *GreenfieldClient) GnfdCompositeClient() *chain.GnfdCompositeClient { return client.gnfdCompositeClient } -// Greenfield the greenfield chain service. +// Greenfield encapsulation of greenfiled chain go sdk, support for more query request type Greenfield struct { config *GreenfieldChainConfig client *GreenfieldClient @@ -79,14 +80,14 @@ func (greenfield *Greenfield) getCurrentClient() *GreenfieldClient { return greenfield.client } -// setCurrentClient set current use client. +// setCurrentClient set client to current client for using. func (greenfield *Greenfield) setCurrentClient(client *GreenfieldClient) { greenfield.mutex.Lock() defer greenfield.mutex.Unlock() greenfield.client = client } -// updateClient select block height is the largest from all clients and update to current client. 
+// updateClient select the client that block height is the largest and set to current client. func (greenfield *Greenfield) updateClient() { ticker := time.NewTicker(UpdateClientInternal * time.Second) for { diff --git a/pkg/greenfield/query_service.go b/pkg/greenfield/query_service.go index c000d201e..812018cdb 100644 --- a/pkg/greenfield/query_service.go +++ b/pkg/greenfield/query_service.go @@ -32,7 +32,7 @@ func (greenfield *Greenfield) HasAccount(ctx context.Context, address string) (b client := greenfield.getCurrentClient().GnfdCompositeClient() resp, err := client.Account(ctx, &authtypes.QueryAccountRequest{Address: address}) if err != nil { - log.Errorw("failed to query account", "error", err, "address", address) + log.Errorw("failed to query account", "address", address, "error", err) return false, err } return resp.GetAccount() != nil, nil @@ -49,7 +49,7 @@ func (greenfield *Greenfield) QuerySPInfo(ctx context.Context) ([]*sptypes.Stora }, }) if err != nil { - log.Errorw("failed to query sp list", "error", err) + log.Errorw("failed to query storage provider list", "error", err) return spInfos, err } for i := 0; i < len(resp.GetSps()); i++ { @@ -63,7 +63,7 @@ func (greenfield *Greenfield) QueryBucketInfo(ctx context.Context, bucket string client := greenfield.getCurrentClient().GnfdCompositeClient() resp, err := client.HeadBucket(ctx, &storagetypes.QueryHeadBucketRequest{BucketName: bucket}) if err != nil { - log.Errorw("failed to query bucket", "error", err, "bucket_name", bucket) + log.Errorw("failed to query bucket", "bucket_name", bucket, "error", err) return nil, err } return resp.GetBucketInfo(), nil @@ -77,7 +77,7 @@ func (greenfield *Greenfield) QueryObjectInfo(ctx context.Context, bucket, objec ObjectName: object, }) if err != nil { - log.Errorw("failed to query object", "error", err, "bucket_name", bucket, "object_name", object) + log.Errorw("failed to query object", "bucket_name", bucket, "object_name", object, "error", err) return nil, err } 
return resp.GetObjectInfo(), nil @@ -99,7 +99,7 @@ func (greenfield *Greenfield) ListenObjectSeal(ctx context.Context, bucket, obje return } } - log.Errorw("listen seal object timeout", "error", err, "bucket_name", bucket, "object_name", object) + log.Errorw("seal object timeout", "bucket_name", bucket, "object_name", object) err = merror.ErrSealTimeout return } diff --git a/pkg/lifecycle/lifecycle.go b/pkg/lifecycle/lifecycle.go index fa98e5b5b..2beed3275 100644 --- a/pkg/lifecycle/lifecycle.go +++ b/pkg/lifecycle/lifecycle.go @@ -103,7 +103,7 @@ func (s *ServiceLifecycle) StopServices(ctx context.Context) { <-gCtx.Done() if errors.Is(gCtx.Err(), context.Canceled) { - log.Infow("services stop working", "service config timeout", s.timeout) + log.Infow("services stop working", "stop service timeout", s.timeout) } else if errors.Is(gCtx.Err(), context.DeadlineExceeded) { log.Error("timeout while stopping service, killing instance manually") } diff --git a/pkg/stream/payload_stream.go b/pkg/stream/payload_stream.go index 294cf4c45..bf3795755 100644 --- a/pkg/stream/payload_stream.go +++ b/pkg/stream/payload_stream.go @@ -148,14 +148,14 @@ func (stream *PayloadStream) readStream() { if err == io.EOF && n == 0 { entry.err = err stream.entryCh <- entry - log.Infow("payload stream on closed", "object_id", stream.objectId) + log.Debugw("payload stream on closed", "object_id", stream.objectId) return } entry.segmentData = data stream.entryCh <- entry count++ readSize = readSize + uint32(n) - log.Infow("payload stream has read", "read_total_size", readSize, "object_id", stream.objectId, "segment_count:", count-1) + log.Debugw("payload stream has read", "read_total_size", readSize, "object_id", stream.objectId, "segment_count:", count-1) } } diff --git a/proto/service/challenge/types/challenge.proto b/proto/service/challenge/types/challenge.proto index c96838871..170de89ba 100644 --- a/proto/service/challenge/types/challenge.proto +++ 
b/proto/service/challenge/types/challenge.proto @@ -3,18 +3,28 @@ package service.challenge.types; option go_package = "github.com/bnb-chain/greenfield-storage-provider/service/challenge/types"; +// ChallengePieceRequest is request type for the ChallengePiece RPC method. message ChallengePieceRequest { + // object_id defines the challenge object id uint64 object_id = 1; + // replicate_idx defines the challenge replicate idx int32 replicate_idx = 2; + // segment_idx defines the challenge segment idx uint32 segment_idx = 5; } +// ChallengePieceResponse is response type for the ChallengePiece RPC method. message ChallengePieceResponse { + // piece_data defines the challenge segment data bytes piece_data = 1; + // integrity_hash defines the integrity hash of the challenge replicate payload bytes integrity_hash = 2; + // piece_hash defines the checksum of the challenge segment repeated bytes piece_hash = 3; } +// ChallengeService defines the gRPC service of challenge segment. service ChallengeService { + // ChallengePiece challenge the segment of the object. rpc ChallengePiece(ChallengePieceRequest) returns (ChallengePieceResponse) {}; } diff --git a/proto/service/downloader/types/downloader.proto b/proto/service/downloader/types/downloader.proto index 97d43e41c..2d8aa2ed5 100644 --- a/proto/service/downloader/types/downloader.proto +++ b/proto/service/downloader/types/downloader.proto @@ -3,25 +3,37 @@ package service.downloader.types; option go_package = "github.com/bnb-chain/greenfield-storage-provider/service/downloader/types"; +// DownloaderObjectRequest is request type for the DownloaderObject RPC method. 
message DownloaderObjectRequest { + // bucket_name defines the download bucket name string bucket_name = 1; + // object_name defines the download object name string object_name = 2; + // offset defines the download payload offset uint64 offset = 3; + // length defines the download payload length uint64 length = 4; // if length == 0, download all object data - + // is_range indicates whether it is a range download, compatible with aws s3 bool is_range = 5; + // range_start defines the start of range // [range_start, range_end], range_start >= object_size is invalid, download all object data int64 range_start = 6; + // range_end defines the end of range // range_end >= object_size is invalid, download all object data // range_end < 0, download [range_start, object_end] int64 range_end = 7; } +// DownloaderObjectResponse is response type for the DownloaderObject RPC method. message DownloaderObjectResponse { + // is_valid_range indicates whether the range is valid bool is_valid_range = 1; + // data defines the download data bytes data = 2; } +// DownloaderService defines the gRPC service of download payload. service DownloaderService { + // DownloaderObject download the payload of the 
rpc DownloaderObject(DownloaderObjectRequest) returns (stream DownloaderObjectResponse) {}; } diff --git a/proto/service/signer/types/signer.proto b/proto/service/signer/types/signer.proto index 8793e0fb3..910aca283 100644 --- a/proto/service/signer/types/signer.proto +++ b/proto/service/signer/types/signer.proto @@ -47,7 +47,6 @@ message SignIntegrityHashResponse { } message SealObjectOnChainRequest { -// pkg.types.v1.ObjectInfo object_info = 1; bnbchain.greenfield.storage.MsgSealObject seal_object= 1; } diff --git a/service/blocksyncer/block_syncer.go b/service/blocksyncer/block_syncer.go index fff837837..5a02bb17a 100644 --- a/service/blocksyncer/block_syncer.go +++ b/service/blocksyncer/block_syncer.go @@ -17,7 +17,6 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/pkg/log" "github.com/bnb-chain/greenfield-storage-provider/model" - merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" "github.com/forbole/juno/v4/modules" "github.com/forbole/juno/v4/parser" "github.com/forbole/juno/v4/types" @@ -109,7 +108,7 @@ func (s *BlockSyncer) Start(ctx context.Context) error { // Stop running SyncerService func (s *BlockSyncer) Stop(ctx context.Context) error { if !s.running.Swap(false) { - return merrors.ErrSyncerStopped + return nil } return nil } diff --git a/service/challenge/challenge.go b/service/challenge/challenge.go index 7792075d9..52d6bcaac 100644 --- a/service/challenge/challenge.go +++ b/service/challenge/challenge.go @@ -14,14 +14,16 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/pkg/log" ) -// Challenge manage the integrity hash check +// Challenge implements the gRPC of ChallengeService, +// responsible for handling challenge piece request. type Challenge struct { config *ChallengeConfig spDB store.SPDB pieceStore *pscli.StoreClient } -// NewChallengeService return a Challenge instance. 
+// NewChallengeService returns an instance of Challenge that implementation of +// the lifecycle.Service and ChallengeService interface func NewChallengeService(config *ChallengeConfig) (challenge *Challenge, err error) { pieceStore, err := pscli.NewStoreClient(config.PieceStoreConfig) if err != nil { @@ -35,12 +37,12 @@ func NewChallengeService(config *ChallengeConfig) (challenge *Challenge, err err return challenge, nil } -// Name describes the name of Challenge +// Name return the challenge service name, for the lifecycle management func (challenge *Challenge) Name() string { return model.ChallengeService } -// Start implement the lifecycle interface +// Start the challenge gRPC service func (challenge *Challenge) Start(ctx context.Context) error { errCh := make(chan error) @@ -64,7 +66,7 @@ func (challenge *Challenge) Start(ctx context.Context) error { return err } -// Stop implement the lifecycle interface +// Stop the challenge gRPC service and recycle the resources func (challenge *Challenge) Stop(ctx context.Context) error { return nil } diff --git a/service/challenge/challenge_service.go b/service/challenge/challenge_service.go index fad511bfc..d0a4a60fc 100644 --- a/service/challenge/challenge_service.go +++ b/service/challenge/challenge_service.go @@ -8,7 +8,8 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/service/challenge/types" ) -// ChallengePiece implement challenge service server interface and handle the grpc request. 
+// ChallengePiece handles the piece challenge request +// return the replica's integrity hash, piece hash and piece data func (challenge *Challenge) ChallengePiece( ctx context.Context, req *types.ChallengePieceRequest) ( diff --git a/service/challenge/client/challenge_client.go b/service/challenge/client/challenge_client.go index 67993425c..bbf4e59dd 100644 --- a/service/challenge/client/challenge_client.go +++ b/service/challenge/client/challenge_client.go @@ -6,18 +6,18 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/bnb-chain/greenfield-storage-provider/service/challenge/types" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + "github.com/bnb-chain/greenfield-storage-provider/service/challenge/types" ) -// ChallengeClient is a challenge gRPC client wrapper. +// ChallengeClient is a challenge gRPC service client wrapper type ChallengeClient struct { address string challenge types.ChallengeServiceClient conn *grpc.ClientConn } -// NewStoneNodeClient return a ChallengeClient instance. 
+// NewStoneNodeClient return a ChallengeClient instance func NewStoneNodeClient(address string) (*ChallengeClient, error) { conn, err := grpc.DialContext(context.Background(), address, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -33,11 +33,12 @@ func NewStoneNodeClient(address string) (*ChallengeClient, error) { return client, nil } -// Close the gPRC connection +// Close the challenge gPRC connection func (client *ChallengeClient) Close() error { return client.conn.Close() } +// ChallengePiece send challenge piece request func (client *ChallengeClient) ChallengePiece(ctx context.Context, objectId uint64, replicateIdx int32, segmentIdx uint32, opts ...grpc.CallOption) ([]byte, [][]byte, []byte, error) { diff --git a/service/downloader/client/downloader_client.go b/service/downloader/client/downloader_client.go index 2dffa3643..b45798e22 100644 --- a/service/downloader/client/downloader_client.go +++ b/service/downloader/client/downloader_client.go @@ -7,16 +7,18 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/bnb-chain/greenfield-storage-provider/service/downloader/types" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + "github.com/bnb-chain/greenfield-storage-provider/service/downloader/types" ) +// DownloaderClient is a downloader gRPC service client wrapper type DownloaderClient struct { address string downloader types.DownloaderServiceClient conn *grpc.ClientConn } +// NewDownloaderClient return a DownloaderClient instance func NewDownloaderClient(address string) (*DownloaderClient, error) { conn, err := grpc.DialContext(context.Background(), address, grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -33,10 +35,12 @@ func NewDownloaderClient(address string) (*DownloaderClient, error) { return client, nil } +// Close the download gPRC connection func (client *DownloaderClient) Close() error { return client.conn.Close() } +// DownloaderObject download the payload of the 
object func (client *DownloaderClient) DownloaderObject(ctx context.Context, req *types.DownloaderObjectRequest, opts ...grpc.CallOption) (types.DownloaderService_DownloaderObjectClient, error) { // ctx = log.Context(context.Background(), req) diff --git a/service/downloader/downloader.go b/service/downloader/downloader.go index 48fd7b4c6..a79da4710 100644 --- a/service/downloader/downloader.go +++ b/service/downloader/downloader.go @@ -15,7 +15,8 @@ import ( pscli "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/client" ) -// Downloader manage the payload data download +// Downloader implements the gRPC of DownloaderService, +// responsible for downloading object payload data type Downloader struct { cfg *DownloaderConfig spDB store.SPDB @@ -23,7 +24,8 @@ type Downloader struct { pieceStore *pscli.StoreClient } -// NewDownloaderService return a downloader instance. +// NewDownloaderService returns an instance of Downloader that implementation of +// the lifecycle.Service and DownloaderService interface func NewDownloaderService(cfg *DownloaderConfig) (*Downloader, error) { pieceStore, err := pscli.NewStoreClient(cfg.PieceStoreConfig) if err != nil { @@ -44,12 +46,12 @@ func NewDownloaderService(cfg *DownloaderConfig) (*Downloader, error) { return downloader, nil } -// Name implement the lifecycle interface +// Name return the downloader service name, for the lifecycle management func (downloader *Downloader) Name() string { return model.DownloaderService } -// Start implement the lifecycle interface +// Start the downloader gRPC service func (downloader *Downloader) Start(ctx context.Context) error { errCh := make(chan error) @@ -73,7 +75,7 @@ func (downloader *Downloader) Start(ctx context.Context) error { return err } -// Stop implement the lifecycle interface +// Stop the downloader gRPC service and recycle the resources func (downloader *Downloader) Stop(ctx context.Context) error { return nil } diff --git 
a/service/downloader/downloader_service.go b/service/downloader/downloader_service.go index d33f80081..5bb1cc78f 100644 --- a/service/downloader/downloader_service.go +++ b/service/downloader/downloader_service.go @@ -12,7 +12,7 @@ import ( var _ types.DownloaderServiceServer = &Downloader{} -// DownloaderObject download the object data and return to client. +// DownloaderObject download the payload of the object. func (downloader *Downloader) DownloaderObject(req *types.DownloaderObjectRequest, stream types.DownloaderService_DownloaderObjectServer) (err error) { var ( diff --git a/service/gateway/request_util.go b/service/gateway/request_util.go index c2dda314b..85799f81a 100644 --- a/service/gateway/request_util.go +++ b/service/gateway/request_util.go @@ -94,7 +94,7 @@ func (requestContext *requestContext) verifySignature() (sdk.AccAddress, error) if strings.HasPrefix(requestSignature, v2SignaturePrefix) { return requestContext.verifySignatureV2(requestSignature[len(v2SignaturePrefix):]) } - return nil, errors.ErrUnsupportedSignType + return nil, errors.ErrUnsupportSignType } // verifySignatureV1 used to verify request type v1 signature, return (address, nil) if check succeed @@ -173,7 +173,7 @@ func (requestContext *requestContext) verifySignatureV2(requestSignature string) } _ = signature // TODO: parse metamask signature and check timeout - // return nil, errors.ErrUnsupportedSignType + // return nil, errors.ErrUnsupportSignType return sdk.AccAddress{}, nil } diff --git a/service/gateway/sync_piece_handler.go b/service/gateway/sync_piece_handler.go index 22a18de6a..13e53a6a7 100644 --- a/service/gateway/sync_piece_handler.go +++ b/service/gateway/sync_piece_handler.go @@ -89,7 +89,7 @@ package gateway // id, err := util.StringToUin64(objectID) // if err != nil { // log.Errorw("parse object id failed", "error", err) -// return nil, merrors.ErrReqHeader +// return nil, merrors.ErrInvalidHeader // } // syncerInfo.ObjectId = id // @@ -110,7 +110,7 @@ package 
gateway // pCount, err := util.StringToUint32(pieceCount) // if err != nil { // log.Errorw("parse piece count failed", "error", err) -// return nil, merrors.ErrReqHeader +// return nil, merrors.ErrInvalidHeader // } // syncerInfo.PieceCount = pCount // @@ -123,7 +123,7 @@ package gateway // pIdx, err := util.StringToUint32(pieceIndex) // if err != nil { // log.Errorw("parse piece index failed", "error", err) -// return nil, merrors.ErrReqHeader +// return nil, merrors.ErrInvalidHeader // } // syncerInfo.PieceIndex = pIdx // diff --git a/service/stonenode/client/stone_node_client.go b/service/stonenode/client/stone_node_client.go index 81b55062f..1bb5ecf7f 100644 --- a/service/stonenode/client/stone_node_client.go +++ b/service/stonenode/client/stone_node_client.go @@ -8,18 +8,18 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/bnb-chain/greenfield-storage-provider/service/stonenode/types" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + "github.com/bnb-chain/greenfield-storage-provider/service/stonenode/types" ) -// StoneNodeClient is a grpc client wrapper. +// StoneNodeClient is an stone node gRPC service client wrapper type StoneNodeClient struct { address string node types.StoneNodeServiceClient conn *grpc.ClientConn } -// NewStoneNodeClient return a toneNodeClient instance. +// NewStoneNodeClient return a StoneNodeClient instance func NewStoneNodeClient(address string) (*StoneNodeClient, error) { conn, err := grpc.DialContext(context.Background(), address, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -35,18 +35,18 @@ func NewStoneNodeClient(address string) (*StoneNodeClient, error) { return client, nil } -// Close the gPRC connection +// Close the stone node gPRC connection func (client *StoneNodeClient) Close() error { return client.conn.Close() } -// ReplicateObject async replicate an object payload to other sp and seal object. 
+// ReplicateObject async replicate an object payload to other storage provider and seal object func (client *StoneNodeClient) ReplicateObject(ctx context.Context, object *storagetypes.ObjectInfo, opts ...grpc.CallOption) error { _, err := client.node.ReplicateObject(ctx, &types.ReplicateObjectRequest{ObjectInfo: object}, opts...) return err } -// QueryReplicatingObject query a replicating object payload information by object id. +// QueryReplicatingObject query a replicating object payload information by object id func (client *StoneNodeClient) QueryReplicatingObject(ctx context.Context, objectId uint64) (*servicetype.ReplicateSegmentInfo, error) { resp, err := client.node.QueryReplicatingObject(ctx, &types.QueryReplicatingObjectRequest{ObjectId: objectId}) if err != nil { diff --git a/service/stonenode/stone_node.go b/service/stonenode/stone_node.go index 013560550..59474a607 100644 --- a/service/stonenode/stone_node.go +++ b/service/stonenode/stone_node.go @@ -25,7 +25,9 @@ import ( var _ lifecycle.Service = &StoneNode{} -// StoneNode as min execution unit, execute storage provider's background tasks +// StoneNode as background min execution unit, execute storage provider's background tasks +// implements the gRPC of StoneNodeService, +// TODO :: StoneNode support more task tpyes, such as gc etc. 
type StoneNode struct { config *StoneNodeConfig cache *lru.Cache @@ -101,7 +103,7 @@ func (node *StoneNode) serve(errCh chan error) { } } -// EncodeReplicateSegments load segment data and encode by redundancy type +// EncodeReplicateSegments load segment data and encode according to redundancy type func (node *StoneNode) EncodeReplicateSegments( ctx context.Context, objectId uint64, diff --git a/service/stonenode/stone_node_config.go b/service/stonenode/stone_node_config.go index 99a2cf50e..9e5c8ec0a 100644 --- a/service/stonenode/stone_node_config.go +++ b/service/stonenode/stone_node_config.go @@ -17,7 +17,7 @@ type StoneNodeConfig struct { } var DefaultStoneNodeConfig = &StoneNodeConfig{ - SpOperatorAddress: "bnb-sp", + SpOperatorAddress: model.SpOperatorAddress, GrpcAddress: model.StoneNodeGrpcAddress, SignerGrpcAddress: model.SignerGrpcAddress, SpDBConfig: store.DefaultSqlDBConfig, diff --git a/service/stonenode/stone_node_service.go b/service/stonenode/stone_node_service.go index 356068fed..451c7090a 100644 --- a/service/stonenode/stone_node_service.go +++ b/service/stonenode/stone_node_service.go @@ -49,7 +49,7 @@ func (node *StoneNode) AsyncReplicateObject(ctx context.Context, _, err = node.signer.SealObjectOnChain(ctx, sealMsg) if err != nil { node.spDB.UpdateJobStatue(servicetypes.JobState_JOB_STATE_SIGN_OBJECT_ERROR, objectInfo.Id.Uint64()) - log.CtxErrorw(ctx, "failed to seal object by singer", "error", err) + log.CtxErrorw(ctx, "failed to sign object by signer", "error", err) return } node.spDB.UpdateJobStatue(servicetypes.JobState_JOB_STATE_SEAL_OBJECT_TX_DOING, objectInfo.Id.Uint64()) @@ -57,17 +57,17 @@ func (node *StoneNode) AsyncReplicateObject(ctx context.Context, objectInfo.GetObjectName(), 10) if err != nil { node.spDB.UpdateJobStatue(servicetypes.JobState_JOB_STATE_SEAL_OBJECT_ERROR, objectInfo.Id.Uint64()) - log.CtxErrorw(ctx, "failed to seal object to chain", "error", err) + log.CtxErrorw(ctx, "failed to seal object on chain", "error", err) 
return } node.spDB.UpdateJobStatue(servicetypes.JobState_JOB_STATE_SEAL_OBJECT_DONE, objectInfo.Id.Uint64()) - log.CtxInfow(ctx, "seal object to chain", "success", success) + log.CtxInfow(ctx, "seal object on chain", "success", success) return }() params, err := node.spDB.GetAllParam() if err != nil { - log.CtxErrorw(ctx, "failed to query sp param", "error", err) + log.CtxErrorw(ctx, "failed to query sp params", "error", err) return } segments := piecestore.ComputeSegmentCount(objectInfo.GetPayloadSize(), @@ -83,7 +83,7 @@ func (node *StoneNode) AsyncReplicateObject(ctx context.Context, spList, err := node.spDB.FetchAllWithoutSp(node.config.SpOperatorAddress, sptypes.STATUS_IN_SERVICE) if err != nil { - log.CtxErrorw(ctx, "failed to get storage providers", "error", err) + log.CtxErrorw(ctx, "failed to get storage providers to replicate", "error", err) return } @@ -137,7 +137,7 @@ func (node *StoneNode) AsyncReplicateObject(ctx context.Context, // integrityHash and signature are http response var integrityHash []byte var signature []byte - log.CtxDebugw(ctx, "receive the sp response", "replicate_idx", rIdx, "integrity_hash", integrityHash, "signature", signature) + log.CtxDebugw(ctx, "receive the sp response", "replica_idx", rIdx, "integrity_hash", integrityHash, "signature", signature) msg := storagetypes.NewSecondarySpSignDoc(sp.GetOperator(), integrityHash).GetSignBytes() approvalAddr, err := sdk.AccAddressFromHexUnsafe(sp.GetApprovalAddress()) if err != nil { @@ -149,9 +149,9 @@ func (node *StoneNode) AsyncReplicateObject(ctx context.Context, log.CtxErrorw(ctx, "failed to verify sp signature", "sp", sp.GetApprovalAddress(), "error", err) continue } - log.CtxInfow(ctx, "success to sync payload to sp", "sp", sp.GetOperator(), "replicate_idx", rIdx) + log.CtxInfow(ctx, "success to sync payload to sp", "sp", sp.GetOperator(), "replica_idx", rIdx) if atomic.AddInt64(&done, 1) == int64(replicates) { - log.CtxInfow(ctx, "finish to syncer all replicates") + 
log.CtxInfow(ctx, "finish to sync all replicas") errCh <- nil return } @@ -171,7 +171,7 @@ func (node *StoneNode) AsyncReplicateObject(ctx context.Context, } } -// QueryReplicatingObject query a replicating object payload information by object id +// QueryReplicatingObject query a replicating object information by object id func (node *StoneNode) QueryReplicatingObject( ctx context.Context, req *types.QueryReplicatingObjectRequest) ( diff --git a/service/syncer/client/syncer_client.go b/service/syncer/client/syncer_client.go index c61066603..50f043ccb 100644 --- a/service/syncer/client/syncer_client.go +++ b/service/syncer/client/syncer_client.go @@ -12,14 +12,14 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/service/syncer/types" ) -// SyncerClient is a grpc client wrapper. +// SyncerClient is a syncer gRPC service client wrapper type SyncerClient struct { address string syncer types.SyncerServiceClient conn *grpc.ClientConn } -// NewSyncerClient return a SyncerClient instance. +// NewSyncerClient return a SyncerClient instance func NewSyncerClient(address string) (*SyncerClient, error) { conn, err := grpc.DialContext(context.Background(), address, grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -37,14 +37,19 @@ func NewSyncerClient(address string) (*SyncerClient, error) { return client, nil } -// SyncObject an object payload with object info. +// Close the syncer gPRC connection +func (client *SyncerClient) Close() error { + return client.conn.Close() +} + +// SyncObject an object payload with object info func (client *SyncerClient) SyncObject( ctx context.Context, opts ...grpc.CallOption) (types.SyncerService_SyncObjectClient, error) { return client.syncer.SyncObject(ctx, opts...) } -// QuerySyncingObject a syncing object info by object id. 
+// QuerySyncingObject a syncing object info by object id func (client *SyncerClient) QuerySyncingObject( ctx context.Context, objectId uint64) (*servicetypes.SegmentInfo, error) { req := &types.QuerySyncingObjectRequest{ diff --git a/service/syncer/syncer.go b/service/syncer/syncer.go index 4c8a7ca01..10eb49d89 100644 --- a/service/syncer/syncer.go +++ b/service/syncer/syncer.go @@ -20,7 +20,7 @@ import ( var _ lifecycle.Service = &Syncer{} // Syncer implements the gRPC of SyncerService, -// responsible for receive replicate object payload data. +// responsible for receive replicate object payload data type Syncer struct { config *SyncerConfig cache *lru.Cache @@ -30,7 +30,7 @@ type Syncer struct { grpcServer *grpc.Server } -// NewSyncerService return a syncer instance and init the resource +// NewSyncerService return a Syncer instance and init the resource func NewSyncerService(config *SyncerConfig) (*Syncer, error) { cache, _ := lru.New(model.LruCacheLimit) pieceStore, err := psclient.NewStoreClient(config.PieceStoreConfig) @@ -70,7 +70,6 @@ func (syncer *Syncer) Stop(ctx context.Context) error { return nil } -// serve the syncer gRPC service func (syncer *Syncer) serve(errCh chan error) { lis, err := net.Listen("tcp", syncer.config.GrpcAddress) errCh <- err diff --git a/service/syncer/syncer_config.go b/service/syncer/syncer_config.go index b63c3d12c..8a12446b6 100644 --- a/service/syncer/syncer_config.go +++ b/service/syncer/syncer_config.go @@ -15,7 +15,7 @@ type SyncerConfig struct { } var DefaultSyncerConfig = &SyncerConfig{ - SpOperatorAddress: "bnb-sp", + SpOperatorAddress: model.SpOperatorAddress, GrpcAddress: model.SyncerGrpcAddress, SignerGrpcAddress: model.SignerGrpcAddress, SpDBConfig: store.DefaultSqlDBConfig, diff --git a/service/syncer/syncer_service.go b/service/syncer/syncer_service.go index c31f6d2cc..ca0389b8c 100644 --- a/service/syncer/syncer_service.go +++ b/service/syncer/syncer_service.go @@ -42,7 +42,7 @@ func (syncer *Syncer) 
SyncObject(stream types.SyncerService_SyncObjectServer) (e integrityMeta.Signature = resp.Signature err = syncer.spDB.SetObjectIntegrity(integrityMeta) if err != nil { - log.Errorw("fail to write integrity hash to db", "error", err) + log.Errorw("failed to write integrity hash to db", "error", err) return } traceInfo.IntegrityHash = resp.IntegrityHash diff --git a/service/uploader/client/cli/query.go b/service/uploader/client/cli/query.go deleted file mode 100644 index 7f1e458cd..000000000 --- a/service/uploader/client/cli/query.go +++ /dev/null @@ -1 +0,0 @@ -package cli diff --git a/service/uploader/client/cli/upload.go b/service/uploader/client/cli/upload.go deleted file mode 100644 index 7f1e458cd..000000000 --- a/service/uploader/client/cli/upload.go +++ /dev/null @@ -1 +0,0 @@ -package cli diff --git a/service/uploader/client/uploader_client.go b/service/uploader/client/uploader_client.go index 1304d0c87..71a030d31 100644 --- a/service/uploader/client/uploader_client.go +++ b/service/uploader/client/uploader_client.go @@ -6,18 +6,18 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/bnb-chain/greenfield-storage-provider/pkg/log" servicetypes "github.com/bnb-chain/greenfield-storage-provider/service/types" types "github.com/bnb-chain/greenfield-storage-provider/service/uploader/types" - "github.com/bnb-chain/greenfield-storage-provider/pkg/log" ) -// UploaderClient is a grpc client wrapper. +// UploaderClient is an uploader gRPC service client wrapper type UploaderClient struct { uploader types.UploaderServiceClient conn *grpc.ClientConn } -// NewUploaderClient return an uploader gPRC client instance. 
+// NewUploaderClient return an UploaderClient instance func NewUploaderClient(address string) (*UploaderClient, error) { conn, err := grpc.DialContext(context.Background(), address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -31,12 +31,12 @@ func NewUploaderClient(address string) (*UploaderClient, error) { return client, nil } -// Close the uploader gPRC client instance. +// Close the uploader gPRC client connection func (client *UploaderClient) Close() error { return client.conn.Close() } -// QueryUploadingObject query an uploading object info with object id. +// QueryUploadingObject query an uploading object info with object id func (client *UploaderClient) QueryUploadingObject( ctx context.Context, objectId uint64, diff --git a/service/uploader/uploader.go b/service/uploader/uploader.go index 3dea70bcd..3bff16523 100644 --- a/service/uploader/uploader.go +++ b/service/uploader/uploader.go @@ -21,7 +21,7 @@ import ( var _ lifecycle.Service = &Uploader{} // Uploader implements the gRPC of UploaderService, -// responsible for upload object payload data. +// responsible for uploading object payload data. 
type Uploader struct { config *UploaderConfig cache *lru.Cache diff --git a/service/uploader/uploader_service.go b/service/uploader/uploader_service.go index cc84bd536..e152c1f87 100644 --- a/service/uploader/uploader_service.go +++ b/service/uploader/uploader_service.go @@ -62,12 +62,12 @@ func (uploader *Uploader) UploadObject( traceInfo.GetObjectInfo().Id.Uint64()) log.Infow("finish to upload payload", "error", err) }(&resp, err) - params, err := uploader.spDB.GetAllParam() if err != nil { return } segmentSize := params.GetMaxSegmentSize() + // read payload from gRPC stream go func() { init := true diff --git a/store/piecestore/piece/piece_store.go b/store/piecestore/piece/piece_store.go index 6b9b925e2..58f1482b9 100644 --- a/store/piecestore/piece/piece_store.go +++ b/store/piecestore/piece/piece_store.go @@ -11,8 +11,8 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/model" merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" - "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/storage" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/storage" ) // NewPieceStore returns an instance of PieceStore @@ -88,7 +88,7 @@ func createStorage(cfg *storage.PieceStoreConfig) (storage.ObjectStorage, error) func checkBucket(ctx context.Context, store storage.ObjectStorage) error { if err := store.HeadBucket(ctx); err != nil { log.Errorw("HeadBucket error", "error", err) - if errors.Is(err, merrors.BucketNotExisted) { + if errors.Is(err, merrors.ErrNotExistBucket) { if err2 := store.CreateBucket(ctx); err2 != nil { return fmt.Errorf("Failed to create bucket in %s: %s, previous err: %s", store, err2, err) } diff --git a/store/piecestore/storage/disk_file.go b/store/piecestore/storage/disk_file.go index 9611c7269..3f0441efc 100644 --- a/store/piecestore/storage/disk_file.go +++ b/store/piecestore/storage/disk_file.go @@ -144,7 +144,7 @@ func (d 
*diskFileStore) DeleteObject(ctx context.Context, key string) error { func (d *diskFileStore) HeadBucket(ctx context.Context) error { if _, err := os.Stat(d.root); err != nil { if os.IsNotExist(err) { - return errors.BucketNotExisted + return errors.ErrNotExistBucket } return err } diff --git a/store/piecestore/storage/disk_file_test.go b/store/piecestore/storage/disk_file_test.go index c7bbe55e4..1e6a36d38 100644 --- a/store/piecestore/storage/disk_file_test.go +++ b/store/piecestore/storage/disk_file_test.go @@ -289,13 +289,13 @@ func TestDiskFile_HeadDirSuccess(t *testing.T) { func TestDiskFile_List(t *testing.T) { store := setupDiskFileTest(t) _, err := store.ListObjects(context.TODO(), emptyString, emptyString, emptyString, 0) - assert.Equal(t, merrors.NotSupportedMethod, err) + assert.Equal(t, merrors.ErrUnsupportMethod, err) } func TestDiskFile_ListAll(t *testing.T) { store := setupDiskFileTest(t) _, err := store.ListAllObjects(context.TODO(), emptyString, emptyString) - assert.Equal(t, merrors.NotSupportedMethod, err) + assert.Equal(t, merrors.ErrUnsupportMethod, err) } func TestPath(t *testing.T) { diff --git a/store/piecestore/storage/memory.go b/store/piecestore/storage/memory.go index a3d521bf1..fe93336e1 100644 --- a/store/piecestore/storage/memory.go +++ b/store/piecestore/storage/memory.go @@ -42,11 +42,11 @@ func (m *memoryStore) GetObject(ctx context.Context, key string, offset, limit i defer m.Unlock() // Minimum length is 1 if key == "" { - return nil, errors.EmptyObjectKey + return nil, errors.ErrInvalidObjectKey } d, ok := m.objects[key] if !ok { - return nil, errors.EmptyMemoryObject + return nil, errors.ErrNotExitObject } if offset > int64(len(d.data)) { @@ -64,7 +64,7 @@ func (m *memoryStore) PutObject(ctx context.Context, key string, reader io.Reade defer m.Unlock() // Minimum length is 1 if key == "" { - return errors.EmptyObjectKey + return errors.ErrInvalidObjectKey } if _, ok := m.objects[key]; ok { log.Info("overwrite key: ", key) @@ 
-94,7 +94,7 @@ func (m *memoryStore) HeadObject(ctx context.Context, key string) (Object, error defer m.Unlock() // Minimum length is 1 if key == "" { - return nil, errors.EmptyObjectKey + return nil, errors.ErrInvalidObjectKey } o, ok := m.objects[key] if !ok { @@ -111,7 +111,7 @@ func (m *memoryStore) HeadObject(ctx context.Context, key string) (Object, error func (m *memoryStore) ListObjects(ctx context.Context, prefix, marker, delimiter string, limit int64) ([]Object, error) { if delimiter != "" { - return nil, errors.NotSupportedDelimiter + return nil, errors.ErrUnsupportDelimiter } m.Lock() defer m.Unlock() diff --git a/store/piecestore/storage/memory_test.go b/store/piecestore/storage/memory_test.go index ac225bb4f..372d0c5c8 100644 --- a/store/piecestore/storage/memory_test.go +++ b/store/piecestore/storage/memory_test.go @@ -62,12 +62,12 @@ func TestMemory_GetError(t *testing.T) { { name: "memory_get_error_test1", key: emptyString, - wantedErr: errors.EmptyObjectKey, + wantedErr: errors.ErrInvalidObjectKey, }, { name: "memory_get_error_test2", key: mockKey, - wantedErr: errors.EmptyMemoryObject, + wantedErr: errors.ErrNotExitObject, }, } for _, tt := range cases { @@ -94,7 +94,7 @@ func TestMemory_Put(t *testing.T) { name: "memory_put_test1", key: emptyString, data: endPoint, - wantedErr: errors.EmptyObjectKey, + wantedErr: errors.ErrInvalidObjectKey, }, { name: "memory_put_test2", @@ -178,7 +178,7 @@ func TestMemory_HeadError(t *testing.T) { { name: "memory_head_error_test1", key: emptyString, - wantedErr: errors.EmptyObjectKey, + wantedErr: errors.ErrInvalidObjectKey, }, { name: "memory_head_error_test2", @@ -230,7 +230,7 @@ func TestMemory_ListError(t *testing.T) { { name: "memory_list_error_test1", delimiter: mockKey, - wantedErr: errors.NotSupportedDelimiter, + wantedErr: errors.ErrUnsupportDelimiter, }, } for _, tt := range cases { @@ -246,5 +246,5 @@ func TestMemory_ListError(t *testing.T) { func TestMemory_ListAll(t *testing.T) { store := 
setupMemoryTest(t) _, err := store.ListAllObjects(context.TODO(), emptyString, emptyString) - assert.Equal(t, errors.NotSupportedMethod, err) + assert.Equal(t, errors.ErrUnsupportMethod, err) } diff --git a/store/piecestore/storage/object_storage.go b/store/piecestore/storage/object_storage.go index b6c9d2834..2bfea0c42 100644 --- a/store/piecestore/storage/object_storage.go +++ b/store/piecestore/storage/object_storage.go @@ -35,11 +35,11 @@ func (s DefaultObjectStorage) CreateBucket(ctx context.Context) error { } func (s DefaultObjectStorage) ListObjects(ctx context.Context, prefix, marker, delimiter string, limit int64) ([]Object, error) { - return nil, errors.NotSupportedMethod + return nil, errors.ErrUnsupportMethod } func (s DefaultObjectStorage) ListAllObjects(ctx context.Context, prefix, marker string) (<-chan Object, error) { - return nil, errors.NotSupportedMethod + return nil, errors.ErrUnsupportMethod } type file struct { diff --git a/store/piecestore/storage/s3.go b/store/piecestore/storage/s3.go index 3bee8b921..60d6b6382 100644 --- a/store/piecestore/storage/s3.go +++ b/store/piecestore/storage/s3.go @@ -147,7 +147,7 @@ func (s *s3Store) HeadBucket(ctx context.Context) error { log.Errorw("ObjectStorage S3 HeadBucket error", "error", err) if reqErr, ok := err.(awserr.RequestFailure); ok { if reqErr.StatusCode() == http.StatusNotFound { - return merrors.BucketNotExisted + return merrors.ErrNotExistBucket } } return err @@ -215,7 +215,7 @@ func (s *s3Store) ListObjects(ctx context.Context, prefix, marker, delimiter str } func (s *s3Store) ListAllObjects(ctx context.Context, prefix, marker string) (<-chan Object, error) { - return nil, merrors.NotSupportedMethod + return nil, merrors.ErrUnsupportMethod } // SessionCache holds session.Session according to ObjectStorageConfig and it synchronizes access/modification diff --git a/store/piecestore/storage/s3_test.go b/store/piecestore/storage/s3_test.go index e60e7b731..28ba9b148 100644 --- 
a/store/piecestore/storage/s3_test.go +++ b/store/piecestore/storage/s3_test.go @@ -283,7 +283,7 @@ func TestS3_ListSuccess(t *testing.T) { func TestS3_ListAll(t *testing.T) { store := setupS3Test(t) _, err := store.ListAllObjects(context.TODO(), emptyString, emptyString) - assert.Equal(t, merrors.NotSupportedMethod, err) + assert.Equal(t, merrors.ErrUnsupportMethod, err) } type mockS3ClientError struct { diff --git a/store/sp_db.go b/store/sp_db.go index fd20efe5b..dbaefda12 100644 --- a/store/sp_db.go +++ b/store/sp_db.go @@ -44,10 +44,12 @@ const ( ) type SpInfo interface { + SetSelfSpInfo(*sptypes.StorageProvider) error + GetSelfSpInfo() (*sptypes.StorageProvider, error) UpdateAllSp([]*sptypes.StorageProvider) error FetchAllSp(...sptypes.Status) ([]*sptypes.StorageProvider, error) FetchAllWithoutSp(string, ...sptypes.Status) ([]*sptypes.StorageProvider, error) - GetSpByAddress(addrType string) (*sptypes.StorageProvider, error) + GetSpByAddress(addr string, addrType string) (*sptypes.StorageProvider, error) GetSpByEndpoint(endpoint string) (*sptypes.StorageProvider, error) } diff --git a/store/store.go b/store/store.go index 3f45815e1..7eda595ae 100644 --- a/store/store.go +++ b/store/store.go @@ -4,18 +4,27 @@ import ( "fmt" "os" - "github.com/bnb-chain/greenfield-storage-provider/model" + "github.com/bnb-chain/greenfield-storage-provider/pkg/log" "github.com/bnb-chain/greenfield-storage-provider/store/config" "github.com/bnb-chain/greenfield-storage-provider/store/jobdb/jobsql" "github.com/bnb-chain/greenfield-storage-provider/store/metadb/metasql" "github.com/bnb-chain/greenfield-storage-provider/store/spdb" - "github.com/bnb-chain/greenfield-storage-provider/pkg/log" +) + +var ( + // MetaDB environment constants + MetaDBUser = "META_DB_USER" + MetaDBPassword = "META_DB_PASSWORD" + + // JobDB environment constants + JobDBUser = "JOB_DB_USER" + JobDBPassword = "JOB_DB_PASSWORD" ) // NewMetaDB return a meta-db instance func NewMetaDB(dbType string, 
levelDBConfig *config.LevelDBConfig, sqlDBConfig *config.SqlDBConfig) (spdb.MetaDB, error) { var err error - sqlDBConfig.User, sqlDBConfig.Passwd, err = getDBConfigFromEnv(model.MetaDBUser, model.MetaDBPassword) + sqlDBConfig.User, sqlDBConfig.Passwd, err = getDBConfigFromEnv(MetaDBUser, MetaDBPassword) if err != nil { log.Error("load meta db config from env failed") return nil, err @@ -27,7 +36,7 @@ func NewMetaDB(dbType string, levelDBConfig *config.LevelDBConfig, sqlDBConfig * // NewJobDB return a job-db instance func NewJobDB(dbType string, sqlDBConfig *config.SqlDBConfig) (spdb.JobDB, error) { var err error - sqlDBConfig.User, sqlDBConfig.Passwd, err = getDBConfigFromEnv(model.JobDBUser, model.JobDBPassword) + sqlDBConfig.User, sqlDBConfig.Passwd, err = getDBConfigFromEnv(JobDBUser, JobDBPassword) if err != nil { log.Error("load job db config from env failed") return nil, err diff --git a/util/maps/maps.go b/util/maps/maps.go index ffff37ff7..361c657ab 100644 --- a/util/maps/maps.go +++ b/util/maps/maps.go @@ -16,6 +16,7 @@ func SortKeys[M ~map[K]V, K constraints.Ordered, V any](m M) []K { return keys } +// sortSlice return the sort items slice by key func sortSlice[T constraints.Ordered](s []T) { sort.Slice(s, func(i, j int) bool { return s[i] < s[j]