Skip to content

Commit

Permalink
feat: add support for row deletion policy
Browse files Browse the repository at this point in the history
Adds support in spanDDL for creating/add/replacing row deletion policy
  • Loading branch information
alethenorio committed Sep 21, 2021
1 parent faddb4a commit ad266a8
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 4 deletions.
9 changes: 5 additions & 4 deletions spanddl/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ func (d *Database) applyCreateTable(stmt *spansql.CreateTable) (err error) {
return fmt.Errorf("table %s already exists", stmt.Name)
}
table := &Table{
Name: stmt.Name,
Columns: make([]*Column, 0, len(stmt.Columns)),
Interleave: stmt.Interleave,
PrimaryKey: stmt.PrimaryKey,
Name: stmt.Name,
Columns: make([]*Column, 0, len(stmt.Columns)),
Interleave: stmt.Interleave,
PrimaryKey: stmt.PrimaryKey,
RowDeletionPolicy: stmt.RowDeletionPolicy,
}
for _, columnDef := range stmt.Columns {
var column Column
Expand Down
100 changes: 100 additions & 0 deletions spanddl/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,43 @@ func TestDatabase_ApplyDDL(t *testing.T) {
},
},

{
name: "create table with row deletion policy",
ddls: []string{
`CREATE TABLE Singers (
CreatedAt TIMESTAMP NOT NULL,
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
SingerInfo BYTES(MAX),
BirthDate DATE,
) PRIMARY KEY(SingerId)
, ROW DELETION POLICY (OLDER_THAN(CreatedAt, INTERVAL 30 DAY));`,
},
expected: &Database{
Tables: []*Table{
{
Name: "Singers",
Columns: []*Column{
{Name: "CreatedAt", Type: spansql.Type{Base: spansql.Timestamp}, NotNull: true},
{Name: "SingerId", Type: spansql.Type{Base: spansql.Int64}, NotNull: true},
{Name: "FirstName", Type: spansql.Type{Base: spansql.String, Len: 1024}},
{Name: "LastName", Type: spansql.Type{Base: spansql.String, Len: 1024}},
{Name: "SingerInfo", Type: spansql.Type{Base: spansql.Bytes, Len: spansql.MaxLen}},
{Name: "BirthDate", Type: spansql.Type{Base: spansql.Date}},
},
PrimaryKey: []spansql.KeyPart{
{Column: "SingerId"},
},
RowDeletionPolicy: &spansql.RowDeletionPolicy{
Column: "CreatedAt",
NumDays: 30,
},
},
},
},
},

{
name: "add column",
ddls: []string{
Expand All @@ -70,6 +107,69 @@ func TestDatabase_ApplyDDL(t *testing.T) {
},
},

{
name: "add row deletion policy",
ddls: []string{
`CREATE TABLE Singers (
CreatedAt TIMESTAMP NOT NULL,
SingerId INT64 NOT NULL,
) PRIMARY KEY(SingerId);`,

`ALTER TABLE Singers ADD ROW DELETION POLICY (OLDER_THAN(CreatedAt, INTERVAL 1 DAY));`,
},
expected: &Database{
Tables: []*Table{
{
Name: "Singers",
Columns: []*Column{
{Name: "CreatedAt", Type: spansql.Type{Base: spansql.Timestamp}, NotNull: true},
{Name: "SingerId", Type: spansql.Type{Base: spansql.Int64}, NotNull: true},
},
PrimaryKey: []spansql.KeyPart{
{Column: "SingerId"},
},
RowDeletionPolicy: &spansql.RowDeletionPolicy{
Column: "CreatedAt",
NumDays: 1,
},
},
},
},
},

{
name: "replace row deletion policy",
ddls: []string{
`CREATE TABLE Singers (
CreatedAt TIMESTAMP NOT NULL,
ModifiedAt TIMESTAMP NOT NULL,
SingerId INT64 NOT NULL,
) PRIMARY KEY(SingerId)
, ROW DELETION POLICY (OLDER_THAN(CreatedAt, INTERVAL 30 DAY));`,

`ALTER TABLE Singers REPLACE ROW DELETION POLICY (OLDER_THAN(ModifiedAt, INTERVAL 7 DAY));`,
},
expected: &Database{
Tables: []*Table{
{
Name: "Singers",
Columns: []*Column{
{Name: "CreatedAt", Type: spansql.Type{Base: spansql.Timestamp}, NotNull: true},
{Name: "ModifiedAt", Type: spansql.Type{Base: spansql.Timestamp}, NotNull: true},
{Name: "SingerId", Type: spansql.Type{Base: spansql.Int64}, NotNull: true},
},
PrimaryKey: []spansql.KeyPart{
{Column: "SingerId"},
},
RowDeletionPolicy: &spansql.RowDeletionPolicy{
Column: "ModifiedAt",
NumDays: 7,
},
},
},
},
},

{
name: "set column options",
ddls: []string{
Expand Down
31 changes: 31 additions & 0 deletions spanddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Table struct {
InterleavedTables []*Table
PrimaryKey []spansql.KeyPart
Interleave *spansql.Interleave
RowDeletionPolicy *spansql.RowDeletionPolicy
}

func (t *Table) applyAlterTable(stmt *spansql.AlterTable) error {
Expand All @@ -29,6 +30,10 @@ func (t *Table) applyAlterTable(stmt *spansql.AlterTable) error {
return t.applyDropConstraintAlteration(alteration)
case spansql.SetOnDelete:
return t.applySetOnDeleteAlteration(alteration)
case spansql.AddRowDeletionPolicy:
return t.applyAddRowDeletionPolicy(alteration)
case spansql.ReplaceRowDeletionPolicy:
return t.applyReplaceRowDeletionPolicy(alteration)
default:
return fmt.Errorf("unhandled alteration (%s)", alteration.SQL())
}
Expand Down Expand Up @@ -121,6 +126,32 @@ func (t *Table) applySetOnDeleteAlteration(alteration spansql.SetOnDelete) (err
return nil
}

func (t *Table) applyAddRowDeletionPolicy(alteration spansql.AddRowDeletionPolicy) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("apply ADD ROW DELETION POLICY: %w", err)
}
}()
if _, ok := t.Column(alteration.RowDeletionPolicy.Column); !ok {
return fmt.Errorf("column %s does not exist", alteration.RowDeletionPolicy.Column)
}
t.RowDeletionPolicy = &alteration.RowDeletionPolicy
return nil
}

func (t *Table) applyReplaceRowDeletionPolicy(alteration spansql.ReplaceRowDeletionPolicy) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("apply REPLACE ROW DELETION POLICY: %w", err)
}
}()
if _, ok := t.Column(alteration.RowDeletionPolicy.Column); !ok {
return fmt.Errorf("column %s does not exist", alteration.RowDeletionPolicy.Column)
}
t.RowDeletionPolicy = &alteration.RowDeletionPolicy
return nil
}

func (t *Table) indexOfColumn(name spansql.ID) int {
for i, column := range t.Columns {
if column.Name == name {
Expand Down

0 comments on commit ad266a8

Please sign in to comment.