Skip to content

Commit

Permalink
add deployments to tracking model (#4837)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Jul 24, 2021
1 parent fdc8ea1 commit 67dd8a3
Show file tree
Hide file tree
Showing 19 changed files with 308 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.analytics;

import com.google.common.base.Preconditions;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import java.util.UUID;

public class Deployment {

public enum DeploymentMode {
OSS,
CLOUD
}

/**
* deployment - deployment tracking info.
*/
private final DeploymentMode deploymentMode;
/**
* deploymentId - Identifier for the deployment.
*
* This identifier tracks an install of Airbyte. Any time Airbyte is started up with new volumes or
* persistence, it will be assigned a new deployment id. This is different from the lifecycle of the
* rest of the data layer which may be persisted across deployments.
*/
private final UUID deploymentId;
/**
* deploymentEnvironment - the environment that airbyte is running in.
*/
private final Configs.WorkerEnvironment deploymentEnv;

public Deployment(final DeploymentMode deploymentMode, final UUID deploymentId, final WorkerEnvironment deploymentEnv) {
Preconditions.checkNotNull(deploymentMode);
Preconditions.checkNotNull(deploymentId);
Preconditions.checkNotNull(deploymentEnv);

this.deploymentMode = deploymentMode;
this.deploymentId = deploymentId;
this.deploymentEnv = deploymentEnv;
}

public DeploymentMode getDeploymentMode() {
return deploymentMode;
}

public UUID getDeploymentId() {
return deploymentId;
}

public WorkerEnvironment getDeploymentEnv() {
return deploymentEnv;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import com.segment.analytics.messages.AliasMessage;
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -46,24 +44,26 @@ public class SegmentTrackingClient implements TrackingClient {
// Analytics is threadsafe.
private final Analytics analytics;
private final Supplier<TrackingIdentity> identitySupplier;
private final Deployment deployment;
private final String airbyteRole;
private final WorkerEnvironment deploymentEnvironment;

@VisibleForTesting
SegmentTrackingClient(final Supplier<TrackingIdentity> identitySupplier,
final Configs.WorkerEnvironment deploymentEnvironment,
final Deployment deployment,

final String airbyteRole,
final Analytics analytics) {
this.identitySupplier = identitySupplier;
this.deploymentEnvironment = deploymentEnvironment;
this.deployment = deployment;
this.analytics = analytics;
this.airbyteRole = airbyteRole;
}

public SegmentTrackingClient(final Supplier<TrackingIdentity> identitySupplier,
final Configs.WorkerEnvironment deploymentEnvironment,
final Deployment deployment,

final String airbyteRole) {
this(identitySupplier, deploymentEnvironment, airbyteRole, Analytics.builder(SEGMENT_WRITE_KEY).build());
this(identitySupplier, deployment, airbyteRole, Analytics.builder(SEGMENT_WRITE_KEY).build());
}

@Override
Expand All @@ -73,7 +73,9 @@ public void identify() {

// deployment
identityMetadata.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion());
identityMetadata.put("deployment_env", deploymentEnvironment);
identityMetadata.put("deployment_mode", deployment.getDeploymentMode());
identityMetadata.put("deployment_env", deployment.getDeploymentEnv());
identityMetadata.put("deployment_id", deployment.getDeploymentId());

// workspace (includes info that in the future we would store in an organization)
identityMetadata.put("anonymized", trackingIdentity.isAnonymousDataCollection());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ static void initialize(TrackingClient trackingClient) {
}

public static void initialize(final Configs.TrackingStrategy trackingStrategy,
final Configs.WorkerEnvironment deploymentEnvironment,
final Deployment deployment,
final String airbyteRole,
final String airbyteVersion,
final ConfigRepository configRepository) {
initialize(createTrackingClient(
trackingStrategy,
deploymentEnvironment,
deployment,
airbyteRole,
() -> getTrackingIdentity(configRepository, airbyteVersion)));
}
Expand Down Expand Up @@ -98,7 +98,8 @@ static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository, S
* Creates a tracking client that uses the appropriate strategy from an identity supplier.
*
* @param trackingStrategy - what type of tracker we want to use.
* @param deploymentEnvironment - the environment that airbyte is running in.
* @param deployment - deployment tracking info. static because it should not change once the
* instance is running.
* @param airbyteRole
* @param trackingIdentitySupplier - how we get the identity of the user. we have a supplier,
* because we if the identity updates over time (which happens during initial setup), we
Expand All @@ -107,11 +108,11 @@ static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository, S
*/
@VisibleForTesting
static TrackingClient createTrackingClient(final Configs.TrackingStrategy trackingStrategy,
final Configs.WorkerEnvironment deploymentEnvironment,
final Deployment deployment,
final String airbyteRole,
final Supplier<TrackingIdentity> trackingIdentitySupplier) {
return switch (trackingStrategy) {
case SEGMENT -> new SegmentTrackingClient(trackingIdentitySupplier, deploymentEnvironment, airbyteRole);
case SEGMENT -> new SegmentTrackingClient(trackingIdentitySupplier, deployment, airbyteRole);
case LOGGING -> new LoggingTrackingClient(trackingIdentitySupplier);
default -> throw new IllegalStateException("unrecognized tracking strategy");
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.segment.analytics.Analytics;
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import io.airbyte.analytics.Deployment.DeploymentMode;
import io.airbyte.config.Configs.WorkerEnvironment;
import java.util.Map;
import java.util.UUID;
Expand All @@ -44,6 +45,7 @@
class SegmentTrackingClientTest {

private static final String AIRBYTE_VERSION = "dev";
private static final Deployment DEPLOYMENT = new Deployment(DeploymentMode.OSS, UUID.randomUUID(), WorkerEnvironment.DOCKER);
private static final String EMAIL = "a@airbyte.io";
private static final TrackingIdentity identity = new TrackingIdentity(AIRBYTE_VERSION, UUID.randomUUID(), EMAIL, false, false, true);

Expand All @@ -56,7 +58,7 @@ class SegmentTrackingClientTest {
void setup() {
analytics = mock(Analytics.class);
roleSupplier = mock(Supplier.class);
segmentTrackingClient = new SegmentTrackingClient(() -> identity, WorkerEnvironment.DOCKER, null, analytics);
segmentTrackingClient = new SegmentTrackingClient(() -> identity, DEPLOYMENT, null, analytics);
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
Expand All @@ -71,7 +73,9 @@ void testIdentify() {
verify(analytics).enqueue(mockBuilder.capture());
final IdentifyMessage actual = mockBuilder.getValue().build();
final Map<String, Object> expectedTraits = ImmutableMap.<String, Object>builder()
.put("deployment_env", WorkerEnvironment.DOCKER)
.put("deployment_env", DEPLOYMENT.getDeploymentEnv())
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("airbyte_version", AIRBYTE_VERSION)
.put("email", identity.getEmail().get())
.put("anonymized", identity.isAnonymousDataCollection())
Expand All @@ -84,7 +88,7 @@ void testIdentify() {

@Test
void testIdentifyWithRole() {
segmentTrackingClient = new SegmentTrackingClient(() -> identity, WorkerEnvironment.DOCKER, "role", analytics);
segmentTrackingClient = new SegmentTrackingClient(() -> identity, DEPLOYMENT, "role", analytics);
// equals is not defined on MessageBuilder, so we need to use ArgumentCaptor to inspect each field
// manually.
ArgumentCaptor<IdentifyMessage.Builder> mockBuilder = ArgumentCaptor.forClass(IdentifyMessage.Builder.class);
Expand All @@ -95,7 +99,9 @@ void testIdentifyWithRole() {
verify(analytics).enqueue(mockBuilder.capture());
final IdentifyMessage actual = mockBuilder.getValue().build();
final Map<String, Object> expectedTraits = ImmutableMap.<String, Object>builder()
.put("deployment_env", WorkerEnvironment.DOCKER)
.put("deployment_env", DEPLOYMENT.getDeploymentEnv())
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("airbyte_version", AIRBYTE_VERSION)
.put("email", identity.getEmail().get())
.put("anonymized", identity.isAnonymousDataCollection())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.airbyte.analytics.Deployment.DeploymentMode;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.StandardWorkspace;
Expand All @@ -44,7 +45,7 @@
class TrackingClientSingletonTest {

private static final String AIRBYTE_VERSION = "dev";

private static final Deployment DEPLOYMENT = new Deployment(DeploymentMode.OSS, UUID.randomUUID(), WorkerEnvironment.DOCKER);
private ConfigRepository configRepository;

@BeforeEach
Expand All @@ -59,7 +60,7 @@ void testCreateTrackingClientLogging() {
assertTrue(
TrackingClientSingleton.createTrackingClient(
Configs.TrackingStrategy.LOGGING,
WorkerEnvironment.DOCKER,
DEPLOYMENT,
"role",
TrackingIdentity::empty) instanceof LoggingTrackingClient);
}
Expand All @@ -69,7 +70,7 @@ void testCreateTrackingClientSegment() {
assertTrue(
TrackingClientSingleton.createTrackingClient(
Configs.TrackingStrategy.SEGMENT,
WorkerEnvironment.DOCKER,
DEPLOYMENT,
"role",
TrackingIdentity::empty) instanceof SegmentTrackingClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
package io.airbyte.scheduler.app;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.Deployment.DeploymentMode;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.concurrency.GracefulShutdownHandler;
import io.airbyte.commons.version.AirbyteVersion;
Expand Down Expand Up @@ -239,13 +241,6 @@ public static void main(String[] args) throws IOException, InterruptedException
});
}

TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
configs.getWorkerEnvironment(),
configs.getAirbyteRole(),
configs.getAirbyteVersion(),
configRepository);

Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
int loopCount = 0;
while ((airbyteDatabaseVersion.isEmpty() || !AirbyteVersion.isCompatible(configs.getAirbyteVersion(), airbyteDatabaseVersion.get()))
Expand All @@ -261,6 +256,15 @@ public static void main(String[] args) throws IOException, InterruptedException
throw new IllegalStateException("Unable to retrieve Airbyte Version, aborting...");
}

TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
// todo (cgardens) - we need to do the `#runServer` pattern here that we do in `ServerApp` so that
// the deployment mode can be set by the cloud version.
new Deployment(DeploymentMode.OSS, jobPersistence.getDeployment().orElseThrow(), configs.getWorkerEnvironment()),
configs.getAirbyteRole(),
configs.getAirbyteVersion(),
configRepository);

final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost);
final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public class DefaultJobPersistence implements JobPersistence {
private static final JSONFormat DB_JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);
protected static final String DEFAULT_SCHEMA = "public";
private static final String BACKUP_SCHEMA = "import_backup";
public static final String DEPLOYMENT_ID_KEY = "deployment_id";
public static final String METADATA_KEY_COL = "key";
public static final String METADATA_VAL_COL = "value";

@VisibleForTesting
static final String BASE_JOB_SELECT_AND_JOIN =
Expand Down Expand Up @@ -449,18 +452,60 @@ private static long getEpoch(Record record, String fieldName) {
public Optional<String> getVersion() throws IOException {
final Result<Record> result = database.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
.where(DSL.field("key").eq(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME))
.where(DSL.field(METADATA_KEY_COL).eq(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME))
.fetch());
return result.stream().findFirst().map(r -> r.getValue("value", String.class));
return result.stream().findFirst().map(r -> r.getValue(METADATA_VAL_COL, String.class));
}

@Override
public void setVersion(String airbyteVersion) throws IOException {
database.query(ctx -> ctx.execute(String.format(
"INSERT INTO %s VALUES('%s', '%s'), ('%s_init_db', '%s') ON CONFLICT (key) DO UPDATE SET value = '%s'",
"INSERT INTO %s(%s, %s) VALUES('%s', '%s'), ('%s_init_db', '%s') ON CONFLICT (%s) DO UPDATE SET %s = '%s'",
AIRBYTE_METADATA_TABLE,
AirbyteVersion.AIRBYTE_VERSION_KEY_NAME, airbyteVersion,
current_timestamp(), airbyteVersion, airbyteVersion)));
METADATA_KEY_COL,
METADATA_VAL_COL,
AirbyteVersion.AIRBYTE_VERSION_KEY_NAME,
airbyteVersion,
current_timestamp(),
airbyteVersion,
METADATA_KEY_COL,
METADATA_VAL_COL,
airbyteVersion)));
}

@Override
public Optional<UUID> getDeployment() throws IOException {
final Result<Record> result = database.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
.where(DSL.field(METADATA_KEY_COL).eq(DEPLOYMENT_ID_KEY))
.fetch());
return result.stream().findFirst().map(r -> UUID.fromString(r.getValue(METADATA_VAL_COL, String.class)));
}

@Override
public void setDeployment(UUID deployment) throws IOException {
// if an existing deployment id already exists, on conflict, return it so we can log it.
final UUID committedDeploymentId = database.query(ctx -> ctx.fetch(String.format(
"INSERT INTO %s(%s, %s) VALUES('%s', '%s') ON CONFLICT (%s) DO NOTHING RETURNING (SELECT %s FROM %s WHERE %s='%s') as existing_deployment_id",
AIRBYTE_METADATA_TABLE,
METADATA_KEY_COL,
METADATA_VAL_COL,
DEPLOYMENT_ID_KEY,
deployment,
METADATA_KEY_COL,
METADATA_VAL_COL,
AIRBYTE_METADATA_TABLE,
METADATA_KEY_COL,
DEPLOYMENT_ID_KEY)))
.stream()
.filter(record -> record.get("existing_deployment_id", String.class) != null)
.map(record -> UUID.fromString(record.get("existing_deployment_id", String.class)))
.findFirst()
.orElse(deployment); // if no record was returned that means that the new deployment id was used.

if (!deployment.equals(committedDeploymentId)) {
LOGGER.warn("Attempted to set a deployment id %s, but deployment id %s already set. Retained original value.");
}
}

private static String current_timestamp() {
Expand Down Expand Up @@ -674,7 +719,11 @@ private static void resetIdentityColumn(final DSLContext ctx, final String schem
*/
private static void registerImportMetadata(final DSLContext ctx, final String airbyteVersion) {
ctx.execute(String.format("INSERT INTO %s VALUES('%s_import_db', '%s');", AIRBYTE_METADATA_TABLE, current_timestamp(), airbyteVersion));
ctx.execute(String.format("UPDATE %s SET value = '%s' WHERE key = '%s';", AIRBYTE_METADATA_TABLE, airbyteVersion,
ctx.execute(String.format("UPDATE %s SET %s = '%s' WHERE %s = '%s';",
AIRBYTE_METADATA_TABLE,
METADATA_VAL_COL,
airbyteVersion,
METADATA_KEY_COL,
AirbyteVersion.AIRBYTE_VERSION_KEY_NAME));
}

Expand Down
Loading

0 comments on commit 67dd8a3

Please sign in to comment.