From 381a32f80a861a2e28812441563546202f8bebf4 Mon Sep 17 00:00:00 2001 From: guonaihong Date: Sun, 20 Aug 2023 12:42:12 +0800 Subject: [PATCH] =?UTF-8?q?+=E6=B5=8B=E8=AF=95=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common_options_test.go | 61 +++++++++++++++++++++++++++++++++++++----- conn.go | 3 +-- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/common_options_test.go b/common_options_test.go index 8b9cb0a..cade761 100644 --- a/common_options_test.go +++ b/common_options_test.go @@ -1268,7 +1268,7 @@ func Test_CommonOption(t *testing.T) { } }) - t.Run("13-15.server.local.1: WithServerBufioMultipleTimesPayloadSize", func(t *testing.T) { + t.Run("13-15.server.local.1: WriteMessageDelay", func(t *testing.T) { run := int32(0) data := make(chan string, 1) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -1313,7 +1313,7 @@ func Test_CommonOption(t *testing.T) { } }) - t.Run("13-15.server.global.2: WithServerBufioMultipleTimesPayloadSize", func(t *testing.T) { + t.Run("13-15.server.global.2: WriteMessageDelay", func(t *testing.T) { run := int32(0) data := make(chan string, 1) upgrade := NewUpgrade(WithServerMaxDelayWriteDuration(time.Millisecond*20), WithServerMaxDelayWriteNum(3), WithServerDelayWriteInitBufferSize(4096), WithServerOnMessageFunc(func(c *Conn, op Opcode, payload []byte) { @@ -1356,7 +1356,7 @@ func Test_CommonOption(t *testing.T) { } }) - t.Run("13-15.client: WithServerBufioMultipleTimesPayloadSize", func(t *testing.T) { + t.Run("13-15.client: WriteMessageDelay", func(t *testing.T) { run := int32(0) data := make(chan string, 1) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -1407,7 +1407,7 @@ func Test_CommonOption(t *testing.T) { } }) - t.Run("13-15.client: WithServerBufioMultipleTimesPayloadSize-Compress", func(t *testing.T) { + t.Run("13-15.client: WriteMessageDelay-Compress", func(t *testing.T) { run := int32(0) data := make(chan string, 1) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -1491,7 +1491,7 @@ func Test_CommonOption(t *testing.T) { } }) - t.Run("13-15.client: WithServerBufioMultipleTimesPayloadSize-Compress-checkUTF8", func(t *testing.T) { + t.Run("13-15.client: WriteMessageDelay-Compress-checkUTF8", func(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { c, err := Upgrade(w, r, WithServerBufioParseMode(), @@ -1522,7 +1522,56 @@ func Test_CommonOption(t *testing.T) { con.StartReadLoop() }) - t.Run("13-15.client: WithServerBufioMultipleTimesPayloadSize-Compress-Close", func(t *testing.T) { + t.Run("13-15.client: WriteMessageDelay-timeout-send", func(t *testing.T) { + run := int32(0) + data := make(chan string, 1) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := Upgrade(w, r, + WithServerDecompressAndCompress(), + WithServerBufioParseMode(), + WithServerOnMessageFunc(func(c *Conn, op Opcode, payload []byte) { + c.WriteMessage(op, payload) + atomic.AddInt32(&run, int32(1)) + data <- string(payload) + })) + if err != nil { + t.Error(err) + } + c.StartReadLoop() + })) + + defer ts.Close() + + url := strings.ReplaceAll(ts.URL, "http", "ws") + con, err := Dial(url, WithClientDecompressAndCompress(), + WithClientDecompression(), + WithClientMaxDelayWriteDuration(10*time.Millisecond), + WithClientMaxDelayWriteNum(3), + WithClientWindowsParseMode(), + WithClientDelayWriteInitBufferSize(4096), + WithClientOnMessageFunc(func(c *Conn, op Opcode, payload []byte) { + // c.WriteMessageDelay(op, []byte("hello")) + })) + if err != nil { + t.Error(err) + } + defer con.Close() + + con.WriteMessageDelay(Text, []byte("hello")) + con.WriteMessageDelay(Text, []byte("hello")) + con.StartReadLoop() + select { + case d := <-data: + if d != "hello" { + t.Errorf("write message or read message fail:got:%s, need:hello\n", d) + } + case <-time.After(1000 * time.Millisecond): + } + if atomic.LoadInt32(&run) != 1 { + t.Error("not run server:method fail") + } + }) + t.Run("13-15.client: WriteMessageDelay-Compress-Close", func(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { c, err := Upgrade(w, r, WithServerBufioParseMode(), diff --git a/conn.go b/conn.go index b27a80f..c691219 100644 --- a/conn.go +++ b/conn.go @@ -49,7 +49,7 @@ type delayWrite struct { delayMu sync.Mutex // 延迟写的锁 delayBuf *bytes.Buffer // 延迟写的缓冲区 delayTimeout *time.Timer // 延迟写的定时器 - delayErr error + delayErr error // TODO 原子操作 } type Conn struct { @@ -526,7 +526,6 @@ func (c *Conn) writerDelayBufSafe() { c.delayMu.Lock() c.delayErr = c.writerDelayBufInner() c.delayMu.Unlock() - return } func (c *Conn) writerDelayBufInner() (err error) {