Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#46 - Implement KafkaSerializer for JSON serialization instead using … #49

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# https://www.keycloak.org/server/containers
FROM quay.io/keycloak/keycloak:19.0.3
RUN curl -sL https://github.com/SnuK87/keycloak-kafka/releases/download/1.1.1/keycloak-kafka-1.1.1-jar-with-dependencies.jar -o /opt/keycloak/providers/keycloak-kafka-1.1.1-jar-with-dependencies.jar
FROM quay.io/keycloak/keycloak:latest
COPY target/keycloak-kafka-1.2.0-jar-with-dependencies.jar /opt/keycloak/providers/keycloak-kafka-1.2.0-jar-with-dependencies.jar
RUN /opt/keycloak/bin/kc.sh build

ENTRYPOINT ["/opt/keycloak/bin/kc.sh"]
54 changes: 50 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ version: '3'
services:
keycloak:
depends_on:
- "kafka"
- kafka
image: keycloak-kafka
build: .
ports:
- "8080:8080"
environment:
KEYCLOAK_ADMIN: admin
KEYCLOAK_ADMIN_PASSWORD: admin
KAFKA_TOPIC: keycloak-events
KAFKA_ADMIN_TOPIC: keycloak-admin-events
KAFKA_CLIENT_ID: keycloak
KAFKA_BOOTSTRAP_SERVERS: kafka:9094
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
KAFKA_SCHEMA_REGISTRY_URL: http://schema-registry:8081
KAFKA_VALUE_SERIALIZER_CLASS: io.confluent.kafka.serializers.KafkaAvroSerializer
AUTO_REGISTER_SCHEMAS: 'true'
command: start-dev

zookeeper:
Expand All @@ -24,7 +29,7 @@ services:

kafka:
depends_on:
- "zookeeper"
- zookeeper
image: bitnami/kafka:latest
ports:
- "9092:9092"
Expand All @@ -38,4 +43,45 @@ services:
KAFKA_CREATE_TOPICS: "keycloak-events:1:1"
ALLOW_PLAINTEXT_LISTENER: "yes"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /var/run/docker.sock:/var/run/docker.sock

schema-registry:
image: confluentinc/cp-schema-registry:7.3.3
restart: always
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
deploy:
resources:
limits:
memory: 512MB

redpanda-console:
image: docker.redpanda.com/vectorized/console:latest
restart: always
depends_on:
- kafka
ports:
- "9000:8080"
environment:
SERVER_BASEPATH: /redpanda
#METRICSNAMESPACE: redpanda-console
KAFKA_BROKERS: ${KAFKA_BROKERS:-kafka:9092}
KAFKA_SCHEMAREGISTRY_ENABLED: "true"
KAFKA_SCHEMAREGISTRY_URLS: "http://schema-registry:8081"
CONNECT_ENABLED: "false"
CONNECT_CLUSTERS_NAME: connect-cluster
CONNECT_CLUSTERS_URL: "http://connect:8083"
deploy:
resources:
limits:
memory: 1G

networks:
default:
name: keycloak-kafka-network
37 changes: 37 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
<jboss-logging.version>3.6.1.Final</jboss-logging.version>
</properties>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.keycloak</groupId>
Expand Down Expand Up @@ -62,10 +69,40 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- other maven plugins in the project -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.12.0</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>src/main/resources/avro</sourceDirectory>
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.keycloak.Config.Scope;

public class KafkaProducerConfig {

// https://kafka.apache.org/documentation/#producerconfigs

public static Map<String, Object> init(Scope scope) {
Map<String, Object> propertyMap = new HashMap<>();
KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values();
final Map<String, Object> propertyMap = new HashMap<>();
final KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values();

for (KafkaProducerProperty property : producerProperties) {
String propertyEnv = System.getenv("KAFKA_" + property.name());
Expand All @@ -27,6 +29,13 @@ public static Map<String, Object> init(Scope scope) {
}

enum KafkaProducerProperty {
KEY_SERIALIZER_CLASS(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG),
VALUE_SERIALIZER_CLASS(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG),
AUTO_REGISTER_SCHEMAS("auto.register.schemas"),
USE_LATEST_VERSION("use.latest.version"),
SCHEMA_REGISTRY_URL("schema.registry.url"),
SCHEMA_REGISTRY_USER("schema.registry.user"),
SCHEMA_REGISTRY_PASSWORD("schema.registry.password"),
ACKS("acks"), //
BUFFER_MEMORY("buffer.memory"), //
COMPRESSION_TYPE("compression.type"), //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import java.util.Map;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.Producer;

public interface KafkaProducerFactory {

Producer<String, String> createProducer(String clientId, String bootstrapServer,
Map<String, Object> optionalProperties);
Producer<String, SpecificRecordBase> createProducer(String clientId, String bootstrapServer,
Map<String, Object> optionalProperties);

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.util.Map;
import java.util.Properties;

import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -11,13 +13,13 @@
public final class KafkaStandardProducerFactory implements KafkaProducerFactory {

@Override
public Producer<String, String> createProducer(String clientId, String bootstrapServer,
Map<String, Object> optionalProperties) {
public Producer<String, SpecificRecordBase> createProducer(String clientId, String bootstrapServer,
Map<String, Object> optionalProperties) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
props.putAll(optionalProperties);

return new KafkaProducer<>(props);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.github.snuk87.keycloak.kafka.mapper;

import org.keycloak.kafka.dto.AuthDetails;
import org.keycloak.kafka.dto.KeycloakAdminEvent;
import org.keycloak.kafka.dto.KeycloakEvent;
import org.keycloak.kafka.dto.OperationType;
import org.keycloak.events.Event;
import org.keycloak.events.admin.AdminEvent;

import java.util.Optional;

/**
 * Maps Keycloak SPI event objects ({@link Event}, {@link AdminEvent}) to the
 * Avro-generated DTOs ({@link KeycloakEvent}, {@link KeycloakAdminEvent}) that
 * are published to Kafka.
 *
 * <p>Stateless utility class; all members are static.
 */
public final class KeycloakMapper {

    /** Utility class — not meant to be instantiated. */
    private KeycloakMapper() {
    }

    /**
     * Maps an admin event to its Avro DTO.
     *
     * @param event the Keycloak admin event; must not be {@code null}
     * @return the populated {@link KeycloakAdminEvent}. A missing operation type
     *         defaults to {@link OperationType#ACTION}; missing auth details map
     *         to a {@code null} {@code authDetails} field.
     */
    public static KeycloakAdminEvent mapEventToKeycloakEvent(final AdminEvent event) {
        // The SPI and Avro OperationType enums share constant names, so mapping
        // goes by name rather than by a string round-trip through Optional.
        final OperationType operationType = event.getOperationType() == null
                ? OperationType.ACTION
                : OperationType.valueOf(event.getOperationType().toString());

        return KeycloakAdminEvent.newBuilder()
                .setId(event.getId())
                .setTime(event.getTime())
                .setRealmId(event.getRealmId())
                .setRealmName(event.getRealmName())
                .setError(event.getError())
                .setResourceType(event.getResourceTypeAsString())
                .setResourcePath(event.getResourcePath())
                .setRepresentation(event.getRepresentation())
                .setOperationType(operationType)
                .setAuthDetails(
                        Optional.ofNullable(event.getAuthDetails())
                                .map(authDetails -> AuthDetails.newBuilder()
                                        .setClientId(authDetails.getClientId())
                                        .setIpAddress(authDetails.getIpAddress())
                                        .setRealmId(authDetails.getRealmId())
                                        .setRealmName(authDetails.getRealmName())
                                        .setUserId(authDetails.getUserId())
                                        .build())
                                .orElse(null))
                .build();
    }

    /**
     * Maps a user event to its Avro DTO.
     *
     * @param event the Keycloak user event; must not be {@code null}
     * @return the populated {@link KeycloakEvent}
     */
    public static KeycloakEvent mapEventToKeycloakEvent(final Event event) {
        return KeycloakEvent.newBuilder()
                .setId(event.getId())
                .setTime(event.getTime())
                .setRealmId(event.getRealmId())
                .setRealmName(event.getRealmName())
                .setError(event.getError())
                .setClientId(event.getClientId())
                .setUserId(event.getUserId())
                .setSessionId(event.getSessionId())
                .setIpAddress(event.getIpAddress())
                .setDetails(event.getDetails())
                .build();
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.github.snuk87.keycloak.kafka.serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.snuk87.keycloak.kafka.serializer.mixin.JacksonIgnoreAvroPropertiesMixIn;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class JsonSerializer implements Serializer<SpecificRecordBase> {
private final ObjectMapper objectMapper;

public JsonSerializer() {
this.objectMapper = new ObjectMapper();
this.objectMapper.addMixIn(SpecificRecord.class, JacksonIgnoreAvroPropertiesMixIn.class);
}

@Override
public void configure(Map<String, ?> config, boolean isKey) {

}

@Override
public byte[] serialize(String topic, SpecificRecordBase data) {
if (data == null) {
return null;
}
try {
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException("Error serializing JSON message", e);
}
}

@Override
public void close() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.github.snuk87.keycloak.kafka.serializer.mixin;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificData;

/**
 * Jackson mix-in used to hide the synthetic accessors that Avro generates on
 * every specific record ({@code getSchema()} and {@code getSpecificData()}),
 * so serialized JSON contains only the record's actual data fields.
 *
 * <p>Registered on {@code SpecificRecord} via {@code ObjectMapper.addMixIn}.
 */
public abstract class JacksonIgnoreAvroPropertiesMixIn {

// Excludes the embedded Avro schema from the JSON output.
@JsonIgnore
public abstract Schema getSchema();

// Excludes Avro runtime metadata from the JSON output.
@JsonIgnore
public abstract SpecificData getSpecificData();

}
Loading
Loading