-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpreamptive_reader.go
150 lines (125 loc) · 4.35 KB
/
preamptive_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package dialogue
import (
"context"
"errors"
"io"
"sync/atomic"
)
// NewPreamptiveReader returns a PreamptiveReader that forwards reads to r and
// uses ctx to signal cancellation.
//
// The three internal channels are created unbuffered, and the listen goroutine
// that serves read requests is started before the reader is returned.
func NewPreamptiveReader(ctx context.Context, r io.Reader) *PreamptiveReader {
	reader := new(PreamptiveReader)
	reader.r = r
	reader.ctx = ctx
	reader.writeFromMem = make(chan []byte)
	reader.writeFromRead = make(chan []byte)
	reader.bytesRead = make(chan int)

	go reader.listen()

	return reader
}
// PreamptiveReader wraps a source reader and provides cancellable reads.
//
// Until the context is cancelled every Read maps 1:1 onto a read of the
// source. A Read issued after cancellation may block until the source read
// that was in flight at cancellation time has finished. Whatever that read
// left behind is then handed out over the following Read calls until it is
// exhausted, at which point a terminal error such as io.EOF is returned.
//
// IMPORTANT:
//
// Concurrent Read calls return an error.
//
// The read on the source reader is not cancelled itself; only the wrapping
// Read call returns early. The source read is not cleaned up until the buffer
// of the cancelled call has been consumed.
//
// Inspired by https://benjamincongdon.me/blog/2020/04/23/Cancelable-Reads-in-Go/
type PreamptiveReader struct {
	// r is the source reader.
	r io.Reader

	// ctx is the context used to signal cancellation.
	ctx context.Context

	// claimRead records whether a Read call currently owns the reader.
	//
	// It makes the process synchronous: one reader at a time, any other
	// caller gets an error.
	claimRead atomic.Bool

	// buf references the buffer supplied by the current call to Read.
	buf []byte

	// writeFromMem carries requests that are served from buf.
	writeFromMem chan []byte

	// writeFromRead carries requests that are served by the source reader.
	writeFromRead chan []byte

	// bytesRead announces that the requested read is over and how many
	// bytes it produced.
	bytesRead chan int

	// err is the sticky error.
	err error
}
// listen is the goroutine that serves read requests, either from the source
// reader (writeFromRead) or from the bytes left behind by an abandoned source
// read (writeFromMem). It runs until a terminal error occurs, at which point
// the error is stored in r.err and bytesRead is closed.
//
// Two guarantees are upheld here:
//
//   - Only the bytes the source actually produced (buf[:n]) are kept for
//     replay, never the unused tail of the caller's buffer.
//   - When the source returns n > 0 together with an error, the bytes are
//     handed over first and the error is reported on the following request,
//     as the io.Reader contract asks callers to process data before the error.
//
// The source is trusted to honour the io.Reader contract (0 <= n <= len(buf)).
func (r *PreamptiveReader) listen() {
	// pending is a source error that arrived together with data; it is held
	// back until that data has been delivered.
	var pending error

	for {
		var (
			n   int
			err error
		)

		select {
		case dst := <-r.writeFromMem: // a read request on a closed context, served from the remaining buffer; this is quick.
			n, err = r.readFromBuf(dst)
			if err != nil && pending != nil {
				// The buffer is drained: report what the source really
				// failed with instead of the generic end-of-buffer error.
				err = pending
			}
		case dst := <-r.writeFromRead: // commit to a new source read.
			if pending != nil {
				// The source already failed; do not read from it again.
				err = pending
			} else {
				n, err = r.r.Read(dst)
				r.buf = dst[:n] // keep only what was actually read.
				if n > 0 && err != nil {
					pending, err = err, nil
				}
			}
		}

		if err != nil {
			// r.err is written before the close so that whoever observes the
			// closed channel also observes the error.
			r.err = err
			close(r.bytesRead)
			return
		}

		select {
		case dst := <-r.writeFromMem: // the original caller gave up; a later Read asks for the stranded bytes.
			// An empty buffer is deliberately not an error here: the caller
			// gets 0 bytes now and the terminal error on its next request.
			n, _ = r.readFromBuf(dst)
			r.bytesRead <- n
		case r.bytesRead <- n: // 1:1 delivery to the caller that requested the read.
		}
	}
}
// readFromBuf moves as many bytes as fit from the stored r.buf into buf and
// drops the copied prefix from r.buf. It returns the number of bytes copied.
// When r.buf holds nothing it returns 0 and io.EOF.
func (r *PreamptiveReader) readFromBuf(buf []byte) (int, error) {
	stored := r.buf
	if len(stored) == 0 {
		return 0, io.EOF
	}

	copied := copy(buf, stored)
	r.buf = stored[copied:]
	return copied, nil
}
// Read requests a read from the preamptive reader. While the context is live
// it behaves like a normal read of the source, except that it returns early
// with n = 0 and the context error if the context is cancelled while the
// source read is still in flight.
//
// Once the context is cancelled, Read no longer touches the source: it is
// served from whatever the abandoned source read left behind, which may mean
// waiting for that read to finish. A zero-length buf returns immediately.
//
// Only one Read may run at a time; a concurrent call returns an error.
//
// The listen goroutine is not cleaned up until the stranded read returns.
func (r *PreamptiveReader) Read(buf []byte) (int, error) {
	if !r.claimRead.CompareAndSwap(false, true) {
		return 0, errors.New("cannot claim a read during another read")
	}
	defer r.claimRead.Store(false)

	// Check for cancellation before looking at r.err. After cancellation the
	// listen goroutine may still be finishing a stranded read and writing
	// r.err, so in that state r.err is only read after bytesRead has been
	// observed closed.
	select {
	case <-r.ctx.Done():
		if len(buf) == 0 {
			// FASTPATH: nothing can be copied, so never wait for an ongoing
			// read. Still report the sticky error if listen has terminated.
			// Receiving a stranded byte count here is harmless: the bytes
			// themselves stay in the replay buffer.
			select {
			case _, open := <-r.bytesRead:
				if !open {
					return 0, r.err
				}
			default:
			}
			return 0, nil
		}
		for {
			select {
			case r.writeFromMem <- buf:
				n, open := <-r.bytesRead
				if !open {
					return 0, r.err
				}
				return n, nil
			case _, open := <-r.bytesRead:
				// A plain blocking send on writeFromMem would hang forever if
				// listen exits while we wait, so closure is watched as well.
				if !open {
					return 0, r.err
				}
				// listen handed over the byte count of the stranded read
				// directly; the bytes are still buffered, so ask again.
			}
		}
	default:
	}

	// The context is live, so no read has ever been abandoned and any error
	// was already observed by an earlier Read; this access is ordered.
	if r.err != nil {
		return 0, r.err
	}

	// Context not cancelled and we own the reader: request a new source read.
	r.writeFromRead <- buf
	select {
	case <-r.ctx.Done():
		return 0, r.ctx.Err()
	case n, ok := <-r.bytesRead:
		r.buf = nil // the 1:1 hand-over went through, nothing is left to replay.
		if !ok {
			return 0, r.err
		}
		return n, nil
	}
}