diff --git a/x-pack/plugin/kafka-consumer/build.gradle b/x-pack/plugin/kafka-consumer/build.gradle new file mode 100644 index 000000000000..ceb6705524dc --- /dev/null +++ b/x-pack/plugin/kafka-consumer/build.gradle @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +apply plugin: 'elasticsearch.internal-es-plugin' +apply plugin: 'elasticsearch.internal-yaml-rest-test' +apply plugin: 'elasticsearch.internal-cluster-test' + +esplugin { + name 'x-pack-kafka-consumer' + description 'The Kafka Consumer plugin adds a Kafka consumer, for consuming data directly from Kafka and indexing documents.' + classname 'org.elasticsearch.xpack.kafkaconsumer.KafkaConsumerPlugin' +} + +dependencies { + compileOnly project(path: xpackModule('core')) + testImplementation project(path: ':x-pack:plugin:stack') + testImplementation(testArtifact(project(xpackModule('core')))) + testImplementation project(':modules:data-streams') + clusterModules project(':modules:data-streams') + clusterModules project(':modules:ingest-common') + clusterModules project(':modules:mapper-extras') + //clusterModules project(xpackModule('analytics')) + //clusterModules project(xpackModule('ilm')) + //clusterModules project(xpackModule('mapper-aggregate-metric')) + clusterModules project(xpackModule('mapper-constant-keyword')) + //clusterModules project(xpackModule('mapper-counted-keyword')) + clusterModules project(xpackModule('stack')) + //clusterModules project(xpackModule('wildcard')) +} diff --git a/x-pack/plugin/kafka-consumer/src/main/java/org/elasticsearch/xpack/kafkaconsumer/KafkaConsumerPlugin.java b/x-pack/plugin/kafka-consumer/src/main/java/org/elasticsearch/xpack/kafkaconsumer/KafkaConsumerPlugin.java new file mode 100644 index 000000000000..0cf6fb9b85ee --- /dev/null +++ b/x-pack/plugin/kafka-consumer/src/main/java/org/elasticsearch/xpack/kafkaconsumer/KafkaConsumerPlugin.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.kafkaconsumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PersistentTaskPlugin; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.XPackSettings; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class KafkaConsumerPlugin extends Plugin implements PersistentTaskPlugin { + private static final Logger logger = LogManager.getLogger(KafkaConsumerPlugin.class); + + public KafkaConsumerPlugin(Settings settings) { + } + + @Override + public List> getPersistentTasksExecutor( + ClusterService clusterService, + ThreadPool threadPool, + Client client, + SettingsModule settingsModule, + IndexNameExpressionResolver expressionResolver + ) { + return Collections.emptyList(); + } + + /* + @Override + public Collection createComponents(PluginServices services) { + logger.info("APM ingest plugin is {}", enabled ? "enabled" : "disabled"); + Settings settings = services.environment().settings(); + ClusterService clusterService = services.clusterService(); + registry.set( + new APMIndexTemplateRegistry(settings, clusterService, services.threadPool(), services.client(), services.xContentRegistry()) + ); + if (enabled) { + APMIndexTemplateRegistry registryInstance = registry.get(); + registryInstance.setEnabled(APM_DATA_REGISTRY_ENABLED.get(settings)); + registryInstance.initialize(); + } + return Collections.emptyList(); + } + */ + + //@Override + //public void close() { + //registry.get(). + //close(); + //} +} diff --git a/x-pack/plugin/kafka-consumer/src/test/java/org/elasticsearch/xpack/kafkaconsumer/KafkaConsumerPluginTests.java b/x-pack/plugin/kafka-consumer/src/test/java/org/elasticsearch/xpack/kafkaconsumer/KafkaConsumerPluginTests.java new file mode 100644 index 000000000000..47006943e72a --- /dev/null +++ b/x-pack/plugin/kafka-consumer/src/test/java/org/elasticsearch/xpack/kafkaconsumer/KafkaConsumerPluginTests.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.kafkaconsumer; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.XPackSettings; +import org.junit.After; +import org.junit.Before; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class KafkaConsumerPluginTests extends ESTestCase { + private KafkaConsumerPlugin plugin; + private ClusterService clusterService; + private ThreadPool threadPool; + + @Before + public void createPlugin() { + final ClusterSettings clusterSettings = new ClusterSettings( + Settings.EMPTY, + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream() + .collect(Collectors.toSet()) + ); + threadPool = new TestThreadPool(this.getClass().getName()); + clusterService = ClusterServiceUtils.createClusterService(threadPool, clusterSettings); + plugin = new KafkaConsumerPlugin(Settings.builder().build()); + } + + private void createComponents() { + Environment mockEnvironment = mock(Environment.class); + when(mockEnvironment.settings()).thenReturn(Settings.builder().build()); + Plugin.PluginServices services = mock(Plugin.PluginServices.class); + when(services.clusterService()).thenReturn(clusterService); + when(services.threadPool()).thenReturn(threadPool); + when(services.environment()).thenReturn(mockEnvironment); + //plugin.createComponents(services); + } + + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + plugin.close(); + threadPool.shutdownNow(); + } +}