This repository has been archived by the owner on Dec 11, 2024. It is now read-only.

feat(sources): added postgres incremental sync #23

Merged: 3 commits, Feb 25, 2024
3 changes: 2 additions & 1 deletion config/config.go
@@ -24,9 +24,10 @@ type Columns struct {

type Service struct {
ReloadOnRestart bool `yaml:"reload_on_restart"`
- PipelineId int `yaml:"pipeline_id" validate:"required"`
+ PipelineId int64 `yaml:"pipeline_id" validate:"required"`
InfluxEnabled bool `yaml:"enable_influx"`
EnableETCDRegistry bool `yaml:"enable_etcd_registry"`
+ OffsetStorageURI string `yaml:"offset_storage_uri"`
ETCD *ETCD `yaml:"etcd"`
Influx interface{} `yaml:"influx"`
}
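For context, a service section using the new fields might look like this (a minimal sketch: the key layout and the Redis URI are illustrative assumptions, not taken from this PR):

service:
  pipeline_id: 42
  reload_on_restart: false
  offset_storage_uri: redis://:s3cret@localhost:6379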
2 changes: 1 addition & 1 deletion go.mod
@@ -29,7 +29,7 @@ require (
github.com/spf13/cobra v1.6.1
github.com/twmb/franz-go v1.15.2
github.com/usedatabrew/message v0.0.3
- github.com/usedatabrew/pglogicalstream v0.0.22
+ github.com/usedatabrew/pglogicalstream v0.0.23
github.com/usedatabrew/tango v0.0.5
github.com/wagslane/go-rabbitmq v0.12.4
github.com/zeebo/assert v1.3.0
4 changes: 2 additions & 2 deletions go.sum
@@ -279,8 +279,8 @@ github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqj
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
github.com/usedatabrew/message v0.0.3 h1:NRynMSekP3U4kRUMxjbv5dCHW7Q2P33GVu0DkYBIH6w=
github.com/usedatabrew/message v0.0.3/go.mod h1:dwSl52lgKKo51dk3DBjW7Whi6kQl06mXFfPumGxumeA=
- github.com/usedatabrew/pglogicalstream v0.0.22 h1:XyvWZQQ1SU7OjalYJP+9AJ+LtIEx98rtpzbLRLwN6BU=
- github.com/usedatabrew/pglogicalstream v0.0.22/go.mod h1:VmPhp8W+MSHR2sIPdwQxGBzMVR2zTzUxEbiMrjH+5eU=
+ github.com/usedatabrew/pglogicalstream v0.0.23 h1:IiDWzDRgV86v9uoLCQ+kJaRD3fN4KtPXmwOxJVu0+gY=
+ github.com/usedatabrew/pglogicalstream v0.0.23/go.mod h1:VmPhp8W+MSHR2sIPdwQxGBzMVR2zTzUxEbiMrjH+5eU=
github.com/usedatabrew/tango v0.0.5 h1:YXSe4gnYsjGCoODFbtWjhqhmq2JIFJ9Zo4X4ED7wnZE=
github.com/usedatabrew/tango v0.0.5/go.mod h1:6/hSAFH1NvS/wdsDlsLyHq1nbZLOIClzbOc7jxqSZ54=
github.com/wagslane/go-rabbitmq v0.12.4 h1:dxpmTew/wrBlltcu9kBZNTVftT7tsguF4n4IAawK2d8=
7 changes: 7 additions & 0 deletions internal/offset_storage/key_builder.go
@@ -0,0 +1,7 @@
package offset_storage

import "fmt"

func BuildKey(pipelineId int64, stream string) string {
return fmt.Sprintf("pipeline_%d_stream_%s", pipelineId, stream)
}
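As a quick worked example, BuildKey(42, "public.users") yields the key "pipeline_42_stream_public.users" (the pipeline id and stream name are illustrative).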
8 changes: 8 additions & 0 deletions internal/offset_storage/offset_storage.go
@@ -0,0 +1,8 @@
package offset_storage

// OffsetStorage is used to store checkpoints/offsets for source connectors
// which use incremental sync by primary keys
type OffsetStorage interface {
SetOffsetForPipeline(key string, offset int64) error
GetOffsetByPipelineStream(key string) (int64, error)
}
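A typical consumer reads the stored offset before querying and persists the advanced offset after a batch. A minimal sketch, where store (an OffsetStorage implementation), pipelineId, and rowsFetched are assumed to exist in the surrounding code:

key := offset_storage.BuildKey(pipelineId, "public.users")
offset, err := store.GetOffsetByPipelineStream(key)
if err != nil {
	offset = 0 // no stored checkpoint yet; start from the beginning
}
// ... fetch the next batch of rows starting at offset ...
if err := store.SetOffsetForPipeline(key, offset+rowsFetched); err != nil {
	// a failed write means the batch may be re-read on the next tick
}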
39 changes: 39 additions & 0 deletions internal/offset_storage/storage.go
@@ -0,0 +1,39 @@
package offset_storage

import (
"context"
"crypto/tls"
"fmt"
"github.com/redis/go-redis/v9"
"net/url"
)

type Storage struct {
redisCache *redis.Client
}

func NewOffsetStorage(storageURI string) OffsetStorage {
parsedRedisUrl, err := url.Parse(storageURI)
if err != nil {
panic(err)
}

passwd, _ := parsedRedisUrl.User.Password()
op := &redis.Options{Addr: parsedRedisUrl.Host, Password: passwd, TLSConfig: &tls.Config{MinVersion: tls.VersionTLS12}}
client := redis.NewClient(op)

return &Storage{
redisCache: client,
}
}

func (o *Storage) GetOffsetByPipelineStream(key string) (int64, error) {
resCmd := o.redisCache.Get(context.Background(), key)
return resCmd.Int64()
}

func (o *Storage) SetOffsetForPipeline(key string, offset int64) error {
cmd := o.redisCache.Set(context.Background(), key, offset, 0)
return cmd.Err()
}
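Wiring this up is a one-liner. Note that the client takes its address from the URI's host and its password from the userinfo, and always sets a TLS config (minimum TLS 1.2), so the target Redis must accept TLS connections. A sketch with an illustrative URI:

store := offset_storage.NewOffsetStorage("rediss://:s3cret@redis.internal:6379")
_ = store.SetOffsetForPipeline("pipeline_42_stream_public.users", 10000) // error handling elided in this sketch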
24 changes: 24 additions & 0 deletions internal/offset_storage/storage_inmem.go
@@ -0,0 +1,24 @@
package offset_storage

type StorageInMem struct {
offsets map[string]int64
}

func NewStorageInMem() *StorageInMem {
return &StorageInMem{
offsets: map[string]int64{},
}
}

func (o *StorageInMem) GetOffsetByPipelineStream(key string) (int64, error) {
if off, exist := o.offsets[key]; exist {
return off, nil
}

return 0, nil
}

func (o *StorageInMem) SetOffsetForPipeline(key string, offset int64) error {
o.offsets[key] = offset
return nil
}
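The in-memory variant satisfies the same interface, presumably for tests or pipelines without an external offset store; unknown keys simply report offset 0:

store := offset_storage.NewStorageInMem()
_ = store.SetOffsetForPipeline("pipeline_1_stream_orders", 500)
off, _ := store.GetOffsetByPipelineStream("pipeline_1_stream_orders") // 500
missing, _ := store.GetOffsetByPipelineStream("unknown") // 0, nil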
6 changes: 3 additions & 3 deletions internal/processors/sql/plugin_test.go
@@ -34,7 +34,7 @@ func TestPlugin_EvolveSchema(t *testing.T) {
},
})

- plugin, err := NewSqlTransformPlugin(stream_context.CreateContext(), Config{
+ plugin, err := NewSqlTransformPlugin(stream_context.CreateContext(1), Config{
Query: "SELECT destination from stream.flights",
})
if err != nil {
@@ -51,7 +51,7 @@ func TestPlugin_EvolveSchema(t *testing.T) {
t.Fatal("Schema should contain only one column")
}

- plugin, err = NewSqlTransformPlugin(stream_context.CreateContext(), Config{
+ plugin, err = NewSqlTransformPlugin(stream_context.CreateContext(1), Config{
Query: "SELECT destination from stream.flightas",
})

@@ -103,7 +103,7 @@ func TestPlugin_Process(t *testing.T) {
mbytes, _ := updatedBuilder.NewRecord().MarshalJSON()
mess := message.NewMessage(message.Insert, "test", mbytes)

- plugin, err := NewSqlTransformPlugin(stream_context.CreateContext(), Config{
+ plugin, err := NewSqlTransformPlugin(stream_context.CreateContext(1), Config{
Query: "SELECT id, user from stream.test WHERE id = 123",
})
if err != nil {
1 change: 0 additions & 1 deletion internal/sinks/postgres/helper.go
@@ -33,7 +33,6 @@ func generateCreateTableStatement(table string, columns []schema.Column) string
func generateBatchInsertStatement(table schema.StreamSchema) string {
columnNames := getColumnNames(table.Columns)
valuesPlaceholder := getValuesPlaceholder(len(table.Columns))
- fmt.Println(columnNames, fmt.Sprintf("INSERT INTO \"%s\" (%s) VALUES %s;", table.StreamName, columnNames, valuesPlaceholder), valuesPlaceholder)
return fmt.Sprintf("INSERT INTO \"%s\" (%s) VALUES %s;", table.StreamName, columnNames, valuesPlaceholder)
}

6 changes: 3 additions & 3 deletions internal/sinks/postgres/helper_test.go
@@ -31,22 +31,22 @@ var testMessage = message.NewMessage(message.Snapshot, "flights", []byte(`{"mess

func Test_generateBatchInsertStatement(t *testing.T) {
result := generateBatchInsertStatement(testStreamSchema[0])
if result != "INSERT INTO flights (flights_name, id) VALUES ($1, $2 );" {
if result != "INSERT INTO \"flights\" (flights_name, id) VALUES ($1, $2 );" {
t.Fatal("Generated Insert Query is not correct")
}
}

func Test_generateBatchUpdateStatement(t *testing.T) {
result := generateBatchUpdateStatement(testStreamSchema[0])
if result != "UPDATE flights SET flights_name = $1 WHERE id = $2;\n" {
if result != "UPDATE \"flights\" SET flights_name = $1 WHERE id = $2;\n" {
t.Fatal("Generated Update Query is not correct")
}
}

func Test_generateBatchDeleteStatement(t *testing.T) {
result := generateBatchDeleteStatement(testStreamSchema[0])
fmt.Println(result)
if result != "DELETE FROM flights WHERE id = $1;\n" {
if result != "DELETE FROM \"flights\" WHERE id = $1;\n" {
t.Fatal("Generated Update Query is not correct")
}
}
15 changes: 15 additions & 0 deletions internal/sources/postgres_incr_sync/config.go
@@ -0,0 +1,15 @@
package postgres_incr_sync

import "github.com/usedatabrew/pglogicalstream"

type Config struct {
Host string `json:"host" yaml:"host"`
Port int `json:"port" yaml:"port"`
Database string `json:"database" yaml:"database"`
User string `json:"user" yaml:"user"`
Schema string `json:"schema" yaml:"schema"`
Password string `json:"password" yaml:"password"`
TablesSchema []pglogicalstream.DbTablesSchema `json:"tables_schema" yaml:"tables_schema"`
SSLRequired bool `json:"ssl_required" yaml:"ssl_required"`
StreamSnapshot bool `json:"stream_snapshot" yaml:"stream_snapshot"`
}
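A hedged sketch of what this source's YAML might look like in a pipeline definition (the driver name is taken from source_drivers.go below; all values are illustrative, and tables_schema is omitted because its shape is defined by the pglogicalstream package):

source:
  driver: postgres_incremental
  config:
    host: localhost
    port: 5432
    database: app
    user: replicator
    password: s3cret
    schema: public
    ssl_required: false
    stream_snapshot: true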
179 changes: 179 additions & 0 deletions internal/sources/postgres_incr_sync/plugin.go
@@ -0,0 +1,179 @@
package postgres_incr_sync

import (
"context"
"fmt"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/charmbracelet/log"
"github.com/cloudquery/plugin-sdk/v4/scalar"
"github.com/jackc/pgx/v5"
"github.com/usedatabrew/blink/internal/offset_storage"
"github.com/usedatabrew/blink/internal/schema"
"github.com/usedatabrew/blink/internal/sources"
"github.com/usedatabrew/blink/internal/stream_context"
"github.com/usedatabrew/message"
"strings"
"time"
)

type SourcePlugin struct {
ctx context.Context
config Config

syncTicker *time.Ticker
appCtx *stream_context.Context
streamSchema []schema.StreamSchema
streamSchemaMap map[string]schema.StreamSchema
streamPks map[string]string
logger *log.Logger
streamColumnsToSelect map[string][]string
pgConn *pgx.Conn
messagesStream chan sources.MessageEvent
}

func NewPostgresIncrSourcePlugin(appCtx *stream_context.Context, config Config, s []schema.StreamSchema) sources.DataSource {
plugin := &SourcePlugin{
appCtx: appCtx,
syncTicker: time.NewTicker(time.Second * 5),
config: config,
streamSchema: s,
logger: appCtx.Logger.WithPrefix("Source [postgres_incremental_sync]"),
streamSchemaMap: map[string]schema.StreamSchema{},
streamPks: map[string]string{},
streamColumnsToSelect: map[string][]string{},
messagesStream: make(chan sources.MessageEvent),
}

plugin.buildSchemaMap()
return plugin
}

func (p *SourcePlugin) Connect(ctx context.Context) error {
sslVerifySettings := ""
if p.config.SSLRequired {
sslVerifySettings = "?sslmode=verify-full"
}

link := fmt.Sprintf("postgres://%s:%s@%s:%d/%s%s",
p.config.User,
p.config.Password,
p.config.Host,
p.config.Port,
p.config.Database,
sslVerifySettings,
)

conn, err := pgx.Connect(ctx, link)
if err != nil {
return err
}

p.pgConn = conn
p.ctx = ctx
return nil
}

func (p *SourcePlugin) Events() chan sources.MessageEvent {
return p.messagesStream
}

func (p *SourcePlugin) Start() {
for {
select {
case <-p.syncTicker.C:
for _, stream := range p.streamSchema {
var offset int64 = 0
streamStoredOffset, _ := p.appCtx.OffsetStorage().GetOffsetByPipelineStream(offset_storage.BuildKey(p.appCtx.PipelineId(), stream.StreamName))
if !p.config.StreamSnapshot {
if streamStoredOffset == 0 {
offset = p.selectLastRecordId(stream.StreamName)
p.logger.Info("Snapshot streaming is disabled. Starting sync form the offset", "offset", offset)
}
} else {
if streamStoredOffset > 0 {
offset = streamStoredOffset
p.logger.Info("Stream snapshot enabled, continue from stored offset", "offset", 0)
} else {
p.logger.Info("Stream snapshot enabled, not stored offset not found. Starting from 0", "offset", 0)
}
}

if schem, ok := p.streamSchemaMap[stream.StreamName]; ok {
arrowSchema := schem.AsArrow()
builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
pkForStream := p.streamPks[stream.StreamName]
selectColumns := strings.Join(p.streamColumnsToSelect[stream.StreamName], ", ")
q := fmt.Sprintf("SELECT %s FROM %s ORDER BY %s ASC LIMIT 10000 OFFSET %d", selectColumns, stream.StreamName, pkForStream, offset)
rows, err := p.pgConn.Query(p.ctx, q)
if err != nil {
p.appCtx.Logger.Fatalf("Failed to query data for offset %s", err.Error())
}

var rowsFetched = 0
for rows.Next() {
rowsFetched += 1
values, err := rows.Values()
if err != nil {
panic(err)
}

for i, v := range values {
s := scalar.NewScalar(arrowSchema.Field(i).Type)
if err := s.Set(v); err != nil {
panic(err)
}

scalar.AppendToBuilder(builder.Field(i), s)
}

data, _ := builder.NewRecord().MarshalJSON()
splitStream := strings.Split(stream.StreamName, ".")
m := message.NewMessage(message.Insert, splitStream[len(splitStream)-1], data)
p.messagesStream <- sources.MessageEvent{
Message: m,
Err: err,
}
}
newOffset := streamStoredOffset + int64(rowsFetched)
p.logger.Infof("Fetched %d storing new offset %d", rowsFetched, newOffset)
err = p.appCtx.OffsetStorage().SetOffsetForPipeline(offset_storage.BuildKey(p.appCtx.PipelineId(), stream.StreamName), newOffset)
if err != nil {
p.logger.Fatalf("Failed to store the offset %s", err.Error())
}
}
}
}
}

}

func (p *SourcePlugin) Stop() {
p.syncTicker.Stop()
}

func (p *SourcePlugin) selectLastRecordId(streamName string) int64 {
pkForStream := p.streamPks[streamName]
res := p.pgConn.QueryRow(context.Background(), fmt.Sprintf("SELECT MAX(%s) as last_row FROM %s", pkForStream, streamName))
var result int64
err := res.Scan(&result)
if err != nil {
panic(err)
}

return result
}

func (p *SourcePlugin) buildSchemaMap() {
for _, stream := range p.streamSchema {
var columnsToSelect []string
p.streamSchemaMap[stream.StreamName] = stream
for _, c := range stream.Columns {
if c.PK {
p.streamPks[stream.StreamName] = c.Name
}
columnsToSelect = append(columnsToSelect, c.Name)
}
p.streamColumnsToSelect[stream.StreamName] = columnsToSelect
}
}
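To make the paging concrete: on each five-second tick the plugin issues one primary-key-ordered page per stream, so with a stored offset of 20000, a stream public.users, and pk id, the generated query would be (column list illustrative):

SELECT id, email FROM public.users ORDER BY id ASC LIMIT 10000 OFFSET 20000

The new checkpoint is the stored offset plus the rows fetched, so the next tick resumes after the last row it saw; when stream_snapshot is disabled and no offset is stored, the offset is instead seeded from SELECT MAX(pk) so the sync starts past the existing rows.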
11 changes: 6 additions & 5 deletions internal/sources/source_drivers.go
@@ -3,9 +3,10 @@ package sources
type SourceDriver string

const (
- PostgresCDC SourceDriver = "postgres_cdc"
- MongoStream SourceDriver = "mongo_stream"
- WebSockets SourceDriver = "websocket"
- AirTable SourceDriver = "airtable"
- Playground SourceDriver = "playground"
+ PostgresCDC SourceDriver = "postgres_cdc"
+ PostgresIncremental SourceDriver = "postgres_incremental"
+ MongoStream SourceDriver = "mongo_stream"
+ WebSockets SourceDriver = "websocket"
+ AirTable SourceDriver = "airtable"
+ Playground SourceDriver = "playground"
)