Skip to content

Commit

Permalink
Fixed bug where Kafka-Topic would use the wrong type of config
Browse files Browse the repository at this point in the history
  • Loading branch information
WinterAlexander committed May 12, 2022
1 parent 4953569 commit a64bb05
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 6 deletions.
6 changes: 3 additions & 3 deletions examples/simple_producer.adb
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ procedure Simple_Producer is
begin
Ada.Text_IO.Put_Line("Kafka version: " & Kafka.Version);

-- Create a new config object
-- Create a new config object
Config := Kafka.Config.Create;

-- Configure your properties
Kafka.Config.Set(Config, "client.id", GNAT.Sockets.Host_name);
Kafka.Config.Set(Config, "bootstrap.servers", "localhost:9092");

-- Create handle
-- Create handle
Handle := Kafka.Create_Handle(Kafka.RD_KAFKA_PRODUCER, Config);

-- Create topic handle
Topic := Kafka.Topic.Create_Topic_Handle(Handle, "test_topic", Config); -- topic must already exist
Topic := Kafka.Topic.Create_Topic_Handle(Handle, "test_topic"); -- topic must already exist

-- Producing a String
Kafka.Produce(Topic,
Expand Down
30 changes: 30 additions & 0 deletions src/kafka-topic-config.adb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package body Kafka.Topic.Config is
Error_Buffer_Size : constant size_t := 512;
RD_Kafka_Conf_OK : constant Integer := 0;

procedure Set(Config : Topic_Config_Type;
Name : String;
Value : String) is
C_Name : chars_ptr := New_String(Name);
C_Value : chars_ptr := New_String(Value);
C_Err : chars_ptr := Alloc(Error_Buffer_Size);
Result : Integer;
begin
Result := rd_kafka_topic_conf_set(Config, C_Name, C_Value, C_Err, Error_Buffer_Size);

if Result /= RD_Kafka_Conf_OK then
declare
Error : String := Interfaces.C.Strings.Value(C_Err);
begin
Free(C_Name);
Free(C_Value);
Free(C_Err);
raise Kafka_Error with Error;
end;
end if;

Free(C_Name);
Free(C_Value);
Free(C_Err);
end Set;
end Kafka.Topic.Config;
63 changes: 63 additions & 0 deletions src/kafka-topic-config.ads
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@


package Kafka.Topic.Config is

--
-- Creates a new kafka topic config object
--
-- librdkafka equivalent: rd_kafka_topic_conf_new
--
function Create return Topic_Config_Type
with Import => True,
Convention => C,
External_Name => "rd_kafka_topic_conf_new";

--
-- Destroys a kafka topic config object
--
-- librdkafka equivalent: rd_kafka_topic_conf_destroy
--
-- @param Config configuration to destroy
--
procedure Destroy(Config : Topic_Config_Type)
with Import => True,
Convention => C,
External_Name => "rd_kafka_topic_conf_destroy";

--
-- Duplicates a kafka topic config object
--
-- librdkafka equivalent: rd_kafka_topic_conf_dup
--
-- @param Config configuration to duplicate
--
function Duplicate(Config : Topic_Config_Type) return Topic_Config_Type
with Import => True,
Convention => C,
External_Name => "rd_kafka_topic_conf_dup";

--
-- Sets a kafka topic config property for a given kafka topic config.
--
-- librdkafka equivalent: rd_kafka_topic_conf_set
--
-- @param Config configuration to set the property in
-- @param Name name of property to set
-- @param Value value of property to set
-- @raises Kafka_Error on error
--
procedure Set(Config : Topic_Config_Type;
Name : String;
Value : String);
private

function rd_kafka_topic_conf_set(conf : Topic_Config_Type;
name : chars_ptr;
value : chars_ptr;
errstr : chars_ptr;
errstr_size : size_t) return Integer
with Import => True,
Convention => C,
External_Name => "rd_kafka_topic_conf_set";

end Kafka.Topic.Config;
8 changes: 7 additions & 1 deletion src/kafka-topic.adb
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package body Kafka.Topic is
function Create_Topic_Handle(Handle : Handle_Type;
Topic : String;
Config : Config_Type) return Topic_Type is
Config : Topic_Config_Type) return Topic_Type is
C_Topic : chars_ptr := New_String(Topic);
Topic_Handle : Topic_Type;
begin
Topic_Handle := rd_kafka_topic_new(Handle, C_Topic, Config);
Free(C_Topic);
return Topic_Handle;
end;

function Create_Topic_Handle(Handle : Handle_Type;
Topic : String) return Topic_Type is
begin
return Create_Topic_Handle(Handle, Topic, Topic_Config_Type(System.Null_Address));
end;

function Get_Name(Topic : Topic_Type) return String is
begin
Expand Down
14 changes: 12 additions & 2 deletions src/kafka-topic.ads
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,17 @@ package Kafka.Topic is
--
function Create_Topic_Handle(Handle : Handle_Type;
Topic : String;
Config : Config_Type) return Topic_Type;
Config : Topic_Config_Type) return Topic_Type;


--
-- Creates a handle for a given topic. Does not perform the admin command
-- to create a topic
--
-- librdkafka equivalent: rd_kafka_topic_new
--
function Create_Topic_Handle(Handle : Handle_Type;
Topic : String) return Topic_Type;

--
-- Destroys the specified topic handle
Expand Down Expand Up @@ -44,7 +54,7 @@ private

function rd_kafka_topic_new(Handle : Handle_Type;
Topic : chars_ptr;
Config : Config_Type) return Topic_Type
Config : Topic_Config_Type) return Topic_Type
with Import => True,
Convention => C,
External_Name => "rd_kafka_topic_new";
Expand Down
1 change: 1 addition & 0 deletions src/kafka.ads
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ package Kafka is
type Handle_Type is new System.Address;
type Topic_Type is new System.Address;
type Config_Type is new System.Address;
type Topic_Config_Type is new System.Address;
type Partition_List_Type is new System.Address;

type Kafka_Handle_Type is (RD_KAFKA_PRODUCER, RD_KAFKA_CONSUMER)
Expand Down

0 comments on commit a64bb05

Please sign in to comment.