Skip to content

Commit

Permalink
更新,修复测试代码概率性失败的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
guonaihong committed Aug 13, 2023
1 parent 4f1965c commit c3f3dec
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
3 changes: 2 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,9 +541,10 @@ func (c *Conn) writerDelayBufInner() (err error) {
return
}

// 延迟写消息
// 延迟写消息, 对流量密集型的场景有用
// 1. 如果缓存的消息超过了多少条数
// 2. 如果缓存的消费超过了多久的时间
// 3. TODO 最大缓存多少字节
func (c *Conn) WriteMessageDelay(op Opcode, writeBuf []byte) (err error) {
if atomic.LoadInt32(&c.closed) == 1 {
return ErrClosed
Expand Down
39 changes: 26 additions & 13 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,21 @@ var (

type testMessageHandler struct {
DefCallback
t *testing.T
need []byte
callbed int32
server bool
count int
got chan []byte
done chan struct{}
output bool
t *testing.T
need []byte
callbed int32
callbedChan chan bool
server bool
count int
got chan []byte
done chan struct{}
output bool
}

func (t *testMessageHandler) OnMessage(c *Conn, op opcode.Opcode, msg []byte) {
need := append([]byte(nil), t.need...)
atomic.StoreInt32(&t.callbed, 1)
t.callbedChan <- true
if t.count == 0 {
return
}
Expand Down Expand Up @@ -96,7 +98,7 @@ func (t *testMessageHandler) OnClose(c *Conn, err error) {
func newServrEcho(t *testing.T, data []byte, output bool) *httptest.Server {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := Upgrade(w, r,
WithServerCallback(&testMessageHandler{t: t, need: data, server: true, count: -1, output: true}),
WithServerCallback(&testMessageHandler{t: t, need: data, server: true, count: -1, output: true, callbedChan: make(chan bool, 1)}),
)
if err != nil {
t.Error(err)
Expand All @@ -115,6 +117,7 @@ func Test_ReadMessage(t *testing.T) {
t.Run("ReadMessage10", func(t *testing.T) {
ts := newServrEcho(t, testBinaryMessage10, true)
client := &testMessageHandler{t: t, need: append([]byte(nil), testBinaryMessage10...), count: 1, done: make(chan struct{}), output: true}
client.callbedChan = make(chan bool, 1)
c, err := Dial(ts.URL, WithClientCallback(client))
if err != nil {
t.Error(err)
Expand All @@ -130,7 +133,10 @@ func Test_ReadMessage(t *testing.T) {
return
}
// <-client.done
time.Sleep(time.Second / 3)
select {
case <-client.callbedChan:
case <-time.After(time.Second / 3):
}
if atomic.LoadInt32(&client.callbed) != 1 {
t.Error("not callbed")
}
Expand All @@ -139,6 +145,7 @@ func Test_ReadMessage(t *testing.T) {
t.Run("ReadMessage64K", func(t *testing.T) {
ts := newServrEcho(t, testBinaryMessage64kb, false)
client := &testMessageHandler{t: t, need: append([]byte(nil), testBinaryMessage64kb...), count: 1}
client.callbedChan = make(chan bool, 1)
c, err := Dial(ts.URL, WithClientCallback(client))
if err != nil {
t.Error(err)
Expand All @@ -148,7 +155,10 @@ func Test_ReadMessage(t *testing.T) {

tmp := append([]byte(nil), testBinaryMessage64kb...)
err = c.WriteMessage(Binary, tmp)
time.Sleep(time.Second / 3)
select {
case <-client.callbedChan:
case <-time.After(time.Second / 3):
}
if err != nil {
t.Error(err)
return
Expand All @@ -162,6 +172,7 @@ func Test_ReadMessage(t *testing.T) {
t.Run("ReadMessage64K_Text", func(t *testing.T) {
ts := newServrEcho(t, testTextMessage64kb, false)
client := &testMessageHandler{t: t, need: append([]byte(nil), testTextMessage64kb...), count: 1}
client.callbedChan = make(chan bool, 1)
c, err := Dial(ts.URL, WithClientCallback(client))
if err != nil {
t.Error(err)
Expand All @@ -176,8 +187,10 @@ func Test_ReadMessage(t *testing.T) {
t.Error(err)
return
}
time.Sleep(time.Second / 3)

select {
case <-client.callbedChan:
case <-time.After(time.Second / 3):
}
if atomic.LoadInt32(&client.callbed) != 1 {
t.Errorf("not callbed:%d\n", client.callbed)
}
Expand Down

0 comments on commit c3f3dec

Please sign in to comment.