Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Dec 16, 2024
1 parent c1569b2 commit f526a9e
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 0 deletions.
32 changes: 32 additions & 0 deletions x-pack/plugin/kafka-consumer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.internal-yaml-rest-test'
apply plugin: 'elasticsearch.internal-cluster-test'

esplugin {
name 'x-pack-kafka-consumer'
description 'The Kafka Consumer plugin adds a Kafka consumer, for consuming data directly from Kafka and indexing documents.'
classname 'org.elasticsearch.xpack.kafkaconsumer.KafkaConsumerPlugin'
}

dependencies {
compileOnly project(path: xpackModule('core'))
testImplementation project(path: ':x-pack:plugin:stack')
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(':modules:data-streams')
clusterModules project(':modules:data-streams')
clusterModules project(':modules:ingest-common')
clusterModules project(':modules:mapper-extras')
//clusterModules project(xpackModule('analytics'))
//clusterModules project(xpackModule('ilm'))
//clusterModules project(xpackModule('mapper-aggregate-metric'))
clusterModules project(xpackModule('mapper-constant-keyword'))
//clusterModules project(xpackModule('mapper-counted-keyword'))
clusterModules project(xpackModule('stack'))
//clusterModules project(xpackModule('wildcard'))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.kafkaconsumer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackSettings;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class KafkaConsumerPlugin extends Plugin implements PersistentTaskPlugin {
private static final Logger logger = LogManager.getLogger(KafkaConsumerPlugin.class);

public KafkaConsumerPlugin(Settings settings) {
}

@Override
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
ClusterService clusterService,
ThreadPool threadPool,
Client client,
SettingsModule settingsModule,
IndexNameExpressionResolver expressionResolver
) {
return Collections.emptyList();
}

/*
@Override
public Collection<?> createComponents(PluginServices services) {
logger.info("APM ingest plugin is {}", enabled ? "enabled" : "disabled");
Settings settings = services.environment().settings();
ClusterService clusterService = services.clusterService();
registry.set(
new APMIndexTemplateRegistry(settings, clusterService, services.threadPool(), services.client(), services.xContentRegistry())
);
if (enabled) {
APMIndexTemplateRegistry registryInstance = registry.get();
registryInstance.setEnabled(APM_DATA_REGISTRY_ENABLED.get(settings));
registryInstance.initialize();
}
return Collections.emptyList();
}
*/

//@Override
//public void close() {
//registry.get().
//close();
//}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.kafkaconsumer;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackSettings;
import org.junit.After;
import org.junit.Before;

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class KafkaConsumerPluginTests extends ESTestCase {
private KafkaConsumerPlugin plugin;
private ClusterService clusterService;
private ThreadPool threadPool;

@Before
public void createPlugin() {
final ClusterSettings clusterSettings = new ClusterSettings(
Settings.EMPTY,
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream()
.collect(Collectors.toSet())
);
threadPool = new TestThreadPool(this.getClass().getName());
clusterService = ClusterServiceUtils.createClusterService(threadPool, clusterSettings);
plugin = new KafkaConsumerPlugin(Settings.builder().build());
}

private void createComponents() {
Environment mockEnvironment = mock(Environment.class);
when(mockEnvironment.settings()).thenReturn(Settings.builder().build());
Plugin.PluginServices services = mock(Plugin.PluginServices.class);
when(services.clusterService()).thenReturn(clusterService);
when(services.threadPool()).thenReturn(threadPool);
when(services.environment()).thenReturn(mockEnvironment);
//plugin.createComponents(services);
}

@After
@Override
public void tearDown() throws Exception {
super.tearDown();
plugin.close();
threadPool.shutdownNow();
}
}

0 comments on commit f526a9e

Please sign in to comment.