diff --git a/client.go b/client.go index fa76fc7e..7527d804 100644 --- a/client.go +++ b/client.go @@ -137,6 +137,20 @@ func (c *Client) Call(calls ...w3types.RPCCaller) error { return c.CallCtx(context.Background(), calls...) } +// SubscribeCtx creates a new subscription and returns a [rpc.ClientSubscription]. +func (c *Client) SubscribeCtx(ctx context.Context, s w3types.RPCSubscriber) (*rpc.ClientSubscription, error) { + namespace, ch, params, err := s.CreateRequest() + if err != nil { + return nil, err + } + return c.client.Subscribe(ctx, namespace, ch, params...) +} + +// Subscribe is like [Client.SubscribeCtx] with ctx equal to context.Background(). +func (c *Client) Subscribe(s w3types.RPCSubscriber) (*rpc.ClientSubscription, error) { + return c.SubscribeCtx(context.Background(), s) +} + func (c *Client) rateLimit(ctx context.Context, batchElems []rpc.BatchElem) error { if c.rl == nil { return nil diff --git a/client_test.go b/client_test.go index bf0d1128..a3aa66ef 100644 --- a/client_test.go +++ b/client_test.go @@ -197,6 +197,28 @@ func ExampleCallErrors() { // 0x00000000219ab540356cBB839Cbe05303d7705Fa: unknown symbol: execution reverted } +func ExampleClient_Subscribe() { + client := w3.MustDial("wss://mainnet.gateway.tenderly.co") + defer client.Close() + + txCh := make(chan *types.Transaction) + sub, err := client.Subscribe(eth.PendingTransactions(txCh)) + if err != nil { + fmt.Printf("Failed to subscribe: %v\n", err) + return + } + + for { + select { + case tx := <-txCh: + fmt.Printf("New pending tx: %s\n", tx.Hash()) + case err := <-sub.Err(): + fmt.Printf("Subscription error: %v\n", err) + return + } + } +} + func TestClientCall(t *testing.T) { tests := []struct { Buf *bytes.Buffer diff --git a/module/eth/subscribe.go b/module/eth/subscribe.go new file mode 100644 index 00000000..20090b31 --- /dev/null +++ b/module/eth/subscribe.go @@ -0,0 +1,33 @@ +package eth + +import ( + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core/types" + "github.com/lmittmann/w3/w3types" +) + +// NewHeads subscribes to notifications of updates to the blockchain head. +func NewHeads(ch chan<- *types.Header) w3types.RPCSubscriber { + return ðSubscription[*types.Header]{ch, []any{"newHeads"}, nil} +} + +// PendingTransactions subscribes to notifications about new pending transactions in the transaction pool. +func PendingTransactions(ch chan<- *types.Transaction) w3types.RPCSubscriber { + return ðSubscription[*types.Transaction]{ch, []any{"newPendingTransactions", true}, nil} +} + +// NewLogs subscribes to notifications about logs that match the given filter query. +func NewLogs(ch chan<- *types.Log, q ethereum.FilterQuery) w3types.RPCSubscriber { + arg, err := toFilterArg(q) + return ðSubscription[*types.Log]{ch, []any{"logs", arg}, err} +} + +type ethSubscription[T any] struct { + ch chan<- T + params []any + err error +} + +func (s *ethSubscription[T]) CreateRequest() (string, any, []any, error) { + return "eth", s.ch, s.params, s.err +} diff --git a/w3types/interfaces.go b/w3types/interfaces.go index 903c2ea3..95a35d34 100644 --- a/w3types/interfaces.go +++ b/w3types/interfaces.go @@ -37,6 +37,13 @@ type RPCCallerFactory[T any] interface { Returns(*T) RPCCaller } +// RPCSubscriber is the interface that wraps the basic CreateRequest method. +type RPCSubscriber interface { + // CreateRequest returns the namespace, channel, params for starting a new + // subscription and an error if the request cannot be created. + CreateRequest() (namespace string, ch any, params []any, err error) +} + // Caller is the interface that groups the basic CreateRequest and // HandleResponse methods. //