Skip to content

Commit

Permalink
Merge pull request #75 from xataio/add-insert-to-schemalog-store
Browse files Browse the repository at this point in the history
Add insert to schemalog store
  • Loading branch information
eminano authored Sep 25, 2024
2 parents 8642f97 + 2948874 commit 16a51c6
Show file tree
Hide file tree
Showing 12 changed files with 324 additions and 34 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ require (
github.com/testcontainers/testcontainers-go/modules/kafka v0.31.0
github.com/testcontainers/testcontainers-go/modules/opensearch v0.31.0
github.com/testcontainers/testcontainers-go/modules/postgres v0.31.0
go.opentelemetry.io/otel/metric v1.27.0
go.opentelemetry.io/otel v1.30.0
go.opentelemetry.io/otel/metric v1.30.0
go.opentelemetry.io/otel/trace v1.30.0
golang.org/x/sync v0.7.0
)

Expand All @@ -50,7 +52,7 @@ require (
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand Down Expand Up @@ -105,8 +107,6 @@ require (
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/trace v1.27.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-logr/zerologr v1.2.3 h1:up5N9vcH9Xck3jJkXzgyOxozT14R47IyDODz8LM1KSs=
Expand Down Expand Up @@ -307,18 +307,18 @@ github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFi
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg=
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ=
go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts=
go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU=
go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik=
go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak=
go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w=
go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw=
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc=
go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
Expand Down
11 changes: 11 additions & 0 deletions internal/postgres/mocks/mock_row.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// SPDX-License-Identifier: Apache-2.0

package mocks

type Row struct {
ScanFn func(args ...any) error
}

func (m *Row) Scan(args ...any) error {
return m.ScanFn(args)
}
27 changes: 27 additions & 0 deletions internal/postgres/mocks/mock_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// SPDX-License-Identifier: Apache-2.0

package mocks

import (
"context"

"github.com/xataio/pgstream/internal/postgres"
)

type Tx struct {
QueryRowFn func(ctx context.Context, query string, args ...any) postgres.Row
QueryFn func(ctx context.Context, query string, args ...any) (postgres.Rows, error)
ExecFn func(ctx context.Context, query string, args ...any) (postgres.CommandTag, error)
}

func (m *Tx) QueryRow(ctx context.Context, query string, args ...any) postgres.Row {
return m.QueryRowFn(ctx, query, args)
}

func (m *Tx) Query(ctx context.Context, query string, args ...any) (postgres.Rows, error) {
return m.QueryFn(ctx, query, args)
}

func (m *Tx) Exec(ctx context.Context, query string, args ...any) (postgres.CommandTag, error) {
return m.ExecFn(ctx, query, args)
}
6 changes: 6 additions & 0 deletions pkg/schemalog/instrumentation/instrumented_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ func NewStore(inner schemalog.Store, instrumentation *otel.Instrumentation) sche
}
}

func (s *Store) Insert(ctx context.Context, schemaName string) (le *schemalog.LogEntry, err error) {
ctx, span := otel.StartSpan(ctx, s.tracer, "schemalogstore.Insert", trace.WithAttributes(attribute.String("schema", schemaName)))
defer otel.CloseSpan(span, err)
return s.inner.Insert(ctx, schemaName)
}

func (s *Store) Fetch(ctx context.Context, schemaName string, acked bool) (le *schemalog.LogEntry, err error) {
ctx, span := otel.StartSpan(ctx, s.tracer, "schemalogstore.Fetch", trace.WithAttributes(attribute.String("schema", schemaName)))
defer otel.CloseSpan(span, err)
Expand Down
21 changes: 16 additions & 5 deletions pkg/schemalog/mocks/store_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,22 @@ import (
)

type Store struct {
FetchFn func(ctx context.Context, schemaName string, ackedOnly bool) (*schemalog.LogEntry, error)
AckFn func(ctx context.Context, le *schemalog.LogEntry) error
CloseFn func() error
fetchCalls uint64
ackCalls uint64
InsertFn func(ctx context.Context, schemaName string) (*schemalog.LogEntry, error)
FetchFn func(ctx context.Context, schemaName string, ackedOnly bool) (*schemalog.LogEntry, error)
AckFn func(ctx context.Context, le *schemalog.LogEntry) error
CloseFn func() error
insertCalls uint64
fetchCalls uint64
ackCalls uint64
}

var _ schemalog.Store = (*Store)(nil)

func (m *Store) Insert(ctx context.Context, schemaName string) (*schemalog.LogEntry, error) {
atomic.AddUint64(&m.insertCalls, 1)
return m.InsertFn(ctx, schemaName)
}

func (m *Store) Fetch(ctx context.Context, schemaName string, ackedOnly bool) (*schemalog.LogEntry, error) {
atomic.AddUint64(&m.fetchCalls, 1)
return m.FetchFn(ctx, schemaName, ackedOnly)
Expand All @@ -33,6 +40,10 @@ func (m *Store) Close() error {
return m.CloseFn()
}

func (m *Store) GetInsertCalls() uint64 {
return atomic.LoadUint64(&m.insertCalls)
}

func (m *Store) GetFetchCalls() uint64 {
return atomic.LoadUint64(&m.fetchCalls)
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/schemalog/postgres/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

type mockRow struct {
logEntry *schemalog.LogEntry
version *int
scanFn func(args ...any) error
}

Expand All @@ -19,11 +20,21 @@ func (m *mockRow) Scan(args ...any) error {
return m.scanFn(args...)
}

id, ok := args[0].(*xid.ID)
if !ok {
return fmt.Errorf("unexpected type for xid.ID in scan: %T", args[0])
if m.logEntry != nil {
id, ok := args[0].(*xid.ID)
if !ok {
return fmt.Errorf("unexpected type for xid.ID in scan: %T", args[0])
}
*id = m.logEntry.ID
}

if m.version != nil {
version, ok := args[0].(*int)
if !ok {
return fmt.Errorf("unexpected type for version in scan: %T", args[0])
}
*version = *m.version
}
*id = m.logEntry.ID

return nil
}
43 changes: 37 additions & 6 deletions pkg/schemalog/postgres/pg_schemalog_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

pglib "github.com/xataio/pgstream/internal/postgres"
"github.com/xataio/pgstream/pkg/schemalog"

"github.com/jackc/pgx/v5"
)

// Store is a postgres implementation of the schemalog.Store interface
Expand Down Expand Up @@ -38,25 +36,54 @@ func NewStoreWithQuerier(querier pglib.Querier) *Store {
}
}

// Insert inserts a new schema log for the schema name on input. It will insert
// a new row with the latest schema view with an incremented version, or if no
// row exists for the schema, with version 1.
func (s *Store) Insert(ctx context.Context, schemaName string) (*schemalog.LogEntry, error) {
l := &schemalog.LogEntry{}
err := s.querier.ExecInTx(ctx, func(tx pglib.Tx) error {
nextVersionQuery := fmt.Sprintf(`select coalesce((select version+1 from %s where schema_name = $1 order by version desc limit 1), 1)`, s.table())
var nextVersion int
err := tx.QueryRow(ctx, nextVersionQuery, schemaName).Scan(&nextVersion)
if err != nil {
return fmt.Errorf("query for next schema log version: %w", err)
}

insertQuery := fmt.Sprintf(`insert into %s (version, schema_name, schema) values ($1, $2, pgstream.get_schema($2))
returning id, version, schema_name, schema, created_at, acked`, s.table())
if err := tx.QueryRow(ctx, insertQuery, nextVersion, schemaName).Scan(&l.ID, &l.Version, &l.SchemaName, &l.Schema, &l.CreatedAt, &l.Acked); err != nil {
return mapError(fmt.Errorf("error inserting schema log for schema %s: %w", schemaName, err))
}

return nil
})
if err != nil {
return nil, err
}

return l, nil
}

// Fetch retrieves the latest schema log entry for schemaName. If ackedOnly is set, the function returns the last acked
// entry. If ackedOnly is NOT set, the function returns the latest entry no matter the acked flag.
func (s *Store) Fetch(ctx context.Context, schemaName string, ackedOnly bool) (*schemalog.LogEntry, error) {
ackCondition := ""
if ackedOnly {
ackCondition = "and acked"
}
sql := fmt.Sprintf(`select id, version, schema_name, schema, created_at, acked from %s.%s where schema_name = $1 %s order by version desc limit 1`, schemalog.SchemaName, schemalog.TableName, ackCondition)
sql := fmt.Sprintf(`select id, version, schema_name, schema, created_at, acked from %s where schema_name = $1 %s order by version desc limit 1`, s.table(), ackCondition)

l := &schemalog.LogEntry{}
if err := s.querier.QueryRow(ctx, sql, schemaName).Scan(&l.ID, &l.Version, &l.SchemaName, &l.Schema, &l.CreatedAt, &l.Acked); err != nil {
err := s.querier.QueryRow(ctx, sql, schemaName).Scan(&l.ID, &l.Version, &l.SchemaName, &l.Schema, &l.CreatedAt, &l.Acked)
if err != nil {
return nil, mapError(fmt.Errorf("error fetching schema log for schema %s: %w", schemaName, err))
}

return l, nil
}

func (s *Store) Ack(ctx context.Context, logEntry *schemalog.LogEntry) error {
sql := fmt.Sprintf(`update %s.%s set acked = true where id = $1 and schema_name = $2`, schemalog.SchemaName, schemalog.TableName)
sql := fmt.Sprintf(`update %s set acked = true where id = $1 and schema_name = $2`, s.table())
_, err := s.querier.Exec(ctx, sql, logEntry.ID.String(), logEntry.SchemaName)
if err != nil {
return mapError(fmt.Errorf("failed to ack schema log (%s) for schema (%s) version (%d): %w", logEntry.ID, logEntry.SchemaName, logEntry.Version, err))
Expand All @@ -70,8 +97,12 @@ func (s *Store) Close() error {
return nil
}

func (s *Store) table() string {
return fmt.Sprintf("%q.%q", schemalog.SchemaName, schemalog.TableName)
}

func mapError(err error) error {
if errors.Is(err, pgx.ErrNoRows) {
if errors.Is(err, pglib.ErrNoRows) {
return schemalog.ErrNoRows
}
return err
Expand Down
Loading

0 comments on commit 16a51c6

Please sign in to comment.