Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wal: use page buffered writer for writing records #6310

Merged
merged 2 commits into from
Aug 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions pkg/ioutil/pagewriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ioutil

import (
"io"
)

var defaultBufferBytes = 128 * 1024

// PageWriter implements the io.Writer interface so that writes will
// either be in page chunks or from flushing.
type PageWriter struct {
w io.Writer
// pageOffset tracks the page offset of the base of the buffer
pageOffset int
// pageBytes is the number of bytes per page
pageBytes int
// bufferedBytes counts the number of bytes pending for write in the buffer
bufferedBytes int
// buf holds the write buffer
buf []byte
// bufWatermarkBytes is the number of bytes the buffer can hold before it needs
// to be flushed. It is less than len(buf) so there is space for slack writes
// to bring the writer to page alignment.
bufWatermarkBytes int
}

func NewPageWriter(w io.Writer, pageBytes int) *PageWriter {
return &PageWriter{
w: w,
pageBytes: pageBytes,
buf: make([]byte, defaultBufferBytes+pageBytes),
bufWatermarkBytes: defaultBufferBytes,
}
}

func (pw *PageWriter) Write(p []byte) (n int, err error) {
if len(p)+pw.bufferedBytes <= pw.bufWatermarkBytes {
// no overflow
copy(pw.buf[pw.bufferedBytes:], p)
pw.bufferedBytes += len(p)
return len(p), nil
}
// complete the slack page in the buffer if unaligned
slack := pw.pageBytes - ((pw.pageOffset + pw.bufferedBytes) % pw.pageBytes)
if slack != pw.pageBytes {
partial := slack > len(p)
if partial {
// not enough data to complete the slack page
slack = len(p)
}
// special case: writing to slack page in buffer
copy(pw.buf[pw.bufferedBytes:], p[:slack])
pw.bufferedBytes += slack
n = slack
p = p[slack:]
if partial {
// avoid forcing an unaligned flush
return n, nil
}
}
// buffer contents are now page-aligned; clear out
if err = pw.Flush(); err != nil {
return n, err
}
// directly write all complete pages without copying
if len(p) > pw.pageBytes {
pages := len(p) / pw.pageBytes
c, werr := pw.w.Write(p[:pages*pw.pageBytes])
n += c
if werr != nil {
return n, werr
}
p = p[pages*pw.pageBytes:]
}
// write remaining tail to buffer
c, werr := pw.Write(p)
n += c
return n, werr
}

func (pw *PageWriter) Flush() error {
if pw.bufferedBytes == 0 {
return nil
}
_, err := pw.w.Write(pw.buf[:pw.bufferedBytes])
pw.pageOffset = (pw.pageOffset + pw.bufferedBytes) % pw.pageBytes
pw.bufferedBytes = 0
return err
}
100 changes: 100 additions & 0 deletions pkg/ioutil/pagewriter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ioutil

import (
"math/rand"
"testing"
)

func TestPageWriterRandom(t *testing.T) {
// smaller buffer for stress testing
defaultBufferBytes = 8 * 1024
pageBytes := 128
buf := make([]byte, 4*defaultBufferBytes)
cw := &checkPageWriter{pageBytes: pageBytes, t: t}
w := NewPageWriter(cw, pageBytes)
n := 0
for i := 0; i < 4096; i++ {
c, err := w.Write(buf[:rand.Intn(len(buf))])
if err != nil {
t.Fatal(err)
}
n += c
}
if cw.writeBytes > n {
t.Fatalf("wrote %d bytes to io.Writer, but only wrote %d bytes", cw.writeBytes, n)
}
if cw.writeBytes-n > pageBytes {
t.Fatalf("got %d bytes pending, expected less than %d bytes", cw.writeBytes-n, pageBytes)
}
t.Logf("total writes: %d", cw.writes)
t.Logf("total write bytes: %d (of %d)", cw.writeBytes, n)
}

// TestPageWriterPariallack tests the case where a write overflows the buffer
// but there is not enough data to complete the slack write.
func TestPageWriterPartialSlack(t *testing.T) {
defaultBufferBytes = 1024
pageBytes := 128
buf := make([]byte, defaultBufferBytes)
cw := &checkPageWriter{pageBytes: 64, t: t}
w := NewPageWriter(cw, pageBytes)
// put writer in non-zero page offset
if _, err := w.Write(buf[:64]); err != nil {
t.Fatal(err)
}
if err := w.Flush(); err != nil {
t.Fatal(err)
}
if cw.writes != 1 {
t.Fatalf("got %d writes, expected 1", cw.writes)
}
// nearly fill buffer
if _, err := w.Write(buf[:1022]); err != nil {
t.Fatal(err)
}
// overflow buffer, but without enough to write as aligned
if _, err := w.Write(buf[:8]); err != nil {
t.Fatal(err)
}
if cw.writes != 1 {
t.Fatalf("got %d writes, expected 1", cw.writes)
}
// finish writing slack space
if _, err := w.Write(buf[:128]); err != nil {
t.Fatal(err)
}
if cw.writes != 2 {
t.Fatalf("got %d writes, expected 2", cw.writes)
}
}

// checkPageWriter implements an io.Writer that fails a test on unaligned writes.
type checkPageWriter struct {
pageBytes int
writes int
writeBytes int
t *testing.T
}

func (cw *checkPageWriter) Write(p []byte) (int, error) {
if len(p)%cw.pageBytes != 0 {
cw.t.Fatalf("got write len(p) = %d, expected len(p) == k*cw.pageBytes", len(p))
}
cw.writes++
cw.writeBytes += len(p)
return len(p), nil
}
11 changes: 8 additions & 3 deletions wal/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@
package wal

import (
"bufio"
"encoding/binary"
"hash"
"io"
"sync"

"github.com/coreos/etcd/pkg/crc"
"github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/wal/walpb"
)

// walPageBytes is the alignment for flushing records to the backing Writer.
// It should be a multiple of the minimum sector size so that WAL repair can
// safely between torn writes and ordinary data corruption.
const walPageBytes = 8 * minSectorSize

type encoder struct {
mu sync.Mutex
bw *bufio.Writer
bw *ioutil.PageWriter

crc hash.Hash32
buf []byte
Expand All @@ -36,7 +41,7 @@ type encoder struct {

func newEncoder(w io.Writer, prevCrc uint32) *encoder {
return &encoder{
bw: bufio.NewWriter(w),
bw: ioutil.NewPageWriter(w, walPageBytes),
crc: crc.New(prevCrc, crcTable),
// 1MB buffer
buf: make([]byte, 1024*1024),
Expand Down