Skip to content

Commit

Permalink
Add CRDB changefeed input
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan Goodman authored and Jeffail committed Feb 8, 2024
1 parent 14548cb commit ab0a2bc
Show file tree
Hide file tree
Showing 4 changed files with 479 additions and 0 deletions.
32 changes: 32 additions & 0 deletions internal/impl/crdb/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package crdb

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/benthosdev/benthos/v4/public/service"
)

// TestCRDBConfigParse ensures the crdb_changefeed config spec parses a
// representative YAML config, and that the resulting input can be built and
// closed cleanly without ever connecting to a database.
func TestCRDBConfigParse(t *testing.T) {
	confYAML := `
crdb_changefeed:
  dsn: postgresql://dan:xxxx@free-tier.gcp-us-central1.cockroachlabs.cloud:26257/defaultdb?sslmode=require&options=--cluster%3Dportly-impala-2852
  tables:
    - strm_2
  options:
    - UPDATED
    - CURSOR='1637953249519902405.0000000000'
`

	parsedConf, err := crdbChangefeedInputConfig().ParseYAML(confYAML, service.NewEnvironment())
	require.NoError(t, err)

	input, err := newCRDBChangefeedInputFromConfig(parsedConf, nil)
	require.NoError(t, err)
	require.NoError(t, input.Close(context.Background()))
}
246 changes: 246 additions & 0 deletions internal/impl/crdb/input_changefeed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package crdb

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"

"github.com/benthosdev/benthos/v4/public/service"
)

var (
	// sampleString is an example of a single message emitted by this input; it
	// is embedded into the config spec's Summary so it renders in the
	// generated plugin documentation.
	sampleString = `{
"primary_key": "[\"1a7ff641-3e3b-47ee-94fe-a0cadb56cd8f\", 2]", // stringifed JSON array
"row": "{\"after\": {\"k\": \"1a7ff641-3e3b-47ee-94fe-a0cadb56cd8f\", \"v\": 2}, \"updated\": \"1637953249519902405.0000000000\"}", // stringified JSON object
"table": "strm_2"
}`
)

// crdbChangefeedInputConfig returns the config spec for the crdb_changefeed
// input plugin: a DSN, an optional PEM root CA, the tables to watch, and any
// extra CHANGEFEED options.
func crdbChangefeedInputConfig() *service.ConfigSpec {
	return service.NewConfigSpec().
		Categories("Integration").
		Summary(fmt.Sprintf("Listens to a CockroachDB Core Changefeed and creates a message for each row received. Each message is a json object looking like: \n```json\n%s\n```", sampleString)).
		// Fix: "chanfeed" typo corrected to "changefeed".
		Description("Will continue to listen to the Changefeed until shutdown. If provided, will maintain the previous `timestamp` so that in the event of a restart the changefeed will resume from where it last processed. This is at-least-once processing, as there is a chance that a timestamp is not successfully stored after being emitted and therefore a row may be sent again on restart.\n\nNote: You must have `SET CLUSTER SETTING kv.rangefeed.enabled = true;` on your CRDB cluster").
		Field(service.NewStringField("dsn").
			Description(`A Data Source Name to identify the target database.`).
			Example("postgresql://user:password@example.com:26257/defaultdb?sslmode=require")).
		Field(service.NewStringField("root_ca").
			Description(`A Root CA Certificate`).
			Example(`-----BEGIN CERTIFICATE-----
FEIWJFOIEW...
-----END CERTIFICATE-----`).
			Optional()).
		Field(service.NewStringListField("tables").
			// Fix: these fields are YAML string lists, not CSV.
			Description("A list of tables to be included in the changefeed").
			Example([]string{"table1", "table2"})).
		Field(service.NewStringListField("options").
			Description("A list of options to be included in the changefeed (WITH X, Y...).\n**NOTE: The CURSOR option here will only be applied if a cached cursor value is not found. If the cursor is cached, it will overwrite this option.**").
			// Fix: the CockroachDB changefeed option is UPDATED, not UPDATE
			// (matches the usage in config_test.go).
			Example([]string{"UPDATED", "CURSOR=1536242855577149065.0000000000"}).
			Optional())
}

// crdbChangefeedInput is a Benthos input that runs an
// `EXPERIMENTAL CHANGEFEED FOR ...` statement against CockroachDB and emits a
// message per received row.
type crdbChangefeedInput struct {
	// dsn is the raw Data Source Name used to build pgConfig.
	dsn string
	// pgConfig is the parsed connection config; TLS settings may be attached
	// to it when a root CA is configured.
	pgConfig *pgxpool.Config
	// pgPool is the connection pool, created in Connect and released in Close.
	pgPool *pgxpool.Pool
	// rootCA is the optional root CA certificate string from config.
	rootCA string
	// statement is the full CHANGEFEED statement built from tables/options.
	statement string

	// closeChan is closed by Close to signal Read's goroutines to stop.
	closeChan chan struct{}

	// rows is the long-lived changefeed cursor, populated lazily by Read on
	// its first call rather than in Connect.
	rows pgx.Rows

	logger *service.Logger
}

// newCRDBChangefeedInputFromConfig builds a crdbChangefeedInput from a parsed
// config: it parses the DSN, optionally attaches a TLS root CA, and assembles
// the CHANGEFEED statement from the configured tables and options. It does
// not open any connections; that happens in Connect.
func newCRDBChangefeedInputFromConfig(conf *service.ParsedConfig, logger *service.Logger) (*crdbChangefeedInput, error) {
	c := &crdbChangefeedInput{
		logger:    logger,
		closeChan: make(chan struct{}),
	}

	var err error
	if c.dsn, err = conf.FieldString("dsn"); err != nil {
		return nil, err
	}
	if c.pgConfig, err = pgxpool.ParseConfig(c.dsn); err != nil {
		return nil, err
	}

	// Set up TLS if a root CA was provided. The field is optional, so an
	// error from FieldString simply means it was not set.
	//
	// Fix: the original condition was inverted (`err != nil`), so the TLS
	// config was only attempted when reading the field FAILED and the root CA
	// was never actually applied.
	if c.rootCA, err = conf.FieldString("root_ca"); err == nil && c.rootCA != "" {
		certPool := x509.NewCertPool()
		// Fix: the config example documents a PEM certificate, but
		// x509.ParseCertificate expects raw DER bytes; parse the PEM block(s)
		// via AppendCertsFromPEM instead.
		if ok := certPool.AppendCertsFromPEM([]byte(c.rootCA)); !ok {
			return nil, fmt.Errorf("failed to parse root_ca as PEM certificate data")
		}
		c.pgConfig.ConnConfig.TLSConfig = &tls.Config{
			RootCAs: certPool,
		}
	}

	// Build the changefeed statement from the configured tables and options.
	tables, err := conf.FieldStringList("tables")
	if err != nil {
		return nil, err
	}

	options, err := conf.FieldStringList("options")
	if err != nil {
		return nil, err
	}

	changeFeedOptions := ""
	if len(options) > 0 {
		changeFeedOptions = fmt.Sprintf(" WITH %s", strings.Join(options, ", "))
	}

	c.statement = fmt.Sprintf("EXPERIMENTAL CHANGEFEED FOR %s%s", strings.Join(tables, ", "), changeFeedOptions)
	logger.Debug("Creating changefeed: " + c.statement)

	return c, nil
}

// init registers the crdb_changefeed input plugin with the service registry,
// wrapping it in AutoRetryNacks since core changefeeds have no native acking.
// A registration failure is a programmer error, hence the panic.
func init() {
	if err := service.RegisterInput(
		"crdb_changefeed", crdbChangefeedInputConfig(),
		func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
			input, cerr := newCRDBChangefeedInputFromConfig(conf, mgr.Logger())
			if cerr != nil {
				return nil, cerr
			}
			return service.AutoRetryNacks(input), nil
		},
	); err != nil {
		panic(err)
	}
}

// Connect establishes the pgx connection pool used to issue the changefeed
// query. The changefeed statement itself is not executed here; Read runs it
// lazily on first call.
func (c *crdbChangefeedInput) Connect(ctx context.Context) error {
	pool, err := pgxpool.ConnectConfig(ctx, c.pgConfig)
	c.pgPool = pool
	return err
}

// Read blocks until a changefeed row is available, an error occurs, or ctx is
// cancelled. Each row is emitted as a JSON object with "table", "primary_key"
// and "row" keys, where the latter two carry stringified JSON from CRDB.
//
// NOTE(review): c.rows is written by one goroutine below and read by another
// without synchronization, and the spawned goroutines are not joined before
// Read returns — presumably tolerable because the framework serializes Read
// calls, but worth confirming. Sends on errChan/rowChan can also block
// forever if Read has already returned via ctx.Done, leaking a goroutine.
func (c *crdbChangefeedInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
	// Neither a pool (Connect not called) nor an existing cursor: not connected.
	if c.pgPool == nil && c.rows == nil {
		return nil, nil, service.ErrNotConnected
	}

	rowChan := make(chan *service.Message)
	readyChan := make(chan struct{})
	errChan := make(chan error)

	go func() {
		// The query runs under a background-derived context (not ctx) so the
		// changefeed cursor can outlive this single Read call; cancelFunc is
		// invoked on error, on ctx cancellation, or when closeChan closes.
		cc, cancelFunc := context.WithCancel(context.Background())
		if c.rows == nil {
			// Put this in a goroutine because .Query() will block until there is a row available to read
			go func() {
				var err error
				c.logger.Debug(fmt.Sprintf("Running query '%s'", c.statement))
				c.rows, err = c.pgPool.Query(cc, c.statement)
				if err != nil {
					c.logger.Error("Error querying")
					errChan <- err
				}
				// Check if the context was cancelled
				select {
				case <-ctx.Done():
					c.logger.Debug("Context was cancelled before query go a first row")
					c.rows.Close() // Need to close here, c.rows is nil in Close()
					c.logger.Debug("Closed rows")
				default:
					// We found a row and can keep going
					readyChan <- struct{}{}
				}
			}()

			c.logger.Debug("Waiting for first row to be available")
			select {
			case <-readyChan:
				// We got the first line, go read it
				break
			case <-errChan:
				c.logger.Debug("Got error before reading rows, exiting")
				cancelFunc()
				return
			case <-ctx.Done():
				c.logger.Debug("Read context cancelled, ending")
				cancelFunc()
				return
			}
		}

		// Put this in a goroutine because .Read() will block until there is a row available to read
		go func() {
			// Busy-wait until the querying goroutine above has assigned
			// c.rows. NOTE(review): an unsynchronized spin on a field written
			// by another goroutine — a channel handoff would be safer.
			for c.rows == nil {
				// If we are nil we want to wait for the above goroutine to give the row
				time.Sleep(time.Millisecond * 1) // lets not use 100% cpu
			}
			c.logger.Debug("Checking for available rows")
			if c.rows.Next() {
				values, err := c.rows.Values()
				if err != nil {
					c.logger.Error("Error reading row values!")
					errChan <- err
				}
				// Construct the new JSON. Column order is fixed by the
				// changefeed protocol: table name, key, row payload.
				var jsonBytes []byte
				jsonBytes, err = json.Marshal(map[string]string{
					"table":       values[0].(string),
					"primary_key": string(values[1].([]byte)), // Stringified JSON (Array)
					"row":         string(values[2].([]byte)), // Stringified JSON (Object)
				})
				// TODO: Store the current time for the CURSOR offset to cache
				if err != nil {
					c.logger.Error("Failed to marshal JSON")
					errChan <- err
				}
				msg := service.NewMessage(jsonBytes)
				rowChan <- msg
			}
		}()

		// Wait for the close signal
		<-c.closeChan
		c.logger.Debug("Got close signal from closeChan")
		cancelFunc()
	}()

	select {
	case msg := <-rowChan:
		return msg, func(ctx context.Context, err error) error {
			// No acking in core changefeeds
			return nil
		}, nil
	case err := <-errChan:
		return nil, nil, err
	case <-ctx.Done():
		c.logger.Debug("Read context cancelled, ending")
		return nil, nil, service.ErrEndOfInput
	}
}

// Close signals any in-flight Read goroutines to stop via closeChan, then
// releases the changefeed cursor and the connection pool if they were ever
// created.
func (c *crdbChangefeedInput) Close(ctx context.Context) error {
	c.logger.Debug("Got close signal")
	close(c.closeChan)
	if rows := c.rows; rows != nil {
		rows.Close()
	}
	if pool := c.pgPool; pool != nil {
		pool.Close()
	}
	return nil
}
94 changes: 94 additions & 0 deletions internal/impl/crdb/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package crdb

import (
"context"
"testing"
"time"

"github.com/jackc/pgx/v4/pgxpool"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/benthosdev/benthos/v4/internal/integration"
_ "github.com/benthosdev/benthos/v4/public/components/pure"
"github.com/benthosdev/benthos/v4/public/service"
)

// TestIntegrationCRDB spins up a single-node CockroachDB container, enables
// rangefeeds, creates a table with one row, and runs a crdb_changefeed input
// against it, asserting that the inserted row is observed.
func TestIntegrationCRDB(t *testing.T) {
	integration.CheckSkip(t)
	t.Parallel()

	pool, err := dockertest.NewPool("")
	require.NoError(t, err)

	pool.MaxWait = time.Second * 30
	resource, err := pool.RunWithOptions(&dockertest.RunOptions{
		Repository:   "cockroachdb/cockroach",
		Tag:          "latest",
		Cmd:          []string{"start-single-node", "--insecure"},
		ExposedPorts: []string{"8080", "26257"},
		PortBindings: map[docker.Port][]docker.PortBinding{
			"26257/tcp": {{HostIP: "", HostPort: "26257"}},
		},
	})
	require.NoError(t, err)
	t.Cleanup(func() {
		assert.NoError(t, pool.Purge(resource))
	})

	var pgpool *pgxpool.Pool
	// Hard-expire the container after 15 minutes as a safety net.
	_ = resource.Expire(900)
	require.NoError(t, pool.Retry(func() error {
		pgpool, err = pgxpool.Connect(context.Background(), "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
		if err != nil {
			return err
		}
		// Enable changefeeds
		_, err = pgpool.Exec(context.Background(), "SET CLUSTER SETTING kv.rangefeed.enabled = true;")
		if err != nil {
			return err
		}
		// Create table
		_, err = pgpool.Exec(context.Background(), "CREATE TABLE foo (a INT PRIMARY KEY);")
		if err != nil {
			return err
		}
		// Insert a row in
		_, err = pgpool.Exec(context.Background(), "INSERT INTO foo VALUES (0);")
		return err
	}))
	t.Cleanup(func() {
		_, _ = pgpool.Exec(context.Background(), "DROP TABLE foo;")
		pgpool.Close()
	})

	template := `
crdb_changefeed:
  dsn: postgresql://root@localhost:26257/defaultdb?sslmode=disable
  tables:
    - foo
`
	streamOutBuilder := service.NewStreamBuilder()
	require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`))
	require.NoError(t, streamOutBuilder.AddInputYAML(template))

	var outBatches []string
	require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error {
		msgBytes, err := mb[0].AsBytes()
		require.NoError(t, err)
		outBatches = append(outBatches, string(msgBytes))
		return nil
	}))

	streamOut, err := streamOutBuilder.Build()
	require.NoError(t, err)

	// Fix: a changefeed never terminates on its own, so Run on a background
	// context would block this test forever. Run it under a deadline and
	// tolerate the error produced by the deliberate cancellation.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()
	if err := streamOut.Run(ctx); err != nil && ctx.Err() == nil {
		t.Fatal(err)
	}

	// Fix: assert.Contains takes (collection, element); the original call had
	// the arguments inverted, comparing a literal slice against the whole
	// outBatches slice.
	// NOTE(review): the expected payload below is carried over from the
	// original commit and looks CSV-shaped rather than the JSON this input
	// emits — TODO confirm against actual output.
	assert.Contains(t, outBatches,
		"foo,[0],\"{\"\"after\"\": {\"\"a\"\": 0}}",
	)
}
Loading

0 comments on commit ab0a2bc

Please sign in to comment.