diff --git a/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java b/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java index e51c0db55..d154a0bd2 100644 --- a/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java +++ b/langstream-kafka/src/main/java/ai/langstream/kafka/runtime/KafkaStreamingClusterRuntime.java @@ -68,6 +68,10 @@ private CachedSchemaRegistryClient buildSchemaRegistryClient( adminConfig = new HashMap<>(); } log.info("SchemaRegistry client configuration: {}", adminConfig); + if (!adminConfig.containsKey("schema.registry.url")) { + throw new IllegalArgumentException( + "Missing 'schema.registry.url' property in streaming cluster configuration admin section"); + } return new CachedSchemaRegistryClient( adminConfig.get("schema.registry.url").toString(), 1000); } @@ -140,7 +144,7 @@ private void enforceSchemaOnTopic( return; } // there is no close() method in this client - CachedSchemaRegistryClient client = buildSchemaRegistryClient(streamingCluster); + CachedSchemaRegistryClient client = null; // here we are using TopicNameStrategy // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#sr-schemas-subject-name-strategy @@ -148,6 +152,7 @@ private void enforceSchemaOnTopic( if (keySchema != null && keySchema.type().equals("avro") && keySchema.schema() != null) { ParsedSchema parsedSchema = new AvroSchema(keySchema.schema()); String subjectName = topicNameStrategy.subjectName(newTopic.name(), true, parsedSchema); + client = buildSchemaRegistryClient(streamingCluster); client.register(subjectName, parsedSchema); } @@ -157,6 +162,9 @@ private void enforceSchemaOnTopic( ParsedSchema parsedSchema = new AvroSchema(valueSchema.schema()); String subjectName = topicNameStrategy.subjectName(newTopic.name(), false, parsedSchema); + if (client == null) { + client = buildSchemaRegistryClient(streamingCluster); + } client.register(subjectName, parsedSchema); } }