Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add deployments to tracking model #4837

Merged
merged 2 commits into from
Jul 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.analytics;

import com.google.common.base.Preconditions;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import java.util.UUID;

public class Deployment {

/**
 * The mode an Airbyte deployment runs in. Reported with the tracking identity as
 * {@code deployment_mode}.
 */
public enum DeploymentMode {
// NOTE(review): presumably the self-hosted open-source distribution - confirm.
OSS,
// NOTE(review): presumably the hosted cloud offering - confirm.
CLOUD
}

/**
* deploymentMode - the mode (OSS or CLOUD) that this deployment of Airbyte runs in.
*/
private final DeploymentMode deploymentMode;
/**
* deploymentId - Identifier for the deployment.
*
* This identifier tracks an install of Airbyte. Any time Airbyte is started up with new volumes or
* persistence, it will be assigned a new deployment id. This is different from the lifecycle of the
* rest of the data layer which may be persisted across deployments.
*/
private final UUID deploymentId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opinion: I think it's useful to leave a comment here summarising how deploymentId -> workspace tie together.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe instanceId? I think instance has been the most common way to refer to the concept of a deployment (and that's also reflected in our docs). Either is fine with me, just don't want to diverge naming. Feels like it's an AirbyteInstance with an instanceId and deploymentMode/deploymentEnvironment (vars that describe the deployment of the instance with that id).

Copy link
Contributor Author

@cgardens cgardens Jul 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed with @jrhizor offline. our goal here is to make sure the data model matches naming in the docs. we are keeping deployment, but i updated naming in docs to match deployment.

/**
* deploymentEnv - the environment that airbyte is running in.
*/
private final Configs.WorkerEnvironment deploymentEnv;

/**
 * Builds an immutable description of a deployment.
 *
 * @param deploymentMode mode of the deployment; must not be null.
 * @param deploymentId identifier of the deployment; must not be null.
 * @param deploymentEnv environment the deployment runs in; must not be null.
 * @throws NullPointerException if any argument is null.
 */
public Deployment(final DeploymentMode deploymentMode, final UUID deploymentId, final WorkerEnvironment deploymentEnv) {
  // checkNotNull hands back its argument, so each value is validated and stored in one step.
  // The checks still run in the same order: mode, id, env.
  this.deploymentMode = Preconditions.checkNotNull(deploymentMode);
  this.deploymentId = Preconditions.checkNotNull(deploymentId);
  this.deploymentEnv = Preconditions.checkNotNull(deploymentEnv);
}

/**
 * @return the mode this deployment runs in; never null.
 */
public DeploymentMode getDeploymentMode() {
  return this.deploymentMode;
}

/**
 * @return the identifier of this deployment; never null.
 */
public UUID getDeploymentId() {
  return this.deploymentId;
}

/**
 * @return the environment this deployment runs in; never null.
 */
public WorkerEnvironment getDeploymentEnv() {
  return this.deploymentEnv;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import com.segment.analytics.messages.AliasMessage;
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -46,24 +44,26 @@ public class SegmentTrackingClient implements TrackingClient {
// Analytics is threadsafe.
private final Analytics analytics;
private final Supplier<TrackingIdentity> identitySupplier;
private final Deployment deployment;
private final String airbyteRole;
private final WorkerEnvironment deploymentEnvironment;

@VisibleForTesting
SegmentTrackingClient(final Supplier<TrackingIdentity> identitySupplier,
final Configs.WorkerEnvironment deploymentEnvironment,
final Deployment deployment,

final String airbyteRole,
final Analytics analytics) {
this.identitySupplier = identitySupplier;
this.deploymentEnvironment = deploymentEnvironment;
this.deployment = deployment;
this.analytics = analytics;
this.airbyteRole = airbyteRole;
}

public SegmentTrackingClient(final Supplier<TrackingIdentity> identitySupplier,
final Configs.WorkerEnvironment deploymentEnvironment,
final Deployment deployment,

final String airbyteRole) {
this(identitySupplier, deploymentEnvironment, airbyteRole, Analytics.builder(SEGMENT_WRITE_KEY).build());
this(identitySupplier, deployment, airbyteRole, Analytics.builder(SEGMENT_WRITE_KEY).build());
}

@Override
Expand All @@ -73,7 +73,9 @@ public void identify() {

// deployment
identityMetadata.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion());
identityMetadata.put("deployment_env", deploymentEnvironment);
identityMetadata.put("deployment_mode", deployment.getDeploymentMode());
identityMetadata.put("deployment_env", deployment.getDeploymentEnv());
identityMetadata.put("deployment_id", deployment.getDeploymentId());

// workspace (includes info that in the future we would store in an organization)
identityMetadata.put("anonymized", trackingIdentity.isAnonymousDataCollection());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ static void initialize(TrackingClient trackingClient) {
}

public static void initialize(final Configs.TrackingStrategy trackingStrategy,
final Configs.WorkerEnvironment deploymentEnvironment,
final Deployment deployment,
final String airbyteRole,
final String airbyteVersion,
final ConfigRepository configRepository) {
initialize(createTrackingClient(
trackingStrategy,
deploymentEnvironment,
deployment,
airbyteRole,
() -> getTrackingIdentity(configRepository, airbyteVersion)));
}
Expand Down Expand Up @@ -98,7 +98,8 @@ static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository, S
* Creates a tracking client that uses the appropriate strategy from an identity supplier.
*
* @param trackingStrategy - what type of tracker we want to use.
* @param deploymentEnvironment - the environment that airbyte is running in.
* @param deployment - deployment tracking info. static because it should not change once the
* instance is running.
* @param airbyteRole
* @param trackingIdentitySupplier - how we get the identity of the user. we have a supplier,
* because if the identity updates over time (which happens during initial setup), we
Expand All @@ -107,11 +108,11 @@ static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository, S
*/
@VisibleForTesting
static TrackingClient createTrackingClient(final Configs.TrackingStrategy trackingStrategy,
final Configs.WorkerEnvironment deploymentEnvironment,
final Deployment deployment,
final String airbyteRole,
final Supplier<TrackingIdentity> trackingIdentitySupplier) {
return switch (trackingStrategy) {
case SEGMENT -> new SegmentTrackingClient(trackingIdentitySupplier, deploymentEnvironment, airbyteRole);
case SEGMENT -> new SegmentTrackingClient(trackingIdentitySupplier, deployment, airbyteRole);
case LOGGING -> new LoggingTrackingClient(trackingIdentitySupplier);
default -> throw new IllegalStateException("unrecognized tracking strategy");
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.segment.analytics.Analytics;
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import io.airbyte.analytics.Deployment.DeploymentMode;
import io.airbyte.config.Configs.WorkerEnvironment;
import java.util.Map;
import java.util.UUID;
Expand All @@ -44,6 +45,7 @@
class SegmentTrackingClientTest {

private static final String AIRBYTE_VERSION = "dev";
private static final Deployment DEPLOYMENT = new Deployment(DeploymentMode.OSS, UUID.randomUUID(), WorkerEnvironment.DOCKER);
private static final String EMAIL = "a@airbyte.io";
private static final TrackingIdentity identity = new TrackingIdentity(AIRBYTE_VERSION, UUID.randomUUID(), EMAIL, false, false, true);

Expand All @@ -56,7 +58,7 @@ class SegmentTrackingClientTest {
void setup() {
analytics = mock(Analytics.class);
roleSupplier = mock(Supplier.class);
segmentTrackingClient = new SegmentTrackingClient(() -> identity, WorkerEnvironment.DOCKER, null, analytics);
segmentTrackingClient = new SegmentTrackingClient(() -> identity, DEPLOYMENT, null, analytics);
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
Expand All @@ -71,7 +73,9 @@ void testIdentify() {
verify(analytics).enqueue(mockBuilder.capture());
final IdentifyMessage actual = mockBuilder.getValue().build();
final Map<String, Object> expectedTraits = ImmutableMap.<String, Object>builder()
.put("deployment_env", WorkerEnvironment.DOCKER)
.put("deployment_env", DEPLOYMENT.getDeploymentEnv())
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("airbyte_version", AIRBYTE_VERSION)
.put("email", identity.getEmail().get())
.put("anonymized", identity.isAnonymousDataCollection())
Expand All @@ -84,7 +88,7 @@ void testIdentify() {

@Test
void testIdentifyWithRole() {
segmentTrackingClient = new SegmentTrackingClient(() -> identity, WorkerEnvironment.DOCKER, "role", analytics);
segmentTrackingClient = new SegmentTrackingClient(() -> identity, DEPLOYMENT, "role", analytics);
// equals is not defined on MessageBuilder, so we need to use ArgumentCaptor to inspect each field
// manually.
ArgumentCaptor<IdentifyMessage.Builder> mockBuilder = ArgumentCaptor.forClass(IdentifyMessage.Builder.class);
Expand All @@ -95,7 +99,9 @@ void testIdentifyWithRole() {
verify(analytics).enqueue(mockBuilder.capture());
final IdentifyMessage actual = mockBuilder.getValue().build();
final Map<String, Object> expectedTraits = ImmutableMap.<String, Object>builder()
.put("deployment_env", WorkerEnvironment.DOCKER)
.put("deployment_env", DEPLOYMENT.getDeploymentEnv())
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("airbyte_version", AIRBYTE_VERSION)
.put("email", identity.getEmail().get())
.put("anonymized", identity.isAnonymousDataCollection())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.airbyte.analytics.Deployment.DeploymentMode;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.StandardWorkspace;
Expand All @@ -44,7 +45,7 @@
class TrackingClientSingletonTest {

private static final String AIRBYTE_VERSION = "dev";

private static final Deployment DEPLOYMENT = new Deployment(DeploymentMode.OSS, UUID.randomUUID(), WorkerEnvironment.DOCKER);
private ConfigRepository configRepository;

@BeforeEach
Expand All @@ -59,7 +60,7 @@ void testCreateTrackingClientLogging() {
assertTrue(
TrackingClientSingleton.createTrackingClient(
Configs.TrackingStrategy.LOGGING,
WorkerEnvironment.DOCKER,
DEPLOYMENT,
"role",
TrackingIdentity::empty) instanceof LoggingTrackingClient);
}
Expand All @@ -69,7 +70,7 @@ void testCreateTrackingClientSegment() {
assertTrue(
TrackingClientSingleton.createTrackingClient(
Configs.TrackingStrategy.SEGMENT,
WorkerEnvironment.DOCKER,
DEPLOYMENT,
"role",
TrackingIdentity::empty) instanceof SegmentTrackingClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
package io.airbyte.scheduler.app;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.Deployment.DeploymentMode;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.concurrency.GracefulShutdownHandler;
import io.airbyte.commons.version.AirbyteVersion;
Expand Down Expand Up @@ -239,13 +241,6 @@ public static void main(String[] args) throws IOException, InterruptedException
});
}

TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
configs.getWorkerEnvironment(),
configs.getAirbyteRole(),
configs.getAirbyteVersion(),
configRepository);

Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
int loopCount = 0;
while ((airbyteDatabaseVersion.isEmpty() || !AirbyteVersion.isCompatible(configs.getAirbyteVersion(), airbyteDatabaseVersion.get()))
Expand All @@ -261,6 +256,15 @@ public static void main(String[] args) throws IOException, InterruptedException
throw new IllegalStateException("Unable to retrieve Airbyte Version, aborting...");
}

TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
// todo (cgardens) - we need to do the `#runServer` pattern here that we do in `ServerApp` so that
// the deployment mode can be set by the cloud version.
new Deployment(DeploymentMode.OSS, jobPersistence.getDeployment().orElseThrow(), configs.getWorkerEnvironment()),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jrhizor i did this heinous thing to get around the merge conflict. Obviously this is not how we want to set deployment mode. do you have thoughts on the right way to inject it? i'm still wrapping my head around the new factory.

configs.getAirbyteRole(),
configs.getAirbyteVersion(),
configRepository);

final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost);
final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public class DefaultJobPersistence implements JobPersistence {
private static final JSONFormat DB_JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);
protected static final String DEFAULT_SCHEMA = "public";
private static final String BACKUP_SCHEMA = "import_backup";
public static final String DEPLOYMENT_ID_KEY = "deployment_id";
public static final String METADATA_KEY_COL = "key";
public static final String METADATA_VAL_COL = "value";

@VisibleForTesting
static final String BASE_JOB_SELECT_AND_JOIN =
Expand Down Expand Up @@ -449,18 +452,60 @@ private static long getEpoch(Record record, String fieldName) {
public Optional<String> getVersion() throws IOException {
final Result<Record> result = database.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
.where(DSL.field("key").eq(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME))
.where(DSL.field(METADATA_KEY_COL).eq(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME))
.fetch());
return result.stream().findFirst().map(r -> r.getValue("value", String.class));
return result.stream().findFirst().map(r -> r.getValue(METADATA_VAL_COL, String.class));
}

@Override
public void setVersion(String airbyteVersion) throws IOException {
database.query(ctx -> ctx.execute(String.format(
"INSERT INTO %s VALUES('%s', '%s'), ('%s_init_db', '%s') ON CONFLICT (key) DO UPDATE SET value = '%s'",
"INSERT INTO %s(%s, %s) VALUES('%s', '%s'), ('%s_init_db', '%s') ON CONFLICT (%s) DO UPDATE SET %s = '%s'",
AIRBYTE_METADATA_TABLE,
AirbyteVersion.AIRBYTE_VERSION_KEY_NAME, airbyteVersion,
current_timestamp(), airbyteVersion, airbyteVersion)));
METADATA_KEY_COL,
METADATA_VAL_COL,
AirbyteVersion.AIRBYTE_VERSION_KEY_NAME,
airbyteVersion,
current_timestamp(),
airbyteVersion,
METADATA_KEY_COL,
METADATA_VAL_COL,
airbyteVersion)));
}

/**
 * Reads the deployment id stored in the metadata table.
 *
 * @return the stored deployment id, or empty if no row exists for the deployment id key.
 * @throws IOException if the database query fails.
 */
@Override
public Optional<UUID> getDeployment() throws IOException {
  final Result<Record> rows = database.query(ctx -> ctx.select()
      .from(AIRBYTE_METADATA_TABLE)
      .where(DSL.field(METADATA_KEY_COL).eq(DEPLOYMENT_ID_KEY))
      .fetch());

  final Optional<Record> firstRow = rows.stream().findFirst();
  if (firstRow.isEmpty()) {
    return Optional.empty();
  }

  final String storedId = firstRow.get().getValue(METADATA_VAL_COL, String.class);
  return Optional.of(UUID.fromString(storedId));
}

/**
 * Stores the deployment id in the metadata table unless one is already present.
 *
 * The insert uses {@code ON CONFLICT ... DO NOTHING}, so an existing deployment id is never
 * overwritten. If the statement reports a different, pre-existing id, a warning naming both ids
 * is logged.
 *
 * @param deployment the deployment id to store.
 * @throws IOException if the database query fails.
 */
@Override
public void setDeployment(UUID deployment) throws IOException {
  // if an existing deployment id already exists, on conflict, return it so we can log it.
  // NOTE(review): per the PostgreSQL INSERT docs, RETURNING only yields rows that were actually
  // inserted or updated, so with DO NOTHING a conflicting insert likely returns no rows and the
  // warning below may never fire - confirm and consider a follow-up SELECT instead.
  final UUID committedDeploymentId = database.query(ctx -> ctx.fetch(String.format(
      "INSERT INTO %s(%s, %s) VALUES('%s', '%s') ON CONFLICT (%s) DO NOTHING RETURNING (SELECT %s FROM %s WHERE %s='%s') as existing_deployment_id",
      AIRBYTE_METADATA_TABLE,
      METADATA_KEY_COL,
      METADATA_VAL_COL,
      DEPLOYMENT_ID_KEY,
      deployment,
      METADATA_KEY_COL,
      METADATA_VAL_COL,
      AIRBYTE_METADATA_TABLE,
      METADATA_KEY_COL,
      DEPLOYMENT_ID_KEY)))
      .stream()
      .filter(row -> row.get("existing_deployment_id", String.class) != null)
      .map(row -> UUID.fromString(row.get("existing_deployment_id", String.class)))
      .findFirst()
      .orElse(deployment); // if no record was returned that means that the new deployment id was used.

  if (!deployment.equals(committedDeploymentId)) {
    // The original call passed the raw "%s" template with no arguments, so the ids were never
    // rendered. Formatting up front works regardless of the logger's own placeholder syntax.
    LOGGER.warn(String.format(
        "Attempted to set a deployment id %s, but deployment id %s already set. Retained original value.",
        deployment,
        committedDeploymentId));
  }
}

private static String current_timestamp() {
Expand Down Expand Up @@ -674,7 +719,11 @@ private static void resetIdentityColumn(final DSLContext ctx, final String schem
*/
private static void registerImportMetadata(final DSLContext ctx, final String airbyteVersion) {
ctx.execute(String.format("INSERT INTO %s VALUES('%s_import_db', '%s');", AIRBYTE_METADATA_TABLE, current_timestamp(), airbyteVersion));
ctx.execute(String.format("UPDATE %s SET value = '%s' WHERE key = '%s';", AIRBYTE_METADATA_TABLE, airbyteVersion,
ctx.execute(String.format("UPDATE %s SET %s = '%s' WHERE %s = '%s';",
AIRBYTE_METADATA_TABLE,
METADATA_VAL_COL,
airbyteVersion,
METADATA_KEY_COL,
AirbyteVersion.AIRBYTE_VERSION_KEY_NAME));
}

Expand Down
Loading