Skip to content

Commit

Permalink
fixup! fix: fail when trying to disable remote storage on an enabled …
Browse files Browse the repository at this point in the history
…topic
  • Loading branch information
jeqo committed Nov 23, 2023
1 parent 4b0e057 commit df03ad9
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions core/src/main/scala/kafka/zk/AdminZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, KafkaConfig}
import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.admin.{AdminUtils, BrokerMetadata}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
Expand Down Expand Up @@ -477,6 +478,14 @@ class AdminZkClient(zkClient: KafkaZkClient,
Topic.validate(topic)
if (!zkClient.topicExists(topic))
throw new UnknownTopicOrPartitionException(s"Topic '$topic' does not exist.")

// fix: workaround to fail when trying to disable tiered storage instead of silently update value while logging warning
val currentRemoteStorageEnable = zkClient.getEntityConfigs(ConfigType.Topic, topic).getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
val newRemoteStorageEnable = configs.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, currentRemoteStorageEnable)
if (java.lang.Boolean.parseBoolean(currentRemoteStorageEnable) && !java.lang.Boolean.parseBoolean(newRemoteStorageEnable)) {
throw new InvalidConfigurationException(s"Disabling remote log on the topic is not supported.")
}

// remove the topic overrides
LogConfig.validate(configs,
kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
Expand Down

0 comments on commit df03ad9

Please sign in to comment.