Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Kafka: correct spec json and data types in config #6040

Merged
merged 7 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private KafkaDestinationConfig(String topicPattern, boolean sync, JsonNode confi
public static KafkaDestinationConfig getKafkaDestinationConfig(JsonNode config) {
return new KafkaDestinationConfig(
config.get("topic_pattern").asText(),
config.has("sync_producer") && config.get("sync_producer").booleanValue(),
config.has("sync_producer") && config.get("sync_producer").asBoolean(),
config);
}

Expand All @@ -66,25 +66,25 @@ private KafkaProducer<String, JsonNode> buildKafkaProducer(JsonNode config) {
.put(ProducerConfig.CLIENT_ID_CONFIG,
config.has("client_id") ? config.get("client_id").asText() : null)
.put(ProducerConfig.ACKS_CONFIG, config.get("acks").asText())
.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.get("enable_idempotence").booleanValue())
.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.get("enable_idempotence").asBoolean())
.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.get("compression_type").asText())
.put(ProducerConfig.BATCH_SIZE_CONFIG, config.get("batch_size").intValue())
.put(ProducerConfig.LINGER_MS_CONFIG, config.get("linger_ms").longValue())
.put(ProducerConfig.BATCH_SIZE_CONFIG, config.get("batch_size").asInt())
.put(ProducerConfig.LINGER_MS_CONFIG, config.get("linger_ms").asLong())
.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
config.get("max_in_flight_requests_per_connection").intValue())
config.get("max_in_flight_requests_per_connection").asInt())
.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText())
.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.get("buffer_memory").longValue())
.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.get("max_request_size").intValue())
.put(ProducerConfig.RETRIES_CONFIG, config.get("retries").intValue())
.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.get("buffer_memory").asLong())
.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.get("max_request_size").asInt())
.put(ProducerConfig.RETRIES_CONFIG, config.get("retries").asInt())
.put(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
config.get("socket_connection_setup_timeout_ms").longValue())
config.get("socket_connection_setup_timeout_ms").asLong())
.put(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
config.get("socket_connection_setup_timeout_max_ms").longValue())
.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.get("max_block_ms").longValue())
.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.get("request_timeout_ms").intValue())
.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.get("delivery_timeout_ms").intValue())
.put(ProducerConfig.SEND_BUFFER_CONFIG, config.get("send_buffer_bytes").intValue())
.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, config.get("receive_buffer_bytes").intValue())
config.get("socket_connection_setup_timeout_max_ms").asLong())
.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.get("max_block_ms").asLong())
.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.get("request_timeout_ms").asInt())
.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.get("delivery_timeout_ms").asInt())
.put(ProducerConfig.SEND_BUFFER_CONFIG, config.get("send_buffer_bytes").asInt())
.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, config.get("receive_buffer_bytes").asInt())
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,19 +162,19 @@
"title": "Batch size",
"description": "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.",
"type": "integer",
"default": 16384
"examples": [16384]
},
"linger_ms": {
"title": "Linger ms",
"description": "The producer groups together any records that arrive in between request transmissions into a single batched request.",
"type": "number",
"default": 0
"type": "string",
"examples": [0]
},
"max_in_flight_requests_per_connection": {
"title": "Max in flight requests per connection",
"description": "The maximum number of unacknowledged requests the client will send on a single connection before blocking.",
"type": "integer",
"default": 5
"examples": [5]
},
"client_dns_lookup": {
"title": "Client DNS lookup",
Expand All @@ -191,62 +191,62 @@
"buffer_memory": {
"title": "Buffer memory",
"description": "The total bytes of memory the producer can use to buffer records waiting to be sent to the server.",
"type": "number",
"default": 33554432
"type": "string",
"examples": 33554432
},
"max_request_size": {
"title": "Max request size",
"description": "The maximum size of a request in bytes.",
"type": "integer",
"default": 1048576
"examples": [1048576]
},
"retries": {
"title": "Retries",
"description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.",
"type": "integer",
"default": 2147483647
"examples": [2147483647]
},
"socket_connection_setup_timeout_ms": {
"title": "Socket connection setup timeout",
"description": "The amount of time the client will wait for the socket connection to be established.",
"type": "number",
"default": 10000
"type": "string",
"examples": [10000]
},
"socket_connection_setup_timeout_max_ms": {
"title": "Socket connection setup max timeout",
"description": "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum.",
"type": "number",
"default": 30000
"type": "string",
"examples": [30000]
},
"max_block_ms": {
"title": "Max block ms",
"description": "The configuration controls how long the KafkaProducer's send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction() methods will block.",
"type": "number",
"default": 60000
"type": "string",
"examples": [60000]
},
"request_timeout_ms": {
"title": "Request timeout",
"description": "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.",
"type": "integer",
"default": 30000
"examples": [30000]
},
"delivery_timeout_ms": {
"title": "Delivery timeout",
"description": "An upper bound on the time to report success or failure after a call to 'send()' returns.",
"type": "integer",
"default": 120000
"examples": [120000]
},
"send_buffer_bytes": {
"title": "Send buffer bytes",
"description": "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.",
"type": "integer",
"default": 131072
"examples": [131072]
},
"receive_buffer_bytes": {
"title": "Receive buffer bytes",
"description": "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.",
"type": "integer",
"default": 32768
"examples": [32768]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not keep the default?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now Default value in spec.json is not being sent to API request => change the default to examples and force user to insert value.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll create another issue to investigate this in the front-end.

}
}
}
Expand Down