diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java index 141238b851dd..e57d584961a3 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java @@ -400,7 +400,9 @@ public TemporalResponse submitSync(final long jobId, final i .withState(config.getState()) .withResourceRequirements(config.getResourceRequirements()) .withSourceResourceRequirements(config.getSourceResourceRequirements()) - .withDestinationResourceRequirements(config.getDestinationResourceRequirements()); + .withDestinationResourceRequirements(config.getDestinationResourceRequirements()) + .withConnectionId(connectionId) + .withWorkspaceId(config.getWorkspaceId()); return execute(jobRunConfig, () -> getWorkflowStub(SyncWorkflow.class, TemporalJobType.SYNC).run( diff --git a/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml b/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml index 8e7b7398f5e6..cbea5cf7e293 100644 --- a/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml +++ b/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml @@ -70,3 +70,7 @@ properties: description: The id of the workspace associated with this sync type: string format: uuid + connectionId: + description: The id of the connection associated with this sync + type: string + format: uuid diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 62d27f1bf1f6..403c70fe33d6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -149,6 +149,7 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withResourceRequirements(config.getResourceRequirements()) .withSourceResourceRequirements(config.getSourceResourceRequirements()) .withDestinationResourceRequirements(config.getDestinationResourceRequirements()) + .withConnectionId(standardSync.getConnectionId()) .withWorkspaceId(config.getWorkspaceId()); return new GeneratedJobInput(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index e3eeec37dea6..c28554049602 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -6,6 +6,7 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.REPLICATION_BYTES_SYNCED_KEY; @@ -149,8 +150,12 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig, final StandardSyncInput syncInput, @Nullable final String taskQueue) { final Map traceAttributes = - Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, - destinationLauncherConfig.getDockerImage(), SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage()); + Map.of( + ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), + CONNECTION_ID_KEY, syncInput.getConnectionId(), + JOB_ID_KEY, jobRunConfig.getJobId(), + DESTINATION_DOCKER_IMAGE_KEY, destinationLauncherConfig.getDockerImage(), + SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage()); ApmTraceUtils .addTagsToTrace(traceAttributes); if (isResetJob(sourceLauncherConfig.getDockerImage())) {