Skip to content

Commit

Permalink
Revive REST API Source (#1103)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Nov 28, 2020
1 parent b98aeb7 commit 83fc3e5
Show file tree
Hide file tree
Showing 23 changed files with 107 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"sourceDefinitionId": "9845d17a-45f1-4070-8a60-50914b1c8e2b",
"name": "HTTP Request",
"dockerRepository": "airbyte/source-http-request",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-http-request"
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -45,6 +46,7 @@ public class AirbyteProtocolConverters {

public static ConfiguredAirbyteCatalog toConfiguredCatalog(Schema schema) {
final List<ConfiguredAirbyteStream> airbyteStreams = schema.getStreams().stream()
.filter(s -> s.getSelected() != null && s.getSelected())
.map(s -> new ConfiguredAirbyteStream()
// immutable
// todo (cgardens) - not great that we just trust the API to not mutate these.
Expand All @@ -53,16 +55,13 @@ public static ConfiguredAirbyteCatalog toConfiguredCatalog(Schema schema) {
.withJsonSchema(toJson(s.getFields()))
.withSupportedSyncModes(s.getSupportedSyncModes()
.stream()
.map(e -> Enums.convertTo(e, io.airbyte.protocol.models.SyncMode.class))
.map(e -> Enums.convertTo(e, SyncMode.class))
.collect(Collectors.toList()))
.withSourceDefinedCursor(s.getSourceDefinedCursor())
.withDefaultCursorField(s.getDefaultCursorField()))
// configurable
.withSyncMode(Enums.convertTo(s.getSyncMode(), io.airbyte.protocol.models.SyncMode.class))
.withSyncMode(Enums.convertTo(s.getSyncMode(), SyncMode.class))
.withCursorField(s.getCursorField()))

// perform selection based on the output of toJson, which keeps properties if selected=true
.filter(s -> !s.getStream().getJsonSchema().get("properties").isEmpty())
.collect(Collectors.toList());
return new ConfiguredAirbyteCatalog().withStreams(airbyteStreams);
}
Expand All @@ -81,7 +80,6 @@ public static JsonNode toJson(List<Field> fields) {
.put("type", "object")
.put("properties", fields
.stream()
.filter(Field::getSelected)
.collect(Collectors.toMap(
Field::getName,
field -> ImmutableMap.of("type", field.getDataType().toString().toLowerCase()))))
Expand All @@ -100,7 +98,8 @@ public static Schema toSchema(AirbyteCatalog catalog) {
.map(e -> Enums.convertTo(e, StandardSync.SyncMode.class))
.collect(Collectors.toList()))
.withSourceDefinedCursor(airbyteStream.getSourceDefinedCursor())
.withDefaultCursorField(airbyteStream.getDefaultCursorField()))
.withDefaultCursorField(airbyteStream.getDefaultCursorField())
.withSelected(true)) // by default all discovered streams are treated as selected.
// configurable fields syncMode and cursorField are not set since they will never be defined in an
// AirbyteCatalog.
.collect(Collectors.toList()));
Expand All @@ -115,10 +114,13 @@ private static List<Field> toFields(JsonNode jsonSchemaPropertiesObject) {
list.add(iterator.next());
}

return list.stream().map(item -> new Field()
.withName(item.getKey())
.withDataType(getDataType(item.getValue()))
.withSelected(true)).collect(Collectors.toList());
return list
.stream()
.map(item -> new Field()
.withName(item.getKey())
.withDataType(getDataType(item.getValue()))
.withSelected(true))
.collect(Collectors.toList());
}

// todo (cgardens) - add more robust handling for jsonschema types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
Expand All @@ -38,6 +39,7 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import org.junit.jupiter.api.Test;

class AirbyteProtocolConvertersTest {
Expand Down Expand Up @@ -66,32 +68,30 @@ class AirbyteProtocolConvertersTest {

private static final Schema SCHEMA = new Schema()
.withStreams(Lists.newArrayList(new Stream()
.withSelected(true)
.withName(STREAM)
.withFields(Lists.newArrayList(
new io.airbyte.config.Field()
.withName(COLUMN_NAME)
.withDataType(DataType.STRING)
.withSelected(true),
.withDataType(DataType.STRING),
new io.airbyte.config.Field()
.withName(COLUMN_AGE)
.withDataType(DataType.NUMBER)
.withSelected(true)))
.withDataType(DataType.NUMBER)))
.withSourceDefinedCursor(false)
.withSupportedSyncModes(Lists.newArrayList(StandardSync.SyncMode.FULL_REFRESH, StandardSync.SyncMode.INCREMENTAL))
.withDefaultCursorField(Lists.newArrayList(COLUMN_AGE))));

private static final Schema SCHEMA_WITH_UNSELECTED = new Schema()
.withStreams(Lists.newArrayList(new Stream()
.withSelected(true)
.withName(STREAM)
.withFields(Lists.newArrayList(
new io.airbyte.config.Field()
.withName(COLUMN_NAME)
.withDataType(DataType.STRING)
.withSelected(true),
.withDataType(DataType.STRING),
new io.airbyte.config.Field()
.withName(COLUMN_AGE)
.withDataType(DataType.NUMBER)
.withSelected(true)))
.withDataType(DataType.NUMBER)))
.withSourceDefinedCursor(false)
.withSupportedSyncModes(Lists.newArrayList(StandardSync.SyncMode.FULL_REFRESH, StandardSync.SyncMode.INCREMENTAL))
.withDefaultCursorField(Lists.newArrayList(COLUMN_AGE)),
Expand All @@ -100,12 +100,10 @@ class AirbyteProtocolConvertersTest {
.withFields(Lists.newArrayList(
new io.airbyte.config.Field()
.withName(COLUMN_NAME)
.withDataType(DataType.STRING)
.withSelected(false),
.withDataType(DataType.STRING),
new io.airbyte.config.Field()
.withName(COLUMN_AGE)
.withDataType(DataType.NUMBER)
.withSelected(false)))));
.withDataType(DataType.NUMBER)))));

@Test
void testToConfiguredCatalog() {
Expand All @@ -127,7 +125,10 @@ void testToConfiguredCatalogWithUnselectedStream() {

@Test
void testToSchema() {
assertEquals(SCHEMA, AirbyteProtocolConverters.toSchema(CATALOG));
final Schema schema = Jsons.clone(SCHEMA);
schema.getStreams().forEach(table -> table.getFields().forEach(c -> c.setSelected(true))); // select all fields

assertEquals(schema, AirbyteProtocolConverters.toSchema(CATALOG));
}

@Test
Expand All @@ -145,7 +146,8 @@ void testToSchemaWithMultipleJsonSchemaTypesAndFormats() {
new io.airbyte.config.Field()
.withName(COLUMN_AGE)
.withDataType(DataType.NUMBER)
.withSelected(true)))));
.withSelected(true)))
.withSelected(true)));

assertEquals(schema, AirbyteProtocolConverters.toSchema(catalog));
}
Expand All @@ -162,12 +164,27 @@ void testAnyOfAsObject() {
new io.airbyte.config.Field()
.withName("date")
.withDataType(DataType.OBJECT)
.withSelected(true)))));
.withSelected(true)))
.withSelected(true)));

final AirbyteCatalog catalog = Jsons.deserialize(testString, AirbyteCatalog.class);
assertEquals(schema, AirbyteProtocolConverters.toSchema(catalog));
}

@Test
void testStreamWithNoFields() {
  // Input: a copy of SCHEMA whose first stream is incremental, has a cursor field, and has an
  // empty field list.
  final Schema schemaWithoutFields = Jsons.clone(SCHEMA);
  final Stream firstStream = schemaWithoutFields.getStreams().get(0);
  firstStream.withCursorField(Lists.newArrayList(COLUMN_NAME));
  firstStream.withSyncMode(StandardSync.SyncMode.INCREMENTAL);
  firstStream.setFields(Lists.newArrayList());

  // Expected: a copy of CONFIGURED_CATALOG whose first stream's json schema has its "properties"
  // entry replaced by an empty object.
  final ConfiguredAirbyteCatalog expectedCatalog = Jsons.clone(CONFIGURED_CATALOG);
  final ObjectNode expectedJsonSchema = (ObjectNode) expectedCatalog.getStreams().get(0).getStream().getJsonSchema();
  expectedJsonSchema.set("properties", Jsons.jsonNode(Collections.emptyMap()));

  // The stream must still be present in the converted catalog even though it has no fields.
  final ConfiguredAirbyteCatalog actualCatalog = AirbyteProtocolConverters.toConfiguredCatalog(schemaWithoutFields);
  assertEquals(expectedCatalog, actualCatalog);
}

@Test
void testEnumConversion() {
assertTrue(Enums.isCompatible(io.airbyte.protocol.models.SyncMode.class, io.airbyte.config.StandardSync.SyncMode.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ FROM airbyte/integration-base-python:dev

RUN apt-get update && apt-get install -y jq curl bash && rm -rf /var/lib/apt/lists/*

ENV CODE_PATH="source_rest_api"
ENV AIRBYTE_IMPL_MODULE="source_rest_api"
ENV AIRBYTE_IMPL_PATH="SourceRestApi"
ENV CODE_PATH="source_http_request"
ENV AIRBYTE_IMPL_MODULE="source_http_request"
ENV AIRBYTE_IMPL_PATH="SourceHttpRequest"

WORKDIR /airbyte/integration_code
COPY $CODE_PATH ./$CODE_PATH
COPY setup.py ./
RUN pip install ".[main]"

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-rest-api
LABEL io.airbyte.name=airbyte/source-http-request
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ RUN apt-get update && rm -rf /var/lib/apt/lists/*

ENV CODE_PATH="integration_tests"
ENV AIRBYTE_TEST_MODULE="integration_tests"
ENV AIRBYTE_TEST_PATH="SourceRestApiStandardTest"
ENV AIRBYTE_TEST_PATH="SourceHttpRequestStandardTest"
ENV AIRBYTE_TEST_CASE=true

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-rest-api-standard-test
LABEL io.airbyte.name=airbyte/source-http-request-standard-test

WORKDIR /airbyte/integration_code
COPY source_rest_api source_rest_api
COPY source_http_request source_http_request
COPY $CODE_PATH $CODE_PATH
COPY source_rest_api/*.json $CODE_PATH
COPY source_http_request/*.json $CODE_PATH
COPY setup.py ./

RUN pip install ".[tests]"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
# Rest Api Source
# HTTP Request Source

This is the repository for the Rest Api source connector, written in Python.
This is the repository for the HTTP Request source connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/rest-api).

## Local development
### Build
First, build the module by running the following from the `airbyte` project root directory:
```
./gradlew :airbyte-integrations:connectors:source-rest-api:build
./gradlew :airbyte-integrations:connectors:source-http-request:build
```

This should generate a virtualenv for this module in `source-rest-api/.venv`. Make sure this venv is active in your
development environment of choice. If you are on the terminal, run the following from the `source-rest-api` directory:
This should generate a virtualenv for this module in `source-http-request/.venv`. Make sure this venv is active in your
development environment of choice. If you are on the terminal, run the following from the `source-http-request` directory:
```
cd airbyte-integrations/connectors/source-rest-api # cd into the connector directory
cd airbyte-integrations/connectors/source-http-request # cd into the connector directory
source .venv/bin/activate
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.
Expand All @@ -37,16 +37,16 @@ pytest unit_tests
### Locally running the connector docker image
```
# in airbyte root directory
./gradlew :airbyte-integrations:connectors:source-rest-api:airbyteDocker
docker run --rm -v $(pwd)/airbyte-integrations/connectors/source-rest-api:/sample_files airbyte/source-rest-api:dev spec
docker run --rm -v $(pwd)/airbyte-integrations/connectors/source-rest-api:/sample_files airbyte/source-rest-api:dev check --config /sample_files/sample_files/config.json
docker run --rm -v $(pwd)/airbyte-integrations/connectors/source-rest-api:/sample_files airbyte/source-rest-api:dev discover --config /sample_files/sample_files/config.json
docker run --rm -v $(pwd)/airbyte-integrations/connectors/source-rest-api:/sample_files airbyte/source-rest-api:dev read --config /sample_files/sample_files/config.json --catalog /sample_files/integration_tests/catalog.json
./gradlew :airbyte-integrations:connectors:source-http-request:airbyteDocker
docker run --rm -v $(pwd)/airbyte-integrations/connectors/source-http-request:/sample_files airbyte/source-http-request:dev spec
docker run --rm -v $(pwd)/airbyte-integrations/connectors/source-http-request:/sample_files airbyte/source-http-request:dev check --config /sample_files/sample_files/config.json
docker run --rm -v $(pwd)/airbyte-integrations/connectors/source-http-request:/sample_files airbyte/source-http-request:dev discover --config /sample_files/sample_files/config.json
docker run --rm -v $(pwd)/airbyte-integrations/connectors/source-http-request:/sample_files airbyte/source-http-request:dev read --config /sample_files/sample_files/config.json --catalog /sample_files/integration_tests/catalog.json
```

### Integration Tests
1. Configure credentials as appropriate, described below.
1. From the airbyte project root, run `./gradlew :airbyte-integrations:connectors:source-rest-api:standardSourceTestPython` to run the standard integration test suite.
1. From the airbyte project root, run `./gradlew :airbyte-integrations:connectors:source-http-request:standardSourceTestPython` to run the standard integration test suite.
1. To run additional integration tests, place your integration tests in the `integration_tests` directory and run them with `pytest integration_tests`.
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ plugins {
id 'java'
id 'airbyte-python'
id 'airbyte-docker'
// todo (cgardens) - bring back when we re-add this to the product.
// id 'airbyte-source-test'
id 'airbyte-source-test'
}

airbytePython {
moduleDirectory 'source_rest_api'
moduleDirectory 'source_http_request'
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@
SOFTWARE.
"""

from .standard_source_test import SourceRestApiStandardTest
from .standard_source_test import SourceHttpRequestStandardTest

__all__ = ["SourceRestApiStandardTest"]
__all__ = ["SourceHttpRequestStandardTest"]
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from base_python_test import StandardSourceTestIface


class SourceRestApiStandardTest(StandardSourceTestIface):
class SourceHttpRequestStandardTest(StandardSourceTestIface):
def __init__(self):
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import sys

from base_python.entrypoint import launch
from source_rest_api import SourceRestApi
from source_http_request import SourceHttpRequest

if __name__ == "__main__":
source = SourceRestApi()
source = SourceHttpRequest()
launch(source, sys.argv[1:])
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
from setuptools import find_packages, setup

setup(
name="source_rest_api",
description="Source implementation for Rest Api.",
name="source_http_request",
description="Source implementation for HTTP Request.",
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@
SOFTWARE.
"""

from .source import SourceRestApi
from .source import SourceHttpRequest

__all__ = ["SourceRestApi"]
__all__ = ["SourceHttpRequest"]
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from base_python import AirbyteLogger, Source


class SourceRestApi(Source):
class SourceHttpRequest(Source):
STREAM_NAME = "data"

def __init__(self):
Expand All @@ -49,11 +49,13 @@ def discover(self, logger: AirbyteLogger, config_container) -> AirbyteCatalog:
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {},
# todo (cgardens) - remove data column. added to handle UI bug where streams without fields cannot be selected.
# issue: https://github.com/airbytehq/airbyte/issues/1104
"properties": {"data": {"type": "object"}},
}

# json body will be returned as the "data" stream. we can't know its schema ahead of time, so we assume it's object (i.e. valid json).
return AirbyteCatalog(streams=[AirbyteStream(name=SourceRestApi.STREAM_NAME, json_schema=json_schema)])
return AirbyteCatalog(streams=[AirbyteStream(name=SourceHttpRequest.STREAM_NAME, json_schema=json_schema)])

def read(self, logger: AirbyteLogger, config_container, catalog_path, state=None) -> Generator[AirbyteMessage, None, None]:
r = self._make_request(config_container.rendered_config)
Expand All @@ -63,8 +65,11 @@ def read(self, logger: AirbyteLogger, config_container, catalog_path, state=None
# need to eagerly fetch the json.
message = AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(stream=SourceRestApi.STREAM_NAME, data=r.json(), emitted_at=int(datetime.now().timestamp()) * 1000),
record=AirbyteRecordMessage(
stream=SourceHttpRequest.STREAM_NAME, data=r.json(), emitted_at=int(datetime.now().timestamp()) * 1000
),
)

return (m for m in [message])

def _make_request(self, config):
Expand Down
Loading

0 comments on commit 83fc3e5

Please sign in to comment.