diff --git a/go.mod b/go.mod index efad30c6ba83b..fd6104c1c40bb 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,7 @@ require ( github.com/influxdata/go-syslog/v3 v3.0.1-0.20201128200927-a1889d947b48 github.com/influxdata/telegraf v1.16.3 github.com/jmespath/go-jmespath v0.4.0 - github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible + github.com/joncrlsn/dque v0.0.0-20211108142734-c2ef48c5192a github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.16.7 github.com/klauspost/pgzip v1.2.5 diff --git a/go.sum b/go.sum index 465a01a6ac13a..792b91d419e70 100644 --- a/go.sum +++ b/go.sum @@ -802,6 +802,7 @@ github.com/goburrow/modbus v0.1.0/go.mod h1:Kx552D5rLIS8E7TyUwQ/UdHEqvX5T8tyiGBT github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gofrs/uuid v2.1.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= @@ -1176,8 +1177,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= -github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible h1:f4ZGkY12AQ+YvzWDDWMLMGejA4ceg7nIPlqJ9fQ9T4c= -github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible/go.mod h1:hDZb8oMj3Kp8MxtbNLg9vrtAUDHjgI1yZvqivT4O8Iw= +github.com/joncrlsn/dque v0.0.0-20211108142734-c2ef48c5192a h1:sfe532Ipn7GX0V6mHdynBk393rDmqgI0QmjLK7ct7TU= +github.com/joncrlsn/dque v0.0.0-20211108142734-c2ef48c5192a/go.mod h1:dNKs71rs2VJGBAmttu7fouEsRQlRjxy0p1Sx+T5wbpY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/joyent/triton-go v0.0.0-20180628001255-830d2b111e62/go.mod h1:U+RSyWxWd04xTqnuOQxnai7XGS2PrPY2cfGoDKtMHjA= diff --git a/vendor/github.com/joncrlsn/dque/queue.go b/vendor/github.com/joncrlsn/dque/queue.go index 317abc141cff5..fb701cf9c74da 100644 --- a/vendor/github.com/joncrlsn/dque/queue.go +++ b/vendor/github.com/joncrlsn/dque/queue.go @@ -60,8 +60,7 @@ type DQue struct { mutex sync.Mutex - emptyCond *sync.Cond - mutexEmptyCond sync.Mutex + emptyCond *sync.Cond turbo bool } @@ -92,13 +91,17 @@ func New(name string, dirPath string, itemsPerSegment int, builder func() interf q.fullPath = fullPath q.config.ItemsPerSegment = itemsPerSegment q.builder = builder - q.emptyCond = sync.NewCond(&q.mutexEmptyCond) + q.emptyCond = sync.NewCond(&q.mutex) if err := q.lock(); err != nil { return nil, err } if err := q.load(); err != nil { + er := q.fileLock.Unlock() + if er != nil { + return nil, er + } return nil, err } @@ -127,13 +130,17 @@ func Open(name string, dirPath string, itemsPerSegment int, builder func() inter q.fullPath = fullPath q.config.ItemsPerSegment = itemsPerSegment q.builder = builder - q.emptyCond = sync.NewCond(&q.mutexEmptyCond) + q.emptyCond = sync.NewCond(&q.mutex) if err := q.lock(); err != nil { return nil, err } if err := q.load(); err != nil { + er := q.fileLock.Unlock() + if er != nil { + return nil, er + } return nil, err } @@ -241,6 +248,10 @@ func (q *DQue) Dequeue() (interface{}, error) { q.mutex.Lock() defer q.mutex.Unlock() + return q.dequeueLocked() +} + +func (q *DQue) dequeueLocked() (interface{}, error) { if q.fileLock == nil { return nil, ErrQueueClosed } @@ -305,6 +316,10 @@ func (q *DQue) Peek() (interface{}, error) { q.mutex.Lock() defer q.mutex.Unlock() + return q.peekLocked() +} + +func (q *DQue) peekLocked() (interface{}, error) { if q.fileLock == nil { return nil, ErrQueueClosed } @@ -324,10 +339,10 @@ func (q *DQue) Peek() (interface{}, error) { // DequeueBlock behaves similar to Dequeue, but is a blocking call until an item is available. func (q *DQue) DequeueBlock() (interface{}, error) { - q.mutexEmptyCond.Lock() - defer q.mutexEmptyCond.Unlock() + q.mutex.Lock() + defer q.mutex.Unlock() for { - obj, err := q.Dequeue() + obj, err := q.dequeueLocked() if err == ErrEmpty { q.emptyCond.Wait() // Wait() atomically unlocks mutexEmptyCond and suspends execution of the calling goroutine. @@ -342,10 +357,10 @@ func (q *DQue) DequeueBlock() (interface{}, error) { // PeekBlock behaves similar to Peek, but is a blocking call until an item is available. func (q *DQue) PeekBlock() (interface{}, error) { - q.mutexEmptyCond.Lock() - defer q.mutexEmptyCond.Unlock() + q.mutex.Lock() + defer q.mutex.Unlock() for { - obj, err := q.Peek() + obj, err := q.peekLocked() if err == ErrEmpty { q.emptyCond.Wait() // Wait() atomically unlocks mutexEmptyCond and suspends execution of the calling goroutine. diff --git a/vendor/modules.txt b/vendor/modules.txt index 97cf4c4cc6365..cafc9755c6684 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1042,8 +1042,8 @@ github.com/jcmturner/rpc/v2/ndr # github.com/jmespath/go-jmespath v0.4.0 ## explicit; go 1.14 github.com/jmespath/go-jmespath -# github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible -## explicit +# github.com/joncrlsn/dque v0.0.0-20211108142734-c2ef48c5192a +## explicit; go 1.13 github.com/joncrlsn/dque # github.com/josharian/intern v1.0.0 ## explicit; go 1.5