diff --git a/conn.go b/conn.go index d92dceb..17e87b2 100644 --- a/conn.go +++ b/conn.go @@ -541,9 +541,10 @@ func (c *Conn) writerDelayBufInner() (err error) { return } -// 延迟写消息 +// 延迟写消息, 对流量密集型的场景有用 // 1. 如果缓存的消息超过了多少条数 // 2. 如果缓存的消费超过了多久的时间 +// 3. TODO 最大缓存多少字节 func (c *Conn) WriteMessageDelay(op Opcode, writeBuf []byte) (err error) { if atomic.LoadInt32(&c.closed) == 1 { return ErrClosed diff --git a/conn_test.go b/conn_test.go index 5a349fb..bcc04ad 100644 --- a/conn_test.go +++ b/conn_test.go @@ -35,19 +35,21 @@ var ( type testMessageHandler struct { DefCallback - t *testing.T - need []byte - callbed int32 - server bool - count int - got chan []byte - done chan struct{} - output bool + t *testing.T + need []byte + callbed int32 + callbedChan chan bool + server bool + count int + got chan []byte + done chan struct{} + output bool } func (t *testMessageHandler) OnMessage(c *Conn, op opcode.Opcode, msg []byte) { need := append([]byte(nil), t.need...) atomic.StoreInt32(&t.callbed, 1) + t.callbedChan <- true if t.count == 0 { return } @@ -96,7 +98,7 @@ func (t *testMessageHandler) OnClose(c *Conn, err error) { func newServrEcho(t *testing.T, data []byte, output bool) *httptest.Server { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { c, err := Upgrade(w, r, - WithServerCallback(&testMessageHandler{t: t, need: data, server: true, count: -1, output: true}), + WithServerCallback(&testMessageHandler{t: t, need: data, server: true, count: -1, output: true, callbedChan: make(chan bool, 1)}), ) if err != nil { t.Error(err) @@ -115,6 +117,7 @@ func Test_ReadMessage(t *testing.T) { t.Run("ReadMessage10", func(t *testing.T) { ts := newServrEcho(t, testBinaryMessage10, true) client := &testMessageHandler{t: t, need: append([]byte(nil), testBinaryMessage10...), count: 1, done: make(chan struct{}), output: true} + client.callbedChan = make(chan bool, 1) c, err := Dial(ts.URL, WithClientCallback(client)) if err != nil { t.Error(err) @@ -130,7 +133,10 @@ func Test_ReadMessage(t *testing.T) { return } // <-client.done - time.Sleep(time.Second / 3) + select { + case <-client.callbedChan: + case <-time.After(time.Second / 3): + } if atomic.LoadInt32(&client.callbed) != 1 { t.Error("not callbed") } @@ -139,6 +145,7 @@ func Test_ReadMessage(t *testing.T) { t.Run("ReadMessage64K", func(t *testing.T) { ts := newServrEcho(t, testBinaryMessage64kb, false) client := &testMessageHandler{t: t, need: append([]byte(nil), testBinaryMessage64kb...), count: 1} + client.callbedChan = make(chan bool, 1) c, err := Dial(ts.URL, WithClientCallback(client)) if err != nil { t.Error(err) @@ -148,7 +155,10 @@ func Test_ReadMessage(t *testing.T) { tmp := append([]byte(nil), testBinaryMessage64kb...) err = c.WriteMessage(Binary, tmp) - time.Sleep(time.Second / 3) + select { + case <-client.callbedChan: + case <-time.After(time.Second / 3): + } if err != nil { t.Error(err) return @@ -162,6 +172,7 @@ func Test_ReadMessage(t *testing.T) { t.Run("ReadMessage64K_Text", func(t *testing.T) { ts := newServrEcho(t, testTextMessage64kb, false) client := &testMessageHandler{t: t, need: append([]byte(nil), testTextMessage64kb...), count: 1} + client.callbedChan = make(chan bool, 1) c, err := Dial(ts.URL, WithClientCallback(client)) if err != nil { t.Error(err) @@ -176,8 +187,10 @@ func Test_ReadMessage(t *testing.T) { t.Error(err) return } - time.Sleep(time.Second / 3) - + select { + case <-client.callbedChan: + case <-time.After(time.Second / 3): + } if atomic.LoadInt32(&client.callbed) != 1 { t.Errorf("not callbed:%d\n", client.callbed) }