Skip to content

Commit

Permalink
Merge pull request #85 from xataio/update-json-lib-2
Browse files Browse the repository at this point in the history
Wrap external json lib
  • Loading branch information
eminano authored Oct 28, 2024
2 parents 110b333 + 7b2bd84 commit c795714
Show file tree
Hide file tree
Showing 15 changed files with 45 additions and 23 deletions.
15 changes: 15 additions & 0 deletions internal/json/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// SPDX-License-Identifier: Apache-2.0

package json

import (
json "github.com/bytedance/sonic"
)

func Unmarshal(b []byte, v any) error {
return json.Unmarshal(b, v)
}

func Marshal(v any) ([]byte, error) {
return json.Marshal(v)
}
3 changes: 2 additions & 1 deletion internal/searchstore/search_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ package searchstore
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/xataio/pgstream/internal/json"
)

type Client interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/schemalog/log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (m *LogEntry) Diff(previous *LogEntry) *SchemaDiff {
return m.Schema.Diff(&previous.Schema)
}

func (m *LogEntry) GetTableByName(tableName string) *Table {
func (m *LogEntry) GetTableByName(tableName string) (Table, bool) {
return m.Schema.getTableByName(tableName)
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/schemalog/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
package schemalog

import (
"encoding/json"
"slices"

"github.com/xataio/pgstream/internal/json"
)

type Schema struct {
Expand Down Expand Up @@ -95,13 +96,13 @@ func (s *Schema) Diff(previous *Schema) *SchemaDiff {
return &d
}

func (s *Schema) getTableByName(tableName string) *Table {
func (s *Schema) getTableByName(tableName string) (Table, bool) {
for _, t := range s.Tables {
if t.Name == tableName {
return &t
return t, true
}
}
return nil
return Table{}, false
}

func (s *Schema) getTableByID(pgstreamID string) *Table {
Expand Down
2 changes: 1 addition & 1 deletion pkg/wal/listener/kafka/wal_kafka_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ package kafka

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/xataio/pgstream/internal/json"
"github.com/xataio/pgstream/pkg/kafka"
loglib "github.com/xataio/pgstream/pkg/log"
"github.com/xataio/pgstream/pkg/wal"
Expand Down
2 changes: 1 addition & 1 deletion pkg/wal/listener/postgres/wal_pg_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"errors"
"fmt"

json "github.com/bytedance/sonic"
"github.com/xataio/pgstream/internal/json"
loglib "github.com/xataio/pgstream/pkg/log"
"github.com/xataio/pgstream/pkg/wal"
"github.com/xataio/pgstream/pkg/wal/replication"
Expand Down
2 changes: 1 addition & 1 deletion pkg/wal/processor/kafka/wal_kafka_batch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"runtime/debug"
"time"

json "github.com/bytedance/sonic"
"github.com/xataio/pgstream/internal/json"
synclib "github.com/xataio/pgstream/internal/sync"
"github.com/xataio/pgstream/pkg/kafka"
kafkainstrumentation "github.com/xataio/pgstream/pkg/kafka/instrumentation"
Expand Down
2 changes: 1 addition & 1 deletion pkg/wal/processor/search/search_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
package search

import (
"encoding/json"
"errors"
"fmt"
"strconv"

"github.com/xataio/pgstream/internal/json"
"github.com/xataio/pgstream/pkg/schemalog"
"github.com/xataio/pgstream/pkg/wal"
"github.com/xataio/pgstream/pkg/wal/processor"
Expand Down
6 changes: 4 additions & 2 deletions pkg/wal/processor/search/search_store_retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ package search

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/xataio/pgstream/internal/json"
"github.com/xataio/pgstream/pkg/backoff"
loglib "github.com/xataio/pgstream/pkg/log"
"github.com/xataio/pgstream/pkg/schemalog"
Expand All @@ -19,6 +19,7 @@ type StoreRetrier struct {
inner Store
logger loglib.Logger
backoffProvider backoff.Provider
marshaler func(any) ([]byte, error)
}

type StoreRetryConfig struct {
Expand All @@ -42,6 +43,7 @@ func NewStoreRetrier(s Store, cfg StoreRetryConfig, opts ...StoreOption) *StoreR
inner: s,
logger: loglib.NewNoopLogger(),
backoffProvider: backoff.NewProvider(cfg.backoffConfig()),
marshaler: json.Marshal,
}

for _, opt := range opts {
Expand Down Expand Up @@ -171,7 +173,7 @@ func (s *StoreRetrier) getRetriableDocs(failedDocs []DocumentError) ([]Document,
}

func (s *StoreRetrier) logFailure(docErr DocumentError) {
docBytes, err := json.Marshal(docErr.Document.Data)
docBytes, err := s.marshaler(docErr.Document.Data)
if err != nil {
docBytes = []byte(fmt.Sprintf("failed to marshal document data: %s", err))
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/wal/processor/search/search_store_retrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package search

import (
"context"
"encoding/json"
"fmt"
"testing"

Expand Down Expand Up @@ -229,6 +230,7 @@ func TestStoreRetrier_SendDocuments(t *testing.T) {
inner: tc.store,
logger: loglib.NewNoopLogger(),
backoffProvider: newMockBackoffProvider(),
marshaler: json.Marshal,
}

failedDocs, err := retrier.SendDocuments(context.Background(), testDocs)
Expand Down
2 changes: 1 addition & 1 deletion pkg/wal/processor/search/store/search_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
package store

import (
"encoding/json"
"fmt"

"github.com/xataio/pgstream/internal/json"
"github.com/xataio/pgstream/internal/searchstore"
"github.com/xataio/pgstream/pkg/schemalog"
"github.com/xataio/pgstream/pkg/wal/processor/search"
Expand Down
6 changes: 4 additions & 2 deletions pkg/wal/processor/search/store/search_pg_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
package store

import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

"github.com/jackc/pgx/v5/pgtype"

"github.com/xataio/pgstream/internal/json"
"github.com/xataio/pgstream/internal/searchstore"
"github.com/xataio/pgstream/pkg/schemalog"
"github.com/xataio/pgstream/pkg/wal/processor/search"
Expand All @@ -19,6 +19,7 @@ import (
type PgMapper struct {
searchMapper searchstore.Mapper
pgTypeMap *pgtype.Map
unmarshaler func([]byte, any) error
}

const (
Expand All @@ -34,6 +35,7 @@ func NewPostgresMapper(mapper searchstore.Mapper) *PgMapper {
return &PgMapper{
searchMapper: mapper,
pgTypeMap: pgtype.NewMap(),
unmarshaler: json.Unmarshal,
}
}

Expand Down Expand Up @@ -82,7 +84,7 @@ func (m *PgMapper) MapColumnValue(column schemalog.Column, value any) (any, erro
return nil, fmt.Errorf("unexpected value type for jsonb column")
}
var array []float64
err := json.Unmarshal([]byte(stringContent), &array)
err := m.unmarshaler([]byte(stringContent), &array)
if err != nil {
return nil, fmt.Errorf("vector value is not array: %w", err)
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/wal/processor/search/store/search_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ package store
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"

"github.com/xataio/pgstream/internal/json"
"github.com/xataio/pgstream/internal/searchstore"
elasticsearchstore "github.com/xataio/pgstream/internal/searchstore/elasticsearch"
opensearchstore "github.com/xataio/pgstream/internal/searchstore/opensearch"

loglib "github.com/xataio/pgstream/pkg/log"
"github.com/xataio/pgstream/pkg/schemalog"
"github.com/xataio/pgstream/pkg/wal/processor/search"
Expand Down Expand Up @@ -423,7 +422,7 @@ func (s *Store) updateMappingAddNewColumns(ctx context.Context, indexName IndexN
}

func (s *Store) insertNewSchemaLog(ctx context.Context, m *schemalog.LogEntry) error {
logBytes, err := json.Marshal(m)
logBytes, err := s.marshaler(m)
if err != nil {
return fmt.Errorf("insert schema log, failed to marshal search doc: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/wal/processor/translator/wal_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,16 @@ func (t *Translator) translate(ctx context.Context, data *wal.Data) error {
return fmt.Errorf("failed to retrieve schema for translate %w", err)
}

table := logEntry.GetTableByName(data.Table)
if table == nil {
table, found := logEntry.GetTableByName(data.Table)
if !found {
return processor.ErrTableNotFound
}

if err = t.fillEventMetadata(data, logEntry, table); err != nil {
if err = t.fillEventMetadata(data, logEntry, &table); err != nil {
return fmt.Errorf("failed to fill event metadata: %w", err)
}

if err = t.translateColumnNames(data, table); err != nil {
if err = t.translateColumnNames(data, &table); err != nil {
return fmt.Errorf("failed to translate column names: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/wal/processor/webhook/notifier/webhook_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ package notifier
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"runtime/debug"
"sync"

httplib "github.com/xataio/pgstream/internal/http"
"github.com/xataio/pgstream/internal/json"
synclib "github.com/xataio/pgstream/internal/sync"
loglib "github.com/xataio/pgstream/pkg/log"
"github.com/xataio/pgstream/pkg/wal"
Expand Down

0 comments on commit c795714

Please sign in to comment.