diff --git a/model/const.go b/model/const.go
index 800f4a893..b0c3ce458 100644
--- a/model/const.go
+++ b/model/const.go
@@ -117,6 +117,12 @@ const (
     MaxCallMsgSize = 25 * 1024 * 1024
 )
 
+// define gateway constants
+const (
+    // StreamBufSize defines gateway stream forward payload buf size
+    StreamBufSize = 64 * 1024
+)
+
 // http header constants
 const (
     // ContentTypeHeader and below are standard http protocols
diff --git a/service/gateway/admin_handler.go b/service/gateway/admin_handler.go
index c6c6a1602..da14de699 100644
--- a/service/gateway/admin_handler.go
+++ b/service/gateway/admin_handler.go
@@ -72,8 +72,8 @@ func (g *Gateway) getApprovalHandler(w http.ResponseWriter, r *http.Request) {
            return
        }
        if err = msg.ValidateBasic(); err != nil {
-           log.Errorw("failed to check bucket msg", "bucket_msg", msg, "error", err)
-           errDescription = InvalidBucketName
+           log.Errorw("failed to basic check", "bucket_msg", msg, "error", err)
+           errDescription = InvalidHeader
            return
        }
        // TODO: to config it
@@ -98,8 +98,8 @@
            return
        }
        if err = msg.ValidateBasic(); err != nil {
-           log.Errorw("failed to check object_info", "object_info", msg, "error", err)
-           errDescription = InvalidBucketName
+           log.Errorw("failed to basic check", "object_msg", msg, "error", err)
+           errDescription = InvalidHeader
            return
        }
        // TODO: to config it
diff --git a/service/gateway/client/gateway_client.go b/service/gateway/client/gateway_client.go
index 5633bbbb1..f9bafb011 100644
--- a/service/gateway/client/gateway_client.go
+++ b/service/gateway/client/gateway_client.go
@@ -1,10 +1,9 @@
 package client
 
 import (
-   "bytes"
    "encoding/hex"
-   "encoding/json"
    "fmt"
+   "io"
    "net/http"
    "time"
 
@@ -38,16 +37,68 @@ func NewGatewayClient(address string) (*GatewayClient, error) {
    return client, nil
 }
 
-func (gatewayClient *GatewayClient) SyncPieceData(objectInfo *types.ObjectInfo, replicateIdx uint32, segmentSize uint32,
+// PieceDataReader defines [][]pieceData Reader.
+type PieceDataReader struct {
+   pieceData [][]byte
+   outerIdx  int
+   innerIdx  int
+}
+
+// NewPieceDataReader return a PieceDataReader instance
+func NewPieceDataReader(pieceData [][]byte) (reader *PieceDataReader, err error) {
+   if pieceData == nil || len(pieceData) == 0 {
+       return nil, fmt.Errorf("failed to new due to invalid args")
+   }
+   return &PieceDataReader{
+       pieceData: pieceData,
+       outerIdx:  0,
+       innerIdx:  0,
+   }, nil
+}
+
+// Read populates the given byte slice with data and returns the number of bytes populated and an error value.
+// It returns an io.EOF error when the stream ends.
+func (p *PieceDataReader) Read(buf []byte) (n int, err error) {
+   if buf == nil || len(buf) == 0 {
+       return 0, fmt.Errorf("failed to read due to invalid args")
+   }
+
+   readLen := 0
+   for p.outerIdx < len(p.pieceData) {
+       curReadLen := copy(buf[readLen:], p.pieceData[p.outerIdx][p.innerIdx:])
+       p.innerIdx += curReadLen
+       if p.innerIdx == len(p.pieceData[p.outerIdx]) {
+           p.outerIdx += 1
+           p.innerIdx = 0
+       }
+       readLen = readLen + curReadLen
+       if readLen == len(buf) {
+           break
+       }
+   }
+   if readLen != 0 {
+       return readLen, nil
+   }
+   return 0, io.EOF
+}
+
+// SyncPieceData sync piece data to the target storage-provider.
+func (gatewayClient *GatewayClient) SyncPieceData(
+   objectInfo *types.ObjectInfo,
+   replicateIdx uint32,
+   segmentSize uint32,
    pieceData [][]byte) (integrityHash []byte, signature []byte, err error) {
-   marshalObjectInfo := hex.EncodeToString(types.ModuleCdc.MustMarshalJSON(objectInfo))
-   marshalPieceData, err := json.Marshal(pieceData)
+   pieceDataReader, err := NewPieceDataReader(pieceData)
    if err != nil {
-       log.Errorw("failed to marshal piece data", "error", err)
+       log.Errorw("failed to sync piece data due to new piece data reader error", "error", err)
        return nil, nil, err
    }
-
-   req, err := http.NewRequest(http.MethodPut, gatewayClient.address+model.SyncerPath, bytes.NewReader(marshalPieceData))
+   req, err := http.NewRequest(http.MethodPut, gatewayClient.address+model.SyncerPath, pieceDataReader)
+   if err != nil {
+       log.Errorw("failed to sync piece data due to new request error", "error", err)
+       return nil, nil, err
+   }
+   marshalObjectInfo := hex.EncodeToString(types.ModuleCdc.MustMarshalJSON(objectInfo))
    req.Header.Add(model.GnfdObjectInfoHeader, marshalObjectInfo)
    req.Header.Add(model.GnfdReplicateIdxHeader, util.Uint32ToString(replicateIdx))
    req.Header.Add(model.GnfdSegmentSizeHeader, util.Uint32ToString(segmentSize))
@@ -60,7 +111,6 @@ func (gatewayClient *GatewayClient) SyncPieceData(objectInfo *types.ObjectInfo,
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
-       // TODO: get more error info from body
        log.Errorw("failed to sync piece data", "status_code", resp.StatusCode, "sp_endpoint", gatewayClient.address)
        return nil, nil, fmt.Errorf("failed to sync piece")
    }
diff --git a/service/gateway/client/gateway_client_test.go b/service/gateway/client/gateway_client_test.go
new file mode 100644
index 000000000..f96b91c2b
--- /dev/null
+++ b/service/gateway/client/gateway_client_test.go
@@ -0,0 +1,60 @@
+package client
+
+import (
+   "io"
+   "testing"
+
+   "github.com/stretchr/testify/assert"
+   "github.com/stretchr/testify/require"
+)
+
+func Test_pieceDataReader(t *testing.T) {
+   pieceData := make([][]byte, 3)
+   pieceData[0] = []byte{'A', 'B'}
+   pieceData[1] = []byte{'C', 'D', 'E'}
+   pieceData[2] = []byte{'F'}
+
+   {
+       // read all data
+       pieceDataReader, err := NewPieceDataReader(pieceData)
+       require.NoError(t, err)
+       readBuf := make([]byte, 10)
+       readN, err := pieceDataReader.Read(readBuf)
+       assert.Equal(t, 6, readN)
+       assert.Equal(t, "ABCDEF", string(readBuf[0:readN]))
+
+       require.NoError(t, err)
+       readN, err = pieceDataReader.Read(readBuf)
+       assert.Equal(t, 0, readN)
+       assert.Equal(t, io.EOF, err)
+   }
+   {
+       // read part data
+       pieceDataReader, err := NewPieceDataReader(pieceData)
+       require.NoError(t, err)
+       readBufA := make([]byte, 1)
+       readN, err := pieceDataReader.Read(readBufA)
+       assert.Equal(t, 1, readN)
+       require.NoError(t, err)
+       assert.Equal(t, byte('A'), readBufA[0])
+
+       readBufBC := make([]byte, 2)
+       readN, err = pieceDataReader.Read(readBufBC)
+       assert.Equal(t, 2, readN)
+       require.NoError(t, err)
+       assert.Equal(t, byte('B'), readBufBC[0])
+       assert.Equal(t, byte('C'), readBufBC[1])
+
+       readBufDEF := make([]byte, 3)
+       readN, err = pieceDataReader.Read(readBufDEF)
+       assert.Equal(t, 3, readN)
+       require.NoError(t, err)
+       assert.Equal(t, byte('D'), readBufDEF[0])
+       assert.Equal(t, byte('E'), readBufDEF[1])
+       assert.Equal(t, byte('F'), readBufDEF[2])
+
+       readN, err = pieceDataReader.Read(readBufDEF)
+       assert.Equal(t, 0, readN)
+       assert.Equal(t, io.EOF, err)
+   }
+}
diff --git a/service/gateway/object_handler.go b/service/gateway/object_handler.go
index a60d312ed..42d0fffd4 100644
--- a/service/gateway/object_handler.go
+++ b/service/gateway/object_handler.go
@@ -136,12 +136,11 @@ func (g *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request) {
        errDescription *errorDescription
        reqContext     *requestContext
        addr           sdk.AccAddress
-       buf            = make([]byte, 65536)
+       size           int
        readN          int
-       size           uint64
-       hashBuf        = make([]byte, 65536)
+       buf            = make([]byte, model.StreamBufSize)
+       hashBuf        = make([]byte, model.StreamBufSize)
        md5Hash        = md5.New()
-       md5Value       string
    )
 
    reqContext = newRequestContext(r)
@@ -172,6 +171,8 @@
        errDescription = InvalidKey
        return
    }
+   // TODO: maybe tx_hash will be used in the future
+   _, _ = hex.DecodeString(reqContext.request.Header.Get(model.GnfdTransactionHashHeader))
 
    if addr, err = reqContext.verifySignature(); err != nil {
        log.Errorw("failed to verify signature", "error", err)
@@ -184,9 +185,6 @@
        return
    }
 
-   // TODO: maybe tx_hash will be used in the future
-   _, _ = hex.DecodeString(reqContext.request.Header.Get(model.GnfdTransactionHashHeader))
-
    stream, err := g.uploader.UploadObject(context.Background())
    if err != nil {
        log.Errorf("failed to put object", "error", err)
@@ -210,7 +208,7 @@ func (g *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request) {
                errDescription = InternalError
                return
            }
-           size += uint64(readN)
+           size += readN
            copy(hashBuf, buf[:readN])
            md5Hash.Write(hashBuf[:readN])
        }
@@ -220,18 +218,17 @@
                errDescription = InvalidPayload
                return
            }
-           resp, err := stream.CloseAndRecv()
+           _, err = stream.CloseAndRecv()
            if err != nil {
                log.Errorw("failed to put object due to stream close", "error", err)
                errDescription = InternalError
                return
            }
-           // TODO: check response status code
-           _ = resp
+           // succeed to put object
            break
        }
    }
-   md5Value = hex.EncodeToString(md5Hash.Sum(nil))
+
    w.Header().Set(model.GnfdRequestIDHeader, reqContext.requestID)
-   w.Header().Set(model.ETagHeader, md5Value)
+   w.Header().Set(model.ETagHeader, hex.EncodeToString(md5Hash.Sum(nil)))
 }
diff --git a/service/gateway/request_util.go b/service/gateway/request_util.go
index d46808c10..fbdcbba49 100644
--- a/service/gateway/request_util.go
+++ b/service/gateway/request_util.go
@@ -51,6 +51,9 @@ func (reqContext *requestContext) generateRequestDetail() string {
    var headerToString = func(header http.Header) string {
        var sb = strings.Builder{}
        for k := range header {
+           if k == model.GnfdObjectInfoHeader || k == model.GnfdUnsignedApprovalMsgHeader {
+               continue
+           }
            if sb.Len() != 0 {
                sb.WriteString(",")
            }
diff --git a/service/gateway/sync_piece_handler.go b/service/gateway/sync_piece_handler.go
index df69f1588..2e56a079a 100644
--- a/service/gateway/sync_piece_handler.go
+++ b/service/gateway/sync_piece_handler.go
@@ -1,17 +1,14 @@
 package gateway
 
 import (
-   "bytes"
    "context"
    "encoding/hex"
-   "encoding/json"
    "io"
    "net/http"
 
    "github.com/bnb-chain/greenfield/x/storage/types"
 
    "github.com/bnb-chain/greenfield-storage-provider/model"
-   merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
    "github.com/bnb-chain/greenfield-storage-provider/pkg/log"
    syncertypes "github.com/bnb-chain/greenfield-storage-provider/service/syncer/types"
    "github.com/bnb-chain/greenfield-storage-provider/util"
@@ -25,6 +22,9 @@ func (g *Gateway) syncPieceHandler(w http.ResponseWriter, r *http.Request) {
        objectInfo   = types.ObjectInfo{}
        replicateIdx uint32
        segmentSize  uint64
+       size         int
+       readN        int
+       buf          = make([]byte, model.StreamBufSize)
    )
 
    reqContext = newRequestContext(r)
@@ -33,11 +33,9 @@
            _ = errDescription.errorResponse(w, reqContext)
        }
        if errDescription != nil && errDescription.statusCode == http.StatusOK {
-           log.Errorf("action(%v) statusCode(%v) %v", syncPieceRouterName, errDescription.statusCode,
-               reqContext.generateRequestDetail())
+           log.Errorf("action(%v) statusCode(%v) %v", syncPieceRouterName, errDescription.statusCode, reqContext.generateRequestDetail())
        } else {
-           log.Infof("action(%v) statusCode(200) %v", syncPieceRouterName,
-               reqContext.generateRequestDetail())
+           log.Infof("action(%v) statusCode(200) %v", syncPieceRouterName, reqContext.generateRequestDetail())
        }
    }()
 
@@ -69,58 +67,50 @@
        return
    }
 
-   pieceData, err := parseBody(r.Body)
-   if err != nil {
-       // TODO: add more error
-       log.Errorw("failed to parse request body", "error", err)
-       errDescription = InternalError
-       return
-   }
-
    stream, err := g.syncer.SyncObject(context.Background())
    if err != nil {
-       log.Errorw("failed to sync piece", "err", err)
+       log.Errorw("failed to sync piece", "error", err)
        errDescription = InternalError
        return
    }
-
-   // send data one by one to avoid exceeding rpc max msg size
-   for _, value := range pieceData {
-       if err = stream.Send(&syncertypes.SyncObjectRequest{
-           ObjectInfo:    &objectInfo,
-           ReplicateIdx:  replicateIdx,
-           SegmentSize:   segmentSize,
-           ReplicateData: value,
-       }); err != nil {
-           log.Errorw("failed to send stream", "error", err)
+   resp := &syncertypes.SyncObjectResponse{}
+   for {
+       readN, err = r.Body.Read(buf)
+       if err != nil && err != io.EOF {
+           log.Errorw("failed to sync piece due to reader error", "error", err)
            errDescription = InternalError
            return
        }
+       if readN > 0 {
+           if err = stream.Send(&syncertypes.SyncObjectRequest{
+               ObjectInfo:    &objectInfo,
+               ReplicateIdx:  replicateIdx,
+               SegmentSize:   segmentSize,
+               ReplicateData: buf[:readN],
+           }); err != nil {
+               log.Errorw("failed to send stream", "error", err)
+               errDescription = InternalError
+               return
+           }
+           size += readN
+       }
+       if err == io.EOF {
+           if size == 0 {
+               log.Errorw("failed to sync piece due to payload is empty")
+               errDescription = InvalidPayload
+               return
+           }
+           resp, err = stream.CloseAndRecv()
+           if err != nil {
+               log.Errorw("failed to sync piece due to stream close", "error", err)
+               errDescription = InternalError
+               return
+           }
+           // succeed to sync piece
+           break
+       }
    }
 
-   resp, err := stream.CloseAndRecv()
-   if err != nil {
-       log.Errorw("failed to close stream", "error", err)
-       errDescription = InternalError
-       return
-   }
-   // TODO: check resp error code
    w.Header().Set(model.GnfdIntegrityHashHeader, hex.EncodeToString(resp.GetIntegrityHash()))
    w.Header().Set(model.GnfdIntegrityHashSignatureHeader, hex.EncodeToString(resp.GetSignature()))
-
-}
-
-func parseBody(body io.ReadCloser) ([][]byte, error) {
-   buf := &bytes.Buffer{}
-   _, err := io.Copy(buf, body)
-   if err != nil {
-       log.Errorw("failed to copy request body", "error", err)
-       return nil, merrors.ErrInternalError
-   }
-   pieceData := make([][]byte, 0)
-   if err := json.Unmarshal(buf.Bytes(), &pieceData); err != nil {
-       log.Errorw("failed to unmarshal body", "error", err)
-       return nil, merrors.ErrInternalError
-   }
-   return pieceData, nil
 }
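
Reviewer note (not part of the patch): a minimal sketch of how the new PieceDataReader is meant to be consumed. It exposes in-memory [][]byte piece data as an io.Reader, so SyncPieceData can pass it straight to http.NewRequest and the receiving gateway can read the body in model.StreamBufSize chunks instead of JSON-marshalling the whole payload. The standalone main, its sample data, and the import alias are illustrative assumptions; the import path is inferred from the file paths in this diff.

```go
package main

import (
	"fmt"
	"io"

	// Import path inferred from service/gateway/client in this diff; adjust to the real module layout.
	gatewayclient "github.com/bnb-chain/greenfield-storage-provider/service/gateway/client"
)

func main() {
	// Sample replicate pieces held in memory, as a caller of SyncPieceData would have them.
	pieceData := [][]byte{
		[]byte("piece-0"),
		[]byte("piece-1"),
		[]byte("piece-2"),
	}

	// PieceDataReader walks the outer slice and fills each Read buffer across
	// piece boundaries, returning (0, io.EOF) once everything has been drained.
	reader, err := gatewayclient.NewPieceDataReader(pieceData)
	if err != nil {
		panic(err)
	}

	buf := make([]byte, 8) // any non-empty buffer works; the gateway side uses model.StreamBufSize
	for {
		n, readErr := reader.Read(buf)
		if n > 0 {
			fmt.Printf("read %d bytes: %q\n", n, buf[:n])
		}
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			panic(readErr)
		}
	}
}
```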