Skip to content
This repository has been archived by the owner on Dec 11, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' of github.com:usedatabrew/blink into feature/sink…
Browse files Browse the repository at this point in the history
…-redis
  • Loading branch information
le-vlad committed Jan 17, 2024
2 parents 549b1af + 937336f commit 547db1b
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 15 deletions.
10 changes: 5 additions & 5 deletions Formula/blink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
class Blink < Formula
desc "Open Source stream processing framework"
homepage "https://github.com/usedatabrew/blink"
version "1.6.3"
version "1.7.0"

on_macos do
url "https://github.com/usedatabrew/blink/releases/download/v1.6.3/blink_1.6.3_darwin_amd64.tar.gz"
sha256 "39bf522b65c1881855de8f027048d72fbbe23d352510d6bc2095840bece74b17"
url "https://github.com/usedatabrew/blink/releases/download/v1.7.0/blink_1.7.0_darwin_amd64.tar.gz"
sha256 "f79d872531e64a6bfbc2cfd4a5000a5ae7d4d83462bf3952c750b95c2ab6f120"

def install
bin.install "blink"
Expand All @@ -28,8 +28,8 @@ def caveats

on_linux do
if Hardware::CPU.intel?
url "https://github.com/usedatabrew/blink/releases/download/v1.6.3/blink_1.6.3_linux_amd64.tar.gz"
sha256 "ddfedb42e2205442ccfa2229b4ec9187b86ec6d0a8cdc8eb8e55b2639b9b57be"
url "https://github.com/usedatabrew/blink/releases/download/v1.7.0/blink_1.7.0_linux_amd64.tar.gz"
sha256 "00687259c85a7c3338029abc1d2157ea3411c74568bb442eea1663b65d16c08c"

def install
bin.install "blink"
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/goccy/go-json v0.10.2
github.com/gorilla/websocket v1.5.0
github.com/jackc/pgx/v5 v5.4.3
github.com/mehanizm/airtable v0.3.1
github.com/prometheus/client_golang v1.11.1
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/redis/go-redis/v9 v9.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mehanizm/airtable v0.3.1 h1:PyuSukkZScm9/onHqJWA1/kscDjp/3c0ANsQiCdJTws=
github.com/mehanizm/airtable v0.3.1/go.mod h1:0wD9HInozzelKMw8XiY6czjsDygmAg1bzxSqAha/WLg=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down
1 change: 1 addition & 0 deletions internal/sinks/redis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ type Config struct {
// In case provided both - namespace by stream will be used
CustomNamespace string `json:"custom_namespace" yaml:"custom_namespace"`
NamespaceByStream bool `json:"namespace_by_stream" yaml:"namespace_by_stream"`
KeyPrefix string `json:"key_prefix" yaml:"key_prefix"`
SetWithTTL int64 `json:"set_with_ttl" yaml:"set_with_ttl"`
}
30 changes: 22 additions & 8 deletions internal/sinks/redis/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"fmt"
"github.com/charmbracelet/log"
"github.com/goccy/go-json"
"github.com/redis/go-redis/v9"
"github.com/usedatabrew/blink/internal/schema"
"github.com/usedatabrew/blink/internal/sinks"
"github.com/usedatabrew/blink/internal/stream_context"
"github.com/usedatabrew/message"
"strings"
"time"
)

Expand All @@ -31,7 +33,7 @@ func NewRedisSinkPlugin(config Config, schema []schema.StreamSchema, appCtx *str
}
}

func (s SinkPlugin) Connect(context context.Context) error {
func (s *SinkPlugin) Connect(context context.Context) error {
rdb := redis.NewClient(&redis.Options{
Addr: s.config.RedisAddr,
Password: s.config.RedisPassword,
Expand All @@ -43,27 +45,34 @@ func (s SinkPlugin) Connect(context context.Context) error {
return status.Err()
}

s.logger.Debug("Connect check info", "result", status)

s.redisConn = rdb
return nil
}

func (s SinkPlugin) SetExpectedSchema(schema []schema.StreamSchema) {
func (s *SinkPlugin) SetExpectedSchema(schema []schema.StreamSchema) {
for _, stream := range schema {
var pkCol string
for _, c := range stream.Columns {
if c.PK {
pkCol = c.Name
}
}
s.pksByStream[stream.StreamName] = pkCol
if strings.Index(stream.StreamName, ".") != -1 {
splitName := strings.Split(stream.StreamName, ".")
s.pksByStream[splitName[1]] = pkCol
} else {
s.pksByStream[stream.StreamName] = pkCol
}
}
}

func (s SinkPlugin) GetType() sinks.SinkDriver {
func (s *SinkPlugin) GetType() sinks.SinkDriver {
return sinks.RedisSinkType
}

func (s SinkPlugin) Write(m *message.Message) error {
func (s *SinkPlugin) Write(m *message.Message) error {
streamName := m.GetStream()
namespace := ""
if s.config.CustomNamespace != "" {
Expand All @@ -74,7 +83,7 @@ func (s SinkPlugin) Write(m *message.Message) error {
namespace = streamName
}

messageKey := fmt.Sprintf("%v", m.Data.AccessProperty(s.pksByStream[streamName]))
messageKey := fmt.Sprintf("%s%v", s.config.KeyPrefix, m.Data.AccessProperty(s.pksByStream[streamName]))
if namespace != "" {
messageKey = namespace + ":" + messageKey
}
Expand All @@ -85,12 +94,17 @@ func (s SinkPlugin) Write(m *message.Message) error {
ttl = time.Second * time.Duration(s.config.SetWithTTL)
}

status := s.redisConn.Set(s.appCtx.GetContext(), messageKey, payload, ttl)
data, err := json.Marshal(payload)
if err != nil {
return err
}

s.logger.Debug("Writing message to redis", "key", messageKey, "payload", string(data))
status := s.redisConn.Set(s.appCtx.GetContext(), messageKey, data, ttl)
return status.Err()
}

func (s SinkPlugin) Stop() {
func (s *SinkPlugin) Stop() {
if err := s.redisConn.Close(); err != nil {
s.logger.Fatal("Failed to close redis client", "error", err)
}
Expand Down
5 changes: 5 additions & 0 deletions internal/sources/airtable/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package airtable

type Config struct {
ApiKey string `json:"api_key" yaml:"api_key"`
}
174 changes: 174 additions & 0 deletions internal/sources/airtable/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package airtable

import (
"context"
"errors"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/charmbracelet/log"
"github.com/goccy/go-json"
"github.com/mehanizm/airtable"
"github.com/usedatabrew/blink/internal/schema"
"github.com/usedatabrew/blink/internal/sources"
"github.com/usedatabrew/message"
"strings"
"time"
)

type SourcePlugin struct {
messageEvents chan sources.MessageEvent
config Config
client *airtable.Client
schema []schema.StreamSchema
schemaMapper map[string]schema.StreamSchema
streamPks map[string]string
streamFields map[string][]string
streamOffsets map[string]string
lastRecordStreamId map[string]string
logger *log.Logger
}

func NewAirTableSourcePlugin(config Config, s []schema.StreamSchema) sources.DataSource {
plugin := &SourcePlugin{
config: config,
schema: s,
messageEvents: make(chan sources.MessageEvent),
schemaMapper: map[string]schema.StreamSchema{},
streamFields: map[string][]string{},
streamOffsets: map[string]string{},
lastRecordStreamId: map[string]string{},
streamPks: map[string]string{},
logger: log.WithPrefix("[source]: AirTable"),
}

plugin.extractFields()

return plugin
}

func (s *SourcePlugin) Connect(ctx context.Context) error {
s.client = airtable.NewClient(s.config.ApiKey)
if err := s.client.SetBaseURL("https://api.airtable.com/v0"); err != nil {
return err
}
_, err := s.client.GetBases().Do()
if err != nil {
return err
}

return err
}

func (s *SourcePlugin) Start() {
for {
for stream, v := range s.streamFields {
streamSchemaByName, ok := s.schemaMapper[stream]
if !ok {
s.messageEvents <- sources.MessageEvent{
Message: nil,
Err: errors.New("failed to extract stream schema"),
}
continue
}

builder := array.NewRecordBuilder(memory.DefaultAllocator, streamSchemaByName.AsArrow())
splitStream := strings.Split(stream, ".")
getRowsRequest := s.client.GetTable(splitStream[0], splitStream[1]).GetRecords()

getRowsRequest.ReturnFields(v...)

streamOffset, offsetExists := s.streamOffsets[stream]
queryingWithOffset := offsetExists && streamOffset != ""
if queryingWithOffset {
getRowsRequest.WithOffset(streamOffset)
} else {
s.logger.Info("Initial load. Querying without offset")
}

if s.streamPks[stream] != "" {
getRowsRequest.WithSort(struct {
FieldName string
Direction string
}{FieldName: s.streamPks[stream], Direction: "asc"})
}

getRowsRequest.PageSize(100)
result, err := getRowsRequest.Do()
if err != nil {
s.messageEvents <- sources.MessageEvent{
Message: nil,
Err: err,
}
continue
}

if queryingWithOffset && streamOffset == result.Offset {
s.logger.Info("All the records fetched. Skipping the iteration", "stream", splitStream[1])
continue
} else {
s.logger.Info("Querying records with offset", "offset", streamOffset)
}

lastRecordId := result.Records[len(result.Records)-1].ID
lastRecordIdStored, lastRecordExist := s.lastRecordStreamId[stream]
if lastRecordExist && lastRecordIdStored == lastRecordId {
continue
}

for _, result := range result.Records {
var data []byte
if data, err = json.Marshal(result.Fields); err != nil {
s.messageEvents <- sources.MessageEvent{
Message: nil,
Err: err,
}
continue
}

if err = json.Unmarshal(data, &builder); err != nil {
s.messageEvents <- sources.MessageEvent{
Message: nil,
Err: err,
}
continue
}

data, _ = builder.NewRecord().MarshalJSON()
m := message.NewMessage(message.Insert, splitStream[1], data)
s.messageEvents <- sources.MessageEvent{
Message: m,
Err: nil,
}
}
if result.Offset != "" {
s.streamOffsets[stream] = result.Offset
s.logger.Info("Storing offset for stream", "stream", splitStream[1], "offset", result.Offset)
} else {
s.logger.Info("All records were fetched for stream", "stream", splitStream[1])
}
s.lastRecordStreamId[stream] = lastRecordId
}
<-time.After(time.Second * 25)
}
}

func (s *SourcePlugin) Events() chan sources.MessageEvent {
return s.messageEvents
}

func (s *SourcePlugin) Stop() {}

func (s *SourcePlugin) extractFields() {
for _, stream := range s.schema {
var fields []string
for _, col := range stream.Columns {
fields = append(fields, col.Name)
if col.PK {
s.streamPks[stream.StreamName] = col.Name
}
}

s.schemaMapper[stream.StreamName] = stream
s.streamFields[stream.StreamName] = fields
}
}
1 change: 1 addition & 0 deletions internal/sources/source_drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ const (
PostgresCDC SourceDriver = "postgres_cdc"
MongoStream SourceDriver = "mongo_stream"
WebSockets SourceDriver = "websocket"
AirTable SourceDriver = "airtable"
)
9 changes: 9 additions & 0 deletions public/stream/source_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stream
import (
"github.com/usedatabrew/blink/config"
"github.com/usedatabrew/blink/internal/sources"
"github.com/usedatabrew/blink/internal/sources/airtable"
"github.com/usedatabrew/blink/internal/sources/mongo_stream"
"github.com/usedatabrew/blink/internal/sources/postgres_cdc"
"github.com/usedatabrew/blink/internal/sources/websockets"
Expand Down Expand Up @@ -92,6 +93,14 @@ func (p *SourceWrapper) LoadDriver(driver sources.SourceDriver, config config.Co
}

return mongo_stream.NewMongoStreamSourcePlugin(driverConfig, config.Source.StreamSchema)
case sources.AirTable:
driverConfig, err := ReadDriverConfig[airtable.Config](config.Source.Config, airtable.Config{})

if err != nil {
panic("cannot ready driver config")
}

return airtable.NewAirTableSourcePlugin(driverConfig, config.Source.StreamSchema)
default:
p.ctx.Logger.WithPrefix("Source driver loader").Fatal("Failed to load driver", "driver", driver)
}
Expand Down
7 changes: 5 additions & 2 deletions public/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ func (s *Stream) Start() error {
for {
select {
case sourceEvent := <-s.source.Events():
streamProxyChan <- sourceEvent.Message
if sourceEvent.Err != nil {
s.ctx.Logger.Errorf("Error processing message %s", sourceEvent.Err.Error())
} else {
streamProxyChan <- sourceEvent.Message
}
}
}
}()
Expand Down Expand Up @@ -230,6 +234,5 @@ func (s *Stream) evolveSchemaForSinks(streamSchema *schema.StreamSchemaObj) {
if err != nil {
s.ctx.Logger.Fatalf("error evolving schema %s", err.Error())
}

}
}

0 comments on commit 547db1b

Please sign in to comment.