Skip to content

Commit

Permalink
Merge pull request #146 from Mashape/refactor/dao-prepare
Browse files Browse the repository at this point in the history
[refactor] dao prepare
  • Loading branch information
thibaultcha committed Apr 21, 2015
2 parents 86ddf6f + 1947fc1 commit 092032e
Show file tree
Hide file tree
Showing 9 changed files with 708 additions and 659 deletions.
5 changes: 0 additions & 5 deletions kong/cli/db.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ if args.command == "seed" then
cutils.logger:error_exit(err)
end

local err = dao_factory:prepare()
if err then
cutils.logger:error(err)
end

local faker = Faker(dao_factory)
faker:seed(args.random and args.number or nil)
cutils.logger:success("Populated")
Expand Down
107 changes: 58 additions & 49 deletions kong/dao/cassandra/base_dao.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local cassandra = require "cassandra"
local timestamp = require "kong.tools.timestamp"
local validate = require("kong.dao.schemas").validate
local DaoError = require "kong.dao.error"
local stringy = require "stringy"
local Object = require "classic"
local utils = require "kong.tools.utils"
local uuid = require "uuid"
Expand All @@ -17,8 +18,7 @@ uuid.seed()

function BaseDao:new(properties)
self._properties = properties
self._statements = {} -- Mirror of _queries but with prepared statements instead of strings
self._statements_cache = {} -- Prepared statements of SELECTS generated with find_by_keys
self._statements_cache = {}
end

-------------
Expand Down Expand Up @@ -87,7 +87,7 @@ end
function BaseDao:_check_unique(statement, t, is_updating)
local results, err = self:_execute(statement, t)
if err then
return false, "Error during UNIQUE check: "..err
return false, "Error during UNIQUE check: "..err.message
elseif results and #results > 0 then
if not is_updating then
return false
Expand Down Expand Up @@ -118,7 +118,7 @@ end
function BaseDao:_check_foreign(statement, t)
local results, err = self:_execute(statement, t)
if err then
return false, "Error during FOREIGN check: "..err
return false, "Error during FOREIGN check: "..err.message
elseif not results or #results == 0 then
return false
else
Expand All @@ -133,10 +133,10 @@ end
-- @return {table|nil} Error if any during execution
-- @return {table|nil} A table with the list of not existing foreign entities
function BaseDao:_check_all_foreign(t)
if not self._statements.__foreign then return true end
if not self._queries.__foreign then return true end

local errors
for k, statement in pairs(self._statements.__foreign) do
for k, statement in pairs(self._queries.__foreign) do
if t[k] and t[k] ~= constants.DATABASE_NULL_ID then
local exists, err = self:_check_foreign(statement, t)
if err then
Expand All @@ -158,10 +158,10 @@ end
-- @return {table|nil} Error if any during execution
-- @return {table|nil} A table with the list of already existing entities
function BaseDao:_check_all_unique(t, is_updating)
if not self._statements.__unique then return true end
if not self._queries.__unique then return true end

local errors
for k, statement in pairs(self._statements.__unique) do
for k, statement in pairs(self._queries.__unique) do
if t[k] or k == "self" then
local unique, err = self:_check_unique(statement, t, is_updating)
if err then
Expand Down Expand Up @@ -219,9 +219,11 @@ end

-- Execute an operation statement.
-- The operation can be one of the following:
-- * _statements (which contains .query and .param for ordered binding of parameters)
-- * _queries (which contains .query and .param for ordered binding of parameters) and
-- will be prepared on the go if not already in the statements cache
-- * a lua-resty-cassandra BatchStatement (see ratelimiting_metrics.lua)
-- * a lua-resty-cassandra prepared statement
-- * a plain query (string)
--
-- @param {table} operation The operation to execute
-- @param {table} values_to_bind Raw values to bind
Expand All @@ -232,33 +234,39 @@ end
-- Boolean if type of results is VOID
-- @return {table|nil} Cassandra error if any
function BaseDao:_execute(operation, values_to_bind, options)
local statement
local statement = operation

-- Determine kind of operation
if operation.is_kong_statement then
statement = operation.query
-- Retrieve the prepared statement from cache or prepare and cache
local cache_key
if operation.query then
cache_key = operation.query
elseif type(operation) == "string" then
cache_key = operation
end

if operation.params and values_to_bind then
local errors
values_to_bind, errors = encode_cassandra_values(self._schema, values_to_bind, operation.params)
if errors then
return nil, DaoError(errors, error_types.INVALID_TYPE)
end
if cache_key then
if not self._statements_cache[cache_key] then
statement = self:prepare_kong_statement(cache_key, operation.params)
else
statement = self._statements_cache[cache_key].statement
end
end

-- Bind parameters if operation has some
if operation.params and values_to_bind then
local errors
values_to_bind, errors = encode_cassandra_values(self._schema, values_to_bind, operation.params)
if errors then
return nil, DaoError(errors, error_types.INVALID_TYPE)
end
elseif operation.is_batch_statement then
statement = operation
values_to_bind = nil
options = nil
else
statement = operation
end

-- Execute operation
local session, err = self:_open_session()
if err then
return nil, err
end

-- Execute operation
local results, err = session:execute(statement, values_to_bind, options)
if err then
err = DaoError(err, error_types.DATABASE)
Expand All @@ -271,11 +279,12 @@ function BaseDao:_execute(operation, values_to_bind, options)

-- Parse result
if results and results.type == "ROWS" then
-- do we have more pages to fetch?
-- do we have more pages to fetch? if so, alias the paging_state
if results.meta.has_more_pages then
results.next_page = results.meta.paging_state
end

-- only the DAO needs those, it should be transparant in the application
results.meta = nil
results.type = nil

Expand All @@ -285,7 +294,7 @@ function BaseDao:_execute(operation, values_to_bind, options)

return results, err
elseif results and results.type == "VOID" then
-- return boolean
-- result is not a set of rows, let's return a boolean to indicate success
return err == nil, err
else
return results, err
Expand All @@ -304,15 +313,19 @@ end
--
-- @param {string} query A CQL query to prepare
-- @param {table} params An array of parameters (ordered) matching the query placeholders order
-- @return {table|nil} A "kong statement" to be used by _execute
-- @return {table|nil} A "kong statement" with a prepared statement and parameters to be used by _execute
-- @return {table|nil} Error if any
function BaseDao:prepare_kong_statement(query, params)
-- handle SELECT queries with %s for dynamic select by keys
local query_to_prepare = string.format(query, "")
query_to_prepare = stringy.strip(query_to_prepare)

local session, err = self:_open_session()
if err then
return nil, err
end

local prepared_stmt, prepare_err = session:prepare(query)
local prepared_stmt, prepare_err = session:prepare(query_to_prepare)

local err = self:_close_session(session)
if err then
Expand All @@ -322,11 +335,16 @@ function BaseDao:prepare_kong_statement(query, params)
if prepare_err then
return nil, DaoError("Failed to prepare statement: \""..query_to_prepare.."\". "..prepare_err, error_types.DATABASE)
else
return {
is_kong_statement = true,
local kong_statement = {
query = query,
params = params,
query = prepared_stmt
statement = prepared_stmt
}

-- cache key is the non-striped/non-formatted query from _queries
self._statements_cache[query] = kong_statement

return prepared_stmt
end
end

Expand Down Expand Up @@ -370,7 +388,7 @@ function BaseDao:insert(t)
return nil, DaoError(errors, error_types.FOREIGN)
end

local _, stmt_err = self:_execute(self._statements.insert, self:_marshall(t))
local _, stmt_err = self:_execute(self._queries.insert, self:_marshall(t))
if stmt_err then
return nil, stmt_err
else
Expand All @@ -392,7 +410,7 @@ function BaseDao:update(t)

-- Check if exists to prevent upsert and manually set UNSET values (pfffff...)
local results
ok, err, results = self:_check_foreign(self._statements.select_one, t)
ok, err, results = self:_check_foreign(self._queries.select_one, t)
if err then
return nil, DaoError(err, error_types.DATABASE)
elseif not ok then
Expand Down Expand Up @@ -430,7 +448,7 @@ function BaseDao:update(t)
return nil, DaoError(errors, error_types.FOREIGN)
end

local _, stmt_err = self:_execute(self._statements.update, self:_marshall(t))
local _, stmt_err = self:_execute(self._queries.update, self:_marshall(t))
if stmt_err then
return nil, stmt_err
else
Expand All @@ -443,7 +461,7 @@ end
-- @param {string} id UUID of element to select
-- @return _execute()
function BaseDao:find_one(id)
local data, err = self:_execute(self._statements.select_one, { id = id })
local data, err = self:_execute(self._queries.select_one, { id = id })

-- Return the 1st and only element of the result set
if data and utils.table_size(data) > 0 then
Expand Down Expand Up @@ -491,16 +509,7 @@ function BaseDao:find_by_keys(t, page_size, paging_state)

local select_query = string.format(self._queries.select.query, where_str)

-- prepare query in a statement cache
if not self._statements_cache[select_query] then
local kong_stmt, err = self:prepare_kong_statement(select_query, keys)
if err then
return nil, DaoError(err, error_types.DATABASE)
end
self._statements_cache[select_query] = kong_stmt
end

return self:_execute(self._statements_cache[select_query], t, {
return self:_execute({ query = select_query, params = keys }, t, {
page_size = page_size,
paging_state = paging_state
})
Expand All @@ -521,14 +530,14 @@ end
-- @return {boolean} True if deleted, false if otherwise or not found
-- @return {table|nil} Error if any
function BaseDao:delete(id)
local exists, err = self:_check_foreign(self._statements.select_one, { id = id })
local exists, err = self:_check_foreign(self._queries.select_one, { id = id })
if err then
return false, DaoError(err, error_types.DATABASE)
elseif not exists then
return false
end

return self:_execute(self._statements.delete, { id = id })
return self:_execute(self._queries.delete, { id = id })
end

return BaseDao
67 changes: 37 additions & 30 deletions kong/dao/cassandra/factory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ local stringy = require "stringy"
local Object = require "classic"

local Apis = require "kong.dao.cassandra.apis"
local RateLimitingMetrics = require "kong.dao.cassandra.ratelimiting_metrics"
local PluginsConfigurations = require "kong.dao.cassandra.plugins_configurations"
local Consumers = require "kong.dao.cassandra.consumers"
local PluginsConfigurations = require "kong.dao.cassandra.plugins_configurations"
local BasicAuthCredentials = require "kong.dao.cassandra.basicauth_credentials"
local RateLimitingMetrics = require "kong.dao.cassandra.ratelimiting_metrics"
local KeyAuthCredentials = require "kong.dao.cassandra.keyauth_credentials"

local CassandraFactory = Object:extend()
Expand Down Expand Up @@ -43,54 +43,61 @@ function CassandraFactory:new(properties)
self._properties.hosts = normalize_localhost(self._properties.hosts)

self.apis = Apis(properties)
self.ratelimiting_metrics = RateLimitingMetrics(properties)
self.plugins_configurations = PluginsConfigurations(properties)
self.consumers = Consumers(properties)
self.plugins_configurations = PluginsConfigurations(properties)
self.basicauth_credentials = BasicAuthCredentials(properties)
self.ratelimiting_metrics = RateLimitingMetrics(properties)
self.keyauth_credentials = KeyAuthCredentials(properties)
end

function CassandraFactory:drop()
return self:execute_queries [[
TRUNCATE apis;
TRUNCATE ratelimiting_metrics;
TRUNCATE plugins_configurations;
TRUNCATE consumers;
TRUNCATE plugins_configurations;
TRUNCATE basicauth_credentials;
TRUNCATE keyauth_credentials;
TRUNCATE ratelimiting_metrics;
]]
end

-- Prepare all statements in collection._queries and put them in collection._statements.
-- Should be called with only a collection and will recursively call itself for nested statements.
-- @param collection A collection with a ._queries property
local function prepare_collection(collection, queries, statements)
if not queries then queries = collection._queries end
if not statements then statements = collection._statements end

for stmt_name, query in pairs(queries) do
if type(query) == "table" and query.query == nil then
collection._statements[stmt_name] = {}
prepare_collection(collection, query, collection._statements[stmt_name])
else
local q = stringy.strip(query.query)
q = string.format(q, "")
local kong_stmt, err = collection:prepare_kong_statement(q, query.params)
if err then
error(err)
-- Prepare all statements of collections `._queries` property and put them
-- in a statements cache
--
-- Note:
-- Even if the BaseDAO's :_execute() method support preparation of statements on-the-go,
-- this method should be called when Kong starts in order to detect any failure in advance
-- as well as test the connection to Cassandra.
--
-- @return error if any
function CassandraFactory:prepare()
local function prepare_collection(collection, collection_queries)
if not collection_queries then collection_queries = collection._queries end
for stmt_name, collection_query in pairs(collection_queries) do
if type(collection_query) == "table" and collection_query.query == nil then
-- Nested queries, let's recurse to prepare them too
prepare_collection(collection, collection_query)
else
-- _queries can contain strings or tables with string + keys of parameters to bind
local query_to_prepare
if type(collection_query) == "string" then
query_to_prepare = collection_query
elseif collection_query.query then
query_to_prepare = collection_query.query
end

local _, err = collection:prepare_kong_statement(query_to_prepare, collection_query.params)
if err then
error(err)
end
end
statements[stmt_name] = kong_stmt
end
end
end

-- Prepare all statements of collections
-- @return error if any
function CassandraFactory:prepare()
for _, collection in ipairs({ self.apis,
self.ratelimiting_metrics,
self.plugins_configurations,
self.consumers,
self.plugins_configurations,
self.ratelimiting_metrics,
self.basicauth_credentials,
self.keyauth_credentials }) do
local status, err = pcall(function() prepare_collection(collection) end)
Expand Down
2 changes: 1 addition & 1 deletion kong/dao/cassandra/plugins_configurations.lua
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ function PluginsConfigurations:find_distinct()

-- Execute query
local distinct_names = {}
for _, rows, page, err in session:execute(self._statements.select.query, nil, {auto_paging=true}) do
for _, rows, page, err in session:execute(string.format(self._queries.select.query, ""), nil, {auto_paging=true}) do
if err then
return nil, err
end
Expand Down
Loading

0 comments on commit 092032e

Please sign in to comment.