Skip to content

Commit

Permalink
fix #500
Browse files Browse the repository at this point in the history
Signed-off-by: ruitianzhong <ruitian-zhong@outlook.com>
  • Loading branch information
ruitianzhong authored and cfc4n committed Mar 6, 2024
1 parent bfb4a8c commit 144c89a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
58 changes: 56 additions & 2 deletions pkg/event_processor/iworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"ecapture/user/event"
"encoding/hex"
"sync/atomic"
"time"
)

Expand All @@ -30,6 +31,9 @@ type IWorker interface {
// 收包
Write(event.IEventStruct) error
GetUUID() string
IfUsed() bool
Get()
Put()
}

const (
Expand All @@ -49,6 +53,7 @@ type eventWorker struct {
processor *EventProcessor
parser IParser
payload *bytes.Buffer
used atomic.Bool
}

func NewEventWorker(uuid string, processor *EventProcessor) IWorker {
Expand Down Expand Up @@ -127,12 +132,43 @@ func (ew *eventWorker) parserEvents() []byte {
func (ew *eventWorker) Run() {
for {
select {
case _ = <-ew.ticker.C:
case <-ew.ticker.C:
// 输出包
if ew.tickerCount > MaxTickerCount {
//ew.processor.GetLogger().Printf("eventWorker TickerCount > %d, event closed.", MaxTickerCount)
ew.Close()
return
/*
When returned from ew.Close(), there are two possiblities:
1) no routine can touch it.
2) one routine can still touch ew because getWorkerByUUID()
*happen before* ew.Close()
When no routine can touch it (i.e.,ew.IfUsed == false),
we just drain the ew.incoming and return.
When one routine can touch it (i.e.,ew.IfUsed == true), we ensure
that we only return after the routine can not touch it
(i.e.,ew.IfUsed == false). At this point, we can ensure that no
other routine will touch it and send events through the ew.incoming.
So, we return.
Because eworker has been deleted from workqueue after ew.Close()
(ordered by a workqueue lock), at this point, we can ensure that
no ew will not be touched even **in the future**. So the return is
safe.
*/
for {
select {
case e := <-ew.incoming:
ew.writeEvent(e)
default:
if ew.IfUsed() {
continue
}
return
}
}
}
ew.tickerCount++
case e := <-ew.incoming:
Expand All @@ -151,3 +187,21 @@ func (ew *eventWorker) Close() {
ew.tickerCount = 0
ew.processor.delWorkerByUUID(ew)
}

func (ew *eventWorker) Get() {
if !ew.used.CompareAndSwap(false, true) {
panic("unexpected behavior and incorrect usage for eventWorker")
}
}

func (ew *eventWorker) Put() {
if !ew.used.CompareAndSwap(true, false) {
panic("unexpected behavior and incorrect usage for eventWorker")
}

}

func (ew *eventWorker) IfUsed() bool {

return ew.used.Load()
}
3 changes: 3 additions & 0 deletions pkg/event_processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (this *EventProcessor) dispatch(e event.IEventStruct) {
}

err := eWorker.Write(e)
eWorker.Put() // never touch eWorker again
if err != nil {
//...
this.GetLogger().Fatalf("write event failed , error:%v", err)
Expand All @@ -89,13 +90,15 @@ func (this *EventProcessor) getWorkerByUUID(uuid string) (bool, IWorker) {
if !found {
return false, eWorker
}
eWorker.Get()
return true, eWorker
}

func (this *EventProcessor) addWorkerByUUID(worker IWorker) {
this.Lock()
defer this.Unlock()
this.workerQueue[worker.GetUUID()] = worker
worker.Get()
}

// 每个worker调用该方法,从处理器中删除自己
Expand Down

0 comments on commit 144c89a

Please sign in to comment.