Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proposal(dot/rpc/subscription): refactoring the websocket HandleComm #1673

Merged
merged 12 commits into from
Jul 7, 2021
4 changes: 4 additions & 0 deletions dot/rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"net/http"
"os"
"time"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/ChainSafe/gossamer/dot/rpc/subscription"
Expand Down Expand Up @@ -242,6 +243,9 @@ func NewWSConn(conn *websocket.Conn, cfg *HTTPServerConfig) *subscription.WSConn
CoreAPI: cfg.CoreAPI,
TxStateAPI: cfg.TransactionQueueAPI,
RPCHost: fmt.Sprintf("http://%s:%d/", cfg.Host, cfg.RPCPort),
HTTP: &http.Client{
Timeout: time.Second * 30,
},
}
return c
}
161 changes: 116 additions & 45 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package subscription

import (
"context"
"fmt"
"reflect"

Expand All @@ -28,6 +29,7 @@ import (
// Listener interface for functions that define Listener related functions
type Listener interface {
Listen()
Stop()
}

// WSConnAPI interface defining methors a WSConn should have
Expand Down Expand Up @@ -85,59 +87,98 @@ func (s *StorageObserver) GetFilter() map[string][]byte {
// Listen to satisfy Listener interface (but is no longer used by StorageObserver)
func (s *StorageObserver) Listen() {}

// Stop to satisfy Listener interface (but is no longer used by StorageObserver)
func (s *StorageObserver) Stop() {}

// BlockListener to handle listening for blocks importedChan
type BlockListener struct {
Channel chan *types.Block
wsconn WSConnAPI
ChanID byte
subID uint

ctx context.Context
cancel context.CancelFunc
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *BlockListener) Listen() {
for block := range l.Channel {
if block == nil {
continue
}
head, err := modules.HeaderToJSON(*block.Header)
if err != nil {
logger.Error("failed to convert header to JSON", "error", err)
}
l.ctx, l.cancel = context.WithCancel(context.Background())
go func() {
for {
select {
case <-l.ctx.Done():
return
case block, ok := <-l.Channel:
if !ok {
return
}

res := newSubcriptionBaseResponseJSON()
res.Method = "chain_newHead"
res.Params.Result = head
res.Params.SubscriptionID = l.subID
l.wsconn.safeSend(res)
}
if block == nil {
continue
}
head, err := modules.HeaderToJSON(*block.Header)
if err != nil {
logger.Error("failed to convert header to JSON", "error", err)
}

res := newSubcriptionBaseResponseJSON()
res.Method = "chain_newHead"
res.Params.Result = head
res.Params.SubscriptionID = l.subID
l.wsconn.safeSend(res)
}
}
}()
}

// Stop to cancel the running goroutines to this listener
func (l *BlockListener) Stop() { l.cancel() }

// BlockFinalizedListener to handle listening for finalised blocks
type BlockFinalizedListener struct {
channel chan *types.FinalisationInfo
wsconn WSConnAPI
chanID byte
subID uint
ctx context.Context
cancel context.CancelFunc
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *BlockFinalizedListener) Listen() {
for info := range l.channel {
if info == nil || info.Header == nil {
continue
}
head, err := modules.HeaderToJSON(*info.Header)
if err != nil {
logger.Error("failed to convert header to JSON", "error", err)
l.ctx, l.cancel = context.WithCancel(context.Background())

go func() {
for {
select {
case <-l.ctx.Done():
return
case info, ok := <-l.channel:
if !ok {
return
}

if info == nil || info.Header == nil {
continue
}
head, err := modules.HeaderToJSON(*info.Header)
if err != nil {
logger.Error("failed to convert header to JSON", "error", err)
}
res := newSubcriptionBaseResponseJSON()
res.Method = "chain_finalizedHead"
res.Params.Result = head
res.Params.SubscriptionID = l.subID
l.wsconn.safeSend(res)
}
}
res := newSubcriptionBaseResponseJSON()
res.Method = "chain_finalizedHead"
res.Params.Result = head
res.Params.SubscriptionID = l.subID
l.wsconn.safeSend(res)
}
}()
}

// Stop to cancel the running goroutines to this listener
func (l *BlockFinalizedListener) Stop() { l.cancel() }

// ExtrinsicSubmitListener to handle listening for extrinsic events
type ExtrinsicSubmitListener struct {
wsconn WSConnAPI
Expand All @@ -149,46 +190,72 @@ type ExtrinsicSubmitListener struct {
importedHash common.Hash
finalisedChan chan *types.FinalisationInfo
finalisedChanID byte

ctx context.Context
cancel context.CancelFunc
}

// AuthorExtrinsicUpdates method name
const AuthorExtrinsicUpdates = "author_extrinsicUpdate"

// Listen implementation of Listen interface to listen for importedChan changes
func (l *ExtrinsicSubmitListener) Listen() {
l.ctx, l.cancel = context.WithCancel(context.Background())

// listen for imported blocks with extrinsic
go func() {
for block := range l.importedChan {
if block == nil {
continue
}
bodyHasExtrinsic, err := block.Body.HasExtrinsic(l.extrinsic)
if err != nil {
fmt.Printf("error %v\n", err)
}
for {
select {
case <-l.ctx.Done():
return
case block, ok := <-l.importedChan:
if !ok {
return
}

if block == nil {
continue
}
bodyHasExtrinsic, err := block.Body.HasExtrinsic(l.extrinsic)
if err != nil {
fmt.Printf("error %v\n", err)
}

if bodyHasExtrinsic {
resM := make(map[string]interface{})
resM["inBlock"] = block.Header.Hash().String()
if bodyHasExtrinsic {
resM := make(map[string]interface{})
resM["inBlock"] = block.Header.Hash().String()

l.importedHash = block.Header.Hash()
l.wsconn.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, l.subID, resM))
l.importedHash = block.Header.Hash()
l.wsconn.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, l.subID, resM))
}
}
}
}()

// listen for finalised headers
go func() {
for info := range l.finalisedChan {
if reflect.DeepEqual(l.importedHash, info.Header.Hash()) {
resM := make(map[string]interface{})
resM["finalised"] = info.Header.Hash().String()
l.wsconn.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, l.subID, resM))
for {
select {
case <-l.ctx.Done():
return
case info, ok := <-l.finalisedChan:
if !ok {
return
}

if reflect.DeepEqual(l.importedHash, info.Header.Hash()) {
resM := make(map[string]interface{})
resM["finalised"] = info.Header.Hash().String()
l.wsconn.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, l.subID, resM))
}
}
}
}()
}

// Stop to cancel the running goroutines to this listener
func (l *ExtrinsicSubmitListener) Stop() { l.cancel() }

// RuntimeVersionListener to handle listening for Runtime Version
type RuntimeVersionListener struct {
wsconn *WSConn
Expand All @@ -215,3 +282,7 @@ func (l *RuntimeVersionListener) Listen() {

l.wsconn.safeSend(newSubscriptionResponse("state_runtimeVersion", l.subID, ver))
}

// Stop to runtimeVersionListener not implemented yet because the listener
// does not need to be stoped
func (l *RuntimeVersionListener) Stop() {}
80 changes: 80 additions & 0 deletions dot/rpc/subscription/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package subscription

import (
"errors"
"fmt"
"strconv"
)

var errUknownParamSubscribeID = errors.New("invalid params format type")
var errCannotParseID = errors.New("could not parse param id")
var errCannotFindListener = errors.New("could not find listener")
var errCannotFindUnsubsriber = errors.New("could not find unsubsriber function")

type unsubListener func(reqid float64, l Listener, params interface{})
type setupListener func(reqid float64, params interface{}) (Listener, error)

func (c *WSConn) getSetupListener(method string) setupListener {
switch method {
case "chain_subscribeNewHeads", "chain_subscribeNewHead":
return c.initBlockListener
case "state_subscribeStorage":
return c.initStorageChangeListener
case "chain_subscribeFinalizedHeads":
return c.initBlockFinalizedListener
case "state_subscribeRuntimeVersion":
return c.initRuntimeVersionListener
default:
return nil
}
}

func (c *WSConn) getUnsubListener(method string, params interface{}) (unsubListener, Listener, error) {
subscribeID, err := parseSubscribeID(params)
if err != nil {
return nil, nil, err
}

listener, ok := c.Subscriptions[subscribeID]
if !ok {
return nil, nil, fmt.Errorf("subscriber id %v: %w", subscribeID, errCannotFindListener)
}

var unsub unsubListener

switch method {
case "state_unsubscribeStorage":
unsub = c.unsubscribeStorageListener
default:
return nil, nil, errCannotFindUnsubsriber
}

return unsub, listener, nil
}

func parseSubscribeID(p interface{}) (uint, error) {
switch v := p.(type) {
case []interface{}:
if len(v) == 0 {
return 0, errUknownParamSubscribeID
}
default:
return 0, errUknownParamSubscribeID
}

var id uint
switch v := p.([]interface{})[0].(type) {
case float64:
id = uint(v)
case string:
i, err := strconv.ParseUint(v, 10, 32)
if err != nil {
return 0, errCannotParseID
}
id = uint(i)
default:
return 0, errUknownParamSubscribeID
}

return id, nil
}
Loading