Skip to content

Commit

Permalink
Tested schema with mutlitple files and references.
Browse files Browse the repository at this point in the history
  • Loading branch information
smyrgeorge committed Oct 2, 2023
1 parent b3896b1 commit 06e8cbe
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 34 deletions.
3 changes: 3 additions & 0 deletions kafka-connect-json-to-proto/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ dependencies {
implementation("com.google.protobuf:protobuf-java:$protocVersion")
implementation("com.google.protobuf:protobuf-java-util:$protocVersion")

// Logging
compileOnly("org.slf4j:slf4j-api:2.0.9")

// https://github.com/mockito/mockito-kotlin
testImplementation("org.mockito.kotlin:mockito-kotlin:$mockitoVersion")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@ import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaAndValue
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.storage.Converter
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.math.BigDecimal
import java.math.BigInteger

class JsonNodeConverter : Converter {

private val log: Logger = LoggerFactory.getLogger(this::class.java)
private val om = ObjectMapperFactory.createCamelCase()

// set<pair<path, propertyName>>
private lateinit var skipProperties: Set<Pair<String, String>>

override fun configure(configs: Map<String, *>, isKey: Boolean) {
println("[JsonNodeConverter] :: Hola! $configs")
log.info("Hola from JsonNodeConverter! :: $configs")
skipProperties = configs[Config.SKIP_PROPERTIES]?.let { c ->
(c as String)
.split(',')
Expand All @@ -37,7 +40,7 @@ class JsonNodeConverter : Converter {
return om.writeValueAsBytes(jsonNode)
}

@Suppress("UNUSED_PARAMETER")
@Suppress("UNUSED_PARAMETER", "MemberVisibilityCanBePrivate")
fun fromConnectDataToJsonNode(topic: String, schema: Schema, value: Any): JsonNode {
value as Struct
return value.toJsonNode()
Expand All @@ -49,7 +52,7 @@ class JsonNodeConverter : Converter {
}

override fun toConnectData(topic: String, value: ByteArray): SchemaAndValue =
error("[JsonNodeConverter] toConnectData method not supported")
error("JsonNodeConverter :: toConnectData method not supported")

private fun Struct.toJsonNode(): JsonNode {
// Create an empty [ObjectNode].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import io.smyrgeorge.connect.util.KafkaProtobufSerializer
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaAndValue
import org.apache.kafka.connect.storage.Converter
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant
import kotlin.time.Duration.Companion.minutes
Expand All @@ -24,22 +26,26 @@ typealias Cache = MutableMap<String, Triple<Descriptors.Descriptor, ProtobufSche

class ProtobufConverter : Converter {

private val log: Logger = LoggerFactory.getLogger(this::class.java)

private var isKey: Boolean = false
private var useLatestVersion: Boolean = false
private lateinit var schemaRegistryUrl: String
private var schemaRegistryCacheCapacity: Int = 1000
private var schemaRegistryCacheExpiryMinutes: Duration = 10.minutes.toJavaDuration()

private val subjectNameStrategy = TopicNameStrategy()
lateinit var schemaRegistryClient: SchemaRegistryClient
private val jsonNodeConverter: JsonNodeConverter = JsonNodeConverter()
private val jsonNodeConverter = JsonNodeConverter()

// <subject, triple<message protobuf descriptor, schema, expiry>>
private val cache: Cache = BoundedConcurrentHashMap()
private lateinit var serializer: KafkaProtobufSerializer<DynamicMessage>

override fun configure(configs: Map<String, *>, isKey: Boolean) {
println("[ProtobufConverter] :: Hola! $configs")
log.info("Hola! from ProtobufConverter! :: $configs")

// Configure [JsonNodeConverter].
jsonNodeConverter.configure(configs.toJsonNodeConverterProps(), isKey)

this.isKey = isKey

Expand All @@ -55,15 +61,10 @@ class ProtobufConverter : Converter {
}

configs[Config.SCHEMA_REGISTRY_CACHE_EXPIRY_MINUTES]?.let {
schemaRegistryCacheExpiryMinutes =
if (it is String) it.toInt().minutes.toJavaDuration() else (it as Int).minutes.toJavaDuration()
val value = if (it is String) it.toInt().minutes else (it as Int).minutes
schemaRegistryCacheExpiryMinutes = value.toJavaDuration()
}

val jsonProps = configs[Config.SKIP_PROPERTIES]?.let {
mapOf(JsonNodeConverter.Config.SKIP_PROPERTIES to it as String)
} ?: emptyMap()
jsonNodeConverter.configure(jsonProps, isKey)

schemaRegistryClient = CachedSchemaRegistryClient(
/* baseUrls = */ schemaRegistryUrl,
/* identityMapCapacity = */ schemaRegistryCacheCapacity,
Expand Down Expand Up @@ -120,7 +121,17 @@ class ProtobufConverter : Converter {
}

override fun toConnectData(topic: String, value: ByteArray): SchemaAndValue =
error("[ProtobufConverter] :: toConnectData method not supported")
error("ProtobufConverter :: toConnectData method not supported")

private fun Map<String, *>.toJsonNodeConverterProps(): Map<String, *> {
val configs: MutableMap<String, Any> = mutableMapOf()

this[Config.SKIP_PROPERTIES]?.let {
configs[JsonNodeConverter.Config.SKIP_PROPERTIES] = it as String
}

return configs
}

object Config {
const val SCHEMA_REGISTRY_URL: String = "protobuf.schema.registry.url"
Expand Down
4 changes: 2 additions & 2 deletions register-postgres-proto.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
"database.dbname": "postgres",
"topic.prefix": "dbserver1",
"table.include.list": "inventory.customers,inventory.products",
"key.converter": "io.smyrgeorge.connect.converter.JsonNodeConverter",
"key.converter": "io.smyrgeorge.connect.converter.ProtobufConverter",
"key.converter.protobuf.schema.registry.url": "http://schema-registry:8085",
"value.converter": "io.smyrgeorge.connect.converter.ProtobufConverter",
"value.converter.protobuf.schema.registry.url": "http://schema-registry:8085",
"value.converter.protobuf.schema.cache.capacity": 100,
"value.converter.protobuf.json.exclude.properties": "transaction,source.tsMs,source.snapshot,source.sequence,source.txid,source.lsn,source.xmin"
}
}
32 changes: 32 additions & 0 deletions spring-boot-kafka-protobuf/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ import com.google.protobuf.gradle.id
import org.gradle.api.tasks.testing.logging.TestLogEvent
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

buildscript {
repositories {
gradlePluginPortal()
maven("https://packages.confluent.io/maven/")
maven("https://jitpack.io")
}
}

plugins {
kotlin("jvm")
kotlin("plugin.spring")
Expand All @@ -10,6 +18,8 @@ plugins {
id("io.spring.dependency-management")
// https://github.com/google/protobuf-gradle-plugin
id("com.google.protobuf") version "0.9.4"
// https://github.com/ImFlog/schema-registry-plugin
id("com.github.imflog.kafka-schema-registry-gradle-plugin") version "1.11.1"
}

group = rootProject.group
Expand Down Expand Up @@ -109,4 +119,26 @@ protobuf {
task.plugins { id("kotlin") }
}
}
}

val protoBasePackage = "io.smyrgeorge.test.proto.domain"
val protoSourcePath = "spring-boot-kafka-protobuf/src/main/proto"

schemaRegistry {
url = "http://localhost:58085/"
register {
subject("$protoBasePackage.source", "$protoSourcePath/source.proto", "PROTOBUF")

subject("$protoBasePackage.customer", "$protoSourcePath/customer.proto", "PROTOBUF")
subject("dbserver1.inventory.customers-key", "$protoSourcePath/customer-key.proto", "PROTOBUF")
subject("dbserver1.inventory.customers-value", "$protoSourcePath/customer-change-event.proto", "PROTOBUF")
.addReference("source.proto", "$protoBasePackage.source")
.addReference("customer.proto", "$protoBasePackage.customer")

subject("$protoBasePackage.product", "$protoSourcePath/product.proto", "PROTOBUF")
subject("dbserver1.inventory.products-key", "$protoSourcePath/product-key.proto", "PROTOBUF")
subject("dbserver1.inventory.products-value", "$protoSourcePath/product-change-event.proto", "PROTOBUF")
.addReference("source.proto", "$protoBasePackage.source")
.addReference("product.proto", "$protoBasePackage.product")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig
import io.smyrgeorge.test.proto.domain.CustomerOuterClass
import io.smyrgeorge.test.proto.domain.CustomerChangeEventOuterClass.CustomerChangeEvent
import jakarta.annotation.PostConstruct
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
Expand Down Expand Up @@ -68,12 +68,12 @@ class CustomerConsumer {
)

private val protobufDeserializer =
KafkaProtobufDeserializer<CustomerOuterClass.CustomerChangeEvent>(schemaRegistryClient).apply {
KafkaProtobufDeserializer<CustomerChangeEvent>(schemaRegistryClient).apply {
val conf = mapOf<String, Any>(
KafkaProtobufDeserializerConfig.USE_LATEST_VERSION to true,
KafkaProtobufDeserializerConfig.AUTO_REGISTER_SCHEMAS to false,
KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG to schemaRegistryUrl,
KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE to CustomerOuterClass.CustomerChangeEvent::class.java
KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE to CustomerChangeEvent::class.java
)
configure(conf, false)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";
package io.smyrgeorge.test.proto.domain;

import "customer.proto";
import "source.proto";

message CustomerChangeEvent {
optional Customer before = 1;
optional Customer after = 2;
Source source = 3;
string op = 4;
int64 tsMs = 5;
}
6 changes: 6 additions & 0 deletions spring-boot-kafka-protobuf/src/main/proto/customer-key.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
syntax = "proto3";
package io.smyrgeorge.test.proto.domain;

message CustomerKey {
int32 id = 1;
}
17 changes: 2 additions & 15 deletions spring-boot-kafka-protobuf/src/main/proto/customer.proto
Original file line number Diff line number Diff line change
@@ -1,21 +1,8 @@
syntax = "proto3";
package io.smyrgeorge.test.proto.domain;

message CustomerChangeEvent {
optional Customer before = 1;
optional Customer after = 2;
Source source = 3;
string op = 4;
int64 tsMs = 5;
}
message Source {
string version = 1;
string connector = 2;
string name = 3;
string db = 4;
string schema = 5;
string table = 6;
}
import "source.proto";

message Customer {
int32 id = 1;
string firstName = 2;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";
package io.smyrgeorge.test.proto.domain;

import "product.proto";
import "source.proto";

message ProductChangeEvent {
optional Product before = 1;
optional Product after = 2;
Source source = 3;
string op = 4;
int64 tsMs = 5;
}
6 changes: 6 additions & 0 deletions spring-boot-kafka-protobuf/src/main/proto/product-key.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
syntax = "proto3";
package io.smyrgeorge.test.proto.domain;

message ProductKey {
int32 id = 1;
}
11 changes: 11 additions & 0 deletions spring-boot-kafka-protobuf/src/main/proto/product.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";
package io.smyrgeorge.test.proto.domain;

import "source.proto";

message Product {
int32 id = 1;
string name = 2;
string description = 3;
float weight = 4;
}
11 changes: 11 additions & 0 deletions spring-boot-kafka-protobuf/src/main/proto/source.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";
package io.smyrgeorge.test.proto.domain;

message Source {
string version = 1;
string connector = 2;
string name = 3;
string db = 4;
string schema = 5;
string table = 6;
}

0 comments on commit 06e8cbe

Please sign in to comment.