diff --git a/mock/inscription_chain.go b/mock/inscription_chain.go
index 2c5312dcd..fac7e6a1e 100644
--- a/mock/inscription_chain.go
+++ b/mock/inscription_chain.go
@@ -4,7 +4,7 @@ import (
     "errors"
     "sync"
 
-    types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+    ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
 )
 
 var (
@@ -20,8 +20,8 @@ type ChainEvent struct {
 
 // InscriptionChainMock mock the inscription chain
 type InscriptionChainMock struct {
-    objectByHash map[string]*types.ObjectInfo
-    objectByName map[string]*types.ObjectInfo
+    objectByHash map[string]*ptypesv1pb.ObjectInfo
+    objectByName map[string]*ptypesv1pb.ObjectInfo
     events       map[string][]chan interface{}
     notifyCh     chan *ChainEvent
     stopCh       chan struct{}
@@ -33,8 +33,8 @@ type InscriptionChainMock struct {
 // NewInscriptionChainMock return the InscriptionChainMock instance
 func NewInscriptionChainMock() *InscriptionChainMock {
     cli := &InscriptionChainMock{
-        objectByHash: make(map[string]*types.ObjectInfo),
-        objectByName: make(map[string]*types.ObjectInfo),
+        objectByHash: make(map[string]*ptypesv1pb.ObjectInfo),
+        objectByName: make(map[string]*ptypesv1pb.ObjectInfo),
         events:       make(map[string][]chan interface{}),
         notifyCh:     make(chan *ChainEvent, 10),
         stopCh:       make(chan struct{}),
@@ -87,7 +87,7 @@ func (cli *InscriptionChainMock) eventLoop() {
 }
 
 // QueryObjectByTx return the object info by create object tx hash.
-func (cli *InscriptionChainMock) QueryObjectByTx(txHash []byte) (*types.ObjectInfo, error) {
+func (cli *InscriptionChainMock) QueryObjectByTx(txHash []byte) (*ptypesv1pb.ObjectInfo, error) {
     cli.mu.Lock()
     defer cli.mu.Unlock()
     obj, ok := cli.objectByHash[string(txHash)]
@@ -98,7 +98,7 @@ func (cli *InscriptionChainMock) QueryObjectByTx(txHash []byte) (*types.ObjectIn
 }
 
 // QueryObjectByName return the object info by create object bucketName/objectName.
-func (cli *InscriptionChainMock) QueryObjectByName(name string) (*types.ObjectInfo, error) {
+func (cli *InscriptionChainMock) QueryObjectByName(name string) (*ptypesv1pb.ObjectInfo, error) {
     cli.mu.Lock()
     defer cli.mu.Unlock()
     obj, ok := cli.objectByName[name]
@@ -109,7 +109,7 @@ func (cli *InscriptionChainMock) QueryObjectByName(name string) (*types.ObjectIn
 }
 
 // CreateObjectByTxHash create the object info on the mock inscription chain.
-func (cli *InscriptionChainMock) CreateObjectByTxHash(txHash []byte, object *types.ObjectInfo) {
+func (cli *InscriptionChainMock) CreateObjectByTxHash(txHash []byte, object *ptypesv1pb.ObjectInfo) {
     cli.mu.Lock()
     defer cli.mu.Unlock()
     cli.objectID++
@@ -127,7 +127,7 @@ func (cli *InscriptionChainMock) CreateObjectByTxHash(txHash []byte, object *typ
 }
 
 // CreateObjectByName create the object info on the mock inscription chain.
-func (cli *InscriptionChainMock) CreateObjectByName(txHash []byte, object *types.ObjectInfo) {
+func (cli *InscriptionChainMock) CreateObjectByName(txHash []byte, object *ptypesv1pb.ObjectInfo) {
     cli.mu.Lock()
     defer cli.mu.Unlock()
     cli.objectID++
@@ -140,7 +140,7 @@ func (cli *InscriptionChainMock) CreateObjectByName(txHash []byte, object *types
 }
 
 // SealObjectByTxHash seal the object on the mock inscription chain.
-func (cli *InscriptionChainMock) SealObjectByTxHash(txHash []byte, object *types.ObjectInfo) {
+func (cli *InscriptionChainMock) SealObjectByTxHash(txHash []byte, object *ptypesv1pb.ObjectInfo) {
     cli.mu.Lock()
     defer cli.mu.Unlock()
     object.TxHash = txHash
diff --git a/mock/signer.go b/mock/signer.go
index e4e5309f7..0f0b2cb16 100644
--- a/mock/signer.go
+++ b/mock/signer.go
@@ -3,7 +3,7 @@ package mock
 import (
     "time"
 
-    types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+    ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util/hash"
 )
 
@@ -20,7 +20,7 @@ func NewSignerServerMock(chain *InscriptionChainMock) *SignerServerMock {
 }
 
 // BroadcastCreateObjectMessage mock broadcast create object message to inscription chain
-func (signer *SignerServerMock) BroadcastCreateObjectMessage(object *types.ObjectInfo) []byte {
+func (signer *SignerServerMock) BroadcastCreateObjectMessage(object *ptypesv1pb.ObjectInfo) []byte {
     txHash := hash.GenerateChecksum([]byte(time.Now().String()))
     go func() {
         time.Sleep(1 * time.Second)
@@ -30,7 +30,7 @@ func (signer *SignerServerMock) BroadcastCreateObjectMessage(object *types.Objec
 }
 
 // BroadcastSealObjectMessage mock broadcast seal object message to inscription chain
-func (signer *SignerServerMock) BroadcastSealObjectMessage(object *types.ObjectInfo) []byte {
+func (signer *SignerServerMock) BroadcastSealObjectMessage(object *ptypesv1pb.ObjectInfo) []byte {
     txHash := hash.GenerateChecksum([]byte(time.Now().String()))
     go func() {
         time.Sleep(1 * time.Second)
diff --git a/model/errors/errors.go b/model/errors/errors.go
index ee78d56b9..f77867fd5 100644
--- a/model/errors/errors.go
+++ b/model/errors/errors.go
@@ -3,7 +3,7 @@ package errors
 import (
     "errors"
 
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
 )
 
 // piece store errors
@@ -66,9 +66,9 @@ var (
     ErrReceivedPieceCount = errors.New("syncer service received piece count is wrong")
 )
 
-func MakeErrMsgResponse(err error) *service.ErrMessage {
-    return &service.ErrMessage{
-        ErrCode: service.ErrCode_ERR_CODE_ERROR,
+func MakeErrMsgResponse(err error) *stypesv1pb.ErrMessage {
+    return &stypesv1pb.ErrMessage{
+        ErrCode: stypesv1pb.ErrCode_ERR_CODE_ERROR,
         ErrMsg:  err.Error(),
     }
 }
diff --git a/model/piecestore/piece_key.go b/model/piecestore/piece_key.go
index 1ff2bf36f..1d5720346 100644
--- a/model/piecestore/piece_key.go
+++ b/model/piecestore/piece_key.go
@@ -7,7 +7,7 @@ import (
     "strings"
 
     merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
-    types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+    ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
@@ -35,13 +35,13 @@ func DecodeSegmentPieceKey(pieceKey string) (uint64, uint32, error) {
 }
 
 // EncodePieceKey encodes piece store key
-func EncodePieceKey(rType types.RedundancyType, objectId uint64, segmentIndex, pieceIndex uint32) (string, error) {
+func EncodePieceKey(rType ptypesv1pb.RedundancyType, objectId uint64, segmentIndex, pieceIndex uint32) (string, error) {
     var pieceKey string
-    if rType == types.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED {
+    if rType == ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED {
         pieceKey = EncodeECPieceKey(objectId, segmentIndex, pieceIndex)
-    } else if rType == types.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE {
+    } else if rType == ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE {
         pieceKey = EncodeSegmentPieceKey(objectId, pieceIndex)
-    } else if rType == types.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE {
+    } else if rType == ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE {
         pieceKey = EncodeSegmentPieceKey(objectId, pieceIndex)
     } else {
         return "", merrors.ErrRedundancyType
diff --git a/service/challenge/challenge.go b/service/challenge/challenge.go
index df7cb32b6..9b9165b61 100644
--- a/service/challenge/challenge.go
+++ b/service/challenge/challenge.go
@@ -11,7 +11,7 @@ import (
 
     "github.com/bnb-chain/greenfield-storage-provider/model"
     "github.com/bnb-chain/greenfield-storage-provider/service/client"
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/store/metadb"
     "github.com/bnb-chain/greenfield-storage-provider/store/metadb/leveldb"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
@@ -73,7 +73,7 @@ func (challenge *Challenge) Start(ctx context.Context) error {
             return
         }
         grpcServer := grpc.NewServer()
-        service.RegisterChallengeServiceServer(grpcServer, challenge)
+        stypesv1pb.RegisterChallengeServiceServer(grpcServer, challenge)
         reflection.Register(grpcServer)
         if err = grpcServer.Serve(lis); err != nil {
             log.Errorw("syncer serve failed", "error", err)
diff --git a/service/challenge/challenge_service.go b/service/challenge/challenge_service.go
index 4a51dae0b..3f5c0bee4 100644
--- a/service/challenge/challenge_service.go
+++ b/service/challenge/challenge_service.go
@@ -5,21 +5,22 @@ import (
     "errors"
 
     "github.com/bnb-chain/greenfield-storage-provider/model/piecestore"
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/store/metadb"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
 // ChallengePiece implement challenge service server interface and handle the grpc request.
-func (challenge *Challenge) ChallengePiece(ctx context.Context, req *service.ChallengeServiceChallengePieceRequest) (resp *service.ChallengeServiceChallengePieceResponse, err error) {
+func (challenge *Challenge) ChallengePiece(ctx context.Context, req *stypesv1pb.ChallengeServiceChallengePieceRequest) (
+    resp *stypesv1pb.ChallengeServiceChallengePieceResponse, err error) {
     ctx = log.Context(ctx, req)
-    resp = &service.ChallengeServiceChallengePieceResponse{
+    resp = &stypesv1pb.ChallengeServiceChallengePieceResponse{
         TraceId:  req.TraceId,
         ObjectId: req.ObjectId,
     }
     defer func() {
         if err != nil {
-            resp.ErrMessage.ErrCode = service.ErrCode_ERR_CODE_ERROR
+            resp.ErrMessage.ErrCode = stypesv1pb.ErrCode_ERR_CODE_ERROR
             resp.ErrMessage.ErrMsg = err.Error()
             log.CtxErrorw(ctx, "change failed", "error", err)
         }
diff --git a/service/client/downloader_client.go b/service/client/downloader_client.go
index 6a5646518..c8e3ecc3f 100644
--- a/service/client/downloader_client.go
+++ b/service/client/downloader_client.go
@@ -7,13 +7,13 @@ import (
     "google.golang.org/grpc"
     "google.golang.org/grpc/credentials/insecure"
 
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
 type DownloaderClient struct {
     address    string
-    downloader service.DownloaderServiceClient
+    downloader stypesv1pb.DownloaderServiceClient
     conn       *grpc.ClientConn
 }
 
@@ -29,7 +29,7 @@ func NewDownloaderClient(address string) (*DownloaderClient, error) {
     client := &DownloaderClient{
         address:    address,
         conn:       conn,
-        downloader: service.NewDownloaderServiceClient(conn),
+        downloader: stypesv1pb.NewDownloaderServiceClient(conn),
     }
     return client, nil
 }
@@ -38,19 +38,21 @@ func (client *DownloaderClient) Close() error {
     return client.conn.Close()
 }
 
-func (client *DownloaderClient) DownloaderObject(ctx context.Context, req *service.DownloaderServiceDownloaderObjectRequest, opts ...grpc.CallOption) (service.DownloaderService_DownloaderObjectClient, error) {
+func (client *DownloaderClient) DownloaderObject(ctx context.Context, req *stypesv1pb.DownloaderServiceDownloaderObjectRequest,
+    opts ...grpc.CallOption) (stypesv1pb.DownloaderService_DownloaderObjectClient, error) {
     ctx = log.Context(context.Background(), req)
     return client.downloader.DownloaderObject(ctx, req, opts...)
 }
 
-func (client *DownloaderClient) DownloaderSegment(ctx context.Context, in *service.DownloaderServiceDownloaderSegmentRequest, opts ...grpc.CallOption) (*service.DownloaderServiceDownloaderSegmentResponse, error) {
+func (client *DownloaderClient) DownloaderSegment(ctx context.Context, in *stypesv1pb.DownloaderServiceDownloaderSegmentRequest,
+    opts ...grpc.CallOption) (*stypesv1pb.DownloaderServiceDownloaderSegmentResponse, error) {
     resp, err := client.downloader.DownloaderSegment(ctx, in, opts...)
     ctx = log.Context(ctx, resp)
     if err != nil {
         log.CtxErrorw(ctx, "downloader segment failed", "error", err)
         return nil, err
     }
-    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.CtxErrorw(ctx, "downloader segment response code is not success", "error", resp.GetErrMessage().GetErrMsg())
         return nil, errors.New(resp.GetErrMessage().GetErrMsg())
     }
diff --git a/service/client/stone_hub_client.go b/service/client/stone_hub_client.go
index 449f6039c..e41c449c1 100644
--- a/service/client/stone_hub_client.go
+++ b/service/client/stone_hub_client.go
@@ -9,7 +9,7 @@ import (
     "google.golang.org/grpc"
     "google.golang.org/grpc/credentials/insecure"
 
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
@@ -23,20 +23,19 @@ var _ io.Closer = &StoneHubClient{}
 //
 //go:generate mockgen -source=./stone_hub_client.go -destination=./mock/stone_hub_mock.go -package=mock
 type StoneHubAPI interface {
-    CreateObject(ctx context.Context, in *service.StoneHubServiceCreateObjectRequest, opts ...grpc.CallOption) (*service.StoneHubServiceCreateObjectResponse, error)
-    SetObjectCreateInfo(ctx context.Context, in *service.StoneHubServiceSetObjectCreateInfoRequest, opts ...grpc.CallOption) (*service.StoneHubServiceSetObjectCreateInfoResponse, error)
-    BeginUploadPayload(ctx context.Context, in *service.StoneHubServiceBeginUploadPayloadRequest, opts ...grpc.CallOption) (*service.StoneHubServiceBeginUploadPayloadResponse, error)
-    BeginUploadPayloadV2(ctx context.Context, in *service.StoneHubServiceBeginUploadPayloadV2Request, opts ...grpc.CallOption) (*service.StoneHubServiceBeginUploadPayloadV2Response, error)
-    DonePrimaryPieceJob(ctx context.Context, in *service.StoneHubServiceDonePrimaryPieceJobRequest, opts ...grpc.CallOption) (*service.StoneHubServiceDonePrimaryPieceJobResponse, error)
-    AllocStoneJob(ctx context.Context, opts ...grpc.CallOption) (*service.StoneHubServiceAllocStoneJobResponse, error)
-    DoneSecondaryPieceJob(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) (*service.StoneHubServiceDoneSecondaryPieceJobResponse, error)
-    //QueryStone(ctx context.Context, in *service.StoneHubServiceQueryStoneRequest, opts ...grpc.CallOption) (*service.StoneHubServiceQueryStoneResponse, error)
+    CreateObject(ctx context.Context, in *stypesv1pb.StoneHubServiceCreateObjectRequest, opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceCreateObjectResponse, error)
+    SetObjectCreateInfo(ctx context.Context, in *stypesv1pb.StoneHubServiceSetObjectCreateInfoRequest, opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceSetObjectCreateInfoResponse, error)
+    BeginUploadPayload(ctx context.Context, in *stypesv1pb.StoneHubServiceBeginUploadPayloadRequest, opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceBeginUploadPayloadResponse, error)
+    BeginUploadPayloadV2(ctx context.Context, in *stypesv1pb.StoneHubServiceBeginUploadPayloadV2Request, opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceBeginUploadPayloadV2Response, error)
+    DonePrimaryPieceJob(ctx context.Context, in *stypesv1pb.StoneHubServiceDonePrimaryPieceJobRequest, opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceDonePrimaryPieceJobResponse, error)
+    AllocStoneJob(ctx context.Context, opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceAllocStoneJobResponse, error)
+    DoneSecondaryPieceJob(ctx context.Context, in *stypesv1pb.StoneHubServiceDoneSecondaryPieceJobRequest, opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceDoneSecondaryPieceJobResponse, error)
     Close() error
 }
 
 type StoneHubClient struct {
     address  string
-    stoneHub service.StoneHubServiceClient
+    stoneHub stypesv1pb.StoneHubServiceClient
     conn     *grpc.ClientConn
 }
@@ -50,20 +49,20 @@ func NewStoneHubClient(address string) (*StoneHubClient, error) {
     client := &StoneHubClient{
         address:  address,
         conn:     conn,
-        stoneHub: service.NewStoneHubServiceClient(conn),
+        stoneHub: stypesv1pb.NewStoneHubServiceClient(conn),
     }
     return client, nil
 }
 
-func (client *StoneHubClient) CreateObject(ctx context.Context, in *service.StoneHubServiceCreateObjectRequest,
-    opts ...grpc.CallOption) (*service.StoneHubServiceCreateObjectResponse, error) {
+func (client *StoneHubClient) CreateObject(ctx context.Context, in *stypesv1pb.StoneHubServiceCreateObjectRequest,
+    opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceCreateObjectResponse, error) {
     resp, err := client.stoneHub.CreateObject(ctx, in, opts...)
     ctx = log.Context(ctx, resp)
     if err != nil {
         log.CtxErrorw(ctx, "create object failed", "error", err)
         return nil, err
     }
-    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.CtxErrorw(ctx, "create object response code is not success", "error", resp.GetErrMessage().GetErrMsg())
         return nil, errors.New(resp.GetErrMessage().GetErrMsg())
     }
@@ -71,58 +70,60 @@ func (client *StoneHubClient) CreateObject(ctx context.Context, in *service.Ston
     return resp, nil
 }
 
-func (client *StoneHubClient) SetObjectCreateInfo(ctx context.Context, in *service.StoneHubServiceSetObjectCreateInfoRequest,
-    opts ...grpc.CallOption) (*service.StoneHubServiceSetObjectCreateInfoResponse, error) {
+func (client *StoneHubClient) SetObjectCreateInfo(ctx context.Context, in *stypesv1pb.StoneHubServiceSetObjectCreateInfoRequest,
+    opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceSetObjectCreateInfoResponse, error) {
     resp, err := client.stoneHub.SetObjectCreateInfo(ctx, in, opts...)
     ctx = log.Context(ctx, resp)
     if err != nil {
         log.CtxErrorw(ctx, "set object height and object id failed", "error", err)
         return nil, err
     }
-    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.CtxErrorw(ctx, "set object height and object id response code is not success", "error", resp.GetErrMessage().GetErrMsg())
         return nil, errors.New(resp.GetErrMessage().GetErrMsg())
     }
     return resp, nil
 }
 
-func (client *StoneHubClient) BeginUploadPayload(ctx context.Context, in *service.StoneHubServiceBeginUploadPayloadRequest,
-    opts ...grpc.CallOption) (*service.StoneHubServiceBeginUploadPayloadResponse, error) {
+func (client *StoneHubClient) BeginUploadPayload(ctx context.Context, in *stypesv1pb.StoneHubServiceBeginUploadPayloadRequest,
+    opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceBeginUploadPayloadResponse, error) {
     resp, err := client.stoneHub.BeginUploadPayload(ctx, in, opts...)
     ctx = log.Context(ctx, resp)
     if err != nil {
         log.CtxErrorw(ctx, "begin upload stone failed", "error", err)
         return nil, err
     }
-    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.CtxErrorw(ctx, "begin upload stone response code is not success", "error", resp.GetErrMessage().GetErrMsg())
         return nil, errors.New(resp.GetErrMessage().GetErrMsg())
     }
     return resp, nil
 }
 
-func (client *StoneHubClient) BeginUploadPayloadV2(ctx context.Context, in *service.StoneHubServiceBeginUploadPayloadV2Request, opts ...grpc.CallOption) (*service.StoneHubServiceBeginUploadPayloadV2Response, error) {
+func (client *StoneHubClient) BeginUploadPayloadV2(ctx context.Context, in *stypesv1pb.StoneHubServiceBeginUploadPayloadV2Request,
+    opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceBeginUploadPayloadV2Response, error) {
     resp, err := client.stoneHub.BeginUploadPayloadV2(ctx, in, opts...)
     ctx = log.Context(ctx, resp)
     if err != nil {
         log.CtxErrorw(ctx, "begin upload stone failed", "error", err)
         return nil, err
     }
-    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.CtxErrorw(ctx, "begin upload stone response code is not success", "error", resp.GetErrMessage().GetErrMsg())
         return nil, errors.New(resp.GetErrMessage().GetErrMsg())
     }
     return resp, nil
 }
 
-func (client *StoneHubClient) DonePrimaryPieceJob(ctx context.Context, in *service.StoneHubServiceDonePrimaryPieceJobRequest, opts ...grpc.CallOption) (*service.StoneHubServiceDonePrimaryPieceJobResponse, error) {
+func (client *StoneHubClient) DonePrimaryPieceJob(ctx context.Context, in *stypesv1pb.StoneHubServiceDonePrimaryPieceJobRequest,
+    opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceDonePrimaryPieceJobResponse, error) {
     resp, err := client.stoneHub.DonePrimaryPieceJob(ctx, in, opts...)
     ctx = log.Context(ctx, resp)
     if err != nil {
         log.CtxErrorw(ctx, "done primary piece job failed", "error", err)
         return nil, err
     }
-    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.CtxErrorw(ctx, "done primary piece job response code is not success", "error", resp.GetErrMessage().GetErrMsg())
         return nil, errors.New(resp.GetErrMessage().GetErrMsg())
     }
@@ -130,8 +131,8 @@ func (client *StoneHubClient) DonePrimaryPieceJob(ctx context.Context, in *servi
 }
 
 func (client *StoneHubClient) AllocStoneJob(ctx context.Context, opts ...grpc.CallOption) (
-    *service.StoneHubServiceAllocStoneJobResponse, error) {
-    req := &service.StoneHubServiceAllocStoneJobRequest{TraceId: util.GenerateRequestID()}
+    *stypesv1pb.StoneHubServiceAllocStoneJobResponse, error) {
+    req := &stypesv1pb.StoneHubServiceAllocStoneJobRequest{TraceId: util.GenerateRequestID()}
     resp, err := client.stoneHub.AllocStoneJob(ctx, req, opts...)
     ctx = log.Context(ctx, resp)
     if err != nil {
@@ -141,22 +142,22 @@ func (client *StoneHubClient) AllocStoneJob(ctx context.Context, opts ...grpc.Ca
     if resp.PieceJob == nil {
         log.CtxDebugw(ctx, "alloc stone job empty.")
     }
-    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.CtxErrorw(ctx, "alloc stone job failed", "error", resp.GetErrMessage().GetErrMsg())
         return nil, errors.New(resp.GetErrMessage().GetErrMsg())
     }
     return resp, nil
 }
 
-func (client *StoneHubClient) DoneSecondaryPieceJob(ctx context.Context, in *service.StoneHubServiceDoneSecondaryPieceJobRequest,
-    opts ...grpc.CallOption) (*service.StoneHubServiceDoneSecondaryPieceJobResponse, error) {
+func (client *StoneHubClient) DoneSecondaryPieceJob(ctx context.Context, in *stypesv1pb.StoneHubServiceDoneSecondaryPieceJobRequest,
+    opts ...grpc.CallOption) (*stypesv1pb.StoneHubServiceDoneSecondaryPieceJobResponse, error) {
     resp, err := client.stoneHub.DoneSecondaryPieceJob(ctx, in, opts...)
     ctx = log.Context(ctx, resp)
     if err != nil {
         log.CtxErrorw(ctx, "done secondary piece job failed", "error", err)
         return nil, err
     }
-    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.CtxErrorw(ctx, "done secondary piece job response code is not success", "error", resp.GetErrMessage().GetErrMsg())
         return nil, errors.New(resp.GetErrMessage().GetErrMsg())
     }
diff --git a/service/client/syncer_client.go b/service/client/syncer_client.go
index 125afd8f1..d59cc06ba 100644
--- a/service/client/syncer_client.go
+++ b/service/client/syncer_client.go
@@ -8,7 +8,7 @@ import (
     "google.golang.org/grpc/credentials/insecure"
 
     "github.com/bnb-chain/greenfield-storage-provider/model"
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
@@ -19,13 +19,13 @@ var _ io.Closer = &SyncerClient{}
 //
 //go:generate mockgen -source=./syncer_client.go -destination=./mock/syncer_mock.go -package=mock
 type SyncerAPI interface {
-    SyncPiece(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_SyncPieceClient, error)
+    SyncPiece(ctx context.Context, opts ...grpc.CallOption) (stypesv1pb.SyncerService_SyncPieceClient, error)
     Close() error
 }
 
 type SyncerClient struct {
     address string
-    syncer  service.SyncerServiceClient
+    syncer  stypesv1pb.SyncerServiceClient
     conn    *grpc.ClientConn
 }
@@ -40,14 +40,14 @@ func NewSyncerClient(address string) (*SyncerClient, error) {
     client := &SyncerClient{
         address: address,
         conn:    conn,
-        syncer:  service.NewSyncerServiceClient(conn),
+        syncer:  stypesv1pb.NewSyncerServiceClient(conn),
     }
     return client, nil
 }
 
 // UploadECPiece return SyncerService_UploadECPieceClient, need to be closed by caller
 func (client *SyncerClient) SyncPiece(ctx context.Context, opts ...grpc.CallOption) (
-    service.SyncerService_SyncPieceClient, error) {
+    stypesv1pb.SyncerService_SyncPieceClient, error) {
     return client.syncer.SyncPiece(ctx, opts...)
 }
diff --git a/service/client/uploader_client.go b/service/client/uploader_client.go
index 7a43378f2..07657b223 100644
--- a/service/client/uploader_client.go
+++ b/service/client/uploader_client.go
@@ -7,14 +7,14 @@ import (
     "google.golang.org/grpc"
     "google.golang.org/grpc/credentials/insecure"
 
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
 // UploaderClient is a grpc client wrapper.
 type UploaderClient struct {
     address  string
-    uploader service.UploaderServiceClient
+    uploader stypesv1pb.UploaderServiceClient
     conn     *grpc.ClientConn
 }
 
@@ -29,20 +29,21 @@ func NewUploaderClient(address string) (*UploaderClient, error) {
     client := &UploaderClient{
         address:  address,
         conn:     conn,
-        uploader: service.NewUploaderServiceClient(conn),
+        uploader: stypesv1pb.NewUploaderServiceClient(conn),
     }
     return client, nil
 }
 
 // CreateObject invoke uploader service CreateObject interface.
-func (client *UploaderClient) CreateObject(ctx context.Context, in *service.UploaderServiceCreateObjectRequest, opts ...grpc.CallOption) (*service.UploaderServiceCreateObjectResponse, error) {
+func (client *UploaderClient) CreateObject(ctx context.Context, in *stypesv1pb.UploaderServiceCreateObjectRequest,
+    opts ...grpc.CallOption) (*stypesv1pb.UploaderServiceCreateObjectResponse, error) {
     resp, err := client.uploader.CreateObject(ctx, in, opts...)
     ctx = log.Context(ctx, resp)
     if err != nil {
         log.CtxErrorw(ctx, "send create object rpc failed", "error", err)
         return nil, err
     }
-    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.CtxErrorw(ctx, "create object response code is not success", "error", resp.GetErrMessage().GetErrMsg())
         return nil, errors.New(resp.GetErrMessage().GetErrMsg())
     }
@@ -50,19 +51,20 @@ func (client *UploaderClient) CreateObject(ctx context.Context, in *service.Uplo
 }
 
 // UploadPayload return grpc stream client, and be used to upload payload.
-func (client *UploaderClient) UploadPayload(ctx context.Context, opts ...grpc.CallOption) (service.UploaderService_UploadPayloadClient, error) {
+func (client *UploaderClient) UploadPayload(ctx context.Context, opts ...grpc.CallOption) (stypesv1pb.UploaderService_UploadPayloadClient, error) {
     return client.uploader.UploadPayload(ctx, opts...)
 }
 
 // GetAuthentication invoke uploader service GetAuthentication interface.
-func (client *UploaderClient) GetAuthentication(ctx context.Context, in *service.UploaderServiceGetAuthenticationRequest, opts ...grpc.CallOption) (*service.UploaderServiceGetAuthenticationResponse, error) {
+func (client *UploaderClient) GetAuthentication(ctx context.Context, in *stypesv1pb.UploaderServiceGetAuthenticationRequest,
+    opts ...grpc.CallOption) (*stypesv1pb.UploaderServiceGetAuthenticationResponse, error) {
     resp, err := client.uploader.GetAuthentication(ctx, in, opts...)
     ctx = log.Context(ctx, resp)
     if err != nil {
         log.CtxErrorw(ctx, "send get authentication rpc failed", "error", err)
         return nil, err
     }
-    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.CtxErrorw(ctx, "get authentication response code is not success", "error", resp.GetErrMessage().GetErrMsg())
         return nil, errors.New(resp.GetErrMessage().GetErrMsg())
     }
@@ -70,7 +72,7 @@ func (client *UploaderClient) GetAuthentication(ctx context.Context, in *service
 }
 
 // UploadPayloadV2 return grpc stream client, and be used to upload payload.
-func (client *UploaderClient) UploadPayloadV2(ctx context.Context, opts ...grpc.CallOption) (service.UploaderService_UploadPayloadV2Client, error) {
+func (client *UploaderClient) UploadPayloadV2(ctx context.Context, opts ...grpc.CallOption) (stypesv1pb.UploaderService_UploadPayloadV2Client, error) {
     return client.uploader.UploadPayloadV2(ctx, opts...)
 }
diff --git a/service/downloader/downloader.go b/service/downloader/downloader.go
index 986ed9e7d..84bdaf1d2 100644
--- a/service/downloader/downloader.go
+++ b/service/downloader/downloader.go
@@ -9,7 +9,7 @@ import (
 
     "github.com/bnb-chain/greenfield-storage-provider/mock"
     "github.com/bnb-chain/greenfield-storage-provider/service/client"
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
@@ -53,7 +53,7 @@ func (downloader *Downloader) Start(ctx context.Context) error {
             return
         }
         grpcServer := grpc.NewServer()
-        service.RegisterDownloaderServiceServer(grpcServer, downloader)
+        stypesv1pb.RegisterDownloaderServiceServer(grpcServer, downloader)
         reflection.Register(grpcServer)
         if err = grpcServer.Serve(lis); err != nil {
             log.Errorw("syncer serve failed", "error", err)
diff --git a/service/downloader/downloader_service.go b/service/downloader/downloader_service.go
index 56d574a3a..2f0e30069 100644
--- a/service/downloader/downloader_service.go
+++ b/service/downloader/downloader_service.go
@@ -7,22 +7,23 @@ import (
     "github.com/bnb-chain/greenfield-storage-provider/model"
     merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
     "github.com/bnb-chain/greenfield-storage-provider/model/piecestore"
-    types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
-var _ service.DownloaderServiceServer = &Downloader{}
+var _ stypesv1pb.DownloaderServiceServer = &Downloader{}
 
 // DownloaderSegment download the segment data and return to client.
-func (downloader *Downloader) DownloaderSegment(ctx context.Context, req *service.DownloaderServiceDownloaderSegmentRequest) (resp *service.DownloaderServiceDownloaderSegmentResponse, err error) {
+func (downloader *Downloader) DownloaderSegment(ctx context.Context, req *stypesv1pb.DownloaderServiceDownloaderSegmentRequest) (
+    resp *stypesv1pb.DownloaderServiceDownloaderSegmentResponse, err error) {
     ctx = log.Context(ctx, req)
-    resp = &service.DownloaderServiceDownloaderSegmentResponse{
+    resp = &stypesv1pb.DownloaderServiceDownloaderSegmentResponse{
         TraceId: req.TraceId,
     }
     defer func() {
         if err != nil {
-            resp.ErrMessage.ErrCode = service.ErrCode_ERR_CODE_ERROR
+            resp.ErrMessage.ErrCode = stypesv1pb.ErrCode_ERR_CODE_ERROR
             resp.ErrMessage.ErrMsg = err.Error()
             log.CtxErrorw(ctx, "download segment failed", "error", err, "object", req.ObjectId, "segment idx", req.SegmentIdx)
         }
@@ -38,15 +39,16 @@ func (downloader *Downloader) DownloaderSegment(ctx context.Context, req *servic
 }
 
 // DownloaderObject download the object data and return to client.
-func (downloader *Downloader) DownloaderObject(req *service.DownloaderServiceDownloaderObjectRequest, stream service.DownloaderService_DownloaderObjectServer) (err error) {
+func (downloader *Downloader) DownloaderObject(req *stypesv1pb.DownloaderServiceDownloaderObjectRequest,
+    stream stypesv1pb.DownloaderService_DownloaderObjectServer) (err error) {
     var (
-        objectInfo *types.ObjectInfo
+        objectInfo *ptypesv1pb.ObjectInfo
         size       int
         offset     uint64
         length     uint64
     )
     ctx := log.Context(context.Background(), req)
-    resp := &service.DownloaderServiceDownloaderObjectResponse{
+    resp := &stypesv1pb.DownloaderServiceDownloaderObjectResponse{
         TraceId: req.TraceId,
     }
     defer func() {
diff --git a/service/gateway/download_processor.go b/service/gateway/download_processor.go
index 299e6b05c..8cc432eff 100644
--- a/service/gateway/download_processor.go
+++ b/service/gateway/download_processor.go
@@ -9,7 +9,7 @@ import (
 
     "github.com/bnb-chain/greenfield-storage-provider/model/errors"
     "github.com/bnb-chain/greenfield-storage-provider/service/client"
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
@@ -138,7 +138,7 @@ func (d *grpcDownloaderImpl) getObject(objectName string, writer io.Writer, opti
         size       int
     )
 
-    req := &service.DownloaderServiceDownloaderObjectRequest{
+    req := &stypesv1pb.DownloaderServiceDownloaderObjectRequest{
         TraceId:    option.requestContext.requestID,
         BucketName: option.requestContext.bucketName,
         ObjectName: objectName,
@@ -165,7 +165,7 @@ func (d *grpcDownloaderImpl) getObject(objectName string, writer io.Writer, opti
             log.Warnw("failed to read stream", "error", err)
             return errors.ErrInternalError
         }
-        if res.ErrMessage != nil && res.ErrMessage.ErrCode != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+        if res.ErrMessage != nil && res.ErrMessage.ErrCode != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
             err = fmt.Errorf(res.ErrMessage.ErrMsg)
             log.Warnw("failed to read stream", "error", err)
             return errors.ErrInternalError
diff --git a/service/gateway/request_util.go b/service/gateway/request_util.go
index b860979af..5306197db 100644
--- a/service/gateway/request_util.go
+++ b/service/gateway/request_util.go
@@ -7,7 +7,7 @@ import (
     "time"
 
     "github.com/bnb-chain/greenfield-storage-provider/model"
-    pbPkg "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+    ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util"
     "github.com/gorilla/mux"
 )
@@ -89,10 +89,9 @@ func generateRequestDetail(r *requestContext) string {
 }
 
 // redundancyType can be EC or Replica, if != EC, default is Replica
-func redundancyTypeToEnum(redundancyType string) pbPkg.RedundancyType {
+func redundancyTypeToEnum(redundancyType string) ptypesv1pb.RedundancyType {
     if redundancyType == model.ReplicaRedundancyTypeHeaderValue {
-        return pbPkg.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE
+        return ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE
     }
-    return pbPkg.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED
-
+    return ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED
 }
diff --git a/service/gateway/upload_processor.go b/service/gateway/upload_processor.go
index 1ddc360fd..38f240dad 100644
--- a/service/gateway/upload_processor.go
+++ b/service/gateway/upload_processor.go
@@ -11,9 +11,9 @@ import (
     "os"
 
     "github.com/bnb-chain/greenfield-storage-provider/model/errors"
-    pbPkg "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+    ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/service/client"
-    pbService "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
@@ -167,9 +167,9 @@ type grpcUploaderImpl struct {
 // putObjectTx is used to call uploaderService's CreateObject by grpc.
 func (gui *grpcUploaderImpl) putObjectTx(objectName string, option *putObjectTxOption) (*objectTxInfo, error) {
     log.Infow("put object tx", "option", option)
-    resp, err := gui.uploader.CreateObject(context.Background(), &pbService.UploaderServiceCreateObjectRequest{
+    resp, err := gui.uploader.CreateObject(context.Background(), &stypesv1pb.UploaderServiceCreateObjectRequest{
         TraceId: option.requestContext.requestID,
-        ObjectInfo: &pbPkg.ObjectInfo{
+        ObjectInfo: &ptypesv1pb.ObjectInfo{
             BucketName: option.requestContext.bucketName,
             ObjectName: objectName,
             Size:       option.objectSize,
@@ -210,7 +210,7 @@ func (gui *grpcUploaderImpl) putObject(objectName string, reader io.Reader, opti
         }
 
         if readN > 0 {
-            req := &pbService.UploaderServiceUploadPayloadRequest{
+            req := &stypesv1pb.UploaderServiceUploadPayloadRequest{
                 TraceId:     option.requestContext.requestID,
                 TxHash:      option.txHash,
                 PayloadData: buf[:readN],
@@ -230,7 +230,7 @@ func (gui *grpcUploaderImpl) putObject(objectName string, reader io.Reader, opti
         log.Warnw("put object failed, due to stream close", "err", err)
         return nil, errors.ErrInternalError
     }
-    if errMsg := resp.GetErrMessage(); errMsg != nil && errMsg.ErrCode != pbService.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if errMsg := resp.GetErrMessage(); errMsg != nil && errMsg.ErrCode != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.Warnw("failed to grpc", "err", resp.ErrMessage)
         return nil, fmt.Errorf(resp.ErrMessage.ErrMsg)
     }
@@ -244,7 +244,7 @@ func (gui *grpcUploaderImpl) putObject(objectName string, reader io.Reader, opti
 
 // getAuthentication is used to call uploaderService's getAuthentication by grpc.
 func (gui *grpcUploaderImpl) getAuthentication(option *getAuthenticationOption) (*authenticationInfo, error) {
-    resp, err := gui.uploader.GetAuthentication(context.Background(), &pbService.UploaderServiceGetAuthenticationRequest{
+    resp, err := gui.uploader.GetAuthentication(context.Background(), &stypesv1pb.UploaderServiceGetAuthenticationRequest{
         TraceId: option.requestContext.requestID,
         Bucket:  option.requestContext.bucketName,
         Object:  option.requestContext.objectName,
@@ -280,7 +280,7 @@ func (gui *grpcUploaderImpl) putObjectV2(objectName string, reader io.Reader, op
             return nil, errors.ErrInternalError
         }
         if readN > 0 {
-            req := &pbService.UploaderServiceUploadPayloadV2Request{
+            req := &stypesv1pb.UploaderServiceUploadPayloadV2Request{
                 TraceId:     option.requestContext.requestID,
                 TxHash:      option.txHash,
                 PayloadData: buf[:readN],
@@ -308,7 +308,7 @@ func (gui *grpcUploaderImpl) putObjectV2(objectName string, reader io.Reader, op
         log.Warnw("put object failed, due to stream close", "err", err)
         return nil, errors.ErrInternalError
     }
-    if errMsg := resp.GetErrMessage(); errMsg != nil && errMsg.ErrCode != pbService.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if errMsg := resp.GetErrMessage(); errMsg != nil && errMsg.ErrCode != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.Warnw("failed to grpc", "err", resp.ErrMessage)
         return nil, fmt.Errorf(resp.ErrMessage.ErrMsg)
     }
diff --git a/service/stonehub/stone_hub.go b/service/stonehub/stone_hub.go
index ba3887970..a0d2a8984 100644
--- a/service/stonehub/stone_hub.go
+++ b/service/stonehub/stone_hub.go
@@ -17,8 +17,8 @@ import (
     "github.com/bnb-chain/greenfield-storage-provider/model"
     "github.com/bnb-chain/greenfield-storage-provider/pkg/lifecycle"
     "github.com/bnb-chain/greenfield-storage-provider/pkg/stone"
-    types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/store/jobdb"
     "github.com/bnb-chain/greenfield-storage-provider/store/jobdb/jobmemory"
     "github.com/bnb-chain/greenfield-storage-provider/store/jobdb/jobsql"
@@ -146,7 +146,7 @@ func (hub *StoneHub) serve() {
         return
     }
     grpcServer := grpc.NewServer()
-    service.RegisterStoneHubServiceServer(grpcServer, hub)
+    stypesv1pb.RegisterStoneHubServiceServer(grpcServer, hub)
     // register reflection service
     reflection.Register(grpcServer)
     if err := grpcServer.Serve(lis); err != nil {
@@ -183,7 +183,7 @@ func (hub *StoneHub) eventLoop() {
 // processStoneJob according to the stone job types to process.
 func (hub *StoneHub) processStoneJob(stoneJob stone.StoneJob) {
     switch job := stoneJob.(type) {
-    case *service.PieceJob:
+    case *stypesv1pb.PieceJob:
         hub.jobQueue.Enqueue(job)
         log.Infow("push secondary piece job to queue",
             "object_id", job.GetObjectId(), "object_size", job.GetPayloadSize(),
@@ -215,7 +215,7 @@ func (hub *StoneHub) gcMemoryStone() {
         if err != nil {
             return true // skip err stone
         }
-        if val.LastModifyTime() <= current || state == types.JOB_STATE_ERROR {
+        if val.LastModifyTime() <= current || state == ptypesv1pb.JOB_STATE_ERROR {
             stoneKey := key.(string)
             log.Infow("gc memory stone", "key", stoneKey)
             hub.stone.Delete(stoneKey)
@@ -234,7 +234,7 @@ func (hub *StoneHub) listenChain() {
             if event == nil {
                 continue
             }
-            object := event.(*types.ObjectInfo)
+            object := event.(*ptypesv1pb.ObjectInfo)
             st, ok := hub.stone.Load(object.GetObjectId())
             if !ok {
                 log.Infow("receive seal event, stone has gone")
diff --git a/service/stonehub/stone_hub_service.go b/service/stonehub/stone_hub_service.go
index 0ccb3985d..0662b1a14 100644
--- a/service/stonehub/stone_hub_service.go
+++ b/service/stonehub/stone_hub_service.go
@@ -7,8 +7,8 @@ import (
     "github.com/bnb-chain/greenfield-storage-provider/model"
     merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
     "github.com/bnb-chain/greenfield-storage-provider/pkg/stone"
-    types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
@@ -20,15 +20,14 @@ import (
  * aim to complete uploading secondary storage provider.
 */
 
-var _ service.StoneHubServiceServer = &StoneHub{}
+var _ stypesv1pb.StoneHubServiceServer = &StoneHub{}
 var _ Stone = &stone.UploadPayloadStone{}
 
 // CreateObject create job and object info, store the DB table, if already exists will return error
-func (hub *StoneHub) CreateObject(ctx context.Context,
-    req *service.StoneHubServiceCreateObjectRequest) (
-    *service.StoneHubServiceCreateObjectResponse, error) {
+func (hub *StoneHub) CreateObject(ctx context.Context, req *stypesv1pb.StoneHubServiceCreateObjectRequest) (
+    *stypesv1pb.StoneHubServiceCreateObjectResponse, error) {
     ctx = log.Context(ctx, req)
-    rsp := &service.StoneHubServiceCreateObjectResponse{
+    rsp := &stypesv1pb.StoneHubServiceCreateObjectResponse{
         TraceId: req.TraceId,
     }
     rsp.ErrMessage = merrors.MakeErrMsgResponse(merrors.ErrInterfaceAbandoned)
@@ -61,11 +60,10 @@ func (hub *StoneHub) CreateObject(ctx context.Context,
 }
 
 // SetObjectCreateInfo set CreateObjectTX the height and object resource id on the inscription chain
-func (hub *StoneHub) SetObjectCreateInfo(ctx context.Context,
-    req *service.StoneHubServiceSetObjectCreateInfoRequest) (
-    *service.StoneHubServiceSetObjectCreateInfoResponse, error) {
+func (hub *StoneHub) SetObjectCreateInfo(ctx context.Context, req *stypesv1pb.StoneHubServiceSetObjectCreateInfoRequest) (
+    *stypesv1pb.StoneHubServiceSetObjectCreateInfoResponse, error) {
     ctx = log.Context(ctx, req)
-    rsp := &service.StoneHubServiceSetObjectCreateInfoResponse{TraceId: req.TraceId}
+    rsp := &stypesv1pb.StoneHubServiceSetObjectCreateInfoResponse{TraceId: req.TraceId}
     rsp.ErrMessage = merrors.MakeErrMsgResponse(merrors.ErrInterfaceAbandoned)
     log.CtxErrorw(ctx, "set object create info interface is abandoned")
     return rsp, nil
@@ -95,11 +93,10 @@ func (hub *StoneHub) SetObjectCreateInfo(ctx context.Context,
 
 // BeginUploadPayload create upload payload stone and start the fsm to upload
 // if the job context or object info is nil in local, will query from inscription chain
-func (hub *StoneHub) BeginUploadPayload(ctx context.Context,
-    req *service.StoneHubServiceBeginUploadPayloadRequest) (
-    *service.StoneHubServiceBeginUploadPayloadResponse, error) {
+func (hub *StoneHub) BeginUploadPayload(ctx context.Context, req *stypesv1pb.StoneHubServiceBeginUploadPayloadRequest) (
+    *stypesv1pb.StoneHubServiceBeginUploadPayloadResponse, error) {
     ctx = log.Context(ctx, req)
-    rsp := &service.StoneHubServiceBeginUploadPayloadResponse{TraceId: req.TraceId}
+    rsp := &stypesv1pb.StoneHubServiceBeginUploadPayloadResponse{TraceId: req.TraceId}
     rsp.ErrMessage = merrors.MakeErrMsgResponse(merrors.ErrInterfaceAbandoned)
     log.CtxErrorw(ctx, "set object create info interface is abandoned")
     return rsp, nil
@@ -189,11 +186,10 @@ func (hub *StoneHub) BeginUploadPayload(ctx context.Context,
 }
 
 // BeginUploadPayloadV2 merge CreateObject, SetObjectCreateInfo and BeginUploadPayload, special for heavy client use.
-func (hub *StoneHub) BeginUploadPayloadV2(ctx context.Context,
-    req *service.StoneHubServiceBeginUploadPayloadV2Request) (
-    resp *service.StoneHubServiceBeginUploadPayloadV2Response, err error) {
+func (hub *StoneHub) BeginUploadPayloadV2(ctx context.Context, req *stypesv1pb.StoneHubServiceBeginUploadPayloadV2Request) (
+    resp *stypesv1pb.StoneHubServiceBeginUploadPayloadV2Response, err error) {
     ctx = log.Context(ctx, req, req.GetObjectInfo())
-    resp = &service.StoneHubServiceBeginUploadPayloadV2Response{
+    resp = &stypesv1pb.StoneHubServiceBeginUploadPayloadV2Response{
         TraceId: req.TraceId,
     }
     defer func() {
@@ -229,11 +225,11 @@ func (hub *StoneHub) BeginUploadPayloadV2(ctx context.Context,
     // TODO:: inline type check and change move to gate
     if req.GetObjectInfo().GetSize() <= model.InlineSize {
-        req.GetObjectInfo().RedundancyType = types.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE
+        req.GetObjectInfo().RedundancyType = ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE
     }
 
     var (
-        jobCtx      *types.JobContext
+        jobCtx      *ptypesv1pb.JobContext
         uploadStone *stone.UploadPayloadStone
     )
     // create upload stone
@@ -262,11 +258,10 @@ func (hub *StoneHub) BeginUploadPayloadV2(ctx context.Context,
 }
 
 // DonePrimaryPieceJob set the primary piece job completed state
-func (hub *StoneHub) DonePrimaryPieceJob(ctx context.Context,
-    req *service.StoneHubServiceDonePrimaryPieceJobRequest) (
-    *service.StoneHubServiceDonePrimaryPieceJobResponse, error) {
+func (hub *StoneHub) DonePrimaryPieceJob(ctx context.Context, req *stypesv1pb.StoneHubServiceDonePrimaryPieceJobRequest) (
+    *stypesv1pb.StoneHubServiceDonePrimaryPieceJobResponse, error) {
     ctx = log.Context(ctx, req, req.GetPieceJob())
-    resp := &service.StoneHubServiceDonePrimaryPieceJobResponse{TraceId: req.TraceId, TxHash: req.TxHash}
+    resp := &stypesv1pb.StoneHubServiceDonePrimaryPieceJobResponse{TraceId: req.TraceId, TxHash: req.TxHash}
     var (
         uploadStone *stone.UploadPayloadStone
         job         Stone
@@ -298,7 +293,7 @@ func (hub *StoneHub) DonePrimaryPieceJob(ctx context.Context,
         return resp, nil
     }
     if req.GetErrMessage() != nil && req.GetErrMessage().GetErrCode() ==
-        service.ErrCode_ERR_CODE_ERROR {
+        stypesv1pb.ErrCode_ERR_CODE_ERROR {
         interruptErr = errors.New(resp.GetErrMessage().GetErrMsg())
         return resp, nil
     }
@@ -324,18 +319,17 @@ func (hub *StoneHub) DonePrimaryPieceJob(ctx context.Context,
 }
 
 // AllocStoneJob pop the secondary piece job
-func (hub *StoneHub) AllocStoneJob(ctx context.Context,
-    req *service.StoneHubServiceAllocStoneJobRequest) (
-    *service.StoneHubServiceAllocStoneJobResponse, error) {
+func (hub *StoneHub) AllocStoneJob(ctx context.Context, req *stypesv1pb.StoneHubServiceAllocStoneJobRequest) (
+    *stypesv1pb.StoneHubServiceAllocStoneJobResponse, error) {
     ctx = log.Context(ctx, req)
-    resp := &service.StoneHubServiceAllocStoneJobResponse{TraceId: req.TraceId}
+    resp := &stypesv1pb.StoneHubServiceAllocStoneJobResponse{TraceId: req.TraceId}
     stoneJob := hub.ConsumeJob()
     if stoneJob == nil {
         log.CtxDebugw(ctx, "no stone job to dispatch")
         return resp, nil
     }
     switch job := stoneJob.(type) {
-    case *service.PieceJob:
+    case *stypesv1pb.PieceJob:
         resp.PieceJob = job
     default:
         resp.ErrMessage = merrors.MakeErrMsgResponse(merrors.ErrStoneJobTypeUnrecognized)
@@ -345,11 +339,10 @@ func (hub *StoneHub) AllocStoneJob(ctx context.Context,
 }
 
 // DoneSecondaryPieceJob set the secondary piece job completed state
-func (hub *StoneHub) DoneSecondaryPieceJob(ctx context.Context,
-    req *service.StoneHubServiceDoneSecondaryPieceJobRequest) (
-    *service.StoneHubServiceDoneSecondaryPieceJobResponse, error) {
+func (hub *StoneHub) DoneSecondaryPieceJob(ctx context.Context, req *stypesv1pb.StoneHubServiceDoneSecondaryPieceJobRequest) (
+    *stypesv1pb.StoneHubServiceDoneSecondaryPieceJobResponse, error) {
     ctx = log.Context(ctx, req, req.GetPieceJob())
-    resp := &service.StoneHubServiceDoneSecondaryPieceJobResponse{TraceId: req.TraceId}
+    resp := &stypesv1pb.StoneHubServiceDoneSecondaryPieceJobResponse{TraceId: req.TraceId}
     var (
         uploadStone *stone.UploadPayloadStone
         job         Stone
@@ -369,7 +362,7 @@ func (hub *StoneHub) DoneSecondaryPieceJob(ctx context.Context,
         log.CtxInfow(ctx, "done secondary piece job completed", "piece_idx", pieceIdx, "error", err)
     }()
     if req.GetErrMessage() != nil && req.GetErrMessage().GetErrCode() ==
-        service.ErrCode_ERR_CODE_ERROR {
+        stypesv1pb.ErrCode_ERR_CODE_ERROR {
         interruptErr = errors.New(resp.GetErrMessage().GetErrMsg())
         return resp, nil
     }
@@ -405,9 +398,9 @@ func (hub *StoneHub) DoneSecondaryPieceJob(ctx context.Context,
 }
 
 // QueryStone return the stone info, debug interface
-func (hub *StoneHub) QueryStone(ctx context.Context, req *service.StoneHubServiceQueryStoneRequest) (*service.StoneHubServiceQueryStoneResponse, error) {
+func (hub *StoneHub) QueryStone(ctx context.Context, req *stypesv1pb.StoneHubServiceQueryStoneRequest) (*stypesv1pb.StoneHubServiceQueryStoneResponse, error) {
     ctx = log.Context(ctx, req)
-    rsp := &service.StoneHubServiceQueryStoneResponse{}
+    rsp := &stypesv1pb.StoneHubServiceQueryStoneResponse{}
 
     st := hub.GetStone(req.GetObjectId())
     uploadStone := st.(*stone.UploadPayloadStone)
diff --git a/service/stonenode/helper_test.go b/service/stonenode/helper_test.go
index 5ae0dd9eb..4b6f9466d 100644
--- a/service/stonenode/helper_test.go
+++ b/service/stonenode/helper_test.go
@@ -6,8 +6,8 @@ import (
 
     "google.golang.org/grpc"
 
-    ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
 )
 
 func setup(t *testing.T) *StoneNodeService {
@@ -24,18 +24,18 @@ func setup(t *testing.T) *StoneNodeService {
     }
 }
 
-func mockAllocResp(objectID uint64, payloadSize uint64, redundancyType ptypes.RedundancyType) *service.StoneHubServiceAllocStoneJobResponse {
-    return &service.StoneHubServiceAllocStoneJobResponse{
+func mockAllocResp(objectID uint64, payloadSize uint64, redundancyType ptypesv1pb.RedundancyType) *stypesv1pb.StoneHubServiceAllocStoneJobResponse {
+    return &stypesv1pb.StoneHubServiceAllocStoneJobResponse{
         TraceId: "123456",
         TxHash:  []byte("blockchain_one"),
-        PieceJob: &service.PieceJob{
+        PieceJob: &stypesv1pb.PieceJob{
             TxHash:         []byte("blockchain_one"),
             ObjectId:       objectID,
             PayloadSize:    payloadSize,
             TargetIdx:      nil,
             RedundancyType: redundancyType,
         },
-        ErrMessage: &service.ErrMessage{
+        ErrMessage: &stypesv1pb.ErrMessage{
             ErrCode: 0,
             ErrMsg:  "Success",
         },
@@ -72,33 +72,33 @@ func dispatchInlineMap() map[string][][]byte {
 func makeStreamMock() *StreamMock {
     return &StreamMock{
         ctx:          context.Background(),
-        recvToServer: make(chan *service.SyncerServiceSyncPieceRequest, 10),
+        recvToServer: make(chan *stypesv1pb.SyncerServiceSyncPieceRequest, 10),
     }
 }
 
 type StreamMock struct {
     grpc.ClientStream
     ctx          context.Context
-    recvToServer chan *service.SyncerServiceSyncPieceRequest
+    recvToServer chan *stypesv1pb.SyncerServiceSyncPieceRequest
 }
 
-func (m *StreamMock) Send(resp *service.SyncerServiceSyncPieceRequest) error {
+func (m *StreamMock) Send(resp *stypesv1pb.SyncerServiceSyncPieceRequest) error {
     m.recvToServer <- resp
     return nil
 }
 
-func (m *StreamMock) CloseAndRecv() (*service.SyncerServiceSyncPieceResponse, error) {
-    return &service.SyncerServiceSyncPieceResponse{
+func (m *StreamMock) CloseAndRecv() (*stypesv1pb.SyncerServiceSyncPieceResponse, error) {
+    return &stypesv1pb.SyncerServiceSyncPieceResponse{
         TraceId: "test_traceID",
-        SecondarySpInfo: &service.StorageProviderSealInfo{
+        SecondarySpInfo: &stypesv1pb.StorageProviderSealInfo{
             StorageProviderId: "sp1",
             PieceIdx:          1,
             PieceChecksum:     [][]byte{[]byte("1"), []byte("2"), []byte("3"), []byte("4"), []byte("5"), []byte("6")},
             IntegrityHash:     []byte("a"),
             Signature:         nil,
         },
-        ErrMessage: &service.ErrMessage{
-            ErrCode: service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED,
+        ErrMessage: &stypesv1pb.ErrMessage{
+            ErrCode: stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED,
             ErrMsg:  "Success",
         },
     }, nil
diff --git a/service/stonenode/sync.go b/service/stonenode/sync.go
index 2a78cff02..3b75d5274 100644
--- a/service/stonenode/sync.go
+++ b/service/stonenode/sync.go
@@ -11,15 +11,15 @@ import (
     merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
     "github.com/bnb-chain/greenfield-storage-provider/model/piecestore"
     "github.com/bnb-chain/greenfield-storage-provider/pkg/redundancy"
-    ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/util"
     "github.com/bnb-chain/greenfield-storage-provider/util/hash"
     "github.com/bnb-chain/greenfield-storage-provider/util/log"
 )
 
 // syncPieceToSecondarySP load segment data from primary and sync to secondary.
-func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, allocResp *service.StoneHubServiceAllocStoneJobResponse) error {
+func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, allocResp *stypesv1pb.StoneHubServiceAllocStoneJobResponse) error {
     // TBD:: check secondarySPs count by redundancyType.
     // EC_TYPE need EC_M + EC_K + backup
     // REPLICA_TYPE and INLINE_TYPE need segments count + backup
@@ -57,11 +57,11 @@ func (node *StoneNodeService) syncPieceToSecondarySP(ctx context.Context, allocR
     return nil
 }
 
-func checkRedundancyType(redundancyType ptypes.RedundancyType) error {
+func checkRedundancyType(redundancyType ptypesv1pb.RedundancyType) error {
     switch redundancyType {
-    case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
+    case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
         return nil
-    case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
+    case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
         return nil
     default:
         return merrors.ErrRedundancyType
@@ -70,7 +70,7 @@ func checkRedundancyType(redundancyType ptypes.RedundancyType) error {
 
 // loadSegmentsData load segment data from primary storage provider.
 // returned map key is segmentKey, value is corresponding ec data from ec1 to ec6, or segment data
-func (node *StoneNodeService) loadSegmentsData(ctx context.Context, allocResp *service.StoneHubServiceAllocStoneJobResponse) (
+func (node *StoneNodeService) loadSegmentsData(ctx context.Context, allocResp *stypesv1pb.StoneHubServiceAllocStoneJobResponse) (
     map[string][][]byte, error) {
     type segment struct {
         objectID       uint64
@@ -78,7 +78,7 @@ func (node *StoneNodeService) loadSegmentsData(ctx context.Context, allocResp *s
         segmentData    []byte
         pieceData      [][]byte
         pieceErr       error
-        redundancyType ptypes.RedundancyType
+        redundancyType ptypesv1pb.RedundancyType
     }
     var (
         doneSegments int64
@@ -161,15 +161,15 @@ func (node *StoneNodeService) loadSegmentsData(ctx context.Context, allocResp *s
 }
 
 // spiltSegmentData spilt segment data into pieces data.
-func (node *StoneNodeService) generatePieceData(redundancyType ptypes.RedundancyType, segmentData []byte) (
+func (node *StoneNodeService) generatePieceData(redundancyType ptypesv1pb.RedundancyType, segmentData []byte) (
     pieceData [][]byte, err error) {
     switch redundancyType {
-    case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
+    case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED:
         pieceData, err = redundancy.EncodeRawSegment(segmentData)
         if err != nil {
             return
         }
-    case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
+    case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
         pieceData = append(pieceData, segmentData)
     default:
         return nil, merrors.ErrRedundancyType
@@ -179,7 +179,7 @@ func (node *StoneNodeService) generatePieceData(redundancyType ptypes.Redundancy
 
 // dispatchSecondarySP dispatch piece data to secondary storage provider.
 // returned map key is spID, value map key is ec piece key or segment key, value map's value is piece data
-func (node *StoneNodeService) dispatchSecondarySP(pieceDataBySegment map[string][][]byte, redundancyType ptypes.RedundancyType,
+func (node *StoneNodeService) dispatchSecondarySP(pieceDataBySegment map[string][][]byte, redundancyType ptypesv1pb.RedundancyType,
     secondarySPs []string, targetIdx []uint32) (map[string]map[string][]byte, error) {
     pieceDataBySecondary := make(map[string]map[string][]byte)
 
@@ -210,7 +210,7 @@ func (node *StoneNodeService) dispatchSecondarySP(pieceDataBySegment map[string]
 }
 
 func fillECData(pieceDataBySegment map[string][][]byte, secondarySPs []string, targetIdx []uint32,
-    redundancyType ptypes.RedundancyType) (map[string]map[string][]byte, error) {
+    redundancyType ptypesv1pb.RedundancyType) (map[string]map[string][]byte, error) {
     ecPieceDataMap := make(map[string]map[string][]byte)
     for pieceKey, pieceData := range pieceDataBySegment {
         //if len(pieceData) != 6 {
@@ -237,7 +237,7 @@ func fillECData(pieceDataBySegment map[string][][]byte, secondarySPs []string, t
             for _, index := range targetIdx {
                 if int(index) == idx {
                     switch redundancyType {
-                    case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
+                    case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE:
                         ecPieceDataMap[sp][pieceKey] = data
                     default:
                         key := piecestore.EncodeECPieceKeyBySegmentKey(pieceKey, uint32(idx))
@@ -291,7 +291,7 @@ func fillReplicaOrInlineData(pieceDataBySegment map[string][][]byte, secondarySP
 }
 
 // doSyncToSecondarySP send piece data to the secondary.
-func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *service.StoneHubServiceAllocStoneJobResponse,
+func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *stypesv1pb.StoneHubServiceAllocStoneJobResponse,
     pieceDataBySecondary map[string]map[string][]byte) error {
     var (
         objectID    = resp.GetPieceJob().GetObjectId()
@@ -302,8 +302,8 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *ser
     )
     for secondary, pieceData := range pieceDataBySecondary {
         go func(secondary string, pieceData map[string][]byte) {
-            errMsg := &service.ErrMessage{}
-            pieceJob := &service.PieceJob{
+            errMsg := &stypesv1pb.ErrMessage{}
+            pieceJob := &stypesv1pb.PieceJob{
                 TxHash:      txHash,
                 ObjectId:    objectID,
                 PayloadSize: payloadSize,
@@ -312,7 +312,7 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *ser
 
             defer func() {
                 // notify stone hub when an ec segment is done
-                req := &service.StoneHubServiceDoneSecondaryPieceJobRequest{
+                req := &stypesv1pb.StoneHubServiceDoneSecondaryPieceJobRequest{
                     TraceId:  resp.GetTraceId(),
                     TxHash:   pieceJob.GetTxHash(),
                     PieceJob: pieceJob,
@@ -325,7 +325,7 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *ser
                 }
             }()
 
-            syncResp, err := node.syncPiece(ctx, &service.SyncerInfo{
+            syncResp, err := node.syncPiece(ctx, &stypesv1pb.SyncerInfo{
                 ObjectId:          objectID,
                 TxHash:            txHash,
                 StorageProviderId: secondary,
@@ -335,7 +335,7 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *ser
             // TBD:: retry alloc secondary sp and rat again.
             if err != nil {
                 log.CtxErrorw(ctx, "sync to secondary piece job failed", "error", err)
-                errMsg.ErrCode = service.ErrCode_ERR_CODE_ERROR
+                errMsg.ErrCode = stypesv1pb.ErrCode_ERR_CODE_ERROR
                 errMsg.ErrMsg = err.Error()
                 return
             }
@@ -350,7 +350,7 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *ser
             if syncResp.GetSecondarySpInfo() == nil || syncResp.GetSecondarySpInfo().GetIntegrityHash() == nil ||
                 !bytes.Equal(integrityHash, syncResp.GetSecondarySpInfo().GetIntegrityHash()) {
                 log.CtxErrorw(ctx, "secondary integrity hash check error")
-                errMsg.ErrCode = service.ErrCode_ERR_CODE_ERROR
+                errMsg.ErrCode = stypesv1pb.ErrCode_ERR_CODE_ERROR
                 errMsg.ErrMsg = merrors.ErrIntegrityHash.Error()
                 return
             }
@@ -364,16 +364,16 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *ser
 }
 
 // reportErrToStoneHub send error message to stone hub.
-func (node *StoneNodeService) reportErrToStoneHub(ctx context.Context, resp *service.StoneHubServiceAllocStoneJobResponse,
+func (node *StoneNodeService) reportErrToStoneHub(ctx context.Context, resp *stypesv1pb.StoneHubServiceAllocStoneJobResponse,
     reportErr error) {
     if reportErr == nil {
         return
     }
-    req := &service.StoneHubServiceDoneSecondaryPieceJobRequest{
+    req := &stypesv1pb.StoneHubServiceDoneSecondaryPieceJobRequest{
         TraceId: resp.GetTraceId(),
         TxHash:  resp.GetTxHash(),
-        ErrMessage: &service.ErrMessage{
-            ErrCode: service.ErrCode_ERR_CODE_ERROR,
+        ErrMessage: &stypesv1pb.ErrMessage{
+            ErrCode: stypesv1pb.ErrCode_ERR_CODE_ERROR,
             ErrMsg:  reportErr.Error(),
         },
     }
@@ -385,8 +385,8 @@ func (node *StoneNodeService) reportErrToStoneHub(ctx context.Context, resp *ser
 }
 
 // SyncPiece send rpc request to secondary storage provider to sync the piece data.
-func (node *StoneNodeService) syncPiece(ctx context.Context, syncerInfo *service.SyncerInfo,
-    pieceData map[string][]byte, traceID string) (*service.SyncerServiceSyncPieceResponse, error) {
+func (node *StoneNodeService) syncPiece(ctx context.Context, syncerInfo *stypesv1pb.SyncerInfo,
+    pieceData map[string][]byte, traceID string) (*stypesv1pb.SyncerServiceSyncPieceResponse, error) {
     log.CtxInfow(ctx, "stone node upload piece data", "redundancy_type", syncerInfo.GetRedundancyType(),
         "spID", util.SpReadable(syncerInfo.GetStorageProviderId()), "length", len(pieceData))
     stream, err := node.syncer.SyncPiece(ctx)
@@ -399,7 +399,7 @@ func (node *StoneNodeService) syncPiece(ctx context.Context, syncerInfo *service
     for key, value := range pieceData {
         innerMap := make(map[string][]byte)
         innerMap[key] = value
-        if err := stream.Send(&service.SyncerServiceSyncPieceRequest{
+        if err := stream.Send(&stypesv1pb.SyncerServiceSyncPieceRequest{
             TraceId:    traceID,
             SyncerInfo: syncerInfo,
             PieceData:  innerMap,
@@ -414,7 +414,7 @@ func (node *StoneNodeService) syncPiece(ctx context.Context, syncerInfo *service
         log.Errorw("client close error", "error", err, "traceID", resp.GetTraceId())
         return nil, err
     }
-    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+    if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
         log.Errorw("sync piece sends to stone node response code is not success", "error", err, "traceID", resp.GetTraceId())
         return nil, errors.New(resp.GetErrMessage().GetErrMsg())
     }
diff --git a/service/stonenode/sync_test.go b/service/stonenode/sync_test.go
index 23c265746..bf739dbce 100644
--- a/service/stonenode/sync_test.go
+++ b/service/stonenode/sync_test.go
@@ -10,9 +10,9 @@ import (
     "google.golang.org/grpc"
 
     merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
-    ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+    ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
     "github.com/bnb-chain/greenfield-storage-provider/service/client/mock"
-    service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+    stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
 )
 
 func TestInitClientFailed(t *testing.T) {
@@ -30,7 +30,7 @@ func Test_loadSegmentsDataSuccess(t *testing.T) {
         name          string
         req1          uint64
         req2          uint64
-        req3          ptypes.RedundancyType
+        req3          ptypesv1pb.RedundancyType
         wantedResult1 string
         wantedResult2 int
         wantedErr     error
@@ -39,7 +39,7 @@ func Test_loadSegmentsDataSuccess(t *testing.T) {
             name:          "ec type: payload size greater than 16MB",
             req1:          20230109001,
             req2:          20 * 1024 * 1024,
-            req3:          ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
+            req3:          ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
             wantedResult1: "20230109001",
             wantedResult2: 2,
             wantedErr:     nil,
@@ -48,7 +48,7 @@ func Test_loadSegmentsDataSuccess(t *testing.T) {
             name:          "ec type: payload size less than 16MB and greater than 1MB",
             req1:          20230109002,
             req2:          15 * 1024 * 1024,
-            req3:          ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
+            req3:          ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
             wantedResult1: "20230109002",
             wantedResult2: 1,
             wantedErr:     nil,
@@ -57,7 +57,7 @@ func Test_loadSegmentsDataSuccess(t *testing.T) {
             name:          "replica type: payload size greater than 16MB",
             req1:          20230109003,
             req2:          20 * 1024 * 1024,
-            req3: 
ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, wantedResult1: "20230109003", wantedResult2: 2, wantedErr: nil, @@ -66,7 +66,7 @@ func Test_loadSegmentsDataSuccess(t *testing.T) { name: "replica type: payload size less than 16MB and greater than 1MB", req1: 20230109004, req2: 15 * 1024 * 1024, - req3: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req3: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, wantedResult1: "20230109004", wantedResult2: 1, wantedErr: nil, @@ -75,7 +75,7 @@ func Test_loadSegmentsDataSuccess(t *testing.T) { name: "inline type: payload size less than 1MB", req1: 20230109005, req2: 1000 * 1024, - req3: ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, + req3: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, wantedResult1: "20230109005", wantedResult2: 1, wantedErr: nil, @@ -119,7 +119,7 @@ func Test_loadSegmentsDataPieceStoreError(t *testing.T) { }).AnyTimes() result, err := node.loadSegmentsData(context.TODO(), mockAllocResp(20230109001, 20*1024*1024, - ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)) + ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)) assert.Equal(t, errors.New("piece store s3 network error"), err) assert.Equal(t, 0, len(result)) } @@ -137,7 +137,7 @@ func Test_loadSegmentsDataUnknownRedundancyError(t *testing.T) { }).AnyTimes() result, err := node.loadSegmentsData(context.TODO(), mockAllocResp(20230109006, 20*1024*1024, - ptypes.RedundancyType(-1))) + ptypesv1pb.RedundancyType(-1))) assert.Equal(t, merrors.ErrRedundancyType, err) assert.Equal(t, 0, len(result)) } @@ -145,35 +145,35 @@ func Test_loadSegmentsDataUnknownRedundancyError(t *testing.T) { func Test_generatePieceData(t *testing.T) { cases := []struct { name string - req1 ptypes.RedundancyType + req1 ptypesv1pb.RedundancyType req2 []byte wantedResult int wantedErr error }{ { name: "ec type", - req1: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req1: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, req2: []byte("1"), wantedResult: 6, wantedErr: nil, }, { name: "replica type", - req1: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req1: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, req2: []byte("1"), wantedResult: 1, wantedErr: nil, }, { name: "inline type", - req1: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req1: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, req2: []byte("1"), wantedResult: 1, wantedErr: nil, }, { name: "unknown redundancy type", - req1: ptypes.RedundancyType(-1), + req1: ptypesv1pb.RedundancyType(-1), req2: []byte("1"), wantedResult: 0, wantedErr: merrors.ErrRedundancyType, @@ -195,7 +195,7 @@ func Test_dispatchSecondarySP(t *testing.T) { cases := []struct { name string req1 map[string][][]byte - req2 ptypes.RedundancyType + req2 ptypesv1pb.RedundancyType req3 []string req4 []uint32 wantedResult int @@ -204,7 +204,7 @@ func Test_dispatchSecondarySP(t *testing.T) { { name: "ec type dispatch", req1: dispatchPieceMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req2: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, req3: spList, req4: []uint32{0, 1, 2, 3, 4, 5}, wantedResult: 6, @@ -213,7 +213,7 @@ func Test_dispatchSecondarySP(t *testing.T) { { name: "replica type dispatch", req1: dispatchSegmentMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req2: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, req3: spList, req4: []uint32{0, 1, 2}, wantedResult: 1, @@ -222,7 +222,7 @@ func 
Test_dispatchSecondarySP(t *testing.T) { { name: "inline type dispatch", req1: dispatchInlineMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, + req2: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, req3: spList, req4: []uint32{0}, wantedResult: 1, @@ -231,7 +231,7 @@ func Test_dispatchSecondarySP(t *testing.T) { { name: "ec type data retransmission", req1: dispatchPieceMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req2: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, req3: spList, req4: []uint32{2, 3}, wantedResult: 2, @@ -240,7 +240,7 @@ func Test_dispatchSecondarySP(t *testing.T) { { name: "replica type data retransmission", req1: dispatchSegmentMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + req2: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, req3: spList, req4: []uint32{1, 2}, wantedResult: 0, @@ -249,7 +249,7 @@ func Test_dispatchSecondarySP(t *testing.T) { { name: "wrong secondary sp number", req1: dispatchPieceMap(), - req2: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + req2: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, req3: []string{}, req4: []uint32{0, 1, 2, 3, 4, 5}, wantedResult: 0, @@ -299,7 +299,7 @@ func Test_doSyncToSecondarySP(t *testing.T) { } cases := []struct { name string - req1 *service.StoneHubServiceAllocStoneJobResponse + req1 *stypesv1pb.StoneHubServiceAllocStoneJobResponse req2 map[string]map[string][]byte }{ { @@ -327,13 +327,13 @@ func Test_doSyncToSecondarySP(t *testing.T) { syncer := mock.NewMockSyncerAPI(ctrl) node.syncer = syncer syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_SyncPieceClient, error) { + func(ctx context.Context, opts ...grpc.CallOption) (stypesv1pb.SyncerService_SyncPieceClient, error) { return streamClient, nil }).AnyTimes() for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - allocResp := mockAllocResp(123456, 20*1024*1024, ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED) + allocResp := mockAllocResp(123456, 20*1024*1024, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED) err := node.doSyncToSecondarySP(context.TODO(), allocResp, tt.req2) assert.Equal(t, nil, err) }) @@ -349,15 +349,15 @@ func TestSyncPieceSuccess(t *testing.T) { syncer := mock.NewMockSyncerAPI(ctrl) node.syncer = syncer syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, opts ...grpc.CallOption) (service.SyncerService_SyncPieceClient, error) { + func(ctx context.Context, opts ...grpc.CallOption) (stypesv1pb.SyncerService_SyncPieceClient, error) { return streamClient, nil }).AnyTimes() - sInfo := &service.SyncerInfo{ + sInfo := &stypesv1pb.SyncerInfo{ ObjectId: 123456, TxHash: []byte("i"), StorageProviderId: "sp1", - RedundancyType: ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, + RedundancyType: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, } data := map[string][]byte{ "123456_s0_p0": []byte("test1"), diff --git a/service/syncer/server.go b/service/syncer/server.go index 474c79aa4..34e36750f 100644 --- a/service/syncer/server.go +++ b/service/syncer/server.go @@ -12,7 +12,7 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/model" "github.com/bnb-chain/greenfield-storage-provider/service/client" - service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + stypesv1pb 
"github.com/bnb-chain/greenfield-storage-provider/service/types/v1" "github.com/bnb-chain/greenfield-storage-provider/store/metadb" "github.com/bnb-chain/greenfield-storage-provider/store/metadb/leveldb" "github.com/bnb-chain/greenfield-storage-provider/util/log" @@ -107,7 +107,7 @@ func (s *Syncer) serve(errCh chan error) { return } grpcServer := grpc.NewServer(grpc.MaxSendMsgSize(model.MaxCallMsgSize), grpc.MaxRecvMsgSize(model.MaxCallMsgSize)) - service.RegisterSyncerServiceServer(grpcServer, s) + stypesv1pb.RegisterSyncerServiceServer(grpcServer, s) reflection.Register(grpcServer) if err = grpcServer.Serve(lis); err != nil { log.Errorw("syncer serve failed", "error", err) diff --git a/service/syncer/syncer_service.go b/service/syncer/syncer_service.go index a0d8d54cc..b06c595a7 100644 --- a/service/syncer/syncer_service.go +++ b/service/syncer/syncer_service.go @@ -7,8 +7,8 @@ import ( merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" "github.com/bnb-chain/greenfield-storage-provider/model/piecestore" - ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" - service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" "github.com/bnb-chain/greenfield-storage-provider/store/metadb" "github.com/bnb-chain/greenfield-storage-provider/util" "github.com/bnb-chain/greenfield-storage-provider/util/hash" @@ -16,7 +16,7 @@ import ( ) // SyncPiece syncs piece data to secondary storage provider -func (s *Syncer) SyncPiece(stream service.SyncerService_SyncPieceServer) error { +func (s *Syncer) SyncPiece(stream stypesv1pb.SyncerService_SyncPieceServer) error { var count uint32 var integrityMeta *metadb.IntegrityMeta var key string @@ -49,11 +49,11 @@ func (s *Syncer) SyncPiece(stream service.SyncerService_SyncPieceServer) error { if err := s.setIntegrityMeta(s.metaDB, integrityMeta); err != nil { return err } - resp := &service.SyncerServiceSyncPieceResponse{ + resp := &stypesv1pb.SyncerServiceSyncPieceResponse{ TraceId: req.GetTraceId(), SecondarySpInfo: sealInfo, - ErrMessage: &service.ErrMessage{ - ErrCode: service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED, + ErrMessage: &stypesv1pb.ErrMessage{ + ErrCode: stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED, ErrMsg: "success", }, } @@ -83,7 +83,7 @@ func (s *Syncer) setIntegrityMeta(db metadb.MetaDB, meta *metadb.IntegrityMeta) return nil } -func generateSealInfo(spID string, integrityMeta *metadb.IntegrityMeta) *service.StorageProviderSealInfo { +func generateSealInfo(spID string, integrityMeta *metadb.IntegrityMeta) *stypesv1pb.StorageProviderSealInfo { keys := util.GenericSortedKeys(integrityMeta.PieceHash) pieceChecksumList := make([][]byte, 0) var integrityHash []byte @@ -92,7 +92,7 @@ func generateSealInfo(spID string, integrityMeta *metadb.IntegrityMeta) *service pieceChecksumList = append(pieceChecksumList, value) } integrityHash = hash.GenerateIntegrityHash(pieceChecksumList) - resp := &service.StorageProviderSealInfo{ + resp := &stypesv1pb.StorageProviderSealInfo{ StorageProviderId: spID, PieceIdx: integrityMeta.PieceIdx, PieceChecksum: pieceChecksumList, @@ -102,7 +102,7 @@ func generateSealInfo(spID string, integrityMeta *metadb.IntegrityMeta) *service return resp } -func (s *Syncer) handlePieceData(req *service.SyncerServiceSyncPieceRequest) (*metadb.IntegrityMeta, string, []byte, error) { +func (s *Syncer) handlePieceData(req 
*stypesv1pb.SyncerServiceSyncPieceRequest) (*metadb.IntegrityMeta, string, []byte, error) { if len(req.GetPieceData()) != 1 { return nil, "", nil, errors.New("the length of piece data map is not equal to 1") } @@ -134,13 +134,13 @@ func (s *Syncer) handlePieceData(req *service.SyncerServiceSyncPieceRequest) (*m return integrityMeta, key, value, nil } -func parsePieceIndex(redundancyType ptypes.RedundancyType, key string) (uint32, error) { +func parsePieceIndex(redundancyType ptypesv1pb.RedundancyType, key string) (uint32, error) { var ( err error pieceIndex uint32 ) switch redundancyType { - case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: + case ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE: _, pieceIndex, err = piecestore.DecodeSegmentPieceKey(key) default: // ec type _, _, pieceIndex, err = piecestore.DecodeECPieceKey(key) diff --git a/service/uploader/stream_reader.go b/service/uploader/stream_reader.go index 29ef213a5..8e7778505 100644 --- a/service/uploader/stream_reader.go +++ b/service/uploader/stream_reader.go @@ -5,8 +5,8 @@ import ( "io" "sync" - pbPkg "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" - pbService "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" "github.com/bnb-chain/greenfield-storage-provider/util/log" ) @@ -21,11 +21,11 @@ type streamReader struct { bucket string object string size uint64 - redundancyType pbPkg.RedundancyType + redundancyType ptypesv1pb.RedundancyType } // initSteamReaderOnce init stream reader content, is not thread safely. -func (sr *streamReader) initSteamReaderOnce(req *pbService.UploaderServiceUploadPayloadRequest) error { +func (sr *streamReader) initSteamReaderOnce(req *stypesv1pb.UploaderServiceUploadPayloadRequest) error { if sr.txHash == nil { sr.txHash = req.TxHash sr.traceID = req.TraceId @@ -35,7 +35,7 @@ func (sr *streamReader) initSteamReaderOnce(req *pbService.UploaderServiceUpload } // newStreamReader is used to stream read UploaderService_UploadPayloadServer. -func newStreamReader(stream pbService.UploaderService_UploadPayloadServer, ch chan []byte) *streamReader { +func newStreamReader(stream stypesv1pb.UploaderService_UploadPayloadServer, ch chan []byte) *streamReader { var sr = &streamReader{} sr.pr, sr.pw = io.Pipe() go func() { @@ -65,7 +65,7 @@ func newStreamReader(stream pbService.UploaderService_UploadPayloadServer, ch ch } // initSteamReaderOnceV2 init stream reader content, is not thread safely. -func (sr *streamReader) initSteamReaderOnceV2(req *pbService.UploaderServiceUploadPayloadV2Request) error { +func (sr *streamReader) initSteamReaderOnceV2(req *stypesv1pb.UploaderServiceUploadPayloadV2Request) error { if sr.txHash == nil { sr.txHash = req.TxHash sr.traceID = req.TraceId @@ -79,7 +79,7 @@ func (sr *streamReader) initSteamReaderOnceV2(req *pbService.UploaderServiceUplo } // newStreamReaderV2 is used to stream read UploaderService_UploadPayloadV2Server. 
-func newStreamReaderV2(stream pbService.UploaderService_UploadPayloadV2Server, ch chan []byte) *streamReader { +func newStreamReaderV2(stream stypesv1pb.UploaderService_UploadPayloadV2Server, ch chan []byte) *streamReader { var sr = &streamReader{} sr.pr, sr.pw = io.Pipe() go func() { diff --git a/service/uploader/uploader.go b/service/uploader/uploader.go index ee09211d4..4a1bc8a02 100644 --- a/service/uploader/uploader.go +++ b/service/uploader/uploader.go @@ -13,7 +13,7 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/mock" "github.com/bnb-chain/greenfield-storage-provider/model" "github.com/bnb-chain/greenfield-storage-provider/service/client" - service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" "github.com/bnb-chain/greenfield-storage-provider/store/metadb" "github.com/bnb-chain/greenfield-storage-provider/store/metadb/leveldb" "github.com/bnb-chain/greenfield-storage-provider/util/log" @@ -102,7 +102,7 @@ func (uploader *Uploader) serve(errCh chan error) { } grpcServer := grpc.NewServer() - service.RegisterUploaderServiceServer(grpcServer, uploader) + stypesv1pb.RegisterUploaderServiceServer(grpcServer, uploader) uploader.grpcServer = grpcServer reflection.Register(grpcServer) if err := grpcServer.Serve(lis); err != nil { diff --git a/service/uploader/uploader_service.go b/service/uploader/uploader_service.go index f3b9d21b3..ff453c46f 100644 --- a/service/uploader/uploader_service.go +++ b/service/uploader/uploader_service.go @@ -11,8 +11,8 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/model" merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" "github.com/bnb-chain/greenfield-storage-provider/model/piecestore" - types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" - pbService "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" "github.com/bnb-chain/greenfield-storage-provider/store/metadb" "github.com/bnb-chain/greenfield-storage-provider/util/hash" "github.com/bnb-chain/greenfield-storage-provider/util/log" @@ -23,12 +23,13 @@ var ( ) // CreateObject handle grpc CreateObject request, send create object tx to chain -func (uploader *Uploader) CreateObject(ctx context.Context, req *pbService.UploaderServiceCreateObjectRequest) (resp *pbService.UploaderServiceCreateObjectResponse, err error) { +func (uploader *Uploader) CreateObject(ctx context.Context, req *stypesv1pb.UploaderServiceCreateObjectRequest) ( + resp *stypesv1pb.UploaderServiceCreateObjectResponse, err error) { ctx = log.Context(ctx, req, req.GetObjectInfo()) - resp = &pbService.UploaderServiceCreateObjectResponse{TraceId: req.GetTraceId()} - defer func(r *pbService.UploaderServiceCreateObjectResponse, err error) { + resp = &stypesv1pb.UploaderServiceCreateObjectResponse{TraceId: req.GetTraceId()} + defer func(r *stypesv1pb.UploaderServiceCreateObjectResponse, err error) { if err != nil { - r.ErrMessage.ErrCode = pbService.ErrCode_ERR_CODE_ERROR + r.ErrMessage.ErrCode = stypesv1pb.ErrCode_ERR_CODE_ERROR r.ErrMessage.ErrMsg = err.Error() log.CtxErrorw(ctx, "create object failed", "error", err) } @@ -41,7 +42,7 @@ func (uploader *Uploader) CreateObject(ctx context.Context, req *pbService.Uploa // 3.1 subscribe inscription chain create object event createObjectCh := 
uploader.eventWaiter.SubscribeEvent(mock.CreateObject) // 3.2 register the object info to stone hub - if _, err = uploader.stoneHub.CreateObject(ctx, &pbService.StoneHubServiceCreateObjectRequest{ + if _, err = uploader.stoneHub.CreateObject(ctx, &stypesv1pb.StoneHubServiceCreateObjectRequest{ TraceId: req.TraceId, TxHash: txHash, ObjectInfo: req.ObjectInfo, @@ -50,14 +51,14 @@ func (uploader *Uploader) CreateObject(ctx context.Context, req *pbService.Uploa } // 3.3 wait create object tx to inscription chain createObjectTimer := time.After(CreateObjectTimeout) - var objectInfo *types.ObjectInfo + var objectInfo *ptypesv1pb.ObjectInfo for { if objectInfo != nil { break } select { case event := <-createObjectCh: - object := event.(*types.ObjectInfo) + object := event.(*ptypesv1pb.ObjectInfo) if bytes.Equal(object.TxHash, txHash) { objectInfo = object } @@ -71,7 +72,7 @@ func (uploader *Uploader) CreateObject(ctx context.Context, req *pbService.Uploa return } // 4. update object height and object id to stone hub - if _, err = uploader.stoneHub.SetObjectCreateInfo(ctx, &pbService.StoneHubServiceSetObjectCreateInfoRequest{ + if _, err = uploader.stoneHub.SetObjectCreateInfo(ctx, &stypesv1pb.StoneHubServiceSetObjectCreateInfoRequest{ TraceId: req.TraceId, TxHash: txHash, TxHeight: objectInfo.Height, @@ -89,20 +90,20 @@ func (uploader *Uploader) CreateObject(ctx context.Context, req *pbService.Uploa // // 2.1 fetch upload job meta from stone hub; // 2.2 upload segment to piece store, and report job progress. -func (uploader *Uploader) UploadPayload(stream pbService.UploaderService_UploadPayloadServer) (err error) { +func (uploader *Uploader) UploadPayload(stream stypesv1pb.UploaderService_UploadPayloadServer) (err error) { var ( txChan = make(chan []byte) pieceChan = make(chan *SegmentContext, 500) wg sync.WaitGroup - resp pbService.UploaderServiceUploadPayloadResponse + resp stypesv1pb.UploaderServiceUploadPayloadResponse waitDone = make(chan bool) errChan = make(chan error) ctx = context.Background() sr *streamReader ) - defer func(resp *pbService.UploaderServiceUploadPayloadResponse, err error) { + defer func(resp *stypesv1pb.UploaderServiceUploadPayloadResponse, err error) { if err != nil { - resp.ErrMessage.ErrCode = pbService.ErrCode_ERR_CODE_ERROR + resp.ErrMessage.ErrCode = stypesv1pb.ErrCode_ERR_CODE_ERROR resp.ErrMessage.ErrMsg = err.Error() } err = stream.SendAndClose(resp) @@ -172,14 +173,14 @@ type JobMeta struct { objectID uint64 toUploadedIDs map[uint32]bool txHash []byte - pieceJob *pbService.PieceJob + pieceJob *stypesv1pb.PieceJob done bool } // fetchJobMeta fetch job meta from stone hub. func (uploader *Uploader) fetchJobMeta(ctx context.Context, txHash []byte) (*JobMeta, error) { traceID, _ := ctx.Value("traceID").(string) - resp, err := uploader.stoneHub.BeginUploadPayload(ctx, &pbService.StoneHubServiceBeginUploadPayloadRequest{ + resp, err := uploader.stoneHub.BeginUploadPayload(ctx, &stypesv1pb.StoneHubServiceBeginUploadPayloadRequest{ TraceId: traceID, TxHash: txHash, }) @@ -210,17 +211,17 @@ func (uploader *Uploader) fetchJobMeta(ctx context.Context, txHash []byte) (*Job // reportJobProgress report done piece index to stone hub. 
func (uploader *Uploader) reportJobProgress(ctx context.Context, jm *JobMeta, uploadID uint32, checkSum []byte) error { var ( - req *pbService.StoneHubServiceDonePrimaryPieceJobRequest - pieceJob pbService.PieceJob + req *stypesv1pb.StoneHubServiceDonePrimaryPieceJobRequest + pieceJob stypesv1pb.PieceJob ) traceID, _ := ctx.Value("traceID").(string) pieceJob = *jm.pieceJob - pieceJob.StorageProviderSealInfo = &pbService.StorageProviderSealInfo{ + pieceJob.StorageProviderSealInfo = &stypesv1pb.StorageProviderSealInfo{ StorageProviderId: uploader.config.StorageProvider, PieceIdx: uploadID, PieceChecksum: [][]byte{checkSum}, } - req = &pbService.StoneHubServiceDonePrimaryPieceJobRequest{ + req = &stypesv1pb.StoneHubServiceDonePrimaryPieceJobRequest{ TraceId: traceID, TxHash: jm.txHash, PieceJob: &pieceJob, @@ -232,7 +233,8 @@ func (uploader *Uploader) reportJobProgress(ctx context.Context, jm *JobMeta, up } // GetAuthentication get auth info, currently PreSignature is mocked. -func (uploader *Uploader) GetAuthentication(ctx context.Context, req *pbService.UploaderServiceGetAuthenticationRequest) (resp *pbService.UploaderServiceGetAuthenticationResponse, err error) { +func (uploader *Uploader) GetAuthentication(ctx context.Context, req *stypesv1pb.UploaderServiceGetAuthenticationRequest) ( + resp *stypesv1pb.UploaderServiceGetAuthenticationResponse, err error) { ctx = log.Context(ctx, req) defer func() { if err != nil { @@ -243,7 +245,7 @@ func (uploader *Uploader) GetAuthentication(ctx context.Context, req *pbService. } }() - resp = &pbService.UploaderServiceGetAuthenticationResponse{TraceId: req.TraceId} + resp = &stypesv1pb.UploaderServiceGetAuthenticationResponse{TraceId: req.TraceId} meta := &metadb.UploadPayloadAskingMeta{ BucketName: req.Bucket, ObjectName: req.Object, @@ -260,20 +262,20 @@ func (uploader *Uploader) GetAuthentication(ctx context.Context, req *pbService. } // UploadPayloadV2 merge CreateObject, SetObjectCreateInfo and BeginUploadPayload, special for heavy client use. -func (uploader *Uploader) UploadPayloadV2(stream pbService.UploaderService_UploadPayloadV2Server) (err error) { +func (uploader *Uploader) UploadPayloadV2(stream stypesv1pb.UploaderService_UploadPayloadV2Server) (err error) { var ( txChan = make(chan []byte) pieceChan = make(chan *SegmentContext, 500) wg sync.WaitGroup - resp pbService.UploaderServiceUploadPayloadV2Response + resp stypesv1pb.UploaderServiceUploadPayloadV2Response waitDone = make(chan bool) errChan = make(chan error) ctx = context.Background() sr *streamReader ) - defer func(resp *pbService.UploaderServiceUploadPayloadV2Response, err error) { + defer func(resp *stypesv1pb.UploaderServiceUploadPayloadV2Response, err error) { if err != nil { - resp.ErrMessage.ErrCode = pbService.ErrCode_ERR_CODE_ERROR + resp.ErrMessage.ErrCode = stypesv1pb.ErrCode_ERR_CODE_ERROR resp.ErrMessage.ErrMsg = err.Error() } err = stream.SendAndClose(resp) @@ -341,11 +343,11 @@ func (uploader *Uploader) UploadPayloadV2(stream pbService.UploaderService_Uploa // checkAndPrepareMeta check auth by metaDB, and then get meta from stoneHub. 
func (uploader *Uploader) checkAndPrepareMeta(sr *streamReader, txHash []byte) (*JobMeta, error) { - objectInfo := &types.ObjectInfo{ + objectInfo := &ptypesv1pb.ObjectInfo{ BucketName: sr.bucket, ObjectName: sr.object, Size: sr.size, - PrimarySp: &types.StorageProviderInfo{SpId: uploader.config.StorageProvider}, + PrimarySp: &ptypesv1pb.StorageProviderInfo{SpId: uploader.config.StorageProvider}, RedundancyType: sr.redundancyType, } uploader.eventWaiter.CreateObjectByName(txHash, objectInfo) @@ -359,7 +361,7 @@ func (uploader *Uploader) checkAndPrepareMeta(sr *streamReader, txHash []byte) ( err = errors.New("auth info has timeout") return nil, err } - resp, err := uploader.stoneHub.BeginUploadPayloadV2(context.Background(), &pbService.StoneHubServiceBeginUploadPayloadV2Request{ + resp, err := uploader.stoneHub.BeginUploadPayloadV2(context.Background(), &stypesv1pb.StoneHubServiceBeginUploadPayloadV2Request{ TraceId: sr.traceID, ObjectInfo: objectInfo, }) diff --git a/store/jobdb/job_db.go b/store/jobdb/job_db.go index 0d050b78f..f54779bb3 100644 --- a/store/jobdb/job_db.go +++ b/store/jobdb/job_db.go @@ -1,6 +1,6 @@ package jobdb -import types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" +import ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" // PieceJob record the piece job context, interact with db. // For primary: @@ -24,11 +24,11 @@ type PieceJob struct { } type JobDB interface { - CreateUploadPayloadJob(txHash []byte, info *types.ObjectInfo) (uint64, error) + CreateUploadPayloadJob(txHash []byte, info *ptypesv1pb.ObjectInfo) (uint64, error) SetObjectCreateHeightAndObjectID(txHash []byte, height uint64, objectID uint64) error - GetObjectInfo(txHash []byte) (*types.ObjectInfo, error) - GetJobContext(jobId uint64) (*types.JobContext, error) + GetObjectInfo(txHash []byte) (*ptypesv1pb.ObjectInfo, error) + GetJobContext(jobId uint64) (*ptypesv1pb.JobContext, error) SetUploadPayloadJobState(jobId uint64, state string, timestamp int64) error SetUploadPayloadJobJobError(jobID uint64, jobState string, jobErr string, timestamp int64) error diff --git a/store/jobdb/jobmemory/job_mem_db.go b/store/jobdb/jobmemory/job_mem_db.go index 784a33dbb..0c4135d99 100644 --- a/store/jobdb/jobmemory/job_mem_db.go +++ b/store/jobdb/jobmemory/job_mem_db.go @@ -4,15 +4,15 @@ import ( "errors" "sync" - types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" "github.com/bnb-chain/greenfield-storage-provider/store/jobdb" ) // MemJobDB is a memory db, maintains job, object and piece job table. type MemJobDB struct { JobCount uint64 - JobTable map[uint64]types.JobContext - ObjectTable map[string]types.ObjectInfo + JobTable map[uint64]ptypesv1pb.JobContext + ObjectTable map[string]ptypesv1pb.ObjectInfo PrimaryPieceJobTable map[string]map[uint32]jobdb.PieceJob SecondaryPieceJobTable map[string]map[uint32]jobdb.PieceJob mu sync.RWMutex @@ -22,23 +22,23 @@ type MemJobDB struct { func NewMemJobDB() *MemJobDB { return &MemJobDB{ JobCount: 0, - JobTable: make(map[uint64]types.JobContext), - ObjectTable: make(map[string]types.ObjectInfo), + JobTable: make(map[uint64]ptypesv1pb.JobContext), + ObjectTable: make(map[string]ptypesv1pb.ObjectInfo), PrimaryPieceJobTable: make(map[string]map[uint32]jobdb.PieceJob), SecondaryPieceJobTable: make(map[string]map[uint32]jobdb.PieceJob), } } // CreateUploadPayloadJob create a job info for special object. 
-func (db *MemJobDB) CreateUploadPayloadJob(txHash []byte, info *types.ObjectInfo) (uint64, error) { +func (db *MemJobDB) CreateUploadPayloadJob(txHash []byte, info *ptypesv1pb.ObjectInfo) (uint64, error) { if info == nil { return 0, errors.New("object info is nil") } db.mu.Lock() defer db.mu.Unlock() - db.JobTable[db.JobCount] = types.JobContext{ + db.JobTable[db.JobCount] = ptypesv1pb.JobContext{ JobId: db.JobCount, - JobState: types.JobState_JOB_STATE_CREATE_OBJECT_DONE, + JobState: ptypesv1pb.JobState_JOB_STATE_CREATE_OBJECT_DONE, } info.JobId = db.JobCount db.ObjectTable[string(txHash)] = *info @@ -62,7 +62,7 @@ func (db *MemJobDB) SetObjectCreateHeightAndObjectID(txHash []byte, height uint6 } // GetObjectInfo returns the object info by create object transaction hash. -func (db *MemJobDB) GetObjectInfo(txHash []byte) (*types.ObjectInfo, error) { +func (db *MemJobDB) GetObjectInfo(txHash []byte) (*ptypesv1pb.ObjectInfo, error) { db.mu.RLock() defer db.mu.RUnlock() objectInfo, ok := db.ObjectTable[string(txHash)] @@ -73,7 +73,7 @@ func (db *MemJobDB) GetObjectInfo(txHash []byte) (*types.ObjectInfo, error) { } // GetJobContext returns the job info . -func (db *MemJobDB) GetJobContext(jobId uint64) (*types.JobContext, error) { +func (db *MemJobDB) GetJobContext(jobId uint64) (*ptypesv1pb.JobContext, error) { db.mu.RLock() defer db.mu.RUnlock() job, ok := db.JobTable[jobId] @@ -91,11 +91,11 @@ func (db *MemJobDB) SetUploadPayloadJobState(jobId uint64, state string, timesta if !ok { return errors.New("job is not exist") } - jobState, ok := types.JobState_value[state] + jobState, ok := ptypesv1pb.JobState_value[state] if !ok { return errors.New("state is not correct job state") } - job.JobState = (types.JobState)(jobState) + job.JobState = (ptypesv1pb.JobState)(jobState) job.ModifyTime = timestamp db.JobTable[jobId] = job return nil @@ -109,11 +109,11 @@ func (db *MemJobDB) SetUploadPayloadJobJobError(jobId uint64, state string, jobE if !ok { return errors.New("job is not exist") } - jobState, ok := types.JobState_value[state] + jobState, ok := ptypesv1pb.JobState_value[state] if !ok { return errors.New("state is not correct job state") } - job.JobState = (types.JobState)(jobState) + job.JobState = (ptypesv1pb.JobState)(jobState) job.ModifyTime = timestamp job.JobErr = jobErr db.JobTable[jobId] = job diff --git a/store/jobdb/jobsql/job_meta.go b/store/jobdb/jobsql/job_meta.go index a1d5fe13e..63aaf8406 100644 --- a/store/jobdb/jobsql/job_meta.go +++ b/store/jobdb/jobsql/job_meta.go @@ -7,7 +7,7 @@ import ( "gorm.io/gorm" - types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" "github.com/bnb-chain/greenfield-storage-provider/store/jobdb" ) @@ -27,7 +27,7 @@ func NewJobMetaImpl(option *DBOption) (*JobMetaImpl, error) { } // CreateUploadPayloadJob create DBJob record and DBObject record, Use JobID field for association. 
-func (jmi *JobMetaImpl) CreateUploadPayloadJob(txHash []byte, info *types.ObjectInfo) (uint64, error) { +func (jmi *JobMetaImpl) CreateUploadPayloadJob(txHash []byte, info *ptypesv1pb.ObjectInfo) (uint64, error) { var ( result *gorm.DB insertJobRecord *DBJob @@ -35,8 +35,8 @@ func (jmi *JobMetaImpl) CreateUploadPayloadJob(txHash []byte, info *types.Object ) insertJobRecord = &DBJob{ - JobType: uint32(types.JobType_JOB_TYPE_CREATE_OBJECT), - JobState: uint32(types.JobState_JOB_STATE_CREATE_OBJECT_DONE), + JobType: uint32(ptypesv1pb.JobType_JOB_TYPE_CREATE_OBJECT), + JobState: uint32(ptypesv1pb.JobState_JOB_STATE_CREATE_OBJECT_DONE), CreateTime: time.Now(), ModifyTime: time.Now(), } @@ -85,7 +85,7 @@ func (jmi *JobMetaImpl) SetObjectCreateHeight(txHash []byte, height uint64) erro } // GetObjectInfo query DBObject by txHash, and convert to types.ObjectInfo. -func (jmi *JobMetaImpl) GetObjectInfo(txHash []byte) (*types.ObjectInfo, error) { +func (jmi *JobMetaImpl) GetObjectInfo(txHash []byte) (*ptypesv1pb.ObjectInfo, error) { var ( result *gorm.DB queryReturn DBObject @@ -101,7 +101,7 @@ func (jmi *JobMetaImpl) GetObjectInfo(txHash []byte) (*types.ObjectInfo, error) if err != nil { return nil, err } - return &types.ObjectInfo{ + return &ptypesv1pb.ObjectInfo{ JobId: queryReturn.JobID, Owner: queryReturn.Owner, BucketName: queryReturn.BucketName, @@ -113,17 +113,17 @@ func (jmi *JobMetaImpl) GetObjectInfo(txHash []byte) (*types.ObjectInfo, error) PrimarySp: nil, // todo: how to decode sp info Height: queryReturn.Height, TxHash: txHash, - RedundancyType: types.RedundancyType(queryReturn.RedundancyType), + RedundancyType: ptypesv1pb.RedundancyType(queryReturn.RedundancyType), SecondarySps: nil, // todo: how to fill }, nil } // ScanObjectInfo query scan DBObject, and convert to ObjectInfo. -func (jmi *JobMetaImpl) ScanObjectInfo(offset int, limit int) ([]*types.ObjectInfo, error) { +func (jmi *JobMetaImpl) ScanObjectInfo(offset int, limit int) ([]*ptypesv1pb.ObjectInfo, error) { var ( result *gorm.DB queryReturns []DBObject - objects []*types.ObjectInfo + objects []*ptypesv1pb.ObjectInfo ) result = jmi.db.Limit(limit).Offset(offset).Find(&queryReturns) @@ -135,7 +135,7 @@ func (jmi *JobMetaImpl) ScanObjectInfo(offset int, limit int) ([]*types.ObjectIn if err != nil { return objects, err } - objects = append(objects, &types.ObjectInfo{ + objects = append(objects, &ptypesv1pb.ObjectInfo{ JobId: object.JobID, Owner: object.Owner, BucketName: object.BucketName, @@ -147,7 +147,7 @@ func (jmi *JobMetaImpl) ScanObjectInfo(offset int, limit int) ([]*types.ObjectIn PrimarySp: nil, // todo: how to decode sp info Height: object.Height, TxHash: txHash, - RedundancyType: types.RedundancyType(object.RedundancyType), + RedundancyType: ptypesv1pb.RedundancyType(object.RedundancyType), SecondarySps: nil, // todo: how to fill }) } @@ -155,7 +155,7 @@ func (jmi *JobMetaImpl) ScanObjectInfo(offset int, limit int) ([]*types.ObjectIn } // GetJobContext query DBJob by jobID, and convert to types.JobContext. 
-func (jmi *JobMetaImpl) GetJobContext(jobId uint64) (*types.JobContext, error) { +func (jmi *JobMetaImpl) GetJobContext(jobId uint64) (*ptypesv1pb.JobContext, error) { var ( result *gorm.DB queryCondition *DBJob @@ -170,10 +170,10 @@ func (jmi *JobMetaImpl) GetJobContext(jobId uint64) (*types.JobContext, error) { if result.Error != nil { return nil, fmt.Errorf("select job record's failed, %s", result.Error) } - return &types.JobContext{ + return &ptypesv1pb.JobContext{ JobId: queryReturn.JobID, - JobType: types.JobType(queryReturn.JobType), - JobState: types.JobState(queryReturn.JobState), + JobType: ptypesv1pb.JobType(queryReturn.JobType), + JobState: ptypesv1pb.JobState(queryReturn.JobState), JobErr: queryReturn.JobErr, CreateTime: queryReturn.CreateTime.Unix(), ModifyTime: queryReturn.ModifyTime.Unix(), @@ -192,7 +192,7 @@ func (jmi *JobMetaImpl) SetUploadPayloadJobState(jobId uint64, jobState string, JobID: jobId, } updateFields = &DBJob{ - JobState: uint32(types.JobState_value[jobState]), + JobState: uint32(ptypesv1pb.JobState_value[jobState]), ModifyTime: time.Unix(timestampSec, 0), } result = jmi.db.Model(queryCondition).Updates(updateFields) @@ -214,7 +214,7 @@ func (jmi *JobMetaImpl) SetUploadPayloadJobJobError(jobID uint64, jobState strin JobID: jobID, } updateFields = &DBJob{ - JobState: uint32(types.JobState_value[jobState]), + JobState: uint32(ptypesv1pb.JobState_value[jobState]), ModifyTime: time.Unix(timestampSec, 0), JobErr: jobErr, } @@ -234,7 +234,7 @@ func (jmi *JobMetaImpl) SetPrimaryPieceJobDone(txHash []byte, pj *jobdb.PieceJob insertPieceJobRecord = &DBPieceJob{ CreateHash: hex.EncodeToString(txHash), - PieceType: uint32(types.JobType_JOB_TYPE_UPLOAD_PRIMARY), + PieceType: uint32(ptypesv1pb.JobType_JOB_TYPE_UPLOAD_PRIMARY), PieceIdx: pj.PieceId, Checksum: string(pj.Checksum[0]), PieceState: 0, // todo: fill what? @@ -258,7 +258,7 @@ func (jmi *JobMetaImpl) GetPrimaryJob(txHash []byte) ([]*jobdb.PieceJob, error) ) result = jmi.db. - Where("create_hash = ? AND piece_type = ?", hex.EncodeToString(txHash), types.JobType_JOB_TYPE_UPLOAD_PRIMARY). + Where("create_hash = ? AND piece_type = ?", hex.EncodeToString(txHash), ptypesv1pb.JobType_JOB_TYPE_UPLOAD_PRIMARY). Find(&queryReturns) if result.Error != nil { return pieceJobs, fmt.Errorf("select primary piece jobs failed, %s", result.Error) @@ -281,7 +281,7 @@ func (jmi *JobMetaImpl) SetSecondaryPieceJobDone(txHash []byte, pj *jobdb.PieceJ insertPieceJobRecord = &DBPieceJob{ CreateHash: hex.EncodeToString(txHash), - PieceType: uint32(types.JobType_JOB_TYPE_UPLOAD_SECONDARY_EC), + PieceType: uint32(ptypesv1pb.JobType_JOB_TYPE_UPLOAD_SECONDARY_EC), PieceIdx: pj.PieceId, Checksum: string(pj.Checksum[0]), PieceState: 0, // todo: fill what? @@ -305,7 +305,7 @@ func (jmi *JobMetaImpl) GetSecondaryJob(txHash []byte) ([]*jobdb.PieceJob, error ) result = jmi.db. - Where("create_hash = ? AND piece_type = ?", hex.EncodeToString(txHash), types.JobType_JOB_TYPE_UPLOAD_SECONDARY_EC). + Where("create_hash = ? AND piece_type = ?", hex.EncodeToString(txHash), ptypesv1pb.JobType_JOB_TYPE_UPLOAD_SECONDARY_EC). 
Find(&queryReturns) if result.Error != nil { return pieceJobs, fmt.Errorf("select secondary piece jobs failed, %s", result.Error) diff --git a/store/jobdb/jobsql/job_meta_test.go b/store/jobdb/jobsql/job_meta_test.go index a4426f794..b71d6f100 100644 --- a/store/jobdb/jobsql/job_meta_test.go +++ b/store/jobdb/jobsql/job_meta_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/assert" - types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" "github.com/bnb-chain/greenfield-storage-provider/store/jobdb" ) @@ -23,7 +23,7 @@ func TestJobMeta(t *testing.T) { jmi, _ := NewJobMetaImpl(DefaultDBOption) _, err := jmi.CreateUploadPayloadJob( txHash, - &types.ObjectInfo{BucketName: "testBucket", ObjectName: "testObject"}) + &ptypesv1pb.ObjectInfo{BucketName: "testBucket", ObjectName: "testObject"}) assert.Equal(t, nil, err) } fmt.Println(string(txHash)) diff --git a/store/metadb/meta_db.go b/store/metadb/meta_db.go index af34b0df1..e55a0923c 100644 --- a/store/metadb/meta_db.go +++ b/store/metadb/meta_db.go @@ -1,14 +1,14 @@ package metadb -import types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" +import ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" // IntegrityMeta defines the integrity hash info type IntegrityMeta struct { - ObjectID uint64 `json:"ObjectID"` - PieceIdx uint32 `json:"PieceIdx"` // only use for ec piece and secondary - PieceCount uint32 `json:"PieceCount"` - IsPrimary bool `json:"IsPrimary"` - RedundancyType types.RedundancyType `json:"RedundancyType"` + ObjectID uint64 `json:"ObjectID"` + PieceIdx uint32 `json:"PieceIdx"` // only use for ec piece and secondary + PieceCount uint32 `json:"PieceCount"` + IsPrimary bool `json:"IsPrimary"` + RedundancyType ptypesv1pb.RedundancyType `json:"RedundancyType"` IntegrityHash []byte `json:"IntegrityHash"` PieceHash map[string][]byte `json:"PieceHash"` diff --git a/store/metadb/meta_db_test.go b/store/metadb/meta_db_test.go index 0c7176c15..e307ccea9 100644 --- a/store/metadb/meta_db_test.go +++ b/store/metadb/meta_db_test.go @@ -7,14 +7,14 @@ import ( "github.com/stretchr/testify/assert" - types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" ) func Test_IntegrityMeta_Json_Marshal_Unmarshal(t *testing.T) { meta := &IntegrityMeta{ ObjectID: 1, PieceIdx: 1, - RedundancyType: types.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, + RedundancyType: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, } data, err := json.Marshal(meta) assert.Equal(t, nil, err) diff --git a/store/piecestore/storage/memory_test.go b/store/piecestore/storage/memory_test.go index 0fe167936..ac225bb4f 100644 --- a/store/piecestore/storage/memory_test.go +++ b/store/piecestore/storage/memory_test.go @@ -2,13 +2,14 @@ package storage import ( "context" - "github.com/bnb-chain/greenfield-storage-provider/model/errors" "io" "os" "strings" "testing" "github.com/stretchr/testify/assert" + + "github.com/bnb-chain/greenfield-storage-provider/model/errors" ) func setupMemoryTest(t *testing.T) *memoryStore { diff --git a/test/test_tool/stonehub/beigin_upload_payload.go b/test/test_tool/stonehub/beigin_upload_payload.go index f9dfb043b..5e744e20d 100644 --- a/test/test_tool/stonehub/beigin_upload_payload.go +++ b/test/test_tool/stonehub/beigin_upload_payload.go @@ -8,7 +8,7 @@ import ( "github.com/urfave/cli" - service 
"github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" cliCtx "github.com/bnb-chain/greenfield-storage-provider/test/test_tool/context" ) @@ -34,7 +34,7 @@ func beginStone(c *cli.Context) { if err != nil { fmt.Println("tx hash param decode error: ", err) } - req := &service.StoneHubServiceBeginUploadPayloadRequest{ + req := &stypesv1pb.StoneHubServiceBeginUploadPayloadRequest{ TxHash: txHash, } client, err := GetStoneHubClient() diff --git a/test/test_tool/stonehub/create_object.go b/test/test_tool/stonehub/create_object.go index 625f6bcb4..646f909e9 100644 --- a/test/test_tool/stonehub/create_object.go +++ b/test/test_tool/stonehub/create_object.go @@ -9,8 +9,8 @@ import ( "github.com/urfave/cli" - types "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" - service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1" + stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" cliCtx "github.com/bnb-chain/greenfield-storage-provider/test/test_tool/context" ) @@ -73,7 +73,7 @@ func createObjectToStoneHub(c *cli.Context) { hash.Write([]byte(time.Now().String())) txHash := hash.Sum(nil) - object := &types.ObjectInfo{ + object := &ptypesv1pb.ObjectInfo{ Owner: c.String("w"), BucketName: c.String("b"), ObjectName: c.String("o"), @@ -81,12 +81,12 @@ func createObjectToStoneHub(c *cli.Context) { ObjectId: c.Uint64("i"), Height: c.Uint64("c"), TxHash: txHash, - PrimarySp: &types.StorageProviderInfo{ + PrimarySp: &ptypesv1pb.StorageProviderInfo{ SpId: c.String("sp"), }, } - req := &service.StoneHubServiceCreateObjectRequest{ + req := &stypesv1pb.StoneHubServiceCreateObjectRequest{ TxHash: txHash, ObjectInfo: object, } diff --git a/test/test_tool/stonehub/done_primary_piece_job.go b/test/test_tool/stonehub/done_primary_piece_job.go index 3f6bffed9..5ee854bf8 100644 --- a/test/test_tool/stonehub/done_primary_piece_job.go +++ b/test/test_tool/stonehub/done_primary_piece_job.go @@ -9,7 +9,7 @@ import ( "github.com/urfave/cli" - service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" cliCtx "github.com/bnb-chain/greenfield-storage-provider/test/test_tool/context" ) @@ -50,10 +50,10 @@ func donePrimaryPieceJob(c *cli.Context) { hash := sha256.New() hash.Write([]byte(time.Now().String())) checksum := hash.Sum(nil) - req := &service.StoneHubServiceDonePrimaryPieceJobRequest{ + req := &stypesv1pb.StoneHubServiceDonePrimaryPieceJobRequest{ TxHash: txHash, - PieceJob: &service.PieceJob{ - StorageProviderSealInfo: &service.StorageProviderSealInfo{ + PieceJob: &stypesv1pb.PieceJob{ + StorageProviderSealInfo: &stypesv1pb.StorageProviderSealInfo{ PieceIdx: uint32(c.Uint64("i")), StorageProviderId: c.String("s"), PieceChecksum: [][]byte{checksum}, diff --git a/test/test_tool/stonehub/done_secondary_piece_job.go b/test/test_tool/stonehub/done_secondary_piece_job.go index c3bd48f35..bbe90303d 100644 --- a/test/test_tool/stonehub/done_secondary_piece_job.go +++ b/test/test_tool/stonehub/done_secondary_piece_job.go @@ -9,7 +9,7 @@ import ( "github.com/urfave/cli" - service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" cliCtx "github.com/bnb-chain/greenfield-storage-provider/test/test_tool/context" ) @@ 
-55,10 +55,10 @@ func doneSecondaryPieceJob(c *cli.Context) { checksums = append(checksums, checksum) } - req := &service.StoneHubServiceDoneSecondaryPieceJobRequest{ + req := &stypesv1pb.StoneHubServiceDoneSecondaryPieceJobRequest{ TxHash: txHash, - PieceJob: &service.PieceJob{ - StorageProviderSealInfo: &service.StorageProviderSealInfo{ + PieceJob: &stypesv1pb.PieceJob{ + StorageProviderSealInfo: &stypesv1pb.StorageProviderSealInfo{ PieceIdx: uint32(c.Uint64("i")), StorageProviderId: c.String("s"), PieceChecksum: checksums, diff --git a/test/test_tool/stonehub/query_stone.go b/test/test_tool/stonehub/query_stone.go index 73f053fb8..4c4bc50b4 100644 --- a/test/test_tool/stonehub/query_stone.go +++ b/test/test_tool/stonehub/query_stone.go @@ -8,7 +8,7 @@ import ( "github.com/urfave/cli" - service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" cliCtx "github.com/bnb-chain/greenfield-storage-provider/test/test_tool/context" ) @@ -35,7 +35,7 @@ func queryStone(c *cli.Context) { fmt.Println("tx hash param decode error: ", err) return } - req := &service.StoneHubServiceQueryStoneRequest{ + req := &stypesv1pb.StoneHubServiceQueryStoneRequest{ TxHash: txHash, } client, err := GetStoneHubClient() diff --git a/test/test_tool/stonehub/stone_hub_client.go b/test/test_tool/stonehub/stone_hub_client.go index cd654ca6c..aa31d7a89 100644 --- a/test/test_tool/stonehub/stone_hub_client.go +++ b/test/test_tool/stonehub/stone_hub_client.go @@ -5,17 +5,17 @@ import ( "google.golang.org/grpc" - service "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1" cliCtx "github.com/bnb-chain/greenfield-storage-provider/test/test_tool/context" ) -func GetStoneHubClient() (service.StoneHubServiceClient, error) { +func GetStoneHubClient() (stypesv1pb.StoneHubServiceClient, error) { ctx := cliCtx.GetContext() conn, err := grpc.Dial(ctx.Cfg.StoneHubAddr, grpc.WithInsecure()) if err != nil { fmt.Println("dial stone hub error: ", err) return nil, err } - client := service.NewStoneHubServiceClient(conn) + client := stypesv1pb.NewStoneHubServiceClient(conn) return client, nil }
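For reference, a minimal standalone sketch of the alias convention applied throughout this change — ptypesv1pb for pkg/types/v1 and stypesv1pb for service/types/v1 — assuming the greenfield-storage-provider module and its generated v1 protobuf packages are importable. The file name, field values, and printed output below are illustrative only and are not part of the patch.

    // alias_convention_sketch.go (illustrative, not part of this diff)
    package main

    import (
    	"fmt"

    	ptypesv1pb "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
    	stypesv1pb "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
    )

    func main() {
    	// Chain/object-level protobuf types use the ptypesv1pb alias.
    	object := &ptypesv1pb.ObjectInfo{
    		BucketName:     "testBucket",
    		ObjectName:     "testObject",
    		RedundancyType: ptypesv1pb.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
    	}

    	// Service-level protobuf types use the stypesv1pb alias.
    	errMsg := &stypesv1pb.ErrMessage{
    		ErrCode: stypesv1pb.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED,
    		ErrMsg:  "success",
    	}

    	fmt.Println(object.BucketName, object.RedundancyType, errMsg.ErrMsg)
    }

The aliases make call sites unambiguous when both generated packages appear in one file (as in service/stonenode/sync.go and service/syncer/syncer_service.go above), at the cost of slightly longer identifiers.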