Skip to content

Commit

Permalink
feat: add otkafka processor (#134)
Browse files Browse the repository at this point in the history
* feat: add otkafka/processor

* Update go.yml

* feat: add otkafka processor

* test: use wait group for graceful shutdown

* test: add lock for data

* refactor: rename some variables and modify comment

* fix: hound check

* chore: update go.mod kafka-go

* refactor: restructure and fix some bug

* fix: single Handler's message missing commit

Co-authored-by: 谷溪 <guxi99@gmail.com>
  • Loading branch information
GGXXLL and Reasno authored May 19, 2021
1 parent 6bae6a2 commit 73b2511
Show file tree
Hide file tree
Showing 8 changed files with 937 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
branches:
- master
pull_request:
workflow_dispatch:
jobs:
build:
strategy:
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-multierror v1.1.0
github.com/heptiolabs/healthcheck v0.0.0-20180807145615-6ff867650f40
github.com/klauspost/compress v1.12.2 // indirect
github.com/knadh/koanf v0.15.0
github.com/mitchellh/mapstructure v1.4.1
github.com/oklog/run v1.1.0
Expand All @@ -29,7 +30,7 @@ require (
github.com/prometheus/client_golang v1.9.0
github.com/robfig/cron/v3 v3.0.1
github.com/rs/xid v1.2.1
github.com/segmentio/kafka-go v0.4.10
github.com/segmentio/kafka-go v0.4.16
github.com/spf13/cobra v1.1.3
github.com/stretchr/testify v1.7.0
github.com/uber/jaeger-client-go v2.25.0+incompatible
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
Expand Down Expand Up @@ -204,6 +205,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down Expand Up @@ -370,13 +373,16 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8=
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/knadh/koanf v0.15.0 h1:HMm8cJZZIokMn5ETu9Exut1jQhfu1dm3b0TZedvhSVo=
github.com/knadh/koanf v0.15.0/go.mod h1:Ut3d4JaTRZYfO5a0wdYIGE+oyGaGFo4vXQ3ZvaSWxNc=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
Expand Down Expand Up @@ -490,6 +496,8 @@ github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -554,6 +562,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/kafka-go v0.4.10 h1:YnI820ZLfh710adINqwuCVtN3wbnLsLnT/+xhI0oooQ=
github.com/segmentio/kafka-go v0.4.10/go.mod h1:BVDwBTF24avtlj4l8/xsWNb4papVeg16+jO6/0qjvhA=
github.com/segmentio/kafka-go v0.4.16 h1:9dt78ehM9qzAkekA60D6A96RlqDzC3hnYYa8y5Szd+U=
github.com/segmentio/kafka-go v0.4.16/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc h1:jUIKcSPO9MoMJBbEoyE/RJoE8vz7Mb8AjvifMMwSyvY=
github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
Expand Down
80 changes: 80 additions & 0 deletions otkafka/processor/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package processor

import "time"

// Info the info of BatchHandler.
//
// Note:
// If sequence is necessary, make sure that per worker count is one.
// Multiple goroutines cannot guarantee the order in which data is processed.
type Info struct {
// used to get reader from otkafka.ReaderMaker.
// default: "default"
Name string
// reader workers count.
// default: 1
ReadWorker int
// batch workers count.
// default: 1
BatchWorker int
// data size for batch processing.
// default: 1
BatchSize int
// handler workers count.
HandleWorker int
// the size of the data channel.
// default: 100
ChanSize int
// run the batchFunc automatically at specified intervals, avoid not executing without reaching BatchSize
// default: 30s
AutoBatchInterval time.Duration
}

func (i *Info) name() string {
if i.Name == "" {
return "default"
}
return i.Name
}

func (i *Info) readWorker() int {
if i.ReadWorker <= 0 {
return 1
}
return i.ReadWorker
}

func (i *Info) batchWorker() int {
if i.BatchWorker <= 0 {
return 1
}
return i.BatchWorker
}

func (i *Info) batchSize() int {
if i.BatchSize <= 0 {
return 1
}
return i.BatchSize
}

func (i *Info) handleWorker() int {
if i.HandleWorker <= 0 {
return 1
}
return i.HandleWorker
}

func (i *Info) chanSize() int {
if i.ChanSize <= 0 {
return 100
}
return i.ChanSize
}

func (i *Info) autoBatchInterval() time.Duration {
if i.AutoBatchInterval < 10 {
return 30 * time.Second
}
return i.AutoBatchInterval
}
43 changes: 43 additions & 0 deletions otkafka/processor/info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package processor

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestInfo(t *testing.T) {
i := Info{
Name: "",
ReadWorker: 0,
BatchWorker: 0,
BatchSize: 0,
HandleWorker: 0,
ChanSize: 0,
}
assert.Equal(t, "default", i.name())
assert.Equal(t, 1, i.readWorker())
assert.Equal(t, 1, i.batchWorker())
assert.Equal(t, 1, i.batchSize())
assert.Equal(t, 1, i.handleWorker())
assert.Equal(t, 100, i.chanSize())
assert.Equal(t, 30*time.Second, i.autoBatchInterval())

j := Info{
Name: "test",
ReadWorker: 2,
BatchWorker: 2,
BatchSize: 10,
HandleWorker: 2,
ChanSize: 10,
AutoBatchInterval: 10 * time.Second,
}
assert.Equal(t, j.Name, j.name())
assert.Equal(t, j.ReadWorker, j.readWorker())
assert.Equal(t, j.BatchWorker, j.batchWorker())
assert.Equal(t, j.BatchSize, j.batchSize())
assert.Equal(t, j.HandleWorker, j.handleWorker())
assert.Equal(t, j.ChanSize, j.chanSize())
assert.Equal(t, j.AutoBatchInterval, j.autoBatchInterval())
}
Loading

0 comments on commit 73b2511

Please sign in to comment.