Skip to content

Commit

Permalink
Implemented a database subset with circular dependencies:
Browse files Browse the repository at this point in the history
Main feature:
* Added support for both left and right joins, depending on the constraint.
* Generates reliable recursive queries with integrity checks for circular dependencies
* Fixed some bugs from the previous increment
* Refactored code
  • Loading branch information
wwoytenko committed Aug 9, 2024
1 parent 397fedc commit 900285c
Show file tree
Hide file tree
Showing 22 changed files with 1,303 additions and 472 deletions.
10 changes: 9 additions & 1 deletion internal/db/postgres/context/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func getTable(ctx context.Context, tx pgx.Tx, t *domains.Table) ([]*entries.Tabl
return nil, nil, fmt.Errorf("cannot aply custom query on partitioned table \"%s\".\"%s\": is not supported", table.Schema, table.Name)
}
table.Query = t.Query
table.SubsetConds = t.SubsetConds
table.SubsetConds = escapeSubsetConds(t.SubsetConds)

if table.RelKind == 'p' {
if !t.ApplyForInherited {
Expand Down Expand Up @@ -406,3 +406,11 @@ func getTableConstraints(ctx context.Context, tx pgx.Tx, tableOid toolkit.Oid, v

return constraints, nil
}

func escapeSubsetConds(conds []string) []string {
var res []string
for _, c := range conds {
res = append(res, fmt.Sprintf(`( %s )`, c))
}
return res
}
2 changes: 1 addition & 1 deletion internal/db/postgres/dumpers/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (td *TableDumper) Execute(ctx context.Context, tx pgx.Tx, st storages.Stora
if doneErr != nil {
log.Warn().Err(err).Msg("error terminating transformation pipeline")
}
return fmt.Errorf("error processing table dump: %w", err)
return fmt.Errorf("error processing table dump %s.%s: %w", td.table.Schema, td.table.Name, err)
}
log.Debug().Msg("transformation pipeline executed successfully")
return pipeline.Done(gtx)
Expand Down
19 changes: 0 additions & 19 deletions internal/db/postgres/dumpers/transformation_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"context"
"fmt"
"io"
"os"
"path"
"slices"

"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -48,7 +46,6 @@ type TransformationPipeline struct {
Transform TransformationFunc
isAsync bool
record *toolkit.Record
cycleResolutionFiles []io.ReadWriteCloser
}

func NewTransformationPipeline(ctx context.Context, eg *errgroup.Group, table *entries.Table, w io.Writer) (*TransformationPipeline, error) {
Expand Down Expand Up @@ -130,17 +127,6 @@ func (tp *TransformationPipeline) Init(ctx context.Context) error {
}
}

// Initialize cycle resolution store files
tp.cycleResolutionFiles = make([]io.ReadWriteCloser, len(tp.table.CycleResolutionOps))
for cycleResOpIdx, op := range tp.table.CycleResolutionOps {
file, err := os.Create(path.Join(tmpFilePath, op.FileName))
if err != nil {
closeAllOpenFiles(tp.cycleResolutionFiles, tp.table.CycleResolutionOps[:cycleResOpIdx], true)
return fmt.Errorf("error creating cycle resolution store file: %w", err)
}
tp.cycleResolutionFiles[cycleResOpIdx] = file
}

return nil
}

Expand Down Expand Up @@ -172,9 +158,6 @@ func (tp *TransformationPipeline) Dump(ctx context.Context, data []byte) (err er
return fmt.Errorf("error decoding copy line: %w", err)
}
tp.record.SetRow(tp.row)
if err = storeCycleResolutionOps(tp.record, tp.table.CycleResolutionOps, tp.cycleResolutionFiles); err != nil {
return NewDumpError(tp.table.Schema, tp.table.Name, tp.line, fmt.Errorf("error storing cycle resolution ops: %w", err))
}

_, err = tp.Transform(ctx, tp.record)
if err != nil {
Expand Down Expand Up @@ -225,8 +208,6 @@ func (tp *TransformationPipeline) Done(ctx context.Context) error {
}
}

closeAllOpenFiles(tp.cycleResolutionFiles, tp.table.CycleResolutionOps, false)

if lastErr != nil {
return fmt.Errorf("error terminating initialized transformer: %w", lastErr)
}
Expand Down
71 changes: 0 additions & 71 deletions internal/db/postgres/dumpers/utils.go

This file was deleted.

17 changes: 0 additions & 17 deletions internal/db/postgres/entries/cycle_resolution_op.go

This file was deleted.

4 changes: 1 addition & 3 deletions internal/db/postgres/entries/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ import (
// TODO: Deduplicate SubsetQueries and SubsetInQueries by path
type Table struct {
*toolkit.Table
Query string
// CycleResolutionOps - list of columns and file to store that must be dumped for future cycles resolution
CycleResolutionOps []*CycleResolutionOp
Query string
Owner string
RelKind rune
RootPtSchema string
Expand Down
155 changes: 155 additions & 0 deletions internal/db/postgres/subset/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package subset

import (
"fmt"
"slices"
"sort"
"strings"

"github.com/greenmaskio/greenmask/internal/db/postgres/entries"
)

type Component struct {
id int
// componentGraph - contains the mapping of the vertexes in the component to the edges in the original graph
// if the component contains one vertex and no edges, then there is only one vertex with no cycles
componentGraph map[int][]*Edge
// tables - the vertexes in the component
tables map[int]*entries.Table
// Cycles
cycles [][]*Edge
cyclesIdents map[string]struct{}
keys []string
}

func NewComponent(id int, componentGraph map[int][]*Edge, tables map[int]*entries.Table) *Component {
c := &Component{
id: id,
componentGraph: componentGraph,
tables: tables,
cyclesIdents: make(map[string]struct{}),
}
c.findCycles()
if c.hasCycle() {
c.keys = c.getComponentKeys()
} else {
c.keys = c.getOneTable().PrimaryKey
}

return c
}

func (c *Component) getSubsetConds() []string {
var subsetConds []string
for _, table := range c.tables {
if len(table.SubsetConds) > 0 {
subsetConds = append(subsetConds, table.SubsetConds...)
}
}
return subsetConds
}

func (c *Component) getOneTable() *entries.Table {
if !c.hasCycle() {
for _, table := range c.tables {
return table
}
}
panic("cannot call get one table method for cycled scc")
}

func (c *Component) hasCycle() bool {
return len(c.cycles) > 0
}

// findCycles - finds all cycles in the component
func (c *Component) findCycles() {
visited := make(map[int]bool)
var path []*Edge
recStack := make(map[int]bool)

// Collect and sort all vertices
var vertices []int
for v := range c.componentGraph {
vertices = append(vertices, v)
}
sort.Ints(vertices) // Ensure deterministic order

for _, v := range vertices {
if !visited[v] {
c.findAllCyclesDfs(v, visited, recStack, path)
}
}
}

// findAllCyclesDfs - the basic DFS algorithm adapted to find all cycles in the graph and collect the cycle vertices
func (c *Component) findAllCyclesDfs(v int, visited map[int]bool, recStack map[int]bool, path []*Edge) {
visited[v] = true
recStack[v] = true

// Sort edges to ensure deterministic order
var edges []*Edge
edges = append(edges, c.componentGraph[v]...)
sort.Slice(edges, func(i, j int) bool {
return edges[i].to.idx < edges[j].to.idx
})

for _, to := range edges {

path = append(path, to)
if !visited[to.idx] {
c.findAllCyclesDfs(to.idx, visited, recStack, path)
} else if recStack[to.idx] {
// Cycle detected
var cycle []*Edge
for idx := len(path) - 1; idx >= 0; idx-- {
cycle = append(cycle, path[idx])
if path[idx].from.idx == to.to.idx {
break
}
}
cycleId := getCycleIdent(cycle)
if _, ok := c.cyclesIdents[cycleId]; !ok {
res := slices.Clone(cycle)
slices.Reverse(res)
c.cycles = append(c.cycles, res)
c.cyclesIdents[cycleId] = struct{}{}
}
}
path = path[:len(path)-1]
}

recStack[v] = false
}

func getCycleIdent(cycle []*Edge) string {
ids := make([]string, 0, len(cycle))
for _, edge := range cycle {
ids = append(ids, fmt.Sprintf("%d", edge.id))
}
slices.Sort(ids)
return strings.Join(ids, "_")
}

func (c *Component) getComponentKeys() []string {
if len(c.cycles) > 1 {
panic("IMPLEMENT ME: multiple cycles in the component")
}
if !c.hasCycle() {
return c.getOneTable().PrimaryKey
}

var vertexes []int
for _, edge := range c.cycles[0] {
vertexes = append(vertexes, edge.to.idx)
}

var keys []string
for _, v := range vertexes {
table := c.tables[v]
for _, key := range table.PrimaryKey {
keys = append(keys, fmt.Sprintf(`%s__%s__%s`, table.Schema, table.Name, key))
}
}
return keys
}
13 changes: 13 additions & 0 deletions internal/db/postgres/subset/component_link.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package subset

type ComponentLink struct {
idx int
component *Component
}

func NewComponentLink(idx int, c *Component, keys, overriddenKeys []string) *ComponentLink {
return &ComponentLink{
idx: idx,
component: c,
}
}
Loading

0 comments on commit 900285c

Please sign in to comment.