Delete opensearch schema synchronously #83

Merged 1 commit on Oct 24, 2024
29 changes: 12 additions & 17 deletions README.md
@@ -176,23 +176,18 @@ One of exponential/constant backoff policies can be provided for the Kafka commi
<details>
<summary>Search Batch Indexer</summary>

| Environment Variable | Default | Required | Description |
| ------------------------------------------------------------ | ------- | -------- | -------------------------------------------------------------------------------------------------------------- |
| PGSTREAM_OPENSEARCH_STORE_URL | N/A | Yes | URL for the opensearch store to connect to (at least one of the URLs must be provided). |
| PGSTREAM_ELASTICSEARCH_STORE_URL | N/A | Yes | URL for the elasticsearch store to connect to (at least one of the URLs must be provided). |
| PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT | 1s | No | Max time interval at which the batch sending to the search store is triggered. |
| PGSTREAM_SEARCH_INDEXER_BATCH_SIZE | 100 | No | Max number of messages to be sent per batch. When this size is reached, the batch is sent to the search store. |
| PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES | 100MiB | No | Max memory used by the search batch indexer for inflight batches. |
| PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_INITIAL_INTERVAL | 0 | No | Initial interval for the exponential backoff policy to be applied to the search indexer cleanup retries. |
| PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_INTERVAL | 0 | No | Max interval for the exponential backoff policy to be applied to the search indexer cleanup retries. |
| PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the search indexer cleanup retries. |
| PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the search indexer cleanup retries. |
| PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the search indexer cleanup retries. |
| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL | 1s | No | Initial interval for the exponential backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL | 1min | No | Max interval for the exponential backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the search store operation retries. |
| Environment Variable | Default | Required | Description |
| -------------------------------------------------- | ------- | -------- | -------------------------------------------------------------------------------------------------------------- |
| PGSTREAM_OPENSEARCH_STORE_URL | N/A | Yes | URL for the opensearch store to connect to (at least one of the URLs must be provided). |
| PGSTREAM_ELASTICSEARCH_STORE_URL | N/A | Yes | URL for the elasticsearch store to connect to (at least one of the URLs must be provided). |
| PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT | 1s | No | Max time interval at which the batch sending to the search store is triggered. |
| PGSTREAM_SEARCH_INDEXER_BATCH_SIZE | 100 | No | Max number of messages to be sent per batch. When this size is reached, the batch is sent to the search store. |
| PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES            | 100MiB  | No       | Max memory used by the search batch indexer for inflight batches.                                                |
| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL | 1s | No | Initial interval for the exponential backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL | 1min | No | Max interval for the exponential backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the search store operation retries. |
| PGSTREAM_SEARCH_STORE_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the search store operation retries. |

One of exponential/constant backoff policies can be provided for the search indexer cleanup retry strategy. If none is provided, no retries apply.

7 changes: 3 additions & 4 deletions cmd/config.go
@@ -156,10 +156,9 @@ func parseSearchProcessorConfig() *stream.SearchProcessorConfig {

return &stream.SearchProcessorConfig{
Indexer: search.IndexerConfig{
BatchSize: viper.GetInt("PGSTREAM_SEARCH_INDEXER_BATCH_SIZE"),
BatchTime: viper.GetDuration("PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT"),
MaxQueueBytes: viper.GetInt64("PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES"),
CleanupBackoff: parseBackoffConfig("PGSTREAM_SEARCH_INDEXER_CLEANUP"),
BatchSize: viper.GetInt("PGSTREAM_SEARCH_INDEXER_BATCH_SIZE"),
BatchTime: viper.GetDuration("PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT"),
MaxQueueBytes: viper.GetInt64("PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES"),
},
Store: store.Config{
OpenSearchURL: opensearchStore,
3 changes: 0 additions & 3 deletions kafka2os.env
@@ -10,9 +10,6 @@ PGSTREAM_KAFKA_COMMIT_BACKOFF_MAX_RETRIES=60
# Processor config
PGSTREAM_SEARCH_INDEXER_BATCH_SIZE=100
PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT=5s
PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_INITIAL_INTERVAL=1s
PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_MAX_INTERVAL=1m
PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_MAX_RETRIES=60
PGSTREAM_OPENSEARCH_STORE_URL="http://admin:admin@localhost:9200"
PGSTREAM_SEARCH_STORE_BACKOFF_INITIAL_INTERVAL=1s
PGSTREAM_SEARCH_STORE_BACKOFF_MAX_INTERVAL=1m
3 changes: 0 additions & 3 deletions pg2os.env
@@ -5,9 +5,6 @@ PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost?sslmode=d
PGSTREAM_TRANSLATOR_STORE_POSTGRES_URL="postgres://postgres:postgres@localhost?sslmode=disable"
PGSTREAM_SEARCH_INDEXER_BATCH_SIZE=100
PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT=5s
PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_INITIAL_INTERVAL=1s
PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_INTERVAL=1m
PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_RETRIES=60
PGSTREAM_OPENSEARCH_STORE_URL="http://admin:admin@localhost:9200"
PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL=1s
PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL=1m
5 changes: 0 additions & 5 deletions pkg/wal/processor/search/config.go
@@ -4,8 +4,6 @@ package search

import (
"time"

"github.com/xataio/pgstream/pkg/backoff"
)

type IndexerConfig struct {
@@ -18,9 +16,6 @@ type IndexerConfig struct {
// MaxQueueBytes is the max memory used by the batch indexer for inflight
// batches. Defaults to 100MiB
MaxQueueBytes int64
// CleanupBackoff is the retry policy to follow for the async index
// deletion. If no config is provided, no retry policy is applied.
CleanupBackoff backoff.Config
}

const (
24 changes: 4 additions & 20 deletions pkg/wal/processor/search/helper_test.go
@@ -24,10 +24,11 @@ func (m *mockAdapter) walEventToMsg(e *wal.Event) (*msg, error) {
type mockStore struct {
getMapperFn func() Mapper
applySchemaChangeFn func(ctx context.Context, le *schemalog.LogEntry) error
deleteSchemaFn func(ctx context.Context, schemaName string) error
deleteSchemaFn func(ctx context.Context, i uint, schemaName string) error
deleteTableDocumentsFn func(ctx context.Context, schemaName string, tableIDs []string) error
sendDocumentsFn func(ctx context.Context, i uint, docs []Document) ([]DocumentError, error)
sendDocumentsCalls uint
deleteSchemaCalls uint
}

func (m *mockStore) GetMapper() Mapper {
@@ -39,7 +40,8 @@ func (m *mockStore) ApplySchemaChange(ctx context.Context, le *schemalog.LogEntr
}

func (m *mockStore) DeleteSchema(ctx context.Context, schemaName string) error {
return m.deleteSchemaFn(ctx, schemaName)
m.deleteSchemaCalls++
return m.deleteSchemaFn(ctx, m.deleteSchemaCalls, schemaName)
}

func (m *mockStore) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error {
@@ -51,24 +53,6 @@ func (m *mockStore) SendDocuments(ctx context.Context, docs []Document) ([]Docum
return m.sendDocumentsFn(ctx, m.sendDocumentsCalls, docs)
}

type mockCleaner struct {
deleteSchemaFn func(context.Context, string) error
startFn func(context.Context)
stopFn func()
}

func (m *mockCleaner) deleteSchema(ctx context.Context, schema string) error {
return m.deleteSchemaFn(ctx, schema)
}

func (m *mockCleaner) start(ctx context.Context) {
m.startFn(ctx)
}

func (m *mockCleaner) stop() {
m.stopFn()
}

const (
testSchemaName = "test_schema"
testTableName = "test_table"
11 changes: 2 additions & 9 deletions pkg/wal/processor/search/search_batch_indexer.go
@@ -38,8 +38,6 @@ type BatchIndexer struct {

// checkpoint callback to mark what was safely stored
checkpoint checkpointer.Checkpoint

cleaner cleaner
}

type Option func(*BatchIndexer)
@@ -66,10 +64,6 @@ func NewBatchIndexer(ctx context.Context, config IndexerConfig, store Store, lsn
opt(indexer)
}

indexer.cleaner = newSchemaCleaner(&config.CleanupBackoff, store, indexer.logger)
// start a goroutine for processing schema deletes asynchronously.
// routine ends when the internal channel is closed.
go indexer.cleaner.start(ctx)
return indexer
}

@@ -191,7 +185,6 @@ func (i *BatchIndexer) Name() string {

func (i *BatchIndexer) Close() error {
close(i.msgChan)
i.cleaner.stop()
return nil
}

@@ -270,8 +263,8 @@ func (i *BatchIndexer) applySchemaChange(ctx context.Context, new *schemalog.Log
}

if new.Schema.Dropped {
if err := i.cleaner.deleteSchema(ctx, new.SchemaName); err != nil {
return fmt.Errorf("register schema for delete: %w", err)
if err := i.store.DeleteSchema(ctx, new.SchemaName); err != nil {
return fmt.Errorf("deleting schema: %w", err)
}
return nil
}
15 changes: 4 additions & 11 deletions pkg/wal/processor/search/search_batch_indexer_test.go
@@ -401,7 +401,6 @@ func TestBatchIndexer_sendBatch(t *testing.T) {
checkpoint checkpointer.Checkpoint
batch *msgBatch
skipSchema func(string) bool
cleaner cleaner

wantErr error
}{
@@ -517,9 +516,8 @@
},
positions: []wal.CommitPosition{testCommitPos},
},
store: &mockStore{},
cleaner: &mockCleaner{
deleteSchemaFn: func(ctx context.Context, s string) error {
store: &mockStore{
deleteSchemaFn: func(ctx context.Context, _ uint, s string) error {
require.Equal(t, testSchemaName, s)
return nil
},
@@ -557,9 +555,8 @@
},
positions: []wal.CommitPosition{testCommitPos},
},
store: &mockStore{},
cleaner: &mockCleaner{
deleteSchemaFn: func(ctx context.Context, s string) error {
store: &mockStore{
deleteSchemaFn: func(ctx context.Context, _ uint, s string) error {
return errTest
},
},
@@ -680,10 +677,6 @@
indexer.skipSchema = tc.skipSchema
}

if tc.cleaner != nil {
indexer.cleaner = tc.cleaner
}

err := indexer.sendBatch(context.Background(), tc.batch)
require.ErrorIs(t, err, tc.wantErr)
})
103 changes: 0 additions & 103 deletions pkg/wal/processor/search/search_schema_cleaner.go

This file was deleted.
