Skip to content

Commit

Permalink
refactor import / export endpoints to use the same code path as auto …
Browse files Browse the repository at this point in the history
…migration (#4797)
  • Loading branch information
cgardens authored Jul 20, 2021
1 parent dd37924 commit 4bd5015
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.segment.analytics.Analytics;
import com.segment.analytics.messages.AliasMessage;
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -46,38 +47,49 @@ public class SegmentTrackingClient implements TrackingClient {
private final Analytics analytics;
private final Supplier<TrackingIdentity> identitySupplier;
private final String airbyteRole;
private final WorkerEnvironment deploymentEnvironment;

@VisibleForTesting
SegmentTrackingClient(final Supplier<TrackingIdentity> identitySupplier,
final Configs.WorkerEnvironment deploymentEnvironment,
final String airbyteRole,
final Analytics analytics) {
this.identitySupplier = identitySupplier;
this.deploymentEnvironment = deploymentEnvironment;
this.analytics = analytics;
this.airbyteRole = airbyteRole;
}

public SegmentTrackingClient(final Supplier<TrackingIdentity> identitySupplier, final String airbyteRole) {
this(identitySupplier, airbyteRole, Analytics.builder(SEGMENT_WRITE_KEY).build());
public SegmentTrackingClient(final Supplier<TrackingIdentity> identitySupplier,
final Configs.WorkerEnvironment deploymentEnvironment,
final String airbyteRole) {
this(identitySupplier, deploymentEnvironment, airbyteRole, Analytics.builder(SEGMENT_WRITE_KEY).build());
}

@Override
public void identify() {
final TrackingIdentity trackingIdentity = identitySupplier.get();
final ImmutableMap.Builder<String, Object> identityMetadataBuilder = ImmutableMap.<String, Object>builder()
.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion())
.put("anonymized", trackingIdentity.isAnonymousDataCollection())
.put("subscribed_newsletter", trackingIdentity.isNews())
.put("subscribed_security", trackingIdentity.isSecurityUpdates());
final Map<String, Object> identityMetadata = new HashMap<>();

// deployment
identityMetadata.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion());
identityMetadata.put("deployment_env", deploymentEnvironment);

// workspace (includes info that in the future we would store in an organization)
identityMetadata.put("anonymized", trackingIdentity.isAnonymousDataCollection());
identityMetadata.put("subscribed_newsletter", trackingIdentity.isNews());
identityMetadata.put("subscribed_security", trackingIdentity.isSecurityUpdates());
trackingIdentity.getEmail().ifPresent(email -> identityMetadata.put("email", email));

// other
if (!Strings.isNullOrEmpty(airbyteRole)) {
identityMetadataBuilder.put(AIRBYTE_ROLE, airbyteRole);
identityMetadata.put(AIRBYTE_ROLE, airbyteRole);
}

trackingIdentity.getEmail().ifPresent(email -> identityMetadataBuilder.put("email", email));

analytics.enqueue(IdentifyMessage.builder()
// user id is scoped by workspace. there is no cross-workspace tracking.
.userId(trackingIdentity.getCustomerId().toString())
.traits(identityMetadataBuilder.build()));
.traits(identityMetadata));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,15 @@ static void initialize(TrackingClient trackingClient) {
}

public static void initialize(final Configs.TrackingStrategy trackingStrategy,
final Configs.WorkerEnvironment deploymentEnvironment,
final String airbyteRole,
final String airbyteVersion,
final ConfigRepository configRepository) {
initialize(createTrackingClient(trackingStrategy, airbyteRole, () -> getTrackingIdentity(configRepository, airbyteVersion)));
initialize(createTrackingClient(
trackingStrategy,
deploymentEnvironment,
airbyteRole,
() -> getTrackingIdentity(configRepository, airbyteVersion)));
}

// fallback on a logging client with an empty identity.
Expand Down Expand Up @@ -93,6 +98,7 @@ static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository, S
* Creates a tracking client that uses the appropriate strategy from an identity supplier.
*
* @param trackingStrategy - what type of tracker we want to use.
* @param deploymentEnvironment - the environment that airbyte is running in.
* @param airbyteRole
* @param trackingIdentitySupplier - how we get the identity of the user. we have a supplier,
* because we if the identity updates over time (which happens during initial setup), we
Expand All @@ -101,10 +107,11 @@ static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository, S
*/
@VisibleForTesting
static TrackingClient createTrackingClient(final Configs.TrackingStrategy trackingStrategy,
final Configs.WorkerEnvironment deploymentEnvironment,
final String airbyteRole,
final Supplier<TrackingIdentity> trackingIdentitySupplier) {
return switch (trackingStrategy) {
case SEGMENT -> new SegmentTrackingClient(trackingIdentitySupplier, airbyteRole);
case SEGMENT -> new SegmentTrackingClient(trackingIdentitySupplier, deploymentEnvironment, airbyteRole);
case LOGGING -> new LoggingTrackingClient(trackingIdentitySupplier);
default -> throw new IllegalStateException("unrecognized tracking strategy");
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.mockito.Mockito.when;

import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
Expand Down Expand Up @@ -58,6 +59,7 @@ void testCreateTrackingClientLogging() {
assertTrue(
TrackingClientSingleton.createTrackingClient(
Configs.TrackingStrategy.LOGGING,
WorkerEnvironment.DOCKER,
"role",
TrackingIdentity::empty) instanceof LoggingTrackingClient);
}
Expand All @@ -67,6 +69,7 @@ void testCreateTrackingClientSegment() {
assertTrue(
TrackingClientSingleton.createTrackingClient(
Configs.TrackingStrategy.SEGMENT,
WorkerEnvironment.DOCKER,
"role",
TrackingIdentity::empty) instanceof SegmentTrackingClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ public static void main(String[] args) throws IOException, InterruptedException

TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
configs.getWorkerEnvironment(),
configs.getAirbyteRole(),
configs.getAirbyteVersion(),
configRepository);
Expand Down

0 comments on commit 4bd5015

Please sign in to comment.