Skip to content

Commit

Permalink
Merge pull request #53 from algobot76/document-pusher
Browse files Browse the repository at this point in the history
doc: add missing comments in pusher.go
  • Loading branch information
kevwan authored Oct 5, 2023
2 parents f2bc42e + 48c26db commit 961c620
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion kq/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type (
}
)

// NewPusher returns a Pusher with the given Kafka addresses and topic.
func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
producer := &kafka.Writer{
Addr: kafka.TCP(addrs...),
Expand All @@ -49,6 +50,7 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
return pusher
}

// Close closes the Pusher and releases any resources used by it.
func (p *Pusher) Close() error {
if p.executor != nil {
p.executor.Flush()
Expand All @@ -57,13 +59,15 @@ func (p *Pusher) Close() error {
return p.producer.Close()
}

// Name returns the name of the Kafka topic that the Pusher is sending messages to.
func (p *Pusher) Name() string {
return p.topic
}

// Push sends a message to the Kafka topic.
func (p *Pusher) Push(v string) error {
msg := kafka.Message{
Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), // current timestamp
Value: []byte(v),
}
if p.executor != nil {
Expand All @@ -73,12 +77,14 @@ func (p *Pusher) Push(v string) error {
}
}

// WithChunkSize customizes the Pusher with the given chunk size.
func WithChunkSize(chunkSize int) PushOption {
return func(options *chunkOptions) {
options.chunkSize = chunkSize
}
}

// WithFlushInterval customizes the Pusher with the given flush interval.
func WithFlushInterval(interval time.Duration) PushOption {
return func(options *chunkOptions) {
options.flushInterval = interval
Expand Down

0 comments on commit 961c620

Please sign in to comment.