Skip to content

Commit

Permalink
feat: implement downder service and client (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
joeylichang authored Jan 10, 2023
1 parent 2824d3b commit b1c6c6d
Show file tree
Hide file tree
Showing 10 changed files with 355 additions and 10 deletions.
22 changes: 16 additions & 6 deletions cmd/storage_provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/bnb-chain/inscription-storage-provider/config"
"github.com/bnb-chain/inscription-storage-provider/pkg/lifecycle"
"github.com/bnb-chain/inscription-storage-provider/service/challenge"
"github.com/bnb-chain/inscription-storage-provider/service/downloader"
"github.com/bnb-chain/inscription-storage-provider/service/gateway"
"github.com/bnb-chain/inscription-storage-provider/service/stonehub"
"github.com/bnb-chain/inscription-storage-provider/service/stonenode"
Expand All @@ -25,12 +26,13 @@ var (

// define the storage provider supports service names
var (
StoneHubService = "StoneHub"
GetaWayService = "Gateway"
UploaderService = "Uploader"
StoneNodeService = "StoneNode"
SyncerService = "Syncer"
ChallengeService = "Challenge"
StoneHubService = "StoneHub"
GetaWayService = "Gateway"
UploaderService = "Uploader"
DownloaderService = "Downloader"
StoneNodeService = "StoneNode"
SyncerService = "Syncer"
ChallengeService = "Challenge"
)

// initService init service instance by name and config.
Expand All @@ -52,6 +54,14 @@ func initService(serviceName string, cfg *config.StorageProviderConfig) (server
if err != nil {
return nil, err
}
case DownloaderService:
if cfg.DownloaderCfg == nil {
cfg.DownloaderCfg = config.DefaultStorageProviderConfig.DownloaderCfg
}
server, err = downloader.NewDownloaderService(cfg.DownloaderCfg)
if err != nil {
return nil, err
}
case StoneHubService:
if cfg.StoneHubCfg == nil {
cfg.StoneHubCfg = config.DefaultStorageProviderConfig.StoneHubCfg
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/naoina/toml"

"github.com/bnb-chain/inscription-storage-provider/service/downloader"

"github.com/bnb-chain/inscription-storage-provider/service/challenge"
"github.com/bnb-chain/inscription-storage-provider/service/gateway"
"github.com/bnb-chain/inscription-storage-provider/service/stonehub"
Expand All @@ -22,6 +24,7 @@ type StorageProviderConfig struct {
PieceStoreConfig *storage.PieceStoreConfig
GatewayCfg *gateway.GatewayConfig
UploaderCfg *uploader.UploaderConfig
DownloaderCfg *downloader.DownloaderConfig
StoneNodeCfg *stonenode.StoneNodeConfig
SyncerCfg *syncer.SyncerConfig
ChallengeCfg *challenge.ChallengeConfig
Expand All @@ -32,6 +35,7 @@ var DefaultStorageProviderConfig = &StorageProviderConfig{
PieceStoreConfig: storage.DefaultPieceStoreConfig,
GatewayCfg: gateway.DefaultGatewayConfig,
UploaderCfg: uploader.DefaultUploaderConfig,
DownloaderCfg: downloader.DefaultDownloaderConfig,
StoneNodeCfg: stonenode.DefaultStoneNodeConfig,
SyncerCfg: syncer.DefaultSyncerConfig,
ChallengeCfg: challenge.DefaultChallengeConfig,
Expand Down
4 changes: 2 additions & 2 deletions service/challenge/challenge_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (challenge *Challenge) ChallengePiece(ctx context.Context, req *service.Cha
}
log.CtxInfow(ctx, "change success")
}()
if req.StorageProviderId != challenge.config.StorageProvider {
if req.GetStorageProviderId() != challenge.config.StorageProvider {
err = errors.New("storage provider id mismatch")
return
}
Expand All @@ -40,7 +40,7 @@ func (challenge *Challenge) ChallengePiece(ctx context.Context, req *service.Cha
resp.RedundancyType = integrityMeta.RedundancyType
resp.ChallengePieceKey = piecestore.EncodeECPieceKey(req.ObjectId, req.ChallengeIdx, integrityMeta.PieceIdx)
var data []byte
data, err = challenge.pieceStore.GetPiece(ctx, resp.ChallengePieceKey, 0, 0)
data, err = challenge.pieceStore.GetPiece(ctx, resp.ChallengePieceKey, 0, -1)
if err != nil {
return
}
Expand Down
85 changes: 85 additions & 0 deletions service/client/downloader_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package client

import (
"context"
"errors"
"io"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

service "github.com/bnb-chain/inscription-storage-provider/service/types/v1"
"github.com/bnb-chain/inscription-storage-provider/util/log"
)

type DownloaderClient struct {
address string
downloader service.DownloaderServiceClient
conn *grpc.ClientConn
}

func NewDownloaderClient(address string) (*DownloaderClient, error) {
ctx, _ := context.WithTimeout(context.Background(), ClientRPCTimeout)
conn, err := grpc.DialContext(ctx, address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Errorw("invoke downloader service dail failed", "error", err)
return nil, err
}
client := &DownloaderClient{
address: address,
conn: conn,
downloader: service.NewDownloaderServiceClient(conn),
}
return client, nil
}

func (client *DownloaderClient) Close() error {
return client.conn.Close()
}

func (client *DownloaderClient) DownloaderObject(ctx context.Context, req *service.DownloaderServiceDownloaderObjectRequest, opts ...grpc.CallOption) (data []byte, err error) {
ctx = log.Context(context.Background(), req)
var (
stream service.DownloaderService_DownloaderObjectClient
resp *service.DownloaderServiceDownloaderObjectResponse
)
defer func() {
if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() == service.ErrCode_ERR_CODE_ERROR {
err = errors.New(resp.GetErrMessage().GetErrMsg())
}
log.CtxErrorw(ctx, "downloader object completed", "error", err)
}()
stream, err = client.downloader.DownloaderObject(ctx, req, opts...)
if err != nil {
return data, err
}
for {
resp, err = stream.Recv()
if err == io.EOF {
return
}
if err != nil {
return data, err
}
if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() == service.ErrCode_ERR_CODE_ERROR {
err = errors.New(resp.GetErrMessage().GetErrMsg())
return
}
data = append(data, resp.GetData()...)
}
return
}

func (client *DownloaderClient) DownloaderSegment(ctx context.Context, in *service.DownloaderServiceDownloaderSegmentRequest, opts ...grpc.CallOption) (*service.DownloaderServiceDownloaderSegmentResponse, error) {
resp, err := client.downloader.DownloaderSegment(ctx, in, opts...)
ctx = log.Context(ctx, resp)
if err != nil {
log.CtxErrorw(ctx, "downloader segment failed", "error", err)
return nil, err
}
if resp.GetErrMessage() != nil && resp.GetErrMessage().GetErrCode() != service.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED {
log.CtxErrorw(ctx, "downloader segment response code is not success", "error", resp.GetErrMessage().GetErrMsg())
return nil, errors.New(resp.GetErrMessage().GetErrMsg())
}
return resp, nil
}
2 changes: 1 addition & 1 deletion service/client/stone_hub_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewStoneHubClient(address string) (*StoneHubClient, error) {
ctx, _ := context.WithTimeout(context.Background(), ClientRPCTimeout)
conn, err := grpc.DialContext(ctx, address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Errorw("invoke stoneHub service grpc.DialContext failed", "error", err)
log.Errorw("invoke stoneHub service dail failed", "error", err)
return nil, err
}
client := &StoneHubClient{
Expand Down
69 changes: 69 additions & 0 deletions service/downloader/downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package downloader

import (
"context"
"net"

"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

"github.com/bnb-chain/inscription-storage-provider/service/client"
service "github.com/bnb-chain/inscription-storage-provider/service/types/v1"
"github.com/bnb-chain/inscription-storage-provider/util/log"
)

// Downloader manage the payload data download
type Downloader struct {
cfg *DownloaderConfig
name string
pieceStore *client.StoreClient
}

// NewDownloaderService return a downloader instance.
func NewDownloaderService(cfg *DownloaderConfig) (*Downloader, error) {
downloader := &Downloader{
cfg: cfg,
name: "Downloader",
}
pieceStore, err := client.NewStoreClient(cfg.PieceConfig)
if err != nil {
return nil, err
}
downloader.pieceStore = pieceStore
return downloader, nil
}

// Name implement the lifecycle interface
func (downloader *Downloader) Name() string {
return downloader.name
}

// Start implement the lifecycle interface
func (downloader *Downloader) Start(ctx context.Context) error {
errCh := make(chan error)

go func(errCh chan error) {
lis, err := net.Listen("tcp", downloader.cfg.Address)
errCh <- err
if err != nil {
log.Errorw("syncer listen failed", "error", err)
return
}
grpcServer := grpc.NewServer()
service.RegisterDownloaderServiceServer(grpcServer, downloader)
reflection.Register(grpcServer)
if err = grpcServer.Serve(lis); err != nil {
log.Errorw("syncer serve failed", "error", err)
return
}
return
}(errCh)

err := <-errCh
return err
}

// Stop implement the lifecycle interface
func (downloader *Downloader) Stop(ctx context.Context) error {
return nil
}
13 changes: 13 additions & 0 deletions service/downloader/downloader_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package downloader

import "github.com/bnb-chain/inscription-storage-provider/store/piecestore/storage"

type DownloaderConfig struct {
Address string
PieceConfig *storage.PieceStoreConfig
}

var DefaultDownloaderConfig = &DownloaderConfig{
Address: "127.0.0.1:5523",
PieceConfig: storage.DefaultPieceStoreConfig,
}
121 changes: 121 additions & 0 deletions service/downloader/downloader_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package downloader

import (
"context"
"fmt"

"github.com/bnb-chain/inscription-storage-provider/model"
merrors "github.com/bnb-chain/inscription-storage-provider/model/errors"
"github.com/bnb-chain/inscription-storage-provider/model/piecestore"
service "github.com/bnb-chain/inscription-storage-provider/service/types/v1"
"github.com/bnb-chain/inscription-storage-provider/util/log"
)

var _ service.DownloaderServiceServer = &Downloader{}

// DownloaderSegment download the segment data and return to client.
func (downloader *Downloader) DownloaderSegment(ctx context.Context, req *service.DownloaderServiceDownloaderSegmentRequest) (resp *service.DownloaderServiceDownloaderSegmentResponse, err error) {
ctx = log.Context(ctx, req)
resp = &service.DownloaderServiceDownloaderSegmentResponse{
TraceId: req.TraceId,
}
defer func() {
if err != nil {
resp.ErrMessage.ErrCode = service.ErrCode_ERR_CODE_ERROR
resp.ErrMessage.ErrMsg = err.Error()
log.CtxErrorw(ctx, "download segment failed", "error", err, "object", req.ObjectId, "segment idx", req.SegmentIdx)
}
log.CtxInfow(ctx, "download segment success", "object", req.ObjectId, "segment idx", req.SegmentIdx)
}()
if req.GetObjectId() == 0 {
err = merrors.ErrObjectID
return
}
pieceKey := piecestore.EncodeSegmentPieceKey(req.GetObjectId(), req.GetSegmentIdx())
resp.Data, err = downloader.pieceStore.GetPiece(ctx, pieceKey, 0, -1)
return resp, nil
}

// DownloaderObject download the object data and return to client.
func (downloader *Downloader) DownloaderObject(req *service.DownloaderServiceDownloaderObjectRequest, stream service.DownloaderService_DownloaderObjectServer) (err error) {
ctx := log.Context(context.Background(), req)
resp := &service.DownloaderServiceDownloaderObjectResponse{
TraceId: req.TraceId,
}
defer func() {
if err != nil {
resp.ErrMessage = merrors.MakeErrMsgResponse(err)
}
err = stream.Send(resp)
log.CtxInfow(ctx, "download object completed", "error", err)
}()

var segmentInfo segments
segmentInfo, err = DownloadPieceInfo(req.GetObjectId(), req.GetObjectSize(), req.GetOffset(), req.GetOffset()+req.GetLength()-1)
if err != nil {
return
}
for _, segment := range segmentInfo {
resp.Data, err = downloader.pieceStore.GetPiece(ctx, segment.pieceKey, int64(segment.offset), int64(segment.offset)+int64(segment.length)-1)
if err != nil {
return
}
if err = stream.Send(resp); err != nil {
return
}
}
return nil
}

type segment struct {
pieceKey string
offset uint64
length uint64
}

type segments []*segment

// DownloadPieceInfo compute the piece store info for download.
// download interval [start, end]
func DownloadPieceInfo(objectID, objectSize, start, end uint64) (pieceInfo segments, err error) {
if objectSize == 0 || start > objectSize || end < start {
return pieceInfo, fmt.Errorf("param error, object size: %d, start: %d, end: %d", objectSize, start, end)
}
segmentCount := int(objectSize / model.SegmentSize)
if objectSize%model.SegmentSize != 0 {
segmentCount++
}
for idx := 0; idx < segmentCount; idx++ {
finish := false
currentStart := uint64(idx) * model.SegmentSize
currentEnd := uint64(idx+1)*model.SegmentSize - 1
if currentEnd >= end {
currentEnd = end
finish = true
}
if start >= currentStart && start <= currentEnd {
pieceInfo = append(pieceInfo, &segment{
pieceKey: piecestore.EncodeSegmentPieceKey(objectID, uint32(idx)),
offset: start - currentStart,
length: currentEnd - start + 1,
})
if finish {
break
}
}
if end >= currentStart && end <= currentEnd {
pieceInfo = append(pieceInfo, &segment{
pieceKey: piecestore.EncodeSegmentPieceKey(objectID, uint32(idx)),
offset: 0,
length: end - currentStart + 1,
})
break
}
if start < currentStart && end > currentEnd {
pieceInfo = append(pieceInfo, &segment{
pieceKey: piecestore.EncodeSegmentPieceKey(objectID, uint32(idx)),
})
}
}
return
}
Loading

0 comments on commit b1c6c6d

Please sign in to comment.