Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source connector tutorial #1428

Merged
merged 18 commits into from
Dec 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions airbyte-integrations/connector-templates/generator/plopfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,24 @@
const fs = require('fs');
const path = require('path');

/**
 * Builds the banner that is printed once a connector has been generated.
 *
 * @param {string} connectorName - Name of the connector, as entered by the user.
 * @param {string} outputPath - Directory the connector was written to; it is
 *   resolved to an absolute path before being shown.
 * @returns {string} Multi-line message that begins and ends with a newline.
 */
const getSuccessMessage = (connectorName, outputPath) => {
  const absoluteOutputPath = path.resolve(outputPath);
  // The empty first and last entries produce the leading and trailing newline.
  const messageLines = [
    '',
    '🚀 🚀 🚀 🚀 🚀 🚀',
    '',
    'Success!',
    '',
    `Your ${connectorName} connector has been created at ${absoluteOutputPath}.`,
    '',
    'Follow instructions in NEW_SOURCE_CHECKLIST.md to finish your connector.',
    '',
    'Questions, comments, or concerns? Let us know at:',
    'Slack: https://slack.airbyte.io',
    'Github: https://github.com/airbytehq/airbyte',
    '',
    "We're always happy to provide you with any support :)",
    '',
  ];
  return messageLines.join('\n');
};

module.exports = function (plop) {
const pythonSourceInputRoot = '../source-python';
const singerSourceInputRoot = '../source-singer';
Expand All @@ -13,6 +31,10 @@ module.exports = function (plop) {
const singerSourceOutputRoot = `${outputDir}/source-{{dashCase name}}-singer`;
const genericSourceOutputRoot = `${outputDir}/source-{{dashCase name}}`;

// Custom plop action type: prints the success banner for the connector that was just generated.
// `config.outputPath` is rendered against the prompt answers first, so the banner shows the
// concrete output directory rather than the raw template string.
plop.setActionType('emitSuccess', function(answers, config, plopApi){
console.log(getSuccessMessage(answers.name, plopApi.renderString(config.outputPath, answers)));
});

plop.setGenerator('Python Source', {
description: 'Generate an Airbyte Source written in Python',
prompts: [{type: 'input', name: 'name', message: 'Source name, without the "source-" prefix e.g: "google-analytics"'}],
Expand Down Expand Up @@ -49,7 +71,7 @@ module.exports = function (plop) {
fs.symlinkSync(`${basesDir}/base-python/base_python`, `${renderedOutputDir}/base_python`);
fs.symlinkSync(`${basesDir}/airbyte-protocol/airbyte_protocol`, `${renderedOutputDir}/airbyte_protocol`);
},
'Your new Python source connector has been created. Follow the instructions and TODOs in the newly created package for next steps. Happy coding! 🐍🐍',]
{type: 'emitSuccess', outputPath: pythonSourceOutputRoot}]
});

plop.setGenerator('Singer-based Python Source', {
Expand Down Expand Up @@ -88,7 +110,7 @@ module.exports = function (plop) {
fs.symlinkSync(`${basesDir}/airbyte-protocol/airbyte_protocol`, `${renderedOutputDir}/airbyte_protocol`);
fs.symlinkSync(`${basesDir}/base-singer/base_singer`, `${renderedOutputDir}/base_singer`);
},
'Your new Singer-based source connector has been created. Follow the instructions and TODOs in the newly created package for next steps. Happy coding! 🐍🐍',
{type: 'emitSuccess', outputPath: singerSourceOutputRoot},
]
});

Expand All @@ -110,7 +132,9 @@ module.exports = function (plop) {
templateFile: `${genericSourceInputRoot}/.gitignore.hbs`,
path: `${genericSourceOutputRoot}/.gitignore`
},
'Your new connector package has been created. Follow the instructions and TODOs in the newly created package for next steps. Happy coding! 🚀',
{type: 'emitSuccess', outputPath: genericSourceOutputRoot}
]
});


};
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
NEW_SOURCE_CHECKLIST.md
12 changes: 12 additions & 0 deletions airbyte-integrations/connectors/source-stock-ticker-api/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Base image that provides the Python interpreter the connector runs on.
FROM python:3.7-slim

# Directory inside the image that holds the connector code.
WORKDIR /airbyte/integration_code
# requests is the only third-party package imported by source.py.
RUN pip install requests
# source.py loads spec.json from its own directory, so both files are copied to the same place.
COPY source.py .
COPY spec.json .

# Arguments given to the container are passed on to source.py.
ENTRYPOINT ["python", "/airbyte/integration_code/source.py"]

# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
LABEL io.airbyte.name=airbyte/source-stock-ticker-api
LABEL io.airbyte.version=0.1.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
plugins {
// Makes building the docker image a dependency of Gradle's "build" command. This way you could run your entire build inside a docker image
// via ./gradlew :airbyte-integrations:connectors:source-stock-ticker-api:build
id 'airbyte-docker'
// NOTE(review): presumably provides the airbyteStandardSourceTestFile extension configured below - confirm against the plugin source.
id 'airbyte-standard-source-test-file'
}

airbyteStandardSourceTestFile {
// All these input paths must live inside this connector's directory (or subdirectories)
configPath = "secrets/valid_config.json"
configuredCatalogPath = "fullrefresh_configured_catalog.json"
specPath = "spec.json"
}

// Uses the outputs of the airbyteDocker task from the shared base-standard-source-test-file project.
dependencies {
implementation files(project(':airbyte-integrations:bases:base-standard-source-test-file').airbyteDocker.outputs)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"streams": [
{
"stream": {
"name": "stock_prices",
"supported_sync_modes": [
"full_refresh"
],
"json_schema": {
"properties": {
"date": {
"type": "string"
},
"price": {
"type": "number"
},
"stock_ticker": {
"type": "string"
}
}
}
},
"sync_mode": "full_refresh"
}
]
}
186 changes: 186 additions & 0 deletions airbyte-integrations/connectors/source-stock-ticker-api/source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# source.py
import argparse # helps parse commandline arguments
import json
import sys
import os
import requests
import datetime


def read(config, catalog):
    """Emit the last 7 days of prices for the configured ticker as Airbyte RECORD messages.

    Each record is printed to stdout as one JSON line, ordered by ascending date.
    If the catalog does not contain the "stock_prices" stream, a LOG message is
    emitted and the function returns without calling the API.

    Args:
        config: Parsed connector configuration; must contain "api_key" and "stock_ticker".
        catalog: Parsed configured catalog; its "streams" entries are searched for
            the stream named "stock_prices".

    Raises:
        SystemExit: With code 1 when the config is incomplete, when a sync mode other
            than "full_refresh" is requested, or when the API does not answer with HTTP 200.
    """
    # Assert required configuration was provided
    if "api_key" not in config or "stock_ticker" not in config:
        log("Input config must contain the properties 'api_key' and 'stock_ticker'")
        sys.exit(1)

    # Find the stock_prices stream if it is present in the input catalog
    stock_prices_stream = None
    for configured_stream in catalog["streams"]:
        if configured_stream["stream"]["name"] == "stock_prices":
            stock_prices_stream = configured_stream

    if stock_prices_stream is None:
        log("No streams selected")
        return

    # We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
    if stock_prices_stream["sync_mode"] != "full_refresh":
        log("This connector only supports full refresh syncs! (for now)")
        sys.exit(1)

    # If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
    api_key = config["api_key"]
    stock_ticker = config["stock_ticker"]
    # No leading slash: _call_api appends the endpoint to a base URL that already ends in "/",
    # which is also how check() builds its endpoint.
    response = _call_api(f"stock/{stock_ticker}/chart/7d", api_key)
    if response.status_code != 200:
        # In a real scenario we'd handle this error better :)
        log("Failure occurred when calling IEX API")
        sys.exit(1)

    # Sort the stock prices ascending by date then output them one by one as AirbyteMessages
    prices = sorted(response.json(), key=lambda record: datetime.datetime.strptime(record["date"], "%Y-%m-%d"))
    for price in prices:
        data = {"date": price["date"], "stock_ticker": price["symbol"], "price": price["close"]}
        # Scale to milliseconds before truncating so sub-second precision is kept
        emitted_at = int(datetime.datetime.now().timestamp() * 1000)
        record = {"stream": "stock_prices", "data": data, "emitted_at": emitted_at}
        output_message = {"type": "RECORD", "record": record}
        print(json.dumps(output_message))


def read_json(filepath):
    """Open the file at ``filepath`` and return its contents parsed as JSON."""
    with open(filepath, "r") as json_file:
        return json.load(json_file)


def _call_api(endpoint, token):
    """Send a GET request to the IEX Cloud v1 API.

    Args:
        endpoint: Path below ``https://cloud.iexapis.com/v1/``; leading slashes are tolerated.
        token: API token, sent as the ``token`` query parameter.

    Returns:
        The response object returned by ``requests.get``; the status code is not checked here.
    """
    # Drop leading slashes so a caller passing "/stock/..." does not produce ".../v1//stock/...".
    url = "https://cloud.iexapis.com/v1/" + endpoint.lstrip("/")
    # Let requests build the query string so the token is URL-encoded.
    return requests.get(url, params={"token": token})


def check(config):
    """Validate the configuration against the API and print a CONNECTION_STATUS message.

    The check requests the previous day's price for the configured ticker. HTTP 200
    is reported as success, 403 as a wrong API key, and any other code as a generic
    configuration failure.

    Args:
        config: Parsed connector configuration; must contain "api_key" and "stock_ticker".

    Raises:
        SystemExit: With code 1 when a required property is missing from ``config``.
    """
    # Assert required configuration was provided
    has_required_properties = "api_key" in config and "stock_ticker" in config
    if not has_required_properties:
        log("Input config must contain the properties 'api_key' and 'stock_ticker'")
        sys.exit(1)

    # Validate input configuration by attempting to get the price of the input stock ticker for the previous day
    endpoint = "/".join(("stock", config["stock_ticker"], "previous"))
    status_code = _call_api(endpoint=endpoint, token=config["api_key"]).status_code

    # HTTP code 403 means authorization failed so the API key is incorrect
    specific_failures = {403: "API Key is incorrect."}
    # Consider any other code a "generic" failure and tell the user to make sure their config is correct.
    generic_failure = "Input configuration is incorrect. Please verify the input stock ticker and API key."

    if status_code == 200:
        result = {"status": "SUCCEEDED"}
    else:
        result = {"status": "FAILED", "message": specific_failures.get(status_code, generic_failure)}

    # Format the result of the check operation according to the Airbyte Specification
    print(json.dumps({"type": "CONNECTION_STATUS", "connectionStatus": result}))


def log(message):
    """Print ``message`` to stdout wrapped in a JSON object of type "LOG"."""
    print(json.dumps({"type": "LOG", "log": message}))


def discover():
    """Print a CATALOG message describing the single "stock_prices" stream."""
    # Column name -> JSON schema type, in the order the properties are emitted
    column_types = (("date", "string"), ("price", "number"), ("stock_ticker", "string"))
    stock_prices = {
        "name": "stock_prices",
        "supported_sync_modes": ["full_refresh"],
        "json_schema": {
            "properties": {column: {"type": json_type} for column, json_type in column_types}
        },
    }
    print(json.dumps({"type": "CATALOG", "catalog": {"streams": [stock_prices]}}))


def get_input_file_path(path):
    """Return ``path`` unchanged when it is absolute, otherwise joined onto the current working directory."""
    return path if os.path.isabs(path) else os.path.join(os.getcwd(), path)


def spec():
    """Print a SPEC message whose payload is the spec.json file stored next to this script."""
    # Resolve spec.json relative to this file rather than the working directory
    script_directory = os.path.dirname(os.path.realpath(__file__))
    specification = read_json(os.path.join(script_directory, "spec.json"))

    # Wrap the specification in an Airbyte Message and write it to stdout as a JSON string
    print(json.dumps({"type": "SPEC", "spec": specification}))


def run(args):
    """Parse the command line in ``args`` and execute the requested connector command.

    Supported commands are "spec", "check", "discover" and "read". "check",
    "discover" and "read" require ``--config``; "read" additionally requires
    ``--catalog`` and accepts an optional ``--state``.

    Args:
        args: Command-line arguments without the program name.

    Raises:
        SystemExit: Always. Code 0 after the command ran, code 1 when no known
            command was selected.
    """
    shared_parent = argparse.ArgumentParser(add_help=False)
    cli = argparse.ArgumentParser()
    commands = cli.add_subparsers(title="commands", dest="command")

    def add_required_config(command_parser):
        # Every command that needs a config file declares it in the same argument group
        group = command_parser.add_argument_group("required named arguments")
        group.add_argument("--config", type=str, required=True, help="path to the json configuration file")
        return group

    # Accept the spec command
    commands.add_parser("spec", help="outputs the json configuration specification", parents=[shared_parent])

    # Accept the check command
    add_required_config(commands.add_parser("check", help="checks the config used to connect", parents=[shared_parent]))

    # Accept the discover command
    add_required_config(
        commands.add_parser("discover", help="outputs a catalog describing the source's schema", parents=[shared_parent])
    )

    # Accept the read command
    read_command = commands.add_parser("read", help="reads the source and outputs messages to STDOUT", parents=[shared_parent])
    read_command.add_argument("--state", type=str, required=False, help="path to the json-encoded state file")
    add_required_config(read_command).add_argument(
        "--catalog", type=str, required=True, help="path to the catalog used to determine which data to read"
    )

    options = cli.parse_args(args)
    selected = options.command

    if selected not in ("spec", "check", "discover", "read"):
        # If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
        # had a failure
        log("Invalid command. Allowable commands: [spec, check, discover, read]")
        sys.exit(1)

    if selected == "spec":
        spec()
    elif selected == "discover":
        discover()
    else:
        # Both remaining commands start by loading the configuration file
        config = read_json(get_input_file_path(options.config))
        if selected == "check":
            check(config)
        else:
            configured_catalog = read_json(get_input_file_path(options.catalog))
            read(config, configured_catalog)

    # A zero exit code means the process successfully completed
    sys.exit(0)


def main():
    """Entry point: forward the command-line arguments, minus the program name, to ``run``."""
    run(sys.argv[1:])


if __name__ == "__main__":
main()
23 changes: 23 additions & 0 deletions airbyte-integrations/connectors/source-stock-ticker-api/spec.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"documentationUrl": "https://iexcloud.io/docs/api",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["stock_ticker", "api_key"],
"additionalProperties": false,
"properties": {
"stock_ticker": {
"type": "string",
"title": "Stock Ticker",
"description": "The stock ticker to track",
"examples": ["AAPL", "TSLA", "AMZN"]
},
"api_key": {
"title": "API Key",
"type": "string",
"description": "The IEX Cloud API key to use to hit the API.",
"airbyte_secret": true
}
}
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/.gitbook/assets/newsourcetutorial_plop.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [Tutorials](tutorials/README.md)
* [Postgres Replication](tutorials/postgres-replication.md)
* [Config & Persistence](tutorials/airbyte-config-persistence.md)
* [Building a toy connector](tutorials/toy-connector.md)
* [Changelog](changelog.md)
* [Releases](deploying-airbyte/releases.md)
* [Roadmap](roadmap.md)
Expand Down
13 changes: 5 additions & 8 deletions docs/contributing-to-airbyte/building-new-connector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ Each requirement has a subsection below.

### 1. Implement & package the connector

From the project root directory, build Airbyte locally:

```text
./gradlew build
```

If you are building a connector in any of the following languages/frameworks, then you're in luck! We provide autogenerated templates to get you started quickly:

* **Java Source Connector**
Expand All @@ -40,15 +34,18 @@ If your language/framework is not listed above, we have a minimal generic templa

#### Creating a connector from a template

From the `airbyte-integrations/connector-templates/generator` directory, run the interactive generator:
Run the interactive generator:

```text
cd airbyte-integrations/connector-templates/generator
npm install
npm run generate
```

and choose the relevant template. This will generate a new connector in the `airbyte-integrations/connectors/<your-connector>` directory.

Follow the instructions generated in the `NEW_SOURCE_CHECKLIST.md` file to complete the connector.
Follow the instructions generated in the `NEW_SOURCE_CHECKLIST.md` checklist to complete the connector.
The checklist contains helpful tips about how to implement your connector.

### 2. Integration tests

Expand Down
Loading