Skip to content

Commit

Permalink
Add selective table tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunlol committed Nov 28, 2024
1 parent 8cfa3ed commit c58814c
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 14 deletions.
2 changes: 2 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ BEMIDB_STORAGE_PATH=./iceberg
PG_DATABASE_URL=postgres://[USER]:[PASSWORD]@localhost:5432/[DATABASE]
# PG_SYNC_INTERVAL=1h
# PG_SCHEMA_PREFIX=mydb_
# PG_INCLUDE_TABLES=public.users,public.posts
# PG_EXCLUDE_TABLES=public.logs
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
/iceberg
/iceberg-test
/src/iceberg
/src/iceberg-test
.DS_Store
.vscode
.env
/benchmark/tpch-kit
/benchmark/data/*.tbl
Expand Down
34 changes: 31 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,45 @@ Sync data periodically from a Postgres database:
sync
```

Alternatively, you can set the interval using environment variables:
Note that incremental real-time replication is not supported yet (WIP). Please see the [Future roadmap](#future-roadmap).

### Syncing from selective tables

You can sync only specific tables from your Postgres database using the `--include-tables` or `--exclude-tables` options.

To include specific tables during the sync:

```sh
./bemidb \
--pg-sync-interval 1h \
--pg-database-url postgres://postgres:postgres@localhost:5432/dbname \
--include-tables schema.table1,public.users \
sync
```

To exclude specific tables during the sync:

```sh
./bemidb \
--pg-sync-interval 1h \
--pg-database-url postgres://postgres:postgres@localhost:5432/dbname \
--exclude-tables schema.table3,public.cache \
sync
```

Note: You cannot use `--include-tables` and `--exclude-tables` simultaneously.

Alternatively, you can set the interval and table inclusion/exclusion using environment variables:

```sh
export PG_SYNC_INTERVAL=1h
export PG_DATABASE_URL=postgres://postgres:postgres@localhost:5432/dbname
export PG_INCLUDE_TABLES=schema.table1,schema.table2
export PG_EXCLUDE_TABLES=schema.table3,schema.table4

./bemidb sync
```

Note that incremental real-time replication is not supported yet (WIP). Please see the [Future roadmap](#future-roadmap).

### Syncing from multiple Postgres databases

BemiDB supports syncing data from multiple Postgres databases into the same BemiDB database by allowing prefixing schemas.
Expand Down
2 changes: 1 addition & 1 deletion scripts/install.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

VERSION="0.14.2"
VERSION="0.14.3"

# Detect OS and architecture
OS=$(uname -s | tr '[:upper:]' '[:lower:]')
Expand Down
29 changes: 22 additions & 7 deletions src/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ const (
ENV_AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID"
ENV_AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"

ENV_PG_DATABASE_URL = "PG_DATABASE_URL"
ENV_PG_SYNC_INTERVAL = "PG_SYNC_INTERVAL"
ENV_PG_SCHEMA_PREFIX = "PG_SCHEMA_PREFIX"
ENV_PG_DATABASE_URL = "PG_DATABASE_URL"
ENV_PG_SYNC_INTERVAL = "PG_SYNC_INTERVAL"
ENV_PG_SCHEMA_PREFIX = "PG_SCHEMA_PREFIX"
ENV_PG_INCLUDE_TABLES = "PG_INCLUDE_TABLES"
ENV_PG_EXCLUDE_TABLES = "PG_EXCLUDE_TABLES"

DEFAULT_PORT = "54321"
DEFAULT_DATABASE = "bemidb"
Expand All @@ -47,9 +49,11 @@ type AwsConfig struct {
}

type PgConfig struct {
DatabaseUrl string
SyncInterval string // optional
SchemaPrefix string // optional
DatabaseUrl string
SyncInterval string // optional
SchemaPrefix string // optional
IncludeTables *Set // optional
ExcludeTables *Set // optional
}

type Config struct {
Expand All @@ -67,6 +71,7 @@ type Config struct {

var _config Config
var _password string
var _pgIncludeTables, _pgExcludeTables string

func init() {
registerFlags()
Expand All @@ -83,6 +88,8 @@ func registerFlags() {
flag.StringVar(&_config.StorageType, "storage-type", os.Getenv(ENV_STORAGE_TYPE), "Storage type: \"LOCAL\", \"S3\". Default: \""+DEFAULT_DB_STORAGE_TYPE+"\"")
flag.StringVar(&_config.Pg.SchemaPrefix, "pg-schema-prefix", os.Getenv(ENV_PG_SCHEMA_PREFIX), "(Optional) Prefix for PostgreSQL schema names")
flag.StringVar(&_config.Pg.SyncInterval, "pg-sync-interval", os.Getenv(ENV_PG_SYNC_INTERVAL), "(Optional) Interval between syncs. Valid units: \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
flag.StringVar(&_pgIncludeTables, "include-tables", os.Getenv(ENV_PG_INCLUDE_TABLES), "(Optional) Comma-separated list of tables to include in sync (format: schema.table)")
flag.StringVar(&_pgExcludeTables, "exclude-tables", os.Getenv(ENV_PG_EXCLUDE_TABLES), "(Optional) Comma-separated list of tables to exclude from sync (format: schema.table)")
flag.StringVar(&_config.Pg.DatabaseUrl, "pg-database-url", os.Getenv(ENV_PG_DATABASE_URL), "PostgreSQL database URL")
flag.StringVar(&_config.Aws.Region, "aws-region", os.Getenv(ENV_AWS_REGION), "AWS region")
flag.StringVar(&_config.Aws.S3Bucket, "aws-s3-bucket", os.Getenv(ENV_AWS_S3_BUCKET), "AWS S3 bucket name")
Expand Down Expand Up @@ -128,7 +135,6 @@ func parseFlags() {
} else if !slices.Contains(STORAGE_TYPES, _config.StorageType) {
panic("Invalid storage type " + _config.StorageType + ". Must be one of " + strings.Join(STORAGE_TYPES, ", "))
}

if _config.StorageType == STORAGE_TYPE_S3 {
if _config.Aws.Region == "" {
panic("AWS region is required")
Expand All @@ -143,6 +149,15 @@ func parseFlags() {
panic("AWS secret access key is required")
}
}
if _pgIncludeTables != "" && _pgExcludeTables != "" {
panic("Cannot specify both --include-tables and --exclude-tables")
}
if _pgIncludeTables != "" {
_config.Pg.IncludeTables = NewSet(strings.Split(_pgIncludeTables, ","))
}
if _pgExcludeTables != "" {
_config.Pg.ExcludeTables = NewSet(strings.Split(_pgExcludeTables, ","))
}
}

func LoadConfig(reRegisterFlags ...bool) *Config {
Expand Down
77 changes: 77 additions & 0 deletions src/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func TestLoadConfig(t *testing.T) {
if config.Pg.SchemaPrefix != "" {
t.Errorf("Expected schemaPrefix to be empty, got %s", config.Pg.SchemaPrefix)
}
if config.Pg.IncludeTables != nil {
t.Errorf("Expected includeTables to be empty, got %v", config.Pg.IncludeTables)
}
if config.Pg.ExcludeTables != nil {
t.Errorf("Expected includeTables to be empty, got %v", config.Pg.ExcludeTables)
}
})

t.Run("Uses config values from environment variables with LOCAL storage", func(t *testing.T) {
Expand Down Expand Up @@ -117,6 +123,7 @@ func TestLoadConfig(t *testing.T) {
t.Setenv("PG_DATABASE_URL", "postgres://user:password@localhost:5432/template1")
t.Setenv("PG_SYNC_INTERVAL", "1h")
t.Setenv("PG_SCHEMA_PREFIX", "mydb_")
t.Setenv("PG_INCLUDE_TABLES", "public.users")

config := LoadConfig(true)

Expand All @@ -129,6 +136,9 @@ func TestLoadConfig(t *testing.T) {
if config.Pg.SchemaPrefix != "mydb_" {
t.Errorf("Expected schemaPrefix to be empty, got %s", config.Pg.SchemaPrefix)
}
if !config.Pg.IncludeTables.Contains("public.users") {
t.Errorf("Expected includeTables to contain public.users, got %v", config.Pg.IncludeTables)
}
})

t.Run("Uses command line arguments", func(t *testing.T) {
Expand All @@ -142,6 +152,7 @@ func TestLoadConfig(t *testing.T) {
"--pg-database-url", "postgres://user:password@localhost:5432/db",
"--pg-sync-interval", "2h30m",
"--pg-schema-prefix", "mydb_",
"--include-tables", "public.users",
})

config := LoadConfig()
Expand Down Expand Up @@ -173,5 +184,71 @@ func TestLoadConfig(t *testing.T) {
if config.Pg.SchemaPrefix != "mydb_" {
t.Errorf("Expected schemaPrefix to be mydb_, got %s", config.Pg.SchemaPrefix)
}
if !config.Pg.IncludeTables.Contains("public.users") {
t.Errorf("Expected includeTables to have public.users, got %v", config.Pg.IncludeTables)
}
})

t.Run("Handles exclude-tables configuration", func(t *testing.T) {
setTestArgs([]string{
"--pg-database-url", "postgres://user:password@localhost:5432/db",
"--exclude-tables", "public.secrets,public.cache",
})
config := LoadConfig(true)

if !config.Pg.ExcludeTables.Contains("public.secrets") {
t.Errorf("Expected ExcludeTables to contain public.secrets, got %v", config.Pg.ExcludeTables)
}
if !config.Pg.ExcludeTables.Contains("public.cache") {
t.Errorf("Expected ExcludeTables to contain public.cache, got %v", config.Pg.ExcludeTables)
}
if config.Pg.IncludeTables != nil {
t.Errorf("Expected IncludeTables to be empty, got %v", config.Pg.IncludeTables)
}
})

t.Run("Panics when both include and exclude tables are specified in env", func(t *testing.T) {
t.Setenv("BEMIDB_PORT", "12345")
t.Setenv("BEMIDB_DATABASE", "mydb")
t.Setenv("BEMIDB_INIT_SQL", "./init/duckdb.sql")
t.Setenv("BEMIDB_STORAGE_PATH", "storage-path")
t.Setenv("BEMIDB_LOG_LEVEL", "ERROR")
t.Setenv("PG_DATABASE_URL", "postgres://user:password@localhost:5432/template1")
t.Setenv("PG_SYNC_INTERVAL", "1h")
t.Setenv("PG_SCHEMA_PREFIX", "mydb_")
t.Setenv("PG_INCLUDE_TABLES", "public.users")
t.Setenv("PG_EXCLUDE_TABLES", "public.orders")

defer func() {
if r := recover(); r == nil {
t.Error("Expected panic when both include and exclude tables are specified")
}
}()

LoadConfig(true)
})

t.Run("Panics when both include and exclude tables are specified in args", func(t *testing.T) {
setTestArgs([]string{
"--port", "12345",
"--database", "mydb",
"--init-sql", "./init/duckdb.sql",
"--storage-path", "storage-path",
"--log-level", "ERROR",
"--storage-type", "local",
"--pg-database-url", "postgres://user:password@localhost:5432/db",
"--pg-sync-interval", "2h30m",
"--pg-schema-prefix", "mydb_",
"--include-tables", "public.users",
"--exclude-tables", "public.orders",
})

defer func() {
if r := recover(); r == nil {
t.Error("Expected panic when both include and exclude tables are specified")
}
}()

LoadConfig()
})
}
6 changes: 6 additions & 0 deletions src/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ func loadTestConfig() *Config {
}

func setTestArgs(args []string) {
// Reset state
_config = Config{}
_password = ""
_pgIncludeTables = ""
_pgExcludeTables = ""

os.Args = append([]string{"cmd"}, args...)
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
registerFlags()
Expand Down
2 changes: 1 addition & 1 deletion src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"
)

const VERSION = "0.14.2"
const VERSION = "0.14.3"

func main() {
config := LoadConfig()
Expand Down
21 changes: 19 additions & 2 deletions src/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/csv"
"fmt"
"os"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -41,14 +42,30 @@ func (syncer *Syncer) SyncFromPostgres() {
pgSchemaTables := []SchemaTable{}
for _, schema := range syncer.listPgSchemas(conn) {
for _, pgSchemaTable := range syncer.listPgSchemaTables(conn, schema) {
pgSchemaTables = append(pgSchemaTables, pgSchemaTable)
syncer.syncFromPgTable(conn, pgSchemaTable)
if syncer.shouldSyncTable(pgSchemaTable) {
pgSchemaTables = append(pgSchemaTables, pgSchemaTable)
syncer.syncFromPgTable(conn, pgSchemaTable)
}
}
}

syncer.deleteOldIcebergSchemaTables(pgSchemaTables)
}

func (syncer *Syncer) shouldSyncTable(schemaTable SchemaTable) bool {
tableId := fmt.Sprintf("%s.%s", schemaTable.Schema, schemaTable.Table)

if syncer.config.Pg.IncludeTables != nil {
return syncer.config.Pg.IncludeTables.Contains(tableId)
}

if syncer.config.Pg.ExcludeTables != nil {
return !syncer.config.Pg.ExcludeTables.Contains(tableId)
}

return true
}

func (syncer *Syncer) listPgSchemas(conn *pgx.Conn) []string {
var schemas []string

Expand Down
59 changes: 59 additions & 0 deletions src/syncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import "testing"

func TestShouldSyncTable(t *testing.T) {
t.Run("returns true when no filters are set", func(t *testing.T) {
config := &Config{
Pg: PgConfig{
DatabaseUrl: "postgres://user:pass@localhost:5432/db",
},
}
syncer := NewSyncer(config)
table := SchemaTable{Schema: "public", Table: "users"}

if !syncer.shouldSyncTable(table) {
t.Error("Expected shouldSyncTable to return true when no filters are set")
}
})

t.Run("respects include filter", func(t *testing.T) {
config := &Config{
Pg: PgConfig{
DatabaseUrl: "postgres://user:pass@localhost:5432/db",
IncludeTables: NewSet([]string{"public.users", "public.orders"}),
},
}
syncer := NewSyncer(config)

included := SchemaTable{Schema: "public", Table: "users"}
if !syncer.shouldSyncTable(included) {
t.Error("Expected shouldSyncTable to return true for included table")
}

excluded := SchemaTable{Schema: "public", Table: "secrets"}
if syncer.shouldSyncTable(excluded) {
t.Error("Expected shouldSyncTable to return false for non-included table")
}
})

t.Run("respects exclude filter", func(t *testing.T) {
config := &Config{
Pg: PgConfig{
DatabaseUrl: "postgres://user:pass@localhost:5432/db",
ExcludeTables: NewSet([]string{"public.secrets", "public.cache"}),
},
}
syncer := NewSyncer(config)

included := SchemaTable{Schema: "public", Table: "users"}
if !syncer.shouldSyncTable(included) {
t.Error("Expected shouldSyncTable to return true for non-excluded table")
}

excluded := SchemaTable{Schema: "public", Table: "secrets"}
if syncer.shouldSyncTable(excluded) {
t.Error("Expected shouldSyncTable to return false for excluded table")
}
})
}

0 comments on commit c58814c

Please sign in to comment.