Skip to content

Commit

Permalink
Notifications Workflow (#18735)
Browse files Browse the repository at this point in the history
* notification workflow
  • Loading branch information
alovew authored Nov 15, 2022
1 parent b689d5a commit fdb96d0
Show file tree
Hide file tree
Showing 30 changed files with 513 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ MAX_SYNC_WORKERS=5
MAX_SPEC_WORKERS=5
MAX_CHECK_WORKERS=5
MAX_DISCOVER_WORKERS=5
MAX_NOTIFY_WORKERS=5
SHOULD_RUN_NOTIFY_WORKFLOWS=false
# Temporal Activity configuration
ACTIVITY_MAX_ATTEMPT=
ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=
Expand Down
2 changes: 2 additions & 0 deletions airbyte-commons-temporal/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ dependencies {
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-worker-models')
implementation project(':airbyte-api')
implementation project(':airbyte-json-validation')

testImplementation libs.temporal.testing
// Needed to be able to mock final class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ public enum TemporalJobType {
SYNC,
RESET_CONNECTION,
CONNECTION_UPDATER,
REPLICATE
REPLICATE,
NOTIFY
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal.scheduling;

import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.io.IOException;
import java.util.UUID;

@WorkflowInterface
public interface ConnectionNotificationWorkflow {

@WorkflowMethod
boolean sendSchemaChangeNotification(UUID connectionId)
throws IOException, InterruptedException, ApiException, ConfigNotFoundException, JsonValidationException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,11 @@ public interface Configs {
*/
boolean shouldRunConnectionManagerWorkflows();

/**
* Define if the worker should run notification workflows. Defaults to true. Internal-use only.
*/
public boolean shouldRunNotifyWorkflows();

// Worker - Data Plane configs

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class EnvConfigs implements Configs {
public static final String MAX_CHECK_WORKERS = "MAX_CHECK_WORKERS";
public static final String MAX_DISCOVER_WORKERS = "MAX_DISCOVER_WORKERS";
public static final String MAX_SYNC_WORKERS = "MAX_SYNC_WORKERS";
public static final String MAX_NOTIFY_WORKERS = "MAX_NOTIFY_WORKERS";
private static final String TEMPORAL_HOST = "TEMPORAL_HOST";
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";
private static final String TEMPORAL_HISTORY_RETENTION_IN_DAYS = "TEMPORAL_HISTORY_RETENTION_IN_DAYS";
Expand Down Expand Up @@ -135,6 +136,7 @@ public class EnvConfigs implements Configs {
private static final String SHOULD_RUN_DISCOVER_WORKFLOWS = "SHOULD_RUN_DISCOVER_WORKFLOWS";
private static final String SHOULD_RUN_SYNC_WORKFLOWS = "SHOULD_RUN_SYNC_WORKFLOWS";
private static final String SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS = "SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS";
private static final String SHOULD_RUN_NOTIFY_WORKFLOWS = "SHOULD_RUN_NOTIFY_WORKFLOWS";

// Worker - Control plane configs
private static final String DEFAULT_DATA_SYNC_TASK_QUEUES = "SYNC"; // should match TemporalJobType.SYNC.name()
Expand Down Expand Up @@ -198,6 +200,7 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAX_CHECK_WORKERS = 5;
private static final long DEFAULT_MAX_DISCOVER_WORKERS = 5;
private static final long DEFAULT_MAX_SYNC_WORKERS = 5;
private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5;
private static final String DEFAULT_NETWORK = "host";

public static final Map<String, Function<EnvConfigs, String>> JOB_SHARED_ENVS = Map.of(
Expand Down Expand Up @@ -918,7 +921,8 @@ public MaxWorkersConfig getMaxWorkers() {
Math.toIntExact(getEnvOrDefault(MAX_SPEC_WORKERS, DEFAULT_MAX_SPEC_WORKERS)),
Math.toIntExact(getEnvOrDefault(MAX_CHECK_WORKERS, DEFAULT_MAX_CHECK_WORKERS)),
Math.toIntExact(getEnvOrDefault(MAX_DISCOVER_WORKERS, DEFAULT_MAX_DISCOVER_WORKERS)),
Math.toIntExact(getEnvOrDefault(MAX_SYNC_WORKERS, DEFAULT_MAX_SYNC_WORKERS)));
Math.toIntExact(getEnvOrDefault(MAX_SYNC_WORKERS, DEFAULT_MAX_SYNC_WORKERS)),
Math.toIntExact(getEnvOrDefault(MAX_NOTIFY_WORKERS, DEFAULT_MAX_NOTIFY_WORKERS)));
}

@Override
Expand Down Expand Up @@ -946,6 +950,11 @@ public boolean shouldRunConnectionManagerWorkflows() {
return getEnvOrDefault(SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS, true);
}

@Override
public boolean shouldRunNotifyWorkflows() {
return getEnvOrDefault(SHOULD_RUN_NOTIFY_WORKFLOWS, false);
}

// Worker - Data plane

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ public class MaxWorkersConfig {
private final int maxCheckWorkers;
private final int maxDiscoverWorkers;
private final int maxSyncWorkers;
private final int maxNotifyWorkers;

public MaxWorkersConfig(final int maxSpecWorkers, final int maxCheckWorkers, final int maxDiscoverWorkers, final int maxSyncWorkers) {
public MaxWorkersConfig(final int maxSpecWorkers,
final int maxCheckWorkers,
final int maxDiscoverWorkers,
final int maxSyncWorkers,
final int maxNotifyWorkers) {
this.maxSpecWorkers = maxSpecWorkers;
this.maxCheckWorkers = maxCheckWorkers;
this.maxDiscoverWorkers = maxDiscoverWorkers;
this.maxSyncWorkers = maxSyncWorkers;
this.maxNotifyWorkers = maxNotifyWorkers;
}

public int getMaxSpecWorkers() {
Expand All @@ -34,13 +40,18 @@ public int getMaxSyncWorkers() {
return maxSyncWorkers;
}

public int getMaxNotifyWorkers() {
return maxNotifyWorkers;
}

@Override
public String toString() {
return "MaxWorkersConfig{" +
"maxSpecWorkers=" + maxSpecWorkers +
", maxCheckWorkers=" + maxCheckWorkers +
", maxDiscoverWorkers=" + maxDiscoverWorkers +
", maxSyncWorkers=" + maxSyncWorkers +
", maxNotifyWorkers=" + maxNotifyWorkers +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public boolean notifyFailure(final String message) throws IOException, Interrupt
throw new NotImplementedException();
}

@Override
public boolean notifySchemaChange(final UUID connectionId, final boolean isBreaking) {
throw new NotImplementedException();
}

private boolean notifyByEmail(final String requestBody) throws IOException, InterruptedException {
final HttpRequest request = HttpRequest.newBuilder()
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public abstract boolean notifyConnectionDisableWarning(String receiverEmail,

public abstract boolean notifyFailure(String message) throws IOException, InterruptedException;

public abstract boolean notifySchemaChange(UUID connectionId, boolean isBreaking) throws IOException, InterruptedException;

public static NotificationClient createNotificationClient(final Notification notification) {
return switch (notification.getNotificationType()) {
case SLACK -> new SlackNotificationClient(notification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.notification;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -33,11 +34,9 @@ public class SlackNotificationClient extends NotificationClient {

private static final Logger LOGGER = LoggerFactory.getLogger(SlackNotificationClient.class);

private final HttpClient httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.build();
private final SlackNotificationConfiguration config;

@JsonCreator
public SlackNotificationClient(final Notification notification) {
super(notification);
this.config = notification.getSlackConfiguration();
Expand Down Expand Up @@ -121,7 +120,22 @@ public boolean notifyConnectionDisableWarning(final String receiverEmail,
return false;
}

@Override
public boolean notifySchemaChange(UUID connectionId, boolean isBreaking) throws IOException, InterruptedException {
final String message = renderTemplate(
isBreaking ? "slack/breaking_schema_change_notification_template.txt" : "slack/non_breaking_schema_change_notification_template.txt",
connectionId.toString());
final String webhookUrl = config.getWebhook();
if (!Strings.isEmpty(webhookUrl)) {
return notify(message);
}
return false;
}

private boolean notify(final String message) throws IOException, InterruptedException {
final HttpClient httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.build();
final ImmutableMap<String, String> body = new Builder<String, String>()
.put("text", message)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Your source schema has changed for connection ID: %s

Airbyte has disabled this connection because this source schema change will cause broken syncs. Visit your connection page, refresh your source schema, and reset your data in order to fix this connection.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Your source schema has changed for connection ID: %s

Visit your connection page, refresh your source schema, and reset your data in order to update this connection.
1 change: 1 addition & 0 deletions airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies {
implementation project(':airbyte-metrics:metrics-lib')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-notification')
implementation (project(':airbyte-persistence:job-persistence')) {
// Temporary hack to avoid dependency conflicts
exclude group: 'io.micronaut'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl;
import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowImpl;
import io.airbyte.workers.temporal.scheduling.ConnectionNotificationWorkflowImpl;
import io.airbyte.workers.temporal.spec.SpecWorkflowImpl;
import io.airbyte.workers.temporal.support.TemporalProxyHelper;
import io.airbyte.workers.temporal.sync.SyncWorkflowImpl;
Expand Down Expand Up @@ -77,6 +78,11 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
@Inject
@Named("discoverActivities")
private Optional<List<Object>> discoverActivities;

@Inject
@Named("notifyActivities")
private Optional<List<Object>> notifyActivities;

@Inject
@Named(TaskExecutors.IO)
private ExecutorService executorService;
Expand All @@ -91,6 +97,8 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
private Optional<LogConfigs> logConfigs;
@Value("${airbyte.worker.check.max-workers}")
private Integer maxCheckWorkers;
@Value("${airbyte.worker.notify.max-workers}")
private Integer maxNotifyWorkers;
@Value("${airbyte.worker.discover.max-workers}")
private Integer maxDiscoverWorkers;
@Value("${airbyte.worker.spec.max-workers}")
Expand All @@ -107,6 +115,9 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
private boolean shouldRunGetSpecWorkflows;
@Value("${airbyte.worker.sync.enabled}")
private boolean shouldRunSyncWorkflows;
@Value("${airbyte.worker.sync.enabled}")
private boolean shouldRunNotifyWorkflows;

@Inject
@Named("specActivities")
private Optional<List<Object>> specActivities;
Expand Down Expand Up @@ -148,7 +159,7 @@ public void onApplicationEvent(final ServiceReadyEvent event) {

registerWorkerFactory(workerFactory,
new MaxWorkersConfig(maxCheckWorkers, maxDiscoverWorkers, maxSpecWorkers,
maxSyncWorkers));
maxSyncWorkers, maxNotifyWorkers));

log.info("Starting worker factory...");
workerFactory.start();
Expand Down Expand Up @@ -220,6 +231,18 @@ private void registerWorkerFactory(final WorkerFactory workerFactory,
if (shouldRunConnectionManagerWorkflows) {
registerConnectionManager(workerFactory, maxWorkersConfiguration);
}

if (shouldRunNotifyWorkflows) {
registerConnectionNotification(workerFactory, maxWorkersConfiguration);
}
}

private void registerConnectionNotification(final WorkerFactory factory, final MaxWorkersConfig maxWorkersConfig) {
final Worker notifyWorker = factory.newWorker(TemporalJobType.NOTIFY.name(), getWorkerOptions(maxWorkersConfig.getMaxNotifyWorkers()));
final WorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder().setFailWorkflowExceptionTypes(NonDeterministicException.class).build();
notifyWorker.registerWorkflowImplementationTypes(options, temporalProxyHelper.proxyWorkflowClass(ConnectionNotificationWorkflowImpl.class));
notifyWorker.registerActivitiesImplementations(notifyActivities.orElseThrow().toArray(new Object[] {}));
}

private void registerCheckConnection(final WorkerFactory factory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import io.airbyte.workers.temporal.scheduling.activities.ConnectionDeletionActivity;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity;
import io.airbyte.workers.temporal.scheduling.activities.NotifySchemaChangeActivity;
import io.airbyte.workers.temporal.scheduling.activities.RecordMetricActivity;
import io.airbyte.workers.temporal.scheduling.activities.RouteToSyncTaskQueueActivity;
import io.airbyte.workers.temporal.scheduling.activities.SlackConfigActivity;
import io.airbyte.workers.temporal.scheduling.activities.StreamResetActivity;
import io.airbyte.workers.temporal.scheduling.activities.WorkflowConfigActivity;
import io.airbyte.workers.temporal.spec.SpecActivity;
Expand Down Expand Up @@ -52,6 +54,15 @@ public List<Object> checkConnectionActivities(
return List.of(checkConnectionActivity);
}

@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("notifyActivities")
public List<Object> notifyActivities(final NotifySchemaChangeActivity notifySchemaChangeActivity,
SlackConfigActivity slackConfigActivity,
ConfigFetchActivity configFetchActivity) {
return List.of(notifySchemaChangeActivity, slackConfigActivity, configFetchActivity);
}

@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("connectionManagerActivities")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;

import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.commons.temporal.scheduling.ConnectionNotificationWorkflow;
import io.airbyte.config.Notification;
import io.airbyte.config.Notification.NotificationType;
import io.airbyte.config.SlackNotificationConfiguration;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.notification.SlackNotificationClient;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.airbyte.workers.temporal.scheduling.activities.NotifySchemaChangeActivity;
import io.airbyte.workers.temporal.scheduling.activities.SlackConfigActivity;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ConnectionNotificationWorkflowImpl implements ConnectionNotificationWorkflow {

@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private NotifySchemaChangeActivity notifySchemaChangeActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private SlackConfigActivity slackConfigActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private ConfigFetchActivity configFetchActivity;

@Override
public boolean sendSchemaChangeNotification(final UUID connectionId)
throws IOException, InterruptedException, ApiException, ConfigNotFoundException, JsonValidationException {
final StandardSync standardSync = configFetchActivity.getStandardSync(connectionId);
final Optional<SlackNotificationConfiguration> slackConfig = slackConfigActivity.fetchSlackConfiguration(connectionId);
if (slackConfig.isPresent()) {
final Notification notification =
new Notification().withNotificationType(NotificationType.SLACK).withSendOnFailure(false).withSendOnSuccess(false)
.withSlackConfiguration(slackConfig.get());
final SlackNotificationClient notificationClient = new SlackNotificationClient(notification);
return notifySchemaChangeActivity.notifySchemaChange(notificationClient, connectionId, standardSync.getBreakingChange());
} else {
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@

package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -33,6 +37,8 @@ class ScheduleRetrieverOutput {

}

StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException;

/**
* Return how much time to wait before running the next sync. It will query the DB to get the last
* starting time of the latest terminal job (Failed, canceled or successful) and return the amount
Expand Down
Loading

0 comments on commit fdb96d0

Please sign in to comment.