Skip to content

Commit

Permalink
expose streams to reset in reset config api
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Jun 19, 2022
1 parent 354c6e2 commit bcac051
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 73 deletions.
7 changes: 6 additions & 1 deletion airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3729,7 +3729,12 @@ components:
format: int64
status:
$ref: "#/components/schemas/JobStatus"
streams:
resetConfig:
$ref: "#/components/schemas/ResetConfig"
ResetConfig:
type: object
properties:
streamsToReset:
type: array
items:
$ref: "#/components/schemas/StreamDescriptor"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import io.airbyte.api.model.generated.JobStatus;
import io.airbyte.api.model.generated.JobWithAttemptsRead;
import io.airbyte.api.model.generated.LogRead;
import io.airbyte.api.model.generated.ResetConfig;
import io.airbyte.api.model.generated.SourceDefinitionRead;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.SynchronousJobRead;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.version.AirbyteVersion;
Expand All @@ -35,8 +35,6 @@
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.scheduler.client.SynchronousJobMetadata;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.models.Attempt;
Expand Down Expand Up @@ -87,42 +85,27 @@ public static JobWithAttemptsRead getJobWithAttemptsRead(final Job job) {
.id(job.getId())
.configId(configId)
.configType(configType)
.streams(extractStreamNamesFromCatalogIfSync(job).orElse(null))
.resetConfig(extractResetConfigIfReset(job).orElse(null))
.createdAt(job.getCreatedAtInSecond())
.updatedAt(job.getUpdatedAtInSecond())
.status(Enums.convertTo(job.getStatus(), JobStatus.class)))
.attempts(job.getAttempts().stream().map(JobConverter::getAttemptRead).collect(Collectors.toList()));
}

/**
* If the job is of type SYNC or RESET, extracts the stream names that were in the catalog.
* Otherwise, returns an empty optional.
* If the job is of type RESET, extracts the part of the reset config that we expose in the API.
* Otherwise, returns empty optional.
*
* @param job - job whose streams to extract
* @return stream descriptors of streams in the catalog used by job
* @param job - job
* @return api representation of reset config
*/
private static Optional<List<StreamDescriptor>> extractStreamNamesFromCatalogIfSync(final Job job) {
return extractCatalogIfSyncOrReset(job)
.map(CatalogHelpers::extractStreamDescriptors)
.map(streamDescriptor -> streamDescriptor.stream().map(ProtocolConverters::streamDescriptorToApi).toList());
}

/**
* If the job is of type SYNC or RESET, extracts the configured catalog. Otherwise, returns an empty
* optional.
*
* @param job - job whose catalog to extract
* @return catalog used by job
*/
private static Optional<ConfiguredAirbyteCatalog> extractCatalogIfSyncOrReset(final Job job) {
private static Optional<ResetConfig> extractResetConfigIfReset(final Job job) {
if (job.getConfigType() == ConfigType.SYNC) {
return Optional.of(job.getConfig()
.getSync()
.getConfiguredAirbyteCatalog());
} else if (job.getConfigType() == ConfigType.RESET_CONNECTION) {
return Optional.of(job.getConfig()
.getResetConnection()
.getConfiguredAirbyteCatalog());
return Optional.ofNullable(
new ResetConfig().streamsToReset(job.getConfig().getResetConnection().getResetSourceConfiguration().getStreamsToReset()
.stream()
.map(ProtocolConverters::streamDescriptorToApi)
.toList()));
} else {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
*/
public class ProtocolConverters {

public static StreamDescriptor streamDescriptorToApi(final io.airbyte.config.StreamDescriptor protocolStreamDescriptor) {
return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace());
}

public static StreamDescriptor streamDescriptorToApi(final io.airbyte.protocol.models.StreamDescriptor protocolStreamDescriptor) {
return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace());
}
Expand Down
106 changes: 63 additions & 43 deletions docs/reference/api/generated-api-html/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -1145,14 +1145,16 @@ <h3 class="field-label">Example data</h3>
"job" : {
"createdAt" : 6,
"configId" : "configId",
"streams" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ],
"id" : 0,
"resetConfig" : {
"streamsToReset" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ]
},
"updatedAt" : 1
},
"attempts" : [ {
Expand Down Expand Up @@ -1477,14 +1479,16 @@ <h3 class="field-label">Example data</h3>
"job" : {
"createdAt" : 6,
"configId" : "configId",
"streams" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ],
"id" : 0,
"resetConfig" : {
"streamsToReset" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ]
},
"updatedAt" : 1
},
"attempts" : [ {
Expand Down Expand Up @@ -4060,14 +4064,16 @@ <h3 class="field-label">Example data</h3>
"job" : {
"createdAt" : 6,
"configId" : "configId",
"streams" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ],
"id" : 0,
"resetConfig" : {
"streamsToReset" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ]
},
"updatedAt" : 1
},
"attempts" : [ {
Expand Down Expand Up @@ -4462,14 +4468,16 @@ <h3 class="field-label">Example data</h3>
"job" : {
"createdAt" : 6,
"configId" : "configId",
"streams" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ],
"id" : 0,
"resetConfig" : {
"streamsToReset" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ]
},
"updatedAt" : 1
},
"attempts" : [ {
Expand Down Expand Up @@ -4636,14 +4644,16 @@ <h3 class="field-label">Example data</h3>
"job" : {
"createdAt" : 6,
"configId" : "configId",
"streams" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ],
"id" : 0,
"resetConfig" : {
"streamsToReset" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ]
},
"updatedAt" : 1
},
"attempts" : [ {
Expand Down Expand Up @@ -4743,14 +4753,16 @@ <h3 class="field-label">Example data</h3>
"job" : {
"createdAt" : 6,
"configId" : "configId",
"streams" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ],
"id" : 0,
"resetConfig" : {
"streamsToReset" : [ {
"name" : "name",
"namespace" : "namespace"
}, {
"name" : "name",
"namespace" : "namespace"
} ]
},
"updatedAt" : 1
},
"attempts" : [ {
Expand Down Expand Up @@ -11051,6 +11063,7 @@ <h3>Table of Contents</h3>
<li><a href="#PrivateSourceDefinitionRead"><code>PrivateSourceDefinitionRead</code> - </a></li>
<li><a href="#PrivateSourceDefinitionReadList"><code>PrivateSourceDefinitionReadList</code> - </a></li>
<li><a href="#ReleaseStage"><code>ReleaseStage</code> - </a></li>
<li><a href="#ResetConfig"><code>ResetConfig</code> - </a></li>
<li><a href="#ResourceRequirements"><code>ResourceRequirements</code> - </a></li>
<li><a href="#SetInstancewideDestinationOauthParamsRequestBody"><code>SetInstancewideDestinationOauthParamsRequestBody</code> - </a></li>
<li><a href="#SetInstancewideSourceOauthParamsRequestBody"><code>SetInstancewideSourceOauthParamsRequestBody</code> - </a></li>
Expand Down Expand Up @@ -11792,7 +11805,7 @@ <h3><a name="JobRead"><code>JobRead</code> - </a> <a class="up" href="#__Models"
<div class="param">createdAt </div><div class="param-desc"><span class="param-type"><a href="#long">Long</a></span> format: int64</div>
<div class="param">updatedAt </div><div class="param-desc"><span class="param-type"><a href="#long">Long</a></span> format: int64</div>
<div class="param">status </div><div class="param-desc"><span class="param-type"><a href="#JobStatus">JobStatus</a></span> </div>
<div class="param">streams (optional)</div><div class="param-desc"><span class="param-type"><a href="#StreamDescriptor">array[StreamDescriptor]</a></span> </div>
<div class="param">resetConfig (optional)</div><div class="param-desc"><span class="param-type"><a href="#ResetConfig">ResetConfig</a></span> </div>
</div> <!-- field-items -->
</div>
<div class="model">
Expand Down Expand Up @@ -12054,6 +12067,13 @@ <h3><a name="ReleaseStage"><code>ReleaseStage</code> - </a> <a class="up" href="
<div class="field-items">
</div> <!-- field-items -->
</div>
<div class="model">
<h3><a name="ResetConfig"><code>ResetConfig</code> - </a> <a class="up" href="#__Models">Up</a></h3>
<div class='model-description'></div>
<div class="field-items">
<div class="param">streamsToReset (optional)</div><div class="param-desc"><span class="param-type"><a href="#StreamDescriptor">array[StreamDescriptor]</a></span> </div>
</div> <!-- field-items -->
</div>
<div class="model">
<h3><a name="ResourceRequirements"><code>ResourceRequirements</code> - </a> <a class="up" href="#__Models">Up</a></h3>
<div class='model-description'>optional resource requirements to run workers (blank for unbounded allocations)</div>
Expand Down

0 comments on commit bcac051

Please sign in to comment.