Skip to content

Commit

Permalink
Merge pull request #237 from jbenet/dont-write-twice
Browse files Browse the repository at this point in the history
blockservice: dont write blocks twice
  • Loading branch information
jbenet committed Oct 30, 2014
2 parents f0d823c + 483ccf9 commit 461e5a3
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 70 deletions.
17 changes: 15 additions & 2 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,26 @@ func NewBlockService(d ds.Datastore, rem exchange.Interface) (*BlockService, err
// AddBlock adds a particular block to the service, Putting it into the datastore.
func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
k := b.Key()
log.Debug("blockservice: storing [%s] in datastore", k)
// TODO(brian): define a block datastore with a Put method which accepts a
// block parameter
err := s.Datastore.Put(k.DsKey(), b.Data)

// check if we have it before adding. this is an extra read, but large writes
// are more expensive.
// TODO(jbenet) cheaper has. https://github.com/jbenet/go-datastore/issues/6
has, err := s.Datastore.Has(k.DsKey())
if err != nil {
return k, err
}
if has {
log.Debugf("blockservice: storing [%s] in datastore (already stored)", k)
} else {
log.Debugf("blockservice: storing [%s] in datastore", k)
err := s.Datastore.Put(k.DsKey(), b.Data)
if err != nil {
return k, err
}
}

if s.Remote != nil {
ctx := context.TODO()
err = s.Remote.HasBlock(ctx, *b)
Expand Down
53 changes: 53 additions & 0 deletions importer/chunk/splitting_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package chunk

import (
"bytes"
"crypto/rand"
"testing"
)

// randBuf returns a freshly allocated slice of size bytes filled from
// crypto/rand. If the random source reports an error the calling test is
// aborted via t.Fatalf.
func randBuf(t *testing.T, size int) []byte {
	buf := make([]byte, size)
	if _, err := rand.Read(buf); err != nil {
		// Include the underlying error so a failure is diagnosable.
		t.Fatalf("failed to read enough randomness: %v", err)
	}
	return buf
}

// copyBuf returns a new, non-nil slice with the same length and contents as
// buf. The result has its own backing array, so writes to it do not affect buf.
func copyBuf(buf []byte) []byte {
	dup := make([]byte, 0, len(buf))
	return append(dup, buf...)
}

// TestSizeSplitterIsDeterministic feeds two independent copies of the same
// random buffer through DefaultSplitter and checks that both runs produce
// the same sequence of chunks and finish at the same point.
func TestSizeSplitterIsDeterministic(t *testing.T) {

	test := func() {
		bufR := randBuf(t, 10000000) // crank this up to satisfy yourself.
		bufA := copyBuf(bufR)
		bufB := copyBuf(bufR)

		chunksA := DefaultSplitter.Split(bytes.NewReader(bufA))
		chunksB := DefaultSplitter.Split(bytes.NewReader(bufB))

		for n := 0; ; n++ {
			a, moreA := <-chunksA
			b, moreB := <-chunksB

			if !moreA {
				if moreB {
					t.Fatal("A ended, B didnt.")
				}
				// Both streams ended on the same iteration.
				return
			}

			// Report a premature end of B explicitly instead of relying on
			// the chunk comparison against a nil slice below.
			if !moreB {
				t.Fatal("B ended, A didnt.")
			}

			if !bytes.Equal(a, b) {
				t.Fatalf("chunk %d not equal", n)
			}
		}
	}

	for run := 0; run < 1; run++ { // crank this up to satisfy yourself.
		test()
	}
}
2 changes: 1 addition & 1 deletion pin/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// randNode returns a new mdag.Node whose Data is a 32-byte slice filled from
// a util random reader, together with the key reported by the node.
func randNode() (*mdag.Node, util.Key) {
nd := new(mdag.Node)
nd.Data = make([]byte, 32)
util.NewFastRand().Read(nd.Data)
util.NewTimeSeededRand().Read(nd.Data)
// The error returned by Key is deliberately discarded here.
k, _ := nd.Key()
return nd, k
}
Expand Down
8 changes: 4 additions & 4 deletions unixfs/io/dagmodifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func getMockDagServ(t *testing.T) mdag.DAGService {
func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
dw := NewDagWriter(dserv, &chunk.SizeSplitter{500})

n, err := io.CopyN(dw, u.NewFastRand(), size)
n, err := io.CopyN(dw, u.NewTimeSeededRand(), size)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -58,7 +58,7 @@ func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Nod

func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte {
newdata := make([]byte, size)
r := u.NewFastRand()
r := u.NewTimeSeededRand()
r.Read(newdata)

if size+beg > uint64(len(orig)) {
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestMultiWrite(t *testing.T) {
}

data := make([]byte, 4000)
u.NewFastRand().Read(data)
u.NewTimeSeededRand().Read(data)

for i := 0; i < len(data); i++ {
n, err := dagmod.WriteAt(data[i:i+1], uint64(i))
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestMultiWriteCoal(t *testing.T) {
}

data := make([]byte, 4000)
u.NewFastRand().Read(data)
u.NewTimeSeededRand().Read(data)

for i := 0; i < len(data); i++ {
n, err := dagmod.WriteAt(data[:i+1], 0)
Expand Down
73 changes: 27 additions & 46 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,65 +60,46 @@ func NewByteChanReader(in chan []byte) io.Reader {
return &byteChanReader{in: in}
}

func (bcr *byteChanReader) Read(b []byte) (int, error) {
if len(bcr.buf) == 0 {
data, ok := <-bcr.in
if !ok {
return 0, io.EOF
}
bcr.buf = data
}

if len(bcr.buf) >= len(b) {
copy(b, bcr.buf)
bcr.buf = bcr.buf[len(b):]
return len(b), nil
}

copy(b, bcr.buf)
b = b[len(bcr.buf):]
totread := len(bcr.buf)
func (bcr *byteChanReader) Read(output []byte) (int, error) {
remain := output
remainLen := len(output)
outputLen := 0
more := false
next := bcr.buf

for data := range bcr.in {
if len(data) > len(b) {
totread += len(b)
copy(b, data[:len(b)])
bcr.buf = data[len(b):]
return totread, nil
for {
n := copy(remain, next)
remainLen -= n
outputLen += n
if remainLen == 0 {
bcr.buf = next[n:]
return outputLen, nil
}
copy(b, data)
totread += len(data)
b = b[len(data):]
if len(b) == 0 {
return totread, nil

remain = remain[n:]
next, more = <-bcr.in
if !more {
return outputLen, io.EOF
}
}
return totread, io.EOF
}

type randGen struct {
src rand.Source
rand.Rand
}

func NewFastRand() io.Reader {
return &randGen{rand.NewSource(time.Now().UnixNano())}
func NewTimeSeededRand() io.Reader {
src := rand.NewSource(time.Now().UnixNano())
return &randGen{
Rand: *rand.New(src),
}
}

func (r *randGen) Read(p []byte) (n int, err error) {
todo := len(p)
offset := 0
for {
val := int64(r.src.Int63())
for i := 0; i < 8; i++ {
p[offset] = byte(val & 0xff)
todo--
if todo == 0 {
return len(p), nil
}
offset++
val >>= 8
}
for i := 0; i < len(p); i++ {
p[i] = byte(r.Rand.Intn(255))
}
return len(p), nil
}

// GetenvBool is the way to check an env var as a boolean
Expand Down
40 changes: 23 additions & 17 deletions util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package util

import (
"bytes"
"io/ioutil"
"math/rand"
"testing"

Expand Down Expand Up @@ -30,31 +29,38 @@ func TestKey(t *testing.T) {
}

func TestByteChanReader(t *testing.T) {
data := make([]byte, 1024*1024)
r := NewFastRand()
r.Read(data)

var data bytes.Buffer
var data2 bytes.Buffer
dch := make(chan []byte, 8)
randr := NewTimeSeededRand()

go func() {
beg := 0
for i := 0; i < len(data); {
i += rand.Intn(100) + 1
if i > len(data) {
i = len(data)
}
dch <- data[beg:i]
beg = i
defer close(dch)
for i := 0; i < rand.Intn(100)+100; i++ {
chunk := make([]byte, rand.Intn(100000)+10)
randr.Read(chunk)
data.Write(chunk)
// fmt.Printf("chunk: %6.d %v\n", len(chunk), chunk[:10])
dch <- chunk
}
close(dch)
}()

read := NewByteChanReader(dch)
out, err := ioutil.ReadAll(read)
if err != nil {
t.Fatal(err)

// read in random, weird sizes to exercise saving buffer.
for {
buf := make([]byte, rand.Intn(10)*10)
n, err := read.Read(buf)
data2.Write(buf[:n])
// fmt.Printf("read: %6.d\n", n)
if err != nil {
break
}
}

if !bytes.Equal(out, data) {
// fmt.Printf("lens: %d == %d\n", len(out), len(data.Bytes()))
if !bytes.Equal(data2.Bytes(), data.Bytes()) {
t.Fatal("Reader failed to stream correct bytes")
}
}
Expand Down

0 comments on commit 461e5a3

Please sign in to comment.