From bfa54aca50115770530ca6fdff24d4125541d23b Mon Sep 17 00:00:00 2001 From: Nilkamal <107556247+nilkamalthakuria@users.noreply.github.com> Date: Wed, 13 Jul 2022 02:38:17 +0530 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20New=20Source:=20Elasticsearch=20?= =?UTF-8?q?(#14118)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * New source Elasticsearch * Update README.md * Update tests * File additions and changes in test * Deleting spec.json * File additions and changes in test * Fixed multiple streams bug in discover * run format * format files * eof and update doc * correct spec test * auto-bump connector version Co-authored-by: marcosmarxm Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 7 + .../src/main/resources/seed/source_specs.yaml | 75 ++++++ .../source-elasticsearch/.dockerignore | 3 + .../source-elasticsearch/Dockerfile | 21 ++ .../connectors/source-elasticsearch/README.md | 70 +++++ .../acceptance-test-config.yml | 6 + .../source-elasticsearch/build.gradle | 48 ++++ .../elasticsearch/ConnectorConfiguration.java | 159 +++++++++++ .../ElasticsearchAuthenticationMethod.java | 11 + .../ElasticsearchConnection.java | 247 ++++++++++++++++++ .../elasticsearch/ElasticsearchConstants.java | 14 + .../ElasticsearchInclusions.java | 15 ++ .../elasticsearch/ElasticsearchSource.java | 116 ++++++++ .../elasticsearch/ElasticsearchUtils.java | 40 +++ .../UnsupportedDatatypeException.java | 13 + .../typemapper/ElasticsearchTypeMapper.java | 157 +++++++++++ .../main/resources/log4j2-test.example.yml | 33 +++ .../src/main/resources/spec.json | 82 ++++++ .../ElasticsearchSourceAcceptanceTest.java | 116 ++++++++ .../resources/configured_catalog.json | 13 + .../ElasticsearchSourcesTest.java | 67 +++++ .../src/test/resources/expected_output.json | 37 +++ .../expected_output_extra_fields.json | 8 + .../src/test/resources/expected_spec.json | 82 ++++++ .../src/test/resources/sample_input.json | 34 +++ .../resources/sample_input_extra_fields.json | 13 + docs/integrations/sources/elasticsearch.md | 87 ++++++ 27 files changed, 1574 insertions(+) create mode 100644 airbyte-integrations/connectors/source-elasticsearch/.dockerignore create mode 100644 airbyte-integrations/connectors/source-elasticsearch/Dockerfile create mode 100644 airbyte-integrations/connectors/source-elasticsearch/README.md create mode 100644 airbyte-integrations/connectors/source-elasticsearch/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/source-elasticsearch/build.gradle create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ConnectorConfiguration.java create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchAuthenticationMethod.java create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchConnection.java create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchConstants.java create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchInclusions.java create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSource.java create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchUtils.java create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/UnsupportedDatatypeException.java create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/typemapper/ElasticsearchTypeMapper.java create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/main/resources/log4j2-test.example.yml create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/main/resources/spec.json create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/test-integration/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSourceAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/test-integration/resources/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/test/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSourcesTest.java create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_output.json create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_output_extra_fields.json create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_spec.json create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/test/resources/sample_input.json create mode 100644 airbyte-integrations/connectors/source-elasticsearch/src/test/resources/sample_input_extra_fields.json create mode 100644 docs/integrations/sources/elasticsearch.md diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 42be81eecbce..aa8cf4b7aa97 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1113,3 +1113,10 @@ documentationUrl: https://docs.airbyte.io/integrations/sources/firebolt sourceType: database releaseStage: alpha +- name: Elasticsearch + sourceDefinitionId: 7cf88806-25f5-4e1a-b422-b2fa9e1b0090 + dockerRepository: airbyte/source-elasticsearch + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/sources/elasticsearch + sourceType: api + releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e528bcdd4fa9..7d0d9d2e0e13 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -10450,3 +10450,78 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] +- dockerImage: "airbyte/source-elasticsearch:0.1.0" + spec: + documentationUrl: "https://docs.airbyte.io/integrations/source/elasticsearch" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "Elasticsearch Connection Configuration" + type: "object" + required: + - "endpoint" + additionalProperties: false + properties: + endpoint: + title: "Server Endpoint" + type: "string" + description: "The full url of the Elasticsearch server" + authenticationMethod: + title: "Authentication Method" + type: "object" + description: "The type of authentication to be used" + oneOf: + - title: "None" + additionalProperties: false + description: "No authentication will be used" + required: + - "method" + properties: + method: + type: "string" + const: "none" + - title: "Api Key/Secret" + additionalProperties: false + description: "Use a api key and secret combination to authenticate" + required: + - "method" + - "apiKeyId" + - "apiKeySecret" + properties: + method: + type: "string" + const: "secret" + apiKeyId: + title: "API Key ID" + description: "The Key ID to used when accessing an enterprise Elasticsearch\ + \ instance." + type: "string" + apiKeySecret: + title: "API Key Secret" + description: "The secret associated with the API Key ID." + type: "string" + airbyte_secret: true + - title: "Username/Password" + additionalProperties: false + description: "Basic auth header with a username and password" + required: + - "method" + - "username" + - "password" + properties: + method: + type: "string" + const: "basic" + username: + title: "Username" + description: "Basic auth username to access a secure Elasticsearch\ + \ server" + type: "string" + password: + title: "Password" + description: "Basic auth password to access a secure Elasticsearch\ + \ server" + type: "string" + airbyte_secret: true + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: [] diff --git a/airbyte-integrations/connectors/source-elasticsearch/.dockerignore b/airbyte-integrations/connectors/source-elasticsearch/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/source-elasticsearch/Dockerfile b/airbyte-integrations/connectors/source-elasticsearch/Dockerfile new file mode 100644 index 000000000000..bb08a5f841b1 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/Dockerfile @@ -0,0 +1,21 @@ +FROM airbyte/integration-base-java:dev AS build + +WORKDIR /airbyte + +ENV APPLICATION source-elasticsearch + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar + +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte + +ENV APPLICATION source-elasticsearch +ENV ENABLE_SENTRY true + +COPY --from=build /airbyte /airbyte + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-elasticsearch diff --git a/airbyte-integrations/connectors/source-elasticsearch/README.md b/airbyte-integrations/connectors/source-elasticsearch/README.md new file mode 100644 index 000000000000..4881e6583321 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/README.md @@ -0,0 +1,70 @@ +# Elasticsearch source + +This is the repository for the Elasticsearch source connector, written in Java using Elasticsearch's High Level Rest Client([HLRC](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html)). +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/elasticsearch). + +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-elasticsearch:build +``` + +#### Create credentials +Credentials can be provided in three ways: +1. Basic +2. + +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-elasticsearch:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-elasticsearch:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-elasticsearch:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-elasticsearch:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-elasticsearch:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +#### Sync Mode Support +Current version of this connector only allows the `FULL REFRESH` mode. + +## Testing +We use `JUnit` for Java tests. + +### Unit and Integration Tests +Place unit tests under `src/test/io/airbyte/integrations/sources/elasticsearch-test`. + +#### Acceptance Tests +Airbyte has a standard test suite that all destination connectors must pass. See example(s) in +`src/test-integration/java/io/airbyte/integrations/sources/elasticsearch/`. + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:sources-elasticsearch:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:sources-elasticsearch:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +2. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +3. Create a Pull Request. +4. Pat yourself on the back for being an awesome contributor. +5. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/source-elasticsearch/acceptance-test-config.yml b/airbyte-integrations/connectors/source-elasticsearch/acceptance-test-config.yml new file mode 100644 index 000000000000..fed3fdc47c64 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/acceptance-test-config.yml @@ -0,0 +1,6 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-elasticsearch +tests: + spec: + - spec_path: "src/test/resources/expected_spec.json" diff --git a/airbyte-integrations/connectors/source-elasticsearch/build.gradle b/airbyte-integrations/connectors/source-elasticsearch/build.gradle new file mode 100644 index 000000000000..dbd991036297 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/build.gradle @@ -0,0 +1,48 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.source.elasticsearch.ElasticsearchSource' + applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] +} + +dependencies { + implementation project(':airbyte-config:config-models') + implementation project(':airbyte-protocol:protocol-models') + implementation project(':airbyte-integrations:bases:base-java') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + + implementation 'co.elastic.clients:elasticsearch-java:7.15.0' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3' + + // EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + // https://eclipse-ee4j.github.io/jsonp/ + implementation 'jakarta.json:jakarta.json-api:2.0.1' + + // Needed even if using Jackson to have an implementation of the Jsonp object model + // EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + // https://github.com/eclipse-ee4j/jsonp + implementation 'org.glassfish:jakarta.json:2.0.1' + + // MIT + // https://www.testcontainers.org/ + testImplementation libs.connectors.testcontainers.elasticsearch + integrationTestJavaImplementation libs.connectors.testcontainers.elasticsearch + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-elasticsearch') +} + +repositories { + maven { + name = "ESSnapshots" + url = "https://snapshots.elastic.co/maven/" + } + maven { + name = "ESJavaGithubPackages" + url = "https://maven.pkg.github.com/elastic/elasticsearch-java" + } +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ConnectorConfiguration.java b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ConnectorConfiguration.java new file mode 100644 index 000000000000..d6b25ff1815f --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ConnectorConfiguration.java @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.elasticsearch; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class ConnectorConfiguration { + + private String endpoint; + private boolean upsert; + private AuthenticationMethod authenticationMethod = new AuthenticationMethod(); + + public ConnectorConfiguration() {} + + public static ConnectorConfiguration fromJsonNode(JsonNode config) { + return new ObjectMapper().convertValue(config, ConnectorConfiguration.class); + } + + public String getEndpoint() { + return this.endpoint; + } + + public boolean isUpsert() { + return this.upsert; + } + + public AuthenticationMethod getAuthenticationMethod() { + return this.authenticationMethod; + } + + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } + + public void setUpsert(boolean upsert) { + this.upsert = upsert; + } + + public void setAuthenticationMethod(AuthenticationMethod authenticationMethod) { + this.authenticationMethod = authenticationMethod; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + ConnectorConfiguration that = (ConnectorConfiguration) o; + return upsert == that.upsert && Objects.equals(endpoint, that.endpoint) && Objects.equals(authenticationMethod, that.authenticationMethod); + } + + @Override + public int hashCode() { + return Objects.hash(endpoint, upsert, authenticationMethod); + } + + @Override + public String toString() { + return "ConnectorConfiguration{" + + "endpoint='" + endpoint + '\'' + + ", upsert=" + upsert + + ", authenticationMethod=" + authenticationMethod + + '}'; + } + + static class AuthenticationMethod { + + private ElasticsearchAuthenticationMethod method = ElasticsearchAuthenticationMethod.none; + private String username; + private String password; + private String apiKeyId; + private String apiKeySecret; + + public ElasticsearchAuthenticationMethod getMethod() { + return this.method; + } + + public String getUsername() { + return this.username; + } + + public String getPassword() { + return this.password; + } + + public String getApiKeyId() { + return this.apiKeyId; + } + + public String getApiKeySecret() { + return this.apiKeySecret; + } + + public void setMethod(ElasticsearchAuthenticationMethod method) { + this.method = method; + } + + public void setUsername(String username) { + this.username = username; + } + + public void setPassword(String password) { + this.password = password; + } + + public void setApiKeyId(String apiKeyId) { + this.apiKeyId = apiKeyId; + } + + public void setApiKeySecret(String apiKeySecret) { + this.apiKeySecret = apiKeySecret; + } + + public boolean isValid() { + return switch (this.method) { + case none -> true; + case basic -> Objects.nonNull(this.username) && Objects.nonNull(this.password); + case secret -> Objects.nonNull(this.apiKeyId) && Objects.nonNull(this.apiKeySecret); + }; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + AuthenticationMethod that = (AuthenticationMethod) o; + return method == that.method && + Objects.equals(username, that.username) && + Objects.equals(password, that.password) && + Objects.equals(apiKeyId, that.apiKeyId) && + Objects.equals(apiKeySecret, that.apiKeySecret); + } + + @Override + public int hashCode() { + return Objects.hash(method, username, password, apiKeyId, apiKeySecret); + } + + @Override + public String toString() { + return "AuthenticationMethod{" + + "method=" + method + + ", username='" + username + '\'' + + ", apiKeyId='" + apiKeyId + '\'' + + '}'; + } + + } + +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchAuthenticationMethod.java b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchAuthenticationMethod.java new file mode 100644 index 000000000000..b75a7e9495ab --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchAuthenticationMethod.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.elasticsearch; + +public enum ElasticsearchAuthenticationMethod { + none, + secret, + basic +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchConnection.java b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchConnection.java new file mode 100644 index 000000000000..e677cfa53496 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchConnection.java @@ -0,0 +1,247 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.elasticsearch; + +import co.elastic.clients.base.*; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.regex.Pattern; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.message.BasicHeader; +import org.elasticsearch.action.search.*; +import org.elasticsearch.client.*; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.indices.GetIndexResponse; +import org.elasticsearch.client.indices.GetMappingsRequest; +import org.elasticsearch.client.indices.GetMappingsResponse; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.Scroll; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * All communication with Elasticsearch should be done through this class. + */ +public class ElasticsearchConnection { + + private static final int MAX_HITS = 10000; + private static final Logger log = LoggerFactory.getLogger(ElasticsearchConnection.class); + private final RestHighLevelClient client; + private final ObjectMapper mapper = new ObjectMapper(); + + /** + * Creates a new ElasticsearchConnection that can be used to read/write records to indices + * + * @param config Configuration parameters for connecting to the Elasticsearch host + */ + public ElasticsearchConnection(ConnectorConfiguration config) { + log.info(String.format( + "creating ElasticsearchConnection: %s", config.getEndpoint())); + + // Create the low-level client + + HttpHost httpHost = HttpHost.create(config.getEndpoint()); + + RestClientBuilder builder = RestClient.builder(httpHost).setDefaultHeaders(configureHeaders(config)).setFailureListener((new FailureListener())); + client = new RestHighLevelClient(builder); + } + + static class FailureListener extends RestClient.FailureListener { + + @Override + public void onFailure(Node node) { + log.error("RestClient failure: {}", node); + } + + } + + /** + * Configures the default headers for requests to the Elasticsearch server + * + * @param config connection information + * @return the default headers + */ + protected Header[] configureHeaders(ConnectorConfiguration config) { + final var headerList = new ArrayList
(); + // add Authorization header if credentials are present + final var auth = config.getAuthenticationMethod(); + switch (auth.getMethod()) { + case secret -> { + var bytes = (auth.getApiKeyId() + ":" + auth.getApiKeySecret()).getBytes(StandardCharsets.UTF_8); + var header = "ApiKey " + Base64.getEncoder().encodeToString(bytes); + headerList.add(new BasicHeader("Authorization", header)); + } + case basic -> { + var basicBytes = (auth.getUsername() + ":" + auth.getPassword()).getBytes(StandardCharsets.UTF_8); + var basicHeader = "Basic " + Base64.getEncoder().encodeToString(basicBytes); + headerList.add(new BasicHeader("Authorization", basicHeader)); + } + } + return headerList.toArray(new Header[headerList.size()]); + } + + /** + * Pings the Elasticsearch server for "up" check, and configuration validation + * + * @return true if connection was successful + */ + public boolean checkConnection() { + log.info("checking elasticsearch connection"); + try { + final var info = client.info(RequestOptions.DEFAULT); + log.info("checked elasticsearch connection: {}, node-name: {}, version: {}", info.getClusterName(), info.getNodeName(), info.getVersion()); + return true; + } catch (ApiException e) { + log.error("failed to ping elasticsearch", unwrappedApiException("failed write operation", e)); + return false; + } catch (Exception e) { + log.error("unknown exception while pinging elasticsearch server", e); + return false; + } + } + + /** + * Shutdown the connection to the Elasticsearch server + */ + public void close() throws IOException { + this.client.close(); + } + + /** + * Unwraps a rest client ApiException, so we can log the details + * + * @param message message to add to the log entry + * @param e source ApiException + * @return a new RuntimeException with the ApiException as the source + */ + private RuntimeException unwrappedApiException(String message, ApiException e) { + log.error(message); + if (Objects.isNull(e) || Objects.isNull(e.error())) { + log.error("unknown ApiException"); + return new RuntimeException(e); + } + if (ElasticsearchError.class.isAssignableFrom(e.error().getClass())) { + ElasticsearchError esException = ((ElasticsearchError) e.error()); + String errorMessage = String.format("ElasticsearchError: status:%s, error:%s", esException.status(), esException.error().toString()); + return new RuntimeException(errorMessage); + } + return new RuntimeException(e); + } + + /** + * Gets mappings (metadata for fields) from Elasticsearch cluster for given indices + * + * @param indices A list of indices for which the mapping is required + * @return String to MappingMetadata as a native Java Map + * @throws IOException throws IOException if Elasticsearch request fails + */ + public Map getMappings(final List indices) throws IOException { + GetMappingsRequest request = new GetMappingsRequest(); + String[] copiedIndices = indices.toArray(String[]::new); + request.indices(copiedIndices); + GetMappingsResponse getMappingResponse = client.indices().getMapping(request, RequestOptions.DEFAULT); + return getMappingResponse.mappings(); + } + + /** + * Gets all mappings (metadata for fields) from Elasticsearch cluster + * + * @return String to MappingMetadata as a native Java Map + * @throws IOException throws IOException if Elasticsearch request fails + */ + public Map getAllMappings() throws IOException { + // Need to exclude system mappings + GetMappingsRequest request = new GetMappingsRequest(); + GetMappingsResponse getMappingResponse = client.indices().getMapping(request, RequestOptions.DEFAULT); + return getMappingResponse.mappings(); + } + + /** + * Returns a list of all records, without the metadata in JsonNode format Uses scroll API for + * pagination + * + * @param index index name in Elasticsearch cluster + * @return list of documents + * @throws IOException throws IOException if Elasticsearch request fails + */ + public List getRecords(String index) throws IOException { + final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.size(MAX_HITS); + searchSourceBuilder.query(QueryBuilders.matchAllQuery()); + + SearchRequest searchRequest = new SearchRequest(index); + searchRequest.scroll(scroll); + searchRequest.source(searchSourceBuilder); + + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + String scrollId = searchResponse.getScrollId(); + log.info("Running scroll query with scrollId {}", scrollId); + SearchHit[] searchHits = searchResponse.getHits().getHits(); + List data = new ArrayList<>(); + + while (searchHits != null && searchHits.length > 0) { + SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); + scrollRequest.scroll(scroll); + searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT); + scrollId = searchResponse.getScrollId(); + + for (SearchHit hit : searchHits) { + data.add(mapper.convertValue(hit, JsonNode.class).get("sourceAsMap")); + + } + searchHits = searchResponse.getHits().getHits(); + } + + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(scrollId); + ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); + boolean succeeded = clearScrollResponse.isSucceeded(); + if (succeeded) { + log.info("scroll response cleared successfully"); + } else { + log.error("failed to clear scroll response"); + } + + return data; + } + + /** + * Returns a list of user defined indices, with system indices exclusions made using regex variable + * ALL_INDICES_QUERY + * + * @return indices list + * @throws IOException throws IOException if Elasticsearch request fails + */ + public List userIndices() throws IOException { + GetIndexRequest request = new GetIndexRequest(ElasticsearchConstants.ALL_INDICES_QUERY); + GetIndexResponse response = this.client.indices().get(request, RequestOptions.DEFAULT); + List indices = Arrays.asList(response.getIndices()); + Pattern pattern = Pattern.compile(ElasticsearchConstants.REGEX_FOR_USER_INDICES_ONLY); + indices = indices.stream().filter(pattern.asPredicate().negate()).toList(); + return indices; + } + + /** + * Returns a list of all indices including Elasticsearch system indices + * + * @return indices list + * @throws IOException throws IOException if Elasticsearch request fails + */ + public List allIndices() throws IOException { + GetIndexRequest request = new GetIndexRequest(ElasticsearchConstants.ALL_INDICES_QUERY); + GetIndexResponse response = this.client.indices().get(request, RequestOptions.DEFAULT); + return Arrays.asList(response.getIndices()); + } + +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchConstants.java b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchConstants.java new file mode 100644 index 000000000000..20ae43b18573 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchConstants.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.elasticsearch; + +public class ElasticsearchConstants { + + private ElasticsearchConstants() {} + + public static final String REGEX_FOR_USER_INDICES_ONLY = "(^\\.)|(metrics-endpoint.metadata_current_default)"; + public static final String ALL_INDICES_QUERY = "*"; + +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchInclusions.java b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchInclusions.java new file mode 100644 index 000000000000..07280a29ece4 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchInclusions.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.elasticsearch; + +import java.util.List; + +public class ElasticsearchInclusions { + + private static final String type = "type"; + private static final String properties = "properties"; + public static final List KEEP_LIST = List.of(type, properties); + +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSource.java b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSource.java new file mode 100644 index 000000000000..6ee53db648e7 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSource.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.elasticsearch; + +import static io.airbyte.integrations.source.elasticsearch.typemapper.ElasticsearchTypeMapper.formatJSONSchema; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.integrations.BaseConnector; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.Source; +import io.airbyte.protocol.models.*; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.io.IOException; +import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticsearchSource extends BaseConnector implements Source { + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSource.class); + private final ObjectMapper mapper = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + final var Source = new ElasticsearchSource(); + LOGGER.info("starting Source: {}", ElasticsearchSource.class); + new IntegrationRunner(Source).run(args); + LOGGER.info("completed Source: {}", ElasticsearchSource.class); + } + + @Override + public AirbyteConnectionStatus check(JsonNode config) { + final ConnectorConfiguration configObject = convertConfig(config); + if (Objects.isNull(configObject.getEndpoint())) { + return new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage("endpoint must not be empty"); + } + if (!configObject.getAuthenticationMethod().isValid()) { + return new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage("authentication options are invalid"); + } + + final ElasticsearchConnection connection = new ElasticsearchConnection(configObject); + final var result = connection.checkConnection(); + try { + connection.close(); + } catch (IOException e) { + LOGGER.warn("failed while closing connection", e); + } + if (result) { + return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); + } else { + return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage("failed to ping elasticsearch"); + } + } + + @Override + public AirbyteCatalog discover(JsonNode config) throws Exception { + final ConnectorConfiguration configObject = convertConfig(config); + final ElasticsearchConnection connection = new ElasticsearchConnection(configObject); + final var indices = connection.userIndices(); + final var mappings = connection.getMappings(indices); + + List streams = new ArrayList<>(); + + for (var index : indices) { + JsonNode JSONSchema = mapper.convertValue(mappings.get(index).sourceAsMap(), JsonNode.class); + JsonNode formattedJSONSchema = formatJSONSchema(JSONSchema); + AirbyteStream stream = new AirbyteStream(); + stream.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH)); + stream.setName(index); + stream.setJsonSchema(formattedJSONSchema); + streams.add(stream); + } + try { + connection.close(); + } catch (IOException e) { + LOGGER.warn("failed while closing connection", e); + } + return new AirbyteCatalog().withStreams(streams); + } + + @Override + public AutoCloseableIterator read(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) { + final ConnectorConfiguration configObject = convertConfig(config); + final ElasticsearchConnection connection = new ElasticsearchConnection(configObject); + final List> iteratorList = new ArrayList<>(); + + catalog.getStreams() + .stream() + .map(ConfiguredAirbyteStream::getStream) + .forEach(stream -> { + AutoCloseableIterator data = ElasticsearchUtils.getDataIterator(connection, stream); + AutoCloseableIterator messageIterator = ElasticsearchUtils.getMessageIterator(data, stream.getName()); + iteratorList.add(messageIterator); + }); + return AutoCloseableIterators + .appendOnClose(AutoCloseableIterators.concatWithEagerClose(iteratorList), () -> { + LOGGER.info("Closing server connection."); + connection.close(); + LOGGER.info("Closed server connection."); + }); + } + + private ConnectorConfiguration convertConfig(JsonNode config) { + return mapper.convertValue(config, ConnectorConfiguration.class); + } + +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchUtils.java b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchUtils.java new file mode 100644 index 000000000000..124ee54ae28a --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchUtils.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.elasticsearch; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStream; +import java.time.Instant; +import java.util.List; + +public class ElasticsearchUtils { + + public static AutoCloseableIterator getDataIterator(final ElasticsearchConnection connection, + final AirbyteStream stream) { + return AutoCloseableIterators.lazyIterator(() -> { + try { + List data = connection.getRecords(stream.getName()); + return AutoCloseableIterators.fromIterator(data.iterator()); + } catch (final Exception e) { + throw new RuntimeException(e); + } + }); + } + + public static AutoCloseableIterator getMessageIterator(final AutoCloseableIterator recordIterator, + final String streamName) { + return AutoCloseableIterators.transform(recordIterator, r -> new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(streamName) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(r))); + } + +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/UnsupportedDatatypeException.java b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/UnsupportedDatatypeException.java new file mode 100644 index 000000000000..8e551ef2cda5 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/UnsupportedDatatypeException.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.elasticsearch; + +public class UnsupportedDatatypeException extends Exception { + + public UnsupportedDatatypeException(String message) { + super(message); + } + +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/typemapper/ElasticsearchTypeMapper.java b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/typemapper/ElasticsearchTypeMapper.java new file mode 100644 index 000000000000..8f014ab941d4 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/main/java/io/airbyte/integrations/source/elasticsearch/typemapper/ElasticsearchTypeMapper.java @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.elasticsearch.typemapper; + +import static io.airbyte.integrations.source.elasticsearch.ElasticsearchInclusions.KEEP_LIST; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.integrations.source.elasticsearch.UnsupportedDatatypeException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +public class ElasticsearchTypeMapper { + + private static final ObjectMapper mapper = new ObjectMapper(); + /* + * Mapping from elasticsearch to Airbyte types Elasticsearch data types: + * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html Airbyte data + * types: https://docs.airbyte.com/understanding-airbyte/supported-data-types/ + * + * In Elasticsearch, there is no dedicated array data type. Any field can contain zero or more + * values by default, however, all values in the array must be of the same data type + */ + private static final Map ElasticSearchToAirbyte = new HashMap<>() { + + { + + // BINARY + put("binary", Arrays.asList("string", "array")); + + // BOOLEAN + put("boolean", Arrays.asList("boolean", "array")); + + // KEYWORD FAMILY + put("keyword", Arrays.asList("string", "array", "number", "integer")); + put("constant_keyword", Arrays.asList("string", "array", "number", "integer")); + put("wildcard", Arrays.asList("string", "array", "number", "integer")); + + // NUMBERS + put("long", Arrays.asList("integer", "array")); + put("unsigned_long", Arrays.asList("integer", "array")); + put("integer", Arrays.asList("integer", "array")); + put("short", Arrays.asList("integer", "array")); + put("byte", Arrays.asList("integer", "array")); + put("double", Arrays.asList("number", "array")); + put("float", Arrays.asList("number", "array")); + put("half_float", Arrays.asList("number", "array")); + put("scaled_float", Arrays.asList("number", "array")); + + // ALIAS + /* Writes to alias field not supported by ES. Can be safely ignored */ + + // DATES + put("date", Arrays.asList("string", "array")); + put("date_nanos", Arrays.asList("number", "array")); + + // OBJECTS AND RELATIONAL TYPES + put("object", Arrays.asList("object", "array")); + put("flattened", Arrays.asList("object", "array")); + put("nested", Arrays.asList("object", "string")); + put("join", Arrays.asList("object", "string")); + + // STRUCTURED DATA TYPES + put("integer_range", Arrays.asList("object", "array")); + put("float_range", Arrays.asList("object", "array")); + put("long_range", Arrays.asList("object", "array")); + put("double_range", Arrays.asList("object", "array")); + put("date_range", Arrays.asList("object", "array")); + put("ip_range", Arrays.asList("object", "array")); + put("ip", Arrays.asList("string", "array")); + put("version", Arrays.asList("string", "array")); + put("murmur3", Arrays.asList("object", "array")); + + // AGGREGATE METRIC FIELD TYPES + put("aggregate_metric_double", Arrays.asList("object", "array")); + put("histogram", Arrays.asList("object", "array")); + + // TEXT SEARCH TYPES + put("text", Arrays.asList("string", "array")); + put("alias", Arrays.asList("string", "array")); + put("search_as_you_type", Arrays.asList("string", "array")); + put("token_count", Arrays.asList("integer", "array")); + + // DOCUMENT RANKING + put("dense_vector", "array"); + // put("rank_feature", "integer"); THEY ARE PUTTING OBJECTS HERE AS WELL???? + + // SPATIAL DATA TYPES (HARD TO HANDLE AS QUERYING MECHANISM IS BASED ON SHAPE, which has multiple + // fields) + put("geo_point", Arrays.asList("object", "array")); + put("geo_shape", Arrays.asList("object", "array")); + put("shape", Arrays.asList("object", "array")); + put("point", Arrays.asList("object", "array")); + } + + }; + + public static Map getMapper() { + return ElasticSearchToAirbyte; + } + + /** + * @param node JsonNode node which we want to format + * @return JsonNode + * @throws UnsupportedDatatypeException throws an exception if none of the types match + */ + public static JsonNode formatJSONSchema(JsonNode node) throws UnsupportedDatatypeException { + if (node.isObject()) { + if (!node.has("type") || node.has("properties")) { + ((ObjectNode) node).put("type", "object"); + } else if (node.has("type") && node.get("type").getNodeType() == JsonNodeType.STRING) { + retainAirbyteFieldsOnly(node); + + final String nodeType = node.get("type").textValue(); + + if (ElasticSearchToAirbyte.containsKey(nodeType)) { + ((ObjectNode) node).remove("type"); + ((ObjectNode) node).set("type", mapper.valueToTree(ElasticSearchToAirbyte.get(nodeType))); + } else + throw new UnsupportedDatatypeException("Cannot map unsupported data type to Airbyte data type: " + node.get("type").textValue()); + } + node.fields().forEachRemaining(entry -> { + try { + formatJSONSchema(entry.getValue()); + } catch (UnsupportedDatatypeException e) { + throw new RuntimeException(e); + } + }); + if (node.path("properties").path("type").getNodeType() == JsonNodeType.STRING) { + ((ObjectNode) node.path("properties")).remove("type"); + } else if (node.has("properties")) { + ((ObjectNode) node).set("type", mapper.valueToTree(Arrays.asList("array", "object"))); + } + } else if (node.isArray()) { + ArrayNode arrayNode = (ArrayNode) node; + Iterator temp = arrayNode.elements(); + while (temp.hasNext()) { + formatJSONSchema(temp.next()); + } + } + return node; + } + + private static void retainAirbyteFieldsOnly(JsonNode jsonNode) { + if (jsonNode instanceof ObjectNode) { + ((ObjectNode) jsonNode).retain(KEEP_LIST); + } + } + +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/main/resources/log4j2-test.example.yml b/airbyte-integrations/connectors/source-elasticsearch/src/main/resources/log4j2-test.example.yml new file mode 100644 index 000000000000..bbff489a9242 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/main/resources/log4j2-test.example.yml @@ -0,0 +1,33 @@ +Configuration: + status: warn + name: DefaultLog4j2Config + thresholdFilter: + level: debug + appenders: + Console: + name: STDOUT + target: SYSTEM_OUT + PatternLayout: + Pattern: "[%-6p] %c{3}.%M(%F:%L) – %m%n" + + Loggers: + logger: + - name: org.apache.http.wire + level: trace + additivity: false + AppenderRef: + ref: STDOUT + - name: co.elastic + level: debug + additivity: false + AppenderRef: + ref: STDOUT + - name: io.airbyte.integrations + level: debug + additivity: false + AppenderRef: + ref: STDOUT + Root: + level: info + AppenderRef: + ref: STDOUT diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/main/resources/spec.json b/airbyte-integrations/connectors/source-elasticsearch/src/main/resources/spec.json new file mode 100644 index 000000000000..a2b88dfbf374 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/main/resources/spec.json @@ -0,0 +1,82 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/source/elasticsearch", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Elasticsearch Connection Configuration", + "type": "object", + "required": ["endpoint"], + "additionalProperties": false, + "properties": { + "endpoint": { + "title": "Server Endpoint", + "type": "string", + "description": "The full url of the Elasticsearch server" + }, + "authenticationMethod": { + "title": "Authentication Method", + "type": "object", + "description": "The type of authentication to be used", + "oneOf": [ + { + "title": "None", + "additionalProperties": false, + "description": "No authentication will be used", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "none" + } + } + }, + { + "title": "Api Key/Secret", + "additionalProperties": false, + "description": "Use a api key and secret combination to authenticate", + "required": ["method", "apiKeyId", "apiKeySecret"], + "properties": { + "method": { + "type": "string", + "const": "secret" + }, + "apiKeyId": { + "title": "API Key ID", + "description": "The Key ID to used when accessing an enterprise Elasticsearch instance.", + "type": "string" + }, + "apiKeySecret": { + "title": "API Key Secret", + "description": "The secret associated with the API Key ID.", + "type": "string", + "airbyte_secret": true + } + } + }, + { + "title": "Username/Password", + "additionalProperties": false, + "description": "Basic auth header with a username and password", + "required": ["method", "username", "password"], + "properties": { + "method": { + "type": "string", + "const": "basic" + }, + "username": { + "title": "Username", + "description": "Basic auth username to access a secure Elasticsearch server", + "type": "string" + }, + "password": { + "title": "Password", + "description": "Basic auth password to access a secure Elasticsearch server", + "type": "string", + "airbyte_secret": true + } + } + } + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/test-integration/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-elasticsearch/src/test-integration/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSourceAcceptanceTest.java new file mode 100644 index 000000000000..ab59281fef1a --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/test-integration/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSourceAcceptanceTest.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.elasticsearch; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.protocol.models.*; +import java.io.IOException; +import java.time.Duration; +import java.util.Date; +import java.util.HashMap; +import org.apache.http.HttpHost; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +public class ElasticsearchSourceAcceptanceTest extends SourceAcceptanceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSourceAcceptanceTest.class); + private static final ObjectMapper mapper = MoreMappers.initMapper(); + private static final String index = "sample"; + private static ElasticsearchContainer container; + private RestHighLevelClient client; + private JsonNode config; + + @Override + protected String getImageName() { + return "airbyte/source-elasticsearch:dev"; + } + + @Override + protected JsonNode getConfig() { + var configJson = mapper.createObjectNode(); + configJson.put("endpoint", String.format("http://%s:%s", container.getHost(), container.getMappedPort(9200))); + return configJson; + } + + @Override + protected void setupEnvironment(TestDestinationEnv environment) throws Exception { + container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1") + .withEnv("ES_JAVA_OPTS", "-Xms512m -Xms512m") + .withEnv("discovery.type", "single-node") + .withEnv("network.host", "0.0.0.0") + .withEnv("logger.org.elasticsearch", "INFO") + .withEnv("ingest.geoip.downloader.enabled", "false") + .withEnv("xpack.security.enabled", "false") + .withExposedPorts(9200) + .withStartupTimeout(Duration.ofSeconds(60)); + container.start(); + getRestHighLevelClient(container); + createIndex(client); + addDocument(client); + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) throws Exception { + client.close(); + container.stop(); + container.close(); + } + + @Override + protected ConnectorSpecification getSpec() throws Exception { + return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class); + } + + @Override + protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws IOException { + return Jsons.deserialize(MoreResources.readResource("configured_catalog.json"), ConfiguredAirbyteCatalog.class); + } + + @Override + protected JsonNode getState() { + return Jsons.jsonNode(new HashMap<>()); + } + + private void getRestHighLevelClient(ElasticsearchContainer container) { + RestClientBuilder restClientBuilder = + RestClient.builder(HttpHost.create(container.getHttpHostAddress())).setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder); + client = new RestHighLevelClient(restClientBuilder); + } + + private void createIndex(final RestHighLevelClient client) throws IOException { + CreateIndexRequest request = new CreateIndexRequest(index); + CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT); + if (createIndexResponse.isAcknowledged()) { + LOGGER.info("Successfully created index: {}", index); + } + } + + private void addDocument(final RestHighLevelClient client) throws IOException { + IndexRequest indexRequest = new IndexRequest(index) + .id("1") + .source("user", "kimchy", + "postDate", new Date(), + "message", "trying out Elasticsearch"); + IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); + LOGGER.info("Index response status: {}", indexResponse.status()); + } + +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/test-integration/resources/configured_catalog.json b/airbyte-integrations/connectors/source-elasticsearch/src/test-integration/resources/configured_catalog.json new file mode 100644 index 000000000000..ddbddcdb772e --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/test-integration/resources/configured_catalog.json @@ -0,0 +1,13 @@ +{ + "streams": [ + { + "stream": { + "name": "sample", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "default_cursor_field": [], + "source_defined_primary_key": [] + } + } + ] +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/test/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSourcesTest.java b/airbyte-integrations/connectors/source-elasticsearch/src/test/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSourcesTest.java new file mode 100644 index 000000000000..8e76ff27fb68 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/test/java/io/airbyte/integrations/source/elasticsearch/ElasticsearchSourcesTest.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.elasticsearch; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.integrations.source.elasticsearch.typemapper.ElasticsearchTypeMapper; +import io.airbyte.protocol.models.ConnectorSpecification; +import java.io.IOException; +import java.util.*; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +class ElasticsearchSourcesTest { + + @Test + @DisplayName("Spec should match") + public void specShouldMatch() throws Exception { + final ConnectorSpecification actual = new ElasticsearchSource().spec(); + final ConnectorSpecification expected = Jsons.deserialize( + MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class); + assertEquals(expected, actual); + } + + @Test + @DisplayName("Actual mapper keyset should contain expected keyset") + public void actualMapperKeySetShouldContainExpectedKeySet() { + final Set expectedKeySet = new HashSet<>(Arrays.asList( + "binary", "boolean", "keyword", "constant_keyword", + "wildcard", "long", "unsigned_long", + "integer", "short", "byte", "double", "float", + "half_float", "scaled_float", "date", "date_nanos", "ip", + "text", "geo_point", "geo_shape", "shape", "point")); + Set actualKeySet = new HashSet<>(ElasticsearchTypeMapper.getMapper().keySet()); + + assertTrue(actualKeySet.containsAll(expectedKeySet)); + } + + @Test + @DisplayName("Formatter should transform objects conforming to airbyte spec") + public void testFormatter() throws IOException, UnsupportedDatatypeException { + final JsonNode input = Jsons.deserialize( + MoreResources.readResource("sample_input.json"), JsonNode.class); + final JsonNode expectedOutput = Jsons.deserialize( + MoreResources.readResource("expected_output.json"), JsonNode.class); + JsonNode actualOutput = ElasticsearchTypeMapper.formatJSONSchema(input); + assertEquals(expectedOutput, actualOutput); + } + + @Test + @DisplayName("Formatter should remove extra fields") + public void testFormatterRemovals() throws IOException, UnsupportedDatatypeException { + final JsonNode input = Jsons.deserialize( + MoreResources.readResource("sample_input_extra_fields.json"), JsonNode.class); + final JsonNode expectedOutput = Jsons.deserialize( + MoreResources.readResource("expected_output_extra_fields.json"), JsonNode.class); + JsonNode actualOutput = ElasticsearchTypeMapper.formatJSONSchema(input); + assertEquals(expectedOutput, actualOutput); + } + +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_output.json b/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_output.json new file mode 100644 index 000000000000..193566f0fc22 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_output.json @@ -0,0 +1,37 @@ +{ + "properties": { + "nestedObject": { + "type": "object", + "properties": { + "binary": { "type": ["string", "array"] }, + "boolean": { "type": ["boolean", "array"] }, + "keyword": { "type": ["string", "array", "number", "integer"] }, + "constant_keyword": { + "type": ["string", "array", "number", "integer"] + }, + "wildcard": { "type": ["string", "array", "number", "integer"] }, + "long": { "type": ["integer", "array"] }, + "unsigned_long": { "type": ["integer", "array"] }, + "integer": { "type": ["integer", "array"] }, + "short": { "type": ["integer", "array"] }, + "byte": { "type": ["integer", "array"] }, + "double": { "type": ["number", "array"] }, + "float": { "type": ["number", "array"] }, + "half_float": { "type": ["number", "array"] }, + "scaled_float": { "type": ["number", "array"] }, + "date": { "type": ["string", "array"] }, + "date_nanos": { "type": ["number", "array"] }, + "object": { "type": ["object", "array"] }, + "flattened": { "type": ["object", "array"] }, + "nested": { "type": ["object", "string"] }, + "join": { "type": ["object", "string"] }, + "text": { "type": ["string", "array"] }, + "geo_point": { "type": ["object", "array"] }, + "geo_shape": { "type": ["object", "array"] }, + "shape": { "type": ["object", "array"] }, + "point": { "type": ["object", "array"] } + } + } + }, + "type": "object" +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_output_extra_fields.json b/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_output_extra_fields.json new file mode 100644 index 000000000000..7d9c9fde1440 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_output_extra_fields.json @@ -0,0 +1,8 @@ +{ + "properties": { + "product_name": { + "type": ["string", "array"] + } + }, + "type": "object" +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_spec.json new file mode 100644 index 000000000000..a2b88dfbf374 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/expected_spec.json @@ -0,0 +1,82 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/source/elasticsearch", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Elasticsearch Connection Configuration", + "type": "object", + "required": ["endpoint"], + "additionalProperties": false, + "properties": { + "endpoint": { + "title": "Server Endpoint", + "type": "string", + "description": "The full url of the Elasticsearch server" + }, + "authenticationMethod": { + "title": "Authentication Method", + "type": "object", + "description": "The type of authentication to be used", + "oneOf": [ + { + "title": "None", + "additionalProperties": false, + "description": "No authentication will be used", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "none" + } + } + }, + { + "title": "Api Key/Secret", + "additionalProperties": false, + "description": "Use a api key and secret combination to authenticate", + "required": ["method", "apiKeyId", "apiKeySecret"], + "properties": { + "method": { + "type": "string", + "const": "secret" + }, + "apiKeyId": { + "title": "API Key ID", + "description": "The Key ID to used when accessing an enterprise Elasticsearch instance.", + "type": "string" + }, + "apiKeySecret": { + "title": "API Key Secret", + "description": "The secret associated with the API Key ID.", + "type": "string", + "airbyte_secret": true + } + } + }, + { + "title": "Username/Password", + "additionalProperties": false, + "description": "Basic auth header with a username and password", + "required": ["method", "username", "password"], + "properties": { + "method": { + "type": "string", + "const": "basic" + }, + "username": { + "title": "Username", + "description": "Basic auth username to access a secure Elasticsearch server", + "type": "string" + }, + "password": { + "title": "Password", + "description": "Basic auth password to access a secure Elasticsearch server", + "type": "string", + "airbyte_secret": true + } + } + } + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/sample_input.json b/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/sample_input.json new file mode 100644 index 000000000000..3bbf5c83cccd --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/sample_input.json @@ -0,0 +1,34 @@ +{ + "properties": { + "nestedObject": { + "type": "nested", + "properties": { + "binary": { "type": "binary" }, + "boolean": { "type": "boolean" }, + "keyword": { "type": "keyword" }, + "constant_keyword": { "type": "constant_keyword" }, + "wildcard": { "type": "wildcard" }, + "long": { "type": "long" }, + "unsigned_long": { "type": "unsigned_long" }, + "integer": { "type": "integer" }, + "short": { "type": "short" }, + "byte": { "type": "byte" }, + "double": { "type": "double" }, + "float": { "type": "float" }, + "half_float": { "type": "half_float" }, + "scaled_float": { "type": "scaled_float" }, + "date": { "type": "date" }, + "date_nanos": { "type": "date_nanos" }, + "object": { "type": "object" }, + "flattened": { "type": "flattened" }, + "nested": { "type": "nested" }, + "join": { "type": "join" }, + "text": { "type": "text" }, + "geo_point": { "type": "geo_point" }, + "geo_shape": { "type": "geo_shape" }, + "shape": { "type": "shape" }, + "point": { "type": "point" } + } + } + } +} diff --git a/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/sample_input_extra_fields.json b/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/sample_input_extra_fields.json new file mode 100644 index 000000000000..e828358f1190 --- /dev/null +++ b/airbyte-integrations/connectors/source-elasticsearch/src/test/resources/sample_input_extra_fields.json @@ -0,0 +1,13 @@ +{ + "properties": { + "product_name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + }, + "analyzer": "english" + } + } +} diff --git a/docs/integrations/sources/elasticsearch.md b/docs/integrations/sources/elasticsearch.md new file mode 100644 index 000000000000..940c11546f4d --- /dev/null +++ b/docs/integrations/sources/elasticsearch.md @@ -0,0 +1,87 @@ +# Elasticsearch Source + +## Features + +| Feature | Supported?\(Yes/No\) | Notes | +| :--- | :--- | :--- | +| Full Refresh Sync | Yes | | +| Incremental Sync | No | | + +This source syncs data from an ElasticSearch domain. + +## Supported Tables + +This source automatically discovers all indices in the domain and can sync any of them. + +## Getting Started \(Airbyte Open-Source\) + +#### Requirements + +* Elasticsearch endpoint URL +* Elasticsearch credentials (optional) + +### Performance Considerations + +ElasticSearch calls may be rate limited by the underlying service. +This is specific to each deployment. + +#### Data type mapping + +Elasticsearch data types: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html + +Airbyte data types: https://docs.airbyte.com/understanding-airbyte/supported-data-types/ + +In Elasticsearch, there is no dedicated array data type. +Any field can contain zero or more values by default, however, +all values in the array must be of the same data type. Hence, every field can be an array as well. + +| Integration Type | Airbyte Type | Notes | +|:--------------------------|:-------------------------------------------|:-----| +| `binary` | `["string", "array"]` | | +| `boolean` | `["boolean", "array"]` | | +| `keyword` | `["string", "array", "number", "integer"]` | | +| `constant_keyword` | `["string", "array", "number", "integer"]` | | +| `wildcard` | `["string", "array", "number", "integer"]` | | +| `long` | `["integer", "array"]` | | +| `unsigned_long` | `["integer", "array"]` | | +| `integer` | `["integer", "array"]` | | +| `short` | `["integer", "array"]` | | +| `byte` | `["integer", "array"]` | | +| `double` | `["number", "array"]` | | +| `float` | `["number", "array"]` | | +| `half_float` | `["number", "array"]` | | +| `scaled_float` | `["number", "array"]` | | +| `date` | `["string", "array"]` | | +| `date_nanos` | `["number", "array"]` | | +| `object` | `["object", "array"]` | | +| `flattened` | `["object", "array"]` | | +| `nested` | `["object", "string"]` | | +| `join` | `["object", "string"]` | | +| `integer_range` | `["object", "array"]` | | +| `float_range` | `["object", "array"]` | | +| `long_range` | `["object", "array"]` | | +| `double_range` | `["object", "array"]` | | +| `date_range` | `["object", "array"]` | | +| `ip_range` | `["object", "array"]` | | +| `ip` | `["string", "array"]` | | +| `version` | `["string", "array"]` | | +| `murmur3` | `["string", "array", "number", "integer"]` | | +| `aggregate_metric_double` | `["string", "array", "number", "integer"]` | | +| `histogram` | `["string", "array", "number", "integer"]` | | +| `text` | `["string", "array", "number", "integer"]` | | +| `alias` | `["string", "array", "number", "integer"]` | | +| `search_as_you_type` | `["string", "array", "number", "integer"]` | | +| `token_count` | `["string", "array", "number", "integer"]` | | +| `dense_vector` | `["string", "array", "number", "integer"]` | | +| `geo_point` | `["string", "array", "number", "integer"]` | | +| `geo_shape` | `["string", "array", "number", "integer"]` | | +| `shape` | `["string", "array", "number", "integer"]` | | +| `point` | `["string", "array", "number", "integer"]` | | + + + + +## Changelog +| Version | Date | Pull Request | Subject | +|:--------| :--- | :--- | :--- | +| `0.1.0` | 2022-07-12 | [14118](https://github.com/airbytehq/airbyte/pull/14118) | Initial Release |