Skip to content

Commit

Permalink
Merge pull request #39 from lxzan/testing
Browse files Browse the repository at this point in the history
v1.6.6
  • Loading branch information
lxzan authored Aug 17, 2023
2 parents 7c1bb91 + 7b576be commit 6c89b70
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 103 deletions.
138 changes: 60 additions & 78 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# gws

### event-driven go websocket server & client
### Event-Driven Go WebSocket Server & Client

[![Mentioned in Awesome Go][11]][12] [![Build Status][1]][2] [![MIT licensed][3]][4] [![Go Version][5]][6] [![codecov][7]][8] [![Go Report Card][9]][10]

Expand Down Expand Up @@ -29,21 +29,21 @@
[12]: https://github.com/avelino/awesome-go#networking

- [gws](#gws)
- [Feature](#feature)
- [Attention](#attention)
- [Install](#install)
- [Event](#event)
- [Quick Start](#quick-start)
- [Best Practice](#best-practice)
- [Usage](#usage)
- [Upgrade from HTTP](#upgrade-from-http)
- [Unix Domain Socket](#unix-domain-socket)
- [Client Proxy](#client-proxy)
- [Broadcast](#broadcast)
- [Autobahn Test](#autobahn-test)
- [Benchmark](#benchmark)
- [Communication](#communication)
- [Acknowledgments](#acknowledgments)
- [Event-Driven Go WebSocket Server \& Client](#event-driven-go-websocket-server--client)
- [Feature](#feature)
- [Attention](#attention)
- [Install](#install)
- [Event](#event)
- [Quick Start](#quick-start)
- [Best Practice](#best-practice)
- [Usage](#usage)
- [KCP](#kcp)
- [Proxy](#proxy)
- [Broadcast](#broadcast)
- [Autobahn Test](#autobahn-test)
- [Benchmark](#benchmark)
- [Communication](#communication)
- [Acknowledgments](#acknowledgments)

### Feature

Expand All @@ -52,12 +52,14 @@
- [x] Dial via Proxy
- [x] IO Multiplexing
- [x] Concurrent Write
- [x] Zero Allocs Read/Write (Compression Disabled)
- [x] Passes WebSocket [autobahn-testsuite](https://lxzan.github.io/gws/reports/servers/)

### Attention

- The errors returned by the gws.Conn export methods are ignored, and are handled internally
- Transferring large files with gws tends to block the connection
- The errors returned by the gws.Conn export methods are ignored, and are handled internally.
- Transferring large files with gws tends to block the connection.
- If HTTP Server is reused, it is recommended to enable goroutine, as blocking will prevent the context from being GC.

### Install

Expand Down Expand Up @@ -96,71 +98,57 @@ package main

import (
"github.com/lxzan/gws"
"net/http"
"time"
)

const PingInterval = 10 * time.Second
const (
PingInterval = 10 * time.Second
PingWait = 5 * time.Second
)

func main() {
options := &gws.ServerOption{ReadAsyncEnabled: true, ReadAsyncGoLimit: 4, CompressEnabled: true}
gws.NewServer(new(Handler), options).Run(":6666")
upgrader := gws.NewUpgrader(&Handler{}, &gws.ServerOption{
ReadAsyncEnabled: true,
CompressEnabled: true,
})
http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) {
socket, err := upgrader.Upgrade(writer, request)
if err != nil {
return
}
go func() {
// Blocking prevents the context from being GC.
socket.ReadLoop()
}()
})
http.ListenAndServe(":6666", nil)
}

type Handler struct{}

func (c *Handler) OnOpen(socket *gws.Conn) { _ = socket.SetDeadline(time.Now().Add(2 * PingInterval)) }
func (c *Handler) OnOpen(socket *gws.Conn) {
_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
}

func (c *Handler) OnClose(socket *gws.Conn, err error) {}

func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {
_ = socket.SetDeadline(time.Now().Add(2 * PingInterval))
_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
_ = socket.WritePong(nil)
}

func (c *Handler) OnPong(socket *gws.Conn, payload []byte) {}

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
defer message.Close()
socket.WriteMessage(message.Opcode, message.Bytes())
}
```

### Usage

#### Upgrade from HTTP

```go
package main

import (
"github.com/lxzan/gws"
"log"
"net/http"
)

func main() {
upgrader := gws.NewUpgrader(new(gws.BuiltinEventHandler), &gws.ServerOption{
Authorize: func(r *http.Request, session gws.SessionStorage) bool {
session.Store("username", r.URL.Query().Get("username"))
return true
},
})

http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) {
socket, err := upgrader.Upgrade(writer, request)
if err != nil {
log.Printf(err.Error())
return
}
socket.ReadLoop()
})

if err := http.ListenAndServe(":6666", nil); err != nil {
log.Fatalf("%v", err)
}
}
```

#### Unix Domain Socket
#### KCP

- server

Expand All @@ -169,20 +157,18 @@ package main

import (
"log"
"net"
"github.com/lxzan/gws"
kcp "github.com/xtaci/kcp-go"
)

func main() {
listener, err := net.Listen("unix", "/tmp/gws.sock")
listener, err := kcp.Listen(":6666")
if err != nil {
log.Println(err.Error())
return
}
var app = gws.NewServer(new(gws.BuiltinEventHandler), nil)
if err := app.RunListener(listener); err != nil {
log.Println(err.Error())
}
app := gws.NewServer(&gws.BuiltinEventHandler{}, nil)
app.RunListener(listener)
}
```

Expand All @@ -192,29 +178,27 @@ func main() {
package main

import (
"log"
"net"
"github.com/lxzan/gws"
kcp "github.com/xtaci/kcp-go"
"log"
)

func main() {
conn, err := net.Dial("unix", "/tmp/gws.sock")
conn, err := kcp.Dial("127.0.0.1:6666")
if err != nil {
log.Println(err.Error())
return
}

option := gws.ClientOption{}
socket, _, err := gws.NewClientFromConn(new(gws.BuiltinEventHandler), &option, conn)
app, _, err := gws.NewClientFromConn(&gws.BuiltinEventHandler{}, nil, conn)
if err != nil {
log.Println(err.Error())
return
}
socket.ReadLoop()
app.ReadLoop()
}
```

#### Client Proxy
#### Proxy

```go
package main
Expand Down Expand Up @@ -280,12 +264,10 @@ $ go test -benchmem -run=^$ -bench ^(BenchmarkConn_WriteMessage|BenchmarkConn_Re
goos: darwin
goarch: arm64
pkg: github.com/lxzan/gws
BenchmarkConn_WriteMessage/compress_disabled-8 4494459 239.2 ns/op 0 B/op 0 allocs/op
BenchmarkConn_WriteMessage/compress_enabled-8 107365 10726 ns/op 509 B/op 0 allocs/op
BenchmarkConn_ReadMessage/compress_disabled-8 3037701 395.6 ns/op 120 B/op 3 allocs/op
BenchmarkConn_ReadMessage/compress_enabled-8 175388 6355 ns/op 7803 B/op 7 allocs/op
PASS
ok github.com/lxzan/gws 5.813s
BenchmarkConn_WriteMessage/compress_disabled-8 8713082 138.0 ns/op 0 B/op 0 allocs/op
BenchmarkConn_WriteMessage/compress_enabled-8 144266 8066 ns/op 235 B/op 0 allocs/op
BenchmarkConn_ReadMessage/compress_disabled-8 11608689 102.8 ns/op 12 B/op 0 allocs/op
BenchmarkConn_ReadMessage/compress_enabled-8 435176 2498 ns/op 98 B/op 1 allocs/op
```

### Communication
Expand Down
49 changes: 45 additions & 4 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/binary"
klauspost "github.com/klauspost/compress/flate"
"github.com/lxzan/gws/internal"
"io"
"net"
"testing"
)
Expand Down Expand Up @@ -55,8 +56,11 @@ func BenchmarkConn_WriteMessage(b *testing.B) {
}

func BenchmarkConn_ReadMessage(b *testing.B) {
var handler = &webSocketMocker{}
handler.onMessage = func(socket *Conn, message *Message) { _ = message.Close() }

b.Run("compress disabled", func(b *testing.B) {
var upgrader = NewUpgrader(&BuiltinEventHandler{}, nil)
var upgrader = NewUpgrader(handler, nil)
var conn1 = &Conn{
isServer: false,
conn: &benchConn{},
Expand All @@ -73,14 +77,14 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
handler: upgrader.eventHandler,
}
for i := 0; i < b.N; i++ {
reader = bytes.NewBuffer(buf.Bytes())
internal.ResetBuffer(reader, buf.Bytes())
conn2.br.Reset(reader)
_ = conn2.readMessage()
}
})

b.Run("compress enabled", func(b *testing.B) {
var upgrader = NewUpgrader(&BuiltinEventHandler{}, &ServerOption{CompressEnabled: true})
var upgrader = NewUpgrader(handler, &ServerOption{CompressEnabled: true})
var config = upgrader.option.getConfig()
var conn1 = &Conn{
isServer: false,
Expand All @@ -104,7 +108,7 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
decompressor: config.decompressors.Select(),
}
for i := 0; i < b.N; i++ {
reader = bytes.NewBuffer(buf.Bytes())
internal.ResetBuffer(reader, buf.Bytes())
conn2.br.Reset(reader)
_ = conn2.readMessage()
}
Expand Down Expand Up @@ -135,6 +139,43 @@ func BenchmarkKlauspostCompress(b *testing.B) {
}
}

func BenchmarkStdDeCompress(b *testing.B) {
buffer := bytes.NewBuffer(make([]byte, 0, len(githubData)))
fw, _ := flate.NewWriter(buffer, flate.BestSpeed)
contents := githubData
fw.Write(contents)
fw.Flush()

p := make([]byte, 4096)
fr := flate.NewReader(nil)
src := bytes.NewBuffer(nil)
for i := 0; i < b.N; i++ {
internal.ResetBuffer(src, buffer.Bytes())
_, _ = src.Write(internal.FlateTail)
resetter := fr.(flate.Resetter)
_ = resetter.Reset(src, nil)
io.CopyBuffer(io.Discard, fr, p)
}
}

func BenchmarkKlauspostDeCompress(b *testing.B) {
buffer := bytes.NewBuffer(make([]byte, 0, len(githubData)))
fw, _ := klauspost.NewWriter(buffer, klauspost.BestSpeed)
contents := githubData
fw.Write(contents)
fw.Flush()

fr := klauspost.NewReader(nil)
src := bytes.NewBuffer(nil)
for i := 0; i < b.N; i++ {
internal.ResetBuffer(src, buffer.Bytes())
_, _ = src.Write(internal.FlateTail)
resetter := fr.(klauspost.Resetter)
_ = resetter.Reset(src, nil)
fr.(io.WriterTo).WriteTo(io.Discard)
}
}

func BenchmarkMask(b *testing.B) {
var s1 = internal.AlphabetNumeric.Generate(1280)
var s2 = s1
Expand Down
9 changes: 7 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ func (c *connector) handshake() (*Conn, *http.Response, error) {
if err != nil {
return nil, c.resp, err
}
var compressEnabled = c.option.CompressEnabled && strings.Contains(c.resp.Header.Get(internal.SecWebSocketExtensions.Key), "permessage-deflate")

var extensions = c.resp.Header.Get(internal.SecWebSocketExtensions.Key)
var compressEnabled = c.option.CompressEnabled && strings.Contains(extensions, internal.PermessageDeflate)
if compressEnabled && !strings.Contains(extensions, "server_no_context_takeover") {
return nil, c.resp, ErrCompressionNegotiation
}
return serveWebSocket(false, c.option.getConfig(), c.option.NewSessionStorage(), c.conn, br, c.eventHandler, compressEnabled, subprotocol), c.resp, nil
}

Expand All @@ -132,7 +137,7 @@ func (c *connector) getSubProtocol() (string, error) {
b := internal.Split(c.resp.Header.Get(internal.SecWebSocketProtocol.Key), ",")
subprotocol := internal.GetIntersectionElem(a, b)
if len(a) > 0 && subprotocol == "" {
return "", ErrHandshake
return "", ErrSubprotocolNegotiation
}
return subprotocol, nil
}
Expand Down
Loading

0 comments on commit 6c89b70

Please sign in to comment.