Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
use a single global lock to prevent concurrent calls to flush
Browse files Browse the repository at this point in the history
more fined grained locking is likely possible but this guarantees flush is not called for the same collection

fixes #269
  • Loading branch information
jipperinbham committed Feb 15, 2017
1 parent fad12d5 commit 0ea445e
Showing 1 changed file with 6 additions and 14 deletions.
20 changes: 6 additions & 14 deletions pkg/adaptor/mongodb/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type bulkOperation struct {
avgTotal int
avgOpSize float64
bsonOpSize int
*sync.Mutex
}

func newBulker(db string, done chan struct{}, wg *sync.WaitGroup) *Bulk {
Expand All @@ -55,21 +54,16 @@ func newBulker(db string, done chan struct{}, wg *sync.WaitGroup) *Bulk {
func (b *Bulk) Write(msg message.Msg) func(client.Session) error {
return func(s client.Session) error {
coll := msg.Namespace()
b.RLock()
b.Lock()
bOp, ok := b.bulkMap[coll]
b.RUnlock()
if !ok {
s := s.(*Session).mgoSession.Copy()
bOp = &bulkOperation{
s: s,
bulk: s.DB(b.db).C(coll).Bulk(),
Mutex: &sync.Mutex{},
s: s,
bulk: s.DB(b.db).C(coll).Bulk(),
}
b.Lock()
b.bulkMap[coll] = bOp
b.Unlock()
}
bOp.Lock()
switch msg.OP() {
case ops.Delete:
bOp.bulk.Remove(bson.M{"_id": msg.Data().Get("_id")})
Expand All @@ -84,11 +78,11 @@ func (b *Bulk) Write(msg message.Msg) func(client.Session) error {
bOp.calculateAvgObjSize(msg.Data())
}
bOp.bsonOpSize = int(bOp.avgOpSize) * bOp.opCounter
bOp.Unlock()
var err error
if bOp.opCounter >= maxObjSize || bOp.bsonOpSize >= maxBSONObjSize {
err = b.flush(coll, bOp)
}
b.Unlock()
return err
}
}
Expand Down Expand Up @@ -122,11 +116,11 @@ func (b *Bulk) run(done chan struct{}, wg *sync.WaitGroup) {
}

func (b *Bulk) flushAll() error {
b.Lock()
for c, bOp := range b.bulkMap {
bOp.Lock()
b.flush(c, bOp)
bOp.Unlock()
}
b.Unlock()
return nil
}

Expand All @@ -142,8 +136,6 @@ func (b *Bulk) flush(c string, bOp *bulkOperation) error {
With("modified", result.Modified).
With("match", result.Matched).
Infoln("flush complete")
b.Lock()
delete(b.bulkMap, c)
b.Unlock()
return nil
}

0 comments on commit 0ea445e

Please sign in to comment.