From a8900f8a1f2cddbd2dc09c81d036eeb956274ac8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E7=8B=97=E8=9B=8B?= <155265132+GoudanWoo@users.noreply.github.com> Date: Fri, 22 Mar 2024 23:04:08 +0800 Subject: [PATCH] feat: Add `.Response()` and `.Err()` method for all subscriber (#177) --- rpc/ws/accountSubscribe.go | 17 +++++++++++++++++ rpc/ws/blockSubscribe.go | 17 +++++++++++++++++ rpc/ws/logsSubscribe.go | 17 +++++++++++++++++ rpc/ws/programSubscribe.go | 17 +++++++++++++++++ rpc/ws/rootSubscribe.go | 17 +++++++++++++++++ rpc/ws/slotSubscribe.go | 17 +++++++++++++++++ rpc/ws/slotsUpdatesSubscribe.go | 17 +++++++++++++++++ rpc/ws/voteSubscribe.go | 17 +++++++++++++++++ 8 files changed, 136 insertions(+) diff --git a/rpc/ws/accountSubscribe.go b/rpc/ws/accountSubscribe.go index 807efa1c..8c54229a 100644 --- a/rpc/ws/accountSubscribe.go +++ b/rpc/ws/accountSubscribe.go @@ -92,6 +92,23 @@ func (sw *AccountSubscription) Recv() (*AccountResult, error) { } } +func (sw *AccountSubscription) Err() <-chan error { + return sw.sub.err +} + +func (sw *AccountSubscription) Response() <-chan *AccountResult { + typedChan := make(chan *AccountResult, 1) + go func(ch chan *AccountResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*AccountResult) + }(typedChan) + return typedChan +} + func (sw *AccountSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/blockSubscribe.go b/rpc/ws/blockSubscribe.go index 559a269c..9fdbfba9 100644 --- a/rpc/ws/blockSubscribe.go +++ b/rpc/ws/blockSubscribe.go @@ -150,6 +150,23 @@ func (sw *BlockSubscription) Recv() (*BlockResult, error) { } } +func (sw *BlockSubscription) Err() <-chan error { + return sw.sub.err +} + +func (sw *BlockSubscription) Response() <-chan *BlockResult { + typedChan := make(chan *BlockResult, 1) + go func(ch chan *BlockResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*BlockResult) + }(typedChan) + return typedChan +} + func (sw *BlockSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/logsSubscribe.go b/rpc/ws/logsSubscribe.go index 548a3ef2..1a9b5e03 100644 --- a/rpc/ws/logsSubscribe.go +++ b/rpc/ws/logsSubscribe.go @@ -116,6 +116,23 @@ func (sw *LogSubscription) Recv() (*LogResult, error) { } } +func (sw *LogSubscription) Err() <-chan error { + return sw.sub.err +} + +func (sw *LogSubscription) Response() <-chan *LogResult { + typedChan := make(chan *LogResult, 1) + go func(ch chan *LogResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*LogResult) + }(typedChan) + return typedChan +} + func (sw *LogSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/programSubscribe.go b/rpc/ws/programSubscribe.go index 8639395f..fb94491a 100644 --- a/rpc/ws/programSubscribe.go +++ b/rpc/ws/programSubscribe.go @@ -95,6 +95,23 @@ func (sw *ProgramSubscription) Recv() (*ProgramResult, error) { } } +func (sw *ProgramSubscription) Err() <-chan error { + return sw.sub.err +} + +func (sw *ProgramSubscription) Response() <-chan *ProgramResult { + typedChan := make(chan *ProgramResult, 1) + go func(ch chan *ProgramResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*ProgramResult) + }(typedChan) + return typedChan +} + func (sw *ProgramSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/rootSubscribe.go b/rpc/ws/rootSubscribe.go index 2d519dca..f54f30f5 100644 --- a/rpc/ws/rootSubscribe.go +++ b/rpc/ws/rootSubscribe.go @@ -51,6 +51,23 @@ func (sw *RootSubscription) Recv() (*RootResult, error) { } } +func (sw *RootSubscription) Err() <-chan error { + return sw.sub.err +} + +func (sw *RootSubscription) Response() <-chan *RootResult { + typedChan := make(chan *RootResult, 1) + go func(ch chan *RootResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*RootResult) + }(typedChan) + return typedChan +} + func (sw *RootSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/slotSubscribe.go b/rpc/ws/slotSubscribe.go index d79aed36..05096bad 100644 --- a/rpc/ws/slotSubscribe.go +++ b/rpc/ws/slotSubscribe.go @@ -54,6 +54,23 @@ func (sw *SlotSubscription) Recv() (*SlotResult, error) { } } +func (sw *SlotSubscription) Err() <-chan error { + return sw.sub.err +} + +func (sw *SlotSubscription) Response() <-chan *SlotResult { + typedChan := make(chan *SlotResult, 1) + go func(ch chan *SlotResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*SlotResult) + }(typedChan) + return typedChan +} + func (sw *SlotSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/slotsUpdatesSubscribe.go b/rpc/ws/slotsUpdatesSubscribe.go index 7aebb6c9..45712434 100644 --- a/rpc/ws/slotsUpdatesSubscribe.go +++ b/rpc/ws/slotsUpdatesSubscribe.go @@ -86,6 +86,23 @@ func (sw *SlotsUpdatesSubscription) Recv() (*SlotsUpdatesResult, error) { } } +func (sw *SlotsUpdatesSubscription) Err() <-chan error { + return sw.sub.err +} + +func (sw *SlotsUpdatesSubscription) Response() <-chan *SlotsUpdatesResult { + typedChan := make(chan *SlotsUpdatesResult, 1) + go func(ch chan *SlotsUpdatesResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*SlotsUpdatesResult) + }(typedChan) + return typedChan +} + func (sw *SlotsUpdatesSubscription) Unsubscribe() { sw.sub.Unsubscribe() } diff --git a/rpc/ws/voteSubscribe.go b/rpc/ws/voteSubscribe.go index 9fc526bc..1d4aebc6 100644 --- a/rpc/ws/voteSubscribe.go +++ b/rpc/ws/voteSubscribe.go @@ -68,6 +68,23 @@ func (sw *VoteSubscription) Recv() (*VoteResult, error) { } } +func (sw *VoteSubscription) Err() <-chan error { + return sw.sub.err +} + +func (sw *VoteSubscription) Response() <-chan *VoteResult { + typedChan := make(chan *VoteResult, 1) + go func(ch chan *VoteResult) { + // TODO: will this subscription yield more than one result? + d, ok := <-sw.sub.stream + if !ok { + return + } + ch <- d.(*VoteResult) + }(typedChan) + return typedChan +} + func (sw *VoteSubscription) Unsubscribe() { sw.sub.Unsubscribe() }