Skip to content

Commit

Permalink
feat(migrations) allow up_f field in Cassandra migrations
Browse files Browse the repository at this point in the history
The initial assumption behind only allowing strings of SQL and CQL on the "up" side of migrations was that the operations on this phase needed to be safe, fast and reentrant.

Unfortunately CQL is much more limited than Pg's SQL. So what's trivial to do in Pg is impossible to do in Cassandra. So we arrive to a point where trivial and safe operations get delayed to the `teardown` phase, simply because they need a more complete language than CQL to be done.

This change adds a new `up_f` method to which accepts a Lua function with a connector, effectively allowing the use of Lua inside Cassandra migrations' "up" phase. While adding the same features in Postgres would not be difficult, we think that SQL is powerful enough to do most "safe" things we need (perhaps with some trivial string replacement from Lua).

A migration can have both up and up_f. In that case, the up string is run first, and up_f immediately afterwards.

Incidentally we have also changed the validations on the migrations schema so that `up` isn't always required. This validation has forced us to set it to "" (empty string) in the past, which is not ideal.
  • Loading branch information
kikito authored and dndx committed Dec 9, 2020
1 parent aa11ec6 commit f66c3ec
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 38 deletions.
25 changes: 19 additions & 6 deletions kong/db/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -547,12 +547,25 @@ do
if run_up then
-- kong migrations bootstrap
-- kong migrations up
ok, err = self.connector:run_up_migration(mig.name,
strategy_migration.up)
if not ok then
self.connector:close()
return nil, fmt_err(self, "failed to run migration '%s' up: %s",
mig.name, err)
if strategy_migration.up and strategy_migration.up ~= "" then
ok, err = self.connector:run_up_migration(mig.name,
strategy_migration.up)
if not ok then
self.connector:close()
return nil, fmt_err(self, "failed to run migration '%s' up: %s",
mig.name, err)
end
end

if strategy_migration.up_f then
local pok, perr, err = xpcall(strategy_migration.up_f,
debug.traceback, self.connector,
mig_helpers)
if not pok or err then
self.connector:close()
return nil, fmt_err(self, "failed to run migration '%s' up_f: %s",
mig.name, perr or err)
end
end

local state = "executed"
Expand Down
35 changes: 27 additions & 8 deletions kong/db/schema/others/migrations.lua
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
local strat_migration = {
{ up = { type = "string", required = true, len_min = 0 } },
{ teardown = { type = "function" } },
}


return {
name = "migration",
fields = {
{ name = { type = "string", required = true } },
{ postgres = { type = "record", required = true, fields = strat_migration } },
{ cassandra = { type = "record", required = true, fields = strat_migration } },
{
postgres = {
type = "record", required = true,
fields = {
{ up = { type = "string", len_min = 0 } },
{ teardown = { type = "function" } },
},
},
},
{
cassandra = {
type = "record", required = true,
fields = {
{ up = { type = "string", len_min = 0 } },
{ up_f = { type = "function" } },
{ teardown = { type = "function" } },
},
}
},
},
entity_checks = {
{
at_least_one_of = {
"postgres.up", "postgres.teardown",
"cassandra.up", "cassandra.up_f", "cassandra.teardown"
},
},
},
}
43 changes: 26 additions & 17 deletions spec/01-unit/01-db/01-schema/10-migrations_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,49 @@ describe("migrations schema", function()

for _, strategy in helpers.each_strategy({"postgres", "cassandra"}) do

it("requires all strategies to be specified", function()
local t = {
postgres = { up = "" },
cassandra = { up = "" },
}

t[strategy] = nil
it("requires at least one field of pg.up, pg.teardown, c.up, c.up_f, c.teardown", function()
local t = {}

local ok, errs = MigrationsSchema:validate(t)
assert.is_nil(ok)
assert.equal("required field missing", errs[strategy])
assert.same({"at least one of these fields must be non-empty: " ..
"'postgres.up', 'postgres.teardown', 'cassandra.up', 'cassandra.up_f', " ..
"'cassandra.teardown'" },
errs["@entity"])
end)

it("validates '<strategy>.up' property", function()
local not_a_string = 1
local t = {
postgres = { up = "" },
cassandra = { up = "" },
[strategy] = {
up = not_a_string
}
}

t[strategy].up = nil

local ok, errs = MigrationsSchema:validate(t)
assert.is_nil(ok)
assert.equal("required field missing", errs[strategy]["up"])
assert.equal("expected a string", errs[strategy]["up"])
end)

if strategy == "cassandra" then
it("validates '<strategy>.up_f' property in cassandra", function()
local t = {
cassandra = { up_f = "this is not a function" },
}

local ok, errs = MigrationsSchema:validate(t)
assert.is_nil(ok)
assert.equal("expected a function", errs[strategy]["up_f"])
end)
end

it("validates '<strategy>.teardown' property", function()
local t = {
postgres = { up = "" },
cassandra = { up = "" },
[strategy] = {
teardown = "not a function"
}
}

t[strategy].teardown = ""

local ok, errs = MigrationsSchema:validate(t)
assert.is_nil(ok)
assert.equal("expected a function", errs[strategy]["teardown"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ return {
CREATE TABLE IF NOT EXISTS "foos" (
"color" TEXT PRIMARY KEY
);
INSERT INTO foos (color) values ('red');
]],
},

Expand All @@ -12,6 +14,20 @@ return {
CREATE TABLE IF NOT EXISTS foos (
color text PRIMARY KEY
);
INSERT INTO foos(color) values('red');
]],
up_f = function(connector)
local coordinator = assert(connector:get_stored_connection())
local _, err = coordinator:execute([[
INSERT INTO foos(color) values('green');
]])

if err then
return nil, err
end

return true
end,
},
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,34 @@ return {
]],

teardown = function(connector, _)
for rows, err in connector:iterate('SELECT * FROM "foos";') do
-- update shape in all foos
for row, err in connector:iterate('SELECT * FROM "foos";') do
if err then
return nil, err
end

for _, row in ipairs(rows) do
local shape = "triangle"
local sql = string.format([[
UPDATE "foos" SET "shape" = '%s' WHERE "color" = '%s';
]], shape, row.color)
assert(connector:query(sql))
local shape = "triangle"
local sql = string.format([[
UPDATE "foos" SET "shape" = '%s' WHERE "color" = '%s';
]], shape, row.color)
assert(connector:query(sql))
end


-- check insertion and update
local count = 0
for row, err in connector:iterate('SELECT * FROM "foos";') do
if err then
return nil, err
end

count = count + 1
assert(row.color == "red", "Wrong color: " .. tostring(row.color))
assert(row.shape == "triangle", "Wrong shape: " .. tostring(row.shape))
end

assert(count == 1, "Expected 1 foo, found " .. tostring(count))

return true
end,
},
Expand All @@ -34,9 +48,22 @@ return {
ALTER TABLE foos ADD shape text;
CREATE INDEX IF NOT EXISTS foos_shape_idx ON foos(shape);
]],
up_f = function(connector)
local coordinator = assert(connector:get_stored_connection())
local _, err = coordinator:execute([[
INSERT INTO foos(color) values('blue');
]])

if err then
return nil, err
end

return true
end,

teardown = function(connector, _)
local coordinator = assert(connector:get_stored_connection())
-- Update: assing shape=triangle to all foos
for rows, err in coordinator:iterate("SELECT * FROM foos") do
if err then
return nil, err
Expand All @@ -51,6 +78,24 @@ return {
end
end

-- final check of insertions/updates
local count = 0
for rows, err in coordinator:iterate("SELECT * FROM foos") do
if err then
return nil, err
end

for _, row in ipairs(rows) do
count = count + 1
assert(row.shape == "triangle", "Wrong shape: " .. tostring(row.shape))
local c = row.color
assert(
c == "red" or c == "green" or c == "blue",
"Wrong color: " .. tostring(c))
end
end
assert(count == 3, "Expected 3 foos, found " .. tostring(count))

return true
end,
},
Expand Down

0 comments on commit f66c3ec

Please sign in to comment.