feat: add otkafka processor #134

Merged
merged 14 commits into from
May 19, 2021

@GGXXLL (Contributor) commented May 14, 2021

What is it?

It is an abstraction of the Kafka consumer process.

Users only need to define how messages are parsed and how the resulting data is processed (by implementing processor.Handler).

They don't need to care about how messages are consumed or how data is passed along.

How to use it?

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/di"
	"github.com/DoNewsCode/core/otgorm"
	"github.com/DoNewsCode/core/otkafka"
	"github.com/DoNewsCode/core/otkafka/processor"
	"github.com/segmentio/kafka-go"
	"gorm.io/gorm"
)

type event struct {
	Id   int
	Name string
}

type handler struct {
	db *gorm.DB
}

func newHandler(db *gorm.DB) processor.Out {
	return processor.NewOut(&handler{db: db})
}

func (h *handler) Info() *processor.Info {
	// Name must match a reader configured under kafka.reader.<name> (here "default").
	return &processor.Info{
		Name:      "default",
		BatchSize: 3,
	}
}

func (h *handler) Handle(ctx context.Context, msg *kafka.Message) (interface{}, error) {
	// Decode message
	e := &event{}
	if err := json.Unmarshal(msg.Value, e); err != nil {
		return nil, err
	}

	// Dirty data
	if e.Id < 0 {
		return nil, nil
	}

	return e, nil
}

func (h *handler) Batch(ctx context.Context, data []interface{}) error {
	// Convert each element back to *event, skipping anything else (e.g. nil).
	save := make([]*event, 0, len(data))
	for _, d := range data {
		if e, ok := d.(*event); ok && e != nil {
			save = append(save, e)
		}
	}
	if len(save) == 0 {
		return nil
	}

	// Persist the batch
	if err := h.db.WithContext(ctx).Table("example").Save(save).Error; err != nil {
		return err
	}

	return nil
}

func main() {
	// Load config
	c := core.New(
		core.WithInline("kafka.reader.default.brokers", "127.0.0.1:9092"),
		core.WithInline("kafka.reader.default.topic", "example"),
		core.WithInline("kafka.reader.default.groupID", "default"),
		core.WithInline("kafka.reader.default.startOffset", -1), // -1 == kafka.LastOffset

		core.WithInline("gorm.default.database", "mysql"),
		core.WithInline("gorm.default.dsn", "root@tcp(127.0.0.1:3306)/app?charset=utf8mb4&parseTime=True&loc=Local"),

		core.WithInline("http.disable", "true"),
		core.WithInline("grpc.disable", "true"),
		core.WithInline("cron.disable", "true"),
		core.WithInline("log.level", "debug"),
	)
	// Provide dependencies
	c.ProvideEssentials()
	c.Provide(otkafka.Providers())
	c.Provide(otgorm.Providers())

	// Provide your handlers
	c.Provide(di.Deps{
		newHandler,
	})

	c.AddModuleFunc(processor.New)

	// Start serving
	if err := c.Serve(context.Background()); err != nil {
		panic(err)
	}
}

@codecov (bot) commented May 14, 2021

Codecov Report

Merging #134 (bd8393c) into master (f89361d) will increase coverage by 0.27%.
The diff coverage is 81.87%.

❗ Current head bd8393c differs from the pull request's most recent head 9390ff1. Consider uploading reports for commit 9390ff1 to get more accurate results.

@@            Coverage Diff             @@
##           master     #134      +/-   ##
==========================================
+ Coverage   77.12%   77.40%   +0.27%     
==========================================
  Files          84       86       +2     
  Lines        3205     3354     +149     
==========================================
+ Hits         2472     2596     +124     
- Misses        546      560      +14     
- Partials      187      198      +11     
| Impacted Files | Coverage Δ | |
|---|---|---|
| otkafka/processor/processor.go | 78.90% <78.90%> (ø) | |
| otkafka/processor/info.go | 100.00% <100.00%> (ø) | |
| config/watcher/file.go | 62.79% <0.00%> (+4.65%) | ⬆️ |

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 6bae6a2...9390ff1.

return err
}
if h.batchFunc != nil {
if h.info.autoCommit() {
Member: Is this distinction really necessary?

return err
}

data = data[0:0]
Member: What is this part for?

Member: Suggestion: have doFunc receive data and msg as parameters instead of using them directly through the closure. That avoids many potential problems.

Contributor Author: After processing succeeds, the slice is cleared. The closure has been changed.


doFunc := func() error {
if err := h.batchFunc(h.ctx, data); err != nil {
return err
Member: When an error occurs, data and messages are not cleared. Is that intentional?

Contributor Author: The error is returned, so the service should exit; that's why I didn't handle it.

messages = append(messages, *v.message)
if len(data) >= h.info.batchSize() {
if err := doFunc(); err != nil {
return err
Member: A test case for this error path is really necessary. From reading the code, there seems to be a problem here.

Contributor Author: Yes, added.

// Note:
// If ordering matters, make sure the worker count is 1.
// Multiple goroutines cannot guarantee the order in which data is processed.
type Info struct {
Member: Wouldn't it be better to use private member variables with public methods here?

Contributor Author: OK.
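The reviewer's suggestion, unexported fields behind exported methods, might look like the following sketch. It is hypothetical rather than the merged `Info`: the `NewInfo` constructor and the defaults (batch size 1, one worker) are assumptions made for illustration. The benefit is that defaults are enforced in one place; the cost is that callers can no longer use a composite literal such as `processor.Info{Name: "default", BatchSize: 3}`.

```go
package main

import "fmt"

// Info is a hypothetical variant following the review suggestion:
// unexported fields, exported accessor methods.
// The defaults below (batch size 1, one worker) are assumptions.
type Info struct {
	name      string
	batchSize int
	workers   int
}

// NewInfo is a hypothetical constructor for the named reader.
func NewInfo(name string, batchSize, workers int) *Info {
	return &Info{name: name, batchSize: batchSize, workers: workers}
}

// Name returns the reader name, which must match the configuration.
func (i *Info) Name() string { return i.name }

// BatchSize falls back to 1 when unset or invalid.
func (i *Info) BatchSize() int {
	if i.batchSize <= 0 {
		return 1
	}
	return i.batchSize
}

// Workers falls back to 1. Keep it at 1 when message order matters,
// since multiple goroutines cannot guarantee processing order.
func (i *Info) Workers() int {
	if i.workers <= 0 {
		return 1
	}
	return i.workers
}

func main() {
	i := NewInfo("default", 0, 4)
	fmt.Println(i.Name(), i.BatchSize(), i.Workers()) // default 1 4
}
```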

@Reasno merged commit 73b2511 into DoNewsCode:master on May 19, 2021
@GGXXLL deleted the otkafka-processor branch on May 24, 2021 08:19