Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
olegrok committed Apr 30, 2024
1 parent 825579a commit f3ed6e3
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 29 deletions.
2 changes: 2 additions & 0 deletions kafka/callbacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ new_event_queues() {

void
destroy_event_queues(struct lua_State *L, event_queues_t *event_queues) {
if (event_queues == NULL)
return;
if (event_queues->consume_queue != NULL) {
msg_t *msg = NULL;
while (true) {
Expand Down
36 changes: 29 additions & 7 deletions kafka/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@
* Consumer poll thread
*/

struct consumer_poller_t {
rd_kafka_t *rd_consumer;
pthread_t thread;
pthread_attr_t attr;
int should_stop;
pthread_mutex_t lock;
};

static void *
consumer_poll_loop(void *arg) {
set_thread_name("kafka_consumer");
Expand Down Expand Up @@ -605,8 +613,8 @@ lua_create_consumer(struct lua_State *L) {
}

char errstr[512];
rd_kafka_conf_t *rd_config = rd_kafka_conf_new();

// FIXME: this value is not handled in consumer_destroy
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
lua_pushstring(L, "default_topic_options");
lua_gettable(L, -2);
Expand All @@ -618,15 +626,15 @@ lua_create_consumer(struct lua_State *L) {
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
lua_pushnil(L);
lua_pushliteral(L, "consumer config default topic options must contains only string keys and string values");
return 2;
goto topic_error;
}

const char *value = lua_tostring(L, -1);
const char *key = lua_tostring(L, -2);
if (rd_kafka_topic_conf_set(topic_conf, key, value, errstr, sizeof(errstr))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto topic_error;
}

// pop value, leaving original key
Expand All @@ -636,6 +644,8 @@ lua_create_consumer(struct lua_State *L) {
// stack now contains: -1 => table
}
lua_pop(L, 1);

rd_kafka_conf_t *rd_config = rd_kafka_conf_new();
rd_kafka_conf_set_default_topic_conf(rd_config, topic_conf);

event_queues_t *event_queues = new_event_queues();
Expand Down Expand Up @@ -678,15 +688,15 @@ lua_create_consumer(struct lua_State *L) {
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
lua_pushnil(L);
lua_pushliteral(L, "consumer config options must contains only string keys and string values");
return 2;
goto config_error;
}

const char *value = lua_tostring(L, -1);
const char *key = lua_tostring(L, -2);
if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto config_error;
}

// pop value, leaving original key
Expand All @@ -701,13 +711,14 @@ lua_create_consumer(struct lua_State *L) {
if (!(rd_consumer = rd_kafka_new(RD_KAFKA_CONSUMER, rd_config, errstr, sizeof(errstr)))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto config_error;
}

rd_config = NULL; // was freed by rd_kafka_new
if (rd_kafka_brokers_add(rd_consumer, brokers) == 0) {
lua_pushnil(L);
lua_pushliteral(L, "No valid brokers specified");
return 2;
goto broker_error;
}

rd_kafka_poll_set_consumer(rd_consumer);
Expand All @@ -728,6 +739,17 @@ lua_create_consumer(struct lua_State *L) {
luaL_getmetatable(L, consumer_label);
lua_setmetatable(L, -2);
return 1;

broker_error:
rd_kafka_destroy(rd_consumer);
config_error:
if (rd_config != NULL)
rd_kafka_conf_destroy(rd_config);
destroy_event_queues(L, event_queues);
return 2;
topic_error:
rd_kafka_topic_conf_destroy(topic_conf);
return 2;
}

int
Expand Down
8 changes: 1 addition & 7 deletions kafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,7 @@
* Consumer
*/

typedef struct {
rd_kafka_t *rd_consumer;
pthread_t thread;
pthread_attr_t attr;
int should_stop;
pthread_mutex_t lock;
} consumer_poller_t;
typedef struct consumer_poller_t consumer_poller_t;

typedef struct {
rd_kafka_t *rd_consumer;
Expand Down
37 changes: 29 additions & 8 deletions kafka/producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@
* Producer poll thread
*/

struct producer_poller_t {
rd_kafka_t *rd_producer;
pthread_t thread;
pthread_attr_t attr;
int should_stop;
pthread_mutex_t lock;
};

static void *
producer_poll_loop(void *arg) {
set_thread_name("kafka_producer");
Expand Down Expand Up @@ -448,8 +456,7 @@ lua_create_producer(struct lua_State *L) {

char errstr[512];

rd_kafka_conf_t *rd_config = rd_kafka_conf_new();

// FIXME: this value is not handled in consumer_destroy
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
lua_pushstring(L, "default_topic_options");
lua_gettable(L, -2);
Expand All @@ -461,15 +468,15 @@ lua_create_producer(struct lua_State *L) {
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
lua_pushnil(L);
lua_pushliteral(L, "producer config default topic options must contains only string keys and string values");
return 2;
goto topic_error;
}

const char *value = lua_tostring(L, -1);
const char *key = lua_tostring(L, -2);
if (rd_kafka_topic_conf_set(topic_conf, key, value, errstr, sizeof(errstr))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto topic_error;
}

// pop value, leaving original key
Expand All @@ -479,6 +486,8 @@ lua_create_producer(struct lua_State *L) {
// stack now contains: -1 => table
}
lua_pop(L, 1);

rd_kafka_conf_t *rd_config = rd_kafka_conf_new();
rd_kafka_conf_set_default_topic_conf(rd_config, topic_conf);

event_queues_t *event_queues = new_event_queues();
Expand Down Expand Up @@ -522,15 +531,15 @@ lua_create_producer(struct lua_State *L) {
if (!(lua_isstring(L, -1)) || !(lua_isstring(L, -2))) {
lua_pushnil(L);
lua_pushliteral(L, "producer config options must contains only string keys and string values");
return 2;
goto config_error;
}

const char *value = lua_tostring(L, -1);
const char *key = lua_tostring(L, -2);
if (rd_kafka_conf_set(rd_config, key, value, errstr, sizeof(errstr))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto config_error;
}

// pop value, leaving original key
Expand All @@ -545,13 +554,14 @@ lua_create_producer(struct lua_State *L) {
if (!(rd_producer = rd_kafka_new(RD_KAFKA_PRODUCER, rd_config, errstr, sizeof(errstr)))) {
lua_pushnil(L);
lua_pushstring(L, errstr);
return 2;
goto config_error;
}

rd_config = NULL; // was freed by rd_kafka_new
if (rd_kafka_brokers_add(rd_producer, brokers) == 0) {
lua_pushnil(L);
lua_pushliteral(L, "No valid brokers specified");
return 2;
goto broker_error;
}

// creating background thread for polling consumer
Expand All @@ -570,6 +580,17 @@ lua_create_producer(struct lua_State *L) {
luaL_getmetatable(L, producer_label);
lua_setmetatable(L, -2);
return 1;

broker_error:
rd_kafka_destroy(rd_producer);
config_error:
if (rd_config != NULL)
rd_kafka_conf_destroy(rd_config);
destroy_event_queues(L, event_queues);
return 2;
topic_error:
rd_kafka_topic_conf_destroy(topic_conf);
return 2;
}

int
Expand Down
8 changes: 1 addition & 7 deletions kafka/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,7 @@
* Producer
*/

typedef struct {
rd_kafka_t *rd_producer;
pthread_t thread;
pthread_attr_t attr;
int should_stop;
pthread_mutex_t lock;
} producer_poller_t;
typedef struct producer_poller_t producer_poller_t;

typedef struct {
rd_kafka_topic_t **elements;
Expand Down
35 changes: 35 additions & 0 deletions tests/consumer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,40 @@ local function test_seek_partitions()
return messages
end

local function test_create_errors()
log.info('Create without config')
local _, err = tnt_kafka.Consumer.create()
assert(err == 'config must not be nil')

log.info('Create with empty config')
local _, err = tnt_kafka.Consumer.create({})
assert(err == 'consumer config table must have non nil key \'brokers\' which contains string')

log.info('Create with empty brokers')
local _, err = tnt_kafka.Consumer.create({brokers = ''})
assert(err == 'No valid brokers specified')

log.info('Create with invalid default_topic_options keys')
local _, err = tnt_kafka.Consumer.create({brokers = '', default_topic_options = {[{}] = 2}})
assert(err == 'consumer config default topic options must contains only string keys and string values')

log.info('Create with invalid default_topic_options property')
local _, err = tnt_kafka.Consumer.create({brokers = '', default_topic_options = {[2] = 2}})
assert(err == 'No such configuration property: "2"')

log.info('Create with invalid options keys')
local _, err = tnt_kafka.Consumer.create({brokers = '', options = {[{}] = 2}})
assert(err == 'consumer config options must contains only string keys and string values')

log.info('Create with invalid options property')
local _, err = tnt_kafka.Consumer.create({brokers = '', options = {[2] = 2}})
assert(err == 'No such configuration property: "2"')

log.info('Create with incompatible properties')
local _, err = tnt_kafka.Consumer.create({brokers = '', options = {['reconnect.backoff.max.ms'] = '2', ['reconnect.backoff.ms'] = '1000'}})
assert(err == '`reconnect.backoff.max.ms` must be >= `reconnect.max.ms`')
end

return {
create = create,
subscribe = subscribe,
Expand All @@ -231,4 +265,5 @@ return {
resume = resume,

test_seek_partitions = test_seek_partitions,
test_create_errors = test_create_errors,
}
36 changes: 36 additions & 0 deletions tests/producer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,40 @@ local function close()
end
end

local function test_create_errors()
log.info('Create without config')
local _, err = tnt_kafka.Producer.create()
assert(err == 'config must not be nil')

log.info('Create with empty config')
local _, err = tnt_kafka.Producer.create({})
assert(err == 'producer config table must have non nil key \'brokers\' which contains string')

log.info('Create with empty brokers')
local _, err = tnt_kafka.Producer.create({brokers = ''})
assert(err == 'No valid brokers specified')

log.info('Create with invalid default_topic_options keys')
local _, err = tnt_kafka.Producer.create({brokers = '', default_topic_options = {[{}] = 2}})
assert(err == 'producer config default topic options must contains only string keys and string values')

log.info('Create with invalid default_topic_options property')
local _, err = tnt_kafka.Producer.create({brokers = '', default_topic_options = {[2] = 2}})
assert(err == 'No such configuration property: "2"')

log.info('Create with invalid options keys')
local _, err = tnt_kafka.Producer.create({brokers = '', options = {[{}] = 2}})
assert(err == 'producer config options must contains only string keys and string values')

log.info('Create with invalid options property')
local _, err = tnt_kafka.Producer.create({brokers = '', options = {[2] = 2}})
assert(err == 'No such configuration property: "2"')

log.info('Create with incompatible properties')
local _, err = tnt_kafka.Producer.create({brokers = '', options = {['reconnect.backoff.max.ms'] = '2', ['reconnect.backoff.ms'] = '1000'}})
assert(err == '`reconnect.backoff.max.ms` must be >= `reconnect.max.ms`')
end

return {
create = create,
produce = produce,
Expand All @@ -121,4 +155,6 @@ return {
dump_conf = dump_conf,
metadata = metadata,
list_groups = list_groups,

test_create_errors = test_create_errors,
}
5 changes: 5 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ def test_consumer_seek_partitions():
assert item['value'] == value


def test_consumer_create_errors_partitions():
server = get_server()
server.call("consumer.test_create_errors")


def test_consumer_should_consume_msgs_from_multiple_topics():
message1 = {
"key": "test1",
Expand Down
4 changes: 4 additions & 0 deletions tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,7 @@ def test_producer_should_log_debug():
assert len(response[0]) > 0

server.call("producer.close", [])

def test_producer_create_errors_partitions():
server = get_server()
server.call("producer.test_create_errors")

0 comments on commit f3ed6e3

Please sign in to comment.