Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add webhook output plugin #45

Merged
merged 8 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
bin/license-header-checker
pgstream
tools/webhook/webhook
24 changes: 24 additions & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/xataio/pgstream/pkg/wal/processor/search"
"github.com/xataio/pgstream/pkg/wal/processor/search/opensearch"
"github.com/xataio/pgstream/pkg/wal/processor/translator"
"github.com/xataio/pgstream/pkg/wal/processor/webhook/notifier"
"github.com/xataio/pgstream/pkg/wal/processor/webhook/server"
pgreplication "github.com/xataio/pgstream/pkg/wal/replication/postgres"
)

Expand Down Expand Up @@ -114,6 +116,7 @@ func parseProcessorConfig() stream.ProcessorConfig {
return stream.ProcessorConfig{
Kafka: parseKafkaProcessorConfig(),
Search: parseSearchProcessorConfig(),
Webhook: parseWebhookProcessorConfig(),
Translator: parseTranslatorConfig(),
}
}
Expand Down Expand Up @@ -171,6 +174,27 @@ func parseSearchProcessorConfig() *stream.SearchProcessorConfig {
}
}

func parseWebhookProcessorConfig() *stream.WebhookProcessorConfig {
subscriptionStore := viper.GetString("PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_URL")
if subscriptionStore == "" {
return nil
}

return &stream.WebhookProcessorConfig{
SubscriptionStoreURL: subscriptionStore,
Notifier: notifier.Config{
MaxQueueBytes: viper.GetInt64("PGSTREAM_WEBHOOK_NOTIFIER_MAX_QUEUE_BYTES"),
URLWorkerCount: viper.GetUint("PGSTREAM_WEBHOOK_NOTIFIER_WORKER_COUNT"),
ClientTimeout: viper.GetDuration("PGSTREAM_WEBHOOK_NOTIFIER_CLIENT_TIMEOUT"),
},
SubscriptionServer: server.Config{
Address: viper.GetString("PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_ADDRESS"),
ReadTimeout: viper.GetDuration("PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_READ_TIMEOUT"),
WriteTimeout: viper.GetDuration("PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_WRITE_TIMEOUT"),
},
}
}

func parseBackoffConfig(prefix string) backoff.Config {
return backoff.Config{
Exponential: parseExponentialBackoffConfig(prefix),
Expand Down
14 changes: 10 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.5.5
github.com/labstack/echo/v4 v4.12.0
github.com/mitchellh/mapstructure v1.5.0
github.com/pterm/pterm v0.12.79
github.com/rs/xid v1.5.0
Expand All @@ -31,6 +32,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/gookit/color v1.5.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand All @@ -40,11 +42,12 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/lithammer/fuzzysearch v1.1.8 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
Expand All @@ -57,16 +60,19 @@ require (
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
27 changes: 20 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ github.com/go-logr/zerologr v1.2.3/go.mod h1:BxwGo7y5zgSHYR1BjbnHPyF/5ZjVKfKxAZA
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4=
github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
Expand Down Expand Up @@ -101,6 +103,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.12.0 h1:IKpw49IMryVB2p1a4dzwlhP1O2Tf2E0Ir/450lH+kI0=
github.com/labstack/echo/v4 v4.12.0/go.mod h1:UP9Cr2DJXbOK3Kr9ONYzNowSh7HP0aG0ShAyycHSJvM=
github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0=
github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lithammer/fuzzysearch v1.1.8 h1:/HIuJnjHuXS8bKaiTMeeDlW2/AyIWk2brx1V8LFgLN4=
Expand All @@ -110,8 +116,9 @@ github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3v
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
Expand Down Expand Up @@ -188,6 +195,10 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
Expand All @@ -211,8 +222,8 @@ go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTV
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
Expand All @@ -225,8 +236,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -256,8 +267,8 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
Expand All @@ -267,6 +278,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
Expand Down
17 changes: 17 additions & 0 deletions internal/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// SPDX-License-Identifier: Apache-2.0

package http

import (
"context"
"net/http"
)

type Client interface {
Do(*http.Request) (*http.Response, error)
}

type Server interface {
Start(address string) error
Shutdown(context.Context) error
}
13 changes: 13 additions & 0 deletions internal/http/mocks/mock_http_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// SPDX-License-Identifier: Apache-2.0

package mocks

import "net/http"

type Client struct {
DoFn func(*http.Request) (*http.Response, error)
}

func (m *Client) Do(req *http.Request) (*http.Response, error) {
return m.DoFn(req)
}
5 changes: 5 additions & 0 deletions pg2webhook.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost?sslmode=disable"

# Processor config
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_URL="postgres://postgres:postgres@localhost?sslmode=disable"
17 changes: 17 additions & 0 deletions pkg/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Logger interface {
Warn(err error, msg string, fields ...Fields)
Error(err error, msg string, fields ...Fields)
Panic(msg string, fields ...Fields)
WithFields(fields Fields) Logger
}

type Fields map[string]any
Expand All @@ -21,6 +22,11 @@ func (l *NoopLogger) Info(msg string, fields ...Fields) {}
func (l *NoopLogger) Warn(err error, msg string, fields ...Fields) {}
func (l *NoopLogger) Error(err error, msg string, fields ...Fields) {}
func (l *NoopLogger) Panic(msg string, fields ...Fields) {}
func (l *NoopLogger) WithFields(fields Fields) Logger {
return l
}

const ServiceField = "service"

func NewNoopLogger() *NoopLogger {
return &NoopLogger{}
Expand All @@ -34,3 +40,14 @@ func NewLogger(l Logger) Logger {
}
return l
}

func MergeFields(f1, f2 Fields) Fields {
allFields := make(Fields, len(f1)+len(f2))
fieldMaps := []Fields{f1, f2}
for _, fmap := range fieldMaps {
for k, v := range fmap {
allFields[k] = v
}
}
return allFields
}
20 changes: 14 additions & 6 deletions pkg/log/zerolog/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

type Logger struct {
zerologger *zerolog.Logger
fields loglib.Fields
}

// if we go over this limit the log will likely be truncated and it will not
Expand All @@ -25,27 +26,34 @@ func NewLogger(zl *zerolog.Logger) *Logger {
}

func (l *Logger) Trace(msg string, fields ...loglib.Fields) {
withFields(l.zerologger.Trace(), fields...).Msg(msg)
withFields(l.zerologger.Trace(), append(fields, l.fields)...).Msg(msg)
}

func (l *Logger) Debug(msg string, fields ...loglib.Fields) {
withFields(l.zerologger.Debug(), fields...).Msg(msg)
withFields(l.zerologger.Debug(), append(fields, l.fields)...).Msg(msg)
}

func (l *Logger) Info(msg string, fields ...loglib.Fields) {
withFields(l.zerologger.Info(), fields...).Msg(msg)
withFields(l.zerologger.Info(), append(fields, l.fields)...).Msg(msg)
}

func (l *Logger) Warn(err error, msg string, fields ...loglib.Fields) {
withFields(l.zerologger.Warn().Err(err), fields...).Msg(msg)
withFields(l.zerologger.Warn().Err(err), append(fields, l.fields)...).Msg(msg)
}

func (l *Logger) Error(err error, msg string, fields ...loglib.Fields) {
withFields(l.zerologger.Error().Err(err), fields...).Msg(msg)
withFields(l.zerologger.Error().Err(err), append(fields, l.fields)...).Msg(msg)
}

func (l *Logger) Panic(msg string, fields ...loglib.Fields) {
withFields(l.zerologger.Panic(), fields...).Msg(msg)
withFields(l.zerologger.Panic(), append(fields, l.fields)...).Msg(msg)
}

func (l *Logger) WithFields(fields loglib.Fields) loglib.Logger {
return &Logger{
zerologger: l.zerologger,
fields: loglib.MergeFields(l.fields, fields),
}
}

func withFields(event *zerolog.Event, fieldMaps ...loglib.Fields) *zerolog.Event {
Expand Down
11 changes: 10 additions & 1 deletion pkg/stream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/xataio/pgstream/pkg/wal/processor/search"
"github.com/xataio/pgstream/pkg/wal/processor/search/opensearch"
"github.com/xataio/pgstream/pkg/wal/processor/translator"
"github.com/xataio/pgstream/pkg/wal/processor/webhook/notifier"
"github.com/xataio/pgstream/pkg/wal/processor/webhook/server"
pgreplication "github.com/xataio/pgstream/pkg/wal/replication/postgres"
)

Expand All @@ -36,6 +38,7 @@ type KafkaListenerConfig struct {
type ProcessorConfig struct {
Kafka *KafkaProcessorConfig
Search *SearchProcessorConfig
Webhook *WebhookProcessorConfig
Translator *translator.Config
}

Expand All @@ -49,12 +52,18 @@ type SearchProcessorConfig struct {
Retrier *search.StoreRetryConfig
}

type WebhookProcessorConfig struct {
SubscriptionStoreURL string
Notifier notifier.Config
SubscriptionServer server.Config
}

func (c *Config) IsValid() error {
if c.Listener.Kafka == nil && c.Listener.Postgres == nil {
return errors.New("need at least one listener configured")
}

if c.Processor.Kafka == nil && c.Processor.Search == nil {
if c.Processor.Kafka == nil && c.Processor.Search == nil && c.Processor.Webhook == nil {
return errors.New("need at least one processor configured")
}

Expand Down
39 changes: 39 additions & 0 deletions pkg/stream/stream_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
"github.com/xataio/pgstream/pkg/wal/processor/search"
"github.com/xataio/pgstream/pkg/wal/processor/search/opensearch"
"github.com/xataio/pgstream/pkg/wal/processor/translator"
"github.com/xataio/pgstream/pkg/wal/processor/webhook"
webhooknotifier "github.com/xataio/pgstream/pkg/wal/processor/webhook/notifier"
pgwebhook "github.com/xataio/pgstream/pkg/wal/processor/webhook/postgres"
webhookserver "github.com/xataio/pgstream/pkg/wal/processor/webhook/server"
"github.com/xataio/pgstream/pkg/wal/replication"
replicationinstrumentation "github.com/xataio/pgstream/pkg/wal/replication/instrumentation"
pgreplication "github.com/xataio/pgstream/pkg/wal/replication/postgres"
Expand Down Expand Up @@ -127,6 +131,41 @@ func Start(ctx context.Context, logger loglib.Logger, config *Config, meter metr
logger.Info("running search batch indexer...")
return searchIndexer.Send(ctx)
})

case config.Processor.Webhook != nil:
var subscriptionStore webhook.SubscriptionStore
var err error
subscriptionStore, err = pgwebhook.NewSubscriptionStore(ctx,
config.Processor.Webhook.SubscriptionStoreURL,
pgwebhook.WithLogger(logger),
)
if err != nil {
return err
}
notifier := webhooknotifier.New(
&config.Processor.Webhook.Notifier,
subscriptionStore,
webhooknotifier.WithLogger(logger),
webhooknotifier.WithCheckpoint(checkpoint))
defer notifier.Close()
processor = notifier

subscriptionServer := webhookserver.New(
&config.Processor.Webhook.SubscriptionServer,
subscriptionStore,
webhookserver.WithLogger(logger))

eg.Go(func() error {
logger.Info("running subscription server...")
go subscriptionServer.Start()
<-ctx.Done()
return subscriptionServer.Shutdown(ctx)
})
eg.Go(func() error {
logger.Info("running webhook notifier...")
return notifier.Notify(ctx)
})

default:
return errors.New("no processor found")
}
Expand Down
Loading