diff --git a/gossip/processor.go b/gossip/processor.go index 6aa0843a2..e6e9fc67c 100644 --- a/gossip/processor.go +++ b/gossip/processor.go @@ -90,13 +90,15 @@ func (d *BatchProcessor) wasProcessed(b *protocol.BatchSnapshots) bool { log.Infof("Error encoding batchsnapshots to calculate its digest. Dropping batch.") return false } - digest := hashing.NewSha256Hasher().Do(buf.Bytes()) + bb := buf.Bytes() + digest := hashing.NewSha256Hasher().Do(bb) // batch already processed, discard it _, err = d.a.Cache.Get(digest) if err == nil { + log.Debugf("Batch processor found %v on cache calculated from %v", digest, bb) return true } - d.a.Cache.Set(digest, []byte{0x0}, 0) + d.a.Cache.Set(digest, []byte{0x1}, 0) return false } @@ -127,7 +129,10 @@ func (d *BatchProcessor) Subscribe(id int, ch <-chan *Message) { ctx := context.WithValue(d.ctx, "batch", batch) for _, t := range d.tf { log.Debugf("Batch processor creating a new task") - d.a.Tasks.Add(t.New(ctx)) + err := d.a.Tasks.Add(t.New(ctx)) + if err != nil { + log.Infof("BatchProcessor was unable to enqueue new task becasue %v", err) + } } d.a.Out.Publish(msg)