Skip to content

Commit

Permalink
Fixing TTL in Cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
subnetmarco committed Apr 4, 2016
1 parent 7101496 commit 051813e
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 9 deletions.
6 changes: 6 additions & 0 deletions kong.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@
## Key for encrypting network traffic within Kong. Must be a base64-encoded 16-byte key.
# encrypt: "foo"

######
## The TTL (time to live) of a node in the cluster when it stops sending healthcheck pings, maybe because of a
## failure. If the node is not able to send a new healthcheck before the expiration, then new nodes in the cluster
## will stop attempting to connect to it on startup.
# ttl_on_failure: 3600

######
## Specify which database to use. Only "cassandra" is currently available.
# database: cassandra
Expand Down
2 changes: 1 addition & 1 deletion kong/cli/services/serf.lua
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ function Serf:_add_node()
local _, err = self._dao_factory.nodes:insert({
name = name,
cluster_listening_address = stringy.strip(addr)
}, {ttl = 3600})
}, {ttl = self._configuration.cluster.ttl_on_failure})
if err then
return false, err
end
Expand Down
2 changes: 1 addition & 1 deletion kong/core/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ local function send_keepalive(premature)
ngx.log(ngx.ERR, tostring(err))
elseif #nodes == 1 then
local node = nodes[1]
local _, err = singletons.dao.nodes:update(node, node)
local _, err = singletons.dao.nodes:update(node, node, {ttl=singletons.configuration.cluster.ttl_on_failure})
if err then
ngx.log(ngx.ERR, tostring(err))
end
Expand Down
26 changes: 25 additions & 1 deletion kong/dao/cassandra_db.lua
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ function CassandraDB:count(table_name, tbl, schema)
end
end

function CassandraDB:update(table_name, schema, constraints, filter_keys, values, nils, full, options)
function CassandraDB:update(table_name, schema, constraints, filter_keys, values, nils, full, model, options)
-- must check unique constaints manually too
local err = check_unique_constraints(self, table_name, constraints, values, filter_keys, true)
if err then
Expand All @@ -309,6 +309,30 @@ function CassandraDB:update(table_name, schema, constraints, filter_keys, values
return nil, err
end

-- Cassandra TTL on update is per-column and not per-row, and TTLs cannot be updated on primary keys.
-- Not only that, but TTL on other rows can only be incremented, and not decremented. Because of all
-- of these limitations, the only way to make this happen is to do an upsert operation.
-- This implementation can be changed once Cassandra closes this issue: https://issues.apache.org/jira/browse/CASSANDRA-9312
if options and options.ttl then
if schema.primary_key and #schema.primary_key == 1 and filter_keys[schema.primary_key[1]] then
local row, err = self:find(table_name, schema, filter_keys)
if err then
return nil, err
elseif row then
for k, v in pairs(row) do
if not values[k] then
model[k] = v -- Populate the model to be used later for the insert
end
end

-- Insert without any contraint check, since the check has already been executed
return self:insert(table_name, schema, model, {unique={}, foreign={}}, options)
end
else
return nil, "Cannot update TTL on entities that have more than one primary_key"
end
end

local sets, args, where = {}, {}
for col, value in pairs(values) do
local field = schema.fields[col]
Expand Down
2 changes: 1 addition & 1 deletion kong/dao/dao.lua
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ function DAO:update(tbl, filter_keys, options)
fix(old, values, self.schema)
end

local res, err = self.db:update(self.table, self.schema, self.constraints, primary_keys, values, nils, full_update, options)
local res, err = self.db:update(self.table, self.schema, self.constraints, primary_keys, values, nils, full_update, model, options)
if err then
return nil, err
elseif res then
Expand Down
2 changes: 1 addition & 1 deletion kong/dao/postgres_db.lua
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ function PostgresDB:count(table_name, tbl, schema)
end
end

function PostgresDB:update(table_name, schema, _, filter_keys, values, nils, full, options)
function PostgresDB:update(table_name, schema, _, filter_keys, values, nils, full, _, options)
local args = {}
local values, err = self:serialize_timestamps(values, schema)
if err then
Expand Down
3 changes: 2 additions & 1 deletion kong/tools/config_defaults.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ return {
["auto-join"] = {type = "boolean", default = true},
["advertise"] = {type = "string", nullable = true},
["encrypt"] = {type = "string", nullable = true},
["profile"] = {type = "string", default = "wan", enum = {"wan", "lan", "local"}}
["profile"] = {type = "string", default = "wan", enum = {"wan", "lan", "local"}},
["ttl_on_failure"] = {type = "number", default = 3600}
}
},
["database"] = {type = "string", default = "cassandra", enum = {"cassandra", "postgres"}},
Expand Down
35 changes: 32 additions & 3 deletions spec/integration/dao/07-options_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ helpers.for_each_dao(function(db_type, default_options, TYPES)
assert.falsy(row)
end)

it("on update", function()
it("on update - increase ttl", function()
local api, err = factory.apis:insert({
name = "mockbin", request_host = "mockbin.com",
upstream_url = "http://mockbin.com"
Expand All @@ -57,20 +57,49 @@ helpers.for_each_dao(function(db_type, default_options, TYPES)
-- Updating the TTL to a higher value
factory.apis:update({name = "mockbin2"}, {id = api.id}, {ttl = 10})

os.execute("sleep 5")
os.execute("sleep 9")

row, err = factory.apis:find {
id = api.id
}
assert.falsy(err)
assert.truthy(row)

os.execute("sleep 5")
os.execute("sleep 2")

-- It has now finally expired
row, err = factory.apis:find {
id = api.id
}
assert.falsy(err)
assert.falsy(row)
end)

it("on update - decrease ttl", function()
local api, err = factory.apis:insert({
name = "mockbin", request_host = "mockbin.com",
upstream_url = "http://mockbin.com"
}, {ttl = 10})
assert.falsy(err)

os.execute("sleep 3")

-- Retrieval
local row, err = factory.apis:find {
id = api.id
}
assert.falsy(err)
assert.truthy(row)

-- Updating the TTL to a lower value
local _, err = factory.apis:update({name = "mockbin2"}, {id = api.id}, {ttl = 3})
assert.falsy(err)

os.execute("sleep 4")

row, err = factory.apis:find {
id = api.id
}
assert.falsy(err)
assert.falsy(row)
end)
Expand Down

0 comments on commit 051813e

Please sign in to comment.