Skip to content

Commit

Permalink
Merge pull request #27 from lxzan/testing
Browse files Browse the repository at this point in the history
v1.6.0
  • Loading branch information
lxzan authored Jun 12, 2023
2 parents d37aeb6 + cbf5b40 commit f72fc31
Show file tree
Hide file tree
Showing 23 changed files with 303 additions and 275 deletions.
52 changes: 38 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

[12]: https://github.com/avelino/awesome-go#networking

- [GWS](#gws)
- [gws](#gws)
- [Feature](#feature)
- [Attention](#attention)
- [Install](#install)
Expand All @@ -38,6 +38,7 @@
- [Usage](#usage)
- [Upgrade from HTTP](#upgrade-from-http)
- [Unix Domain Socket](#unix-domain-socket)
- [Client Proxy](#client-proxy)
- [Broadcast](#broadcast)
- [Autobahn Test](#autobahn-test)
- [Benchmark](#benchmark)
Expand All @@ -48,16 +49,16 @@

### Feature

- [x] Fully passes the WebSocket [autobahn-testsuite](https://lxzan.github.io/gws/reports/servers/)
- [x] Thread safety guarantees for writing messages
- [x] High IOPS and low latency, low CPU usage
- [x] IO multiplexing support, concurrent message processing and asynchronous non-blocking message writing
- [x] Fast upgrade from TCP to WebSocket, dramatically reduce memory usage
- [x] fully passes the websocket [autobahn-testsuite](https://lxzan.github.io/gws/reports/servers/)
- [x] thread safety guarantees for writing messages
- [x] high iops and low latency, low cpu usage
- [x] io multiplexing support, concurrent message processing and asynchronous non-blocking message writing
- [x] fast upgrade from tcp to websocket, dramatically reduce memory usage
- [x] create client via proxy

### Attention

- The errors returned by the gws.Conn export methods are ignored, and are handled internally
- When an exception occurs, one and only one of OnError/OnClose will be triggered
- Transferring large files with gws tends to block the connection

### Install
Expand All @@ -71,8 +72,7 @@ go get -v github.com/lxzan/gws@latest
```go
type Event interface {
OnOpen(socket *Conn)
OnError(socket *Conn, err error)
OnClose(socket *Conn, code uint16, reason []byte)
OnClose(socket *Conn, err error)
OnPing(socket *Conn, payload []byte)
OnPong(socket *Conn, payload []byte)
OnMessage(socket *Conn, message *Message)
Expand Down Expand Up @@ -112,11 +112,7 @@ type Handler struct{}

func (c *Handler) OnOpen(socket *gws.Conn) { _ = socket.SetDeadline(time.Now().Add(2 * PingInterval)) }

func (c *Handler) DeleteSession(socket *gws.Conn) {}

func (c *Handler) OnError(socket *gws.Conn, err error) { c.DeleteSession(socket) }

func (c *Handler) OnClose(socket *gws.Conn, code uint16, reason []byte) { c.DeleteSession(socket) }
func (c *Handler) OnClose(socket *gws.Conn, err error) {}

func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {
_ = socket.SetDeadline(time.Now().Add(2 * PingInterval))
Expand Down Expand Up @@ -220,6 +216,34 @@ func main() {
}
```

#### Client Proxy

```go
package main

import (
"crypto/tls"
"github.com/lxzan/gws"
"golang.org/x/net/proxy"
"log"
)

func main() {
socket, _, err := gws.NewClient(new(gws.BuiltinEventHandler), &gws.ClientOption{
Addr: "wss://example.com/connect",
TlsConfig: &tls.Config{InsecureSkipVerify: true},
NewDialer: func() (gws.Dialer, error) {
return proxy.SOCKS5("tcp", "127.0.0.1:1080", nil, nil)
},
})
if err != nil {
log.Println(err.Error())
return
}
socket.ReadLoop()
}
```

#### Broadcast

```go
Expand Down
14 changes: 7 additions & 7 deletions assets/read_test.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"length": 2000000,
"payload": "",
"expected": {
"event": "onError"
"event": "onClose"
}
},
{
Expand All @@ -76,7 +76,7 @@
"length": 0,
"payload": "cebae1bdb9cf83cebcceb5eda080656469746564",
"expected": {
"event": "onError"
"event": "onClose"
}
},
{
Expand All @@ -86,7 +86,7 @@
"length": 6,
"payload": "",
"expected": {
"event": "onError"
"event": "onClose"
}
},
{
Expand All @@ -96,7 +96,7 @@
"length": 6,
"payload": "",
"expected": {
"event": "onError"
"event": "onClose"
}
},
{
Expand All @@ -106,7 +106,7 @@
"length": 6,
"payload": "",
"expected": {
"event": "onError"
"event": "onClose"
}
},
{
Expand All @@ -126,7 +126,7 @@
"length": 0,
"payload": "",
"expected": {
"event": "onError"
"event": "onClose"
}
},
{
Expand All @@ -146,7 +146,7 @@
"length": 256,
"payload": "",
"expected": {
"event": "onError"
"event": "onClose"
}
},
{
Expand Down
12 changes: 2 additions & 10 deletions autobahn/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,7 @@ func (c *WebSocket) OnOpen(socket *gws.Conn) {
_ = socket.SetDeadline(time.Now().Add(30 * time.Second))
}

func (c *WebSocket) OnError(socket *gws.Conn, err error) {
c.onexit <- struct{}{}
}

func (c *WebSocket) OnClose(socket *gws.Conn, code uint16, reason []byte) {
func (c *WebSocket) OnClose(socket *gws.Conn, err error) {
c.onexit <- struct{}{}
}

Expand All @@ -72,11 +68,7 @@ func (c *updateReportsHandler) OnOpen(socket *gws.Conn) {
_ = socket.SetDeadline(time.Now().Add(5 * time.Second))
}

func (c *updateReportsHandler) OnError(socket *gws.Conn, err error) {
c.onexit <- struct{}{}
}

func (c *updateReportsHandler) OnClose(socket *gws.Conn, code uint16, reason []byte) {
func (c *updateReportsHandler) OnClose(socket *gws.Conn, err error) {
c.onexit <- struct{}{}
}

Expand Down
6 changes: 1 addition & 5 deletions autobahn/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ func main() {

type WebSocket struct{}

func (c *WebSocket) OnClose(socket *gws.Conn, code uint16, reason []byte) {
fmt.Printf("onclose: code=%d, payload=%s\n", code, string(reason))
}

func (c *WebSocket) OnError(socket *gws.Conn, err error) {
func (c *WebSocket) OnClose(socket *gws.Conn, err error) {
fmt.Printf("onerror: err=%s\n", err.Error())
}

Expand Down
103 changes: 44 additions & 59 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,83 +14,65 @@ import (
"github.com/lxzan/gws/internal"
)

type dialer struct {
type Dialer interface {
Dial(network, addr string) (c net.Conn, err error)
}

type connector struct {
option *ClientOption
conn net.Conn
eventHandler Event
resp *http.Response
secWebsocketKey string
}

// NewClient 创建WebSocket客户端; 支持ws/wss
// Create WebSocket client, support ws/wss
func NewClient(handler Event, option *ClientOption) (client *Conn, resp *http.Response, e error) {
if option == nil {
option = new(ClientOption)
}
option.initialize()

var d = &dialer{eventHandler: handler, option: option}
defer func() {
if e != nil && !internal.IsNil(d.conn) {
_ = d.conn.Close()
}
}()

// NewClient
func NewClient(handler Event, option *ClientOption) (*Conn, *http.Response, error) {
option = initClientOption(option)
c := &connector{option: option, eventHandler: handler, resp: &http.Response{}}
URL, err := url.Parse(option.Addr)
if err != nil {
return nil, d.resp, err
return nil, nil, err
}
if URL.Scheme != "ws" && URL.Scheme != "wss" {
return nil, nil, internal.ErrSchema
}

var dialError error
var hostname = URL.Hostname()
var port = URL.Port()
switch URL.Scheme {
case "ws":
if port == "" {
port = "80"
}
host := hostname + ":" + port
d.conn, dialError = net.DialTimeout("tcp", host, option.DialTimeout)
case "wss":
if port == "" {
port = "443"
}
host := hostname + ":" + port
var tlsDialer = &net.Dialer{Timeout: option.DialTimeout}
d.conn, dialError = tls.DialWithDialer(tlsDialer, "tcp", host, option.TlsConfig)
default:
return nil, d.resp, internal.ErrSchema
var tlsEnabled = URL.Scheme == "wss"
dialer, err := option.NewDialer()
if err != nil {
return nil, nil, err
}

if dialError != nil {
return nil, d.resp, dialError
port := internal.SelectValue(URL.Port() == "", internal.SelectValue(tlsEnabled, "443", "80"), URL.Port())
hp := internal.SelectValue(URL.Hostname() == "", "127.0.0.1", URL.Hostname()) + ":" + port
c.conn, err = dialer.Dial("tcp", hp)
if err != nil {
return nil, nil, err
}
if err := d.conn.SetDeadline(time.Now().Add(option.DialTimeout)); err != nil {
return nil, d.resp, err
if tlsEnabled {
c.conn = tls.Client(c.conn, option.TlsConfig)
}
return d.handshake()

client, resp, err := c.handshake()
if err != nil {
_ = c.conn.Close()
}
return client, resp, err
}

// NewClientFromConn
func NewClientFromConn(handler Event, option *ClientOption, conn net.Conn) (client *Conn, resp *http.Response, e error) {
if option == nil {
option = new(ClientOption)
}
option.initialize()
d := &dialer{option: option, conn: conn, eventHandler: handler}
defer func() {
if e != nil && !internal.IsNil(d.conn) {
_ = d.conn.Close()
}
}()
if err := d.conn.SetDeadline(time.Now().Add(option.DialTimeout)); err != nil {
return nil, d.resp, err
func NewClientFromConn(handler Event, option *ClientOption, conn net.Conn) (*Conn, *http.Response, error) {
option = initClientOption(option)
c := &connector{option: option, conn: conn, eventHandler: handler, resp: &http.Response{}}
client, resp, err := c.handshake()
if err != nil {
_ = c.conn.Close()
}
return d.handshake()
return client, resp, err
}

func (c *dialer) writeRequest() (*http.Request, error) {
func (c *connector) writeRequest() (*http.Request, error) {
r, err := http.NewRequest(http.MethodGet, c.option.Addr, nil)
if err != nil {
return nil, err
Expand All @@ -112,11 +94,14 @@ func (c *dialer) writeRequest() (*http.Request, error) {
return r, r.Write(c.conn)
}

func (c *dialer) handshake() (*Conn, *http.Response, error) {
func (c *connector) handshake() (*Conn, *http.Response, error) {
if err := c.conn.SetDeadline(time.Now().Add(c.option.HandshakeTimeout)); err != nil {
return nil, c.resp, err
}
br := bufio.NewReaderSize(c.conn, c.option.ReadBufferSize)
request, err := c.writeRequest()
if err != nil {
return nil, nil, err
return nil, c.resp, err
}
var channel = make(chan error)
go func() {
Expand All @@ -139,7 +124,7 @@ func (c *dialer) handshake() (*Conn, *http.Response, error) {
return serveWebSocket(false, c.option.getConfig(), new(sliceMap), c.conn, br, c.eventHandler, compressEnabled), c.resp, nil
}

func (c *dialer) checkHeaders() error {
func (c *connector) checkHeaders() error {
if c.resp.StatusCode != http.StatusSwitchingProtocols {
return internal.ErrStatusCode
}
Expand Down
Loading

0 comments on commit f72fc31

Please sign in to comment.