Skip to content

Commit

Permalink
feat: improve sync piece efficiency (#165)
Browse files Browse the repository at this point in the history
Co-authored-by: joeycli <joeycli0919@gmail.com>
  • Loading branch information
will-2012 and joeylichang authored Mar 4, 2023
1 parent 2ff121f commit 1b24d1d
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 79 deletions.
6 changes: 6 additions & 0 deletions model/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ const (
MaxCallMsgSize = 25 * 1024 * 1024
)

// define gateway constants
const (
// StreamBufSize defines gateway stream forward payload buf size
StreamBufSize = 64 * 1024
)

// http header constants
const (
// ContentTypeHeader and below are standard http protocols
Expand Down
8 changes: 4 additions & 4 deletions service/gateway/admin_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (g *Gateway) getApprovalHandler(w http.ResponseWriter, r *http.Request) {
return
}
if err = msg.ValidateBasic(); err != nil {
log.Errorw("failed to check bucket msg", "bucket_msg", msg, "error", err)
errDescription = InvalidBucketName
log.Errorw("failed to basic check", "bucket_msg", msg, "error", err)
errDescription = InvalidHeader
return
}
// TODO: to config it
Expand All @@ -98,8 +98,8 @@ func (g *Gateway) getApprovalHandler(w http.ResponseWriter, r *http.Request) {
return
}
if err = msg.ValidateBasic(); err != nil {
log.Errorw("failed to check object_info", "object_info", msg, "error", err)
errDescription = InvalidBucketName
log.Errorw("failed to basic check", "object_msg", msg, "error", err)
errDescription = InvalidHeader
return
}
// TODO: to config it
Expand Down
66 changes: 56 additions & 10 deletions service/gateway/client/gateway_client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package client

import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

Expand Down Expand Up @@ -38,20 +37,68 @@ func NewGatewayClient(address string) (*GatewayClient, error) {
return client, nil
}

func (gatewayClient *GatewayClient) SyncPieceData(objectInfo *types.ObjectInfo, replicateIdx uint32, segmentSize uint32,
// PieceDataReader defines [][]pieceData Reader.
type PieceDataReader struct {
pieceData [][]byte
outerIdx int
innerIdx int
}

// NewPieceDataReader return a PieceDataReader instance
func NewPieceDataReader(pieceData [][]byte) (reader *PieceDataReader, err error) {
if pieceData == nil || len(pieceData) == 0 {
return nil, fmt.Errorf("failed to new due to invalid args")
}
return &PieceDataReader{
pieceData: pieceData,
outerIdx: 0,
innerIdx: 0,
}, nil
}

// Read populates the given byte slice with data and returns the number of bytes populated and an error value.
// It returns an io.EOF error when the stream ends.
func (p *PieceDataReader) Read(buf []byte) (n int, err error) {
if buf == nil || len(buf) == 0 {
return 0, fmt.Errorf("failed to read due to invalid args")
}

readLen := 0
for p.outerIdx < len(p.pieceData) {
curReadLen := copy(buf[readLen:], p.pieceData[p.outerIdx][p.innerIdx:])
p.innerIdx += curReadLen
if p.innerIdx == len(p.pieceData[p.outerIdx]) {
p.outerIdx += 1
p.innerIdx = 0
}
readLen = readLen + curReadLen
if readLen == len(buf) {
break
}
}
if readLen != 0 {
return readLen, nil
}
return 0, io.EOF
}

// SyncPieceData sync piece data to the target storage-provider.
func (gatewayClient *GatewayClient) SyncPieceData(
objectInfo *types.ObjectInfo,
replicateIdx uint32,
segmentSize uint32,
pieceData [][]byte) (integrityHash []byte, signature []byte, err error) {
marshalObjectInfo := hex.EncodeToString(types.ModuleCdc.MustMarshalJSON(objectInfo))
marshalPieceData, err := json.Marshal(pieceData)
pieceDataReader, err := NewPieceDataReader(pieceData)
if err != nil {
log.Errorw("failed to marshal piece data", "error", err)
log.Errorw("failed to sync piece data due to new piece data reader error", "error", err)
return nil, nil, err
}

req, err := http.NewRequest(http.MethodPut, gatewayClient.address+model.SyncerPath, bytes.NewReader(marshalPieceData))
req, err := http.NewRequest(http.MethodPut, gatewayClient.address+model.SyncerPath, pieceDataReader)
if err != nil {
log.Errorw("failed to new http request", "error", err)
log.Errorw("failed to sync piece data due to new request error", "error", err)
return nil, nil, err
}
marshalObjectInfo := hex.EncodeToString(types.ModuleCdc.MustMarshalJSON(objectInfo))
req.Header.Add(model.GnfdObjectInfoHeader, marshalObjectInfo)
req.Header.Add(model.GnfdReplicateIdxHeader, util.Uint32ToString(replicateIdx))
req.Header.Add(model.GnfdSegmentSizeHeader, util.Uint32ToString(segmentSize))
Expand All @@ -64,7 +111,6 @@ func (gatewayClient *GatewayClient) SyncPieceData(objectInfo *types.ObjectInfo,
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
// TODO: get more error info from body
log.Errorw("failed to sync piece data", "status_code", resp.StatusCode, "sp_endpoint", gatewayClient.address)
return nil, nil, fmt.Errorf("failed to sync piece")
}
Expand Down
60 changes: 60 additions & 0 deletions service/gateway/client/gateway_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package client

import (
"io"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_pieceDataReader(t *testing.T) {
pieceData := make([][]byte, 3)
pieceData[0] = []byte{'A', 'B'}
pieceData[1] = []byte{'C', 'D', 'E'}
pieceData[2] = []byte{'F'}

{
// read all data
pieceDataReader, err := NewPieceDataReader(pieceData)
require.NoError(t, err)
readBuf := make([]byte, 10)
readN, err := pieceDataReader.Read(readBuf)
assert.Equal(t, 6, readN)
assert.Equal(t, "ABCDEF", string(readBuf[0:readN]))

require.NoError(t, err)
readN, err = pieceDataReader.Read(readBuf)
assert.Equal(t, 0, readN)
assert.Equal(t, io.EOF, err)
}
{
// read part data
pieceDataReader, err := NewPieceDataReader(pieceData)
require.NoError(t, err)
readBufA := make([]byte, 1)
readN, err := pieceDataReader.Read(readBufA)
assert.Equal(t, 1, readN)
require.NoError(t, err)
assert.Equal(t, byte('A'), readBufA[0])

readBufBC := make([]byte, 2)
readN, err = pieceDataReader.Read(readBufBC)
assert.Equal(t, 2, readN)
require.NoError(t, err)
assert.Equal(t, byte('B'), readBufBC[0])
assert.Equal(t, byte('C'), readBufBC[1])

readBufDEF := make([]byte, 3)
readN, err = pieceDataReader.Read(readBufDEF)
assert.Equal(t, 3, readN)
require.NoError(t, err)
assert.Equal(t, byte('D'), readBufDEF[0])
assert.Equal(t, byte('E'), readBufDEF[1])
assert.Equal(t, byte('F'), readBufDEF[2])

readN, err = pieceDataReader.Read(readBufDEF)
assert.Equal(t, 0, readN)
assert.Equal(t, io.EOF, err)
}
}
4 changes: 2 additions & 2 deletions service/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ func (g *Gateway) Start(ctx context.Context) error {
if g.running.Swap(true) {
return errors.New("gateway has started")
}
go g.Serve()
go g.serve()
return nil
}

// Serve starts http service.
func (g *Gateway) Serve() {
func (g *Gateway) serve() {
router := mux.NewRouter().SkipClean(true)
g.registerHandler(router)
server := &http.Server{
Expand Down
23 changes: 10 additions & 13 deletions service/gateway/object_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,11 @@ func (g *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request) {
errDescription *errorDescription
reqContext *requestContext
addr sdk.AccAddress
buf = make([]byte, 65536)
size int
readN int
size uint64
hashBuf = make([]byte, 65536)
buf = make([]byte, model.StreamBufSize)
hashBuf = make([]byte, model.StreamBufSize)
md5Hash = md5.New()
md5Value string
)

reqContext = newRequestContext(r)
Expand Down Expand Up @@ -172,6 +171,8 @@ func (g *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request) {
errDescription = InvalidKey
return
}
// TODO: maybe tx_hash will be used in the future
_, _ = hex.DecodeString(reqContext.request.Header.Get(model.GnfdTransactionHashHeader))

if addr, err = reqContext.verifySignature(); err != nil {
log.Errorw("failed to verify signature", "error", err)
Expand All @@ -184,9 +185,6 @@ func (g *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}

// TODO: maybe tx_hash will be used in the future
_, _ = hex.DecodeString(reqContext.request.Header.Get(model.GnfdTransactionHashHeader))

stream, err := g.uploader.UploadObject(context.Background())
if err != nil {
log.Errorf("failed to put object", "error", err)
Expand All @@ -210,7 +208,7 @@ func (g *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request) {
errDescription = InternalError
return
}
size += uint64(readN)
size += readN
copy(hashBuf, buf[:readN])
md5Hash.Write(hashBuf[:readN])
}
Expand All @@ -220,18 +218,17 @@ func (g *Gateway) putObjectHandler(w http.ResponseWriter, r *http.Request) {
errDescription = InvalidPayload
return
}
resp, err := stream.CloseAndRecv()
_, err = stream.CloseAndRecv()
if err != nil {
log.Errorw("failed to put object due to stream close", "error", err)
errDescription = InternalError
return
}
// TODO: check response status code
_ = resp
// succeed to put object
break
}
}
md5Value = hex.EncodeToString(md5Hash.Sum(nil))

w.Header().Set(model.GnfdRequestIDHeader, reqContext.requestID)
w.Header().Set(model.ETagHeader, md5Value)
w.Header().Set(model.ETagHeader, hex.EncodeToString(md5Hash.Sum(nil)))
}
3 changes: 3 additions & 0 deletions service/gateway/request_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func (reqContext *requestContext) generateRequestDetail() string {
var headerToString = func(header http.Header) string {
var sb = strings.Builder{}
for k := range header {
if k == model.GnfdObjectInfoHeader || k == model.GnfdUnsignedApprovalMsgHeader {
continue
}
if sb.Len() != 0 {
sb.WriteString(",")
}
Expand Down
2 changes: 1 addition & 1 deletion service/gateway/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (g *Gateway) registerHandler(r *mux.Router) {
HandlerFunc(g.getObjectHandler)
bucketRouter.NotFoundHandler = http.HandlerFunc(g.notFoundHandler)

// admin router, path style.
// admin router, path style
r.Path(model.GetApprovalPath).
Name(approvalRouterName).
Methods(http.MethodGet).
Expand Down
Loading

0 comments on commit 1b24d1d

Please sign in to comment.