From eeb0ed17bae422df371ed9deb09ca2e49efb365d Mon Sep 17 00:00:00 2001 From: Stephen Plaza Date: Thu, 10 Mar 2016 00:22:20 -0500 Subject: [PATCH 1/3] eliminates memory reference error in gbucket put; makes gbucket getprocessrange thread safe --- datatype/imageblk/write.go | 18 +++++--- storage/gbucket/gbucket.go | 89 ++++++++++++++++++-------------------- storage/storage.go | 16 +------ 3 files changed, 55 insertions(+), 68 deletions(-) diff --git a/datatype/imageblk/write.go b/datatype/imageblk/write.go index b5473282..3afc47d8 100644 --- a/datatype/imageblk/write.go +++ b/datatype/imageblk/write.go @@ -2,21 +2,19 @@ package imageblk import ( "fmt" - "io" - "log" - "sync" - "github.com/janelia-flyem/dvid/datastore" "github.com/janelia-flyem/dvid/dvid" "github.com/janelia-flyem/dvid/server" "github.com/janelia-flyem/dvid/storage" + "io" + "log" + "sync" ) // WriteBlock writes a subvolume or 2d image into a possibly intersecting block. func (v *Voxels) WriteBlock(block *storage.TKeyValue, blockSize dvid.Point) error { return v.writeBlock(block, blockSize) } - func (v *Voxels) writeBlock(block *storage.TKeyValue, blockSize dvid.Point) error { if blockSize.NumDims() > 3 { return fmt.Errorf("DVID voxel blocks currently only supports up to 3d, not 4+ dimensions") @@ -373,8 +371,13 @@ func (d *Data) putChunk(chunk *storage.Chunk, putbuffer storage.RequestBuffer) { return } + ready := make(chan error, 1) callback := func() { // Notify any subscribers that you've changed block. + resperr := <-ready + if resperr != nil { + return + } var event string var delta interface{} if op.mutate { @@ -394,13 +397,14 @@ func (d *Data) putChunk(chunk *storage.Chunk, putbuffer storage.RequestBuffer) { // put data -- use buffer if available ctx := datastore.NewVersionedCtx(d, op.version) if putbuffer != nil { - putbuffer.PutCallback(ctx, chunk.K, serialization, callback) + go callback() + putbuffer.PutCallback(ctx, chunk.K, serialization, ready) } else { if err := store.Put(ctx, chunk.K, serialization); err != nil { dvid.Errorf("Unable to PUT voxel data for key %v: %v\n", chunk.K, err) return } - + ready <- nil callback() } } diff --git a/storage/gbucket/gbucket.go b/storage/gbucket/gbucket.go index 819163cf..47013e78 100644 --- a/storage/gbucket/gbucket.go +++ b/storage/gbucket/gbucket.go @@ -11,18 +11,13 @@ Explore tradeoff between smaller parallel requests and single big requests. * Restrict the number of parallel requests. * Refactor to call batcher for any calls that require multiple requests * DeleteAll should page deletion to avoid memory issues for very large deletes - Note: - * Range calls require a list query to find potentially matching objects. After this call, specific objects can be fetched. * Lists are eventually consistent and objects are strongly consistent after object post/change. It is possible to post an object and not see the object when searching the list. * The Batcher implementation does not wrap operations into an atomic transaction. - - */ - import ( "bytes" "encoding/base64" @@ -202,31 +197,32 @@ func (db *GBucket) deleteV(k storage.Key) error { // put value from a given key or an error if nothing exists func (db *GBucket) putV(k storage.Key, value []byte) (err error) { - - /*for i := 0; i < NUM_TRIES; i++ { - //debug.PrintStack() - // returns error if it doesn't exist - obj := obj_handle.NewWriter(db.ctx) - - // write data to buffer - numwrite, err2 := obj.Write(value) - - if err2 == nil { - if numwrite != len(value) { - err2 = fmt.Errorf("correct number of bytes not written") + /* + for i := 0; i < NUM_TRIES; i++ { + //debug.PrintStack() + // returns error if it doesn't exist + obj := obj_handle.NewWriter(db.ctx) + + // write data to buffer + numwrite, err2 := obj.Write(value) + + if err2 == nil { + if numwrite != len(value) { + err2 = fmt.Errorf("correct number of bytes not written") + } } - } - // close will flush buffer - err = obj.Close() + // close will flush buffer + err = obj.Close() - if err2 == nil && err == nil { - break - } + if err2 == nil && err == nil { + break + } - if err2 != nil { - err = err2 + if err2 != nil { + err = err2 + } } - }*/ + */ for i := 0; i < NUM_TRIES; i++ { // gets handle (no network op) @@ -843,7 +839,7 @@ type dbOp struct { tkEnd storage.TKey chunkop *storage.ChunkOp chunkfunc storage.ChunkFunc - callback func() + readychan chan error } // goBuffer allow operations to be submitted in parallel @@ -913,8 +909,8 @@ func (db *goBuffer) Put(ctx storage.Context, tkey storage.TKey, value []byte) er } -// PutCallback writes a value with given key in a possibly versioned context and call the callback. -func (db *goBuffer) PutCallback(ctx storage.Context, tkey storage.TKey, value []byte, callback func()) error { +// PutCallback writes a value with given key in a possibly versioned context and notify the callback function. +func (db *goBuffer) PutCallback(ctx storage.Context, tkey storage.TKey, value []byte, ready chan error) error { if db == nil { return fmt.Errorf("Can't call Put() on nil Google Bucket") } @@ -925,7 +921,7 @@ func (db *goBuffer) PutCallback(ctx storage.Context, tkey storage.TKey, value [] var err error key := ctx.ConstructKey(tkey) if !ctx.Versioned() { - db.ops = append(db.ops, dbOp{op: putOpCallback, key: key, value: value, callback: callback}) + db.ops = append(db.ops, dbOp{op: putOpCallback, key: key, value: value, readychan: ready}) } else { vctx, ok := ctx.(storage.VersionedCtx) if !ok { @@ -933,7 +929,7 @@ func (db *goBuffer) PutCallback(ctx storage.Context, tkey storage.TKey, value [] } tombstoneKey := vctx.TombstoneKey(tkey) db.ops = append(db.ops, dbOp{op: delOpIgnoreExists, key: tombstoneKey}) - db.ops = append(db.ops, dbOp{op: putOpCallback, key: key, value: value, callback: callback}) + db.ops = append(db.ops, dbOp{op: putOpCallback, key: key, value: value, readychan: ready}) if err != nil { dvid.Criticalf("Error on data put: %v\n", err) err = fmt.Errorf("Error on data put: %v", err) @@ -1047,7 +1043,7 @@ func (buffer *goBuffer) Flush() error { err = buffer.db.putV(opdata.key, opdata.value) storage.StoreKeyBytesWritten <- len(opdata.key) storage.StoreValueBytesWritten <- len(opdata.value) - opdata.callback() + opdata.readychan <- err } else if opdata.op == getOp { err = buffer.processRangeLocal(buffer.ctx, opdata.tkBeg, opdata.tkEnd, opdata.chunkop, opdata.chunkfunc, workQueue) } else { @@ -1115,55 +1111,54 @@ func (db *goBuffer) deleteRangeLocal(ctx storage.Context, TkBeg, TkEnd storage.T return nil } +type keyvalue_t struct { + key storage.Key + value []byte +} + // processRangeLocal implements ProcessRange functionality but with workQueue awareness func (db *goBuffer) processRangeLocal(ctx storage.Context, TkBeg, TkEnd storage.TKey, op *storage.ChunkOp, f storage.ChunkFunc, workQueue chan interface{}) error { // grab keys keys, _ := db.db.getKeysInRange(ctx, TkBeg, TkEnd) - // process keys in parallel - kvmap := make(map[string][]byte) - for _, key := range keys { - - kvmap[string(key)] = nil - } - // hackish -- release resource <-workQueue - var wg sync.WaitGroup + keyvalchan := make(chan keyvalue_t, len(keys)) for _, key := range keys { // use available threads workQueue <- nil - wg.Add(1) go func(lkey storage.Key) { defer func() { <-workQueue - wg.Done() }() value, err := db.db.getV(lkey) if value == nil || err != nil { - kvmap[string(lkey)] = nil + keyvalchan <- keyvalue_t{lkey, nil} } else { - kvmap[string(lkey)] = value + keyvalchan <- keyvalue_t{lkey, value} } }(key) } - wg.Wait() + + kvmap := make(map[string][]byte) + for range keys { + keyval := <-keyvalchan + kvmap[string(keyval.key)] = keyval.value + } // hackish -- reask for resource workQueue <- nil var err error - // return keyvalues for _, key := range keys { val := kvmap[string(key)] tk, err := ctx.TKeyFromKey(key) if err != nil { return err } - if val == nil { return fmt.Errorf("Could not retrieve value") } diff --git a/storage/storage.go b/storage/storage.go index 3b349ddb..a72338e6 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -458,8 +458,8 @@ type BufferableOps interface { type RequestBuffer interface { BufferableOps - // PutCallback writes a value with given key in a possibly versioned context and calls the call back when finished. - PutCallback(Context, TKey, []byte, func()) error + // PutCallback writes a value with given key in a possibly versioned context and signals the callback + PutCallback(Context, TKey, []byte, chan error) error // Flush processes all the queued jobs Flush() error @@ -499,22 +499,16 @@ type GraphSetter interface { // SetVertexProperty adds arbitrary data to a vertex using a string key SetVertexProperty(ctx Context, id dvid.VertexID, key string, value []byte) error - // SetEdgeProperty adds arbitrary data to an edge using a string key SetEdgeProperty(ctx Context, id1 dvid.VertexID, id2 dvid.VertexID, key string, value []byte) error - // RemoveVertex removes the vertex and its properties and edges RemoveVertex(ctx Context, id dvid.VertexID) error - // RemoveEdge removes the edge defined by id1 and id2 and its properties RemoveEdge(ctx Context, id1 dvid.VertexID, id2 dvid.VertexID) error - // RemoveGraph removes the entire graph including all vertices, edges, and properties RemoveGraph(ctx Context) error - // RemoveVertexProperty removes the property data for vertex id at the key RemoveVertexProperty(ctx Context, id dvid.VertexID, key string) error - // RemoveEdgeProperty removes the property data for edge at the key RemoveEdgeProperty(ctx Context, id1 dvid.VertexID, id2 dvid.VertexID, key string) error } @@ -523,19 +517,14 @@ type GraphSetter interface { type GraphGetter interface { // GetVertices retrieves a list of all vertices in the graph GetVertices(ctx Context) ([]dvid.GraphVertex, error) - // GetEdges retrieves a list of all edges in the graph GetEdges(ctx Context) ([]dvid.GraphEdge, error) - // GetVertex retrieves a vertex given a vertex id GetVertex(ctx Context, id dvid.VertexID) (dvid.GraphVertex, error) - // GetVertex retrieves an edges between two vertex IDs GetEdge(ctx Context, id1 dvid.VertexID, id2 dvid.VertexID) (dvid.GraphEdge, error) - // GetVertexProperty retrieves a property as a byte array given a vertex id GetVertexProperty(ctx Context, id dvid.VertexID, key string) ([]byte, error) - // GetEdgeProperty retrieves a property as a byte array given an edge defined by id1 and id2 GetEdgeProperty(ctx Context, id1 dvid.VertexID, id2 dvid.VertexID, key string) ([]byte, error) } @@ -544,6 +533,5 @@ type GraphGetter interface { type GraphDB interface { GraphSetter GraphGetter - Close() } From aadd0f37f6a3db3094ae1dfe0e4dbe1aa27f7a7f Mon Sep 17 00:00:00 2001 From: Stephen Plaza Date: Thu, 10 Mar 2016 00:44:10 -0500 Subject: [PATCH 2/3] fixes more range get concurrency issues in gbucket --- storage/gbucket/gbucket.go | 57 +++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 32 deletions(-) diff --git a/storage/gbucket/gbucket.go b/storage/gbucket/gbucket.go index 47013e78..f18e41a6 100644 --- a/storage/gbucket/gbucket.go +++ b/storage/gbucket/gbucket.go @@ -452,6 +452,11 @@ func (db *GBucket) SendKeysInRange(ctx storage.Context, TkBeg, TkEnd storage.TKe return nil } +type keyvalue_t struct { + key storage.Key + value []byte +} + // GetRange returns a range of values spanning (TkBeg, kEnd) keys. func (db *GBucket) GetRange(ctx storage.Context, TkBeg, TkEnd storage.TKey) ([]*storage.TKeyValue, error) { if db == nil { @@ -466,32 +471,28 @@ func (db *GBucket) GetRange(ctx storage.Context, TkBeg, TkEnd storage.TKey) ([]* // grab keys keys, _ := db.getKeysInRange(ctx, TkBeg, TkEnd) - // process keys in parallel - kvmap := make(map[string][]byte) - for _, key := range keys { - kvmap[string(key)] = nil - } - - var wg sync.WaitGroup + keyvalchan := make(chan keyvalue_t, len(keys)) for _, key := range keys { - wg.Add(1) go func(lkey storage.Key) { - defer wg.Done() value, err := db.getV(lkey) if value == nil || err != nil { - kvmap[string(lkey)] = nil + keyvalchan <- keyvalue_t{lkey, nil} } else { - kvmap[string(lkey)] = value + keyvalchan <- keyvalue_t{lkey, value} } - }(key) + } + kvmap := make(map[string][]byte) + for range keys { + keyval := <-keyvalchan + kvmap[string(keyval.key)] = keyval.value } - wg.Wait() var err error // return keyvalues - for key, val := range kvmap { + for _, key := range keys { + val := kvmap[string(key)] tk, err := ctx.TKeyFromKey(storage.Key(key)) if err != nil { return nil, err @@ -541,30 +542,27 @@ func (db *GBucket) RawRangeQuery(kStart, kEnd storage.Key, keysOnly bool, out ch // grab keys keys, _ := db.getKeysInRangeRaw(kStart, kEnd) - // process keys in parallel - kvmap := make(map[string][]byte) - for _, key := range keys { - kvmap[string(key)] = nil - } - var wg sync.WaitGroup + keyvalchan := make(chan keyvalue_t, len(keys)) for _, key := range keys { - wg.Add(1) go func(lkey storage.Key) { - defer wg.Done() value, err := db.getV(lkey) if value == nil || err != nil { - kvmap[string(lkey)] = nil + keyvalchan <- keyvalue_t{lkey, nil} } else { - kvmap[string(lkey)] = value + keyvalchan <- keyvalue_t{lkey, value} } - }(key) + } + kvmap := make(map[string][]byte) + for range keys { + keyval := <-keyvalchan + kvmap[string(keyval.key)] = keyval.value } - wg.Wait() // return keyvalues - for key, val := range kvmap { + for _, key := range keys { + val := kvmap[string(key)] if val == nil { return fmt.Errorf("Could not retrieve value") } @@ -1111,11 +1109,6 @@ func (db *goBuffer) deleteRangeLocal(ctx storage.Context, TkBeg, TkEnd storage.T return nil } -type keyvalue_t struct { - key storage.Key - value []byte -} - // processRangeLocal implements ProcessRange functionality but with workQueue awareness func (db *goBuffer) processRangeLocal(ctx storage.Context, TkBeg, TkEnd storage.TKey, op *storage.ChunkOp, f storage.ChunkFunc, workQueue chan interface{}) error { // grab keys From def48e6c371a1e6727657fb1157bf9830fc54c1d Mon Sep 17 00:00:00 2001 From: Stephen Plaza Date: Fri, 11 Mar 2016 00:15:12 -0500 Subject: [PATCH 3/3] adds error messaage for gbucket put fails --- datatype/imageblk/write.go | 1 + 1 file changed, 1 insertion(+) diff --git a/datatype/imageblk/write.go b/datatype/imageblk/write.go index 3afc47d8..c14e7436 100644 --- a/datatype/imageblk/write.go +++ b/datatype/imageblk/write.go @@ -376,6 +376,7 @@ func (d *Data) putChunk(chunk *storage.Chunk, putbuffer storage.RequestBuffer) { // Notify any subscribers that you've changed block. resperr := <-ready if resperr != nil { + dvid.Errorf("Unable to PUT voxel data for key %v: %v\n", chunk.K, resperr) return } var event string