diff --git a/docker/sasl/docker-compose.yml b/docker/sasl/docker-compose.yml
index ca9b1fadb..dbc728c41 100644
--- a/docker/sasl/docker-compose.yml
+++ b/docker/sasl/docker-compose.yml
@@ -39,10 +39,46 @@ services:
       KAFKA_MECHANISMS_INTER_BROKER_PROTOCOL: 'SASL_PLAINTEXT'
       KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN'
       KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
-      KAFKA_SUPER_USERS: 'User:kafka'
+      KAFKA_SUPER_USERS: 'User:kafka;User:ANONYMOUS'
       KAFKA_OPTS: '-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf'
       KAFKA_LOG4J_LOGGERS: "kafka.authorizer.logger=DEBUG"
 
+  kafka-connect:
+    build:
+      context: kafka-connect/
+      dockerfile: Dockerfile
+    container_name: kafka-connect
+    depends_on:
+      - zookeeper
+      - kafka
+      - schema-registry
+    ports:
+      - 18083:18083
+    environment:
+      CUB_CLASSPATH: '/usr/share/java/confluent-security/connect/*:/usr/share/java/kafka/*:/usr/share/java/cp-base-new/*'
+      CLASSPATH: "/usr/share/java/kafka-connect-replicator/*:/usr/share/java/monitoring-interceptors/*"
+      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
+      CONNECT_REST_PORT: 18083
+      CONNECT_GROUP_ID: kafka-connect
+      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
+      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
+      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
+      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
+      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
+      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
+      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
+      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
+      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
+      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
+      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
+    volumes:
+      - $PWD/scripts:/scripts
+      - $PWD/connect-plugins:/usr/share/confluent-hub-components
+      - $PWD/jars:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/jars
+
   schema-registry:
     image: confluentinc/cp-schema-registry:${TAG}
     hostname: schema-registry
diff --git a/docker/sasl/kafka-connect/Dockerfile b/docker/sasl/kafka-connect/Dockerfile
new file mode 100644
index 000000000..b21f70f62
--- /dev/null
+++ b/docker/sasl/kafka-connect/Dockerfile
@@ -0,0 +1,8 @@
+FROM confluentinc/cp-server-connect:6.2.0
+
+ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
+
+USER root
+
+RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.5.0 \
+  && confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.1
diff --git a/docker/sasl/kafka/kafka.properties b/docker/sasl/kafka/kafka.properties
index 7a8716c22..4750263d9 100644
--- a/docker/sasl/kafka/kafka.properties
+++ b/docker/sasl/kafka/kafka.properties
@@ -1,5 +1,5 @@
 inter.broker.listener.name=SASL_PLAINTEXT
-super.users=User:kafka
+super.users=User:kafka;User:ANONYMOUS
 mechanisms.inter.broker.protocol=SASL_PLAINTEXT
 group.initial.rebalance.delay.ms=100
 auto.create.topics.enable=true
diff --git a/example/connectors/sink-jdbc.json b/example/connectors/sink-jdbc.json
new file mode 100644
index 000000000..4116a1a48
--- /dev/null
+++ b/example/connectors/sink-jdbc.json
@@ -0,0 +1,9 @@
+{
+  "name": "sink-jdbc",
+  "config": {
+    "connector.class": "FileStreamSource",
+    "tasks.max": "1",
+    "file": "/tmp/test.txt",
+    "topic": "connect-test"
+  }
+}
\ No newline at end of file
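Note, for illustration only (not included in this change): the example connector files above match the request body of the standard Kafka Connect REST API (a "name" plus a "config" map), so the kafka-connect service added to docker-compose can be smoke-tested by hand on the mapped port 18083 before running JulieOps against it. A minimal sketch, assuming a JDK 11+ client and the compose setup above; the class name and file path are illustrative:

    // Registers the example connector file against the local Connect worker
    // via POST /connectors on the port mapped in docker/sasl/docker-compose.yml.
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class ConnectSmokeTest {
      public static void main(String[] args) throws Exception {
        String payload = Files.readString(Path.of("example/connectors/sink-jdbc.json"));
        HttpRequest request =
            HttpRequest.newBuilder(URI.create("http://localhost:18083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // 201 Created on first registration, 409 Conflict if the name is already taken.
        System.out.println(response.statusCode() + " " + response.body());
      }
    }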
diff --git a/example/connectors/source-jdbc.json b/example/connectors/source-jdbc.json
new file mode 100644
index 000000000..fb87bf316
--- /dev/null
+++ b/example/connectors/source-jdbc.json
@@ -0,0 +1,9 @@
+{
+  "name": "source-jdbc",
+  "config": {
+    "connector.class": "FileStreamSource",
+    "tasks.max": "1",
+    "file": "/tmp/test.txt",
+    "topic": "connect-test"
+  }
+}
\ No newline at end of file
diff --git a/example/descriptor-connect.yaml b/example/descriptor-connect.yaml
new file mode 100644
index 000000000..8a9c1109e
--- /dev/null
+++ b/example/descriptor-connect.yaml
@@ -0,0 +1,93 @@
+---
+context: "context"
+company: "company"
+env: "env"
+source: "source"
+projects:
+  - name: "projectA"
+    consumers:
+      - principal: "User:App0"
+      - principal: "User:App1"
+    producers:
+      - principal: "User:App3"
+      - principal: "User:App4"
+    streams:
+      - principal: "User:Streams0"
+        topics:
+          read:
+            - "topicA"
+            - "topicB"
+          write:
+            - "topicC"
+            - "topicD"
+    connectors:
+      artifacts:
+        - path: "connectors/source-jdbc.json"
+          server: "connect"
+          name: "source-jdbc"
+        - path: "connectors/sink-jdbc.json"
+          server: "connect"
+          name: "sink-jdbc"
+      access_control:
+        - principal: "User:Connect1"
+          cluster_id: "foo"
+          group: "group"
+          status_topic: "status"
+          offset_topic: "offset"
+          configs_topic: "configs"
+          topics:
+            read:
+              - "topicA"
+              - "topicB"
+        - principal: "User:Connect2"
+          topics:
+            write:
+              - "topicC"
+              - "topicD"
+    topics:
+      - name: "foo"
+        config:
+          replication.factor: "1"
+          num.partitions: "1"
+      - name: "bar"
+        dataType: "avro"
+        config:
+          replication.factor: "1"
+          num.partitions: "1"
+  - name: "projectB"
+    topics:
+      - dataType: "avro"
+        name: "bar"
+        config:
+          replication.factor: "1"
+          num.partitions: "1"
+  - name: "projectC"
+    streams:
+      - principal: "User:App0"
+        applicationId: "streamsApplicationId"
+        topics:
+          read:
+            - "topicE"
+          write:
+            - "topicF"
+    topics:
+      - name: "topicE"
+        config:
+          replication.factor: "1"
+          num.partitions: "1"
+      - name: "topicF"
+        config:
+          replication.factor: "1"
+          num.partitions: "1"
+platform:
+  schema_registry:
+    instances:
+      - principal: "User:SchemaRegistry01"
+        topic: "foo"
+        group: "bar"
+      - principal: "User:SchemaRegistry02"
+        topic: "zet"
+  control_center:
+    instances:
+      - principal: "User:ControlCenter"
+        appId: "controlcenter"
diff --git a/example/topology-builder-sasl-plain.properties b/example/topology-builder-sasl-plain.properties
index 96a7ac9e4..92f2e5c4a 100644
--- a/example/topology-builder-sasl-plain.properties
+++ b/example/topology-builder-sasl-plain.properties
@@ -5,4 +5,6 @@ sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule require
   password="kafka";
 #topology.validations.0=com.purbon.kafka.topology.validation.topology.CamelCaseNameFormatValidation
 #topology.validations.1=com.purbon.kafka.topology.validation.topic.PartitionNumberValidation
-#confluent.schema.registry.url="http://localhost:8082"
\ No newline at end of file
+#confluent.schema.registry.url="http://localhost:8082"
+platform.servers.connect.0=connect:http://localhost:18083
+topology.state.cluster.enabled=false
\ No newline at end of file
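Note, for illustration only (not included in this change): the connectors.artifacts entries in descriptor-connect.yaml reference the JSON files above through the "connect" server label, which topology-builder-sasl-plain.properties maps to http://localhost:18083. A small pre-flight sketch that checks those artifact files parse and carry the expected keys; it assumes jackson-databind is on the classpath, and the class name and hard-coded paths are illustrative:

    // Verifies each connector artifact referenced by the descriptor is readable
    // and contains the "name"/"config" keys the Connect REST API expects.
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.File;
    import java.util.List;

    public class ArtefactPreflight {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Paths relative to the repository root; the descriptor declares them under example/.
        List<String> artifacts =
            List.of("example/connectors/source-jdbc.json", "example/connectors/sink-jdbc.json");
        for (String path : artifacts) {
          JsonNode node = mapper.readTree(new File(path));
          boolean ok = node.hasNonNull("name") && node.has("config");
          System.out.printf(
              "%s -> %s (connector %s)%n", path, ok ? "OK" : "MISSING KEYS",
              node.path("name").asText("?"));
        }
      }
    }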
diff --git a/src/main/java/com/purbon/kafka/topology/api/connect/KConnectApiClient.java b/src/main/java/com/purbon/kafka/topology/api/connect/KConnectApiClient.java
index 3444a2c9e..50259ca62 100644
--- a/src/main/java/com/purbon/kafka/topology/api/connect/KConnectApiClient.java
+++ b/src/main/java/com/purbon/kafka/topology/api/connect/KConnectApiClient.java
@@ -64,4 +64,9 @@ public String status(String connectorName) throws IOException {
   public void pause(String connectorName) throws IOException {
     doPut("/connectors/" + connectorName + "/pause");
   }
+
+  @Override
+  public String toString() {
+    return "KConnectApiClient{" + server + "}";
+  }
 }
diff --git a/src/main/java/com/purbon/kafka/topology/model/Artefact.java b/src/main/java/com/purbon/kafka/topology/model/Artefact.java
index d91636751..4402b0ede 100644
--- a/src/main/java/com/purbon/kafka/topology/model/Artefact.java
+++ b/src/main/java/com/purbon/kafka/topology/model/Artefact.java
@@ -50,4 +50,19 @@ public int hashCode() {
       return Objects.hash(getName().toLowerCase());
     }
   }
+
+  @Override
+  public String toString() {
+    return "Artefact{"
+        + "path='"
+        + path
+        + '\''
+        + ", serverLabel='"
+        + serverLabel
+        + '\''
+        + ", name='"
+        + name
+        + '\''
+        + '}';
+  }
 }
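Note, for illustration only (not included in this change): when debugging this SASL setup by hand, the state of a deployed connector can also be read directly from the worker's standard status endpoint (GET /connectors/{name}/status), independently of KConnectApiClient. A minimal sketch with an illustrative class name, assuming the worker from the compose file is reachable on localhost:18083 and the example sink connector was registered earlier:

    // Fetches connector and task state (RUNNING/FAILED/PAUSED) from the Connect REST API.
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ConnectorStatus {
      public static void main(String[] args) throws Exception {
        HttpRequest request =
            HttpRequest.newBuilder(
                    URI.create("http://localhost:18083/connectors/sink-jdbc/status"))
                .GET()
                .build();
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // Prints the JSON status document returned by the worker.
        System.out.println(response.body());
      }
    }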