From 71666e5d5be830c6c67f769268860b35944e4abb Mon Sep 17 00:00:00 2001 From: Chris Marslender Date: Sat, 16 Mar 2024 17:01:59 -0500 Subject: [PATCH] Rework the websocket client with new sync mode + more intuitive usage patterns + add daemon service (#113) * Rework the websocket mode to be a bit more intuiative to use + add a sync mode + ability to remove handlers after they're registered Sync mode will cause the rpc function calls to wait for their response before returning (request/response style calls vs async handlers) * Add daemon service * Update readme with new websocket usage patterns * Don't return errors from the http client methods that exist only to satisfy the interface * Simplify return signature of functions that can't actually return errors anyways (because of goroutines) * Make error messages a bit more detailed * Return errors from any methods that try to do something specifically not supported by the HTTP client, leave nil return as implied/only-option type functions --- go.mod | 1 + go.sum | 2 + pkg/httpclient/httpclient.go | 36 +++-- pkg/rpc/client.go | 46 +++--- pkg/rpc/clientoptions.go | 15 +- pkg/rpc/daemon.go | 39 +++++ pkg/rpc/readme.md | 63 +++++++- pkg/rpcinterface/client.go | 22 ++- pkg/util/request.go | 10 ++ pkg/websocketclient/websocketclient.go | 207 ++++++++++++++++++++----- 10 files changed, 359 insertions(+), 82 deletions(-) create mode 100644 pkg/rpc/daemon.go create mode 100644 pkg/util/request.go diff --git a/go.mod b/go.mod index 1e0deec..3c13a14 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( github.com/google/go-querystring v1.1.0 + github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.1 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/samber/mo v1.11.0 diff --git a/go.sum b/go.sum index d50e69e..f3aa86c 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= diff --git a/pkg/httpclient/httpclient.go b/pkg/httpclient/httpclient.go index bf55a30..df3c214 100644 --- a/pkg/httpclient/httpclient.go +++ b/pkg/httpclient/httpclient.go @@ -11,6 +11,7 @@ import ( "time" "github.com/google/go-querystring/query" + "github.com/google/uuid" "github.com/chia-network/go-chia-libs/pkg/config" "github.com/chia-network/go-chia-libs/pkg/rpcinterface" @@ -309,6 +310,8 @@ func (c *HTTPClient) httpClientForService(service rpcinterface.ServiceType) (*ht ) switch service { + case rpcinterface.ServiceDaemon: + return nil, fmt.Errorf("daemon RPC calls must be made with the websocket client") case rpcinterface.ServiceFullNode: if c.nodeClient == nil { c.nodeClient, err = c.generateHTTPClientForService(rpcinterface.ServiceFullNode) @@ -376,26 +379,37 @@ func (c *HTTPClient) httpClientForService(service rpcinterface.ServiceType) (*ht // The following are here to satisfy the interface, but are not used by the HTTP client -// SubscribeSelf subscribes to events in response to requests from this service -// Not applicable on the HTTP connection +// SubscribeSelf does not apply to the HTTP Client func (c *HTTPClient) SubscribeSelf() error { - return nil + return fmt.Errorf("subscriptions are not supported on the HTTP client - websockets are required for subscriptions") } -// Subscribe adds a subscription to events from a particular service +// Subscribe does not apply to the HTTP Client // Not applicable on the HTTP connection func (c *HTTPClient) Subscribe(service string) error { - return nil + return fmt.Errorf("subscriptions are not supported on the HTTP client - websockets are required for subscriptions") } -// ListenSync Listens for async responses over the connection in a synchronous fashion, blocking anything else -// Not applicable on the HTTP connection -func (c *HTTPClient) ListenSync(handler rpcinterface.WebsocketResponseHandler) error { - return nil +// AddHandler does not apply to HTTP Client +func (c *HTTPClient) AddHandler(handler rpcinterface.WebsocketResponseHandler) (uuid.UUID, error) { + return uuid.Nil, fmt.Errorf("handlers are not supported on the HTTP client - reponses are returned directly from the calling functions") } -// AddDisconnectHandler Not applicable to the HTTP client +// RemoveHandler does not apply to HTTP Client +func (c *HTTPClient) RemoveHandler(handlerID uuid.UUID) {} + +// AddDisconnectHandler does not apply to the HTTP Client func (c *HTTPClient) AddDisconnectHandler(onDisconnect rpcinterface.DisconnectHandler) {} -// AddReconnectHandler Not applicable to the HTTP client +// AddReconnectHandler does not apply to the HTTP Client func (c *HTTPClient) AddReconnectHandler(onReconnect rpcinterface.ReconnectHandler) {} + +// SetSyncMode does not apply to the HTTP Client +func (c *HTTPClient) SetSyncMode() error { + return nil +} + +// SetAsyncMode does not apply to the HTTP Client +func (c *HTTPClient) SetAsyncMode() error { + return fmt.Errorf("async mode is not supported on the HTTP client") +} diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index bf94a67..6013499 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -1,13 +1,13 @@ package rpc import ( - "log" "net/http" + "github.com/google/uuid" + "github.com/chia-network/go-chia-libs/pkg/config" "github.com/chia-network/go-chia-libs/pkg/httpclient" "github.com/chia-network/go-chia-libs/pkg/rpcinterface" - "github.com/chia-network/go-chia-libs/pkg/types" "github.com/chia-network/go-chia-libs/pkg/websocketclient" ) @@ -18,6 +18,7 @@ type Client struct { activeClient rpcinterface.Client // Services for the different chia services + DaemonService *DaemonService FullNodeService *FullNodeService WalletService *WalletService FarmerService *FarmerService @@ -25,8 +26,6 @@ type Client struct { CrawlerService *CrawlerService DataLayerService *DataLayerService TimelordService *TimelordService - - websocketHandlers []rpcinterface.WebsocketResponseHandler } // ConnectionMode specifies the method used to connect to the server (HTTP or Websocket) @@ -64,6 +63,7 @@ func NewClient(connectionMode ConnectionMode, configOption rpcinterface.ConfigOp c.activeClient = activeClient // Init Services + c.DaemonService = &DaemonService{client: c} c.FullNodeService = &FullNodeService{client: c} c.WalletService = &WalletService{client: c} c.FarmerService = &FarmerService{client: c} @@ -103,16 +103,13 @@ func (c *Client) Subscribe(service string) error { // This is expected to NOT be used in conjunction with ListenSync // This will run in the background, and allow other things to happen in the foreground // while ListenSync will take over the foreground process -func (c *Client) AddHandler(handler rpcinterface.WebsocketResponseHandler) error { - c.websocketHandlers = append(c.websocketHandlers, handler) - - go func() { - err := c.ListenSync(c.handlerProxy) - if err != nil { - log.Printf("Error calling ListenSync: %s\n", err.Error()) - } - }() - return nil +func (c *Client) AddHandler(handler rpcinterface.WebsocketResponseHandler) (uuid.UUID, error) { + return c.activeClient.AddHandler(handler) +} + +// RemoveHandler removes the handler from the list of active response handlers +func (c *Client) RemoveHandler(handlerID uuid.UUID) { + c.activeClient.RemoveHandler(handlerID) } // AddDisconnectHandler the function to call when the client is disconnected @@ -125,15 +122,18 @@ func (c *Client) AddReconnectHandler(onReconnect rpcinterface.ReconnectHandler) c.activeClient.AddReconnectHandler(onReconnect) } -// handlerProxy matches the websocketRespHandler signature to send requests back to any registered handlers -// Here to support multiple handlers for a single event in the future -func (c *Client) handlerProxy(resp *types.WebsocketResponse, err error) { - for _, handler := range c.websocketHandlers { - handler(resp, err) - } +// SetSyncMode sets the client to wait for responses before returning +// This is default (and only option) for HTTP client +// Websocket client defaults to async mode +func (c *Client) SetSyncMode() error { + return c.activeClient.SetSyncMode() } -// ListenSync Listens for async responses over the connection in a synchronous fashion, blocking anything else -func (c *Client) ListenSync(handler rpcinterface.WebsocketResponseHandler) error { - return c.activeClient.ListenSync(handler) +// SetAsyncMode sets the client to async mode +// This does not apply to the HTTP client +// For the websocket client, this is the default mode and means that RPC function calls return immediate with empty +// versions of the structs that would otherwise contain the response, and you should have an async handler defined +// to receive the response +func (c *Client) SetAsyncMode() error { + return c.activeClient.SetAsyncMode() } diff --git a/pkg/rpc/clientoptions.go b/pkg/rpc/clientoptions.go index e8a475e..add11d6 100644 --- a/pkg/rpc/clientoptions.go +++ b/pkg/rpc/clientoptions.go @@ -7,6 +7,7 @@ import ( "github.com/chia-network/go-chia-libs/pkg/config" "github.com/chia-network/go-chia-libs/pkg/httpclient" "github.com/chia-network/go-chia-libs/pkg/rpcinterface" + "github.com/chia-network/go-chia-libs/pkg/websocketclient" ) // WithAutoConfig automatically loads chia config from CHIA_ROOT @@ -23,6 +24,13 @@ func WithManualConfig(cfg config.ChiaConfig) rpcinterface.ConfigOptionFunc { } } +// WithSyncWebsocket is a helper to making the client and calling SetSyncMode to set the client to sync mode by default +func WithSyncWebsocket() rpcinterface.ClientOptionFunc { + return func(c rpcinterface.Client) error { + return c.SetSyncMode() + } +} + // WithBaseURL sets the host for RPC requests func WithBaseURL(url *url.URL) rpcinterface.ClientOptionFunc { return func(c rpcinterface.Client) error { @@ -46,11 +54,12 @@ func WithCache(validTime time.Duration) rpcinterface.ClientOptionFunc { // WithTimeout sets the timeout for the requests func WithTimeout(timeout time.Duration) rpcinterface.ClientOptionFunc { return func(c rpcinterface.Client) error { - typed, ok := c.(*httpclient.HTTPClient) - if ok { + switch typed := c.(type) { + case *httpclient.HTTPClient: + typed.Timeout = timeout + case *websocketclient.WebsocketClient: typed.Timeout = timeout } - return nil } } diff --git a/pkg/rpc/daemon.go b/pkg/rpc/daemon.go new file mode 100644 index 0000000..57576ab --- /dev/null +++ b/pkg/rpc/daemon.go @@ -0,0 +1,39 @@ +package rpc + +import ( + "net/http" + + "github.com/chia-network/go-chia-libs/pkg/rpcinterface" +) + +// DaemonService encapsulates direct daemon RPC methods +type DaemonService struct { + client *Client +} + +// NewRequest returns a new request specific to the crawler service +func (s *DaemonService) NewRequest(rpcEndpoint rpcinterface.Endpoint, opt interface{}) (*rpcinterface.Request, error) { + return s.client.NewRequest(rpcinterface.ServiceDaemon, rpcEndpoint, opt) +} + +// Do is just a shortcut to the client's Do method +func (s *DaemonService) Do(req *rpcinterface.Request, v interface{}) (*http.Response, error) { + return s.client.Do(req, v) +} + +// GetNetworkInfo gets the network name and prefix from the full node +func (s *DaemonService) GetNetworkInfo(opts *GetNetworkInfoOptions) (*GetNetworkInfoResponse, *http.Response, error) { + request, err := s.NewRequest("get_network_info", opts) + if err != nil { + return nil, nil, err + } + + r := &GetNetworkInfoResponse{} + + resp, err := s.Do(request, r) + if err != nil { + return nil, resp, err + } + + return r, resp, nil +} diff --git a/pkg/rpc/readme.md b/pkg/rpc/readme.md index bf1ff90..1a2563c 100644 --- a/pkg/rpc/readme.md +++ b/pkg/rpc/readme.md @@ -75,7 +75,7 @@ func main() { Websockets function asynchronously and as such, there are a few implementation differences compared to using the simpler HTTP request/response pattern. You must define a handler function to process responses received over the websocket connection, and you must also specifically subscribe to the events the handler should receive. -#### Handler Function +#### Handler Functions Handler functions must use the following signature: `func handlerFunc(data *types.WebsocketResponse, err error)`. The function will be passed the data that was received from the websocket and an error. @@ -106,13 +106,44 @@ func gotResponse(data *types.WebsocketResponse, err error) { } ``` -You may also use a blocking/synchronous handler function, if listening to websocket responses is all your main process is doing: +#### Synchronous Mode + +If you want websockets to behave more like request/response style calls, you can enable sync mode. + +To make all calls sync by default, you can set an option on the client: ```go package main import ( - "log" + "fmt" + + "github.com/chia-network/go-chia-libs/pkg/rpc" +) + +func main() { + client, err := rpc.NewClient(rpc.ConnectionModeWebsocket, rpc.WithAutoConfig(), rpc.WithSyncWebsocket()) + if err != nil { + // error happened + } + + netInfo, _, err := client.DaemonService.GetNetworkInfo(&rpc.GetNetworkInfoOptions{}) + if err != nil { + // error happened + } + + // netInfo has the actual network information, since we're running in sync mode + fmt.Println(netInfo.NetworkName.OrEmpty()) +} +``` + +You can also temporarily enable synchronous mode and then turn it back off + +```go +package main + +import ( + "fmt" "github.com/chia-network/go-chia-libs/pkg/rpc" "github.com/chia-network/go-chia-libs/pkg/types" @@ -121,19 +152,35 @@ import ( func main() { client, err := rpc.NewClient(rpc.ConnectionModeWebsocket, rpc.WithAutoConfig()) if err != nil { - log.Fatalln(err.Error()) + // error happened } - client.ListenSync(gotResponse) + client.AddHandler(gotAsyncResponse) - // Other application logic here + client.SetSyncMode() + + netInfo, _, err := client.DaemonService.GetNetworkInfo(&rpc.GetNetworkInfoOptions{}) + if err != nil { + // error happened + } + fmt.Println(netInfo.NetworkName.OrEmpty()) + + client.SetAsyncMode() } -func gotResponse(data *types.WebsocketResponse, err error) { - log.Printf("Received a `%s` command response\n", data.Command) +func gotAsyncResponse(data *types.WebsocketResponse, err error) { + log.Printf("Received a `%s` async command response\n", data.Command) } ``` +The output of this program will look something like the following. Note that both the async handler AND the sync response +variables saw the event and were able to handle it. + +```shell +Received a `get_network_info` command response +mainnet +``` + #### Subscribing to Events There are two helper functions to subscribe to events that come over the websocket. diff --git a/pkg/rpcinterface/client.go b/pkg/rpcinterface/client.go index fdc2275..1e0e1b8 100644 --- a/pkg/rpcinterface/client.go +++ b/pkg/rpcinterface/client.go @@ -3,6 +3,8 @@ package rpcinterface import ( "net/http" "net/url" + + "github.com/google/uuid" ) // Client defines the interface for a client @@ -19,8 +21,13 @@ type Client interface { SubscribeSelf() error // Subscribe adds a subscription to events from a particular service Subscribe(service string) error - // ListenSync Listens for async responses over the connection in a synchronous fashion, blocking anything else - ListenSync(handler WebsocketResponseHandler) error + + // AddHandler adds a handler function that will be called when a message is received over the websocket + // Does not apply to HTTP client + AddHandler(handler WebsocketResponseHandler) (uuid.UUID, error) + + // RemoveHandler removes the handler from the active websocket handlers + RemoveHandler(handlerID uuid.UUID) // AddDisconnectHandler adds a function to call if the connection is disconnected // Applies to websocket connections @@ -29,4 +36,15 @@ type Client interface { // AddReconnectHandler adds a function to call if the connection is reconnected // Applies to websocket connections AddReconnectHandler(onReconnect ReconnectHandler) + + // SetSyncMode enforces synchronous request/response behavior + // This is default for HTTP client, but websocket default is async, so this forces a different mode + // Note that anything received by the websocket in sync mode that is not the current expected response + // will be ignored + SetSyncMode() error + + // SetAsyncMode sets the client to async mode + // This is not supported for the HTTP client, but will set the websocket client back to async mode + // if it was set to sync mode temporarily + SetAsyncMode() error } diff --git a/pkg/util/request.go b/pkg/util/request.go new file mode 100644 index 0000000..72b703e --- /dev/null +++ b/pkg/util/request.go @@ -0,0 +1,10 @@ +package util + +import ( + "github.com/google/uuid" +) + +// GenerateRequestID generates a random string to use as a request ID +func GenerateRequestID() string { + return uuid.New().String() +} diff --git a/pkg/websocketclient/websocketclient.go b/pkg/websocketclient/websocketclient.go index 0c2a884..a241a94 100644 --- a/pkg/websocketclient/websocketclient.go +++ b/pkg/websocketclient/websocketclient.go @@ -1,20 +1,25 @@ package websocketclient import ( + "bytes" + "context" "crypto/tls" "encoding/json" "fmt" + "io" "log" "net/http" "net/url" "sync" "time" + "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/chia-network/go-chia-libs/pkg/config" "github.com/chia-network/go-chia-libs/pkg/rpcinterface" "github.com/chia-network/go-chia-libs/pkg/types" + "github.com/chia-network/go-chia-libs/pkg/util" ) const origin string = "go-chia-rpc" @@ -24,6 +29,9 @@ type WebsocketClient struct { config *config.ChiaConfig baseURL *url.URL + // Request timeout + Timeout time.Duration + daemonPort uint16 daemonKeyPair *tls.Certificate daemonDialer *websocket.Dialer @@ -31,11 +39,21 @@ type WebsocketClient struct { conn *websocket.Conn lock sync.Mutex + // listenSyncActive is tracking whether a client has opted to temporarily listen in sync mode for ALL requests listenSyncActive bool + // syncMode is tracking whether or not the actual RPC calls should behave like sync calls + // in this mode, we'll generate a request ID, and block until we get the request ID response back + // (or hit a timeout) + syncMode bool + // subscriptions Keeps track of subscribed topics, so we can re-subscribe if we lose a connection and reconnect subscriptions map[string]bool + // All registered functions that want data back from the websocket + websocketHandlerMutex sync.Mutex + websocketHandlers map[uuid.UUID]rpcinterface.WebsocketResponseHandler + disconnectHandlers []rpcinterface.DisconnectHandler reconnectHandlers []rpcinterface.ReconnectHandler } @@ -45,10 +63,13 @@ func NewWebsocketClient(cfg *config.ChiaConfig, options ...rpcinterface.ClientOp c := &WebsocketClient{ config: cfg, + Timeout: 10 * time.Second, // Default, overridable with client option + daemonPort: cfg.DaemonPort, - // Init the map - subscriptions: map[string]bool{}, + // Init the maps + subscriptions: map[string]bool{}, + websocketHandlers: map[uuid.UUID]rpcinterface.WebsocketResponseHandler{}, } // Sets the default host. Can be overridden by client options @@ -69,7 +90,7 @@ func NewWebsocketClient(cfg *config.ChiaConfig, options ...rpcinterface.ClientOp if fn == nil { continue } - if err := fn(c); err != nil { + if err = fn(c); err != nil { return nil, err } } @@ -103,12 +124,12 @@ func (c *WebsocketClient) NewRequest(service rpcinterface.ServiceType, rpcEndpoi } // Do sends an RPC request via the websocket -// *http.Response is always nil in this return, and exists to satisfy the interface that existed prior to -// websockets being supported in this library +// *http.Response is always nil in this return in async mode +// call SetSyncMode() to ensure the calls return the data in a synchronous fashion func (c *WebsocketClient) Do(req *rpcinterface.Request, v interface{}) (*http.Response, error) { err := c.ensureConnection() if err != nil { - return nil, err + return nil, fmt.Errorf("error ensuring connection: %w", err) } var destination string @@ -140,11 +161,70 @@ func (c *WebsocketClient) Do(req *rpcinterface.Request, v interface{}) (*http.Re Origin: origin, Destination: destination, Data: data, + RequestID: util.GenerateRequestID(), } c.lock.Lock() defer c.lock.Unlock() - return nil, c.conn.WriteJSON(request) + err = c.conn.WriteJSON(request) + if err != nil { + return nil, err + } + + return c.responseHelper(request, v) +} + +// responseHelper implements the logic to either immediately return in async mode +// or to wait for the expected response up to the defined timeout and returns the +// response in a synchronous fashion +func (c *WebsocketClient) responseHelper(request *types.WebsocketRequest, v interface{}) (*http.Response, error) { + if !c.syncMode { + return nil, nil + } + // We're in sync mode, so wait up to the timeout for the desired response, or else return an error + + errChan := make(chan error) + doneChan := make(chan bool) + ctx, cancelCtx := context.WithTimeout(context.Background(), c.Timeout) + defer cancelCtx() + + // Set up a handler to process responses and keep an eye out for the right one + handlerID, err := c.AddHandler(func(response *types.WebsocketResponse, err error) { + if response.RequestID == request.RequestID { + var err error + if v != nil { + reader := bytes.NewReader(response.Data) + if w, ok := v.(io.Writer); ok { + _, err = io.Copy(w, reader) + } else { + err = json.NewDecoder(reader).Decode(v) + } + if err != nil { + errChan <- err + } + } + doneChan <- true + } + }) + + if err != nil { + return nil, err + } + + for { + select { + case err = <-errChan: + c.RemoveHandler(handlerID) + return nil, err + case doneResult := <-doneChan: + if doneResult { + c.RemoveHandler(handlerID) + return nil, nil + } + case <-ctx.Done(): + return nil, fmt.Errorf("timeout of %s reached before getting a response", c.Timeout.String()) + } + } } // SubscribeSelf calls subscribe for any requests that this client makes to the server @@ -179,35 +259,29 @@ func (c *WebsocketClient) doSubscribe(service string) error { return err } -// ListenSync Listens for responses over the websocket connection in the foreground -// The error returned from this function would only correspond to an error setting up the listener -// Errors returned by ReadMessage, or some other part of the websocket request/response will be -// passed to the handler to deal with -func (c *WebsocketClient) ListenSync(handler rpcinterface.WebsocketResponseHandler) error { - if !c.listenSyncActive { - c.listenSyncActive = true +// AddHandler Adds a new handler function and returns its UUID for removing it later or an error +func (c *WebsocketClient) AddHandler(handler rpcinterface.WebsocketResponseHandler) (uuid.UUID, error) { + c.websocketHandlerMutex.Lock() + defer c.websocketHandlerMutex.Unlock() - for { - _, message, err := c.conn.ReadMessage() - if err != nil { - log.Println(err.Error()) - if _, isCloseErr := err.(*websocket.CloseError); !isCloseErr { - closeConnErr := c.conn.Close() - if closeConnErr != nil { - log.Printf("Error closing connection after error: %s\n", closeConnErr.Error()) - } - } - c.conn = nil - c.reconnectLoop() - continue - } - resp := &types.WebsocketResponse{} - err = json.Unmarshal(message, resp) - handler(resp, err) - } - } + handlerID := uuid.New() + c.websocketHandlers[handlerID] = handler - return nil + return handlerID, nil +} + +// RemoveHandler removes the handler from the list of active response handlers +func (c *WebsocketClient) RemoveHandler(handlerID uuid.UUID) { + c.websocketHandlerMutex.Lock() + defer c.websocketHandlerMutex.Unlock() + delete(c.websocketHandlers, handlerID) +} + +// handlerProxy matches the websocketRespHandler signature to send requests back to any registered handlers +func (c *WebsocketClient) handlerProxy(resp *types.WebsocketResponse, err error) { + for _, handler := range c.websocketHandlers { + handler(resp, err) + } } // AddDisconnectHandler the function to call when the client is disconnected @@ -220,6 +294,21 @@ func (c *WebsocketClient) AddReconnectHandler(onReconnect rpcinterface.Reconnect c.reconnectHandlers = append(c.reconnectHandlers, onReconnect) } +// SetSyncMode enforces synchronous request/response behavior +// RPC method calls return the actual expected RPC response when this mode is enabled +func (c *WebsocketClient) SetSyncMode() error { + c.syncMode = true + return nil +} + +// SetAsyncMode sets the client to async mode (default) +// RPC method calls return empty versions of the response objects, and you must have your own +// listeners to get the responses and handle them +func (c *WebsocketClient) SetAsyncMode() error { + c.syncMode = false + return nil +} + func (c *WebsocketClient) reconnectLoop() { for _, handler := range c.disconnectHandlers { handler() @@ -273,7 +362,7 @@ func (c *WebsocketClient) generateDialer() error { return nil } -// ensureConnection ensures there is an open websocket connection +// ensureConnection ensures there is an open websocket connection and the listener is listening func (c *WebsocketClient) ensureConnection() error { if c.conn == nil { u := url.URL{Scheme: "wss", Host: fmt.Sprintf("%s:%d", c.baseURL.Host, c.daemonPort), Path: "/"} @@ -284,5 +373,53 @@ func (c *WebsocketClient) ensureConnection() error { } } + go c.listen() + return nil } + +// listen sets up a listener for all events and sends them back to handlerProxy +// The error returned from this function would only correspond to an error setting up the listener +// Errors returned by ReadMessage, or some other part of the websocket request/response will be +// passed to the handler to deal with +func (c *WebsocketClient) listen() { + if !c.listenSyncActive { + c.listenSyncActive = true + defer func() { + c.listenSyncActive = false + }() + + messageChan := make(chan []byte) + + // This reads messages from the websocket in the background allow us to either receive + // a message OR cancel + go func() { + for { + _, message, err := c.conn.ReadMessage() + if err != nil { + log.Printf("Error reading message on chia websocket: %s\n", err.Error()) + if _, isCloseErr := err.(*websocket.CloseError); !isCloseErr { + log.Println("Chia websocket sent close message, attempting to close connection...") + closeConnErr := c.conn.Close() + if closeConnErr != nil { + log.Printf("Error closing chia websocket connection: %s\n", closeConnErr.Error()) + } + } + c.conn = nil + c.reconnectLoop() + continue + } + messageChan <- message + } + }() + + for { + message := <-messageChan + resp := &types.WebsocketResponse{} + err := json.Unmarshal(message, resp) + // Has to be called in goroutine so that the handler can potentially call cancel, which + // this select needs to also read in order to properly cancel + go c.handlerProxy(resp, err) + } + } +}