From 43664f40b23b8851e9d530e6bbd6eda5a7c411bf Mon Sep 17 00:00:00 2001
From: Vladimir Shtefan
Date: Sat, 22 Jun 2024 19:04:32 +0200
Subject: [PATCH] feat(source-plugin): kafka as a source plugin

---
 internal/sources/kafka/config.go        |  10 ++
 internal/sources/kafka/plugin.go        | 149 ++++++++++++++++++++++++
 internal/sources/mongo_stream/plugin.go |  18 +--
 internal/sources/source_drivers.go      |   1 +
 public/stream/source_wrapper.go         |  25 ++--
 5 files changed, 173 insertions(+), 30 deletions(-)
 create mode 100644 internal/sources/kafka/config.go
 create mode 100644 internal/sources/kafka/plugin.go

diff --git a/internal/sources/kafka/config.go b/internal/sources/kafka/config.go
new file mode 100644
index 0000000..17e1a8e
--- /dev/null
+++ b/internal/sources/kafka/config.go
@@ -0,0 +1,10 @@
+package kafka
+
+type Config struct {
+	Brokers       []string `json:"brokers" yaml:"brokers"`
+	Sasl          bool     `json:"sasl" yaml:"sasl"`
+	SaslPassword  string   `json:"sasl_password" yaml:"sasl_password"`
+	SaslUser      string   `json:"sasl_user" yaml:"sasl_user"`
+	SaslMechanism string   `json:"sasl_mechanism" yaml:"sasl_mechanism"`
+	ConsumerGroup string   `json:"consumer_group" yaml:"consumer_group"`
+}
diff --git a/internal/sources/kafka/plugin.go b/internal/sources/kafka/plugin.go
new file mode 100644
index 0000000..c5fe260
--- /dev/null
+++ b/internal/sources/kafka/plugin.go
@@ -0,0 +1,149 @@
+package kafka
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"strings"
+
+	"github.com/apache/arrow/go/v14/arrow"
+	"github.com/apache/arrow/go/v14/arrow/array"
+	"github.com/apache/arrow/go/v14/arrow/memory"
+	"github.com/twmb/franz-go/pkg/kgo"
+	"github.com/twmb/franz-go/pkg/sasl/aws"
+	"github.com/twmb/franz-go/pkg/sasl/plain"
+	"github.com/twmb/franz-go/pkg/sasl/scram"
+	"github.com/usedatabrew/blink/internal/schema"
+	"github.com/usedatabrew/blink/internal/sources"
+	"github.com/usedatabrew/message"
+)
+
+type SourcePlugin struct {
+	ctx           context.Context
+	cancel        context.CancelFunc
+	config        Config
+	client        *kgo.Client
+	stream        string
+	inputSchema   []schema.StreamSchema
+	outputSchema  map[string]*arrow.Schema
+	messageStream chan sources.MessageEvent
+}
+
+func NewKafkaSourcePlugin(config Config, schema []schema.StreamSchema) sources.DataSource {
+	// A cancelable context lets Stop() actually terminate the poll loop in Start().
+	ctx, cancel := context.WithCancel(context.Background())
+	return &SourcePlugin{
+		ctx:           ctx,
+		cancel:        cancel,
+		config:        config,
+		stream:        schema[0].StreamName,
+		inputSchema:   schema,
+		outputSchema:  sources.BuildOutputSchema(schema),
+		messageStream: make(chan sources.MessageEvent),
+	}
+}
+
+func (p *SourcePlugin) Connect(ctx context.Context) error {
+	client, err := kgo.NewClient(p.GetConfig()...)
+
+	if err != nil {
+		return err
+	}
+
+	// Verify broker connectivity with the caller's context instead of panicking.
+	if err = client.Ping(ctx); err != nil {
+		return err
+	}
+
+	p.client = client
+
+	return nil
+}
+
+func (p *SourcePlugin) Start() {
+	go func(c *kgo.Client) {
+		// Poll in a loop: a single PollFetches call only returns one batch.
+		for {
+			fetches := c.PollFetches(p.ctx)
+
+			if fetches.IsClientClosed() || p.ctx.Err() != nil {
+				return
+			}
+
+			if errs := fetches.Errors(); len(errs) > 0 {
+				p.messageStream <- sources.MessageEvent{Err: fmt.Errorf("kafka fetch errors: %v", errs)}
+				continue
+			}
+
+			iter := fetches.RecordIter()
+
+			for !iter.Done() {
+				record := iter.Next()
+
+				builder := array.NewRecordBuilder(memory.DefaultAllocator, p.outputSchema[record.Topic])
+
+				// RecordBuilder implements json.Unmarshaler, so the record value
+				// is decoded straight into the Arrow builder.
+				if err := json.Unmarshal(record.Value, builder); err != nil {
+					builder.Release()
+					p.messageStream <- sources.MessageEvent{Err: err}
+					continue
+				}
+
+				rec := builder.NewRecord()
+				mBytes, _ := rec.MarshalJSON()
+				rec.Release()
+				builder.Release()
+
+				// Tag the message with the topic it was read from rather than
+				// always the first configured stream, so multi-topic setups route correctly.
+				m := message.NewMessage(message.Insert, record.Topic, mBytes)
+
+				p.messageStream <- sources.MessageEvent{
+					Message: m,
+					Err:     nil,
+				}
+			}
+		}
+	}(p.client)
+}
+
+func (p *SourcePlugin) Stop() {
+	// Cancel the poll loop before closing the client; the previous
+	// p.ctx.Done() only returned a channel and stopped nothing.
+	p.cancel()
+	p.client.Close()
+}
+
+func (p *SourcePlugin) Events() chan sources.MessageEvent {
+	return p.messageStream
+}
+
+func (p *SourcePlugin) GetConfig() []kgo.Opt {
+	topics := make([]string, 0, len(p.outputSchema))
+
+	for topic := range p.outputSchema {
+		topics = append(topics, topic)
+	}
+
+	opts := []kgo.Opt{
+		kgo.SeedBrokers(p.config.Brokers...),
+		kgo.ConsumerGroup(p.config.ConsumerGroup),
+		kgo.ConsumeTopics(topics...),
+		kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
+	}
+
+	if p.config.Sasl {
+		if p.config.SaslMechanism == "" || p.config.SaslUser == "" || p.config.SaslPassword == "" {
+			panic("sasl is enabled but sasl_mechanism, sasl_user and sasl_password must all be set")
+		}
+		// Normalize the mechanism name so "SCRAM-SHA-256", "scram_sha_256", etc. all match.
+		method := strings.ToLower(p.config.SaslMechanism)
+		method = strings.ReplaceAll(method, "-", "")
+		method = strings.ReplaceAll(method, "_", "")
+		switch method {
+		case "plain":
+			opts = append(opts, kgo.SASL(plain.Auth{
+				User: p.config.SaslUser,
+				Pass: p.config.SaslPassword,
+			}.AsMechanism()))
+		case "scramsha256":
+			opts = append(opts, kgo.SASL(scram.Auth{
+				User: p.config.SaslUser,
+				Pass: p.config.SaslPassword,
+			}.AsSha256Mechanism()))
+		case "scramsha512":
+			opts = append(opts, kgo.SASL(scram.Auth{
+				User: p.config.SaslUser,
+				Pass: p.config.SaslPassword,
+			}.AsSha512Mechanism()))
+		case "awsmskiam":
+			opts = append(opts, kgo.SASL(aws.Auth{
+				AccessKey: p.config.SaslUser,
+				SecretKey: p.config.SaslPassword,
+			}.AsManagedStreamingIAMMechanism()))
+		default:
+			panic(fmt.Sprintf("unrecognized sasl mechanism %s", p.config.SaslMechanism))
+		}
+	}
+
+	return opts
+}
diff --git a/internal/sources/mongo_stream/plugin.go b/internal/sources/mongo_stream/plugin.go
index c51517d..e3b0769 100644
--- a/internal/sources/mongo_stream/plugin.go
+++ b/internal/sources/mongo_stream/plugin.go
@@ -3,6 +3,7 @@ package mongo_stream
 import (
 	"context"
 	"fmt"
+
 	"github.com/apache/arrow/go/v14/arrow"
 	"github.com/apache/arrow/go/v14/arrow/array"
 	"github.com/apache/arrow/go/v14/arrow/memory"
@@ -26,15 +27,12 @@ type SourcePlugin struct {
 }
 
 func NewMongoStreamSourcePlugin(config Config, schema []schema.StreamSchema) sources.DataSource {
-	instance := &SourcePlugin{
+	return &SourcePlugin{
 		config:        config,
 		inputSchema:   schema,
+		outputSchema:  sources.BuildOutputSchema(schema),
 		messageStream: make(chan sources.MessageEvent),
 	}
-
-	instance.buildOutputSchema()
-
-	return instance
 }
 
 func (p *SourcePlugin) Connect(ctx context.Context) error {
@@ -161,13 +159,3 @@ func (p *SourcePlugin) process(stream string, data map[string]interface{}, snaps
 		Err:  nil,
 	}
 }
-
-func (p *SourcePlugin) buildOutputSchema() {
-	outputSchemas := make(map[string]*arrow.Schema)
-	for _, collection := range p.inputSchema {
-		outputSchema := collection.AsArrow()
-		outputSchemas[collection.StreamName] = outputSchema
-	}
-
-	p.outputSchema = outputSchemas
-}
diff --git a/internal/sources/source_drivers.go b/internal/sources/source_drivers.go
index d606e74..e91a34b 100644
--- a/internal/sources/source_drivers.go
+++ b/internal/sources/source_drivers.go
@@ -10,4 +10,5 @@ const (
 	AirTable   SourceDriver = "airtable"
 	Playground SourceDriver = "playground"
 	MysqlCDC   SourceDriver = "mysql_cdc"
+	Kafka      SourceDriver = "kafka"
 )
diff --git a/public/stream/source_wrapper.go b/public/stream/source_wrapper.go
index b7831f8..8de6fb1 100644
--- a/public/stream/source_wrapper.go
+++ b/public/stream/source_wrapper.go
@@ -4,6 +4,7 @@ import (
 	"github.com/usedatabrew/blink/config"
 	"github.com/usedatabrew/blink/internal/sources"
 	"github.com/usedatabrew/blink/internal/sources/airtable"
+	"github.com/usedatabrew/blink/internal/sources/kafka"
 	"github.com/usedatabrew/blink/internal/sources/mongo_stream"
 	"github.com/usedatabrew/blink/internal/sources/mysql_cdc"
 	"github.com/usedatabrew/blink/internal/sources/playground"
@@ -11,7 +12,6 @@ import (
 	"github.com/usedatabrew/blink/internal/sources/postgres_incr_sync"
 	"github.com/usedatabrew/blink/internal/sources/websockets"
 	"github.com/usedatabrew/blink/internal/stream_context"
-	"gopkg.in/yaml.v3"
 )
 
 // SourceWrapper wraps source plugin in order to
@@ -67,19 +67,6 @@ func (p *SourceWrapper) SetStreamContext(ctx *stream_context.Context) {
 	p.ctx = ctx
 }
 
-func (p *SourceWrapper) GetPluginConfigs(driver sources.SourceDriver, fcg *yaml.Node) (any, error) {
-	switch driver {
-	case sources.PostgresCDC:
-		return config.ReadDriverConfig[postgres_cdc.Config](fcg, postgres_cdc.Config{})
-	case sources.MongoStream:
-		return config.ReadDriverConfig[mongo_stream.Config](fcg, mongo_stream.Config{})
-	case sources.MysqlCDC:
-		return config.ReadDriverConfig[mysql_cdc.Config](fcg, mysql_cdc.Config{})
-	}
-
-	return nil, nil
-}
-
 func (p *SourceWrapper) LoadDriver(driver sources.SourceDriver, fcg config.Configuration) sources.DataSource {
 	switch driver {
 	case sources.Playground:
@@ -128,10 +115,18 @@ func (p *SourceWrapper) LoadDriver(driver sources.SourceDriver, fcg config.Confi
 
 		driverConfig, err := config.ReadDriverConfig[postgres_incr_sync.Config](fcg.Source.Config, postgres_incr_sync.Config{})
 		if err != nil {
-			panic("cannot ready driver config")
+			panic("cannot read driver config")
 		}
 
 		return postgres_incr_sync.NewPostgresIncrSourcePlugin(p.ctx, driverConfig, fcg.Source.StreamSchema)
+	case sources.Kafka:
+		driverConfig, err := config.ReadDriverConfig[kafka.Config](fcg.Source.Config, kafka.Config{})
+
+		if err != nil {
+			panic("cannot read driver config")
+		}
+
+		return kafka.NewKafkaSourcePlugin(driverConfig, fcg.Source.StreamSchema)
 	default:
 		p.ctx.Logger.WithPrefix("Source driver loader").Fatal("Failed to load driver", "driver", driver)
 	}
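---

Reviewer note: below is a minimal sketch of driving the new source directly,
outside of SourceWrapper.LoadDriver. It assumes the code runs inside the blink
module (the imported packages are internal), that sources.DataSource exposes
the Connect/Start/Events/Stop methods implemented above, and that a
schema.StreamSchema can be constructed with only its StreamName; real column
definitions are elided because they are not part of this patch. The broker
address, group name, and credentials are placeholders.

	package main

	import (
		"context"
		"log"

		"github.com/usedatabrew/blink/internal/schema"
		"github.com/usedatabrew/blink/internal/sources/kafka"
	)

	func main() {
		cfg := kafka.Config{
			Brokers:       []string{"localhost:9092"},
			ConsumerGroup: "blink-example",
			Sasl:          true,
			SaslMechanism: "SCRAM-SHA-256", // GetConfig normalizes this, so scram_sha_256 works too
			SaslUser:      "svc-blink",
			SaslPassword:  "example-password",
		}

		// One StreamSchema per topic: the stream name doubles as the topic to consume.
		streams := []schema.StreamSchema{{StreamName: "orders"}}

		src := kafka.NewKafkaSourcePlugin(cfg, streams)
		if err := src.Connect(context.Background()); err != nil {
			log.Fatal(err)
		}

		src.Start()
		defer src.Stop()

		// Each event carries either a decoded Arrow record (as JSON bytes) or an error.
		for event := range src.Events() {
			if event.Err != nil {
				log.Println("source error:", event.Err)
				continue
			}
			log.Println("received message from kafka")
			_ = event.Message
		}
	}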