diff --git a/config/config.toml b/config/config.toml
index 8b61e4875..1b6a5be39 100644
--- a/config/config.toml
+++ b/config/config.toml
@@ -8,6 +8,7 @@ Service = [
   Domain = "gnfd.nodereal.com"
   UploaderServiceAddress = "127.0.0.1:9133"
   DownloaderServiceAddress = "127.0.0.1:9233"
+  SyncerServiceAddress = "127.0.0.1:9533"
   ChallengeServiceAddress = "127.0.0.1:9633"
@@ -21,6 +22,7 @@ Service = [
    [UploaderCfg.PieceStoreConfig.Store]
      Storage = "file"
      BucketURL = "./data/primary_payload_data"
+     NoSignRequest = false
      MaxRetries = 5
      TestMode = true
  [UploaderCfg.MetaLevelDBConfig]
@@ -37,6 +39,7 @@ Service = [
    [DownloaderCfg.PieceStoreConfig.Store]
      Storage = "file"
      BucketURL = "./data/primary_payload_data"
+     NoSignRequest = false
      MaxRetries = 5
      TestMode = true
@@ -52,18 +55,18 @@ Service = [
   FileHandles = 1000
   ReadOnly = false
-
 [StoneNodeCfg]
   StorageProvider = "gnfd-test-sp"
   Address = "127.0.0.1:9433"
+  GatewayAddress = ["127.0.0.1:9034", "127.0.0.1:9035", "127.0.0.1:9036", "127.0.0.1:9037", "127.0.0.1:9038", "127.0.0.1:9039"]
   StoneHubServiceAddress = "127.0.0.1:9333"
-  SyncerServiceAddress = ["127.0.0.1:9543", "127.0.0.1:9553", "127.0.0.1:9563", "127.0.0.1:9573", "127.0.0.1:9583", "127.0.0.1:9593"]
   StoneJobLimit = 64
   [StoneNodeCfg.PieceStoreConfig]
     Shards = 0
     [StoneNodeCfg.PieceStoreConfig.Store]
       Storage = "file"
       BucketURL = "./data/primary_payload_data"
+      NoSignRequest = false
       MaxRetries = 5
       TestMode = true
@@ -74,6 +77,7 @@ Service = [
    [SyncerCfg.PieceStoreConfig.Store]
      Storage = "file"
      BucketURL = "./data/secondary_payload_data"
+     NoSignRequest = false
      MaxRetries = 5
      TestMode = true
  [SyncerCfg.MetaLevelDBConfig]
diff --git a/go.mod b/go.mod
index 313e9fcff..e8a1bb1d5 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,6 @@ go 1.19
replace (
    // TODO: point to develop branch, will be changed to v0.0.6 after greenfield-sdk-go v0.0.6 released
    github.com/bnb-chain/greenfield => github.com/bnb-chain/greenfield v0.0.0-20230217070311-2a863e19f57d
-   github.com/cosmos/cosmos-sdk => github.com/bnb-chain/gnfd-cosmos-sdk v0.0.6
    github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
    github.com/tendermint/tendermint => github.com/bnb-chain/gnfd-tendermint v0.0.1
diff --git a/model/const.go b/model/const.go
index 9ce3361d1..bf0e56ca1 100644
--- a/model/const.go
+++ b/model/const.go
@@ -31,7 +31,6 @@ const (
const (
    BufPoolSize  = 32 << 10
    ChecksumAlgo = "Crc32c"
-   OctetStream  = "application/octet-stream"
)

// RPC config
@@ -40,10 +39,23 @@ const (
    MaxCallMsgSize = 25 * 1024 * 1024
)

+// http header constants
+const (
+   // http header key
+   OctetStream               = "application/octet-stream"
+   ContentTypeHeader         = "Content-Type"
+   ETagHeader                = "ETag"
+   ContentLengthHeader       = "Content-Length"
+   ContentTypeXMLHeaderValue = "application/xml"
+   RangeHeader               = "Range"
+   ContentRangeHeader        = "Content-Range"
+)
+
// Gateway
const (
    // path
    AdminPath          = "/greenfield/admin/v1/"
+   SyncerPath         = "/greenfield/syncer/v1/sync-piece"
    GetApprovalSubPath = "get-approval"
    ChallengeSubPath   = "challenge"
@@ -68,15 +80,16 @@ const (
    GnfdIntegrityHashHeader = "X-Gnfd-Integrity-Hash"
    GnfdPieceHashHeader     = "X-Gnfd-Piece-Hash"

-   // http header key
-   ContentTypeHeader   = "Content-Type"
-   ETagHeader          = "ETag"
-   ContentLengthHeader = "Content-Length"
-   RangeHeader         = "Range"
-   ContentRangeHeader  = "Content-Range"
+   // StoneNode to gateway request header
+   GnfdSPIDHeader              = "X-Gnfd-SP-ID"
+   GnfdPieceCountHeader        = "X-Gnfd-Piece-Count"
+   GnfdApprovalSignatureHeader = "X-Gnfd-Approval-Signature"
+
+   // gateway to StoneNode response header
+   GnfdPieceChecksumHeader = "X-Gnfd-Piece-Checksum"
+   GnfdSealSignatureHeader = "X-Gnfd-Seal-Signature"

    // header value
-   ContentTypeXMLHeaderValue        = "application/xml"
    ReplicaRedundancyTypeHeaderValue = "Replica"
    InlineRedundancyTypeHeaderValue  = "Inline"
diff --git a/model/errors/http_error.go b/model/errors/http_error.go
new file mode 100644
index 000000000..04b32187b
--- /dev/null
+++ b/model/errors/http_error.go
@@ -0,0 +1 @@
+package errors
diff --git a/model/errors/errors.go b/model/errors/rpc_error.go
similarity index 92%
rename from model/errors/errors.go
rename to model/errors/rpc_error.go
index 321536059..089671915 100644
--- a/model/errors/errors.go
+++ b/model/errors/rpc_error.go
@@ -30,6 +30,8 @@ var (
    ErrRequestConsistent   = errors.New("request consistent check failed")
    ErrSignatureConsistent = errors.New("signature consistent check failed")
    ErrUnsupportedSignType = errors.New("unsupported signature type")
+   ErrEmptyReqHeader      = errors.New("request header is empty")
+   ErrReqHeader           = errors.New("request header is wrong")
)

// stone hub service errors
@@ -67,7 +69,9 @@ var (
    ErrInvalidSegmentData = errors.New("invalid segment data, length is not equal to 1")
    ErrInvalidECData      = errors.New("invalid ec data, length is not equal to 6")
    ErrEmptyTargetIdx     = errors.New("target index array is empty")
-   ErrSyncerNumber       = errors.New("syncer number is not enough")
+   ErrGatewayNumber      = errors.New("gateway number is not enough")
+   ErrEmptyRespHeader    = errors.New("http response header is empty")
+   ErrRespHeader         = errors.New("http response header is wrong")
)

// syncer service errors
diff --git a/pkg/stone/upload_payload_stone.go b/pkg/stone/upload_payload_stone.go
index 1e68c021a..d1f509c7c 100644
--- a/pkg/stone/upload_payload_stone.go
+++ b/pkg/stone/upload_payload_stone.go
@@ -3,16 +3,15 @@ package stone
import (
    "context"

-   "github.com/bnb-chain/greenfield-storage-provider/store/spdb"
    "github.com/looplab/fsm"

+   merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
    "github.com/bnb-chain/greenfield-storage-provider/pkg/job"
    ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
    stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+   "github.com/bnb-chain/greenfield-storage-provider/store/spdb"
    "github.com/bnb-chain/greenfield-storage-provider/util"
    "github.com/bnb-chain/greenfield-storage-provider/util/log"
-
-   merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
)

type contextKey string
diff --git a/proto/service/types/v1/stone_hub.proto b/proto/service/types/v1/stone_hub.proto
index 0602e5066..25865ae85 100644
--- a/proto/service/types/v1/stone_hub.proto
+++ b/proto/service/types/v1/stone_hub.proto
@@ -60,8 +60,6 @@ message PieceJob {
  StorageProviderSealInfo storage_provider_seal_info = 8;
}
-
-
message StoneHubServiceBeginUploadPayloadV2Request {
  string trace_id = 1;
  // bytes tx_hash = 2;
@@ -95,8 +93,10 @@ message StoneHubServiceAllocStoneJobRequest {
message StoneHubServiceAllocStoneJobResponse {
  string trace_id = 1;
  // bytes tx_hash = 2;
-  PieceJob piece_job = 3;
-  ErrMessage err_message = 4;
+  string bucket_name = 3;
+  string object_name = 4;
+  PieceJob piece_job = 5;
+  ErrMessage err_message = 6;
}

message StoneHubServiceDoneSecondaryPieceJobRequest {
diff --git a/service/client/piece_store_client.go b/service/client/piece_store_client.go
index b6d36b723..24b81d202 100644
--- a/service/client/piece_store_client.go
+++ b/service/client/piece_store_client.go
@@ -36,15 +36,16 @@ func NewStoreClient(pieceConfig *storage.PieceStoreConfig) (*StoreClient, error)
func (client *StoreClient) GetPiece(ctx context.Context, key string, offset, limit int64) ([]byte, error) {
    rc, err := client.ps.Get(ctx, key, offset, limit)
    if err != nil {
-       log.Errorw("stone node service invoke PieceStore Get failed", "error", err)
+       log.Errorw("get piece data from piece store failed", "error", err)
        return nil, err
    }
-   data, err := io.ReadAll(rc)
+   buf := &bytes.Buffer{}
+   _, err = io.Copy(buf, rc)
    if err != nil {
-       log.Errorw("stone node service invoke io.ReadAll failed", "error", err)
+       log.Errorw("copy data failed", "error", err)
        return nil, err
    }
-   return data, nil
+   return buf.Bytes(), nil
}

func (client *StoreClient) PutPiece(key string, value []byte) error {
diff --git a/service/client/syncer_client.go b/service/client/syncer_client.go
index cb2d303d9..e3d4bc624 100644
--- a/service/client/syncer_client.go
+++ b/service/client/syncer_client.go
@@ -32,7 +32,7 @@ func NewSyncerClient(address string) (*SyncerClient, error) {
    conn, err := grpc.DialContext(context.Background(), address, grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(model.MaxCallMsgSize), grpc.MaxCallSendMsgSize(model.MaxCallMsgSize)))
    if err != nil {
-       log.Errorw("invoke syncer service grpc.DialContext failed", "error", err)
+       log.Errorw("invoke syncer service failed", "error", err)
        return nil, err
    }
    client := &SyncerClient{
diff --git a/service/gateway/admin_handler.go b/service/gateway/admin_handler.go
index cb6b01c3d..921df3950 100644
--- a/service/gateway/admin_handler.go
+++ b/service/gateway/admin_handler.go
@@ -39,7 +39,7 @@ func (g *Gateway) getApprovalHandler(w http.ResponseWriter, r *http.Request) {
        if statusCode == http.StatusOK {
            log.Infof("action(%v) statusCode(%v) %v", "getApproval", statusCode, requestContext.generateRequestDetail())
        } else {
-           log.Warnf("action(%v) statusCode(%v) %v", "getApproval", statusCode, requestContext.generateRequestDetail())
+           log.Errorf("action(%v) statusCode(%v) %v", "getApproval", statusCode, requestContext.generateRequestDetail())
        }
    }()
diff --git a/service/gateway/chain_client.go b/service/gateway/chain_client.go
index 125a1ccc2..ebd86bb68 100644
--- a/service/gateway/chain_client.go
+++ b/service/gateway/chain_client.go
@@ -51,14 +51,14 @@ func (dci *debugChainImpl) createBucket(bucketName string, option *createBucketO
    return nil
}

-// chainClientConfig is the configuration information when creating chainClient.
+// ChainClientConfig is the configuration information when creating chainClient.
// currently Mode only support "DebugMode".
-type chainClientConfig struct {
+type ChainClientConfig struct {
    Mode     string
    DebugDir string
}

-var defaultChainClientConfig = &chainClientConfig{
+var DefaultChainClientConfig = &ChainClientConfig{
    Mode:     "DebugMode",
    DebugDir: "./debug",
}
@@ -69,9 +69,9 @@ type chainClient struct {
    impl chainClientInterface
}

-func newChainClient(c *chainClientConfig) (*chainClient, error) {
+func newChainClient(c *ChainClientConfig) (*chainClient, error) {
    if c == nil {
-       c = defaultChainClientConfig
+       c = DefaultChainClientConfig
    }
    switch {
    case c.Mode == "DebugMode":
diff --git a/service/gateway/gateway.go b/service/gateway/gateway.go
index 12c5de8f5..35c6d7f0e 100644
--- a/service/gateway/gateway.go
+++ b/service/gateway/gateway.go
@@ -27,6 +27,7 @@ type Gateway struct {
    uploader   *uclient.UploaderClient
    downloader *dclient.DownloaderClient
    challenge  *client.ChallengeClient
+   syncer     client.SyncerAPI

    // mock
    chain *chainClient
@@ -56,6 +57,10 @@ func NewGatewayService(cfg *GatewayConfig) (*Gateway, error) {
        log.Warnw("failed to challenge client", "err", err)
        return nil, err
    }
+   if g.syncer, err = client.NewSyncerClient(g.config.SyncerServiceAddress); err != nil {
+       log.Errorw("gateway inits syncer client failed", "error", err)
+       return nil, err
+   }
    if g.chain, err = newChainClient(g.config.ChainConfig); err != nil {
        log.Warnw("failed to create chain client", "err", err)
        return nil, err
@@ -83,7 +88,7 @@ func (g *Gateway) Start(ctx context.Context) error {
// Serve starts http service.
func (g *Gateway) Serve() {
    router := mux.NewRouter().SkipClean(true)
-   g.registerhandler(router)
+   g.registerHandler(router)
    server := &http.Server{
        Addr:    g.config.Address,
        Handler: router,
diff --git a/service/gateway/gateway_config.go b/service/gateway/gateway_config.go
index 5b64fc6be..8c9d17788 100644
--- a/service/gateway/gateway_config.go
+++ b/service/gateway/gateway_config.go
@@ -7,7 +7,8 @@ type GatewayConfig struct {
    UploaderServiceAddress   string
    DownloaderServiceAddress string
    ChallengeServiceAddress  string
-   ChainConfig              *chainClientConfig
+   SyncerServiceAddress     string
+   ChainConfig              *ChainClientConfig
}

var DefaultGatewayConfig = &GatewayConfig{
@@ -16,6 +17,7 @@ var DefaultGatewayConfig = &GatewayConfig{
    Domain:                   "gnfd.nodereal.com",
    UploaderServiceAddress:   "127.0.0.1:9133",
    DownloaderServiceAddress: "127.0.0.1:9233",
+   SyncerServiceAddress:     "127.0.0.1:9533",
    ChallengeServiceAddress:  "127.0.0.1:9633",
-   ChainConfig:              defaultChainClientConfig,
+   ChainConfig:              DefaultChainClientConfig,
}
diff --git a/service/gateway/helper_test.go b/service/gateway/helper_test.go
new file mode 100644
index 000000000..34fa9fd61
--- /dev/null
+++ b/service/gateway/helper_test.go
@@ -0,0 +1,64 @@
+package gateway
+
+import (
+   "context"
+   "encoding/base64"
+   "testing"
+
+   "github.com/bnb-chain/greenfield-storage-provider/model"
+   "google.golang.org/grpc"
+
+   stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+)
+
+func setup(t *testing.T) *Gateway {
+   return &Gateway{
+       config: &GatewayConfig{
+           StorageProvider:          "test1",
+           Address:                  "test2",
+           Domain:                   "test3",
+           UploaderServiceAddress:   "test4",
+           DownloaderServiceAddress: "test5",
+           ChallengeServiceAddress:  "test6",
+           SyncerServiceAddress:     "test7",
+           ChainConfig:              DefaultChainClientConfig,
+       },
+       name: model.GatewayService,
+   }
+}
+
+func makeStreamMock() *StreamMock {
+   return &StreamMock{
+       ctx:          context.Background(),
+       recvToServer: make(chan *stypes.SyncerServiceSyncPieceRequest, 10),
+   }
+}
+
+type StreamMock struct {
+   grpc.ClientStream
+   ctx          context.Context
+   recvToServer chan *stypes.SyncerServiceSyncPieceRequest
+}
+
+func (m *StreamMock) Send(resp *stypes.SyncerServiceSyncPieceRequest) error {
+   m.recvToServer <- resp
+   return nil
+}
+
+func (m *StreamMock) CloseAndRecv() (*stypes.SyncerServiceSyncPieceResponse, error) {
+   integrityHash, _ := base64.URLEncoding.DecodeString("pgPGdR4c9_KYz6wQxl-SifyzHXlHhx5XfNa89LzdNCI=")
+   return &stypes.SyncerServiceSyncPieceResponse{
+       TraceId: "test_traceID",
+       SecondarySpInfo: &stypes.StorageProviderSealInfo{
+           StorageProviderId: "sp1",
+           PieceIdx:          1,
+           PieceChecksum:     [][]byte{[]byte("1"), []byte("2"), []byte("3"), []byte("4"), []byte("5"), []byte("6")},
+           IntegrityHash:     integrityHash,
+           Signature:         nil,
+       },
+       ErrMessage: &stypes.ErrMessage{
+           ErrCode: stypes.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED,
+           ErrMsg:  "Success",
+       },
+   }, nil
+}
diff --git a/service/gateway/request_util.go b/service/gateway/request_util.go
index fc5be54dc..b24489d95 100644
--- a/service/gateway/request_util.go
+++ b/service/gateway/request_util.go
@@ -8,12 +8,13 @@ import (
    "time"

    "github.com/bnb-chain/greenfield-sdk-go/pkg/signer"
-   "github.com/bnb-chain/greenfield-storage-provider/model"
-   "github.com/bnb-chain/greenfield-storage-provider/model/errors"
-   "github.com/bnb-chain/greenfield-storage-provider/util"
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/crypto/secp256k1"
    "github.com/gorilla/mux"
+
+   "github.com/bnb-chain/greenfield-storage-provider/model"
+   "github.com/bnb-chain/greenfield-storage-provider/model/errors"
+   "github.com/bnb-chain/greenfield-storage-provider/util"
)

// requestContext is a request context.
diff --git a/service/gateway/response_util.go b/service/gateway/response_util.go
index 1c7a05261..934ede053 100644
--- a/service/gateway/response_util.go
+++ b/service/gateway/response_util.go
@@ -22,7 +22,7 @@ var (
    InvalidBucketName  = &errorDescription{errorCode: "InvalidBucketName", errorMessage: "The specified bucket is not valid.", statusCode: http.StatusBadRequest}
    InvalidKey         = &errorDescription{errorCode: "InvalidKey", errorMessage: "Object key is Illegal", statusCode: http.StatusBadRequest}
    InvalidTxHash      = &errorDescription{errorCode: "InvalidTxHash", errorMessage: "transaction hash is Illegal", statusCode: http.StatusBadRequest}
-   InvalidPayload     = &errorDescription{errorCode: "InvalidPaload", errorMessage: "payload is empty", statusCode: http.StatusBadRequest}
+   InvalidPayload     = &errorDescription{errorCode: "InvalidPayload", errorMessage: "payload is empty", statusCode: http.StatusBadRequest}
    InvalidRange       = &errorDescription{errorCode: "InvalidRange", errorMessage: "range is invalid", statusCode: http.StatusBadRequest}
    UnauthorizedAccess = &errorDescription{errorCode: "UnauthorizedAccess", errorMessage: "UnauthorizedAccess", statusCode: http.StatusUnauthorized}
    AccessDenied       = &errorDescription{errorCode: "AccessDenied", errorMessage: "Access Denied", statusCode: http.StatusForbidden}
diff --git a/service/gateway/router.go b/service/gateway/router.go
index d5412f4bb..d5f35399b 100644
--- a/service/gateway/router.go
+++ b/service/gateway/router.go
@@ -18,8 +18,8 @@ func (g *Gateway) notFoundHandler(w http.ResponseWriter, r *http.Request) {
    w.Write(s)
}

-// registerhandler is used to register mux handlers.
-func (g *Gateway) registerhandler(r *mux.Router) {
+// registerHandler is used to register mux handlers.
+func (g *Gateway) registerHandler(r *mux.Router) {
    // bucket router, virtual-hosted style
    bucketRouter := r.Host("{bucket:.+}." + g.config.Domain).Subrouter()
    bucketRouter.NewRoute().
@@ -54,6 +54,8 @@ func (g *Gateway) registerhandler(r *mux.Router) {
        Methods(http.MethodGet).
        Queries(model.ActionQuery, "{action}").
        HandlerFunc(g.getApprovalHandler)
+   // sync piece to syncer
+   r.Path(model.SyncerPath).Name("SyncPiece").Methods(http.MethodPut).HandlerFunc(g.syncPieceHandler)

    r.Path(model.AdminPath + model.ChallengeSubPath).
        Name(challengeRouterName).
diff --git a/service/gateway/sync_piece_handler.go b/service/gateway/sync_piece_handler.go
new file mode 100644
index 000000000..caacbd5aa
--- /dev/null
+++ b/service/gateway/sync_piece_handler.go
@@ -0,0 +1,159 @@
+package gateway
+
+import (
+   "bytes"
+   "context"
+   "encoding/hex"
+   "encoding/json"
+   "io"
+   "net/http"
+   "strconv"
+
+   "github.com/bnb-chain/greenfield-storage-provider/model"
+   merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
+   stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+   "github.com/bnb-chain/greenfield-storage-provider/util"
+   "github.com/bnb-chain/greenfield-storage-provider/util/log"
+)
+
+func (g *Gateway) syncPieceHandler(w http.ResponseWriter, r *http.Request) {
+   var (
+       err            error
+       errDescription *errorDescription
+       reqContext     *requestContext
+   )
+
+   defer func() {
+       statusCode := 200
+       if errDescription != nil {
+           statusCode = errDescription.statusCode
+           _ = errDescription.errorResponse(w, reqContext)
+       }
+       if statusCode == 200 {
+           log.Debugf("action(%v) statusCode(%v) %v", "syncPiece", statusCode,
+               reqContext.generateRequestDetail())
+       } else {
+           log.Errorf("action(%v) statusCode(%v) %v", "syncPiece", statusCode,
+               reqContext.generateRequestDetail())
+       }
+   }()
+
+   reqContext = newRequestContext(r)
+   log.Infow("sync piece handler receive data", "request", reqContext.generateRequestDetail())
+   syncerInfo, err := getReqHeader(r.Header)
+   if err != nil {
+       log.Errorw("get request header failed", "error", err)
+       return
+   }
+   // get trace id
+   traceID := r.Header.Get(model.GnfdRequestIDHeader)
+   if traceID == "" {
+       traceID = reqContext.requestID
+   }
+   pieceData, err := parseBody(r.Body)
+   if err != nil {
+       // TODO, add more error
+       log.Errorw("parse request body failed", "error", err)
+       return
+   }
+   resp, err := g.syncPiece(context.Background(), syncerInfo, pieceData, traceID)
+   if err != nil {
+       errDescription = InternalError
+   }
+   addRespHeader(resp, w)
+   log.Infow("sync piece handler reply response to stone node", "response header", w.Header())
+}
+
+func parseBody(body io.ReadCloser) ([][]byte, error) {
+   buf := &bytes.Buffer{}
+   _, err := io.Copy(buf, body)
+   if err != nil {
+       log.Errorw("copy request body failed", "error", err)
+       return nil, merrors.ErrInternalError
+   }
+   pieceData := make([][]byte, 0)
+   if err := json.Unmarshal(buf.Bytes(), &pieceData); err != nil {
+       log.Errorw("unmarshal body failed", "error", err)
+       return nil, merrors.ErrInternalError
+   }
+   return pieceData, nil
+}
+
+func getReqHeader(header http.Header) (*stypes.SyncerInfo, error) {
+   syncerInfo := &stypes.SyncerInfo{}
+   // get object id
+   objectID := header.Get(model.GnfdObjectIDHeader)
+   if objectID == "" {
+       log.Error("req header object id is empty")
+       return nil, merrors.ErrEmptyReqHeader
+   }
+   id, err := util.HeaderToUint64(objectID)
+   if err != nil {
+       log.Errorw("parse object id failed", "error", err)
+       return nil, merrors.ErrReqHeader
+   }
+   syncerInfo.ObjectId = id
+
+   // get storage provider id
+   spID := header.Get(model.GnfdSPIDHeader)
+   if spID == "" {
+       log.Error("req header sp id is empty")
+       return nil, merrors.ErrEmptyReqHeader
+   }
+   syncerInfo.StorageProviderId = spID
+
+   // get piece count
+   pieceCount := header.Get(model.GnfdPieceCountHeader)
+   if pieceCount == "" {
+       log.Error("req header piece count is empty")
+       return nil, merrors.ErrEmptyReqHeader
+   }
+   pCount, err := util.HeaderToUint32(pieceCount)
+   if err != nil {
+       log.Errorw("parse piece count failed", "error", err)
+       return nil, merrors.ErrReqHeader
+   }
+   syncerInfo.PieceCount = pCount
+
+   // get piece index
+   pieceIndex := header.Get(model.GnfdPieceIndexHeader)
+   if pieceIndex == "" {
+       log.Error("req header piece index is empty")
+       return nil, merrors.ErrEmptyReqHeader
+   }
+   pIdx, err := util.HeaderToUint32(pieceIndex)
+   if err != nil {
+       log.Errorw("parse piece index failed", "error", err)
+       return nil, merrors.ErrReqHeader
+   }
+   syncerInfo.PieceIndex = pIdx
+
+   // get redundancy type
+   redundancyType := header.Get(model.GnfdRedundancyTypeHeader)
+   if redundancyType == "" {
+       log.Error("req header redundancy type is empty")
+       return nil, merrors.ErrEmptyReqHeader
+   }
+   rType, err := util.TransferRedundancyType(redundancyType)
+   if err != nil {
+       log.Errorw("transfer redundancy type failed", "error", err)
+       return nil, err
+   }
+   syncerInfo.RedundancyType = rType
+   return syncerInfo, nil
+}
+
+func addRespHeader(resp *stypes.SyncerServiceSyncPieceResponse, w http.ResponseWriter) {
+   w.Header().Set(model.GnfdRequestIDHeader, resp.GetTraceId())
+   w.Header().Set(model.GnfdSPIDHeader, resp.GetSecondarySpInfo().GetStorageProviderId())
+   w.Header().Set(model.GnfdPieceIndexHeader, strconv.Itoa(int(resp.GetSecondarySpInfo().GetPieceIdx())))
+
+   checksum := util.EncodePieceHash(resp.GetSecondarySpInfo().GetPieceChecksum())
+   w.Header().Set(model.GnfdPieceChecksumHeader, checksum)
+
+   integrityHash := hex.EncodeToString(resp.GetSecondarySpInfo().GetIntegrityHash())
+   w.Header().Set(model.GnfdIntegrityHashHeader, integrityHash)
+
+   sig := hex.EncodeToString([]byte("test_signature"))
+   w.Header().Set(model.GnfdSealSignatureHeader, sig)
+}
diff --git a/service/gateway/synce_piece_test.go b/service/gateway/synce_piece_test.go
new file mode 100644
index 000000000..fd9ef6df0
--- /dev/null
+++ b/service/gateway/synce_piece_test.go
@@ -0,0 +1,45 @@
+package gateway
+
+import (
+   "context"
+   "testing"
+
+   "github.com/golang/mock/gomock"
+   "github.com/stretchr/testify/assert"
+   "google.golang.org/grpc"
+
+   ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
+   "github.com/bnb-chain/greenfield-storage-provider/service/client/mock"
+   stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+)
+
+func TestSyncPieceSuccess(t *testing.T) {
+   gw := setup(t)
+   ctrl := gomock.NewController(t)
+
+   streamClient := makeStreamMock()
+   syncer := mock.NewMockSyncerAPI(ctrl)
+   gw.syncer = syncer
+   syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
+       func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
+           return streamClient, nil
+       }).AnyTimes()
+
+   sInfo := &stypes.SyncerInfo{
+       ObjectId:          123456,
+       StorageProviderId: "440246a94fc4257096b8d4fa8db94a5655f455f88555f885b10da1466763f742",
+       RedundancyType:    ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
+   }
+   data := [][]byte{
+       []byte("test1"),
+       []byte("test2"),
+       []byte("test3"),
+       []byte("test4"),
+       []byte("test5"),
+       []byte("test6"),
+   }
+   resp, err := gw.syncPiece(context.TODO(), sInfo, data, "test_traceID")
+   assert.Equal(t, err, nil)
+   assert.Equal(t, resp.GetTraceId(), "test_traceID")
+   assert.Equal(t, resp.GetSecondarySpInfo().GetPieceIdx(), uint32(1))
+}
diff --git a/service/gateway/syncer_processor.go b/service/gateway/syncer_processor.go
new file mode 100644
index 000000000..c92513a06
--- /dev/null
+++ b/service/gateway/syncer_processor.go
@@ -0,0 +1,42 @@
+package gateway
+
+import (
+   "context"
+   "fmt"
+
+   stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
+   "github.com/bnb-chain/greenfield-storage-provider/util/log"
+)
+
+// syncPiece sends rpc request to secondary storage provider to sync the piece data
+func (g *Gateway) syncPiece(ctx context.Context, syncerInfo *stypes.SyncerInfo, pieceData [][]byte, traceID string) (
+   *stypes.SyncerServiceSyncPieceResponse, error) {
+   stream, err := g.syncer.SyncPiece(ctx)
+   if err != nil {
+       log.Errorw("sync secondary piece job error", "err", err)
+       return nil, err
+   }
+
+   // send data one by one to avoid exceeding rpc max msg size
+   for _, value := range pieceData {
+       if err := stream.Send(&stypes.SyncerServiceSyncPieceRequest{
+           TraceId:    traceID,
+           SyncerInfo: syncerInfo,
+           PieceData:  value,
+       }); err != nil {
+           log.Errorw("client send request error", "error", err)
+           return nil, err
+       }
+   }
+
+   resp, err := stream.CloseAndRecv()
+   if err != nil {
+       log.Errorw("client close error", "error", err, "traceID", resp.GetTraceId())
+       return nil, err
+   }
+   if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypes.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
+       log.Errorw("sync piece sends to stone node response code is not success", "error", err, "traceID", resp.GetTraceId())
+       return nil, fmt.Errorf(resp.GetErrMessage().GetErrMsg())
+   }
+   return resp, nil
+}
diff --git a/service/stonehub/stone_hub_service.go b/service/stonehub/stone_hub_service.go
index acd76db2a..cbcaf089f 100644
--- a/service/stonehub/stone_hub_service.go
+++ b/service/stonehub/stone_hub_service.go
@@ -201,6 +201,9 @@ func (hub *StoneHub) AllocStoneJob(ctx context.Context, req *stypes.StoneHubServ
    }
    switch job := stoneJob.(type) {
    case *stypes.PieceJob:
+       objectInfo, _ := hub.jobDB.GetObjectInfo(job.GetObjectId())
+       resp.BucketName = objectInfo.BucketName
+       resp.ObjectName = objectInfo.ObjectName
        resp.PieceJob = job
    default:
        resp.ErrMessage = merrors.MakeErrMsgResponse(merrors.ErrStoneJobTypeUnrecognized)
diff --git a/service/stonenode/alloc_stone_job.go b/service/stonenode/alloc_stone_job.go
index 353c8aa6e..e40ee3ef5 100644
--- a/service/stonenode/alloc_stone_job.go
+++ b/service/stonenode/alloc_stone_job.go
@@ -36,7 +36,6 @@ func (node *StoneNodeService) loadAndSyncPieces(ctx context.Context, allocResp *
    // TBD:: check secondarySPs count by redundancyType.
    // EC_TYPE need EC_M + EC_K + backup
    // REPLICA_TYPE and INLINE_TYPE need segments count + backup
-   secondarySPs := mock.AllocUploadSecondarySP()

    // validate redundancyType and targetIdx
    redundancyType := allocResp.GetPieceJob().GetRedundancyType()
@@ -60,6 +59,7 @@
    }
    // 2. dispatch the piece data to different secondary sp
+   secondarySPs := mock.AllocUploadSecondarySP()
    secondaryPieceData, err := node.dispatchSecondarySP(pieceData, redundancyType, secondarySPs, targetIdx)
    if err != nil {
        log.CtxErrorw(ctx, "dispatch piece data to secondary sp error")
@@ -67,14 +67,16 @@ func (node *StoneNodeService) loadAndSyncPieces(ctx context.Context, allocResp *
        return err
    }

-   if len(secondaryPieceData) > len(node.syncer) {
-       log.Errorw("syncer number is not enough")
-       node.reportErrToStoneHub(ctx, allocResp, merrors.ErrSyncerNumber)
-       return merrors.ErrSyncerNumber
+   secondaryGatewayList := node.cfg.GatewayAddress
+   if len(secondaryPieceData) > len(secondarySPs) {
+       log.Errorw("secondary sp is not enough")
+       node.reportErrToStoneHub(ctx, allocResp, merrors.ErrSecondarySPNumber)
+       return merrors.ErrSecondarySPNumber
    }
+   log.Debugw("secondary gateway address list", "list", secondaryGatewayList)

    // 3. send piece data to the secondary
-   node.doSyncToSecondarySP(ctx, allocResp, secondaryPieceData, secondarySPs)
+   node.doSyncToSecondarySP(ctx, allocResp, secondaryPieceData, secondaryGatewayList, secondarySPs)

    return nil
}
diff --git a/service/stonenode/helper_test.go b/service/stonenode/helper_test.go
index f3ee8bdf5..99b558378 100644
--- a/service/stonenode/helper_test.go
+++ b/service/stonenode/helper_test.go
@@ -1,12 +1,8 @@
package stonenode

import (
-   "context"
-   "encoding/base64"
    "testing"

-   "google.golang.org/grpc"
-
    "github.com/bnb-chain/greenfield-storage-provider/model"
    ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
    stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
@@ -17,7 +13,6 @@ func setup(t *testing.T) *StoneNodeService {
        cfg: &StoneNodeConfig{
            Address:                "test1",
            StoneHubServiceAddress: "test2",
-           SyncerServiceAddress:   []string{"test3"},
            StorageProvider:        "test",
            StoneJobLimit:          0,
        },
@@ -68,39 +63,3 @@ func dispatchInlinePieceSlice() [][][]byte {
    inlineSlice = append(inlineSlice, inlineList)
    return inlineSlice
}
-
-func makeStreamMock() *StreamMock {
-   return &StreamMock{
-       ctx:          context.Background(),
-       recvToServer: make(chan *stypes.SyncerServiceSyncPieceRequest, 10),
-   }
-}
-
-type StreamMock struct {
-   grpc.ClientStream
-   ctx          context.Context
-   recvToServer chan *stypes.SyncerServiceSyncPieceRequest
-}
-
-func (m *StreamMock) Send(resp *stypes.SyncerServiceSyncPieceRequest) error {
-   m.recvToServer <- resp
-   return nil
-}
-
-func (m *StreamMock) CloseAndRecv() (*stypes.SyncerServiceSyncPieceResponse, error) {
-   integrityHash, _ := base64.URLEncoding.DecodeString("pgPGdR4c9_KYz6wQxl-SifyzHXlHhx5XfNa89LzdNCI=")
-   return &stypes.SyncerServiceSyncPieceResponse{
-       TraceId: "test_traceID",
-       SecondarySpInfo: &stypes.StorageProviderSealInfo{
-           StorageProviderId: "sp1",
-           PieceIdx:          1,
-           PieceChecksum:     [][]byte{[]byte("1"), []byte("2"), []byte("3"), []byte("4"), []byte("5"), []byte("6")},
-           IntegrityHash:     integrityHash,
-           Signature:         nil,
-       },
-       ErrMessage: &stypes.ErrMessage{
-           ErrCode: stypes.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED,
-           ErrMsg:  "Success",
-       },
-   }, nil
-}
diff --git a/service/stonenode/http_client.go b/service/stonenode/http_client.go
new file mode 100644
index 000000000..74662ef6f
--- /dev/null
+++ b/service/stonenode/http_client.go
@@ -0,0 +1,151 @@
+package stonenode
+
+import (
+   "bytes"
+   "encoding/hex"
+   "encoding/json"
+   "fmt"
+   "io"
+   "net/http"
+   "strconv"
+
+   "github.com/bnb-chain/greenfield-storage-provider/model"
+   merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
"github.com/bnb-chain/greenfield-storage-provider/service/types/v1" + "github.com/bnb-chain/greenfield-storage-provider/util" + "github.com/bnb-chain/greenfield-storage-provider/util/log" +) + +// sendRequest send piece data to gateway through HTTP protocol +func sendRequest(pieceData [][]byte, httpEndpoint string, syncerInfo *stypes.SyncerInfo, traceID string) ( + *stypes.StorageProviderSealInfo, error) { + //TODO, use io.Copy to avoid using big memory + body, err := json.Marshal(pieceData) + if err != nil { + log.Errorw("marshal piece data failed", "error", err) + } + req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("http://%s%s", httpEndpoint, model.SyncerPath), bytes.NewReader(body)) + if err != nil { + log.Errorw("http NewRequest failed", "error", err) + return nil, err + } + + req = addReqHeader(req, syncerInfo, traceID) + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + log.Errorw("client Do failed", "error", err) + return nil, err + } + defer resp.Body.Close() + + // if http.StatusCode isn't 200, return error + if resp.StatusCode != http.StatusOK { + log.Error("HTTP status code is not 200: ", resp.StatusCode) + if err := parseBody(resp.Body); err != nil { + log.Errorw("parse body error") + return nil, err + } + } + sealInfo, err := generateSealInfo(resp) + if err != nil { + log.Errorw("generate seal info", "error", err) + return nil, err + } + return sealInfo, nil +} + +func addReqHeader(req *http.Request, syncerInfo *stypes.SyncerInfo, traceID string) *http.Request { + req.Header.Add(model.ContentTypeHeader, model.OctetStream) + req.Header.Add(model.GnfdRequestIDHeader, traceID) + req.Header.Add(model.GnfdObjectIDHeader, strconv.FormatUint(syncerInfo.GetObjectId(), 10)) + req.Header.Add(model.GnfdSPIDHeader, syncerInfo.GetStorageProviderId()) + req.Header.Add(model.GnfdPieceCountHeader, strconv.FormatUint(uint64(syncerInfo.GetPieceCount()), 10)) + req.Header.Add(model.GnfdPieceIndexHeader, strconv.FormatUint(uint64(syncerInfo.GetPieceIndex()), 10)) + req.Header.Add(model.GnfdRedundancyTypeHeader, syncerInfo.GetRedundancyType().String()) + return req +} + +// TODO, perfect handling error message later +func parseBody(body io.ReadCloser) error { + buf := &bytes.Buffer{} + _, err := io.Copy(buf, body) + if err != nil { + log.Errorw("copy request body failed", "error", err) + return err + } + // parse error message in response body + //if err := xml.Unmarshal(buf.Bytes(), nil); err != nil { + // log.Errorw("unmarshal xml response body failed", "error", err) + // return err + //} + return fmt.Errorf("HTTP status code is not 200") +} + +func generateSealInfo(resp *http.Response) (*stypes.StorageProviderSealInfo, error) { + spSealInfo := &stypes.StorageProviderSealInfo{} + // get storage provider ID + spID := resp.Header.Get(model.GnfdSPIDHeader) + if spID == "" { + log.Error("resp header sp id is empty") + return nil, merrors.ErrEmptyRespHeader + } + spSealInfo.StorageProviderId = spID + + // get piece index + pieceIndex := resp.Header.Get(model.GnfdPieceIndexHeader) + if pieceIndex == "" { + log.Error("resp header piece index is empty") + return nil, merrors.ErrEmptyRespHeader + } + idx, err := util.HeaderToUint32(pieceIndex) + if err != nil { + log.Errorw("parse piece index failed", "error", err) + return nil, merrors.ErrRespHeader + } + spSealInfo.PieceIdx = idx + + // get piece checksum + pieceChecksum := resp.Header.Get(model.GnfdPieceChecksumHeader) + if pieceChecksum == "" { + log.Error("resp header piece checksum is empty") + return nil, 
+       return nil, merrors.ErrEmptyRespHeader
+   }
+   checksum, err := util.DecodePieceHash(pieceChecksum)
+   if err != nil {
+       return nil, err
+   }
+   spSealInfo.PieceChecksum = checksum
+
+   // get integrity hash
+   integrityHash := resp.Header.Get(model.GnfdIntegrityHashHeader)
+   if integrityHash == "" {
+       log.Error("resp header integrity hash is empty")
+       return nil, merrors.ErrEmptyRespHeader
+   }
+   iHash, err := hex.DecodeString(integrityHash)
+   if err != nil {
+       log.Errorw("decode integrity hash failed", "error", err)
+       return nil, merrors.ErrRespHeader
+   }
+   spSealInfo.IntegrityHash = iHash
+
+   // get signature
+   signature := resp.Header.Get(model.GnfdSealSignatureHeader)
+   if signature == "" {
+       log.Error("resp header seal signature is empty")
+       return nil, merrors.ErrEmptyRespHeader
+   }
+   sig, err := hex.DecodeString(signature)
+   if err != nil {
+       log.Errorw("decode seal signature failed", "error", err)
+       return nil, merrors.ErrRespHeader
+   }
+   spSealInfo.Signature = sig
+   return spSealInfo, nil
+}
+
+//
+//func getApprovalSigNature() {
+// storagetypespb.NewMsgCreateObject(nil, "", "", 0, false, nil, "", nil, nil)
+//}
diff --git a/service/stonenode/stone_node.go b/service/stonenode/stone_node.go
index 71dc66c7b..799eb9638 100644
--- a/service/stonenode/stone_node.go
+++ b/service/stonenode/stone_node.go
@@ -20,7 +20,6 @@ const (
type StoneNodeService struct {
    cfg        *StoneNodeConfig
    name       string
-   syncer     []client.SyncerAPI
    stoneHub   client.StoneHubAPI
    store      client.PieceStoreAPI
    stoneLimit int64
@@ -58,14 +57,6 @@ func (node *StoneNodeService) initClient() error {
        log.Errorw("stone node inits stone hub client failed", "error", err)
        return err
    }
-   for _, value := range node.cfg.SyncerServiceAddress {
-       syncer, err := client.NewSyncerClient(value)
-       if err != nil {
-           log.Errorw("stone node inits syncer client failed", "error", err)
-           return err
-       }
-       node.syncer = append(node.syncer, syncer)
-   }
    node.store = store
    node.stoneHub = stoneHub
    return nil
@@ -126,11 +117,6 @@ func (node *StoneNodeService) Stop(ctx context.Context) error {
    if err := node.stoneHub.Close(); err != nil {
        errs = append(errs, err)
    }
-   for _, syncer := range node.syncer {
-       if err := syncer.Close(); err != nil {
-           errs = append(errs, err)
-       }
-   }
    if errs != nil {
        return fmt.Errorf("%v", errs)
    }
diff --git a/service/stonenode/stone_node_config.go b/service/stonenode/stone_node_config.go
index 905f67ee3..c88f37bc2 100644
--- a/service/stonenode/stone_node_config.go
+++ b/service/stonenode/stone_node_config.go
@@ -4,8 +4,8 @@ import "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/storag
type StoneNodeConfig struct {
    Address                string
+   GatewayAddress         []string
    StoneHubServiceAddress string
-   SyncerServiceAddress   []string
    StorageProvider        string
    PieceStoreConfig       *storage.PieceStoreConfig
    StoneJobLimit          int64
}
@@ -13,8 +13,8 @@ var DefaultStoneNodeConfig = &StoneNodeConfig{
    Address:                "127.0.0.1:9433",
+   GatewayAddress:         []string{"127.0.0.1:9034", "127.0.0.1:9035", "127.0.0.1:9036", "127.0.0.1:9037", "127.0.0.1:9038", "127.0.0.1:9039"},
    StoneHubServiceAddress: "127.0.0.1:9333",
-   SyncerServiceAddress:   []string{"127.0.0.1:9533", "127.0.0.1:9543", "127.0.0.1:9553", "127.0.0.1:9563", "127.0.0.1:9573", "127.0.0.1:9583"},
    StorageProvider:        "bnb-sp",
    PieceStoreConfig:       storage.DefaultPieceStoreConfig,
    StoneJobLimit:          64,
diff --git a/service/stonenode/sync_piece.go b/service/stonenode/sync_piece.go
index e0b4f0fed..1708e9021 100644
--- a/service/stonenode/sync_piece.go
+++ b/service/stonenode/sync_piece.go
@@ -3,7 +3,6 @@ package stonenode
import (
    "bytes"
    "context"
-   "errors"

    merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
    stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
@@ -13,7 +12,7 @@ import (
// doSyncToSecondarySP send piece data to the secondary
func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *stypes.StoneHubServiceAllocStoneJobResponse,
-   pieceDataBySecondary [][][]byte, secondarySPs []string) error {
+   pieceDataBySecondary [][][]byte, urlList []string, secondarySPs []string) error {
    var (
        objectID       = resp.GetPieceJob().GetObjectId()
        payloadSize    = resp.GetPieceJob().GetPayloadSize()
@@ -46,22 +45,20 @@ func (node *StoneNodeService) doSyncToSecondarySP(ctx context.Context, resp *sty
            }
        }()

-       syncResp, err := node.syncPiece(ctx, &stypes.SyncerInfo{
+       spInfo, err := sendRequest(pieceData, urlList[index], &stypes.SyncerInfo{
            ObjectId:          objectID,
            StorageProviderId: secondarySPs[index],
            PieceIndex:        uint32(index),
            PieceCount:        uint32(len(pieceData)),
            RedundancyType:    redundancyType,
-       }, pieceData, index, resp.GetTraceId())
-       // TBD:: retry alloc secondary sp and rat again.
+       }, resp.GetTraceId())
        if err != nil {
-           log.CtxErrorw(ctx, "sync to secondary piece job failed", "error", err)
+           log.CtxErrorw(ctx, "send request to gateway failed", "error", err)
            errMsg.ErrCode = stypes.ErrCode_ERR_CODE_ERROR
            errMsg.ErrMsg = err.Error()
            return
        }

-       spInfo := syncResp.GetSecondarySpInfo()
        if ok := verifyIntegrityHash(pieceData, spInfo); !ok {
            errMsg.ErrCode = stypes.ErrCode_ERR_CODE_ERROR
            errMsg.ErrMsg = merrors.ErrIntegrityHash.Error()
@@ -92,36 +89,3 @@ func verifyIntegrityHash(pieceData [][]byte, spInfo *stypes.StorageProviderSealI
        "remote_integrity_hash", spInfo.GetIntegrityHash())
    return true
}
-
-// syncPiece send rpc request to secondary storage provider to sync the piece data
-func (node *StoneNodeService) syncPiece(ctx context.Context, syncerInfo *stypes.SyncerInfo,
-   pieceData [][]byte, index int, traceID string) (*stypes.SyncerServiceSyncPieceResponse, error) {
-   stream, err := node.syncer[index].SyncPiece(ctx)
-   if err != nil {
-       log.Errorw("sync secondary piece job error", "err", err)
-       return nil, err
-   }
-
-   // send data one by one to avoid exceeding rpc max msg size
-   for _, value := range pieceData {
-       if err := stream.Send(&stypes.SyncerServiceSyncPieceRequest{
-           TraceId:    traceID,
-           SyncerInfo: syncerInfo,
-           PieceData:  value,
-       }); err != nil {
-           log.Errorw("client send request error", "error", err)
-           return nil, err
-       }
-   }
-
-   resp, err := stream.CloseAndRecv()
-   if err != nil {
-       log.Errorw("client close error", "error", err, "traceID", resp.GetTraceId())
-       return nil, err
-   }
-   if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != stypes.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
-       log.Errorw("sync piece sends to stone node response code is not success", "error", err, "traceID", resp.GetTraceId())
-       return nil, errors.New(resp.GetErrMessage().GetErrMsg())
-   }
-   return resp, nil
-}
diff --git a/service/stonenode/sync_piece_test.go b/service/stonenode/sync_piece_test.go
index e80c14b60..7fcf11a2b 100644
--- a/service/stonenode/sync_piece_test.go
+++ b/service/stonenode/sync_piece_test.go
@@ -40,87 +40,12 @@ func Test_doSyncToSecondarySP(t *testing.T) {
            return nil, nil
        }).AnyTimes()

-   // syncer service stub
-   streamClient := makeStreamMock()
-   syncer1 := mock.NewMockSyncerAPI(ctrl)
-   node.syncer = append(node.syncer, syncer1)
-   syncer1.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
-       func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
-           return streamClient, nil
-       }).AnyTimes()
-
-   syncer2 := mock.NewMockSyncerAPI(ctrl)
-   node.syncer = append(node.syncer, syncer2)
-   syncer2.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
-       func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
-           return streamClient, nil
-       }).AnyTimes()
-
-   syncer3 := mock.NewMockSyncerAPI(ctrl)
-   node.syncer = append(node.syncer, syncer3)
-   syncer3.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
-       func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
-           return streamClient, nil
-       }).AnyTimes()
-
-   syncer4 := mock.NewMockSyncerAPI(ctrl)
-   node.syncer = append(node.syncer, syncer4)
-   syncer4.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
-       func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
-           return streamClient, nil
-       }).AnyTimes()
-
-   syncer5 := mock.NewMockSyncerAPI(ctrl)
-   node.syncer = append(node.syncer, syncer5)
-   syncer5.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
-       func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
-           return streamClient, nil
-       }).AnyTimes()
-
-   syncer6 := mock.NewMockSyncerAPI(ctrl)
-   node.syncer = append(node.syncer, syncer6)
-   syncer6.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
-       func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
-           return streamClient, nil
-       }).AnyTimes()
-
    for _, tt := range cases {
        t.Run(tt.name, func(t *testing.T) {
            allocResp := mockAllocResp(123456, 20*1024*1024, ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED)
-           err := node.doSyncToSecondarySP(context.TODO(), allocResp, tt.req1, spmock.AllocUploadSecondarySP())
+           err := node.doSyncToSecondarySP(context.TODO(), allocResp, tt.req1, spmock.AllocUploadSecondarySP(),
+               spmock.AllocUploadSecondarySP())
            assert.Equal(t, nil, err)
        })
    }
}
-
-func TestSyncPieceSuccess(t *testing.T) {
-   node := setup(t)
-   ctrl := gomock.NewController(t)
-   defer ctrl.Finish()
-
-   streamClient := makeStreamMock()
-   syncer := mock.NewMockSyncerAPI(ctrl)
-   node.syncer = append(node.syncer, syncer)
-   syncer.EXPECT().SyncPiece(gomock.Any(), gomock.Any()).DoAndReturn(
-       func(ctx context.Context, opts ...grpc.CallOption) (stypes.SyncerService_SyncPieceClient, error) {
-           return streamClient, nil
-       }).AnyTimes()
-
-   sInfo := &stypes.SyncerInfo{
-       ObjectId:          123456,
-       StorageProviderId: "440246a94fc4257096b8d4fa8db94a5655f455f88555f885b10da1466763f742",
-       RedundancyType:    ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED,
-   }
-   data := [][]byte{
-       []byte("test1"),
-       []byte("test2"),
-       []byte("test3"),
-       []byte("test4"),
-       []byte("test5"),
-       []byte("test6"),
-   }
-   resp, err := node.syncPiece(context.TODO(), sInfo, data, 0, "test_traceID")
-   assert.Equal(t, err, nil)
-   assert.Equal(t, resp.GetTraceId(), "test_traceID")
-   assert.Equal(t, resp.GetSecondarySpInfo().GetPieceIdx(), uint32(1))
-}
diff --git a/service/syncer/syncer_service.go b/service/syncer/syncer_service.go
index 3c59a63a8..e8c20302f 100644
--- a/service/syncer/syncer_service.go
+++ b/service/syncer/syncer_service.go
@@ -18,6 +18,7 @@ func (s *Syncer) SyncPiece(stream stypes.SyncerService_SyncPieceServer) error {
    var count uint32
    var integrityMeta *spdb.IntegrityMeta
    var spID string
+   var traceID string
    var value []byte
    pieceHash := make([][]byte, 0)
@@ -37,7 +38,7 @@ func (s *Syncer) SyncPiece(stream stypes.SyncerService_SyncPieceServer) error {
                return err
            }
            resp := &stypes.SyncerServiceSyncPieceResponse{
-               TraceId:         req.GetTraceId(),
+               TraceId:         traceID,
                SecondarySpInfo: sealInfo,
                ErrMessage: &stypes.ErrMessage{
                    ErrCode: stypes.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED,
@@ -58,6 +59,7 @@ func (s *Syncer) SyncPiece(stream stypes.SyncerService_SyncPieceServer) error {
        if err != nil {
            return err
        }
+       traceID = req.GetTraceId()
        pieceHash = append(pieceHash, hash.GenerateChecksum(value))
    }
}
diff --git a/store/piecestore/storage/s3.go b/store/piecestore/storage/s3.go
index a8c2b2820..08f45a245 100644
--- a/store/piecestore/storage/s3.go
+++ b/store/piecestore/storage/s3.go
@@ -218,7 +218,7 @@ func (s *s3Store) ListAllObjects(ctx context.Context, prefix, marker string) (<-
    return nil, merrors.NotSupportedMethod
}

-// SessionCache holds session.Session according to model.ObjectStorage and it synchronizes access/modification
+// SessionCache holds session.Session according to ObjectStorageConfig and it synchronizes access/modification
type SessionCache struct {
    sync.Mutex
    sessions map[ObjectStorageConfig]*session.Session
@@ -250,7 +250,7 @@ func (sc *SessionCache) newSession(cfg ObjectStorageConfig) (*session.Session, s
    }
    // if TestMode is true, you can communicate with private bucket or public bucket,
    // in this TestMode, if you want to visit private bucket, you should provide accessKey, secretKey.
-   // if TestMode is false, you can use service account or ec2 to visit you s3 straightly
+   // if TestMode is false, you can use service account or ec2 to visit your s3 directly
    if cfg.TestMode {
        key := getAWSSecretKeyFromEnv()
        if cfg.NoSignRequest {
diff --git a/test/e2e/onebox/setup_onebox.go b/test/e2e/onebox/setup_onebox.go
index ce23ab4d8..40c729b68 100644
--- a/test/e2e/onebox/setup_onebox.go
+++ b/test/e2e/onebox/setup_onebox.go
@@ -37,8 +37,8 @@ const (
)

func initConfig() {
-   cfg.Service = []string{model.SyncerService}
-   cfg.GatewayCfg = gateway.DefaultGatewayConfig
+   cfg.Service = []string{model.SyncerService, model.GatewayService}
+   //cfg.GatewayCfg = gateway.DefaultGatewayConfig
    cfg.UploaderCfg = uploader.DefaultUploaderConfig
    cfg.StoneHubCfg = stonehub.DefaultStoneHubConfig
    cfg.ChallengeCfg = challenge.DefaultChallengeConfig
@@ -52,6 +52,43 @@ func initConfig() {
    if cfg.SyncerCfg.PieceStoreConfig == nil {
        cfg.SyncerCfg.PieceStoreConfig = storage.DefaultPieceStoreConfig
    }
+   if cfg.GatewayCfg.ChainConfig == nil {
+       cfg.GatewayCfg.ChainConfig = gateway.DefaultChainClientConfig
+   }
+}
+
+// SyncerAddress = ["127.0.0.1:9543", "127.0.0.1:9553", "127.0.0.1:9563", "127.0.0.1:9573", "127.0.0.1:9583", "127.0.0.1:9593"]
+func main() {
+   log.Info("begin setup one-box, deploy secondary syncers")
+
+   cfg = config.LoadConfig(*configFile)
+   syncerAddrList := []string{"127.0.0.1:9543", "127.0.0.1:9553", "127.0.0.1:9563", "127.0.0.1:9573", "127.0.0.1:9583", "127.0.0.1:9593"}
+   gatewayAddrList := cfg.StoneNodeCfg.GatewayAddress
+   if len(syncerAddrList) != len(gatewayAddrList) {
+       log.Errorw("syncer number is not equal to secondary gateway number")
+       os.Exit(1)
+   }
+   initConfig()
+
+   // clear
+   // todo: polish not clear data
+   os.RemoveAll(oneboxDir)
+   pkillCMD := fmt.Sprintf("kill -9 $(pgrep -f %s)", destBinary)
+   runShell(pkillCMD)
+   if processNum, err := getProcessNum(); err != nil || processNum != 0 {
+       log.Errorw("failed to pkill", "error", err)
+       os.Exit(1)
+       return
+   }
+
+   // setup
+   if err := os.Mkdir(oneboxDir, 0777); err != nil {
log.Errorw("failed to mkdir one-box directory", "error", err) + os.Exit(1) + return + } + multiSPService(syncerAddrList, gatewayAddrList) + log.Info("succeed to setup one-box") } func runShell(cmdStr string) (string, error) { @@ -71,7 +108,7 @@ func runShell(cmdStr string) (string, error) { } func getProcessNum() (int, error) { - time.Sleep(10 * time.Second) + time.Sleep(5 * time.Second) getProcessNumCMD := fmt.Sprintf("ps axu|grep %s | grep -v \"grep\" |wc -l", destBinary) processNumStr, err := runShell(getProcessNumCMD) if err != nil { @@ -87,35 +124,11 @@ func getProcessNum() (int, error) { return processNum, nil } -func main() { - log.Info("begin setup onebox, deploy secondary syncers") - - cfg = config.LoadConfig(*configFile) - addrList := cfg.StoneNodeCfg.SyncerServiceAddress - initConfig() - - // clear - // todo: polish not clear data - os.RemoveAll(oneboxDir) - pkillCMD := fmt.Sprintf("kill -9 $(pgrep -f %s)", destBinary) - runShell(pkillCMD) - if processNum, err := getProcessNum(); err != nil || processNum != 0 { - log.Errorw("failed to pkill", "error", err) - os.Exit(1) - return - } - - // setup - if err := os.Mkdir(oneboxDir, 0777); err != nil { - log.Errorw("failed to mkdir onebox directory", "error", err) - os.Exit(1) - return - } - - for index, addr := range addrList { +func multiSPService(syncerAddrList, gatewayAddrList []string) { + for index, addr := range syncerAddrList { spDir := oneboxDir + "/sp" + strconv.Itoa(index) if err := os.Mkdir(spDir, 0777); err != nil { - log.Errorw("failed to mkdir onebox sp directory", "error", err) + log.Errorw("failed to mkdir one-box sp directory", "error", err) os.Exit(1) return } @@ -135,6 +148,10 @@ func main() { cfg.SyncerCfg.StorageProvider = spDir cfg.SyncerCfg.MetaLevelDBConfig.Path = spDir + "/leveldb" cfg.SyncerCfg.PieceStoreConfig.Store.BucketURL = spDir + "/piece_store" + cfg.GatewayCfg.Address = gatewayAddrList[index] + cfg.GatewayCfg.SyncerServiceAddress = addr + cfg.GatewayCfg.UploaderServiceAddress = "1" + cfg.GatewayCfg.DownloaderServiceAddress = "2" if err = util.TomlSettings.NewEncoder(f).Encode(cfg); err != nil { log.Errorw("failed to encode config", "error", err) os.Exit(1) @@ -156,11 +173,10 @@ func main() { } // check - if processNum, err := getProcessNum(); err != nil || processNum != len(addrList) { - log.Errorw("failed to setup onebox, syncer maybe down and please check log in ./onebox/sp*/log.txt", - "expect", len(addrList), "actual", processNum) + if processNum, err := getProcessNum(); err != nil || processNum != len(syncerAddrList) { + log.Errorw("failed to setup one-box, syncer maybe down and please check log in ./onebox/sp*/log.txt", + "expect", len(syncerAddrList), "actual", processNum) os.Exit(1) return } - log.Info("succeed to setup onebox") } diff --git a/test/e2e/services/case_driver.go b/test/e2e/services/case_driver.go index 7b8b45e07..1e2bbc129 100644 --- a/test/e2e/services/case_driver.go +++ b/test/e2e/services/case_driver.go @@ -11,12 +11,13 @@ import ( "time" "github.com/bnb-chain/greenfield-sdk-go/pkg/signer" + "github.com/cosmos/cosmos-sdk/testutil/testdata" + "github.com/bnb-chain/greenfield-storage-provider/config" "github.com/bnb-chain/greenfield-storage-provider/model" "github.com/bnb-chain/greenfield-storage-provider/util" "github.com/bnb-chain/greenfield-storage-provider/util/hash" "github.com/bnb-chain/greenfield-storage-provider/util/log" - "github.com/cosmos/cosmos-sdk/testutil/testdata" ) var ( diff --git a/util/hash/checksum.go b/util/hash/checksum.go index 9d9d20de8..9a2f6a832 100644 
--- a/util/hash/checksum.go
+++ b/util/hash/checksum.go
@@ -6,19 +6,18 @@ import (
    "fmt"
)

-// GenerateChecksum generates the checksum of piece data
+// GenerateChecksum generates the checksum of one piece data
func GenerateChecksum(pieceData []byte) []byte {
    hash := sha256.New()
    hash.Write(pieceData)
    return hash.Sum(nil)
}

-// GenerateIntegrityHash generate integrity hash of ec data
+// GenerateIntegrityHash generates the integrity hash of all piece data checksums
func GenerateIntegrityHash(checksumList [][]byte) []byte {
    hash := sha256.New()
-   for _, j := range checksumList {
-       hash.Write(j)
-   }
+   checksumBytesTotal := bytes.Join(checksumList, []byte(""))
+   hash.Write(checksumBytesTotal)
    return hash.Sum(nil)
}
diff --git a/util/utils.go b/util/utils.go
index b003136a7..cfd2a7a9a 100644
--- a/util/utils.go
+++ b/util/utils.go
@@ -156,18 +156,18 @@ func Uint64ToHeader(u uint64) string {
}

// EncodePieceHash is used to serialize
-func EncodePieceHash(PieceHash [][]byte) string {
-   PieceStringList := make([]string, len(PieceHash))
-   for index, h := range PieceHash {
+func EncodePieceHash(pieceHash [][]byte) string {
+   PieceStringList := make([]string, len(pieceHash))
+   for index, h := range pieceHash {
        PieceStringList[index] = hex.EncodeToString(h)
    }
    return StringSliceToHeader(PieceStringList)
}

// DecodePieceHash is used to deserialize
-func DecodePieceHash(PieceHash string) ([][]byte, error) {
+func DecodePieceHash(pieceHash string) ([][]byte, error) {
    var err error
-   pieceStringList := HeaderToStringSlice(PieceHash)
+   pieceStringList := HeaderToStringSlice(pieceHash)
    hashList := make([][]byte, len(pieceStringList))
    for idx := range pieceStringList {
        if hashList[idx], err = hex.DecodeString(pieceStringList[idx]); err != nil {
@@ -176,3 +176,16 @@ func DecodePieceHash(PieceHash string) ([][]byte, error) {
    }
    return hashList, nil
}
+
+func TransferRedundancyType(redundancyType string) (ptypes.RedundancyType, error) {
+   switch redundancyType {
+   case ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED.String():
+       return ptypes.RedundancyType_REDUNDANCY_TYPE_EC_TYPE_UNSPECIFIED, nil
+   case ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE.String():
+       return ptypes.RedundancyType_REDUNDANCY_TYPE_REPLICA_TYPE, nil
+   case ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE.String():
+       return ptypes.RedundancyType_REDUNDANCY_TYPE_INLINE_TYPE, nil
+   default:
+       return -1, merrors.ErrRedundancyType
+   }
+}