-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathdirectio.go
213 lines (173 loc) · 4.03 KB
/
directio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package directio
import (
"errors"
"io"
"os"
"unsafe"
)
const (
// blockSize is the alignment, in bytes, that this package applies to both
// buffer start addresses and the lengths of direct writes.
// O_DIRECT alignment is 512B. It must stay a power of two because the
// code masks with blockSize-1 and -blockSize.
blockSize = 512
// defaultBufSize is the buffer size used by New and the minimum size
// NewSize will allocate: 8KB (2 pages, assuming 4KB pages).
defaultBufSize = 8192
)
// Compile-time assertion that *DirectIO implements io.WriteCloser.
var _ io.WriteCloser = (*DirectIO)(nil)
// align reports how many bytes the first element of b lies past the previous
// blockSize boundary. A result of 0 means b starts on a boundary.
//
// An empty (or nil) slice has no first element to inspect; it is reported as
// aligned (0) rather than panicking on &b[0].
func align(b []byte) int {
	if len(b) == 0 {
		return 0
	}
	// blockSize is a power of two, so masking with blockSize-1 yields the
	// address modulo blockSize.
	return int(uintptr(unsafe.Pointer(&b[0])) & uintptr(blockSize-1))
}
// allocAlignedBuf allocates a buffer of n bytes whose first byte is aligned
// to blockSize.
//
// It returns an error when n is zero or negative, or when the alignment of
// the resulting slice cannot be confirmed.
func allocAlignedBuf(n int) ([]byte, error) {
	if n == 0 {
		return nil, errors.New("size is `0` can't allocate buffer")
	}
	if n < 0 {
		// Without this guard a negative size panics in make or in the
		// slice expression below instead of reporting an error.
		return nil, errors.New("size is negative can't allocate buffer")
	}

	// Over-allocate by one block so that an aligned window of n bytes is
	// guaranteed to fit somewhere inside the allocation.
	raw := make([]byte, n+blockSize)

	// Skip forward to the next blockSize boundary, if not already on one.
	offset := 0
	if rem := align(raw); rem != 0 {
		offset = blockSize - rem
	}
	buf := raw[offset : offset+n]

	// Verify the window really is aligned before handing it out.
	if align(buf) != 0 {
		return nil, errors.New("can't allocate aligned buffer")
	}
	return buf, nil
}
// DirectIO bypasses page cache.
// It is a buffered writer on top of an *os.File: data is staged in buf and
// written out when the buffer fills, or written straight from the caller's
// slice when that slice is suitably aligned.
type DirectIO struct {
// f is the underlying file that receives all writes and is closed by Close.
f *os.File
// buf is the staging buffer allocated by NewSize via allocAlignedBuf.
buf []byte
// n is the number of bytes currently held in buf.
n int
// err records the error of a failed direct write performed by Write.
// Once set, Write and flush return it and Close skips the final flush.
err error
}
// NewSize creates a DirectIO writer on top of f.
//
// The requested size is rounded down to a multiple of blockSize; if the
// result is smaller than defaultBufSize, defaultBufSize is used instead.
// An error is returned when checkDirectIO rejects the file descriptor or
// when the aligned buffer cannot be allocated.
func NewSize(f *os.File, size int) (*DirectIO, error) {
	err := checkDirectIO(f.Fd())
	if err != nil {
		return nil, err
	}

	// Clearing the low bits rounds size down to a multiple of blockSize
	// (a no-op when it already is one).
	size &^= blockSize - 1
	if size < defaultBufSize {
		size = defaultBufSize
	}

	aligned, err := allocAlignedBuf(size)
	if err != nil {
		return nil, err
	}

	w := &DirectIO{f: f, buf: aligned}
	return w, nil
}
// New returns a new DirectIO writer with default buffer size.
// It is equivalent to NewSize(f, defaultBufSize) and returns whatever
// error NewSize reports.
func New(f *os.File) (*DirectIO, error) {
return NewSize(f, defaultBufSize)
}
// flush pushes the bytes currently held in the buffer to the underlying
// os.File.
//
// It returns the recorded d.err without writing if one is set, and nil if
// the buffer is empty. A write that stores fewer bytes than requested
// without reporting an error yields io.ErrShortWrite. On a partial write the
// unwritten remainder is moved to the front of the buffer so it stays
// buffered.
func (d *DirectIO) flush() error {
	switch {
	case d.err != nil:
		return d.err
	case d.n == 0:
		return nil
	}

	pending := d.buf[:d.n]
	written, err := d.f.Write(pending)
	if err == nil && written < d.n {
		err = io.ErrShortWrite
	}

	// Keep whatever was not written at the start of the buffer.
	if err != nil && written > 0 && written < d.n {
		copy(d.buf, pending[written:])
	}

	d.n -= written
	return err
}
// Flush writes buffered data to the underlying file.
//
// The buffered tail is generally not a multiple of blockSize, so direct I/O
// is switched off for the duration of the write and switched back on
// afterwards (setDirectIO is defined elsewhere in the package; presumably it
// toggles O_DIRECT on the descriptor).
//
// Direct I/O is re-enabled even when the write fails, so a failed Flush does
// not leave the file in buffered mode. If both the write and the re-enable
// fail, the write error is returned. When a sticky error is recorded or the
// buffer is empty, Flush returns without touching the descriptor mode.
func (d *DirectIO) Flush() error {
	if d.err != nil {
		return d.err
	}
	if d.n == 0 {
		// Nothing to write, so there is no reason to toggle the mode.
		return nil
	}

	fd := d.f.Fd()

	// Disable direct IO so the unaligned tail can be written.
	if err := setDirectIO(fd, false); err != nil {
		return err
	}

	flushErr := d.flush()

	// Always enable direct IO back, regardless of the write outcome.
	restoreErr := setDirectIO(fd, true)

	if flushErr != nil {
		return flushErr
	}
	return restoreErr
}
// Available reports the number of bytes that can still be stored in the
// buffer before it is full.
func (d *DirectIO) Available() int {
	capacity := len(d.buf)
	return capacity - d.n
}
// Buffered reports how many bytes are currently held in the buffer and have
// not yet been written to the file.
func (d *DirectIO) Buffered() int {
	return d.n
}
// Write writes the contents of p into the buffer, writing to the underlying
// file whenever the buffer fills up.
// It returns the number of bytes of p that were consumed (written to the
// file or stored in the buffer).
// If nn < len(p), it also returns an error explaining
// why the write is short.
//
// When the buffer is empty and p starts on a blockSize boundary, whole
// blocks are written straight from p without copying; any remaining tail
// shorter than a block is kept in the buffer.
func (d *DirectIO) Write(p []byte) (nn int, err error) {
	// Loop while p would fill (or overflow) the free space in the buffer.
	for len(p) >= d.Available() && d.err == nil {
		var n int
		if d.Buffered() == 0 && align(p) == 0 {
			// Large write, empty buffer, aligned source: avoid the copy.
			if len(p)%blockSize == 0 {
				// Length is a whole number of blocks: write all of p.
				n, d.err = d.f.Write(p)
			} else {
				// Write the whole-block prefix directly from p.
				whole := len(p) & -blockSize
				n, d.err = d.f.Write(p[:whole])
				// Buffer the tail only if the prefix was written in full;
				// otherwise the stream would have a gap and the returned
				// count would include bytes that do not follow the written
				// data.
				if d.err == nil && n == whole {
					tail := copy(d.buf[d.n:], p[whole:])
					d.n += tail
					n += tail
				}
			}
		} else {
			// Unaligned source or partially filled buffer: top the buffer
			// up and write it out as one full, aligned block run.
			n = copy(d.buf[d.n:], p)
			d.n += n
			if err = d.flush(); err != nil {
				// The n bytes were taken from p and are either on disk or
				// still buffered, so they count as consumed.
				return nn + n, err
			}
		}
		nn += n
		p = p[n:]
	}
	if d.err != nil {
		return nn, d.err
	}
	// What is left fits in the buffer with room to spare; just stage it.
	n := copy(d.buf[d.n:], p)
	d.n += n
	nn += n
	return nn, nil
}
// Close flushes any buffered data and closes the underlying file.
//
// The final Flush is skipped when an earlier direct write already failed
// (d.err is set). The file is closed even if Flush fails, so the descriptor
// is never leaked; in that case the Flush error is returned, otherwise the
// result of closing the file.
func (d *DirectIO) Close() error {
	var flushErr error
	if d.err == nil {
		flushErr = d.Flush()
	}

	closeErr := d.f.Close()

	if flushErr != nil {
		return flushErr
	}
	return closeErr
}