Skip to content

Commit

Permalink
feat: buffer 包新增 Unbounded 实现
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Sep 18, 2023
1 parent 12d1aba commit d56c1df
Showing 1 changed file with 80 additions and 0 deletions.
80 changes: 80 additions & 0 deletions utils/buffer/unbounded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package buffer

import (
"sync"
)

// NewUnbounded 创建一个无界缓冲区
// - generateNil: 生成空值的函数,该函数仅需始终返回 nil 即可
//
// 该缓冲区来源于 gRPC 的实现,用于在不使用额外 goroutine 的情况下实现无界缓冲区
// - 该缓冲区的所有方法都是线程安全的,除了用于同步的互斥锁外,不会阻塞任何东西
func NewUnbounded[V any](generateNil func() V) *Unbounded[V] {
return &Unbounded[V]{c: make(chan V, 1), nil: generateNil()}
}

// Unbounded 是无界缓冲区的实现
type Unbounded[V any] struct {
c chan V
closed bool
mu sync.Mutex
backlog []V
nil V
}

// Put 将数据放入缓冲区
func (slf *Unbounded[V]) Put(t V) {
slf.mu.Lock()
defer slf.mu.Unlock()
if slf.closed {
return
}
if len(slf.backlog) == 0 {
select {
case slf.c <- t:
return
default:
}
}
slf.backlog = append(slf.backlog, t)
}

// Load 将缓冲区中的数据发送到读取通道中,如果缓冲区中没有数据,则不会发送
func (slf *Unbounded[V]) Load() {
slf.mu.Lock()
defer slf.mu.Unlock()
if slf.closed {
return
}
if len(slf.backlog) > 0 {
select {
case slf.c <- slf.backlog[0]:
slf.backlog[0] = slf.nil
slf.backlog = slf.backlog[1:]
default:
}
}
}

// Get returns a read channel on which values added to the buffer, via Put(),
// are sent on.
//
// Upon reading a value from this channel, users are expected to call Load() to
// send the next buffered value onto the channel if there is any.
//
// If the unbounded buffer is closed, the read channel returned by this method
// is closed.
func (slf *Unbounded[V]) Get() <-chan V {
return slf.c
}

// Close closes the unbounded buffer.
func (slf *Unbounded[V]) Close() {
slf.mu.Lock()
defer slf.mu.Unlock()
if slf.closed {
return
}
slf.closed = true
close(slf.c)
}

0 comments on commit d56c1df

Please sign in to comment.