Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request exceeded the user-specified time limit in the request #1139

Closed
andy2046 opened this issue Jul 24, 2018 · 1 comment
Closed

Request exceeded the user-specified time limit in the request #1139

andy2046 opened this issue Jul 24, 2018 · 1 comment

Comments

@andy2046
Copy link
Contributor

Versions

Sarama Version: latest
Kafka Version: kafka_2.11-1.1.0
Go Version: 1.10

Configuration

Kafka config:

`
I background.threads = 10

I broker.id = 1

I broker.id.generation.enable = true

I broker.rack = null

I compression.type = producer

I connections.max.idle.ms = 600000

I controlled.shutdown.enable = true

I controlled.shutdown.max.retries = 3

I controlled.shutdown.retry.backoff.ms = 5000

I controller.socket.timeout.ms = 30000

I create.topic.policy.class.name = null

I default.replication.factor = 1

I delete.records.purgatory.purge.interval.requests = 1

I delete.topic.enable = true

I fetch.purgatory.purge.interval.requests = 1000

I group.initial.rebalance.delay.ms = 0

I group.max.session.timeout.ms = 300000

I group.min.session.timeout.ms = 6000

I host.name =

I inter.broker.listener.name = null

I inter.broker.protocol.version = 1.1-IV0

I leader.imbalance.check.interval.seconds = 300

I listeners = PLAINTEXT://:9092

I log.segment.bytes = 1073741824

I log.segment.delete.delay.ms = 60000

I max.connections.per.ip = 2147483647

I max.connections.per.ip.overrides =

I max.incremental.fetch.session.cache.slots = 1000

I message.max.bytes = 1000012

I min.insync.replicas = 1

I num.io.threads = 8

I num.network.threads = 3

I num.partitions = 1

I num.recovery.threads.per.data.dir = 1

I num.replica.alter.log.dirs.threads = null

I num.replica.fetchers = 1

I offset.metadata.max.bytes = 4096

I offsets.commit.required.acks = -1

I offsets.commit.timeout.ms = 5000

I offsets.load.buffer.size = 5242880

I offsets.retention.check.interval.ms = 600000

I offsets.retention.minutes = 1440

I offsets.topic.compression.codec = 0

I offsets.topic.num.partitions = 50

I offsets.topic.replication.factor = 1

I offsets.topic.segment.bytes = 104857600

I port = 9092

I replica.fetch.backoff.ms = 1000

I replica.fetch.max.bytes = 1048576

I replica.fetch.min.bytes = 1

I replica.fetch.response.max.bytes = 10485760

I replica.fetch.wait.max.ms = 500

I replica.high.watermark.checkpoint.interval.ms = 5000

I replica.lag.time.max.ms = 10000

I replica.socket.receive.buffer.bytes = 65536

I replica.socket.timeout.ms = 30000

I replication.quota.window.num = 11

I replication.quota.window.size.seconds = 1

I request.timeout.ms = 30000

`

Logs

sarama logs:

`
2018/07/24 17:38:55 Kafka brokers: 35.xxx.xxx.xxx:9092, 35.xxx.xxx.xxx:9092

sarama2018/07/24 17:38:55 Initializing new client

sarama2018/07/24 17:38:55 ClientID is the default of 'sarama', you should consider setting it to something application-specific.

sarama2018/07/24 17:38:55 ClientID is the default of 'sarama', you should consider setting it to something application-specific.

sarama2018/07/24 17:38:55 client/metadata fetching metadata for all topics from broker 35.xxx.xxx.xxx:9092

sarama2018/07/24 17:38:55 Connected to broker at 35.xxx.xxx.xxx:9092 (unregistered)

sarama2018/07/24 17:38:56 client/brokers registered new broker #1 at 35.xxx.xxx.xxx:9092

sarama2018/07/24 17:38:56 client/brokers registered new broker #0 at 35.xxx.xxx.xxx:9092

sarama2018/07/24 17:38:56 Successfully initialized new client

sarama2018/07/24 17:38:56 ClientID is the default of 'sarama', you should consider setting it to something application-specific.

sarama2018/07/24 17:38:56 Connected to broker at 35.xxx.xxx.xxx:9092 (registered as #1)
2018/07/24 17:39:08 in: newPartition
`

kafka logs:

`
I [2018-07-24 09:40:33,918] INFO Creating 1 partitions for 'topicX009' with the following replica assignment: Map(12 -> ArrayBuffer(0, 1)). (kafka.zk.AdminZkClient)

I [2018-07-24 09:40:33,918] INFO Topic update Map(topicX009-7 -> Vector(1, 0), topicX009-3 -> Vector(1, 0), topicX009-11 -> Vector(1, 0), topicX009-4 -> Vector(0, 1), topicX009-8 -> Vector(0, 1), topicX009-2 -> Vector(0, 1), topicX009-10 -> Vector(0, 1), topicX009-5 -> Vector(1, 0), topicX009-1 -> Vector(1, 0), topicX009-9 -> Vector(1, 0), topicX009-6 -> Vector(0, 1), topicX009-0 -> Vector(0, 1), topicX009-12 -> ArrayBuffer(0, 1)) (kafka.zk.AdminZkClient)

I [2018-07-24 09:40:33,997] INFO Replica loaded for partition topicX009-12 with initial high watermark 0 (kafka.cluster.Replica)

I [2018-07-24 09:40:33,999] INFO [Log partition=topicX009-12, dir=/storage/kafka/data] Loading producer state from offset 0 with message format version 2 (kafka.log.Log)

I [2018-07-24 09:40:34,000] INFO [Log partition=topicX009-12, dir=/storage/kafka/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 1 ms (kafka.log.Log)

I [2018-07-24 09:40:34,000] INFO Created log for partition topicX009-12 in /storage/kafka/data with properties {compression.type -> producer, message.format.version -> 1.1-IV0, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 1000, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 172800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 10000}. (kafka.log.LogManager)

I [2018-07-24 09:40:34,001] INFO [Partition topicX009-12 broker=1] No checkpointed highwatermark is found for partition topicX009-12 (kafka.cluster.Partition)

I [2018-07-24 09:40:34,001] INFO Replica loaded for partition topicX009-12 with initial high watermark 0 (kafka.cluster.Replica)

I [2018-07-24 09:40:34,001] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions topicX009-12 (kafka.server.ReplicaFetcherManager)

I [2018-07-24 09:40:34,002] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions List([topicX009-12, initOffset 0 to broker BrokerEndPoint(0,kafka-0.kafka-headless-svc.kafka-test.svc.cluster.local,29092)] ) (kafka.server.ReplicaFetcherManager)

I [2018-07-24 09:40:34,002] INFO [ReplicaAlterLogDirsManager on broker 1] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)

I [2018-07-24 09:40:34,389] INFO [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Based on follower's leader epoch, leader replied with an offset 0 >= the follower's log end offset 0 in topicX009-12. No truncation needed. (kafka.server.ReplicaFetcherThread)

`

Problem Description

while trying to post to end point http://localhost:8080/topics/topicX009/partition/13 to increase the partition number for topic, the error below occur:
{ "error": "Fail to increase partitions for Topic topicX009kafka server: Request exceeded the user-specified time limit in the request." }

source code below:

var (
	addr    = flag.String("address", ":8080", "The address to bind to")
	brokers = flag.String("brokers", os.Getenv("KAFKA_BROKERS"), "The Kafka brokers to connect to, as a comma separated list")
	admin   sarama.ClusterAdmin
)

func newPartition(w http.ResponseWriter, r *http.Request) {
	log.Printf("in: %v", "newPartition")
	vars := mux.Vars(r)
	topicName := vars["topicName"]
	count, _ := strconv.Atoi(vars["count"])

	err := admin.CreatePartitions(topicName, int32(count), nil, false)
	if err != nil {
		respondWithError(w, http.StatusInternalServerError, "Fail to increase partitions for Topic "+topicName+err.Error())
	}

}

func main() {
	flag.Parse()

	if *brokers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	brokerList := strings.Split(*brokers, ",")
	log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))
	sarama.Logger = log.New(os.Stdout, "sarama", log.LstdFlags)
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0

	var err error
	admin, err = sarama.NewClusterAdmin(brokerList, config)
	if err != nil {
		log.Printf("error: %v", err)
		return
	}
	defer admin.Close()

	r := mux.NewRouter()
	r.HandleFunc("/topics/{topicName}/partition/{count:[0-9]+}", newPartition).Methods("POST")
	log.Fatal(http.ListenAndServe(*addr, r))
}
@eapache
Copy link
Contributor

eapache commented Jul 27, 2018

Duplicate of #1120

@eapache eapache marked this as a duplicate of #1120 Jul 27, 2018
@eapache eapache closed this as completed Jul 27, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants