Skip to content

Commit

Permalink
Create CustomerIO email notification client (#11220)
Browse files Browse the repository at this point in the history
* Create CustomerIO email notification client

* remove unused docker yaml changes

* Remove unused comments

* Add unit test

* Rename to customerio specific notification client

* Rename email to customerio

* re-build
  • Loading branch information
terencecho authored Mar 31, 2022
1 parent 6aa7e4c commit e0aa76d
Show file tree
Hide file tree
Showing 14 changed files with 380 additions and 26 deletions.
6 changes: 5 additions & 1 deletion airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2290,18 +2290,22 @@ components:
default: true
slackConfiguration:
$ref: "#/components/schemas/SlackNotificationConfiguration"
customerioConfiguration:
$ref: "#/components/schemas/CustomerioNotificationConfiguration"
SlackNotificationConfiguration:
type: object
required:
- webhook
properties:
webhook:
type: string
CustomerioNotificationConfiguration:
type: object
NotificationType:
type: string
enum:
- slack
# - email
- customerio
# - webhook
NotificationRead:
type: object
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/CustomerioNotificationConfiguration.yaml
title: CustomerioNotificationConfiguration
description: Customer.io Notification Settings
type: object
additionalProperties: false
properties:
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ properties:
default: true
slackConfiguration:
"$ref": SlackNotificationConfiguration.yaml
customerioConfiguration:
"$ref": CustomerioNotificationConfiguration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ description: Type of notification
type: string
enum:
- slack
- customerio
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.notification;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.Notification;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Notification client that uses customer.io API send emails.
*/
public class CustomeriolNotificationClient extends NotificationClient {

private static final Logger LOGGER = LoggerFactory.getLogger(CustomeriolNotificationClient.class);

// Once the configs are editable through the UI, these should be stored in
// airbyte-config/models/src/main/resources/types/CustomerioNotificationConfiguration.yaml
// - SENDER_EMAIL
// - receiver email
// - customer.io identifier email
// - customer.io TRANSACTION_MESSAGE_ID
private static final String SENDER_EMAIL = "Airbyte Notification <no-reply@airbyte.io>";
private static final String TRANSACTION_MESSAGE_ID = "6";

private static final String CUSTOMERIO_EMAIL_API_ENDPOINT = "https://api.customer.io/v1/send/email";
private static final String AUTO_DISABLE_NOTIFICATION_TEMPLATE_PATH = "customerio/auto_disable_notification_template.json";
private static final String AUTO_DISABLE_WARNING_NOTIFICATION_TEMPLATE_PATH = "customerio/auto_disable_warning_notification_template.json";

private final HttpClient httpClient;
private final String apiToken;
private final String emailApiEndpoint;

public CustomeriolNotificationClient(final Notification notification) {
super(notification);
this.apiToken = System.getenv("CUSTOMERIO_API_KEY");
this.emailApiEndpoint = CUSTOMERIO_EMAIL_API_ENDPOINT;
this.httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.build();
}

@VisibleForTesting
public CustomeriolNotificationClient(final Notification notification,
final String apiToken,
final String emailApiEndpoint,
final HttpClient httpClient) {
super(notification);
this.apiToken = apiToken;
this.emailApiEndpoint = emailApiEndpoint;
this.httpClient = httpClient;
}

@Override
public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl)
throws IOException, InterruptedException {
throw new NotImplementedException();
}

@Override
public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl)
throws IOException, InterruptedException {
throw new NotImplementedException();
}

@Override
public boolean notifyConnectionDisabled(final String receiverEmail,
final String sourceConnector,
final String destinationConnector,
final String jobDescription,
final String logUrl)
throws IOException, InterruptedException {
final String requestBody = renderTemplate(AUTO_DISABLE_NOTIFICATION_TEMPLATE_PATH, TRANSACTION_MESSAGE_ID, SENDER_EMAIL, receiverEmail,
receiverEmail, sourceConnector, destinationConnector, jobDescription, logUrl);
return notifyByEmail(requestBody);
}

@Override
public boolean notifyConnectionDisableWarning(
final String receiverEmail,
final String sourceConnector,
final String destinationConnector,
final String jobDescription,
final String logUrl)
throws IOException, InterruptedException {
final String requestBody = renderTemplate(AUTO_DISABLE_WARNING_NOTIFICATION_TEMPLATE_PATH, TRANSACTION_MESSAGE_ID, SENDER_EMAIL, receiverEmail,
receiverEmail, sourceConnector, destinationConnector, jobDescription, logUrl);
return notifyByEmail(requestBody);
}

@Override
public boolean notifySuccess(final String message) throws IOException, InterruptedException {
throw new NotImplementedException();
}

@Override
public boolean notifyFailure(final String message) throws IOException, InterruptedException {
throw new NotImplementedException();
}

private boolean notifyByEmail(final String requestBody) throws IOException, InterruptedException {
final HttpRequest request = HttpRequest.newBuilder()
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
.uri(URI.create(emailApiEndpoint))
.header("Content-Type", "application/json")
.header("Authorization", "Bearer " + apiToken)
.build();

final HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (isSuccessfulHttpResponse(response.statusCode())) {
LOGGER.info("Successful notification ({}): {}", response.statusCode(), response.body());
return true;
} else {
final String errorMessage = String.format("Failed to deliver notification (%s): %s", response.statusCode(), response.body());
throw new IOException(errorMessage);
}
}

/**
* Use an integer division to check successful HTTP status codes (i.e., those from 200-299), not
* just 200. https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
*/
private static boolean isSuccessfulHttpResponse(final int httpStatusCode) {
return httpStatusCode / 100 == 2;
}

public String renderTemplate(final String templateFile, final String... data) throws IOException {
final String template = MoreResources.readResource(templateFile);
return String.format(template, data);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.notification;

import io.airbyte.config.Notification;
import io.airbyte.config.Notification.NotificationType;
import java.io.IOException;

public abstract class NotificationClient {
Expand All @@ -32,16 +31,30 @@ public abstract boolean notifyJobSuccess(
String logUrl)
throws IOException, InterruptedException;

public abstract boolean notifyConnectionDisabled(String receiverEmail,
String sourceConnector,
String destinationConnector,
String jobDescription,
String logUrl)
throws IOException, InterruptedException;

public abstract boolean notifyConnectionDisableWarning(String receiverEmail,
String sourceConnector,
String destinationConnector,
String jobDescription,
String logUrl)
throws IOException, InterruptedException;

public abstract boolean notifySuccess(String message) throws IOException, InterruptedException;

public abstract boolean notifyFailure(String message) throws IOException, InterruptedException;

public static NotificationClient createNotificationClient(final Notification notification) {
if (notification.getNotificationType() == NotificationType.SLACK) {
return new SlackNotificationClient(notification);
} else {
throw new IllegalArgumentException("Unknown notification type:" + notification.getNotificationType());
}
return switch (notification.getNotificationType()) {
case SLACK -> new SlackNotificationClient(notification);
case CUSTOMERIO -> new CustomeriolNotificationClient(notification);
default -> throw new IllegalArgumentException("Unknown notification type:" + notification.getNotificationType());
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -47,7 +48,7 @@ public SlackNotificationClient(final Notification notification) {
public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl)
throws IOException, InterruptedException {
return notifyFailure(renderJobData(
"failure_slack_notification_template.txt",
"slack/failure_slack_notification_template.txt",
sourceConnector,
destinationConnector,
jobDescription,
Expand All @@ -58,13 +59,34 @@ public boolean notifyJobFailure(final String sourceConnector, final String desti
public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl)
throws IOException, InterruptedException {
return notifySuccess(renderJobData(
"success_slack_notification_template.txt",
"slack/success_slack_notification_template.txt",
sourceConnector,
destinationConnector,
jobDescription,
logUrl));
}

@Override
public boolean notifyConnectionDisabled(final String receiverEmail,
final String sourceConnector,
final String destinationConnector,
final String jobDescription,
final String logUrl)
throws IOException, InterruptedException {
throw new NotImplementedException();
}

@Override
public boolean notifyConnectionDisableWarning(
final String receiverEmail,
final String sourceConnector,
final String destinationConnector,
final String jobDescription,
final String logUrl)
throws IOException, InterruptedException {
throw new NotImplementedException();
}

private String renderJobData(final String templateFile,
final String sourceConnector,
final String destinationConnector,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"transactional_message_id": "%s",
"from": "%s",
"subject": "Connection Auto-Disabled",
"to": "%s",
"identifiers": {
"email": "%s"
},
"message_data": {
"email_title": "Connection Auto-Disabled",
"email_body": "Your connection from <b>%s</b> to <b>%s</b> was disabled because it failed consecutively 100 times or that there were only failed jobs in the past 14 days.<p>Please address the failing issues and re-enable the connection. The most recent attempted %s You can access its logs here: %s"
},

"disable_message_retention": false,
"send_to_unsubscribed": true,
"tracked": true,
"queue_draft": false,
"disable_css_preprocessing": true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"transactional_message_id": "%s",
"from": "%s",
"subject": "Connection Auto-Disabled Warning",
"to": "%s",
"identifiers": {
"email": "%s"
},
"message_data": {
"email_title": "Connection Auto-Disabled Warning",
"email_body": "Your connection from <b>%s</b> to <b>%s</b> is about to be disabled because it failed consecutively 50 times or that there were only failed jobs in the past 7 days. Once it has failed 100 times consecutively or has been failing for 14 days in a row, the connection will be automatically disabled.<p>Please address the failing issues and re-enable the connection. The most recent attempted %s You can access its logs here: %s"
},

"disable_message_retention": false,
"send_to_unsubscribed": true,
"tracked": true,
"queue_draft": false,
"disable_css_preprocessing": true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.notification;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;

import io.airbyte.config.Notification;
import io.airbyte.config.Notification.NotificationType;
import io.airbyte.config.StandardWorkspace;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.Mockito;

class CustomerioNotificationClientTest {

private static final String API_KEY = "api-key";
private static final String URI_BASE = "https://customer.io";
private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final StandardWorkspace WORKSPACE = new StandardWorkspace()
.withWorkspaceId(WORKSPACE_ID)
.withName("workspace-name")
.withEmail("test@airbyte.io");
private static final String RANDOM_INPUT = "input";

@Mock
private HttpClient mHttpClient;

@BeforeEach
void setUp() {
mHttpClient = mock(HttpClient.class);
}

// this only tests that the headers are set correctly and that a http post request is sent to the
// correct URI
// this test does _not_ check the body of the request.
@Test
void notifyConnectionDisabled() throws IOException, InterruptedException {
final CustomeriolNotificationClient customeriolNotificationClient = new CustomeriolNotificationClient(new Notification()
.withNotificationType(NotificationType.CUSTOMERIO), API_KEY, URI_BASE, mHttpClient);

final HttpRequest expectedRequest = HttpRequest.newBuilder()
.POST(HttpRequest.BodyPublishers.ofString(""))
.uri(URI.create(URI_BASE))
.header("Content-Type", "application/json")
.header("Authorization", "Bearer " + API_KEY)
.build();

final HttpResponse httpResponse = mock(HttpResponse.class);
Mockito.when(mHttpClient.send(Mockito.any(), Mockito.any())).thenReturn(httpResponse);
Mockito.when(httpResponse.statusCode()).thenReturn(200);

final boolean result =
customeriolNotificationClient.notifyConnectionDisabled(WORKSPACE.getEmail(), RANDOM_INPUT, RANDOM_INPUT, RANDOM_INPUT, RANDOM_INPUT);
Mockito.verify(mHttpClient).send(expectedRequest, HttpResponse.BodyHandlers.ofString());

assertTrue(result);
}

}
Loading

0 comments on commit e0aa76d

Please sign in to comment.