From 8abc8c0db1369e645cbb136b8aa13a9a25390809 Mon Sep 17 00:00:00 2001 From: ttimot24 Date: Wed, 18 Sep 2024 19:15:20 +0200 Subject: [PATCH 1/5] #46 - Implement KafkaSerializer for JSON serialization instead using explicit object mapping --- .../keycloak/kafka/KafkaProducerFactory.java | 2 +- .../kafka/KafkaStandardProducerFactory.java | 12 ++++-- .../kafka/serializer/JsonSerializer.java | 38 +++++++++++++++++++ .../{ => spi}/KafkaEventListenerProvider.java | 25 +++++------- .../KafkaEventListenerProviderFactory.java | 4 +- .../KafkaEventListenerProviderTests.java | 1 + .../kafka/KafkaMockProducerFactory.java | 5 ++- 7 files changed, 64 insertions(+), 23 deletions(-) create mode 100644 src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java rename src/main/java/com/github/snuk87/keycloak/kafka/{ => spi}/KafkaEventListenerProvider.java (77%) rename src/main/java/com/github/snuk87/keycloak/kafka/{ => spi}/KafkaEventListenerProviderFactory.java (92%) diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java index 7ce70ec..1f3ce03 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java @@ -6,7 +6,7 @@ public interface KafkaProducerFactory { - Producer createProducer(String clientId, String bootstrapServer, + Producer createProducer(String clientId, String bootstrapServer, Map optionalProperties); } diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java index 610e663..b881b35 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java @@ -3,21 +3,25 @@ import java.util.Map; import java.util.Properties; +import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class KafkaStandardProducerFactory implements KafkaProducerFactory { + private static final Logger log = LoggerFactory.getLogger(KafkaStandardProducerFactory.class); + @Override - public Producer createProducer(String clientId, String bootstrapServer, + public Producer createProducer(String clientId, String bootstrapServer, Map optionalProperties) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); props.putAll(optionalProperties); return new KafkaProducer<>(props); diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java new file mode 100644 index 0000000..fd455d5 --- /dev/null +++ b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java @@ -0,0 +1,38 @@ +package com.github.snuk87.keycloak.kafka.serializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; + +public class JsonSerializer implements Serializer { + private final ObjectMapper objectMapper = new ObjectMapper(); + + public JsonSerializer() { + + } + + @Override + public void configure(Map config, boolean isKey) { + + } + + @Override + public byte[] serialize(String topic, Object data) { + if (data == null) { + return null; + } + try { + return objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() { + + } +} diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProvider.java b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java similarity index 77% rename from src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProvider.java rename to src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java index dc52ff3..6a41e54 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProvider.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java @@ -1,4 +1,4 @@ -package com.github.snuk87.keycloak.kafka; +package com.github.snuk87.keycloak.kafka.spi; import java.util.ArrayList; import java.util.List; @@ -8,6 +8,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.github.snuk87.keycloak.kafka.KafkaProducerFactory; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -17,9 +18,6 @@ import org.keycloak.events.EventType; import org.keycloak.events.admin.AdminEvent; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - public class KafkaEventListenerProvider implements EventListenerProvider { private static final Logger LOG = Logger.getLogger(KafkaEventListenerProvider.class); @@ -30,9 +28,7 @@ public class KafkaEventListenerProvider implements EventListenerProvider { private String topicAdminEvents; - private Producer producer; - - private ObjectMapper mapper; + private Producer producer; public KafkaEventListenerProvider(String bootstrapServers, String clientId, String topicEvents, String[] events, String topicAdminEvents, Map kafkaProducerProperties, KafkaProducerFactory factory) { @@ -50,13 +46,12 @@ public KafkaEventListenerProvider(String bootstrapServers, String clientId, Stri } producer = factory.createProducer(clientId, bootstrapServers, kafkaProducerProperties); - mapper = new ObjectMapper(); } - private void produceEvent(String eventAsString, String topic) + private void produceEvent(final Object event, final String topic) throws InterruptedException, ExecutionException, TimeoutException { LOG.debug("Produce to topic: " + topicEvents + " ..."); - ProducerRecord record = new ProducerRecord<>(topic, eventAsString); + final ProducerRecord record = new ProducerRecord<>(topic, event); Future metaData = producer.send(record); RecordMetadata recordMetadata = metaData.get(30, TimeUnit.SECONDS); LOG.debug("Produced to topic: " + recordMetadata.topic()); @@ -66,22 +61,22 @@ private void produceEvent(String eventAsString, String topic) public void onEvent(Event event) { if (events.contains(event.getType())) { try { - produceEvent(mapper.writeValueAsString(event), topicEvents); - } catch (JsonProcessingException | ExecutionException | TimeoutException e) { + this.produceEvent(event, topicEvents); + } catch (ExecutionException | TimeoutException e) { LOG.error(e.getMessage(), e); } catch (InterruptedException e) { LOG.error(e.getMessage(), e); Thread.currentThread().interrupt(); } - } + } } @Override public void onEvent(AdminEvent event, boolean includeRepresentation) { if (topicAdminEvents != null) { try { - produceEvent(mapper.writeValueAsString(event), topicAdminEvents); - } catch (JsonProcessingException | ExecutionException | TimeoutException e) { + this.produceEvent(event, topicAdminEvents); + } catch (ExecutionException | TimeoutException e) { LOG.error(e.getMessage(), e); } catch (InterruptedException e) { LOG.error(e.getMessage(), e); diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProviderFactory.java similarity index 92% rename from src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderFactory.java rename to src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProviderFactory.java index 3d4f39c..c961fc9 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProviderFactory.java @@ -1,7 +1,9 @@ -package com.github.snuk87.keycloak.kafka; +package com.github.snuk87.keycloak.kafka.spi; import java.util.Map; +import com.github.snuk87.keycloak.kafka.KafkaProducerConfig; +import com.github.snuk87.keycloak.kafka.KafkaStandardProducerFactory; import org.jboss.logging.Logger; import org.keycloak.Config.Scope; import org.keycloak.events.EventListenerProvider; diff --git a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java index de5307e..66ea508 100644 --- a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java +++ b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java @@ -6,6 +6,7 @@ import java.lang.reflect.Field; import java.util.Map; +import com.github.snuk87.keycloak.kafka.spi.KafkaEventListenerProvider; import org.apache.kafka.clients.producer.MockProducer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java index bdc6a3b..64f7ea3 100644 --- a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java +++ b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java @@ -2,6 +2,7 @@ import java.util.Map; +import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.StringSerializer; @@ -9,9 +10,9 @@ class KafkaMockProducerFactory implements KafkaProducerFactory { @Override - public Producer createProducer(String clientId, String bootstrapServer, + public Producer createProducer(String clientId, String bootstrapServer, Map optionalProperties) { - return new MockProducer<>(true, new StringSerializer(), new StringSerializer()); + return new MockProducer<>(true, new StringSerializer(), new JsonSerializer()); } } From 4de966803ee1a2b5840cbc0eb6139e213070fdc6 Mon Sep 17 00:00:00 2001 From: Timot Tarjani Date: Thu, 19 Sep 2024 06:26:45 +0000 Subject: [PATCH 2/5] Revert the key serializer --- .../snuk87/keycloak/kafka/KafkaStandardProducerFactory.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java index b881b35..9e45c74 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java @@ -7,6 +7,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,7 +21,7 @@ public Producer createProducer(String clientId, String bootstrap Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); props.putAll(optionalProperties); From 15b663c21bc11a7ef761497b460731272f709ebd Mon Sep 17 00:00:00 2001 From: ttimot24 Date: Fri, 20 Sep 2024 11:02:17 +0200 Subject: [PATCH 3/5] #46 - Added AVRO serializer capability --- Dockerfile | 4 +- docker-compose.yml | 54 +++++++++++++++++-- pom.xml | 18 +++++++ .../keycloak/kafka/KafkaProducerConfig.java | 13 ++++- .../kafka/KafkaStandardProducerFactory.java | 8 +-- 5 files changed, 83 insertions(+), 14 deletions(-) diff --git a/Dockerfile b/Dockerfile index 73e7f99..cba2b28 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # https://www.keycloak.org/server/containers -FROM quay.io/keycloak/keycloak:19.0.3 -RUN curl -sL https://github.com/SnuK87/keycloak-kafka/releases/download/1.1.1/keycloak-kafka-1.1.1-jar-with-dependencies.jar -o /opt/keycloak/providers/keycloak-kafka-1.1.1-jar-with-dependencies.jar +FROM quay.io/keycloak/keycloak:latest +COPY target/keycloak-kafka-1.2.0-jar-with-dependencies.jar /opt/keycloak/providers/keycloak-kafka-1.2.0-jar-with-dependencies.jar RUN /opt/keycloak/bin/kc.sh build ENTRYPOINT ["/opt/keycloak/bin/kc.sh"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index b44d7b2..226a7b0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,16 +2,21 @@ version: '3' services: keycloak: depends_on: - - "kafka" + - kafka image: keycloak-kafka + build: . ports: - "8080:8080" environment: KEYCLOAK_ADMIN: admin KEYCLOAK_ADMIN_PASSWORD: admin KAFKA_TOPIC: keycloak-events + KAFKA_ADMIN_TOPIC: keycloak-admin-events KAFKA_CLIENT_ID: keycloak - KAFKA_BOOTSTRAP_SERVERS: kafka:9094 + KAFKA_BOOTSTRAP_SERVERS: kafka:9092 + KAFKA_SCHEMA_REGISTRY_URL: http://schema-registry:8081 + KAFKA_VALUE_SERIALIZER_CLASS: io.confluent.kafka.serializers.KafkaAvroSerializer + AUTO_REGISTER_SCHEMAS: 'true' command: start-dev zookeeper: @@ -24,7 +29,7 @@ services: kafka: depends_on: - - "zookeeper" + - zookeeper image: bitnami/kafka:latest ports: - "9092:9092" @@ -38,4 +43,45 @@ services: KAFKA_CREATE_TOPICS: "keycloak-events:1:1" ALLOW_PLAINTEXT_LISTENER: "yes" volumes: - - /var/run/docker.sock:/var/run/docker.sock \ No newline at end of file + - /var/run/docker.sock:/var/run/docker.sock + + schema-registry: + image: confluentinc/cp-schema-registry:7.3.3 + restart: always + depends_on: + - kafka + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + deploy: + resources: + limits: + memory: 512MB + + redpanda-console: + image: docker.redpanda.com/vectorized/console:latest + restart: always + depends_on: + - kafka + ports: + - "9000:8080" + environment: + SERVER_BASEPATH: /redpanda + #METRICSNAMESPACE: redpanda-console + KAFKA_BROKERS: ${KAFKA_BROKERS:-kafka:9092} + KAFKA_SCHEMAREGISTRY_ENABLED: "true" + KAFKA_SCHEMAREGISTRY_URLS: "http://schema-registry:8081" + CONNECT_ENABLED: "false" + CONNECT_CLUSTERS_NAME: connect-cluster + CONNECT_CLUSTERS_URL: "http://connect:8083" + deploy: + resources: + limits: + memory: 1G + +networks: + default: + name: keycloak-kafka-network \ No newline at end of file diff --git a/pom.xml b/pom.xml index 61d575f..7bb1221 100644 --- a/pom.xml +++ b/pom.xml @@ -16,6 +16,13 @@ 3.6.1.Final + + + confluent + https://packages.confluent.io/maven/ + + + org.keycloak @@ -62,6 +69,17 @@ ${junit.version} test + + + io.confluent + kafka-schema-registry + ${confluent.version} + + + io.confluent + kafka-avro-serializer + ${confluent.version} + diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java index 3b67b8d..0308b27 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java @@ -2,6 +2,8 @@ import java.util.HashMap; import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; import org.keycloak.Config.Scope; public class KafkaProducerConfig { @@ -9,8 +11,8 @@ public class KafkaProducerConfig { // https://kafka.apache.org/documentation/#producerconfigs public static Map init(Scope scope) { - Map propertyMap = new HashMap<>(); - KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values(); + final Map propertyMap = new HashMap<>(); + final KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values(); for (KafkaProducerProperty property : producerProperties) { String propertyEnv = System.getenv("KAFKA_" + property.name()); @@ -27,6 +29,13 @@ public static Map init(Scope scope) { } enum KafkaProducerProperty { + KEY_SERIALIZER_CLASS(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), + VALUE_SERIALIZER_CLASS(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), + AUTO_REGISTER_SCHEMAS("auto.register.schemas"), + USE_LATEST_VERSION("use.latest.version"), + SCHEMA_REGISTRY_URL("schema.registry.url"), + SCHEMA_REGISTRY_USER("schema.registry.user"), + SCHEMA_REGISTRY_PASSWORD("schema.registry.password"), ACKS("acks"), // BUFFER_MEMORY("buffer.memory"), // COMPRESSION_TYPE("compression.type"), // diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java index 9e45c74..72216f7 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java @@ -8,21 +8,17 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public final class KafkaStandardProducerFactory implements KafkaProducerFactory { - private static final Logger log = LoggerFactory.getLogger(KafkaStandardProducerFactory.class); - @Override public Producer createProducer(String clientId, String bootstrapServer, Map optionalProperties) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, optionalProperties.getOrDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, optionalProperties.getOrDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName())); props.putAll(optionalProperties); return new KafkaProducer<>(props); From 4678bad1a3484b4a168aece889db39da0cf16f71 Mon Sep 17 00:00:00 2001 From: ttimot24 Date: Fri, 20 Sep 2024 15:34:35 +0200 Subject: [PATCH 4/5] #46 - Resolve compatibility issues between JSON and AVRO --- pom.xml | 19 ++ .../keycloak/kafka/KafkaProducerFactory.java | 5 +- .../kafka/KafkaStandardProducerFactory.java | 5 +- .../keycloak/kafka/mapper/KeycloakMapper.java | 53 +++++ .../kafka/serializer/JsonSerializer.java | 12 +- .../JacksonIgnoreAvroPropertiesMixIn.java | 15 ++ .../kafka/spi/KafkaEventListenerProvider.java | 12 +- ...ycloak.events.EventListenerProviderFactory | 2 +- .../avro/keycloak-admin-events-value.avsc | 137 +++++++++++ .../resources/avro/keycloak-events-value.avsc | 213 ++++++++++++++++++ .../KafkaEventListenerProviderTests.java | 4 + .../kafka/KafkaMockProducerFactory.java | 5 +- .../KeycloakEventAvroSerializerTest.java | 37 +++ 13 files changed, 503 insertions(+), 16 deletions(-) create mode 100644 src/main/java/com/github/snuk87/keycloak/kafka/mapper/KeycloakMapper.java create mode 100644 src/main/java/com/github/snuk87/keycloak/kafka/serializer/mixin/JacksonIgnoreAvroPropertiesMixIn.java create mode 100644 src/main/resources/avro/keycloak-admin-events-value.avsc create mode 100644 src/main/resources/avro/keycloak-events-value.avsc create mode 100644 src/test/java/com/github/snuk87/keycloak/kafka/KeycloakEventAvroSerializerTest.java diff --git a/pom.xml b/pom.xml index 7bb1221..fd1a3d6 100644 --- a/pom.xml +++ b/pom.xml @@ -84,6 +84,25 @@ + + + org.apache.avro + avro-maven-plugin + 1.12.0 + + + generate-sources + + schema + + + src/main/resources/avro (5) + ${project.build.directory}/generated-sources + String + + + + maven-assembly-plugin diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java index 1f3ce03..2d1f9ee 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java @@ -2,11 +2,12 @@ import java.util.Map; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.clients.producer.Producer; public interface KafkaProducerFactory { - Producer createProducer(String clientId, String bootstrapServer, - Map optionalProperties); + Producer createProducer(String clientId, String bootstrapServer, + Map optionalProperties); } diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java index 72216f7..d4018d4 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java @@ -4,6 +4,7 @@ import java.util.Properties; import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -12,8 +13,8 @@ public final class KafkaStandardProducerFactory implements KafkaProducerFactory { @Override - public Producer createProducer(String clientId, String bootstrapServer, - Map optionalProperties) { + public Producer createProducer(String clientId, String bootstrapServer, + Map optionalProperties) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/mapper/KeycloakMapper.java b/src/main/java/com/github/snuk87/keycloak/kafka/mapper/KeycloakMapper.java new file mode 100644 index 0000000..ec95d72 --- /dev/null +++ b/src/main/java/com/github/snuk87/keycloak/kafka/mapper/KeycloakMapper.java @@ -0,0 +1,53 @@ +package com.github.snuk87.keycloak.kafka.mapper; + +import com.github.snuk87.keycloak.kafka.dto.AuthDetails; +import com.github.snuk87.keycloak.kafka.dto.KeycloakAdminEvent; +import com.github.snuk87.keycloak.kafka.dto.KeycloakEvent; +import com.github.snuk87.keycloak.kafka.dto.OperationType; +import org.keycloak.events.Event; +import org.keycloak.events.admin.AdminEvent; + +import java.util.Map; +import java.util.Optional; + +public class KeycloakMapper { + + public static KeycloakAdminEvent mapEventToKeycloakEvent(final AdminEvent event){ + return KeycloakAdminEvent.newBuilder() + .setId(event.getId()) + .setTime(event.getTime()) + .setRealmId(event.getRealmId()) + .setRealmName(event.getRealmName()) + .setError(event.getError()) + .setResourceType(event.getResourceTypeAsString()) + .setResourcePath(event.getResourcePath()) + .setRepresentation(event.getRepresentation()) + .setOperationType(OperationType.valueOf(Optional.ofNullable(event.getOperationType()).map(ot -> ot.toString()).orElse(OperationType.ACTION.toString()))) + .setAuthDetails( + Optional.ofNullable(event.getAuthDetails()).map(authDetails -> AuthDetails.newBuilder() + .setClientId(authDetails.getClientId()) + .setIpAddress(authDetails.getIpAddress()) + .setRealmId(authDetails.getRealmId()) + .setRealmName(authDetails.getRealmName()) + .setUserId(authDetails.getUserId()) + .build()) + .orElse(null) + ).build(); + } + + public static KeycloakEvent mapEventToKeycloakEvent(final Event event){ + return KeycloakEvent.newBuilder() + .setId(event.getId()) + .setTime(event.getTime()) + .setRealmId(event.getRealmId()) + .setRealmName(event.getRealmName()) + .setError(event.getError()) + .setClientId(event.getClientId()) + .setUserId(event.getUserId()) + .setSessionId(event.getSessionId()) + .setIpAddress(event.getIpAddress()) + .setDetails(event.getDetails()) + .build(); + } + +} diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java index fd455d5..2638b1b 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java @@ -2,16 +2,20 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.snuk87.keycloak.kafka.serializer.mixin.JacksonIgnoreAvroPropertiesMixIn; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; import java.util.Map; -public class JsonSerializer implements Serializer { - private final ObjectMapper objectMapper = new ObjectMapper(); +public class JsonSerializer implements Serializer { + private final ObjectMapper objectMapper; public JsonSerializer() { - + this.objectMapper = new ObjectMapper(); + this.objectMapper.addMixIn(SpecificRecord.class, JacksonIgnoreAvroPropertiesMixIn.class); } @Override @@ -20,7 +24,7 @@ public void configure(Map config, boolean isKey) { } @Override - public byte[] serialize(String topic, Object data) { + public byte[] serialize(String topic, SpecificRecordBase data) { if (data == null) { return null; } diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/serializer/mixin/JacksonIgnoreAvroPropertiesMixIn.java b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/mixin/JacksonIgnoreAvroPropertiesMixIn.java new file mode 100644 index 0000000..b8888a1 --- /dev/null +++ b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/mixin/JacksonIgnoreAvroPropertiesMixIn.java @@ -0,0 +1,15 @@ +package com.github.snuk87.keycloak.kafka.serializer.mixin; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; + +public abstract class JacksonIgnoreAvroPropertiesMixIn { + + @JsonIgnore + public abstract Schema getSchema(); + + @JsonIgnore + public abstract SpecificData getSpecificData(); + +} diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java index 6a41e54..758935b 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java @@ -9,6 +9,8 @@ import java.util.concurrent.TimeoutException; import com.github.snuk87.keycloak.kafka.KafkaProducerFactory; +import com.github.snuk87.keycloak.kafka.mapper.KeycloakMapper; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -28,7 +30,7 @@ public class KafkaEventListenerProvider implements EventListenerProvider { private String topicAdminEvents; - private Producer producer; + private Producer producer; public KafkaEventListenerProvider(String bootstrapServers, String clientId, String topicEvents, String[] events, String topicAdminEvents, Map kafkaProducerProperties, KafkaProducerFactory factory) { @@ -48,10 +50,10 @@ public KafkaEventListenerProvider(String bootstrapServers, String clientId, Stri producer = factory.createProducer(clientId, bootstrapServers, kafkaProducerProperties); } - private void produceEvent(final Object event, final String topic) + private void produceEvent(final SpecificRecordBase event, final String topic) throws InterruptedException, ExecutionException, TimeoutException { LOG.debug("Produce to topic: " + topicEvents + " ..."); - final ProducerRecord record = new ProducerRecord<>(topic, event); + final ProducerRecord record = new ProducerRecord<>(topic, event); Future metaData = producer.send(record); RecordMetadata recordMetadata = metaData.get(30, TimeUnit.SECONDS); LOG.debug("Produced to topic: " + recordMetadata.topic()); @@ -61,7 +63,7 @@ private void produceEvent(final Object event, final String topic) public void onEvent(Event event) { if (events.contains(event.getType())) { try { - this.produceEvent(event, topicEvents); + this.produceEvent(KeycloakMapper.mapEventToKeycloakEvent(event), topicEvents); } catch (ExecutionException | TimeoutException e) { LOG.error(e.getMessage(), e); } catch (InterruptedException e) { @@ -75,7 +77,7 @@ public void onEvent(Event event) { public void onEvent(AdminEvent event, boolean includeRepresentation) { if (topicAdminEvents != null) { try { - this.produceEvent(event, topicAdminEvents); + this.produceEvent(KeycloakMapper.mapEventToKeycloakEvent(event), topicAdminEvents); } catch (ExecutionException | TimeoutException e) { LOG.error(e.getMessage(), e); } catch (InterruptedException e) { diff --git a/src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory b/src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory index d5d1328..8dea32f 100644 --- a/src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory +++ b/src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory @@ -1 +1 @@ -com.github.snuk87.keycloak.kafka.KafkaEventListenerProviderFactory \ No newline at end of file +com.github.snuk87.keycloak.kafka.spi.KafkaEventListenerProviderFactory \ No newline at end of file diff --git a/src/main/resources/avro/keycloak-admin-events-value.avsc b/src/main/resources/avro/keycloak-admin-events-value.avsc new file mode 100644 index 0000000..82f722f --- /dev/null +++ b/src/main/resources/avro/keycloak-admin-events-value.avsc @@ -0,0 +1,137 @@ +{ + "type": "record", + "name": "KeycloakAdminEvent", + "namespace": "com.github.snuk87.keycloak.kafka.dto", + "fields": [ + { + "name": "authDetails", + "type": [ + "null", + { + "type": "record", + "name": "AuthDetails", + "fields": [ + { + "name": "clientId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "ipAddress", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "realmId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "realmName", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "userId", + "type": [ + "null", + "string" + ], + "default": null + } + ] + } + ], + "default": null + }, + { + "name": "error", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "operationType", + "type": [ + "null", + { + "type": "enum", + "name": "OperationType", + "symbols": [ + "CREATE", + "UPDATE", + "DELETE", + "ACTION" + ] + } + ], + "default": null + }, + { + "name": "realmId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "realmName", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "representation", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "resourcePath", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "resourceType", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "time", + "type": "long" + } + ] +} \ No newline at end of file diff --git a/src/main/resources/avro/keycloak-events-value.avsc b/src/main/resources/avro/keycloak-events-value.avsc new file mode 100644 index 0000000..41d74f2 --- /dev/null +++ b/src/main/resources/avro/keycloak-events-value.avsc @@ -0,0 +1,213 @@ +{ + "type": "record", + "name": "KeycloakEvent", + "namespace": "com.github.snuk87.keycloak.kafka.dto", + "fields": [ + { + "name": "clientId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "details", + "type": [ + "null", + { + "type": "map", + "values": "string" + } + ], + "default": null + }, + { + "name": "error", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "ipAddress", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "realmId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "realmName", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "sessionId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "time", + "type": "long" + }, + { + "name": "type", + "type": [ + "null", + { + "type": "enum", + "name": "EventType", + "symbols": [ + "LOGIN", + "LOGIN_ERROR", + "REGISTER", + "REGISTER_ERROR", + "LOGOUT", + "LOGOUT_ERROR", + "CODE_TO_TOKEN", + "CODE_TO_TOKEN_ERROR", + "CLIENT_LOGIN", + "CLIENT_LOGIN_ERROR", + "REFRESH_TOKEN", + "REFRESH_TOKEN_ERROR", + "VALIDATE_ACCESS_TOKEN", + "VALIDATE_ACCESS_TOKEN_ERROR", + "INTROSPECT_TOKEN", + "INTROSPECT_TOKEN_ERROR", + "FEDERATED_IDENTITY_LINK", + "FEDERATED_IDENTITY_LINK_ERROR", + "REMOVE_FEDERATED_IDENTITY", + "REMOVE_FEDERATED_IDENTITY_ERROR", + "UPDATE_EMAIL", + "UPDATE_EMAIL_ERROR", + "UPDATE_PROFILE", + "UPDATE_PROFILE_ERROR", + "UPDATE_PASSWORD", + "UPDATE_PASSWORD_ERROR", + "UPDATE_TOTP", + "UPDATE_TOTP_ERROR", + "VERIFY_EMAIL", + "VERIFY_EMAIL_ERROR", + "VERIFY_PROFILE", + "VERIFY_PROFILE_ERROR", + "REMOVE_TOTP", + "REMOVE_TOTP_ERROR", + "GRANT_CONSENT", + "GRANT_CONSENT_ERROR", + "UPDATE_CONSENT", + "UPDATE_CONSENT_ERROR", + "REVOKE_GRANT", + "REVOKE_GRANT_ERROR", + "SEND_VERIFY_EMAIL", + "SEND_VERIFY_EMAIL_ERROR", + "SEND_RESET_PASSWORD", + "SEND_RESET_PASSWORD_ERROR", + "SEND_IDENTITY_PROVIDER_LINK", + "SEND_IDENTITY_PROVIDER_LINK_ERROR", + "RESET_PASSWORD", + "RESET_PASSWORD_ERROR", + "RESTART_AUTHENTICATION", + "RESTART_AUTHENTICATION_ERROR", + "INVALID_SIGNATURE", + "INVALID_SIGNATURE_ERROR", + "REGISTER_NODE", + "REGISTER_NODE_ERROR", + "UNREGISTER_NODE", + "UNREGISTER_NODE_ERROR", + "USER_INFO_REQUEST", + "USER_INFO_REQUEST_ERROR", + "IDENTITY_PROVIDER_LINK_ACCOUNT", + "IDENTITY_PROVIDER_LINK_ACCOUNT_ERROR", + "IDENTITY_PROVIDER_LOGIN", + "IDENTITY_PROVIDER_LOGIN_ERROR", + "IDENTITY_PROVIDER_FIRST_LOGIN", + "IDENTITY_PROVIDER_FIRST_LOGIN_ERROR", + "IDENTITY_PROVIDER_POST_LOGIN", + "IDENTITY_PROVIDER_POST_LOGIN_ERROR", + "IDENTITY_PROVIDER_RESPONSE", + "IDENTITY_PROVIDER_RESPONSE_ERROR", + "IDENTITY_PROVIDER_RETRIEVE_TOKEN", + "IDENTITY_PROVIDER_RETRIEVE_TOKEN_ERROR", + "IMPERSONATE", + "IMPERSONATE_ERROR", + "CUSTOM_REQUIRED_ACTION", + "CUSTOM_REQUIRED_ACTION_ERROR", + "EXECUTE_ACTIONS", + "EXECUTE_ACTIONS_ERROR", + "EXECUTE_ACTION_TOKEN", + "EXECUTE_ACTION_TOKEN_ERROR", + "CLIENT_INFO", + "CLIENT_INFO_ERROR", + "CLIENT_REGISTER", + "CLIENT_REGISTER_ERROR", + "CLIENT_UPDATE", + "CLIENT_UPDATE_ERROR", + "CLIENT_DELETE", + "CLIENT_DELETE_ERROR", + "CLIENT_INITIATED_ACCOUNT_LINKING", + "CLIENT_INITIATED_ACCOUNT_LINKING_ERROR", + "TOKEN_EXCHANGE", + "TOKEN_EXCHANGE_ERROR", + "OAUTH2_DEVICE_AUTH", + "OAUTH2_DEVICE_AUTH_ERROR", + "OAUTH2_DEVICE_VERIFY_USER_CODE", + "OAUTH2_DEVICE_VERIFY_USER_CODE_ERROR", + "OAUTH2_DEVICE_CODE_TO_TOKEN", + "OAUTH2_DEVICE_CODE_TO_TOKEN_ERROR", + "AUTHREQID_TO_TOKEN", + "AUTHREQID_TO_TOKEN_ERROR", + "PERMISSION_TOKEN", + "PERMISSION_TOKEN_ERROR", + "DELETE_ACCOUNT", + "DELETE_ACCOUNT_ERROR", + "PUSHED_AUTHORIZATION_REQUEST", + "PUSHED_AUTHORIZATION_REQUEST_ERROR", + "USER_DISABLED_BY_PERMANENT_LOCKOUT", + "USER_DISABLED_BY_PERMANENT_LOCKOUT_ERROR", + "USER_DISABLED_BY_TEMPORARY_LOCKOUT", + "USER_DISABLED_BY_TEMPORARY_LOCKOUT_ERROR", + "OAUTH2_EXTENSION_GRANT", + "OAUTH2_EXTENSION_GRANT_ERROR", + "FEDERATED_IDENTITY_OVERRIDE_LINK", + "FEDERATED_IDENTITY_OVERRIDE_LINK_ERROR", + "INVITE_ORG", + "INVITE_ORG_ERROR" + ] + } + ], + "default": null + }, + { + "name": "userId", + "type": [ + "null", + "string" + ], + "default": null + } + ] +} diff --git a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java index 66ea508..d820690 100644 --- a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java +++ b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java @@ -29,6 +29,7 @@ void setUp() throws Exception { @Test void shouldProduceEventWhenTypeIsDefined() throws Exception { Event event = new Event(); + event.setId("1"); event.setType(EventType.REGISTER); MockProducer producer = getProducerUsingReflection(); @@ -40,6 +41,7 @@ void shouldProduceEventWhenTypeIsDefined() throws Exception { @Test void shouldDoNothingWhenTypeIsNotDefined() throws Exception { Event event = new Event(); + event.setId("1"); event.setType(EventType.CLIENT_DELETE); MockProducer producer = getProducerUsingReflection(); @@ -51,6 +53,7 @@ void shouldDoNothingWhenTypeIsNotDefined() throws Exception { @Test void shouldProduceEventWhenTopicAdminEventsIsNotNull() throws Exception { AdminEvent event = new AdminEvent(); + event.setId("1"); MockProducer producer = getProducerUsingReflection(); listener.onEvent(event, false); @@ -62,6 +65,7 @@ void shouldProduceEventWhenTopicAdminEventsIsNotNull() throws Exception { void shouldDoNothingWhenTopicAdminEventsIsNull() throws Exception { listener = new KafkaEventListenerProvider("", "", "", new String[] { "REGISTER" }, null, Map.of(), factory); AdminEvent event = new AdminEvent(); + event.setId("1"); MockProducer producer = getProducerUsingReflection(); listener.onEvent(event, false); diff --git a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java index 64f7ea3..c36d5a0 100644 --- a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java +++ b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java @@ -3,6 +3,7 @@ import java.util.Map; import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.StringSerializer; @@ -10,8 +11,8 @@ class KafkaMockProducerFactory implements KafkaProducerFactory { @Override - public Producer createProducer(String clientId, String bootstrapServer, - Map optionalProperties) { + public Producer createProducer(String clientId, String bootstrapServer, + Map optionalProperties) { return new MockProducer<>(true, new StringSerializer(), new JsonSerializer()); } diff --git a/src/test/java/com/github/snuk87/keycloak/kafka/KeycloakEventAvroSerializerTest.java b/src/test/java/com/github/snuk87/keycloak/kafka/KeycloakEventAvroSerializerTest.java new file mode 100644 index 0000000..66c932d --- /dev/null +++ b/src/test/java/com/github/snuk87/keycloak/kafka/KeycloakEventAvroSerializerTest.java @@ -0,0 +1,37 @@ +package com.github.snuk87.keycloak.kafka; + +import com.github.snuk87.keycloak.kafka.dto.KeycloakAdminEvent; +import org.apache.avro.Schema; +import org.apache.kafka.clients.admin.Admin; +import org.junit.jupiter.api.Test; +import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; +import org.keycloak.events.Event; +import org.keycloak.events.EventType; +import org.keycloak.events.admin.AdminEvent; +import org.keycloak.events.admin.OperationType; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class KeycloakEventAvroSerializerTest { + + @Test + void eventCanBeSerialized() throws Exception { + + final AdminEvent adminevent = new AdminEvent(); + adminevent.setOperationType(OperationType.CREATE); + + final Event event = new Event(); + event.setType(EventType.REGISTER); + + final Schema schema1 = AvroSchemaUtils.getSchema(adminevent, true, false, false); + final Schema schema2= AvroSchemaUtils.getSchema(event, true, true, true); + + + System.out.println(schema1); + System.out.println(schema2); + + assertNotNull(schema1); + assertNotNull(schema2); + } + +} From 8c3aa1161bfde799b131635848f18f1c73f174c8 Mon Sep 17 00:00:00 2001 From: ttimot24 Date: Fri, 20 Sep 2024 15:49:42 +0200 Subject: [PATCH 5/5] #46 - Small improvements --- .../keycloak/kafka/KafkaStandardProducerFactory.java | 4 ++-- .../snuk87/keycloak/kafka/mapper/KeycloakMapper.java | 9 ++++----- src/main/resources/avro/keycloak-admin-events-value.avsc | 2 +- src/main/resources/avro/keycloak-events-value.avsc | 2 +- .../keycloak/kafka/KeycloakEventAvroSerializerTest.java | 3 +-- 5 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java index d4018d4..22b75af 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java @@ -18,8 +18,8 @@ public Producer createProducer(String clientId, Stri Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, optionalProperties.getOrDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, optionalProperties.getOrDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName())); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); props.putAll(optionalProperties); return new KafkaProducer<>(props); diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/mapper/KeycloakMapper.java b/src/main/java/com/github/snuk87/keycloak/kafka/mapper/KeycloakMapper.java index ec95d72..b4407f5 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/mapper/KeycloakMapper.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/mapper/KeycloakMapper.java @@ -1,13 +1,12 @@ package com.github.snuk87.keycloak.kafka.mapper; -import com.github.snuk87.keycloak.kafka.dto.AuthDetails; -import com.github.snuk87.keycloak.kafka.dto.KeycloakAdminEvent; -import com.github.snuk87.keycloak.kafka.dto.KeycloakEvent; -import com.github.snuk87.keycloak.kafka.dto.OperationType; +import org.keycloak.kafka.dto.AuthDetails; +import org.keycloak.kafka.dto.KeycloakAdminEvent; +import org.keycloak.kafka.dto.KeycloakEvent; +import org.keycloak.kafka.dto.OperationType; import org.keycloak.events.Event; import org.keycloak.events.admin.AdminEvent; -import java.util.Map; import java.util.Optional; public class KeycloakMapper { diff --git a/src/main/resources/avro/keycloak-admin-events-value.avsc b/src/main/resources/avro/keycloak-admin-events-value.avsc index 82f722f..c2fb6eb 100644 --- a/src/main/resources/avro/keycloak-admin-events-value.avsc +++ b/src/main/resources/avro/keycloak-admin-events-value.avsc @@ -1,7 +1,7 @@ { "type": "record", "name": "KeycloakAdminEvent", - "namespace": "com.github.snuk87.keycloak.kafka.dto", + "namespace": "org.keycloak.kafka.dto", "fields": [ { "name": "authDetails", diff --git a/src/main/resources/avro/keycloak-events-value.avsc b/src/main/resources/avro/keycloak-events-value.avsc index 41d74f2..a28af17 100644 --- a/src/main/resources/avro/keycloak-events-value.avsc +++ b/src/main/resources/avro/keycloak-events-value.avsc @@ -1,7 +1,7 @@ { "type": "record", "name": "KeycloakEvent", - "namespace": "com.github.snuk87.keycloak.kafka.dto", + "namespace": "org.keycloak.kafka.dto", "fields": [ { "name": "clientId", diff --git a/src/test/java/com/github/snuk87/keycloak/kafka/KeycloakEventAvroSerializerTest.java b/src/test/java/com/github/snuk87/keycloak/kafka/KeycloakEventAvroSerializerTest.java index 66c932d..f800340 100644 --- a/src/test/java/com/github/snuk87/keycloak/kafka/KeycloakEventAvroSerializerTest.java +++ b/src/test/java/com/github/snuk87/keycloak/kafka/KeycloakEventAvroSerializerTest.java @@ -1,8 +1,7 @@ package com.github.snuk87.keycloak.kafka; -import com.github.snuk87.keycloak.kafka.dto.KeycloakAdminEvent; +import org.keycloak.kafka.dto.KeycloakAdminEvent; import org.apache.avro.Schema; -import org.apache.kafka.clients.admin.Admin; import org.junit.jupiter.api.Test; import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; import org.keycloak.events.Event;