fix(gms): filter out runs of a dataJob without any run-events (datahu…
ksrinath authored and sleeperdeep committed Dec 17, 2024
1 parent b33f23f commit cd9b204
Showing 6 changed files with 333 additions and 5 deletions.
@@ -4,9 +4,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.DataProcessInstanceResult;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.*;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
@@ -33,13 +31,17 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** GraphQL Resolver used for fetching a list of Task Runs associated with a Data Job */
public class DataJobRunsResolver
implements DataFetcher<CompletableFuture<DataProcessInstanceResult>> {

private static final String PARENT_TEMPLATE_URN_SEARCH_INDEX_FIELD_NAME = "parentTemplate";
private static final String CREATED_TIME_SEARCH_INDEX_FIELD_NAME = "created";
private static final String HAS_RUN_EVENTS_FIELD_NAME = "hasRunEvents";
private static final Logger log = LoggerFactory.getLogger(DataJobRunsResolver.class);

private final EntityClient _entityClient;

@@ -117,7 +119,12 @@ private Filter buildTaskRunsEntityFilter(final String entityUrn) {
new Criterion()
.setField(PARENT_TEMPLATE_URN_SEARCH_INDEX_FIELD_NAME)
.setCondition(Condition.EQUAL)
.setValue(entityUrn)));
.setValue(entityUrn),
new Criterion()
.setField(HAS_RUN_EVENTS_FIELD_NAME)
.setCondition(Condition.EQUAL)
.setValue(Boolean.TRUE.toString())));

final Filter filter = new Filter();
filter.setOr(
new ConjunctiveCriterionArray(ImmutableList.of(new ConjunctiveCriterion().setAnd(array))));
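
Note that both criteria land in the same ConjunctiveCriterion, so they are ANDed (the outer OR array has a single branch). A rough paraphrase of the predicate the filter now expresses — a hypothetical helper for illustration, not part of the commit:

// A run matches only when it belongs to the dataJob AND has at least one run event.
// Documents indexed before the hasRunEvents field existed will not match until the
// backfill below populates the flag.
static boolean matchesTaskRun(String parentTemplate, String hasRunEvents, String dataJobUrn) {
  return dataJobUrn.equals(parentTemplate) && Boolean.TRUE.toString().equals(hasRunEvents);
}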
@@ -0,0 +1,43 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import io.datahubproject.metadata.context.OperationContext;
import org.opensearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class BackfillDataProcessInstancesConfig {

@Bean
public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents(
final OperationContext opContext,
EntityService<?> entityService,
ElasticSearchService elasticSearchService,
RestHighLevelClient restHighLevelClient,
@Value("${systemUpdate.processInstanceHasRunEvents.enabled}") final boolean enabled,
@Value("${systemUpdate.processInstanceHasRunEvents.reprocess.enabled}")
boolean reprocessEnabled,
@Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize,
@Value("${systemUpdate.processInstanceHasRunEvents.delayMs}") final Integer delayMs,
@Value("${systemUpdate.processInstanceHasRunEvents.totalDays}") Integer totalDays,
@Value("${systemUpdate.processInstanceHasRunEvents.windowDays}") Integer windowDays) {
return new BackfillDataProcessInstances(
opContext,
entityService,
elasticSearchService,
restHighLevelClient,
enabled,
reprocessEnabled,
batchSize,
delayMs,
totalDays,
windowDays);
}
}
@@ -0,0 +1,54 @@
package com.linkedin.datahub.upgrade.system.dataprocessinstances;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import org.opensearch.client.RestHighLevelClient;

public class BackfillDataProcessInstances implements NonBlockingSystemUpgrade {

private final List<UpgradeStep> _steps;

public BackfillDataProcessInstances(
OperationContext opContext,
EntityService<?> entityService,
ElasticSearchService elasticSearchService,
RestHighLevelClient restHighLevelClient,
boolean enabled,
boolean reprocessEnabled,
Integer batchSize,
Integer batchDelayMs,
Integer totalDays,
Integer windowDays) {
if (enabled) {
_steps =
ImmutableList.of(
new BackfillDataProcessInstancesHasRunEventsStep(
opContext,
entityService,
elasticSearchService,
restHighLevelClient,
reprocessEnabled,
batchSize,
batchDelayMs,
totalDays,
windowDays));
} else {
_steps = ImmutableList.of();
}
}

@Override
public String id() {
return "BackfillDataProcessInstances";
}

@Override
public List<UpgradeStep> steps() {
return _steps;
}
}
@@ -0,0 +1,213 @@
package com.linkedin.datahub.upgrade.system.dataprocessinstances;

import static com.linkedin.metadata.Constants.*;

import com.google.common.base.Throwables;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.Aggregation;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;

@Slf4j
public class BackfillDataProcessInstancesHasRunEventsStep implements UpgradeStep {

private static final String UPGRADE_ID = "BackfillDataProcessInstancesHasRunEvents";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);

private final OperationContext opContext;
private final EntityService<?> entityService;
private final ElasticSearchService elasticSearchService;
private final RestHighLevelClient restHighLevelClient;

private final boolean reprocessEnabled;
private final Integer batchSize;
private final Integer batchDelayMs;

private final Integer totalDays;
private final Integer windowDays;

public BackfillDataProcessInstancesHasRunEventsStep(
OperationContext opContext,
EntityService<?> entityService,
ElasticSearchService elasticSearchService,
RestHighLevelClient restHighLevelClient,
boolean reprocessEnabled,
Integer batchSize,
Integer batchDelayMs,
Integer totalDays,
Integer windowDays) {
this.opContext = opContext;
this.entityService = entityService;
this.elasticSearchService = elasticSearchService;
this.restHighLevelClient = restHighLevelClient;
this.reprocessEnabled = reprocessEnabled;
this.batchSize = batchSize;
this.batchDelayMs = batchDelayMs;
this.totalDays = totalDays;
this.windowDays = windowDays;
}

@SuppressWarnings("BusyWait")
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
TermsValuesSourceBuilder termsValuesSourceBuilder =
new TermsValuesSourceBuilder("urn").field("urn");

ObjectNode json = JsonNodeFactory.instance.objectNode();
json.put("hasRunEvents", true);

IndexConvention indexConvention = opContext.getSearchContext().getIndexConvention();

String runEventsIndexName =
indexConvention.getTimeseriesAspectIndexName(
DATA_PROCESS_INSTANCE_ENTITY_NAME, DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME);

DataHubUpgradeState upgradeState = DataHubUpgradeState.SUCCEEDED;

Instant now = Instant.now();
Instant overallStart = now.minus(totalDays, ChronoUnit.DAYS);
for (int i = 0; ; i++) {
Instant windowEnd = now.minus(i * windowDays, ChronoUnit.DAYS);
if (!windowEnd.isAfter(overallStart)) {
break;
}
Instant windowStart = windowEnd.minus(windowDays, ChronoUnit.DAYS);
if (windowStart.isBefore(overallStart)) {
// last iteration, cap at overallStart
windowStart = overallStart;
}

QueryBuilder queryBuilder =
QueryBuilders.boolQuery()
.must(
QueryBuilders.rangeQuery("@timestamp")
.gte(windowStart.toString())
.lt(windowEnd.toString()));

CompositeAggregationBuilder aggregationBuilder =
AggregationBuilders.composite("aggs", List.of(termsValuesSourceBuilder))
.size(batchSize);

while (true) {
SearchRequest searchRequest = new SearchRequest(runEventsIndexName);
searchRequest.source(
new SearchSourceBuilder()
.size(0)
.aggregation(aggregationBuilder)
.query(queryBuilder));

SearchResponse response;

try {
response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error(Throwables.getStackTraceAsString(e));
log.error("Error querying index {}", runEventsIndexName);
upgradeState = DataHubUpgradeState.FAILED;
break;
}
List<Aggregation> aggregations = response.getAggregations().asList();
if (aggregations.isEmpty()) {
break;
}
CompositeAggregation aggregation = (CompositeAggregation) aggregations.get(0);
Set<Urn> urns = new HashSet<>();
for (CompositeAggregation.Bucket bucket : aggregation.getBuckets()) {
for (Object value : bucket.getKey().values()) {
try {
urns.add(Urn.createFromString(String.valueOf(value)));
} catch (URISyntaxException e) {
log.warn("Ignoring invalid urn {}", value);
}
}
}
if (!urns.isEmpty()) {
urns = entityService.exists(opContext, urns);
urns.forEach(
urn ->
elasticSearchService.upsertDocument(
opContext,
DATA_PROCESS_INSTANCE_ENTITY_NAME,
json.toString(),
indexConvention.getEntityDocumentId(urn)));
}
if (aggregation.afterKey() == null) {
break;
}
aggregationBuilder.aggregateAfter(aggregation.afterKey());
if (batchDelayMs > 0) {
log.info("Sleeping for {} ms", batchDelayMs);
try {
Thread.sleep(batchDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService);
return new DefaultUpgradeStepResult(id(), upgradeState);
};
}

@Override
public String id() {
return UPGRADE_ID;
}

/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum
* retries.
*/
@Override
public boolean isOptional() {
return true;
}

/** Returns whether the upgrade should be skipped. */
@Override
public boolean skip(UpgradeContext context) {
if (reprocessEnabled) {
return false;
}

boolean previouslyRun =
entityService.exists(
context.opContext(), UPGRADE_ID_URN, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
if (previouslyRun) {
log.info("{} was already run. Skipping.", id());
}
return previouslyRun;
}
}
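
The windowing loop in executable() walks backwards from now in windowDays-sized slices until it has covered totalDays, capping the final slice at the overall start. A self-contained sketch of just that arithmetic, using the defaults from the YAML below (a hypothetical standalone class, not part of the commit):

import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class WindowingSketch {
  public static void main(String[] args) {
    int totalDays = 90;  // systemUpdate.processInstanceHasRunEvents.totalDays default
    int windowDays = 1;  // systemUpdate.processInstanceHasRunEvents.windowDays default
    Instant now = Instant.now();
    Instant overallStart = now.minus(totalDays, ChronoUnit.DAYS);
    for (int i = 0; ; i++) {
      Instant windowEnd = now.minus((long) i * windowDays, ChronoUnit.DAYS);
      if (!windowEnd.isAfter(overallStart)) {
        break; // walked past the start of the overall range
      }
      Instant windowStart = windowEnd.minus(windowDays, ChronoUnit.DAYS);
      if (windowStart.isBefore(overallStart)) {
        windowStart = overallStart; // cap the last, possibly partial, window
      }
      System.out.println(windowStart + " .. " + windowEnd); // 90 one-day windows
    }
  }
}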
@@ -15,6 +15,9 @@ import com.linkedin.common.Urn
record DataProcessInstanceRunEvent includes TimeseriesAspectBase, ExternalReference {

@TimeseriesField = {}
@Searchable = {
"hasValuesFieldName": "hasRunEvents"
}
status: enum DataProcessRunStatus {
/**
* The status where the Data processing run is in.
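
A note on the annotation, as hasValuesFieldName is used elsewhere in DataHub's aspect models: it materializes a boolean field (here hasRunEvents) on the entity's search document that is set once the entity has at least one value for the annotated field. This is the field the resolver's new criterion matches on, and the field the Elasticsearch backfill above populates for pre-existing instances.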
@@ -387,7 +387,15 @@ systemUpdate:
batchSize: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_BATCH_SIZE:500}
delayMs: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_DELAY_MS:5000}
limit: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_LIMIT:0}

processInstanceHasRunEvents:
enabled: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_ENABLED:true}
batchSize: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_BATCH_SIZE:100}
delayMs: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_DELAY_MS:1000}
totalDays: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_TOTAL_DAYS:90}
windowDays: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_WINDOW_DAYS:1}
reprocess:
enabled: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_REPROCESS:false}

structuredProperties:
enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings
writeEnabled: ${ENABLE_STRUCTURED_PROPERTIES_WRITE:true} # write structured property values
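
Because every setting follows the ${ENV_VAR:default} convention, the backfill can be tuned per deployment without editing this file; for example (values here are illustrative only):

SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_BATCH_SIZE=500
SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_TOTAL_DAYS=30
SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_REPROCESS=true  # force a re-run even if the upgrade was already recorded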