diff --git a/value.go b/value.go
index e67507525..75bb8b4c7 100644
--- a/value.go
+++ b/value.go
@@ -45,6 +45,10 @@ import (
 	"golang.org/x/net/trace"
 )
 
+// maxVlogFileSize is the maximum size of the vlog file which can be created. Vlog Offset is of
+// uint32, so limiting at max uint32.
+var maxVlogFileSize uint32 = math.MaxUint32
+
 // Values have their first byte being byteData or byteDelete. This helps us distinguish between
 // a key that has never been seen and a key that has been explicitly deleted.
 const (
@@ -1364,11 +1368,52 @@ func (vlog *valueLog) woffset() uint32 {
 	return atomic.LoadUint32(&vlog.writableLogOffset)
 }
 
+// validateWrites will check whether the given requests can fit into 4GB vlog file.
+// NOTE: 4GB is the maximum size we can create for vlog because value pointer offset is of type
+// uint32. If we create more than 4GB, it will overflow uint32. So, limiting the size to 4GB.
+func (vlog *valueLog) validateWrites(reqs []*request) error {
+	vlogOffset := uint64(vlog.woffset())
+	for _, req := range reqs {
+		// calculate size of the request.
+		size := estimateRequestSize(req)
+		estimatedVlogOffset := vlogOffset + size
+		if estimatedVlogOffset > uint64(maxVlogFileSize) {
+			return errors.Errorf("Request size offset %d is bigger than maximum offset %d",
+				estimatedVlogOffset, maxVlogFileSize)
+		}
+
+		if estimatedVlogOffset >= uint64(vlog.opt.ValueLogFileSize) {
+			// We'll create a new vlog file if the estimated offset is greater or equal to
+			// max vlog size. So, resetting the vlogOffset.
+			vlogOffset = 0
+			continue
+		}
+		// Estimated vlog offset will become current vlog offset if the vlog is not rotated.
+		vlogOffset = estimatedVlogOffset
+	}
+	return nil
+}
+
+// estimateRequestSize returns the size that needs to be written for the given request.
+func estimateRequestSize(req *request) uint64 {
+	size := uint64(0)
+	for _, e := range req.Entries {
+		size += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
+	}
+	return size
+}
+
 // write is thread-unsafe by design and should not be called concurrently.
 func (vlog *valueLog) write(reqs []*request) error {
 	if vlog.db.opt.InMemory {
 		return nil
 	}
+	// Validate writes before writing to vlog. Because, we don't want to partially write and return
+	// an error.
+	if err := vlog.validateWrites(reqs); err != nil {
+		return err
+	}
+
 	vlog.filesLock.RLock()
 	maxFid := vlog.maxFid
 	curlf := vlog.filesMap[maxFid]
diff --git a/value_test.go b/value_test.go
index fd5b9b096..f08348c9c 100644
--- a/value_test.go
+++ b/value_test.go
@@ -21,6 +21,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
+	"math"
 	"math/rand"
 	"os"
 	"reflect"
@@ -1237,3 +1238,64 @@ func TestValueEntryChecksum(t *testing.T) {
 		require.NoError(t, db.Close())
 	})
 }
+
+func TestValidateWrite(t *testing.T) {
+	// Mocking the file size, so that we don't allocate big memory while running test.
+	maxVlogFileSize = 400
+	defer func() {
+		maxVlogFileSize = math.MaxUint32
+	}()
+
+	bigBuf := make([]byte, maxVlogFileSize+1)
+	log := &valueLog{
+		opt: DefaultOptions("."),
+	}
+
+	// Sending a request with big values which will overflow uint32.
+	key := []byte("HelloKey")
+	req := &request{
+		Entries: []*Entry{
+			{
+				Key:   key,
+				Value: bigBuf,
+			},
+			{
+				Key:   key,
+				Value: bigBuf,
+			},
+			{
+				Key:   key,
+				Value: bigBuf,
+			},
+		},
+	}
+
+	err := log.validateWrites([]*request{req})
+	require.Error(t, err)
+
+	// Testing with small values.
+	smallBuf := make([]byte, 4)
+	req1 := &request{
+		Entries: []*Entry{
+			{
+				Key:   key,
+				Value: smallBuf,
+			},
+			{
+				Key:   key,
+				Value: smallBuf,
+			},
+			{
+				Key:   key,
+				Value: smallBuf,
+			},
+		},
+	}
+
+	err = log.validateWrites([]*request{req1})
+	require.NoError(t, err)
+
+	// Batching small and big request.
+	err = log.validateWrites([]*request{req1, req})
+	require.Error(t, err)
+}