diff --git a/docs/reference/ingest/processors/geoip.asciidoc b/docs/reference/ingest/processors/geoip.asciidoc index 58cc32d629760..1c6ffde791816 100644 --- a/docs/reference/ingest/processors/geoip.asciidoc +++ b/docs/reference/ingest/processors/geoip.asciidoc @@ -25,6 +25,7 @@ uncompressed. The `ingest-geoip` config directory is located at `$ES_CONFIG/inge | `database_file` | no | GeoLite2-City.mmdb | The database filename in the geoip config directory. The ingest-geoip module ships with the GeoLite2-City.mmdb, GeoLite2-Country.mmdb and GeoLite2-ASN.mmdb files. | `properties` | no | [`continent_name`, `country_iso_code`, `region_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup. | `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document +| `first_only` | no | `false` | If `true` only first found geoip data will be returned, even if `field` contains array |====== *Depends on what is available in `database_file`: diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 5c82c68d93032..de57ce85d5208 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -41,6 +41,7 @@ import java.net.InetAddress; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; @@ -68,26 +69,28 @@ public final class GeoIpProcessor extends AbstractProcessor { private final Set properties; private final boolean ignoreMissing; private final GeoIpCache cache; + private final boolean firstOnly; /** * Construct a geo-IP processor. - * - * @param tag the processor tag + * @param tag the processor tag * @param field the source field to geo-IP map * @param lazyLoader a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use * @param targetField the target field * @param properties the properties; ideally this is lazily-loaded once on first use * @param ignoreMissing true if documents with a missing value for the field should be ignored * @param cache a geo-IP cache + * @param firstOnly true if only first result should be returned in case of array */ GeoIpProcessor( - final String tag, - final String field, - final DatabaseReaderLazyLoader lazyLoader, - final String targetField, - final Set properties, - final boolean ignoreMissing, - final GeoIpCache cache) { + final String tag, + final String field, + final DatabaseReaderLazyLoader lazyLoader, + final String targetField, + final Set properties, + final boolean ignoreMissing, + final GeoIpCache cache, + boolean firstOnly) { super(tag); this.field = field; this.targetField = targetField; @@ -95,6 +98,7 @@ public final class GeoIpProcessor extends AbstractProcessor { this.properties = properties; this.ignoreMissing = ignoreMissing; this.cache = cache; + this.firstOnly = firstOnly; } boolean isIgnoreMissing() { @@ -103,7 +107,7 @@ boolean isIgnoreMissing() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws IOException { - String ip = ingestDocument.getFieldValue(field, String.class, ignoreMissing); + Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing); if (ip == null && ignoreMissing) { return ingestDocument; @@ -111,11 +115,37 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information."); } - final InetAddress ipAddress = InetAddresses.forString(ip); + if (ip instanceof String) { + Map geoData = getGeoData((String) ip); + if (geoData.isEmpty() == false) { + ingestDocument.setFieldValue(targetField, geoData); + } + } else if (ip instanceof Iterable) { + List> geoDataList = new ArrayList<>(); + for (Object ipAddr : (Iterable) ip) { + if (ipAddr instanceof String == false) { + throw new IllegalArgumentException("array in field [" + field + "] should only contain strings"); + } + Map geoData = getGeoData((String) ipAddr); + if (firstOnly && geoData.isEmpty() == false) { + ingestDocument.setFieldValue(targetField, geoData); + return ingestDocument; + } + geoDataList.add(geoData.isEmpty() ? null : geoData); + } + if (firstOnly == false) { + ingestDocument.setFieldValue(targetField, geoDataList); + } + } else { + throw new IllegalArgumentException("field [" + field + "] should contain only string or array of strings"); + } + return ingestDocument; + } - Map geoData; + private Map getGeoData(String ip) throws IOException { String databaseType = lazyLoader.getDatabaseType(); - + final InetAddress ipAddress = InetAddresses.forString(ip); + Map geoData; if (databaseType.endsWith(CITY_DB_SUFFIX)) { try { geoData = retrieveCityGeoData(ipAddress); @@ -138,10 +168,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException throw new ElasticsearchParseException("Unsupported database type [" + lazyLoader.getDatabaseType() + "]", new IllegalStateException()); } - if (geoData.isEmpty() == false) { - ingestDocument.setFieldValue(targetField, geoData); - } - return ingestDocument; + return geoData; } @Override @@ -368,6 +395,7 @@ public GeoIpProcessor create( String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb"); List propertyNames = readOptionalList(TYPE, processorTag, config, "properties"); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); + boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", false); DatabaseReaderLazyLoader lazyLoader = databaseReaders.get(databaseFile); if (lazyLoader == null) { @@ -401,7 +429,7 @@ public GeoIpProcessor create( } } - return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, properties, ignoreMissing, cache); + return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, properties, ignoreMissing, cache, firstOnly); } } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index b136fbae0376a..34e40a803ebf1 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -29,9 +29,11 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -39,13 +41,14 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; public class GeoIpProcessorTests extends ESTestCase { public void testCity() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "8.8.8.8"); @@ -70,7 +73,7 @@ public void testCity() throws Exception { public void testNullValueWithIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -81,7 +84,7 @@ public void testNullValueWithIgnoreMissing() throws Exception { public void testNonExistentWithIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); processor.execute(ingestDocument); @@ -91,7 +94,7 @@ public void testNonExistentWithIgnoreMissing() throws Exception { public void testNullWithoutIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -102,7 +105,7 @@ public void testNullWithoutIgnoreMissing() throws Exception { public void testNonExistentWithoutIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument)); @@ -112,7 +115,7 @@ public void testNonExistentWithoutIgnoreMissing() throws Exception { public void testCity_withIpV6() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); String address = "2602:306:33d3:8000::3257:9652"; Map document = new HashMap<>(); @@ -141,7 +144,7 @@ public void testCity_withIpV6() throws Exception { public void testCityWithMissingLocation() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -158,7 +161,7 @@ public void testCityWithMissingLocation() throws Exception { public void testCountry() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "82.170.213.79"); @@ -178,7 +181,7 @@ public void testCountry() throws Exception { public void testCountryWithMissingLocation() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -196,7 +199,7 @@ public void testAsn() throws Exception { String ip = "82.171.64.0"; GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-ASN.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", ip); @@ -215,7 +218,7 @@ public void testAsn() throws Exception { public void testAddressIsNotInTheDatabase() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "127.0.0.1"); @@ -228,7 +231,7 @@ public void testAddressIsNotInTheDatabase() throws Exception { public void testInvalid() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "www.google.com"); @@ -237,6 +240,80 @@ public void testInvalid() throws Exception { assertThat(e.getMessage(), containsString("not an IP string literal")); } + public void testListAllValid() throws Exception { + GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000), false); + + Map document = new HashMap<>(); + document.put("source_field", Arrays.asList("8.8.8.8", "82.171.64.0")); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + + @SuppressWarnings("unchecked") + List> geoData = (List>) ingestDocument.getSourceAndMetadata().get("target_field"); + + Map location = new HashMap<>(); + location.put("lat", 37.751d); + location.put("lon", -97.822d); + assertThat(geoData.get(0).get("location"), equalTo(location)); + + assertThat(geoData.get(1).get("city_name"), equalTo("Hoensbroek")); + } + + public void testListPartiallyValid() throws Exception { + GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000), false); + + Map document = new HashMap<>(); + document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1")); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + + @SuppressWarnings("unchecked") + List> geoData = (List>) ingestDocument.getSourceAndMetadata().get("target_field"); + + Map location = new HashMap<>(); + location.put("lat", 37.751d); + location.put("lon", -97.822d); + assertThat(geoData.get(0).get("location"), equalTo(location)); + + assertThat(geoData.get(1), nullValue()); + } + + public void testListFirstOnly() throws Exception { + GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000), true); + + Map document = new HashMap<>(); + document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1")); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + + @SuppressWarnings("unchecked") + Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("target_field"); + + Map location = new HashMap<>(); + location.put("lat", 37.751d); + location.put("lon", -97.822d); + assertThat(geoData.get("location"), equalTo(location)); + } + + public void testListFirstOnlyNoMatches() throws Exception { + GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000), true); + + Map document = new HashMap<>(); + document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.2")); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + + assertThat(ingestDocument.getSourceAndMetadata().containsKey("target_field"), is(false)); + } + private DatabaseReaderLazyLoader loader(final String path) { final Supplier databaseInputStreamSupplier = () -> GeoIpProcessor.class.getResourceAsStream(path); final CheckedSupplier loader = diff --git a/modules/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml b/modules/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml index 27ab1f4e8747d..5a44549e36ba7 100644 --- a/modules/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml +++ b/modules/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml @@ -37,6 +37,87 @@ - match: { _source.geoip.region_name: "Minnesota" } - match: { _source.geoip.continent_name: "North America" } +--- +"Test geoip processor with list": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "geoip" : { + "field" : "field1" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: {field1: ["128.101.101.101", "127.0.0.1"]} + + - do: + get: + index: test + id: 1 + - match: { _source.field1: ["128.101.101.101", "127.0.0.1"] } + - length: { _source.geoip: 2 } + - length: { _source.geoip.0: 6 } + - match: { _source.geoip.0.city_name: "Minneapolis" } + - match: { _source.geoip.0.country_iso_code: "US" } + - match: { _source.geoip.0.location.lon: -93.2548 } + - match: { _source.geoip.0.location.lat: 44.9399 } + - match: { _source.geoip.0.region_iso_code: "US-MN" } + - match: { _source.geoip.0.region_name: "Minnesota" } + - match: { _source.geoip.0.continent_name: "North America" } + - match: { _source.geoip.1: null } + +--- +"Test geoip processor with lists, first only": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "geoip" : { + "field" : "field1", + "first_only" : true + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: {field1: ["127.0.0.1", "128.101.101.101", "128.101.101.101"]} + + - do: + get: + index: test + id: 1 + - match: { _source.field1: ["127.0.0.1", "128.101.101.101", "128.101.101.101"] } + - length: { _source.geoip: 6 } + - match: { _source.geoip.city_name: "Minneapolis" } + - match: { _source.geoip.country_iso_code: "US" } + - match: { _source.geoip.location.lon: -93.2548 } + - match: { _source.geoip.location.lat: 44.9399 } + - match: { _source.geoip.region_iso_code: "US-MN" } + - match: { _source.geoip.region_name: "Minnesota" } + - match: { _source.geoip.continent_name: "North America" } + --- "Test geoip processor with fields": - do: