Merge pull request #30 from usedatabrew/feat/source-kafka
feat(source-plugin): kafka as a source plugin
le-vlad authored Jun 23, 2024
2 parents 98500b7 + 43664f4 commit 8a38c62
Showing 5 changed files with 173 additions and 30 deletions.
10 changes: 10 additions & 0 deletions internal/sources/kafka/config.go
@@ -0,0 +1,10 @@
package kafka

type Config struct {
	Brokers       []string `json:"brokers" yaml:"brokers"`
	Sasl          bool     `json:"sasl" yaml:"sasl"`
	SaslPassword  string   `json:"sasl_password" yaml:"sasl_password"`
	SaslUser      string   `json:"sasl_user" yaml:"sasl_user"`
	SaslMechanism string   `json:"sasl_mechanism" yaml:"sasl_mechanism"`
	ConsumerGroup string   `json:"consumer_group" yaml:"consumer_group"`
}
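
For illustration, here is a sketch of how a YAML source-config block maps onto this struct through its yaml tags. The broker addresses, credentials, and group name are invented placeholders, and the struct is mirrored locally so the snippet stands alone:

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// Local mirror of the plugin's Config struct so the sketch is self-contained.
type Config struct {
	Brokers       []string `yaml:"brokers"`
	Sasl          bool     `yaml:"sasl"`
	SaslPassword  string   `yaml:"sasl_password"`
	SaslUser      string   `yaml:"sasl_user"`
	SaslMechanism string   `yaml:"sasl_mechanism"`
	ConsumerGroup string   `yaml:"consumer_group"`
}

func main() {
	// Hypothetical config block; only the key names come from the struct tags.
	raw := `
brokers:
  - "broker-1:9092"
  - "broker-2:9092"
sasl: true
sasl_mechanism: SCRAM-SHA-512
sasl_user: blink
sasl_password: secret
consumer_group: blink-pipeline
`
	var cfg Config
	if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}

The mechanism spelling is forgiving: GetConfig in plugin.go lowercases the name and strips dashes and underscores, so SCRAM-SHA-512, scram_sha_512, and scramsha512 are all accepted.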
149 changes: 149 additions & 0 deletions internal/sources/kafka/plugin.go
@@ -0,0 +1,149 @@
package kafka

import (
	"context"
	"fmt"
	"strings"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/memory"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/aws"
	"github.com/twmb/franz-go/pkg/sasl/plain"
	"github.com/twmb/franz-go/pkg/sasl/scram"
	"github.com/usedatabrew/blink/internal/schema"
	"github.com/usedatabrew/blink/internal/sources"
	"github.com/usedatabrew/message"
)

type SourcePlugin struct {
	ctx           context.Context
	cancel        context.CancelFunc
	config        Config
	client        *kgo.Client
	stream        string
	inputSchema   []schema.StreamSchema
	outputSchema  map[string]*arrow.Schema
	messageStream chan sources.MessageEvent
}

func NewKafkaSourcePlugin(config Config, schema []schema.StreamSchema) sources.DataSource {
	// A cancelable context lets Stop() terminate the polling goroutine.
	ctx, cancel := context.WithCancel(context.Background())

	return &SourcePlugin{
		ctx:           ctx,
		cancel:        cancel,
		config:        config,
		stream:        schema[0].StreamName,
		inputSchema:   schema,
		outputSchema:  sources.BuildOutputSchema(schema),
		messageStream: make(chan sources.MessageEvent),
	}
}

// Connect builds the franz-go client from the assembled options and
// verifies connectivity with a ping.
func (p *SourcePlugin) Connect(ctx context.Context) error {
	client, err := kgo.NewClient(p.GetConfig()...)
	if err != nil {
		return err
	}

	if err = client.Ping(p.ctx); err != nil {
		return err
	}

	p.client = client

	return nil
}

// Start polls Kafka in a background goroutine. Each record's JSON payload
// is decoded through an Arrow record builder for the topic's schema and
// emitted on the message stream.
func (p *SourcePlugin) Start() {
	go func(c *kgo.Client) {
		for {
			fetches := c.PollFetches(p.ctx)

			// Stop polling once the client is closed or the plugin's
			// context has been canceled via Stop().
			if fetches.IsClientClosed() || p.ctx.Err() != nil {
				return
			}

			if errs := fetches.Errors(); len(errs) > 0 {
				panic(fmt.Sprint(errs))
			}

			iter := fetches.RecordIter()

			for !iter.Done() {
				record := iter.Next()

				builder := array.NewRecordBuilder(memory.DefaultAllocator, p.outputSchema[record.Topic])

				if err := builder.UnmarshalJSON(record.Value); err != nil {
					panic(err)
				}

				rec := builder.NewRecord()
				mBytes, _ := rec.MarshalJSON()
				rec.Release()
				builder.Release()

				// Topics are consumed by stream name, so the record's
				// topic identifies the stream it belongs to.
				m := message.NewMessage(message.Insert, record.Topic, mBytes)

				p.messageStream <- sources.MessageEvent{
					Message: m,
					Err:     nil,
				}
			}
		}
	}(p.client)
}

// Stop cancels the polling context and closes the Kafka client.
func (p *SourcePlugin) Stop() {
	p.cancel()
	p.client.Close()
}

// Events exposes the channel on which decoded messages are delivered.
func (p *SourcePlugin) Events() chan sources.MessageEvent {
	return p.messageStream
}

// GetConfig assembles the franz-go client options: seed brokers, the
// consumer group, the topic list derived from the stream schemas, and
// optional SASL authentication.
func (p *SourcePlugin) GetConfig() []kgo.Opt {
	topics := make([]string, 0, len(p.outputSchema))

	for topic := range p.outputSchema {
		topics = append(topics, topic)
	}

	opts := []kgo.Opt{
		kgo.SeedBrokers(p.config.Brokers...),
		kgo.ConsumerGroup(p.config.ConsumerGroup),
		kgo.ConsumeTopics(topics...),
		kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
	}

	if p.config.Sasl {
		if p.config.SaslMechanism == "" || p.config.SaslUser == "" || p.config.SaslPassword == "" {
			panic("sasl_mechanism, sasl_user and sasl_password must be specified when sasl is enabled")
		}

		// Normalize the mechanism name so SCRAM-SHA-512, scram_sha_512,
		// and scramsha512 are all accepted.
		method := strings.ToLower(p.config.SaslMechanism)
		method = strings.ReplaceAll(method, "-", "")
		method = strings.ReplaceAll(method, "_", "")

		switch method {
		case "plain":
			opts = append(opts, kgo.SASL(plain.Auth{
				User: p.config.SaslUser,
				Pass: p.config.SaslPassword,
			}.AsMechanism()))
		case "scramsha256":
			opts = append(opts, kgo.SASL(scram.Auth{
				User: p.config.SaslUser,
				Pass: p.config.SaslPassword,
			}.AsSha256Mechanism()))
		case "scramsha512":
			opts = append(opts, kgo.SASL(scram.Auth{
				User: p.config.SaslUser,
				Pass: p.config.SaslPassword,
			}.AsSha512Mechanism()))
		case "awsmskiam":
			opts = append(opts, kgo.SASL(aws.Auth{
				AccessKey: p.config.SaslUser,
				SecretKey: p.config.SaslPassword,
			}.AsManagedStreamingIAMMechanism()))
		default:
			panic(fmt.Sprintf("unrecognized sasl mechanism %s", p.config.SaslMechanism))
		}
	}

	return opts
}
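
To show how the pieces fit together, here is a minimal consumption sketch. It assumes the sources.DataSource interface exposes the Connect, Start, Stop, and Events methods that both source plugins implement; the broker address and group name are placeholders, and schema construction is omitted because it depends on blink's schema package:

package example

import (
	"context"
	"log"

	"github.com/usedatabrew/blink/internal/schema"
	"github.com/usedatabrew/blink/internal/sources/kafka"
)

// run drives the Kafka source directly. streamSchema must be non-empty,
// and each entry's StreamName doubles as the Kafka topic to consume.
func run(streamSchema []schema.StreamSchema) error {
	cfg := kafka.Config{
		Brokers:       []string{"localhost:9092"}, // placeholder broker
		ConsumerGroup: "blink-example",            // placeholder group
	}

	src := kafka.NewKafkaSourcePlugin(cfg, streamSchema)

	if err := src.Connect(context.Background()); err != nil {
		return err
	}

	src.Start()

	// The plugin never closes its events channel, so bound the loop
	// instead of ranging over the channel.
	for i := 0; i < 10; i++ {
		event := <-src.Events()
		if event.Err != nil {
			src.Stop()
			return event.Err
		}
		log.Printf("event %d: %v", i, event.Message)
	}

	src.Stop()
	return nil
}

Because GetConfig derives the topic list from the output schemas, subscribing to an additional topic amounts to adding another stream to the schema.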
18 changes: 3 additions & 15 deletions internal/sources/mongo_stream/plugin.go
@@ -3,6 +3,7 @@ package mongo_stream
 import (
 	"context"
 	"fmt"
+
 	"github.com/apache/arrow/go/v14/arrow"
 	"github.com/apache/arrow/go/v14/arrow/array"
 	"github.com/apache/arrow/go/v14/arrow/memory"
@@ -26,15 +27,12 @@ type SourcePlugin struct {
 }
 
 func NewMongoStreamSourcePlugin(config Config, schema []schema.StreamSchema) sources.DataSource {
-	instance := &SourcePlugin{
+	return &SourcePlugin{
 		config:        config,
 		inputSchema:   schema,
 		outputSchema:  sources.BuildOutputSchema(schema),
 		messageStream: make(chan sources.MessageEvent),
 	}
-
-	instance.buildOutputSchema()
-
-	return instance
 }
 
 func (p *SourcePlugin) Connect(ctx context.Context) error {
@@ -161,13 +159,3 @@ func (p *SourcePlugin) process(stream string, data map[string]interface{}, snaps
 		Err: nil,
 	}
 }
-
-func (p *SourcePlugin) buildOutputSchema() {
-	outputSchemas := make(map[string]*arrow.Schema)
-	for _, collection := range p.inputSchema {
-		outputSchema := collection.AsArrow()
-		outputSchemas[collection.StreamName] = outputSchema
-	}
-
-	p.outputSchema = outputSchemas
-}
1 change: 1 addition & 0 deletions internal/sources/source_drivers.go
@@ -10,4 +10,5 @@ const (
 	AirTable   SourceDriver = "airtable"
 	Playground SourceDriver = "playground"
 	MysqlCDC   SourceDriver = "mysql_cdc"
+	Kafka      SourceDriver = "kafka"
 )
25 changes: 10 additions & 15 deletions public/stream/source_wrapper.go
@@ -4,14 +4,14 @@ import (
 	"github.com/usedatabrew/blink/config"
 	"github.com/usedatabrew/blink/internal/sources"
 	"github.com/usedatabrew/blink/internal/sources/airtable"
+	"github.com/usedatabrew/blink/internal/sources/kafka"
 	"github.com/usedatabrew/blink/internal/sources/mongo_stream"
 	"github.com/usedatabrew/blink/internal/sources/mysql_cdc"
 	"github.com/usedatabrew/blink/internal/sources/playground"
 	"github.com/usedatabrew/blink/internal/sources/postgres_cdc"
 	"github.com/usedatabrew/blink/internal/sources/postgres_incr_sync"
 	"github.com/usedatabrew/blink/internal/sources/websockets"
 	"github.com/usedatabrew/blink/internal/stream_context"
-	"gopkg.in/yaml.v3"
 )

// SourceWrapper wraps source plugin in order to
@@ -67,19 +67,6 @@ func (p *SourceWrapper) SetStreamContext(ctx *stream_context.Context) {
 	p.ctx = ctx
 }
 
-func (p *SourceWrapper) GetPluginConfigs(driver sources.SourceDriver, fcg *yaml.Node) (any, error) {
-	switch driver {
-	case sources.PostgresCDC:
-		return config.ReadDriverConfig[postgres_cdc.Config](fcg, postgres_cdc.Config{})
-	case sources.MongoStream:
-		return config.ReadDriverConfig[mongo_stream.Config](fcg, mongo_stream.Config{})
-	case sources.MysqlCDC:
-		return config.ReadDriverConfig[mysql_cdc.Config](fcg, mysql_cdc.Config{})
-	}
-
-	return nil, nil
-}
-
 func (p *SourceWrapper) LoadDriver(driver sources.SourceDriver, fcg config.Configuration) sources.DataSource {
 	switch driver {
 	case sources.Playground:
@@ -128,10 +115,18 @@ func (p *SourceWrapper) LoadDriver(driver sources.SourceDriver, fcg config.Confi
 		driverConfig, err := config.ReadDriverConfig[postgres_incr_sync.Config](fcg.Source.Config, postgres_incr_sync.Config{})
 
 		if err != nil {
-			panic("cannot ready driver config")
+			panic("cannot read driver config")
 		}
 
 		return postgres_incr_sync.NewPostgresIncrSourcePlugin(p.ctx, driverConfig, fcg.Source.StreamSchema)
+	case sources.Kafka:
+		driverConfig, err := config.ReadDriverConfig[kafka.Config](fcg.Source.Config, kafka.Config{})
+
+		if err != nil {
+			panic("cannot read driver config")
+		}
+
+		return kafka.NewKafkaSourcePlugin(driverConfig, fcg.Source.StreamSchema)
 	default:
 		p.ctx.Logger.WithPrefix("Source driver loader").Fatal("Failed to load driver", "driver", driver)
 	}
