Skip to content

Commit

Permalink
feat(ui): add multi-message produce feature (#791)
Browse files Browse the repository at this point in the history
Co-authored-by: xpr3sso <xpr3sso@noreply.org>
  • Loading branch information
2 people authored and tchiotludo committed Oct 24, 2021
1 parent d734316 commit 6241bd0
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 47 deletions.
21 changes: 19 additions & 2 deletions client/src/components/Form/Form.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ class Form extends Root {
);
};

renderJSONInput = (name, label, onChange) => {
renderJSONInput = (name, label, onChange, textMode, options) => {
const { formData, errors } = this.state;
const inputMode = textMode ? "text" : (formData.schemaType === "PROTOBUF" ? "protobuf" : "json")
return (
<div className="form-group row">
{label !== '' ? (
Expand All @@ -125,7 +126,7 @@ class Form extends Root {
)}
<div className="col-sm-10" style={{ height: '100%' }}>
<AceEditor
mode={ formData.schemaType === "PROTOBUF"? "protobuf" : "json" }
mode={ inputMode }
id={name}
theme="merbivore_soft"
value={formData[name]}
Expand All @@ -134,6 +135,7 @@ class Form extends Root {
}}
name="UNIQUE_ID_OF_DIV"
editorProps={{ $blockScrolling: true }}
setOptions={options}
style={{ width: '100%', minHeight: '25vh' }}
/>
{errors[name] && <div className="alert alert-danger mt-1 p-1">{errors[name]}</div>}
Expand Down Expand Up @@ -238,6 +240,21 @@ class Form extends Root {
</React.Fragment>
);
};

renderCheckbox = (name, label, isChecked, onChange, isDefaultChecked) => {
return (
<input
type="checkbox"
name={name}
id={name}
class="form-input-check"
checked={isChecked}
onChange={onChange}
defaultChecked={ isDefaultChecked ? isDefaultChecked : false}
/>
);
};
}

export default Form;

16 changes: 15 additions & 1 deletion client/src/components/Form/styles.scss
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,18 @@ input.placeholder{
border-bottom-color: transparent !important;
width: 100%;
padding: 0px;
}
}

.ace_active-line {
opacity: 0.2;
}

.ace_placeholder {
font-family: "Source Code Pro",SFMono-Regular,Menlo,Monaco,Consolas,"Liberation Mono","Courier New",monospace;
transform: scale(1);
}

.form-input-check{
margin-block: auto;
}

98 changes: 81 additions & 17 deletions client/src/containers/Topic/TopicProduce/TopicProduce.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class TopicProduce extends Form {
key: '',
hKey0: '',
hValue0: '',
value: ''
value: '',
keyValueSeparator: ':'
},
datetime: new Date(),
openDateModal: false,
Expand All @@ -40,7 +41,9 @@ class TopicProduce extends Form {
valueSchemaSearchValue: '',
selectedValueSchema: '',
clusterId: '',
topicId: ''
topicId: '',
multiMessage: false,
valuePlaceholder: '{"param": "value"}'
};

schema = {
Expand All @@ -58,7 +61,10 @@ class TopicProduce extends Form {
.label('hValue0'),
value: Joi.string()
.allow('')
.label('Value')
.label('Value'),
keyValueSeparator: Joi.string()
.min(1)
.label('keyValueSeparator')
};

async componentDidMount() {
Expand Down Expand Up @@ -98,7 +104,8 @@ class TopicProduce extends Form {
selectedKeySchema,
selectedValueSchema,
keySchema,
valueSchema
valueSchema,
multiMessage
} = this.state;
const { clusterId, topicId } = this.props.match.params;

Expand All @@ -110,9 +117,11 @@ class TopicProduce extends Form {
partition: formData.partition,
key: formData.key,
timestamp: datetime.toISOString(),
value: JSON.parse(JSON.stringify(formData.value)),
value: multiMessage ? formData.value : JSON.parse(JSON.stringify(formData.value)),
keySchema: schemaKeyToSend ? schemaKeyToSend.id : '',
valueSchema: schemaValueToSend ? schemaValueToSend.id : ''
valueSchema: schemaValueToSend ? schemaValueToSend.id : '',
multiMessage: multiMessage,
keyValueSeparator: formData.keyValueSeparator
};

let headers = {};
Expand All @@ -134,6 +143,55 @@ class TopicProduce extends Form {
});
}

renderMultiMessage() {
const { formData, multiMessage } = this.state;

return (
<div className="form-group row">
<label className="col-sm-2 col-form-label">Multi message</label>
<div className="row khq-multiple col-sm-7">
{this.renderCheckbox(
'isMultiMessage',
'',
multiMessage,
() => {
this.setState({multiMessage: !multiMessage,
valuePlaceholder: this.getPlaceholderValue(!multiMessage, formData.keyValueSeparator)})
},
false
)}

<label className="col-auto col-form-label">Separator</label>
<input
type='text'
name='keyValueSeparator'
id='keyValueSeparator'
placeholder=':'
class='col-sm-2 form-control'
disabled={ !multiMessage }
onChange={
event => {
this.setState({
formData: { ...formData,
keyValueSeparator: event.target.value},
valuePlaceholder: this.getPlaceholderValue(!multiMessage, event.target.value)})
}
}
/>
</div>
</div>
);
}

getPlaceholderValue(isMultiMessage, keyValueSeparator) {
if(isMultiMessage) {
return "key1" + keyValueSeparator + "{\"param\": \"value1\"}\n"
+ "key2" + keyValueSeparator + "{\"param\": \"value2\"}";
} else {
return '{"param": "value"}';
}
}

renderHeaders() {
let headers = [];

Expand Down Expand Up @@ -284,13 +342,13 @@ class TopicProduce extends Form {
selectedKeySchema,
valueSchema,
valueSchemaSearchValue,
selectedValueSchema
selectedValueSchema,
multiMessage
} = this.state;
let date = moment(datetime);
return (
<div>
<form encType="multipart/form-data" className="khq-form khq-form-config">
<div>
<Header title={`Produce to ${topicId} `} />
{this.renderSelect('partition', 'Partition', partitions, value => {
this.setState({ formData: { ...formData, partition: value.target.value } });
Expand All @@ -313,11 +371,11 @@ class TopicProduce extends Form {
'key'
)
)}
{this.renderInput('key', 'Key', 'Key', 'Key')}
<div></div>
</div>

{(this.renderInput('key', 'Key', 'Key', 'Key', undefined, undefined, undefined, undefined,{ disabled: multiMessage }))}

{this.renderHeaders()}

{this.renderDropdown(
'Value schema',
valueSchema.map(value => value.subject),
Expand All @@ -336,14 +394,19 @@ class TopicProduce extends Form {
'value'
)
)}

{this.renderMultiMessage()}

{this.renderJSONInput('value', 'Value', value => {
this.setState({
formData: {
...formData,
value: value
}
});
})}
formData: {
...formData,
value: value
}
})},
multiMessage, // true -> 'text' mode; json, protobuff, ... mode otherwise
{ placeholder: this.getPlaceholderValue(multiMessage, formData.keyValueSeparator) }
)}
<div style={{ display: 'flex', flexDirection: 'row', width: '100%', padding: 0 }}>
<label
style={{ padding: 0, alignItems: 'center', display: 'flex' }}
Expand Down Expand Up @@ -429,3 +492,4 @@ class TopicProduce extends Form {
}

export default TopicProduce;

28 changes: 17 additions & 11 deletions src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public Topic create(
@Secured(Role.ROLE_TOPIC_DATA_INSERT)
@Post(value = "api/{cluster}/topic/{topicName}/data")
@Operation(tags = {"topic data"}, summary = "Produce data to a topic")
public Record produce(
public List<Record> produce(
HttpRequest<?> request,
String cluster,
String topicName,
Expand All @@ -133,9 +133,12 @@ public Record produce(
Optional<String> timestamp,
Map<String, String> headers,
Optional<Integer> keySchema,
Optional<Integer> valueSchema
Optional<Integer> valueSchema,
Boolean multiMessage,
Optional<String> keyValueSeparator
) throws ExecutionException, InterruptedException {
return new Record(
Topic targetTopic = topicRepository.findByName(cluster, topicName);
return
this.recordRepository.produce(
cluster,
topicName,
Expand All @@ -145,14 +148,16 @@ public Record produce(
partition,
timestamp.map(r -> Instant.parse(r).toEpochMilli()),
keySchema,
valueSchema
),
schemaRegistryRepository.getSchemaRegistryType(cluster),
key.map(String::getBytes).orElse(null),
value.getBytes(),
headers,
topicRepository.findByName(cluster, topicName)
);
valueSchema,
multiMessage,
keyValueSeparator).stream()
.map(recordMetadata -> new Record(recordMetadata,
schemaRegistryRepository.getSchemaRegistryType(cluster),
key.map(String::getBytes).orElse(null),
value.getBytes(),
headers,
targetTopic))
.collect(Collectors.toList());
}

@Secured(Role.ROLE_TOPIC_DATA_READ)
Expand Down Expand Up @@ -504,3 +509,4 @@ public static class OffsetCopy {
private long offset;
}
}

14 changes: 14 additions & 0 deletions src/main/java/org/akhq/models/KeyValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.akhq.models;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* Represents a simple key-value pair of any type
*/
@Getter
@AllArgsConstructor
public class KeyValue<K,V> {
K key;
V value;
}
48 changes: 48 additions & 0 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.controllers.TopicController;
import org.akhq.models.KeyValue;
import org.akhq.models.Partition;
import org.akhq.models.Record;
import org.akhq.models.Topic;
Expand Down Expand Up @@ -463,6 +464,35 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions opti
);
}

public List<RecordMetadata> produce(
String clusterId,
String topic,
String value,
Map<String, String> headers,
Optional<String> key,
Optional<Integer> partition,
Optional<Long> timestamp,
Optional<Integer> keySchemaId,
Optional<Integer> valueSchemaId,
Boolean multiMessage,
Optional<String> keyValueSeparator) throws ExecutionException, InterruptedException {

List<RecordMetadata> produceResults = new ArrayList<>();

// Distinguish between single record produce, and multiple messages
if (multiMessage.booleanValue()) {
// Split key-value pairs and produce them
for (KeyValue<String, String> kvPair : splitMultiMessage(value, keyValueSeparator.orElseThrow())) {
produceResults.add(produce(clusterId, topic, kvPair.getValue(), headers, Optional.of(kvPair.getKey()),
partition, timestamp, keySchemaId, valueSchemaId));
}
} else {
produceResults.add(
produce(clusterId, topic, value, headers, key, partition, timestamp, keySchemaId, valueSchemaId));
}
return produceResults;
}

private RecordMetadata produce(
String clusterId,
String topic, byte[] value,
Expand Down Expand Up @@ -492,6 +522,23 @@ private RecordMetadata produce(
.get();
}

/**
* Splits a multi-message into a list of key-value pairs.
* @param value The multi-message string submitted by the {@link TopicController}
* @param keyValueSeparator The character(s) separating each key from their corresponding value
* @return A list of {@link KeyValue}, holding the split pairs
*/
private List<KeyValue<String, String>> splitMultiMessage(String value, String keyValueSeparator) {
return List.of(value.split("\r\n|\r|\n")).stream().map(v -> splitKeyValue(v, keyValueSeparator))
.collect(Collectors.toList());
}

private KeyValue<String, String> splitKeyValue(String keyValueStr, String keyValueSeparator) {
String[] keyValue = null;
keyValue = keyValueStr.split(keyValueSeparator, 2);
return new KeyValue<>(keyValue[0].trim(),keyValue[1]);
}

public void emptyTopic(String clusterId, String topicName) throws ExecutionException, InterruptedException {
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
var topic = topicRepository.findByName(clusterId, topicName);
Expand Down Expand Up @@ -1215,3 +1262,4 @@ private static class EndOffsetBound {
private final KafkaConsumer<byte[], byte[]> consumer;
}
}

Loading

0 comments on commit 6241bd0

Please sign in to comment.