Skip to content

Commit

Permalink
Merge pull request #135 from stephenplaza/master
Browse files Browse the repository at this point in the history
fixes some memory and concurrency bugs in gbucket
  • Loading branch information
DocSavage committed Mar 11, 2016
2 parents bcb248b + def48e6 commit 2687646
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 95 deletions.
19 changes: 12 additions & 7 deletions datatype/imageblk/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@ package imageblk

import (
"fmt"
"io"
"log"
"sync"

"github.com/janelia-flyem/dvid/datastore"
"github.com/janelia-flyem/dvid/dvid"
"github.com/janelia-flyem/dvid/server"
"github.com/janelia-flyem/dvid/storage"
"io"
"log"
"sync"
)

// WriteBlock writes a subvolume or 2d image into a possibly intersecting block.
func (v *Voxels) WriteBlock(block *storage.TKeyValue, blockSize dvid.Point) error {
return v.writeBlock(block, blockSize)
}

func (v *Voxels) writeBlock(block *storage.TKeyValue, blockSize dvid.Point) error {
if blockSize.NumDims() > 3 {
return fmt.Errorf("DVID voxel blocks currently only supports up to 3d, not 4+ dimensions")
Expand Down Expand Up @@ -373,8 +371,14 @@ func (d *Data) putChunk(chunk *storage.Chunk, putbuffer storage.RequestBuffer) {
return
}

ready := make(chan error, 1)
callback := func() {
// Notify any subscribers that the block has changed.
resperr := <-ready
if resperr != nil {
dvid.Errorf("Unable to PUT voxel data for key %v: %v\n", chunk.K, resperr)
return
}
var event string
var delta interface{}
if op.mutate {
Expand All @@ -394,13 +398,14 @@ func (d *Data) putChunk(chunk *storage.Chunk, putbuffer storage.RequestBuffer) {
// put data -- use buffer if available
ctx := datastore.NewVersionedCtx(d, op.version)
if putbuffer != nil {
putbuffer.PutCallback(ctx, chunk.K, serialization, callback)
go callback()
putbuffer.PutCallback(ctx, chunk.K, serialization, ready)
} else {
if err := store.Put(ctx, chunk.K, serialization); err != nil {
dvid.Errorf("Unable to PUT voxel data for key %v: %v\n", chunk.K, err)
return
}

ready <- nil
callback()
}
}
Expand Down
136 changes: 62 additions & 74 deletions storage/gbucket/gbucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,13 @@ Explore tradeoff between smaller parallel requests and single big requests.
* Restrict the number of parallel requests.
* Refactor to call batcher for any calls that require multiple requests
* DeleteAll should page deletion to avoid memory issues for very large deletes
Note:
* Range calls require a list query to find potentially matching objects. After this
call, specific objects can be fetched.
* Lists are eventually consistent and objects are strongly consistent after object post/change.
It is possible to post an object and not see the object when searching the list.
* The Batcher implementation does not wrap operations into an atomic transaction.
*/

import (
"bytes"
"encoding/base64"
Expand Down Expand Up @@ -202,31 +197,32 @@ func (db *GBucket) deleteV(k storage.Key) error {

// put value from a given key or an error if nothing exists
func (db *GBucket) putV(k storage.Key, value []byte) (err error) {

/*for i := 0; i < NUM_TRIES; i++ {
//debug.PrintStack()
// returns error if it doesn't exist
obj := obj_handle.NewWriter(db.ctx)
// write data to buffer
numwrite, err2 := obj.Write(value)
if err2 == nil {
if numwrite != len(value) {
err2 = fmt.Errorf("correct number of bytes not written")
/*
for i := 0; i < NUM_TRIES; i++ {
//debug.PrintStack()
// returns error if it doesn't exist
obj := obj_handle.NewWriter(db.ctx)
// write data to buffer
numwrite, err2 := obj.Write(value)
if err2 == nil {
if numwrite != len(value) {
err2 = fmt.Errorf("correct number of bytes not written")
}
}
}
// close will flush buffer
err = obj.Close()
// close will flush buffer
err = obj.Close()
if err2 == nil && err == nil {
break
}
if err2 == nil && err == nil {
break
}
if err2 != nil {
err = err2
if err2 != nil {
err = err2
}
}
}*/
*/

for i := 0; i < NUM_TRIES; i++ {
// gets handle (no network op)
Expand Down Expand Up @@ -456,6 +452,11 @@ func (db *GBucket) SendKeysInRange(ctx storage.Context, TkBeg, TkEnd storage.TKe
return nil
}

// keyvalue_t pairs a storage key with its fetched value so that results
// of parallel getV fetches can be funneled through a single channel
// (see GetRange, RawRangeQuery, and processRangeLocal). A nil value
// marks a key whose fetch returned nothing or failed.
// NOTE(review): the underscore-suffixed name is non-idiomatic Go
// (MixedCaps preferred); renaming would touch every call site, so it is
// left as-is here.
type keyvalue_t struct {
	key   storage.Key // full storage key, as returned by the key-range listing
	value []byte      // fetched value; nil on error or missing object
}

// GetRange returns a range of values spanning (TkBeg, kEnd) keys.
func (db *GBucket) GetRange(ctx storage.Context, TkBeg, TkEnd storage.TKey) ([]*storage.TKeyValue, error) {
if db == nil {
Expand All @@ -470,32 +471,28 @@ func (db *GBucket) GetRange(ctx storage.Context, TkBeg, TkEnd storage.TKey) ([]*
// grab keys
keys, _ := db.getKeysInRange(ctx, TkBeg, TkEnd)

// process keys in parallel
kvmap := make(map[string][]byte)
for _, key := range keys {
kvmap[string(key)] = nil
}

var wg sync.WaitGroup
keyvalchan := make(chan keyvalue_t, len(keys))
for _, key := range keys {
wg.Add(1)
go func(lkey storage.Key) {
defer wg.Done()
value, err := db.getV(lkey)
if value == nil || err != nil {
kvmap[string(lkey)] = nil
keyvalchan <- keyvalue_t{lkey, nil}
} else {
kvmap[string(lkey)] = value
keyvalchan <- keyvalue_t{lkey, value}
}

}(key)
}

kvmap := make(map[string][]byte)
for range keys {
keyval := <-keyvalchan
kvmap[string(keyval.key)] = keyval.value
}
wg.Wait()

var err error
// return keyvalues
for key, val := range kvmap {
for _, key := range keys {
val := kvmap[string(key)]
tk, err := ctx.TKeyFromKey(storage.Key(key))
if err != nil {
return nil, err
Expand Down Expand Up @@ -545,30 +542,27 @@ func (db *GBucket) RawRangeQuery(kStart, kEnd storage.Key, keysOnly bool, out ch
// grab keys
keys, _ := db.getKeysInRangeRaw(kStart, kEnd)

// process keys in parallel
kvmap := make(map[string][]byte)
for _, key := range keys {
kvmap[string(key)] = nil
}
var wg sync.WaitGroup
keyvalchan := make(chan keyvalue_t, len(keys))
for _, key := range keys {
wg.Add(1)
go func(lkey storage.Key) {
defer wg.Done()
value, err := db.getV(lkey)
if value == nil || err != nil {
kvmap[string(lkey)] = nil
keyvalchan <- keyvalue_t{lkey, nil}
} else {
kvmap[string(lkey)] = value
keyvalchan <- keyvalue_t{lkey, value}
}

}(key)
}

kvmap := make(map[string][]byte)
for range keys {
keyval := <-keyvalchan
kvmap[string(keyval.key)] = keyval.value
}
wg.Wait()

// return keyvalues
for key, val := range kvmap {
for _, key := range keys {
val := kvmap[string(key)]
if val == nil {
return fmt.Errorf("Could not retrieve value")
}
Expand Down Expand Up @@ -843,7 +837,7 @@ type dbOp struct {
tkEnd storage.TKey
chunkop *storage.ChunkOp
chunkfunc storage.ChunkFunc
callback func()
readychan chan error
}

// goBuffer allow operations to be submitted in parallel
Expand Down Expand Up @@ -913,8 +907,8 @@ func (db *goBuffer) Put(ctx storage.Context, tkey storage.TKey, value []byte) er

}

// PutCallback writes a value with given key in a possibly versioned context and call the callback.
func (db *goBuffer) PutCallback(ctx storage.Context, tkey storage.TKey, value []byte, callback func()) error {
// PutCallback writes a value with the given key in a possibly versioned context and signals completion on the ready channel.
func (db *goBuffer) PutCallback(ctx storage.Context, tkey storage.TKey, value []byte, ready chan error) error {
if db == nil {
return fmt.Errorf("Can't call Put() on nil Google Bucket")
}
Expand All @@ -925,15 +919,15 @@ func (db *goBuffer) PutCallback(ctx storage.Context, tkey storage.TKey, value []
var err error
key := ctx.ConstructKey(tkey)
if !ctx.Versioned() {
db.ops = append(db.ops, dbOp{op: putOpCallback, key: key, value: value, callback: callback})
db.ops = append(db.ops, dbOp{op: putOpCallback, key: key, value: value, readychan: ready})
} else {
vctx, ok := ctx.(storage.VersionedCtx)
if !ok {
return fmt.Errorf("Non-versioned context that says it's versioned received in Put(): %v", ctx)
}
tombstoneKey := vctx.TombstoneKey(tkey)
db.ops = append(db.ops, dbOp{op: delOpIgnoreExists, key: tombstoneKey})
db.ops = append(db.ops, dbOp{op: putOpCallback, key: key, value: value, callback: callback})
db.ops = append(db.ops, dbOp{op: putOpCallback, key: key, value: value, readychan: ready})
if err != nil {
dvid.Criticalf("Error on data put: %v\n", err)
err = fmt.Errorf("Error on data put: %v", err)
Expand Down Expand Up @@ -1047,7 +1041,7 @@ func (buffer *goBuffer) Flush() error {
err = buffer.db.putV(opdata.key, opdata.value)
storage.StoreKeyBytesWritten <- len(opdata.key)
storage.StoreValueBytesWritten <- len(opdata.value)
opdata.callback()
opdata.readychan <- err
} else if opdata.op == getOp {
err = buffer.processRangeLocal(buffer.ctx, opdata.tkBeg, opdata.tkEnd, opdata.chunkop, opdata.chunkfunc, workQueue)
} else {
Expand Down Expand Up @@ -1120,50 +1114,44 @@ func (db *goBuffer) processRangeLocal(ctx storage.Context, TkBeg, TkEnd storage.
// grab keys
keys, _ := db.db.getKeysInRange(ctx, TkBeg, TkEnd)

// process keys in parallel
kvmap := make(map[string][]byte)
for _, key := range keys {

kvmap[string(key)] = nil
}

// hackish -- release resource
<-workQueue

var wg sync.WaitGroup
keyvalchan := make(chan keyvalue_t, len(keys))
for _, key := range keys {
// use available threads
workQueue <- nil
wg.Add(1)
go func(lkey storage.Key) {
defer func() {
<-workQueue
wg.Done()
}()
value, err := db.db.getV(lkey)
if value == nil || err != nil {
kvmap[string(lkey)] = nil
keyvalchan <- keyvalue_t{lkey, nil}
} else {
kvmap[string(lkey)] = value
keyvalchan <- keyvalue_t{lkey, value}
}

}(key)

}
wg.Wait()

kvmap := make(map[string][]byte)
for range keys {
keyval := <-keyvalchan
kvmap[string(keyval.key)] = keyval.value
}

// hackish -- reask for resource
workQueue <- nil

var err error
// return keyvalues
for _, key := range keys {
val := kvmap[string(key)]
tk, err := ctx.TKeyFromKey(key)
if err != nil {
return err
}

if val == nil {
return fmt.Errorf("Could not retrieve value")
}
Expand Down
16 changes: 2 additions & 14 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,8 @@ type BufferableOps interface {
type RequestBuffer interface {
BufferableOps

// PutCallback writes a value with given key in a possibly versioned context and calls the call back when finished.
PutCallback(Context, TKey, []byte, func()) error
// PutCallback writes a value with the given key in a possibly versioned context and signals the result on the provided error channel.
PutCallback(Context, TKey, []byte, chan error) error

// Flush processes all the queued jobs
Flush() error
Expand Down Expand Up @@ -499,22 +499,16 @@ type GraphSetter interface {

// SetVertexProperty adds arbitrary data to a vertex using a string key
SetVertexProperty(ctx Context, id dvid.VertexID, key string, value []byte) error

// SetEdgeProperty adds arbitrary data to an edge using a string key
SetEdgeProperty(ctx Context, id1 dvid.VertexID, id2 dvid.VertexID, key string, value []byte) error

// RemoveVertex removes the vertex and its properties and edges
RemoveVertex(ctx Context, id dvid.VertexID) error

// RemoveEdge removes the edge defined by id1 and id2 and its properties
RemoveEdge(ctx Context, id1 dvid.VertexID, id2 dvid.VertexID) error

// RemoveGraph removes the entire graph including all vertices, edges, and properties
RemoveGraph(ctx Context) error

// RemoveVertexProperty removes the property data for vertex id at the key
RemoveVertexProperty(ctx Context, id dvid.VertexID, key string) error

// RemoveEdgeProperty removes the property data for edge at the key
RemoveEdgeProperty(ctx Context, id1 dvid.VertexID, id2 dvid.VertexID, key string) error
}
Expand All @@ -523,19 +517,14 @@ type GraphSetter interface {
type GraphGetter interface {
// GetVertices retrieves a list of all vertices in the graph
GetVertices(ctx Context) ([]dvid.GraphVertex, error)

// GetEdges retrieves a list of all edges in the graph
GetEdges(ctx Context) ([]dvid.GraphEdge, error)

// GetVertex retrieves a vertex given a vertex id
GetVertex(ctx Context, id dvid.VertexID) (dvid.GraphVertex, error)

// GetVertex retrieves an edges between two vertex IDs
GetEdge(ctx Context, id1 dvid.VertexID, id2 dvid.VertexID) (dvid.GraphEdge, error)

// GetVertexProperty retrieves a property as a byte array given a vertex id
GetVertexProperty(ctx Context, id dvid.VertexID, key string) ([]byte, error)

// GetEdgeProperty retrieves a property as a byte array given an edge defined by id1 and id2
GetEdgeProperty(ctx Context, id1 dvid.VertexID, id2 dvid.VertexID, key string) ([]byte, error)
}
Expand All @@ -544,6 +533,5 @@ type GraphGetter interface {
type GraphDB interface {
GraphSetter
GraphGetter

Close()
}

0 comments on commit 2687646

Please sign in to comment.