Netobserv 390: fix kafka transformer #237

Merged · 6 commits · Jun 24, 2022
Changes from 5 commits
2 changes: 2 additions & 0 deletions pkg/api/ingest_kafka.go
@@ -25,4 +25,6 @@ type IngestKafka struct {
     StartOffset string `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
     BatchReadTimeout int64 `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
     Decoder Decoder `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
+    BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
+    CommitInterval int64 `yaml:"commitInterval,omitempty" json:"commitInterval,omitempty" doc:"the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously."`
 }
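
The two added fields surface as the batchMaxLen and commitInterval keys of the kafka ingest stage. Below is a minimal sketch of a populated configuration in Go; the import path and the broker/topic/group values are assumptions for illustration, not taken from this PR.

package main

import (
    "fmt"

    "github.com/netobserv/flowlogs-pipeline/pkg/api" // assumed import path for pkg/api
)

func main() {
    // Hypothetical ingest configuration; brokers, topic and group are placeholder values.
    cfg := api.IngestKafka{
        Brokers:          []string{"kafka:9092"},
        Topic:            "network-flows",
        GroupId:          "flp",
        StartOffset:      "FirstOffset",
        BatchReadTimeout: 1000, // flush pending records at least once per second
        BatchMaxLen:      500,  // forward a batch as soon as 500 flows have accumulated
        CommitInterval:   500,  // commit Kafka offsets asynchronously every 500 ms
    }
    fmt.Printf("%+v\n", cfg)
}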
62 changes: 46 additions & 16 deletions pkg/pipeline/ingest/ingest_kafka.go
@@ -36,16 +36,19 @@ type kafkaReadMessage interface {
 }

 type ingestKafka struct {
-    kafkaParams api.IngestKafka
-    kafkaReader kafkaReadMessage
-    decoder     decode.Decoder
-    in          chan string
-    exitChan    <-chan struct{}
-    prevRecords []config.GenericMap // copy of most recently sent records; for testing and debugging
+    kafkaParams    api.IngestKafka
+    kafkaReader    kafkaReadMessage
+    decoder        decode.Decoder
+    in             chan string
+    exitChan       <-chan struct{}
+    prevRecords    []config.GenericMap // copy of most recently sent records; for testing and debugging
+    batchMaxLength int
 }

 const channelSizeKafka = 1000
-const defaultBatchReadTimeout = int64(100)
+const defaultBatchReadTimeout = int64(1000)
+const defaultKafkaBatchMaxLength = 500
+const defaultKafkaCommitInterval = 500

 // Ingest ingests entries from kafka topic
 func (ingestK *ingestKafka) Ingest(out chan<- []config.GenericMap) {
@@ -83,21 +86,36 @@ func (ingestK *ingestKafka) kafkaListener() {
 func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
     var records []interface{}
     duration := time.Duration(ingestK.kafkaParams.BatchReadTimeout) * time.Millisecond
+    flushRecords := time.NewTicker(duration)
     for {
         select {
         case <-ingestK.exitChan:
             log.Debugf("exiting ingestKafka because of signal")
             return
         case record := <-ingestK.in:
             records = append(records, record)
-        case <-time.After(duration): // Maximum batch time for each batch
+            if len(records) >= ingestK.batchMaxLength {
+                log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
+                decoded := ingestK.decoder.Decode(records)
+                out <- decoded
+                ingestK.prevRecords = decoded
+                log.Debugf("prevRecords = %v", ingestK.prevRecords)
+                records = []interface{}{}
+            }
+        case <-flushRecords.C: // Maximum batch time for each batch
             // Process batch of records (if not empty)
             if len(records) > 0 {
-                log.Debugf("ingestKafka sending %d records", len(records))
+                if len(ingestK.in) > 0 {
+                    for len(records) < ingestK.batchMaxLength && len(ingestK.in) > 0 {
+                        record := <-ingestK.in
+                        records = append(records, record)
+                    }
+                }
+                log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
                 decoded := ingestK.decoder.Decode(records)
+                out <- decoded
                 ingestK.prevRecords = decoded
                 log.Debugf("prevRecords = %v", ingestK.prevRecords)
-                out <- decoded
             }
             records = []interface{}{}
         }
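
The reworked loop batches on two triggers: it forwards records as soon as batchMaxLength of them have accumulated, and otherwise flushes whatever is pending when the flushRecords ticker fires, first draining any backlog from the input channel up to the size limit. The standalone sketch below illustrates the same size-or-timeout pattern with simplified names; a plain string payload stands in for the real decoder and record types.

package main

import (
    "fmt"
    "time"
)

// batchLoop forwards items from in to out either when maxLen items have
// accumulated or when the flush ticker fires, whichever comes first.
// This is a simplified illustration of the pattern, not the PR's code.
func batchLoop(in <-chan string, out chan<- []string, exit <-chan struct{}, maxLen int, flushEvery time.Duration) {
    var batch []string
    ticker := time.NewTicker(flushEvery)
    defer ticker.Stop()
    for {
        select {
        case <-exit:
            return
        case item := <-in:
            batch = append(batch, item)
            if len(batch) >= maxLen { // size trigger: forward immediately
                out <- batch
                batch = nil
            }
        case <-ticker.C: // time trigger: flush whatever is pending
            for len(batch) < maxLen && len(in) > 0 { // drain waiting items up to the limit
                batch = append(batch, <-in)
            }
            if len(batch) > 0 {
                out <- batch
                batch = nil
            }
        }
    }
}

func main() {
    in := make(chan string, 100)
    out := make(chan []string, 10)
    exit := make(chan struct{})
    go batchLoop(in, out, exit, 3, 50*time.Millisecond)
    for i := 0; i < 5; i++ {
        in <- fmt.Sprintf("flow-%d", i)
    }
    fmt.Println(len(<-out)) // first batch: 3 items
    fmt.Println(len(<-out)) // second batch: the remaining 2 items
    close(exit)
}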
@@ -145,12 +163,18 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
     }
     log.Infof("BatchReadTimeout = %d", jsonIngestKafka.BatchReadTimeout)

+    commitInterval := int64(defaultKafkaCommitInterval)
+    if jsonIngestKafka.CommitInterval != 0 {
+        commitInterval = jsonIngestKafka.CommitInterval
+    }
+
     kafkaReader := kafkago.NewReader(kafkago.ReaderConfig{
         Brokers:        jsonIngestKafka.Brokers,
         Topic:          jsonIngestKafka.Topic,
         GroupID:        jsonIngestKafka.GroupId,
         GroupBalancers: groupBalancers,
         StartOffset:    startOffset,
+        CommitInterval: time.Duration(commitInterval) * time.Millisecond,
     })
     if kafkaReader == nil {
         errMsg := "NewIngestKafka: failed to create kafka-go reader"
@@ -164,12 +188,18 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
         return nil, err
     }

+    bml := defaultKafkaBatchMaxLength
+    if jsonIngestKafka.BatchMaxLen != 0 {
+        bml = jsonIngestKafka.BatchMaxLen
+    }
+
     return &ingestKafka{
-        kafkaParams: jsonIngestKafka,
-        kafkaReader: kafkaReader,
-        decoder:     decoder,
-        exitChan:    utils.ExitChannel(),
-        in:          make(chan string, channelSizeKafka),
-        prevRecords: make([]config.GenericMap, 0),
+        kafkaParams:    jsonIngestKafka,
+        kafkaReader:    kafkaReader,
+        decoder:        decoder,
+        exitChan:       utils.ExitChannel(),
+        in:             make(chan string, channelSizeKafka),
+        prevRecords:    make([]config.GenericMap, 0),
+        batchMaxLength: bml,
     }, nil
 }
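
CommitInterval is handed directly to kafka-go's ReaderConfig. In kafka-go, a zero interval makes every offset commit synchronous, while a positive interval lets the reader flush commits to the broker periodically. A simplified, standalone consumer-group reader using this option is sketched below; the broker address, topic and group name are placeholders.

package main

import (
    "context"
    "log"
    "time"

    kafkago "github.com/segmentio/kafka-go"
)

func main() {
    // Placeholder broker/topic/group values, for illustration only.
    r := kafkago.NewReader(kafkago.ReaderConfig{
        Brokers:        []string{"localhost:9092"},
        Topic:          "network-flows",
        GroupID:        "flp-group",
        StartOffset:    kafkago.FirstOffset,
        CommitInterval: 500 * time.Millisecond, // 0 would make commits synchronous
    })
    defer r.Close()

    for {
        // With a GroupID set, ReadMessage marks the message's offset; the reader
        // then flushes pending commits to the broker every CommitInterval.
        m, err := r.ReadMessage(context.Background())
        if err != nil {
            log.Printf("reader stopped: %v", err)
            return
        }
        log.Printf("partition %d offset %d: %d bytes", m.Partition, m.Offset, len(m.Value))
    }
}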
6 changes: 6 additions & 0 deletions pkg/pipeline/ingest/ingest_kafka_test.go
@@ -64,6 +64,8 @@ parameters:
         groupBalancers: ["rackAffinity"]
         decoder:
           type: json
+        batchMaxLen: 1000
+        commitInterval: 1000
 `

 func initNewIngestKafka(t *testing.T, configTemplate string) Ingester {
@@ -85,6 +87,8 @@ func Test_NewIngestKafka1(t *testing.T) {
require.Equal(t, "FirstOffset", ingestKafka.kafkaParams.StartOffset)
require.Equal(t, 2, len(ingestKafka.kafkaReader.Config().GroupBalancers))
require.Equal(t, int64(300), ingestKafka.kafkaParams.BatchReadTimeout)
require.Equal(t, int(500), ingestKafka.batchMaxLength)
require.Equal(t, time.Duration(500)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
}

func Test_NewIngestKafka2(t *testing.T) {
Expand All @@ -97,6 +101,8 @@ func Test_NewIngestKafka2(t *testing.T) {
require.Equal(t, "LastOffset", ingestKafka.kafkaParams.StartOffset)
require.Equal(t, 1, len(ingestKafka.kafkaReader.Config().GroupBalancers))
require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout)
require.Equal(t, int(1000), ingestKafka.batchMaxLength)
require.Equal(t, time.Duration(1000)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
}

func removeTimestamp(receivedEntries []config.GenericMap) {
Expand Down