Skip to content

Commit

Permalink
Bug fix and refactoring of code
Browse files Browse the repository at this point in the history
1. Bugfix: Ingest metadata can be null if there is no processor created
2. Refactoring: Moved private method to another class for better testing support
3. Refactoring: Set some private static final variable as public so that unit test can use it
4. Refactoring: Changed string value to static variable

Signed-off-by: Heemin Kim <heemin@amazon.com>
  • Loading branch information
heemin32 committed May 12, 2023
1 parent 01720bf commit 690a960
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.ingest.IngestService;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand All @@ -36,6 +35,7 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction<Dele
private final Ip2GeoLockService lockService;
private final IngestService ingestService;
private final DatasourceFacade datasourceFacade;
private final Ip2GeoProcessorFacade ip2GeoProcessorFacade;

/**
* Constructor
Expand All @@ -51,12 +51,14 @@ public DeleteDatasourceTransportAction(
final ActionFilters actionFilters,
final Ip2GeoLockService lockService,
final IngestService ingestService,
final DatasourceFacade datasourceFacade
final DatasourceFacade datasourceFacade,
final Ip2GeoProcessorFacade ip2GeoProcessorFacade
) {
super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new);
this.lockService = lockService;
this.ingestService = ingestService;
this.datasourceFacade = datasourceFacade;
this.ip2GeoProcessorFacade = ip2GeoProcessorFacade;
}

/**
Expand Down Expand Up @@ -101,8 +103,8 @@ protected void deleteDatasource(final String datasourceName) throws IOException
datasourceFacade.deleteDatasource(datasource);
}

private void setDatasourceStateAsDeleting(final Datasource datasource) throws IOException {
if (isSafeToDelete(datasource) == false) {
private void setDatasourceStateAsDeleting(final Datasource datasource) {
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
throw new OpenSearchException("datasource is being used by one of processors");
}

Expand All @@ -114,21 +116,10 @@ private void setDatasourceStateAsDeleting(final Datasource datasource) throws IO
// If it fails to update the state back to the previous state, the new processor
// will fail to convert an ip to a geo data.
// In such case, user have to delete the processor and delete this datasource again.
if (isSafeToDelete(datasource) == false) {
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
datasource.setState(previousState);
datasourceFacade.updateDatasource(datasource);
throw new OpenSearchException("datasource is being used by one of processors");
}
}

private boolean isSafeToDelete(Datasource datasource) {
IngestMetadata ingestMetadata = ingestService.getClusterService().state().getMetadata().custom(IngestMetadata.TYPE);
return ingestMetadata.getPipelines()
.keySet()
.stream()
.flatMap(pipelineId -> ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class).stream())
.filter(ip2GeoProcessor -> ip2GeoProcessor.getDatasourceName().equals(datasource.getName()))
.findAny()
.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
@Log4j2
@EqualsAndHashCode
public class PutDatasourceRequest extends AcknowledgedRequest<PutDatasourceRequest> {
private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
private static final int MAX_DATASOURCE_NAME_BYTES = 255;
/**
* @param name the datasource name
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.common;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.ingest.IngestService;

public class Ip2GeoProcessorFacade {
private final IngestService ingestService;

@Inject
public Ip2GeoProcessorFacade(final IngestService ingestService) {
this.ingestService = ingestService;
}

public List<Ip2GeoProcessor> getProcessors(final String datasourceName) {
IngestMetadata ingestMetadata = ingestService.getClusterService().state().getMetadata().custom(IngestMetadata.TYPE);
if (ingestMetadata == null) {
return Collections.emptyList();
}
return ingestMetadata.getPipelines()
.keySet()
.stream()
.flatMap(pipelineId -> ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class).stream())
.filter(ip2GeoProcessor -> ip2GeoProcessor.getDatasourceName().equals(datasourceName))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
public static final String CONFIG_FIELD = "field";
public static final String CONFIG_TARGET_FIELD = "target_field";
public static final String CONFIG_DATASOURCE = "datasource";
public static final String CONFIG_PROPERTIES = "target_field";
public static final String CONFIG_PROPERTIES = "properties";
public static final String CONFIG_IGNORE_MISSING = "ignore_missing";
public static final String CONFIG_FIRST_ONLY = "first_only";

Expand Down
27 changes: 27 additions & 0 deletions src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -46,9 +47,11 @@
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.ingest.IngestService;
import org.opensearch.jobscheduler.spi.LockModel;
Expand Down Expand Up @@ -86,6 +89,8 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase {
protected TransportService transportService;
@Mock
protected Ip2GeoLockService ip2GeoLockService;
@Mock
protected Ip2GeoProcessorFacade ip2GeoProcessorFacade;
protected IngestMetadata ingestMetadata;
protected NoOpNodeClient client;
protected VerifyingClient verifyingClient;
Expand Down Expand Up @@ -211,6 +216,28 @@ protected LockModel randomLockModel() {
return lockModel;
}

protected Ip2GeoProcessor randomIp2GeoProcessor(String datasourceName) {
String tag = GeospatialTestHelper.randomLowerCaseString();
String description = GeospatialTestHelper.randomLowerCaseString();
String field = GeospatialTestHelper.randomLowerCaseString();
String targetField = GeospatialTestHelper.randomLowerCaseString();
Set<String> properties = Set.of(GeospatialTestHelper.randomLowerCaseString());
Ip2GeoProcessor ip2GeoProcessor = new Ip2GeoProcessor(
tag,
description,
field,
targetField,
datasourceName,
properties,
true,
true,
clusterSettings,
datasourceFacade,
geoIpDataFacade
);
return ip2GeoProcessor;
}

/**
* Temporary class of VerifyingClient until this PR(https://github.com/opensearch-project/OpenSearch/pull/7167)
* is merged in OpenSearch core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,9 @@
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import lombok.SneakyThrows;

Expand All @@ -32,15 +27,10 @@
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.ingest.PipelineConfiguration;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.tasks.Task;

Expand All @@ -49,7 +39,14 @@ public class DeleteDatasourceTransportActionTests extends Ip2GeoTestCase {

@Before
public void init() {
action = new DeleteDatasourceTransportAction(transportService, actionFilters, ip2GeoLockService, ingestService, datasourceFacade);
action = new DeleteDatasourceTransportAction(
transportService,
actionFilters,
ip2GeoLockService,
ingestService,
datasourceFacade,
ip2GeoProcessorFacade
);
}

@SneakyThrows
Expand Down Expand Up @@ -113,6 +110,7 @@ public void testDeleteDatasource_whenNull_thenThrowException() {
public void testDeleteDatasource_whenSafeToDelete_thenDelete() {
Datasource datasource = randomDatasource();
when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource);
when(ip2GeoProcessorFacade.getProcessors(datasource.getName())).thenReturn(Collections.emptyList());

// Run
action.deleteDatasource(datasource.getName());
Expand All @@ -128,14 +126,8 @@ public void testDeleteDatasource_whenProcessorIsUsingDatasource_thenThrowExcepti
Datasource datasource = randomDatasource();
datasource.setState(DatasourceState.AVAILABLE);
when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource);

String pipelineId = GeospatialTestHelper.randomLowerCaseString();
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
pipelines.put(pipelineId, createPipelineConfiguration());
IngestMetadata ingestMetadata = new IngestMetadata(pipelines);
when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata);
when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn(
Arrays.asList(createIp2GeoProcessor(datasource.getName()))
when(ip2GeoProcessorFacade.getProcessors(datasource.getName())).thenReturn(
Arrays.asList(randomIp2GeoProcessor(datasource.getName()))
);

// Run
Expand All @@ -152,15 +144,9 @@ public void testDeleteDatasource_whenProcessorIsCreatedDuringDeletion_thenThrowE
Datasource datasource = randomDatasource();
datasource.setState(DatasourceState.AVAILABLE);
when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource);

String pipelineId = GeospatialTestHelper.randomLowerCaseString();
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
pipelines.put(pipelineId, createPipelineConfiguration());
IngestMetadata ingestMetadata = new IngestMetadata(pipelines);
when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata);
when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn(
when(ip2GeoProcessorFacade.getProcessors(datasource.getName())).thenReturn(
Collections.emptyList(),
Arrays.asList(createIp2GeoProcessor(datasource.getName()))
Arrays.asList(randomIp2GeoProcessor(datasource.getName()))
);

// Run
Expand All @@ -170,33 +156,4 @@ public void testDeleteDatasource_whenProcessorIsCreatedDuringDeletion_thenThrowE
verify(datasourceFacade, times(2)).updateDatasource(datasource);
verify(datasourceFacade, never()).deleteDatasource(datasource);
}

private PipelineConfiguration createPipelineConfiguration() {
String id = GeospatialTestHelper.randomLowerCaseString();
ByteBuffer byteBuffer = ByteBuffer.wrap(GeospatialTestHelper.randomLowerCaseString().getBytes(StandardCharsets.US_ASCII));
BytesReference config = BytesReference.fromByteBuffer(byteBuffer);
return new PipelineConfiguration(id, config, XContentType.JSON);
}

private Ip2GeoProcessor createIp2GeoProcessor(String datasourceName) {
String tag = GeospatialTestHelper.randomLowerCaseString();
String description = GeospatialTestHelper.randomLowerCaseString();
String field = GeospatialTestHelper.randomLowerCaseString();
String targetField = GeospatialTestHelper.randomLowerCaseString();
Set<String> properties = Set.of(GeospatialTestHelper.randomLowerCaseString());
Ip2GeoProcessor ip2GeoProcessor = new Ip2GeoProcessor(
tag,
description,
field,
targetField,
datasourceName,
properties,
true,
true,
clusterSettings,
datasourceFacade,
geoIpDataFacade
);
return ip2GeoProcessor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.common;

import static org.mockito.Mockito.when;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.Before;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.ingest.PipelineConfiguration;

public class Ip2GeoProcessorFacadeTests extends Ip2GeoTestCase {
private Ip2GeoProcessorFacade ip2GeoProcessorFacade;

@Before
public void init() {
ip2GeoProcessorFacade = new Ip2GeoProcessorFacade(ingestService);
}

public void testGetProcessors_whenNullMetadata_thenReturnEmpty() {
String datasourceName = GeospatialTestHelper.randomLowerCaseString();
when(metadata.custom(IngestMetadata.TYPE)).thenReturn(null);

List<Ip2GeoProcessor> ip2GeoProcessorList = ip2GeoProcessorFacade.getProcessors(datasourceName);
assertTrue(ip2GeoProcessorList.isEmpty());
}

public void testGetProcessors_whenNoProcessorForGivenDatasource_thenReturnEmpty() {
String datasourceBeingUsed = GeospatialTestHelper.randomLowerCaseString();
String datasourceNotBeingUsed = GeospatialTestHelper.randomLowerCaseString();
String pipelineId = GeospatialTestHelper.randomLowerCaseString();
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
pipelines.put(pipelineId, createPipelineConfiguration());
IngestMetadata ingestMetadata = new IngestMetadata(pipelines);
when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata);
Ip2GeoProcessor ip2GeoProcessor = randomIp2GeoProcessor(datasourceBeingUsed);
when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn(Arrays.asList(ip2GeoProcessor));

List<Ip2GeoProcessor> ip2GeoProcessorList = ip2GeoProcessorFacade.getProcessors(datasourceNotBeingUsed);
assertTrue(ip2GeoProcessorList.isEmpty());
}

public void testGetProcessors_whenProcessorsForGivenDatasource_thenReturnProcessors() {
String datasourceName = GeospatialTestHelper.randomLowerCaseString();
String pipelineId = GeospatialTestHelper.randomLowerCaseString();
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
pipelines.put(pipelineId, createPipelineConfiguration());
IngestMetadata ingestMetadata = new IngestMetadata(pipelines);
when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata);
Ip2GeoProcessor ip2GeoProcessor = randomIp2GeoProcessor(datasourceName);
when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn(Arrays.asList(ip2GeoProcessor));

List<Ip2GeoProcessor> ip2GeoProcessorList = ip2GeoProcessorFacade.getProcessors(datasourceName);
assertEquals(1, ip2GeoProcessorList.size());
assertEquals(ip2GeoProcessor.getDatasourceName(), ip2GeoProcessorList.get(0).getDatasourceName());
}

private PipelineConfiguration createPipelineConfiguration() {
String id = GeospatialTestHelper.randomLowerCaseString();
ByteBuffer byteBuffer = ByteBuffer.wrap(GeospatialTestHelper.randomLowerCaseString().getBytes(StandardCharsets.US_ASCII));
BytesReference config = BytesReference.fromByteBuffer(byteBuffer);
return new PipelineConfiguration(id, config, XContentType.JSON);
}
}

0 comments on commit 690a960

Please sign in to comment.