diff --git a/ui/src/containers/configs/NewConfig.js b/ui/src/containers/configs/NewConfig.js index 70ee49d6..4b524bac 100644 --- a/ui/src/containers/configs/NewConfig.js +++ b/ui/src/containers/configs/NewConfig.js @@ -8,6 +8,11 @@ import { getConfigKindOptions } from "../../helpers/getConfigKindOptions"; import Header from "../../components/layout/Header"; import { addConfig } from "../../actions/configs"; import { fetchPipelines } from "../../actions/pipelines"; +import { getStreamConnectionOptions } from "../../helpers/getStreamConnectionOptions"; +import { getStreamTopicOptions } from "../../helpers/getStreamTopicOptions"; +import { getDeserializerOptions } from "../../helpers/getDeserializerOptions"; +import { getSerializerOptions } from "../../helpers/getSerializerOptions"; +import { isStreamHoldingAvroFormat } from "../../helpers/isStreamHoldingAvroFormat"; import "../../scss/fonts.scss"; class NewConfig extends Component { @@ -17,6 +22,7 @@ class NewConfig extends Component { this.state = { creatingConfigFailed: false, errorMessages: {}, + showTopicConfig: false, config: { name: "", kind: "STREAM", @@ -42,14 +48,39 @@ class NewConfig extends Component { }, }, }, + tempStreamConfig: { + topicName: "", + topicValue: "", + connectionName: "", + connectionValue: "", + }, configCreated: false, }; this.handleCreateConfig = this.handleCreateConfig.bind(this); this.handleChange = this.handleChange.bind(this); + this.handleEventChange = this.handleEventChange.bind(this); this.addLabel = this.addLabel.bind(this); this.removeLabel = this.removeLabel.bind(this); + this.toggleShowTopicConfig = this.toggleShowTopicConfig.bind(this); + this.addStreamConfig = this.addStreamConfig.bind(this); + this.removeStreamConfig = this.removeStreamConfig.bind(this); } + updateTempStreamConfig(field, value) { + let tempStreamConfig = this.state.tempStreamConfig; + + tempStreamConfig[field] = value; + + this.setState({ tempStreamConfig: tempStreamConfig }); + } + + updateConnectionConfig(field, value) { + let stream = this.state.stream; + + stream.spec.kafka[field] = value; + + this.setState({ stream: stream }); + } componentDidMount() { @@ -88,11 +119,49 @@ class NewConfig extends Component { }); } + handleEventChange(event) { + let stream = this.state.stream; + + const newValue = + event.target.type === "checkbox" + ? event.target.checked + : event.target.value; + + switch (event.target.dataset.prefix) { + case "stream.spec": + stream["spec"][event.target.name] = newValue; + break; + case "stream.spec.kafka": + stream["spec"]["kafka"][event.target.name] = newValue; + break; + case "stream.spec.kafka.topic": + stream["spec"]["kafka"]["topic"][event.target.name] = newValue; + break; + case "stream.spec.kafka.topic.config": + stream["spec"]["kafka"]["topic"]["config"][event.target.name] = + newValue; + break; + default: + stream[event.target.name] = newValue; + break; + } + + this.setState({ + creatingConfigFailed: false, + errorMessage: "", + stream: stream, + }); + } + handleChange(name, value, prefix) { let config = this.state.config; let deployment = this.state.deployment; let stream = this.state.stream; + if(prefix === undefined){ + prefix = ""; + } + if(prefix === "deployment.spec"){ deployment.spec[name] = value; } else{ @@ -105,10 +174,34 @@ class NewConfig extends Component { config.spec = stream.spec; } + if(prefix.includes("stream")){ + switch (event.target.dataset.prefix) { + case "stream.spec": + stream["spec"][event.target.name] = newValue; + break; + case "stream.spec.kafka": + stream["spec"]["kafka"][event.target.name] = newValue; + break; + case "stream.spec.kafka.topic": + stream["spec"]["kafka"]["topic"][event.target.name] = newValue; + break; + case "stream.spec.kafka.topic.config": + stream["spec"]["kafka"]["topic"]["config"][event.target.name] = + newValue; + break; + default: + stream[event.target.name] = newValue; + break; + } + } + + this.setState({ creatingConfigFailed: false, errorMessage: "", config: config, + deployment: deployment, + stream: stream, }); } @@ -128,15 +221,55 @@ class NewConfig extends Component { this.setState({ config: config }); } - setDefaultKind(field, value) { - let config = this.state.config; + setDefaultKind(field, value) { + let config = this.state.config; + + config.kind = value; + + this.setState({ config: config }); + return this.state.config.kind; + } + + toggleShowTopicConfig(event) { + event.preventDefault(); - config.kind = value; + this.setState({ + showTopicConfig: !this.state.showTopicConfig, + }); + } + + addStreamConfig(event) { + event.preventDefault(); + const tempStreamConfig = this.state.tempStreamConfig; + let stream = this.state.stream; + + if (event.target.dataset.prefix === "stream.spec.kafka") { + stream.spec.kafka[tempStreamConfig.connectionName] = tempStreamConfig.connectionValue; + this.setState({ stream: stream, tempStreamConfig: tempStreamConfig }); + } else if (event.target.dataset.prefix === "stream.spec.kafka.topic.config") { + stream.spec.kafka.topic.config[tempStreamConfig.topicName] = + tempStreamConfig.topicValue; + this.setState({ stream: stream, tempStreamConfig: tempStreamConfig }); + } + } - this.setState({ config: config }); - return this.state.config.kind; + removeStreamConfig(event) { + event.preventDefault(); + let stream = this.state.stream; + const prefix = event.target.dataset.prefix; + const streamConfig = event.target.dataset.config; + + if (prefix === "stream.spec.kafka") { + delete stream.spec.kafka[streamConfig]; + } else if (prefix === "stream.spec.kafka.topic.config") { + delete stream.spec.kafka.topic.config[streamConfig]; } + this.setState({ stream: stream }); + } + + + render() { if (this.state.configCreated) { return ( @@ -156,6 +289,32 @@ class NewConfig extends Component { return { value: pipeline.uuid, label: name }; }); + const stream = this.state.stream; + + const topicOptions = getStreamTopicOptions(); + const connectionOptions = getStreamConnectionOptions(); + const deserializerOptions = getDeserializerOptions(stream); + const serializerOptions = getSerializerOptions(stream); + + const defaultDeserializer = "io.datacater.core.serde.JsonDeserializer"; + const defaultSerializer = "io.datacater.core.serde.JsonSerializer"; + + const addedTopicConfigs = Object.keys(stream.spec.kafka.topic.config); + const addedConnectionConfigs = Object.keys(stream.spec.kafka).filter( + (item) => + ![ + "bootstrap.servers", + "key.deserializer", + "value.deserializer", + "key.serializer", + "value.serializer", + "schema.registry.url", + "topic", + ].includes(item) + ); + + const streamHoldsAvroFormat = isStreamHoldingAvroFormat(stream); + return (