Skip to content

Commit

Permalink
Merge pull request #104 from RADAR-base/fix-schemaretriever
Browse files Browse the repository at this point in the history
Fix schemaretriever
  • Loading branch information
Bdegraaf1234 authored Jun 11, 2024
2 parents ab07fe9 + 0e3d0ba commit 7a3de4e
Show file tree
Hide file tree
Showing 16 changed files with 59 additions and 34 deletions.
3 changes: 2 additions & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ indent_size = 4
[*.{kt,kts}]
ij_kotlin_allow_trailing_comma_on_call_site = true
ij_kotlin_allow_trailing_comma = true
ktlint_standard_no-wildcard-imports = disabled
# To satisfy ktlint rules, see: https://stackoverflow.com/questions/59849619/intellij-does-not-sort-kotlin-imports-according-to-ktlints-expectations
ij_kotlin_imports_layout = *,java.**,javax.**,kotlin.**,^
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1 +1 @@
KAFKA_CONFLUENT_VERSION=7.5.0
KAFKA_CONFLUENT_VERSION=7.6.0
8 changes: 3 additions & 5 deletions buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
@Suppress("ConstPropertyName")
object Versions {
const val project = "0.7.1-SNAPSHOT"
const val project = "0.7.2-SNAPSHOT"

const val java = 17
const val kotlin = "1.9.22"
const val dockerCompose = "0.17.6"

const val ktor = "2.3.10"
const val radarJersey = "0.11.1"
const val radarCommons = "1.1.2"
const val radarSchemas = "0.8.7"
const val radarCommons = "1.1.3-SNAPSHOT"
const val radarSchemas = "0.8.8"
const val jackson = "2.15.3"
const val slf4j = "2.0.13"
const val log4j2 = "2.23.1"
Expand All @@ -22,6 +22,4 @@ object Versions {
const val mockitoKotlin = "5.3.1"
const val grizzly = "4.0.2"
const val hamcrest = "2.2"

const val wrapper = "8.4"
}
2 changes: 1 addition & 1 deletion radar-gateway/src/integrationTest/docker/.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
KAFKA_CONFLUENT_VERSION=7.5.0
KAFKA_CONFLUENT_VERSION=7.6.0
RADAR_GATEWAY_TAG=SNAPSHOT
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.radarbase.gateway.resource

import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import io.ktor.client.request.get
import io.ktor.client.request.head
import io.ktor.http.HttpStatusCode
import io.ktor.http.Url
import kotlinx.coroutines.runBlocking
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.equalTo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class KafkaTopicsTest {
}

val retriever = schemaRetriever(SCHEMA_REGISTRY_URL) {
httpClient {
httpClient = HttpClient(CIO) {
timeout(10.seconds)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import org.radarbase.gateway.io.AvroProcessor
import org.radarbase.gateway.io.AvroProcessorFactory
import org.radarbase.gateway.io.BinaryToAvroConverter
import org.radarbase.gateway.io.LzfseEncoder
import org.radarbase.gateway.kafka.*
import org.radarbase.gateway.kafka.KafkaAdminService
import org.radarbase.gateway.kafka.KafkaAdminServiceFactory
import org.radarbase.gateway.kafka.KafkaHealthMetric
import org.radarbase.gateway.kafka.ProducerPool
import org.radarbase.gateway.kafka.ProducerPoolFactory
import org.radarbase.gateway.service.SchedulingService
import org.radarbase.gateway.service.SchedulingServiceFactory
import org.radarbase.jersey.enhancer.JerseyResourceEnhancer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package org.radarbase.gateway.inject
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig.USER_INFO_CONFIG
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_CONFIG
import io.ktor.client.plugins.*
import io.ktor.client.plugins.auth.*
import io.ktor.client.plugins.auth.providers.*
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import io.ktor.client.plugins.auth.Auth
import io.ktor.client.plugins.auth.providers.BasicAuthCredentials
import io.ktor.client.plugins.auth.providers.basic
import jakarta.ws.rs.core.Context
import org.radarbase.config.ServerConfig
import org.radarbase.gateway.config.GatewayConfig
Expand All @@ -32,20 +34,18 @@ class SchemaRetrieverFactory(
?: config.kafka.serialization[USER_INFO_CONFIG].asNonEmptyString()

return schemaRetriever(baseUrl = server.urlString) {
httpClient {
httpClient = HttpClient(CIO) {
timeout(30.seconds)
if (basicCredentials != null && basicCredentials.contains(':')) {
val (apiKey, apiSecret) = basicCredentials.split(':', limit = 2)
install(Auth) {
basic {
sendWithoutRequest { true }
credentials {
val (username, password) = basicCredentials.split(':', limit = 2)
BasicAuthCredentials(
username = username,
password = password,
)
BasicAuthCredentials(username = apiKey, password = apiSecret)
}
}
}
timeout(30.seconds)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class AvroProcessor(
"Schema ID not found in subject",
)
} else {
throw HttpBadGatewayException("cannot get data from schema registry: ${ex.javaClass.simpleName}")
throw HttpBadGatewayException("cannot get data from schema registry: $ex")
}
}
createMapping(topic, ofValue, parsedSchema.schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.isActive
import org.apache.avro.Schema
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package org.radarbase.gateway.io

import jakarta.ws.rs.core.Context
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.withContext
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package org.radarbase.gateway.kafka

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroSerializer
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.*
import org.apache.kafka.common.errors.AuthenticationException
import org.apache.kafka.common.errors.AuthorizationException
import org.apache.kafka.common.errors.OutOfOrderSequenceException
import org.apache.kafka.common.errors.ProducerFencedException
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.errors.TimeoutException
import org.radarbase.gateway.config.GatewayConfig
import org.radarbase.jersey.exception.HttpApplicationException
import org.radarbase.jersey.exception.HttpBadGatewayException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ package org.radarbase.gateway.resource
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.JsonNode
import jakarta.inject.Singleton
import jakarta.ws.rs.*
import jakarta.ws.rs.Consumes
import jakarta.ws.rs.GET
import jakarta.ws.rs.OPTIONS
import jakarta.ws.rs.POST
import jakarta.ws.rs.Path
import jakarta.ws.rs.PathParam
import jakarta.ws.rs.Produces
import jakarta.ws.rs.container.AsyncResponse
import jakarta.ws.rs.container.Suspended
import jakarta.ws.rs.core.Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.avro.Schema
import org.apache.avro.generic.GenericArray
import org.apache.avro.generic.GenericRecord
import org.hamcrest.CoreMatchers.*
import org.hamcrest.CoreMatchers.`is`
import org.hamcrest.CoreMatchers.not
import org.hamcrest.CoreMatchers.nullValue
import org.hamcrest.MatcherAssert.assertThat
import org.junit.jupiter.api.Test
import org.radarbase.producer.avro.AvroDataMapperFactory.IDENTITY_MAPPER
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.radarbase.gateway.io

import io.ktor.http.content.*
import io.ktor.utils.io.*
import io.ktor.utils.io.jvm.javaio.*
import io.ktor.http.content.OutgoingContent
import io.ktor.utils.io.ByteChannel
import io.ktor.utils.io.jvm.javaio.toInputStream
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import org.apache.avro.generic.GenericRecordBuilder
Expand Down

0 comments on commit 7a3de4e

Please sign in to comment.