Skip to content

Commit

Permalink
Always record tracking metrics (#16779)
Browse files Browse the repository at this point in the history
* Always record tracking metrics

* PR feedback
  • Loading branch information
jdpgrailsdev authored Sep 16, 2022
1 parent 18552b4 commit 9903415
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.time.format.FormatStyle;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -79,9 +80,9 @@ private void notifyJob(final String reason,
final String failReason = Strings.isNullOrEmpty(reason) ? "" : String.format(", as the %s", reason);
final String jobDescription = getJobDescription(job, failReason);
final String logUrl = webUrlHelper.getConnectionUrl(workspaceId, connectionId);
final ImmutableMap<String, Object> jobMetadata = TrackingMetadata.generateJobAttemptMetadata(job);
final ImmutableMap<String, Object> sourceMetadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition);
final ImmutableMap<String, Object> destinationMetadata = TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition);
final Map<String, Object> jobMetadata = TrackingMetadata.generateJobAttemptMetadata(job);
final Map<String, Object> sourceMetadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition);
final Map<String, Object> destinationMetadata = TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition);
for (final Notification notification : notifications) {
final NotificationClient notificationClient = getNotificationClient(notification);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
Expand All @@ -25,6 +24,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -55,7 +55,7 @@ public JsonNode injectSourceOAuthParameters(final UUID sourceDefinitionId, final
.ifPresent(sourceOAuthParameter -> {
if (injectOAuthParameters(sourceDefinition.getName(), sourceDefinition.getSpec(), sourceOAuthParameter.getConfiguration(),
sourceConnectorConfig)) {
final ImmutableMap<String, Object> metadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition);
final Map<String, Object> metadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition);
Exceptions.swallow(() -> trackingClient.track(workspaceId, "OAuth Injection - Backend", metadata));
}
});
Expand All @@ -75,7 +75,7 @@ public JsonNode injectDestinationOAuthParameters(final UUID destinationDefinitio
.ifPresent(destinationOAuthParameter -> {
if (injectOAuthParameters(destinationDefinition.getName(), destinationDefinition.getSpec(), destinationOAuthParameter.getConfiguration(),
destinationConnectorConfig)) {
final ImmutableMap<String, Object> metadata = TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition);
final Map<String, Object> metadata = TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition);
Exceptions.swallow(() -> trackingClient.track(workspaceId, "OAuth Injection - Backend", metadata));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
Expand All @@ -35,6 +33,7 @@
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -84,10 +83,10 @@ public void trackCheckConnectionSource(final UUID jobId,
final JobState jobState,
final StandardCheckConnectionOutput output) {
Exceptions.swallow(() -> {
final ImmutableMap<String, Object> checkConnMetadata = generateCheckConnectionMetadata(output);
final ImmutableMap<String, Object> jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.CHECK_CONNECTION_SOURCE);
final ImmutableMap<String, Object> sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinitionId);
final ImmutableMap<String, Object> stateMetadata = generateStateMetadata(jobState);
final Map<String, Object> checkConnMetadata = generateCheckConnectionMetadata(output);
final Map<String, Object> jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.CHECK_CONNECTION_SOURCE);
final Map<String, Object> sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinitionId);
final Map<String, Object> stateMetadata = generateStateMetadata(jobState);

track(workspaceId, MoreMaps.merge(checkConnMetadata, jobMetadata, sourceDefMetadata, stateMetadata));
});
Expand All @@ -99,20 +98,20 @@ public void trackCheckConnectionDestination(final UUID jobId,
final JobState jobState,
final StandardCheckConnectionOutput output) {
Exceptions.swallow(() -> {
final ImmutableMap<String, Object> checkConnMetadata = generateCheckConnectionMetadata(output);
final ImmutableMap<String, Object> jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.CHECK_CONNECTION_DESTINATION);
final ImmutableMap<String, Object> destinationDefinitionMetadata = generateDestinationDefinitionMetadata(destinationDefinitionId);
final ImmutableMap<String, Object> stateMetadata = generateStateMetadata(jobState);
final Map<String, Object> checkConnMetadata = generateCheckConnectionMetadata(output);
final Map<String, Object> jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.CHECK_CONNECTION_DESTINATION);
final Map<String, Object> destinationDefinitionMetadata = generateDestinationDefinitionMetadata(destinationDefinitionId);
final Map<String, Object> stateMetadata = generateStateMetadata(jobState);

track(workspaceId, MoreMaps.merge(checkConnMetadata, jobMetadata, destinationDefinitionMetadata, stateMetadata));
});
}

public void trackDiscover(final UUID jobId, final UUID sourceDefinitionId, final UUID workspaceId, final JobState jobState) {
Exceptions.swallow(() -> {
final ImmutableMap<String, Object> jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.DISCOVER_SCHEMA);
final ImmutableMap<String, Object> sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinitionId);
final ImmutableMap<String, Object> stateMetadata = generateStateMetadata(jobState);
final Map<String, Object> jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.DISCOVER_SCHEMA);
final Map<String, Object> sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinitionId);
final Map<String, Object> stateMetadata = generateStateMetadata(jobState);

track(workspaceId, MoreMaps.merge(jobMetadata, sourceDefMetadata, stateMetadata));
});
Expand All @@ -127,14 +126,12 @@ public void trackSync(final Job job, final JobState jobState) {
final long jobId = job.getId();
final UUID connectionId = UUID.fromString(job.getScope());
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
final UUID sourceDefinitionId = sourceDefinition.getSourceDefinitionId();
final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
final UUID destinationDefinitionId = destinationDefinition.getDestinationDefinitionId();

final Map<String, Object> jobMetadata = generateJobMetadata(String.valueOf(jobId), configType, job.getAttemptsCount());
final Map<String, Object> jobAttemptMetadata = generateJobAttemptMetadata(job.getId(), jobState);
final Map<String, Object> sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinitionId);
final Map<String, Object> destinationDefMetadata = generateDestinationDefinitionMetadata(destinationDefinitionId);
final Map<String, Object> jobAttemptMetadata = generateJobAttemptMetadata(jobId, jobState);
final Map<String, Object> sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinition);
final Map<String, Object> destinationDefMetadata = generateDestinationDefinitionMetadata(destinationDefinition);
final Map<String, Object> syncMetadata = generateSyncMetadata(connectionId);
final Map<String, Object> stateMetadata = generateStateMetadata(jobState);
final Map<String, Object> syncConfigMetadata = generateSyncConfigMetadata(
Expand All @@ -155,6 +152,38 @@ public void trackSync(final Job job, final JobState jobState) {
});
}

public void trackSyncForInternalFailure(final Long jobId,
final UUID connectionId,
final Integer attempts,
final JobState jobState,
final Exception e) {
Exceptions.swallow(() -> {
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);

final Map<String, Object> jobMetadata = generateJobMetadata(String.valueOf(jobId), null, attempts);
final Map<String, Object> jobAttemptMetadata = generateJobAttemptMetadata(jobId, jobState);
final Map<String, Object> sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinition);
final Map<String, Object> destinationDefMetadata = generateDestinationDefinitionMetadata(destinationDefinition);
final Map<String, Object> syncMetadata = generateSyncMetadata(connectionId);
final Map<String, Object> stateMetadata = generateStateMetadata(jobState);
final Map<String, Object> generalMetadata = Map.of("connection_id", connectionId, "internal_error_cause", e.getMessage(),
"internal_error_type", e.getClass().getName());

final UUID workspaceId = workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(jobId);

track(workspaceId,
MoreMaps.merge(
jobMetadata,
jobAttemptMetadata,
sourceDefMetadata,
destinationDefMetadata,
syncMetadata,
stateMetadata,
generalMetadata));
});
}

private Map<String, Object> generateSyncConfigMetadata(final JobConfig config,
final JsonNode sourceConfigSchema,
final JsonNode destinationConfigSchema) {
Expand Down Expand Up @@ -293,8 +322,8 @@ private Map<String, Object> generateSyncMetadata(final UUID connectionId) throws
return MoreMaps.merge(TrackingMetadata.generateSyncMetadata(standardSync), operationUsage, streamCountData);
}

private static ImmutableMap<String, Object> generateStateMetadata(final JobState jobState) {
final Builder<String, Object> metadata = ImmutableMap.builder();
private static Map<String, Object> generateStateMetadata(final JobState jobState) {
final Map<String, Object> metadata = new HashMap<>();

if (JobState.STARTED.equals(jobState)) {
metadata.put("attempt_stage", "STARTED");
Expand All @@ -303,7 +332,7 @@ private static ImmutableMap<String, Object> generateStateMetadata(final JobState
metadata.put("attempt_completion_status", jobState);
}

return metadata.build();
return Collections.unmodifiableMap(metadata);
}

/**
Expand All @@ -312,46 +341,54 @@ private static ImmutableMap<String, Object> generateStateMetadata(final JobState
* job with a failed check. Because of this, tracking just the job attempt status does not capture
* the whole picture. The `check_connection_outcome` field tracks this.
*/
private ImmutableMap<String, Object> generateCheckConnectionMetadata(final StandardCheckConnectionOutput output) {
private Map<String, Object> generateCheckConnectionMetadata(final StandardCheckConnectionOutput output) {
if (output == null) {
return ImmutableMap.of();
return Map.of();
}
final Builder<String, Object> metadata = ImmutableMap.builder();
metadata.put("check_connection_outcome", output.getStatus().toString());
return metadata.build();
return Map.of("check_connection_outcome", output.getStatus().toString());
}

private ImmutableMap<String, Object> generateDestinationDefinitionMetadata(final UUID destinationDefinitionId)
throws ConfigNotFoundException, IOException, JsonValidationException {
private Map<String, Object> generateDestinationDefinitionMetadata(final UUID destinationDefinitionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
return generateDestinationDefinitionMetadata(destinationDefinition);
}

private Map<String, Object> generateDestinationDefinitionMetadata(final StandardDestinationDefinition destinationDefinition) {
return TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition);
}

private ImmutableMap<String, Object> generateSourceDefinitionMetadata(final UUID sourceDefinitionId)
throws ConfigNotFoundException, IOException, JsonValidationException {
private Map<String, Object> generateSourceDefinitionMetadata(final UUID sourceDefinitionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionId);
return generateSourceDefinitionMetadata(sourceDefinition);
}

private Map<String, Object> generateSourceDefinitionMetadata(final StandardSourceDefinition sourceDefinition) {
return TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition);
}

private ImmutableMap<String, Object> generateJobMetadata(final String jobId, final ConfigType configType) {
private Map<String, Object> generateJobMetadata(final String jobId, final ConfigType configType) {
return generateJobMetadata(jobId, configType, 0);
}

private ImmutableMap<String, Object> generateJobMetadata(final String jobId, final ConfigType configType, final int attempt) {
final Builder<String, Object> metadata = ImmutableMap.builder();
metadata.put("job_type", configType);
private Map<String, Object> generateJobMetadata(final String jobId, final ConfigType configType, final int attempt) {
final Map<String, Object> metadata = new HashMap<>();
if (configType != null) {
metadata.put("job_type", configType);
}
metadata.put("job_id", jobId);
metadata.put("attempt_id", attempt);

return metadata.build();
return Collections.unmodifiableMap(metadata);
}

private ImmutableMap<String, Object> generateJobAttemptMetadata(final long jobId, final JobState jobState) throws IOException {
private Map<String, Object> generateJobAttemptMetadata(final long jobId, final JobState jobState) throws IOException {
final Job job = jobPersistence.getJob(jobId);
if (jobState != JobState.STARTED) {
return TrackingMetadata.generateJobAttemptMetadata(job);
} else {
return ImmutableMap.of();
return Map.of();
}
}

Expand All @@ -362,7 +399,7 @@ private void track(final UUID workspaceId, final Map<String, Object> metadata)
if (workspaceId != null) {
final StandardWorkspace standardWorkspace = configRepository.getStandardWorkspace(workspaceId, true);
if (standardWorkspace != null && standardWorkspace.getName() != null) {
final Map<String, Object> standardTrackingMetadata = ImmutableMap.of(
final Map<String, Object> standardTrackingMetadata = Map.of(
"workspace_id", workspaceId,
"workspace_name", standardWorkspace.getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.util.Strings;

public class TrackingMetadata {

public static ImmutableMap<String, Object> generateSyncMetadata(final StandardSync standardSync) {
public static Map<String, Object> generateSyncMetadata(final StandardSync standardSync) {
final Builder<String, Object> metadata = ImmutableMap.builder();
metadata.put("connection_id", standardSync.getConnectionId());

Expand Down Expand Up @@ -75,7 +76,7 @@ public static ImmutableMap<String, Object> generateSyncMetadata(final StandardSy
return metadata.build();
}

public static ImmutableMap<String, Object> generateDestinationDefinitionMetadata(final StandardDestinationDefinition destinationDefinition) {
public static Map<String, Object> generateDestinationDefinitionMetadata(final StandardDestinationDefinition destinationDefinition) {
final Builder<String, Object> metadata = ImmutableMap.builder();
metadata.put("connector_destination", destinationDefinition.getName());
metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId());
Expand All @@ -87,7 +88,7 @@ public static ImmutableMap<String, Object> generateDestinationDefinitionMetadata
return metadata.build();
}

public static ImmutableMap<String, Object> generateSourceDefinitionMetadata(final StandardSourceDefinition sourceDefinition) {
public static Map<String, Object> generateSourceDefinitionMetadata(final StandardSourceDefinition sourceDefinition) {
final Builder<String, Object> metadata = ImmutableMap.builder();
metadata.put("connector_source", sourceDefinition.getName());
metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId());
Expand All @@ -99,7 +100,7 @@ public static ImmutableMap<String, Object> generateSourceDefinitionMetadata(fina
return metadata.build();
}

public static ImmutableMap<String, Object> generateJobAttemptMetadata(final Job job) {
public static Map<String, Object> generateJobAttemptMetadata(final Job job) {
final Builder<String, Object> metadata = ImmutableMap.builder();
if (job != null) {
final List<Attempt> attempts = job.getAttempts();
Expand Down
Loading

0 comments on commit 9903415

Please sign in to comment.