Skip to content
This repository has been archived by the owner on Feb 16, 2023. It is now read-only.

Commit

Permalink
🐛 Source Qualaroo: Fix start_date & custom survey_ids (airbytehq#13121)
Browse files Browse the repository at this point in the history
* Fix survey_ids schema, fix epoch time start_date, add stream_slices

* Formatting

* Bump version

* Update tests

* Formatting

* Remove unused imports

* chore: bump version in source definitions

* chore: update seed file

Co-authored-by: Harshith Mullapudi <harshithmullapudi@gmail.com>
  • Loading branch information
2 people authored and Jordan Scott committed Jun 1, 2022
1 parent 7f4e123 commit 036a42a
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@
- name: Qualaroo
sourceDefinitionId: b08e4776-d1de-4e80-ab5c-1e51dad934a2
dockerRepository: airbyte/source-qualaroo
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.io/integrations/sources/qualaroo
icon: qualaroo.svg
sourceType: api
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6796,7 +6796,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-qualaroo:0.1.1"
- dockerImage: "airbyte/source-qualaroo:0.1.2"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/qualaroo"
connectionSpecification:
Expand Down Expand Up @@ -6833,7 +6833,7 @@
type: "array"
items:
type: "string"
pattern: "^[0-9a-fA-F]{24}$"
pattern: "^[0-9]{1,8}$"
title: "Qualaroo survey IDs"
description: "IDs of the surveys from which you'd like to replicate data.\
\ If left empty, data from all surveys to which you have access will be\
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-qualaroo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ COPY source_qualaroo ./source_qualaroo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-qualaroo
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,45 @@
"type": ["null", "integer"]
},
"time": {
"type": ["null", "string"],
"type": ["null", "string"],
"format": "date-time"
},
"emitted_at": {
"type": ["null", "string"],
"format": "date-time"
},
"identity": {
"type": ["null", "string"]
},
"page": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"referrer": {
"type": ["null", "string"]
},
"user_agent": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"nudge_id": {
"type": ["null", "integer"]
},
"nudge_name": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"anon_visitor_id": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"ip_address": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"answered_questions": {
"type": "array",
"items": {
"type":["null", "object"]
"type": ["null", "object"]
}
},
"properties": {
"type":["null", "object"]
"type": ["null", "object"]
},
"nps": {
"type": ["null", "object"],
Expand All @@ -48,20 +52,20 @@
"type": ["null", "integer"]
},
"reason": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"respondent_id": {
"type": ["null", "integer"]
},
"time": {
"type": ["null", "string"],
"type": ["null", "string"],
"format": "date-time"
},
"category": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"response_uri": {
"type": ["null", "string"]
"type": ["null", "string"]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,97 +3,16 @@
#


from abc import ABC
from base64 import b64encode
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from typing import Any, List, Mapping, Tuple

import pendulum
import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator


class QualarooStream(HttpStream, ABC):
    """Base stream for the Qualaroo REST reporting API.

    Holds the shared base URL, primary key and page size, and builds the
    query parameters sent with every request.
    """

    url_base = "https://api.qualaroo.com/api/v1/"

    # Define primary key as sort key for full_refresh, or very first sync for incremental_refresh
    primary_key = "id"

    # Page size
    limit = 500

    # Optional additional query parameters; subclasses may override.
    extra_params = None

    def __init__(self, config: Mapping[str, Any]):
        """Store the config and hand its ``authenticator`` entry to the base class."""
        super().__init__(authenticator=config["authenticator"])
        self.start_date = config["start_date"]
        self.config = config

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Always return ``None``: this stream never requests a further page."""
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return the query parameters: page size, start date, then any token and extras."""
        query = {"limit": self.limit, "start_date": self.start_date}
        # Later updates win, so extra_params can override the page token values.
        query.update(**(next_page_token or {}))
        query.update(self.extra_params or {})
        return query

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield every element of the JSON body as one record."""
        yield from response.json()


class ChildStreamMixin:
    """Mixin that derives stream slices from the records of a parent stream."""

    # Stream class whose records provide the slice ids; set by the subclass.
    parent_stream_class: Optional[QualarooStream] = None

    def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield one ``{"id": ...}`` slice per record read from the parent stream."""
        parent = self.parent_stream_class(config=self.config)
        for parent_record in parent.read_records(sync_mode=sync_mode):
            yield {"id": parent_record["id"]}


class Surveys(QualarooStream):
    """Return list of all Surveys.
    API Docs: https://help.qualaroo.com/hc/en-us/articles/201969438-The-REST-Reporting-API
    Endpoint: https://api.qualaroo.com/api/v1/nudges/
    """

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """Return the endpoint path, relative to ``url_base``."""
        return "nudges"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield surveys, restricted to the configured ``survey_ids`` when any are set.

        An empty or missing ``survey_ids`` config entry means "no filter".
        """
        # The spec declares survey_ids as strings; normalise both sides to str so
        # an id that arrives as an integer in the response still matches.
        wanted = {str(survey_id) for survey_id in self.config.get("survey_ids", [])}
        for record in super().parse_response(response, **kwargs):
            if not wanted or str(record["id"]) in wanted:
                yield record


class Responses(ChildStreamMixin, QualarooStream):
    """Return list of all responses of a survey.
    API Docs: https://help.qualaroo.com/hc/en-us/articles/201969438-The-REST-Reporting-API
    Endpoint: https://api.qualaroo.com/api/v1/nudges/<id>/responses.json
    """

    parent_stream_class = Surveys
    limit = 500
    extra_params = {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield each response record with ``answered_questions`` flattened to a list.

        When ``answered_questions`` is a mapping, it is replaced by the list of
        its values. Any other value (missing, null, already a list) is left
        untouched instead of raising on ``.values()``.
        """
        for record in response.json():
            answers = record.get("answered_questions")
            # de-nest the answered_questions object if exists
            if isinstance(answers, dict):
                record["answered_questions"] = list(answers.values())
            yield record

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """Return the responses endpoint for the survey id carried by the slice."""
        return f"nudges/{stream_slice['id']}/responses.json"
from .streams import QualarooStream, Responses, Surveys


class QualarooAuthenticator(HttpAuthenticator):
Expand All @@ -110,7 +29,6 @@ def __init__(
token_header: str = "oauth_token",
):
self._key = key
self._token = token
self._token = b64encode(b":".join((key.encode("latin1"), token.encode("latin1")))).strip().decode("ascii")
self.auth_header = auth_header
self.key_header = key_header
Expand Down Expand Up @@ -155,5 +73,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""
config["authenticator"] = self._get_authenticator(config)
return [Surveys(config), Responses(config)]
args = {}
# convert start_date to epoch time for qualaroo API
args["start_date"] = pendulum.parse(config["start_date"]).strftime("%s")
args["survey_ids"] = config.get("survey_ids", [])
args["authenticator"] = self._get_authenticator(config)
return [Surveys(**args), Responses(**args)]
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"type": "array",
"items": {
"type": "string",
"pattern": "^[0-9a-fA-F]{24}$"
"pattern": "^[0-9]{1,8}$"
},
"title": "Qualaroo survey IDs",
"description": "IDs of the surveys from which you'd like to replicate data. If left empty, data from all surveys to which you have access will be replicated."
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream


class QualarooStream(HttpStream, ABC):
    """Base stream for the Qualaroo REST reporting API.

    Holds the shared base URL, primary key and page size, builds the query
    parameters for each request, and pages through results by offset.
    """

    url_base = "https://api.qualaroo.com/api/v1/"

    # Define primary key as sort key for full_refresh, or very first sync for incremental_refresh
    primary_key = "id"

    # Page size
    limit = 500

    # Optional additional query parameters; subclasses may override.
    extra_params = None

    def __init__(self, start_date: pendulum.datetime, survey_ids: Optional[List[str]] = None, **kwargs):
        """
        :param start_date: value sent verbatim as the ``start_date`` query parameter.
            NOTE(review): annotated as pendulum.datetime, but callers appear to pass
            an epoch-seconds string - confirm.
        :param survey_ids: survey ids to restrict to; ``None`` or empty means all surveys.
        :param kwargs: forwarded to ``HttpStream`` (e.g. ``authenticator``).
        """
        super().__init__(**kwargs)
        self._start_date = start_date
        # Avoid a shared mutable default: every instance gets its own list.
        self._survey_ids = [] if survey_ids is None else survey_ids
        self._offset = 0

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return the offset of the next page, or ``None`` when the last page was short."""
        if len(response.json()) == self.limit:
            # A full page was returned, so ask for the page that follows it.
            self._offset += self.limit
            return {"offset": self._offset}
        # Pagination is over: reset so the next slice starts counting from zero
        # instead of inheriting this slice's offset.
        self._offset = 0
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Return the query parameters: page size, start date, then any token and extras."""
        params = {"limit": self.limit, "start_date": self._start_date}
        if next_page_token:
            params.update(**next_page_token)
        if self.extra_params:
            params.update(self.extra_params)
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield every element of the JSON body as one record."""
        yield from response.json()


class ChildStreamMixin:
    """Mixin that derives stream slices from the records of a parent stream.

    The host class must provide ``_start_date``, ``_survey_ids`` and
    ``authenticator``, which are passed on to the parent stream's constructor.
    """

    # Stream class whose records provide the slice ids; set by the subclass.
    parent_stream_class: Optional["QualarooStream"] = None

    def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield one ``{"id": ...}`` slice per record read from the parent stream."""
        # Streams no longer carry a `config` mapping; build the parent from the
        # same keyword arguments the stream constructor accepts.
        parent = self.parent_stream_class(
            start_date=self._start_date,
            survey_ids=self._survey_ids,
            authenticator=self.authenticator,
        )
        for record in parent.read_records(sync_mode=sync_mode):
            yield {"id": record["id"]}


class Surveys(QualarooStream):
    """Stream of surveys ("nudges"), optionally restricted to configured ids.
    API Docs: https://help.qualaroo.com/hc/en-us/articles/201969438-The-REST-Reporting-API
    Endpoint: https://api.qualaroo.com/api/v1/nudges/
    """

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """Return the endpoint path, relative to ``url_base``."""
        return "nudges"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield surveys, dropping those whose stringified id is not in ``_survey_ids``.

        An empty ``_survey_ids`` disables the filter.
        """
        wanted = self._survey_ids
        for survey in super().parse_response(response=response, **kwargs):
            if wanted and str(survey["id"]) not in wanted:
                continue
            yield survey


class Responses(ChildStreamMixin, QualarooStream):
    """Return list of all responses of a survey.
    API Docs: https://help.qualaroo.com/hc/en-us/articles/201969438-The-REST-Reporting-API
    Endpoint: https://api.qualaroo.com/api/v1/nudges/<id>/responses.json
    """

    parent_stream_class = Surveys

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """Return the responses endpoint for the survey id carried by the slice."""
        survey_id = stream_slice["survey_id"]
        return f"nudges/{survey_id}/responses.json"

    def stream_slices(self, **kwargs):
        """Yield one ``{"survey_id": ...}`` slice per survey read from a fresh ``Surveys`` stream."""
        survey_stream = Surveys(start_date=self._start_date, survey_ids=self._survey_ids, authenticator=self.authenticator)
        for survey in survey_stream.read_records(sync_mode=SyncMode.full_refresh):
            yield {"survey_id": survey["id"]}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield each response record with ``answered_questions`` flattened to a list.

        When ``answered_questions`` is a mapping, it is replaced by the list of
        its values. Any other value (missing, null, already a list) is left
        untouched instead of raising on ``.values()``.
        """
        for record in response.json():
            answers = record.get("answered_questions")
            # de-nest the answered_questions object if exists
            if isinstance(answers, dict):
                record["answered_questions"] = list(answers.values())
            yield record
Loading

0 comments on commit 036a42a

Please sign in to comment.