-
Notifications
You must be signed in to change notification settings - Fork 3
/
bursts.go
74 lines (68 loc) · 1.42 KB
/
bursts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package main
import (
"net"
"time"
)
// ReceiveBursts reads "packets" (as separated by time of
// no data transfer) and sends them to a channel.
func ReceiveBursts(conn *net.TCPConn, maxTime time.Duration) <-chan []byte {
// Use a larger buffer to attempt to prevent backpressure from
// hurting our ability to measure time.
// It can still happen though if there is a lot of data being
// received.
res := make(chan []byte, 64)
go func() {
defer close(res)
rawData := readConnAsChan(conn)
for {
buffer, ok := <-rawData
if !ok {
break
}
ReadMoreLoop:
for {
select {
case data, ok := <-rawData:
if ok {
buffer = append(buffer, data...)
} else {
break ReadMoreLoop
}
case <-time.After(maxTime):
break ReadMoreLoop
}
}
res <- buffer
}
}()
return res
}
func readConnAsChan(conn *net.TCPConn) <-chan []byte {
rawReads := make(chan []byte, 64)
// Goroutine to read the raw data.
go func() {
defer close(rawReads)
for {
data := make([]byte, 0x10000)
n, err := conn.Read(data)
if err != nil {
return
}
rawReads <- data[:n]
}
}()
return rawReads
}
// WriteOrClose writes a buffer to a socket, and closes the
// socket if the write fails.
func WriteOrClose(conn *net.TCPConn, data []byte) error {
for len(data) > 0 {
n, err := conn.Write(data)
data = data[n:]
if err != nil {
conn.Close()
return err
}
}
return nil
}