Skip to content

Commit

Permalink
feat: Introduce PrimaryKeyComponent (#1491)
Browse files Browse the repository at this point in the history

Primary key components are the column values used as inputs when computing a deterministic `_cq_id`.

Still needs tests...
  • Loading branch information
bbernays authored Jan 31, 2024
1 parent 2b98dab commit ae4a26e
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 101 deletions.
18 changes: 10 additions & 8 deletions docs/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ type jsonTable struct {
}

// jsonColumn is the JSON representation of a table column, used when
// rendering table documentation as JSON.
type jsonColumn struct {
	Name string `json:"name"`
	Type string `json:"type"`
	// IsPrimaryKey reports whether the column is part of the table's primary key.
	IsPrimaryKey bool `json:"is_primary_key,omitempty"`
	// IsPrimaryKeyComponent reports whether the column is one of the inputs
	// used to calculate a deterministic _cq_id.
	IsPrimaryKeyComponent bool `json:"is_primary_key_component,omitempty"`
	// IsIncrementalKey reports whether the column is used as an incremental sync key.
	IsIncrementalKey bool `json:"is_incremental_key,omitempty"`
}

func (g *Generator) renderTablesAsJSON(dir string) error {
Expand All @@ -44,10 +45,11 @@ func (g *Generator) jsonifyTables(tables schema.Tables) []jsonTable {
jsonColumns := make([]jsonColumn, len(table.Columns))
for c, col := range table.Columns {
jsonColumns[c] = jsonColumn{
Name: col.Name,
Type: col.Type.String(),
IsPrimaryKey: col.PrimaryKey,
IsIncrementalKey: col.IncrementalKey,
Name: col.Name,
Type: col.Type.String(),
IsPrimaryKey: col.PrimaryKey,
IsPrimaryKeyComponent: col.PrimaryKeyComponent,
IsIncrementalKey: col.IncrementalKey,
}
}
jsonTables[i] = jsonTable{
Expand Down
171 changes: 99 additions & 72 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,23 @@ func testTableSuccessWithCQIDPK() *schema.Table {
}
}

// testTableSuccessWithPKComponents returns a test table whose _cq_id column is
// a primary key and whose data column is flagged as a primary-key component,
// so the scheduler computes a deterministic _cq_id from it.
func testTableSuccessWithPKComponents() *schema.Table {
	idCol := schema.CqIDColumn
	idCol.PrimaryKey = true

	dataCol := schema.Column{
		Name:                "test_column",
		Type:                arrow.PrimitiveTypes.Int64,
		PrimaryKeyComponent: true,
	}

	return &schema.Table{
		Name:     "test_table_succes_vpk__cq_id",
		Resolver: testResolverSuccess,
		Columns:  []schema.Column{idCol, dataCol},
	}
}

func testTableResolverPanic() *schema.Table {
return &schema.Table{
Name: "test_table_resolver_panic",
Expand Down Expand Up @@ -270,6 +287,16 @@ var syncTestCases = []syncTestCase{
},
deterministicCQID: false,
},
{
table: testTableSuccessWithPKComponents(),
data: []scalar.Vector{
{
// This value will not be validated as it will be randomly set by the scheduler
&scalar.UUID{},
&scalar.Int{Value: 3, Valid: true},
},
},
},
}

func TestScheduler(t *testing.T) {
Expand All @@ -289,77 +316,6 @@ func TestScheduler(t *testing.T) {
}
}

// TestScheduler_Cancellation verifies that a sync consumes every message when
// allowed to finish, and stops short of the full count when the context is
// cancelled mid-stream, for every scheduler strategy.
func TestScheduler_Cancellation(t *testing.T) {
	data := make([]any, 100)

	tests := []struct {
		name         string
		data         []any
		cancel       bool
		messageCount int
	}{
		{
			name:         "should consume all message",
			data:         data,
			cancel:       false,
			messageCount: len(data) + 1, // one message per data element + 1 migration message
		},
		{
			name:         "should not consume all message on cancel",
			data:         data,
			cancel:       true,
			messageCount: len(data) + 1, // one message per data element + 1 migration message
		},
	}

	for _, strategy := range AllStrategies {
		strategy := strategy
		for _, tc := range tests {
			tc := tc
			t.Run(fmt.Sprintf("%s_%s", tc.name, strategy.String()), func(t *testing.T) {
				logger := zerolog.New(zerolog.NewTestWriter(t))
				if tc.cancel {
					logger = zerolog.Nop() // FIXME without this, zerolog usage causes a race condition when tests are run with `-race -count=100`
				}
				sc := NewScheduler(WithLogger(logger), WithStrategy(strategy))

				messages := make(chan message.SyncMessage)
				ctx, cancel := context.WithCancel(context.Background())
				defer cancel()

				// Run the sync in the background; the channel is closed when it
				// returns so the consumer loop below terminates.
				go func() {
					err := sc.Sync(
						ctx,
						&testExecutionClient{},
						[]*schema.Table{testTableSuccessWithData(tc.data)},
						messages,
					)
					if tc.cancel {
						assert.Equal(t, err, context.Canceled)
					} else {
						require.NoError(t, err)
					}
					close(messages)
				}()

				messageConsumed := 0
				for range messages {
					// Cancel after the first message so the sync stops early.
					if tc.cancel {
						cancel()
					}
					messageConsumed++
				}

				if tc.cancel {
					assert.NotEqual(t, tc.messageCount, messageConsumed)
				} else {
					assert.Equal(t, tc.messageCount, messageConsumed)
				}
			})
		}
	}
}

// nolint:revive
func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, deterministicCQID bool) {
ctx := context.Background()
Expand Down Expand Up @@ -411,7 +367,7 @@ func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, determinist
initialTable := tables.Get(v.Table.Name)

pks := migratedTable.PrimaryKeys()
if deterministicCQID && initialTable.Columns.Get(schema.CqIDColumn.Name) != nil {
if (deterministicCQID || len(migratedTable.PrimaryKeyComponents()) > 0) && initialTable.Columns.Get(schema.CqIDColumn.Name) != nil {
if len(pks) != 1 {
t.Fatalf("expected 1 pk. got %d", len(pks))
}
Expand All @@ -433,3 +389,74 @@ func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, determinist
t.Fatalf("expected %d resources. got %d", len(tc.data), i)
}
}

// TestScheduler_Cancellation verifies that a sync consumes every message when
// allowed to finish, and stops short of the full count when the context is
// cancelled mid-stream, for every scheduler strategy.
func TestScheduler_Cancellation(t *testing.T) {
	data := make([]any, 100)

	tests := []struct {
		name         string
		data         []any
		cancel       bool
		messageCount int
	}{
		{
			name:         "should consume all message",
			data:         data,
			cancel:       false,
			messageCount: len(data) + 1, // one message per data element + 1 migration message
		},
		{
			name:         "should not consume all message on cancel",
			data:         data,
			cancel:       true,
			messageCount: len(data) + 1, // one message per data element + 1 migration message
		},
	}

	for _, strategy := range AllStrategies {
		strategy := strategy
		for _, tc := range tests {
			tc := tc
			t.Run(fmt.Sprintf("%s_%s", tc.name, strategy.String()), func(t *testing.T) {
				logger := zerolog.New(zerolog.NewTestWriter(t))
				if tc.cancel {
					logger = zerolog.Nop() // FIXME without this, zerolog usage causes a race condition when tests are run with `-race -count=100`
				}
				sc := NewScheduler(WithLogger(logger), WithStrategy(strategy))

				messages := make(chan message.SyncMessage)
				ctx, cancel := context.WithCancel(context.Background())
				defer cancel()

				// Run the sync in the background; the channel is closed when it
				// returns so the consumer loop below terminates.
				go func() {
					err := sc.Sync(
						ctx,
						&testExecutionClient{},
						[]*schema.Table{testTableSuccessWithData(tc.data)},
						messages,
					)
					if tc.cancel {
						assert.Equal(t, err, context.Canceled)
					} else {
						require.NoError(t, err)
					}
					close(messages)
				}()

				messageConsumed := 0
				for range messages {
					// Cancel after the first message so the sync stops early.
					if tc.cancel {
						cancel()
					}
					messageConsumed++
				}

				if tc.cancel {
					assert.NotEqual(t, tc.messageCount, messageConsumed)
				} else {
					assert.Equal(t, tc.messageCount, messageConsumed)
				}
			})
		}
	}
}
9 changes: 5 additions & 4 deletions schema/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
)

const (
MetadataUnique = "cq:extension:unique"
MetadataPrimaryKey = "cq:extension:primary_key"
MetadataConstraintName = "cq:extension:constraint_name"
MetadataIncremental = "cq:extension:incremental"
MetadataUnique = "cq:extension:unique"
MetadataPrimaryKey = "cq:extension:primary_key"
MetadataPrimaryKeyComponent = "cq:extension:primary_key_component"
MetadataConstraintName = "cq:extension:constraint_name"
MetadataIncremental = "cq:extension:incremental"

MetadataTrue = "true"
MetadataFalse = "false"
Expand Down
36 changes: 26 additions & 10 deletions schema/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Column struct {
IncrementalKey bool `json:"incremental_key"`
// Unique requires the destinations supporting this to mark this column as unique
Unique bool `json:"unique"`

// PrimaryKeyComponent is a flag that indicates if the column is used as part of the input to calculate the value of `_cq_id`.
PrimaryKeyComponent bool `json:"primary_key_component"`
}

// NewColumnFromArrowField creates a new Column from an arrow.Field
Expand All @@ -64,14 +67,18 @@ func NewColumnFromArrowField(f arrow.Field) Column {
v, ok = f.Metadata.GetValue(MetadataIncremental)
column.IncrementalKey = ok && v == MetadataTrue

v, ok = f.Metadata.GetValue(MetadataPrimaryKeyComponent)
column.PrimaryKeyComponent = ok && v == MetadataTrue

return column
}

func (c Column) ToArrowField() arrow.Field {
mdKV := map[string]string{
MetadataPrimaryKey: MetadataFalse,
MetadataUnique: MetadataFalse,
MetadataIncremental: MetadataFalse,
MetadataPrimaryKey: MetadataFalse,
MetadataUnique: MetadataFalse,
MetadataIncremental: MetadataFalse,
MetadataPrimaryKeyComponent: MetadataFalse,
}
if c.PrimaryKey {
mdKV[MetadataPrimaryKey] = MetadataTrue
Expand All @@ -82,6 +89,9 @@ func (c Column) ToArrowField() arrow.Field {
if c.IncrementalKey {
mdKV[MetadataIncremental] = MetadataTrue
}
if c.PrimaryKeyComponent {
mdKV[MetadataPrimaryKeyComponent] = MetadataTrue
}

return arrow.Field{
Name: c.Name,
Expand All @@ -93,13 +103,14 @@ func (c Column) ToArrowField() arrow.Field {

func (c Column) MarshalJSON() ([]byte, error) {
type Alias struct {
Name string `json:"name"`
Type string `json:"type"`
Description string `json:"description"`
PrimaryKey bool `json:"primary_key"`
NotNull bool `json:"not_null"`
Unique bool `json:"unique"`
IncrementalKey bool `json:"incremental_key"`
Name string `json:"name"`
Type string `json:"type"`
Description string `json:"description"`
PrimaryKey bool `json:"primary_key"`
NotNull bool `json:"not_null"`
Unique bool `json:"unique"`
IncrementalKey bool `json:"incremental_key"`
PrimaryKeyComponent bool `json:"primary_key_component"`
}
var alias Alias
alias.Name = c.Name
Expand All @@ -109,6 +120,7 @@ func (c Column) MarshalJSON() ([]byte, error) {
alias.NotNull = c.NotNull
alias.Unique = c.Unique
alias.IncrementalKey = c.IncrementalKey
alias.PrimaryKeyComponent = c.PrimaryKeyComponent

return json.Marshal(alias)
}
Expand All @@ -130,6 +142,10 @@ func (c Column) String() string {
if c.IncrementalKey {
sb.WriteString(":IncrementalKey")
}

if c.PrimaryKeyComponent {
sb.WriteString(":PrimaryKeyComponent")
}
return sb.String()
}

Expand Down
2 changes: 0 additions & 2 deletions schema/doc.go

This file was deleted.

24 changes: 19 additions & 5 deletions schema/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package schema
import (
"crypto/sha256"
"fmt"
"hash"
"slices"

"github.com/cloudquery/plugin-sdk/v4/scalar"
Expand Down Expand Up @@ -79,21 +80,34 @@ func (r *Resource) GetValues() scalar.Vector {

//nolint:revive
// CalculateCQID computes and stores the resource's _cq_id value.
//
// Precedence:
//  1. If the table declares PrimaryKeyComponent columns, the CQID is a
//     deterministic UUID derived from those components.
//  2. Otherwise, if deterministicCQID is false, a random UUID is used.
//  3. Otherwise the CQID is derived from the table's primary keys — unless
//     there are none, or _cq_id itself is the only PK, in which case a random
//     UUID is used.
//
//nolint:revive
func (r *Resource) CalculateCQID(deterministicCQID bool) error {
	// If `PrimaryKeyComponent` is set, we calculate the CQID based on those components.
	pkComponents := r.Table.PrimaryKeyComponents()
	if len(pkComponents) > 0 {
		return r.storeCQID(uuid.NewSHA1(uuid.UUID{}, calculateCqIDValue(r, pkComponents).Sum(nil)))
	}

	// If deterministicCQID is false, we generate a random CQID.
	if !deterministicCQID {
		return r.storeCQID(uuid.New())
	}
	names := r.Table.PrimaryKeys()
	// If there are no primary keys or if CQID is the only PK, we generate a random CQID.
	if len(names) == 0 || (len(names) == 1 && names[0] == CqIDColumn.Name) {
		return r.storeCQID(uuid.New())
	}

	// Note: calculateCqIDValue sorts the column names itself, so no
	// pre-sorting is needed here.
	return r.storeCQID(uuid.NewSHA1(uuid.UUID{}, calculateCqIDValue(r, names).Sum(nil)))
}

func calculateCqIDValue(r *Resource, cols []string) hash.Hash {
h := sha256.New()
for _, name := range names {
slices.Sort(cols)
for _, col := range cols {
// We need to include the column name in the hash because the same value can be present in multiple columns and therefore lead to the same hash
h.Write([]byte(name))
h.Write([]byte(r.Get(name).String()))
h.Write([]byte(col))
h.Write([]byte(r.Get(col).String()))
}
return r.storeCQID(uuid.NewSHA1(uuid.UUID{}, h.Sum(nil)))
return h
}

func (r *Resource) storeCQID(value uuid.UUID) error {
Expand Down
Loading

1 comment on commit ae4a26e

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⏱️ Benchmark results

  • Glob-8 ns/op: 90.95

Please sign in to comment.