Skip to content

Commit

Permalink
Fix: Addressed bugs related to partitioned tables (#241)
Browse files Browse the repository at this point in the history
Closes #238

* Resolved an issue where `Dump.createTocEntries` processed partitioned
tables as if they were physical entities, despite being logical.
* Refined the `findPartitionsOfPartitionedTable` function for improved
accuracy.
* Corrected merging in the pre-data, data, and post-data sections, which
previously caused a panic in dump command when the post-data section was
excluded.
* Added metadata about the first parent root table in the
`toolkit.Table` struct.
* Updated database schema metadata to include a root partition, which is
necessary for performing `--inserts` during restoration.
* Fixed an issue where dumps created with `--load-via-partition-root`
did not use the root partition table in `--inserts` generation during
restoration.
* Revised a documentation example to ensure it aligns with table
constraints.
  • Loading branch information
wwoytenko authored Nov 12, 2024
1 parent 7119261 commit 470a33c
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 60 deletions.
4 changes: 2 additions & 2 deletions docs/built_in_transformers/transformation_inheritance.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ configuration:
transformers:
- name: RandomDate
params:
min: "2000-01-01"
max: "2005-01-01"
min: "2022-01-01"
max: "2022-03-01"
column: "sale_date"
engine: "random"
```
Expand Down
8 changes: 6 additions & 2 deletions internal/db/postgres/cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,11 @@ func (d *Dump) createTocEntries() error {
Original: v.OriginalSize,
Compressed: v.CompressedSize,
}
tablesEntry = append(tablesEntry, entry)
if v.RelKind != 'p' {
// Do not create TOC entry for partitioned tables because they are not dumped. Only their partitions are
// dumped
tablesEntry = append(tablesEntry, entry)
}
tables = append(tables, v)
case *entries.Sequence:
sequences = append(sequences, entry)
Expand Down Expand Up @@ -529,7 +533,7 @@ func (d *Dump) MergeTocEntries(schemaEntries []*toc.Entry, dataEntries []*toc.En
res := make([]*toc.Entry, 0, len(schemaEntries)+len(dataEntries))

preDataEnd := 0
postDataStart := 0
postDataStart := len(schemaEntries) - 1

// Find predata last index and postdata first index
for idx, item := range schemaEntries {
Expand Down
70 changes: 32 additions & 38 deletions internal/db/postgres/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,52 +550,36 @@ func (r *Restore) logWarningsIfHasCycles() {
}
}

func (r *Restore) sortTocEntriesInTopoOrder() []*toc.Entry {
res := make([]*toc.Entry, 0, len(r.tocObj.Entries))

preDataEnd := 0
postDataStart := 0

func (r *Restore) sortTocEntriesInTopoOrder(entries []*toc.Entry) []*toc.Entry {
r.logWarningsIfHasCycles()

// Find predata last index and postdata first index
for idx, item := range r.tocObj.Entries {
if item.Section == toc.SectionPreData {
preDataEnd = idx
}
if item.Section == toc.SectionPostData {
postDataStart = idx
break
}
}
dataEntries := r.tocObj.Entries[preDataEnd+1 : postDataStart]
lastTableIdx := slices.IndexFunc(dataEntries, func(entry *toc.Entry) bool {
return *entry.Desc == toc.SequenceSetDesc || *entry.Desc == toc.BlobsDesc
})
tableEntries := dataEntries
if lastTableIdx != -1 {
tableEntries = dataEntries[:lastTableIdx]
}
sortedTablesEntries := make([]*toc.Entry, 0, len(tableEntries))
// Find data section entries
sortedTablesEntries := make([]*toc.Entry, 0, len(entries))
for _, dumpId := range r.metadata.DumpIdsOrder {
idx := slices.IndexFunc(tableEntries, func(entry *toc.Entry) bool {
idx := slices.IndexFunc(entries, func(entry *toc.Entry) bool {
return entry.DumpId == dumpId
})
if idx == -1 {
tableOid, ok := r.metadata.DumpIdsToTableOid[dumpId]
if !ok {
panic(fmt.Sprintf("table with dumpId %d is not found in dumpId to Oids map", dumpId))
}
skippedTableIdx := slices.IndexFunc(r.metadata.DatabaseSchema, func(t *toolkit.Table) bool {
return t.Oid == tableOid
})
if skippedTableIdx == -1 {
panic(fmt.Sprintf("table with oid %d is not found in DatabaseSchema", tableOid))
}
log.Debug().
Int32("DumpId", dumpId).
Msg("entry not found in table entries it might be excluded from dump")
Str("SchemaName", r.metadata.DatabaseSchema[skippedTableIdx].Schema).
Str("TableName", r.metadata.DatabaseSchema[skippedTableIdx].Name).
Msg("table might be excluded from dump or it is a partitioned table (not partition itself): table is not found in dump entries")
continue
}
sortedTablesEntries = append(sortedTablesEntries, tableEntries[idx])
}

res = append(res, r.tocObj.Entries[:preDataEnd+1]...)
res = append(res, sortedTablesEntries...)
if lastTableIdx != -1 {
res = append(res, dataEntries[lastTableIdx:]...)
sortedTablesEntries = append(sortedTablesEntries, entries[idx])
}
res = append(res, r.tocObj.Entries[postDataStart:]...)
return res
return sortedTablesEntries
}

func (r *Restore) waitDependenciesAreRestore(ctx context.Context, deps []int32) error {
Expand All @@ -615,9 +599,9 @@ func (r *Restore) waitDependenciesAreRestore(ctx context.Context, deps []int32)
func (r *Restore) taskPusher(ctx context.Context, tasks chan restorers.RestoreTask) func() error {
return func() error {
defer close(tasks)
tocEntries := r.tocObj.Entries
tocEntries := getDataSectionTocEntries(r.tocObj.Entries)
if r.restoreOpt.RestoreInOrder {
tocEntries = r.sortTocEntriesInTopoOrder()
tocEntries = r.sortTocEntriesInTopoOrder(tocEntries)
}
for _, entry := range tocEntries {
select {
Expand Down Expand Up @@ -842,3 +826,13 @@ func removeEscapeQuotes(v string) string {
}
return v
}

func getDataSectionTocEntries(tocEntries []*toc.Entry) []*toc.Entry {
var dataSectionEntries []*toc.Entry
for _, entry := range tocEntries {
if entry.Section == toc.SectionData {
dataSectionEntries = append(dataSectionEntries, entry)
}
}
return dataSectionEntries
}
4 changes: 2 additions & 2 deletions internal/db/postgres/context/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func isEndToEndPKFK(graph *subset.Graph, table *entries.Table) bool {
return foundInFK
}

func findPartitionsOfPartitionedTable(ctx context.Context, tx pgx.Tx, t *entries.Table) ([]toolkit.Oid, error) {
func findPartitionsOfPartitionedTable(ctx context.Context, tx pgx.Tx, t *toolkit.Table) ([]toolkit.Oid, error) {
log.Debug().
Str("TableSchema", t.Schema).
Str("TableName", t.Name).
Expand Down Expand Up @@ -615,7 +615,7 @@ func checkTransformerAlreadyExists(
func setupConfigForPartitionedTableChildren(
ctx context.Context, tx pgx.Tx, parentTcm *tableConfigMapping, tables []*entries.Table, cfg []*domains.Table,
) ([]*tableConfigMapping, error) {
parts, err := findPartitionsOfPartitionedTable(ctx, tx, parentTcm.entry)
parts, err := findPartitionsOfPartitionedTable(ctx, tx, parentTcm.entry.Table)
if err != nil {
return nil, fmt.Errorf(
"cannot find partitions of the table %s.%s: %w",
Expand Down
34 changes: 30 additions & 4 deletions internal/db/postgres/context/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package context

import (
"context"
"slices"

"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"

"github.com/greenmaskio/greenmask/internal/db/postgres/pgdump"
"github.com/greenmaskio/greenmask/pkg/toolkit"
Expand All @@ -12,7 +14,7 @@ import (
func getDatabaseSchema(
ctx context.Context, tx pgx.Tx, options *pgdump.Options, version int,
) ([]*toolkit.Table, error) {
var res []*toolkit.Table
var tables []*toolkit.Table
query, err := buildSchemaIntrospectionQuery(
options.Table, options.ExcludeTable,
options.IncludeForeignData, options.Schema,
Expand All @@ -36,11 +38,11 @@ func getDatabaseSchema(
if err != nil {
return nil, err
}
res = append(res, table)
tables = append(tables, table)
}

// fill columns
for _, table := range res {
for _, table := range tables {
// We do not exclude generated columns here, because the schema must be compared with the original
columns, err := getColumnsConfig(ctx, tx, table.Oid, version, false)
if err != nil {
Expand All @@ -49,5 +51,29 @@ func getDatabaseSchema(
table.Columns = columns
}

return res, nil
// 1. Find partitioned tables
// 2. Find all children of partitioned tables
// 3. Find children in the tables
// 4. Set RootPtSchema, RootPtName, RootPtOid for children
for _, table := range tables {
if table.Kind != "p" || table.Parent != 0 {
continue
}
for _, ptOId := range table.Children {
idx := slices.IndexFunc(tables, func(table *toolkit.Table) bool {
return table.Oid == ptOId
})
if idx == -1 {
log.Debug().
Int("TableOid", int(ptOId)).
Msg("table might be excluded: unable to find partitioned table")
continue
}
t := tables[idx]
t.RootPtName = table.Name
t.RootPtSchema = table.Schema
t.RootPtOid = table.Oid
}
}
return tables, nil
}
12 changes: 10 additions & 2 deletions internal/db/postgres/restorers/table_insert_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,18 @@ func (td *TableRestorerInsertFormat) generateInsertStmt(onConflictDoNothing bool
overridingSystemValue = "OVERRIDING SYSTEM VALUE "
}

tableName := *td.Entry.Tag
tableSchema := *td.Entry.Namespace

if td.Table.RootPtOid != 0 {
tableName = td.Table.RootPtName
tableSchema = td.Table.RootPtSchema
}

res := fmt.Sprintf(
`INSERT INTO %s.%s (%s) %sVALUES(%s)%s`,
*td.Entry.Namespace,
*td.Entry.Tag,
tableSchema,
tableName,
strings.Join(columnNames, ", "),
overridingSystemValue,
strings.Join(placeholders, ", "),
Expand Down
24 changes: 14 additions & 10 deletions pkg/toolkit/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ type Reference struct {
}

type Table struct {
Schema string `json:"schema"`
Name string `json:"name"`
Oid Oid `json:"oid"`
Columns []*Column `json:"columns"`
Kind string `json:"kind"`
Parent Oid `json:"parent"`
Children []Oid `json:"children"`
Size int64 `json:"size"`
PrimaryKey []string `json:"primary_key"`
Constraints []Constraint `json:"-"`
Schema string `json:"schema"`
Name string `json:"name"`
Oid Oid `json:"oid"`
Columns []*Column `json:"columns"`
Kind string `json:"kind"`
Parent Oid `json:"parent"`
Children []Oid `json:"children"`
Size int64 `json:"size"`
PrimaryKey []string `json:"primary_key"`
// RootPtSchema, RootPtName, RootPtOid - the first parent of the partitioned table
RootPtSchema string `json:"root_pt_schema"`
RootPtName string `json:"root_pt_name"`
RootPtOid Oid `json:"root_pt_oid"`
Constraints []Constraint `json:"-"`
}

func (t *Table) Validate() error {
Expand Down

0 comments on commit 470a33c

Please sign in to comment.