Skip to content

Commit

Permalink
Format code (#6381)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuliren authored Sep 22, 2021
1 parent c86403f commit 548a3a3
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ def test_refresh_request_body(self):
assert body == expected

def test_refresh_access_token(self, requests_mock):
mock_refresh_token_call = requests_mock.post(TestOauth2Authenticator.refresh_endpoint,
json={"access_token": "token", "expires_in": 10})
mock_refresh_token_call = requests_mock.post(
TestOauth2Authenticator.refresh_endpoint, json={"access_token": "token", "expires_in": 10}
)

oauth = Oauth2Authenticator(
TestOauth2Authenticator.refresh_endpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(self, start_date, credentials, **kwargs):
"contact_lists": ContactListStream(**common_params),
"contacts": CRMObjectStream(entity="contact", **common_params),
"deal_pipelines": DealPipelineStream(**common_params),
"deals": DealStream(associations=['contacts'], **common_params),
"deals": DealStream(associations=["contacts"], **common_params),
"email_events": EmailEventStream(**common_params),
"engagements": EngagementStream(**common_params),
"forms": FormStream(**common_params),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -69,22 +68,22 @@ private KafkaConsumer<String, JsonNode> buildKafkaConsumer(JsonNode config) {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText());
props.put(ConsumerConfig.GROUP_ID_CONFIG,
config.has("group_id") ? config.get("group_id").asText() : null);
config.has("group_id") ? config.get("group_id").asText() : null);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
config.has("max_poll_records") ? config.get("max_poll_records").intValue() : null);
config.has("max_poll_records") ? config.get("max_poll_records").intValue() : null);
props.putAll(propertiesByProtocol(config));
props.put(ConsumerConfig.CLIENT_ID_CONFIG,
config.has("client_id") ? config.get("client_id").asText() : null);
props.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.get("enable_auto_commit").booleanValue());
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
config.has("auto_commit_interval_ms") ? config.get("auto_commit_interval_ms").intValue() : null);
config.has("auto_commit_interval_ms") ? config.get("auto_commit_interval_ms").intValue() : null);
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG,
config.has("retry_backoff_ms") ? config.get("retry_backoff_ms").intValue() : null);
config.has("retry_backoff_ms") ? config.get("retry_backoff_ms").intValue() : null);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
config.has("request_timeout_ms") ? config.get("request_timeout_ms").intValue() : null);
config.has("request_timeout_ms") ? config.get("request_timeout_ms").intValue() : null);
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,
config.has("receive_buffer_bytes") ? config.get("receive_buffer_bytes").intValue() : null);
config.has("receive_buffer_bytes") ? config.get("receive_buffer_bytes").intValue() : null);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());

Expand Down Expand Up @@ -127,8 +126,8 @@ public KafkaConsumer<String, JsonNode> getConsumer() {
String topicPattern = subscription.get("topic_pattern").asText();
consumer.subscribe(Pattern.compile(topicPattern));
topicsToSubscribe = consumer.listTopics().keySet().stream()
.filter(topic -> topic.matches(topicPattern))
.collect(Collectors.toSet());
.filter(topic -> topic.matches(topicPattern))
.collect(Collectors.toSet());
}
case "assign" -> {
topicsToSubscribe = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Kafka Source Spec",
"type": "object",
"required": [
"bootstrap_servers",
"subscription",
"protocol"
],
"required": ["bootstrap_servers", "subscription", "protocol"],
"additionalProperties": false,
"properties": {
"bootstrap_servers": {
Expand Down Expand Up @@ -46,7 +42,7 @@
},
{
"title": "Subscribe to all topics matching specified pattern",
"required": ["subscription_type","topic_pattern"],
"required": ["subscription_type", "topic_pattern"],
"properties": {
"subscription_type": {
"description": "Topic pattern from which the records will be read.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,19 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING))
STREAM_NAME,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING))
STREAM_NAME2,
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("name", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@
"discount_codes": {
"updated_at": "2021-09-10T06:48:10-07:00"
}
}
}

0 comments on commit 548a3a3

Please sign in to comment.