Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add otkafka processor #134

Merged
merged 14 commits into from
May 19, 2021
1 change: 1 addition & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
branches:
- master
pull_request:
workflow_dispatch:
jobs:
build:
strategy:
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-multierror v1.1.0
github.com/heptiolabs/healthcheck v0.0.0-20180807145615-6ff867650f40
github.com/klauspost/compress v1.12.2 // indirect
github.com/knadh/koanf v0.15.0
github.com/mitchellh/mapstructure v1.4.1
github.com/oklog/run v1.1.0
Expand All @@ -29,7 +30,7 @@ require (
github.com/prometheus/client_golang v1.9.0
github.com/robfig/cron/v3 v3.0.1
github.com/rs/xid v1.2.1
github.com/segmentio/kafka-go v0.4.10
github.com/segmentio/kafka-go v0.4.16
github.com/spf13/cobra v1.1.3
github.com/stretchr/testify v1.7.0
github.com/uber/jaeger-client-go v2.25.0+incompatible
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
Expand Down Expand Up @@ -204,6 +205,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down Expand Up @@ -370,13 +373,16 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8=
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/knadh/koanf v0.15.0 h1:HMm8cJZZIokMn5ETu9Exut1jQhfu1dm3b0TZedvhSVo=
github.com/knadh/koanf v0.15.0/go.mod h1:Ut3d4JaTRZYfO5a0wdYIGE+oyGaGFo4vXQ3ZvaSWxNc=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
Expand Down Expand Up @@ -490,6 +496,8 @@ github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -554,6 +562,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/kafka-go v0.4.10 h1:YnI820ZLfh710adINqwuCVtN3wbnLsLnT/+xhI0oooQ=
github.com/segmentio/kafka-go v0.4.10/go.mod h1:BVDwBTF24avtlj4l8/xsWNb4papVeg16+jO6/0qjvhA=
github.com/segmentio/kafka-go v0.4.16 h1:9dt78ehM9qzAkekA60D6A96RlqDzC3hnYYa8y5Szd+U=
github.com/segmentio/kafka-go v0.4.16/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc h1:jUIKcSPO9MoMJBbEoyE/RJoE8vz7Mb8AjvifMMwSyvY=
github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
Expand Down
80 changes: 80 additions & 0 deletions otkafka/processor/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package processor

import "time"

// Info the info of BatchHandler.
//
// Note:
// If sequence is necessary, make sure that per worker count is one.
// Multiple goroutines cannot guarantee the order in which data is processed.
type Info struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个用私有成员变量,公开方法更好吧

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的

// used to get reader from otkafka.ReaderMaker.
// default: "default"
Name string
// reader workers count.
// default: 1
ReadWorker int
// batch workers count.
// default: 1
BatchWorker int
// data size for batch processing.
// default: 1
BatchSize int
// handler workers count.
HandleWorker int
// the size of the data channel.
// default: 100
ChanSize int
// run the batchFunc automatically at specified intervals, avoid not executing without reaching BatchSize
// default: 30s
AutoBatchInterval time.Duration
}

func (i *Info) name() string {
if i.Name == "" {
return "default"
}
return i.Name
}

func (i *Info) readWorker() int {
if i.ReadWorker <= 0 {
return 1
}
return i.ReadWorker
}

func (i *Info) batchWorker() int {
if i.BatchWorker <= 0 {
return 1
}
return i.BatchWorker
}

func (i *Info) batchSize() int {
if i.BatchSize <= 0 {
return 1
}
return i.BatchSize
}

func (i *Info) handleWorker() int {
if i.HandleWorker <= 0 {
return 1
}
return i.HandleWorker
}

func (i *Info) chanSize() int {
if i.ChanSize <= 0 {
return 100
}
return i.ChanSize
}

func (i *Info) autoBatchInterval() time.Duration {
if i.AutoBatchInterval < 10 {
return 30 * time.Second
}
return i.AutoBatchInterval
}
43 changes: 43 additions & 0 deletions otkafka/processor/info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package processor

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestInfo(t *testing.T) {
i := Info{
Name: "",
ReadWorker: 0,
BatchWorker: 0,
BatchSize: 0,
HandleWorker: 0,
ChanSize: 0,
}
assert.Equal(t, "default", i.name())
assert.Equal(t, 1, i.readWorker())
assert.Equal(t, 1, i.batchWorker())
assert.Equal(t, 1, i.batchSize())
assert.Equal(t, 1, i.handleWorker())
assert.Equal(t, 100, i.chanSize())
assert.Equal(t, 30*time.Second, i.autoBatchInterval())

j := Info{
Name: "test",
ReadWorker: 2,
BatchWorker: 2,
BatchSize: 10,
HandleWorker: 2,
ChanSize: 10,
AutoBatchInterval: 10 * time.Second,
}
assert.Equal(t, j.Name, j.name())
assert.Equal(t, j.ReadWorker, j.readWorker())
assert.Equal(t, j.BatchWorker, j.batchWorker())
assert.Equal(t, j.BatchSize, j.batchSize())
assert.Equal(t, j.HandleWorker, j.handleWorker())
assert.Equal(t, j.ChanSize, j.chanSize())
assert.Equal(t, j.AutoBatchInterval, j.autoBatchInterval())
}
Loading