Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stone node sends piece data to gateway #128

Merged
merged 9 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Service = [
Domain = "gnfd.nodereal.com"
UploaderServiceAddress = "127.0.0.1:9133"
DownloaderServiceAddress = "127.0.0.1:9233"
SyncerServiceAddress = "127.0.0.1:9533"
ChallengeServiceAddress = "127.0.0.1:9633"


Expand All @@ -21,6 +22,7 @@ Service = [
[UploaderCfg.PieceStoreConfig.Store]
Storage = "file"
BucketURL = "./data/primary_payload_data"
NoSignRequest = false
MaxRetries = 5
TestMode = true
[UploaderCfg.MetaLevelDBConfig]
Expand All @@ -37,6 +39,7 @@ Service = [
[DownloaderCfg.PieceStoreConfig.Store]
Storage = "file"
BucketURL = "./data/primary_payload_data"
NoSignRequest = false
MaxRetries = 5
TestMode = true

Expand All @@ -52,18 +55,18 @@ Service = [
FileHandles = 1000
ReadOnly = false


[StoneNodeCfg]
StorageProvider = "gnfd-test-sp"
Address = "127.0.0.1:9433"
GatewayAddress = ["127.0.0.1:9034", "127.0.0.1:9035", "127.0.0.1:9036", "127.0.0.1:9037", "127.0.0.1:9038", "127.0.0.1:9039"]
StoneHubServiceAddress = "127.0.0.1:9333"
SyncerServiceAddress = ["127.0.0.1:9543", "127.0.0.1:9553", "127.0.0.1:9563", "127.0.0.1:9573", "127.0.0.1:9583", "127.0.0.1:9593"]
StoneJobLimit = 64
[StoneNodeCfg.PieceConfig]
Shards = 0
[StoneNodeCfg.PieceConfig.Store]
Storage = "file"
BucketURL = "./data/primary_payload_data"
NoSignRequest = false
MaxRetries = 5
TestMode = true

Expand All @@ -74,6 +77,7 @@ Service = [
[SyncerCfg.PieceConfig.Store]
Storage = "file"
BucketURL = "./data/secondary_payload_data"
NoSignRequest = false
MaxRetries = 5
TestMode = true
[SyncerCfg.MetaLevelDBConfig]
Expand All @@ -84,7 +88,7 @@ Service = [
ReadOnly = false

[SignerCfg]
Address = "127.0.0.1:9633"
Address = "127.0.0.1:9733"
WhitelistCIDR = ["127.0.0.1/32","10.0.0.0/8","172.16.0.0/12","192.168.0.0/16"]
APIKey = ""
[SignerCfg.GreenfieldChainConfig]
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.19
replace (
// TODO: point to develop branch, will be changed to v0.0.6 after greenfield-sdk-go v0.0.6 released
github.com/bnb-chain/greenfield => github.com/bnb-chain/greenfield v0.0.0-20230217070311-2a863e19f57d

github.com/cosmos/cosmos-sdk => github.com/bnb-chain/gnfd-cosmos-sdk v0.0.6
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/tendermint/tendermint => github.com/bnb-chain/gnfd-tendermint v0.0.1
Expand Down
30 changes: 22 additions & 8 deletions model/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const (
const (
BufPoolSize = 32 << 10
ChecksumAlgo = "Crc32c"
OctetStream = "application/octet-stream"
)

// RPC config
Expand All @@ -40,10 +39,23 @@ const (
MaxCallMsgSize = 25 * 1024 * 1024
)

// http header constants
const (
// http header key
OctetStream = "application/octet-stream"
ContentTypeHeader = "Content-Type"
ETagHeader = "ETag"
ContentLengthHeader = "Content-Length"
ContentTypeXMLHeaderValue = "application/xml"
RangeHeader = "Range"
ContentRangeHeader = "Content-Range"
)

// Gateway
const (
// path
AdminPath = "/greenfield/admin/v1/"
SyncerPath = "/greenfield/syncer/v1/sync-piece"
GetApprovalSubPath = "get-approval"
ChallengeSubPath = "challenge"

Expand All @@ -68,15 +80,17 @@ const (
GnfdIntegrityHashHeader = "X-Gnfd-Integrity-Hash"
GnfdPieceHashHeader = "X-Gnfd-Piece-Hash"

// http header key
ContentTypeHeader = "Content-Type"
ETagHeader = "ETag"
ContentLengthHeader = "Content-Length"
RangeHeader = "Range"
ContentRangeHeader = "Content-Range"
// StoneNode to gateway request header
GnfdTraceIDHeader = "X-Gnfd-Trace-ID"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reuse GnfdRequestIDHeader?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

GnfdSPIDHeader = "X-Gnfd-SP-ID"
GnfdPieceCountHeader = "X-Gnfd-Piece-Count"
GnfdApprovalSignatureHeader = "X-Gnfd-Approval-Signature"

// gateway to StoneNode response header
GnfdPieceChecksumHeader = "X-Gnfd-Piece-Checksum"
GnfdSealSignatureHeader = "X-Gnfd-Seal-Signature"

// header value
ContentTypeXMLHeaderValue = "application/xml"
ReplicaRedundancyTypeHeaderValue = "Replica"
InlineRedundancyTypeHeaderValue = "Inline"

Expand Down
1 change: 1 addition & 0 deletions model/errors/http_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package errors
6 changes: 5 additions & 1 deletion model/errors/errors.go → model/errors/rpc_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var (
ErrRequestConsistent = errors.New("request consistent check failed")
ErrSignatureConsistent = errors.New("signature consistent check failed")
ErrUnsupportedSignType = errors.New("unsupported signature type")
ErrEmptyReqHeader = errors.New("request header is empty")
ErrReqHeader = errors.New("request header is wrong")
)

// stone hub service errors
Expand Down Expand Up @@ -67,7 +69,9 @@ var (
ErrInvalidSegmentData = errors.New("invalid segment data, length is not equal to 1")
ErrInvalidECData = errors.New("invalid ec data, length is not equal to 6")
ErrEmptyTargetIdx = errors.New("target index array is empty")
ErrSyncerNumber = errors.New("syncer number is not enough")
ErrGatewayNumber = errors.New("gateway number is not enough")
ErrEmptyRespHeader = errors.New("http response header is empty")
ErrRespHeader = errors.New("http response header is wrong")
)

// syncer service errors
Expand Down
5 changes: 2 additions & 3 deletions pkg/stone/upload_payload_stone.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package stone
import (
"context"

"github.com/bnb-chain/greenfield-storage-provider/store/spdb"
"github.com/looplab/fsm"

merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
"github.com/bnb-chain/greenfield-storage-provider/pkg/job"
ptypes "github.com/bnb-chain/greenfield-storage-provider/pkg/types/v1"
stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
"github.com/bnb-chain/greenfield-storage-provider/store/spdb"
"github.com/bnb-chain/greenfield-storage-provider/util"
"github.com/bnb-chain/greenfield-storage-provider/util/log"

merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors"
)

type contextKey string
Expand Down
8 changes: 4 additions & 4 deletions proto/service/types/v1/stone_hub.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ message PieceJob {
StorageProviderSealInfo storage_provider_seal_info = 8;
}



message StoneHubServiceBeginUploadPayloadV2Request {
string trace_id = 1;
// bytes tx_hash = 2;
Expand Down Expand Up @@ -95,8 +93,10 @@ message StoneHubServiceAllocStoneJobRequest {
message StoneHubServiceAllocStoneJobResponse {
string trace_id = 1;
// bytes tx_hash = 2;
PieceJob piece_job = 3;
ErrMessage err_message = 4;
string bucket_name = 3;
string object_name = 4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add the field of pb between stone hub and stone node.
shoud add fill the field in stone hub?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

PieceJob piece_job = 5;
ErrMessage err_message = 6;
}

message StoneHubServiceDoneSecondaryPieceJobRequest {
Expand Down
9 changes: 5 additions & 4 deletions service/client/piece_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ func NewStoreClient(pieceConfig *storage.PieceStoreConfig) (*StoreClient, error)
func (client *StoreClient) GetPiece(ctx context.Context, key string, offset, limit int64) ([]byte, error) {
rc, err := client.ps.Get(ctx, key, offset, limit)
if err != nil {
log.Errorw("stone node service invoke PieceStore Get failed", "error", err)
log.Errorw("get piece data from piece store failed", "error", err)
return nil, err
}
data, err := io.ReadAll(rc)
buf := &bytes.Buffer{}
_, err = io.Copy(buf, rc)
if err != nil {
log.Errorw("stone node service invoke io.ReadAll failed", "error", err)
log.Errorw("copy data failed", "error", err)
return nil, err
}
return data, nil
return buf.Bytes(), nil
}

func (client *StoreClient) PutPiece(key string, value []byte) error {
Expand Down
2 changes: 1 addition & 1 deletion service/client/syncer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewSyncerClient(address string) (*SyncerClient, error) {
conn, err := grpc.DialContext(context.Background(), address, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(model.MaxCallMsgSize), grpc.MaxCallSendMsgSize(model.MaxCallMsgSize)))
if err != nil {
log.Errorw("invoke syncer service grpc.DialContext failed", "error", err)
log.Errorw("invoke syncer service failed", "error", err)
return nil, err
}
client := &SyncerClient{
Expand Down
2 changes: 1 addition & 1 deletion service/gateway/admin_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (g *Gateway) getApprovalHandler(w http.ResponseWriter, r *http.Request) {
if statusCode == http.StatusOK {
log.Infof("action(%v) statusCode(%v) %v", "getApproval", statusCode, requestContext.generateRequestDetail())
} else {
log.Warnf("action(%v) statusCode(%v) %v", "getApproval", statusCode, requestContext.generateRequestDetail())
log.Errorw("action(%v) statusCode(%v) %v", "getApproval", statusCode, requestContext.generateRequestDetail())
}
}()

Expand Down
10 changes: 5 additions & 5 deletions service/gateway/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ func (dci *debugChainImpl) createBucket(bucketName string, option *createBucketO
return nil
}

// chainClientConfig is the configuration information when creating chainClient.
// ChainClientConfig is the configuration information when creating chainClient.
// currently Mode only support "DebugMode".
type chainClientConfig struct {
type ChainClientConfig struct {
Mode string
DebugDir string
}

var defaultChainClientConfig = &chainClientConfig{
var DefaultChainClientConfig = &ChainClientConfig{
Mode: "DebugMode",
DebugDir: "./debug",
}
Expand All @@ -69,9 +69,9 @@ type chainClient struct {
impl chainClientInterface
}

func newChainClient(c *chainClientConfig) (*chainClient, error) {
func newChainClient(c *ChainClientConfig) (*chainClient, error) {
if c == nil {
c = defaultChainClientConfig
c = DefaultChainClientConfig
}
switch {
case c.Mode == "DebugMode":
Expand Down
7 changes: 6 additions & 1 deletion service/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Gateway struct {
uploader *uclient.UploaderClient
downloader *dclient.DownloaderClient
challenge *client.ChallengeClient
syncer client.SyncerAPI

// mock
chain *chainClient
Expand Down Expand Up @@ -56,6 +57,10 @@ func NewGatewayService(cfg *GatewayConfig) (*Gateway, error) {
log.Warnw("failed to challenge client", "err", err)
return nil, err
}
if g.syncer, err = client.NewSyncerClient(g.config.SyncerServiceAddress); err != nil {
log.Errorw("gateway inits syncer client failed", "error", err)
return nil, err
}
if g.chain, err = newChainClient(g.config.ChainConfig); err != nil {
log.Warnw("failed to create chain client", "err", err)
return nil, err
Expand Down Expand Up @@ -83,7 +88,7 @@ func (g *Gateway) Start(ctx context.Context) error {
// Serve starts http service.
func (g *Gateway) Serve() {
router := mux.NewRouter().SkipClean(true)
g.registerhandler(router)
g.registerHandler(router)
server := &http.Server{
Addr: g.config.Address,
Handler: router,
Expand Down
6 changes: 4 additions & 2 deletions service/gateway/gateway_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ type GatewayConfig struct {
UploaderServiceAddress string
DownloaderServiceAddress string
ChallengeServiceAddress string
ChainConfig *chainClientConfig
SyncerServiceAddress string
ChainConfig *ChainClientConfig
}

var DefaultGatewayConfig = &GatewayConfig{
Expand All @@ -16,6 +17,7 @@ var DefaultGatewayConfig = &GatewayConfig{
Domain: "gnfd.nodereal.com",
UploaderServiceAddress: "127.0.0.1:9133",
DownloaderServiceAddress: "127.0.0.1:9233",
SyncerServiceAddress: "127.0.0.1:9533",
ChallengeServiceAddress: "127.0.0.1:9633",
ChainConfig: defaultChainClientConfig,
ChainConfig: DefaultChainClientConfig,
}
64 changes: 64 additions & 0 deletions service/gateway/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package gateway

import (
"context"
"encoding/base64"
"testing"

"github.com/bnb-chain/greenfield-storage-provider/model"
"google.golang.org/grpc"

stypes "github.com/bnb-chain/greenfield-storage-provider/service/types/v1"
)

func setup(t *testing.T) *Gateway {
return &Gateway{
config: &GatewayConfig{
StorageProvider: "test1",
Address: "test2",
Domain: "test3",
UploaderServiceAddress: "test4",
DownloaderServiceAddress: "test5",
ChallengeServiceAddress: "test6",
SyncerServiceAddress: "test7",
ChainConfig: DefaultChainClientConfig,
},
name: model.GatewayService,
}
}

func makeStreamMock() *StreamMock {
return &StreamMock{
ctx: context.Background(),
recvToServer: make(chan *stypes.SyncerServiceSyncPieceRequest, 10),
}
}

type StreamMock struct {
grpc.ClientStream
ctx context.Context
recvToServer chan *stypes.SyncerServiceSyncPieceRequest
}

func (m *StreamMock) Send(resp *stypes.SyncerServiceSyncPieceRequest) error {
m.recvToServer <- resp
return nil
}

func (m *StreamMock) CloseAndRecv() (*stypes.SyncerServiceSyncPieceResponse, error) {
integrityHash, _ := base64.URLEncoding.DecodeString("pgPGdR4c9_KYz6wQxl-SifyzHXlHhx5XfNa89LzdNCI=")
return &stypes.SyncerServiceSyncPieceResponse{
TraceId: "test_traceID",
SecondarySpInfo: &stypes.StorageProviderSealInfo{
StorageProviderId: "sp1",
PieceIdx: 1,
PieceChecksum: [][]byte{[]byte("1"), []byte("2"), []byte("3"), []byte("4"), []byte("5"), []byte("6")},
IntegrityHash: integrityHash,
Signature: nil,
},
ErrMessage: &stypes.ErrMessage{
ErrCode: stypes.ErrCode_ERR_CODE_SUCCESS_UNSPECIFIED,
ErrMsg: "Success",
},
}, nil
}
7 changes: 4 additions & 3 deletions service/gateway/request_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"time"

"github.com/bnb-chain/greenfield-sdk-go/pkg/signer"
"github.com/bnb-chain/greenfield-storage-provider/model"
"github.com/bnb-chain/greenfield-storage-provider/model/errors"
"github.com/bnb-chain/greenfield-storage-provider/util"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/gorilla/mux"

"github.com/bnb-chain/greenfield-storage-provider/model"
"github.com/bnb-chain/greenfield-storage-provider/model/errors"
"github.com/bnb-chain/greenfield-storage-provider/util"
)

// requestContext is a request context.
Expand Down
2 changes: 1 addition & 1 deletion service/gateway/response_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
InvalidBucketName = &errorDescription{errorCode: "InvalidBucketName", errorMessage: "The specified bucket is not valid.", statusCode: http.StatusBadRequest}
InvalidKey = &errorDescription{errorCode: "InvalidKey", errorMessage: "Object key is Illegal", statusCode: http.StatusBadRequest}
InvalidTxHash = &errorDescription{errorCode: "InvalidTxHash", errorMessage: "transaction hash is Illegal", statusCode: http.StatusBadRequest}
InvalidPayload = &errorDescription{errorCode: "InvalidPaload", errorMessage: "payload is empty", statusCode: http.StatusBadRequest}
InvalidPayload = &errorDescription{errorCode: "InvalidPayload", errorMessage: "payload is empty", statusCode: http.StatusBadRequest}
InvalidRange = &errorDescription{errorCode: "InvalidRange", errorMessage: "range is invalid", statusCode: http.StatusBadRequest}
UnauthorizedAccess = &errorDescription{errorCode: "UnauthorizedAccess", errorMessage: "UnauthorizedAccess", statusCode: http.StatusUnauthorized}
AccessDenied = &errorDescription{errorCode: "AccessDenied", errorMessage: "Access Denied", statusCode: http.StatusForbidden}
Expand Down
Loading