Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(channel): performance improvements #36

Merged
merged 8 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices
logger.Info(fmt.Sprintf("Outstanding unprocessed events in the channel, data lost ~ (No batches %d * 5 events) = ~%d", len(bufferChannel), eventsInChannel))
metrics.Count("kafka_messages_delivered_total", eventsInChannel+eventsInProducer, "success=false")
logger.Info("Exiting server")
os.Exit(0)
cancel()
default:
logger.Info(fmt.Sprintf("[App.Server] Received a unexpected signal %s", sig))
}
Expand Down
8 changes: 0 additions & 8 deletions deserialization/deserializer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
package deserialization

type Deserializer interface {
Deserialize(b []byte, i interface{}) error
}

type DeserializeFunc func(b []byte, i interface{}) error

func (f DeserializeFunc) Deserialize(b []byte, i interface{}) error {
return f(b, i)
}
4 changes: 1 addition & 3 deletions deserialization/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package deserialization

import "encoding/json"

type JSONDeserializer struct{}

func (j *JSONDeserializer) Deserialize(b []byte, i interface{}) error {
func DeserializeJSON(b []byte, i interface{}) error {
return json.Unmarshal(b, i)
}
7 changes: 3 additions & 4 deletions deserialization/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ func TestJSONDeserializer_Deserialize(t *testing.T) {
}
tests := []struct {
name string
j *JSONDeserializer
j DeserializeFunc
args args
wantErr bool
}{
{
name: "Use JSON Deserializer",
j: &JSONDeserializer{},
j: DeserializeJSON,
args: args{
b: []byte(`{"A": "a"}`),
i: &struct {
Expand All @@ -27,8 +27,7 @@ func TestJSONDeserializer_Deserialize(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
j := &JSONDeserializer{}
if err := j.Deserialize(tt.args.b, tt.args.i); (err != nil) != tt.wantErr {
if err := tt.j(tt.args.b, tt.args.i); (err != nil) != tt.wantErr {
t.Errorf("JSONDeserializer.Deserialize() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
4 changes: 2 additions & 2 deletions deserialization/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

var ErrInvalidProtoMessage = errors.New("invalld proto message")

type ProtoDeserilizer struct{}
// type ProtoDeserilizer struct{}

func (d *ProtoDeserilizer) Deserialize(b []byte, i interface{}) error {
func DeserializeProto(b []byte, i interface{}) error {
msg, ok := i.(proto.Message)
if !ok {
siddhanta-rath marked this conversation as resolved.
Show resolved Hide resolved
return ErrInvalidProtoMessage
Expand Down
7 changes: 3 additions & 4 deletions deserialization/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ func TestProtoDeserilizer_Deserialize(t *testing.T) {
}
tests := []struct {
name string
d *ProtoDeserilizer
d DeserializeFunc
args args
wantErr bool
}{
{
name: "Deserialize a proto message",
d: &ProtoDeserilizer{},
d: DeserializeProto,
args: args{
b: []byte{},
i: &pb.SendEventRequest{},
Expand All @@ -29,8 +29,7 @@ func TestProtoDeserilizer_Deserialize(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := &ProtoDeserilizer{}
if err := d.Deserialize(tt.args.b, tt.args.i); (err != nil) != tt.wantErr {
if err := tt.d(tt.args.b, tt.args.i); (err != nil) != tt.wantErr {
t.Errorf("ProtoDeserilizer.Deserialize() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ go 1.14

require (
github.com/confluentinc/confluent-kafka-go v1.4.2 // indirect
github.com/golang/protobuf v1.5.0
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2
github.com/sirupsen/logrus v1.6.0
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.7.0
golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3 // indirect
google.golang.org/grpc v1.41.0
google.golang.org/protobuf v1.27.1
google.golang.org/protobuf v1.28.0
gopkg.in/alexcesaro/statsd.v2 v2.0.0
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc=
gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU=
Expand Down
24 changes: 11 additions & 13 deletions publisher/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,22 @@ package publisher
import (
"encoding/json"
"fmt"
"strings"

"github.com/odpf/raccoon/collection"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"

// Importing librd to make it work on vendor mode
"strings"
_ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka"

"github.com/odpf/raccoon/config"
"github.com/odpf/raccoon/logger"
"github.com/odpf/raccoon/metrics"
_ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka"
pb "github.com/odpf/raccoon/proto"
)

// KafkaProducer Produce data to kafka synchronously
type KafkaProducer interface {
// ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed.
ProduceBulk(request collection.CollectRequest, deliveryChannel chan kafka.Event) error
ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error
}

func NewKafka() (*Kafka, error) {
Expand Down Expand Up @@ -50,8 +49,7 @@ type Kafka struct {

// ProduceBulk messages to kafka. Block until all messages are sent. Return array of error. Order of Errors is guaranteed.
// DeliveryChannel needs to be exclusive. DeliveryChannel is exposed for recyclability purpose.
func (pr *Kafka) ProduceBulk(request collection.CollectRequest, deliveryChannel chan kafka.Event) error {
events := request.GetEvents()
func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error {
errors := make([]error, len(events))
totalProcessed := 0
for order, event := range events {
Expand All @@ -64,26 +62,26 @@ func (pr *Kafka) ProduceBulk(request collection.CollectRequest, deliveryChannel

err := pr.kp.Produce(message, deliveryChannel)
if err != nil {
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, event.GetType()))
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type))
if err.Error() == "Local: Unknown topic" {
errors[order] = fmt.Errorf("%v %s", err, topic)
metrics.Increment("kafka_unknown_topic_failure_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, request.ConnectionIdentifier.Group))
metrics.Increment("kafka_unknown_topic_failure_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup))
siddhanta-rath marked this conversation as resolved.
Show resolved Hide resolved
} else {
errors[order] = err
}
continue
}
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, event.GetType()))
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", connGroup, event.Type))
totalProcessed++
}
// Wait for deliveryChannel as many as processed
for i := 0; i < totalProcessed; i++ {
d := <-deliveryChannel
m := d.(*kafka.Message)
if m.TopicPartition.Error != nil {
eventType := events[i].GetType()
metrics.Decrement("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, eventType))
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, eventType))
eventType := events[i].Type
metrics.Decrement("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", connGroup, eventType))
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, eventType))
order := m.Opaque.(int)
errors[order] = m.TopicPartition.Error
}
Expand Down
13 changes: 8 additions & 5 deletions publisher/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import (
"os"
"testing"

"github.com/odpf/raccoon/collection"
"github.com/odpf/raccoon/logger"
pb "github.com/odpf/raccoon/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

const (
group1 = "group-1"
)

type void struct{}

func (v void) Write(_ []byte) (int, error) {
Expand Down Expand Up @@ -54,7 +57,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
})
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2))
assert.NoError(t, err)
})
})
Expand All @@ -78,7 +81,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("buffer full")).Once()
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2))
assert.Len(t, err.(BulkError).Errors, 3)
assert.Error(t, err.(BulkError).Errors[0])
assert.Empty(t, err.(BulkError).Errors[1])
Expand All @@ -90,7 +93,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Unknown topic")).Once()
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic)
})
})
Expand All @@ -114,7 +117,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
}).Once()
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
assert.NotEmpty(t, err)
assert.Len(t, err.(BulkError).Errors, 2)
assert.Equal(t, "buffer full", err.(BulkError).Errors[0].Error())
Expand Down
4 changes: 1 addition & 3 deletions serialization/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package serialization

import "encoding/json"

type JSONSerializer struct{}

func (s *JSONSerializer) Serialize(m interface{}) ([]byte, error) {
func SerializeJSON(m interface{}) ([]byte, error) {
return json.Marshal(m)
}
7 changes: 3 additions & 4 deletions serialization/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ func TestJSONSerializer_Serialize(t *testing.T) {
}
tests := []struct {
name string
s *JSONSerializer
s SerializeFunc
args args
want []byte
wantErr bool
}{
{
name: "Serialize JSON",
s: &JSONSerializer{},
s: SerializeJSON,
args: args{
m: &pb.SendEventRequest{},
},
Expand All @@ -31,8 +31,7 @@ func TestJSONSerializer_Serialize(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &JSONSerializer{}
got, err := s.Serialize(tt.args.m)
got, err := tt.s(tt.args.m)
if (err != nil) != tt.wantErr {
t.Errorf("JSONSerializer.Serialize() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
13 changes: 0 additions & 13 deletions serialization/mock.go

This file was deleted.

4 changes: 1 addition & 3 deletions serialization/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ var (
ErrInvalidProtoMessage = errors.New("invalld proto message")
)

type ProtoSerilizer struct{}

func (p *ProtoSerilizer) Serialize(m interface{}) ([]byte, error) {
func SerializeProto(m interface{}) ([]byte, error) {
msg, ok := m.(proto.Message)
if !ok {
return nil, ErrInvalidProtoMessage
Expand Down
1 change: 0 additions & 1 deletion serialization/proto_test.go

This file was deleted.

4 changes: 1 addition & 3 deletions serialization/serializer.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
package serialization

type Serializer interface {
Serialize(m interface{}) ([]byte, error)
}
type SerializeFunc func(m interface{}) ([]byte, error)
20 changes: 13 additions & 7 deletions services/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,28 @@ type Service struct {
s *grpc.Server
}

func (s Service) Init(ctx context.Context) error {
func NewGRPCService(c collection.Collector) *Service {
server := grpc.NewServer()
pb.RegisterEventServiceServer(server, &Handler{C: c})
return &Service{
s: server,
Collector: c,
}
}

func (s *Service) Init(context.Context) error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%s", config.ServerGRPC.Port))
if err != nil {
return err
}
server := grpc.NewServer()
pb.RegisterEventServiceServer(server, &Handler{C: s.Collector})
s.s = server
return server.Serve(lis)
return s.s.Serve(lis)
}

func (s Service) Name() string {
siddhanta-rath marked this conversation as resolved.
Show resolved Hide resolved
func (*Service) Name() string {
return "GRPC"
}

func (s Service) Shutdown(ctx context.Context) error {
func (s *Service) Shutdown(context.Context) error {
s.s.GracefulStop()
return nil
}
17 changes: 11 additions & 6 deletions services/pprof/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pprof
import (
"context"
"net/http"

// enable pprof https://pkg.go.dev/net/http/pprof#pkg-overview
_ "net/http/pprof"
siddhanta-rath marked this conversation as resolved.
Show resolved Hide resolved
)
Expand All @@ -11,16 +12,20 @@ type Service struct {
s *http.Server
}

func (s Service) Init(ctx context.Context) error {
server := &http.Server{Addr: "localhost:6060", Handler: nil}
s.s = server
return server.ListenAndServe()
func NewPprofService() *Service {
return &Service{
s: &http.Server{Addr: "localhost:6060", Handler: nil},
}
}

func (s *Service) Init(context.Context) error {
return s.s.ListenAndServe()
}

func (s Service) Name() string {
func (*Service) Name() string {
return "pprof"
}

func (s Service) Shutdown(ctx context.Context) error {
func (s *Service) Shutdown(ctx context.Context) error {
return s.s.Shutdown(ctx)
}
Loading