diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java
index 09039e530631d0..d7c76c0235dcc0 100644
--- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/jobs/DataJobRunsResolver.java
@@ -4,9 +4,7 @@
 import com.linkedin.common.urn.Urn;
 import com.linkedin.datahub.graphql.QueryContext;
 import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
-import com.linkedin.datahub.graphql.generated.DataProcessInstance;
-import com.linkedin.datahub.graphql.generated.DataProcessInstanceResult;
-import com.linkedin.datahub.graphql.generated.Entity;
+import com.linkedin.datahub.graphql.generated.*;
 import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceMapper;
 import com.linkedin.entity.EntityResponse;
 import com.linkedin.entity.client.EntityClient;
@@ -33,6 +31,8 @@
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** GraphQL Resolver used for fetching a list of Task Runs associated with a Data Job */
 public class DataJobRunsResolver
@@ -40,6 +40,8 @@ public class DataJobRunsResolver
 
   private static final String PARENT_TEMPLATE_URN_SEARCH_INDEX_FIELD_NAME = "parentTemplate";
   private static final String CREATED_TIME_SEARCH_INDEX_FIELD_NAME = "created";
+  private static final String HAS_RUN_EVENTS_FIELD_NAME = "hasRunEvents";
+  private static final Logger log = LoggerFactory.getLogger(DataJobRunsResolver.class);
 
   private final EntityClient _entityClient;
 
@@ -117,7 +119,12 @@ private Filter buildTaskRunsEntityFilter(final String entityUrn) {
             new Criterion()
                 .setField(PARENT_TEMPLATE_URN_SEARCH_INDEX_FIELD_NAME)
                 .setCondition(Condition.EQUAL)
-                .setValue(entityUrn)));
+                .setValue(entityUrn),
+            new Criterion()
+                .setField(HAS_RUN_EVENTS_FIELD_NAME)
+                .setCondition(Condition.EQUAL)
+                .setValue(Boolean.TRUE.toString())));
+
     final Filter filter = new Filter();
     filter.setOr(
         new ConjunctiveCriterionArray(ImmutableList.of(new ConjunctiveCriterion().setAnd(array))));
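The resolver change above tightens the Runs query: a process instance must now match its parent template and carry hasRunEvents:true, so instances that never emitted a run event no longer appear under a job's Runs tab. As a rough illustration (not the actual translation path, which goes through DataHub's search request handlers), the pair of criteria behaves like this OpenSearch bool query:

```java
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;

class RunsFilterSketch {
  // Illustrative only: Condition.EQUAL on a keyword/boolean field is assumed
  // to translate to a term filter; both criteria are ANDed because they sit
  // inside a single ConjunctiveCriterion.
  static QueryBuilder equivalentQuery(String entityUrn) {
    return QueryBuilders.boolQuery()
        .filter(QueryBuilders.termQuery("parentTemplate", entityUrn))
        .filter(QueryBuilders.termQuery("hasRunEvents", "true"));
  }
}
```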
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java
new file mode 100644
index 00000000000000..bc55ad38765ed5
--- /dev/null
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillDataProcessInstancesConfig.java
@@ -0,0 +1,43 @@
+package com.linkedin.datahub.upgrade.config;
+
+import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
+import com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances;
+import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
+import io.datahubproject.metadata.context.OperationContext;
+import org.opensearch.client.RestHighLevelClient;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
+public class BackfillDataProcessInstancesConfig {
+
+  @Bean
+  public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents(
+      final OperationContext opContext,
+      EntityService<?> entityService,
+      ElasticSearchService elasticSearchService,
+      RestHighLevelClient restHighLevelClient,
+      @Value("${systemUpdate.processInstanceHasRunEvents.enabled}") final boolean enabled,
+      @Value("${systemUpdate.processInstanceHasRunEvents.reprocess.enabled}")
+          boolean reprocessEnabled,
+      @Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize,
+      @Value("${systemUpdate.processInstanceHasRunEvents.delayMs}") final Integer delayMs,
+      @Value("${systemUpdate.processInstanceHasRunEvents.totalDays}") Integer totalDays,
+      @Value("${systemUpdate.processInstanceHasRunEvents.windowDays}") Integer windowDays) {
+    return new BackfillDataProcessInstances(
+        opContext,
+        entityService,
+        elasticSearchService,
+        restHighLevelClient,
+        enabled,
+        reprocessEnabled,
+        batchSize,
+        delayMs,
+        totalDays,
+        windowDays);
+  }
+}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java
new file mode 100644
index 00000000000000..643a0ff5a4ce25
--- /dev/null
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstances.java
@@ -0,0 +1,54 @@
+package com.linkedin.datahub.upgrade.system.dataprocessinstances;
+
+import com.google.common.collect.ImmutableList;
+import com.linkedin.datahub.upgrade.UpgradeStep;
+import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
+import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
+import io.datahubproject.metadata.context.OperationContext;
+import java.util.List;
+import org.opensearch.client.RestHighLevelClient;
+
+public class BackfillDataProcessInstances implements NonBlockingSystemUpgrade {
+
+  private final List<UpgradeStep> _steps;
+
+  public BackfillDataProcessInstances(
+      OperationContext opContext,
+      EntityService<?> entityService,
+      ElasticSearchService elasticSearchService,
+      RestHighLevelClient restHighLevelClient,
+      boolean enabled,
+      boolean reprocessEnabled,
+      Integer batchSize,
+      Integer batchDelayMs,
+      Integer totalDays,
+      Integer windowDays) {
+    if (enabled) {
+      _steps =
+          ImmutableList.of(
+              new BackfillDataProcessInstancesHasRunEventsStep(
+                  opContext,
+                  entityService,
+                  elasticSearchService,
+                  restHighLevelClient,
+                  reprocessEnabled,
+                  batchSize,
+                  batchDelayMs,
+                  totalDays,
+                  windowDays));
+    } else {
+      _steps = ImmutableList.of();
+    }
+  }
+
+  @Override
+  public String id() {
+    return "BackfillDataProcessInstances";
+  }
+
+  @Override
+  public List<UpgradeStep> steps() {
+    return _steps;
+  }
+}
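Note the gating pattern above: when systemUpdate.processInstanceHasRunEvents.enabled is false, the upgrade publishes an empty step list, so the non-blocking system-update runner treats it as a no-op. A minimal sketch of that contract, assuming the collaborators come from a test fixture or mocks (the helper name is hypothetical):

```java
import com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import io.datahubproject.metadata.context.OperationContext;
import org.opensearch.client.RestHighLevelClient;

class BackfillToggleSketch {
  // With enabled=false the constructor assigns ImmutableList.of(), so the
  // runner finds no steps to execute for this upgrade.
  static boolean isNoOp(
      OperationContext opContext,
      EntityService<?> entityService,
      ElasticSearchService searchService,
      RestHighLevelClient client) {
    BackfillDataProcessInstances upgrade =
        new BackfillDataProcessInstances(
            opContext, entityService, searchService, client,
            /* enabled */ false, /* reprocessEnabled */ false,
            /* batchSize */ 100, /* batchDelayMs */ 1000,
            /* totalDays */ 90, /* windowDays */ 1);
    return upgrade.steps().isEmpty(); // expected: true
  }
}
```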
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java
new file mode 100644
index 00000000000000..55cdcae931ab5b
--- /dev/null
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/dataprocessinstances/BackfillDataProcessInstancesHasRunEventsStep.java
@@ -0,0 +1,213 @@
+package com.linkedin.datahub.upgrade.system.dataprocessinstances;
+
+import static com.linkedin.metadata.Constants.*;
+
+import com.google.common.base.Throwables;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.datahub.upgrade.UpgradeContext;
+import com.linkedin.datahub.upgrade.UpgradeStep;
+import com.linkedin.datahub.upgrade.UpgradeStepResult;
+import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
+import com.linkedin.metadata.boot.BootstrapStep;
+import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
+import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
+import com.linkedin.upgrade.DataHubUpgradeState;
+import io.datahubproject.metadata.context.OperationContext;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.index.query.QueryBuilder;
+import org.opensearch.index.query.QueryBuilders;
+import org.opensearch.search.aggregations.Aggregation;
+import org.opensearch.search.aggregations.AggregationBuilders;
+import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation;
+import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
+import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
+import org.opensearch.search.builder.SearchSourceBuilder;
+
+@Slf4j
+public class BackfillDataProcessInstancesHasRunEventsStep implements UpgradeStep {
+
+  private static final String UPGRADE_ID = "BackfillDataProcessInstancesHasRunEvents";
+  private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
+
+  private final OperationContext opContext;
+  private final EntityService<?> entityService;
+  private final ElasticSearchService elasticSearchService;
+  private final RestHighLevelClient restHighLevelClient;
+
+  private final boolean reprocessEnabled;
+  private final Integer batchSize;
+  private final Integer batchDelayMs;
+
+  private final Integer totalDays;
+  private final Integer windowDays;
+
+  public BackfillDataProcessInstancesHasRunEventsStep(
+      OperationContext opContext,
+      EntityService<?> entityService,
+      ElasticSearchService elasticSearchService,
+      RestHighLevelClient restHighLevelClient,
+      boolean reprocessEnabled,
+      Integer batchSize,
+      Integer batchDelayMs,
+      Integer totalDays,
+      Integer windowDays) {
+    this.opContext = opContext;
+    this.entityService = entityService;
+    this.elasticSearchService = elasticSearchService;
+    this.restHighLevelClient = restHighLevelClient;
+    this.reprocessEnabled = reprocessEnabled;
+    this.batchSize = batchSize;
+    this.batchDelayMs = batchDelayMs;
+    this.totalDays = totalDays;
+    this.windowDays = windowDays;
+  }
+
+  @SuppressWarnings("BusyWait")
+  @Override
+  public Function<UpgradeContext, UpgradeStepResult> executable() {
+    return (context) -> {
+      TermsValuesSourceBuilder termsValuesSourceBuilder =
+          new TermsValuesSourceBuilder("urn").field("urn");
+
+      ObjectNode json = JsonNodeFactory.instance.objectNode();
+      json.put("hasRunEvents", true);
+
+      IndexConvention indexConvention = opContext.getSearchContext().getIndexConvention();
+
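+      // Run events live in the dataProcessInstanceRunEvent timeseries index;
+      // resolving its physical name through the IndexConvention keeps the scan
+      // correct on deployments that prefix or alias their index names.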
+      String runEventsIndexName =
+          indexConvention.getTimeseriesAspectIndexName(
+              DATA_PROCESS_INSTANCE_ENTITY_NAME, DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME);
+
+      DataHubUpgradeState upgradeState = DataHubUpgradeState.SUCCEEDED;
+
+      Instant now = Instant.now();
+      Instant overallStart = now.minus(totalDays, ChronoUnit.DAYS);
+      for (int i = 0; ; i++) {
+        Instant windowEnd = now.minus(i * windowDays, ChronoUnit.DAYS);
+        if (!windowEnd.isAfter(overallStart)) {
+          break;
+        }
+        Instant windowStart = windowEnd.minus(windowDays, ChronoUnit.DAYS);
+        if (windowStart.isBefore(overallStart)) {
+          // last iteration, cap at overallStart
+          windowStart = overallStart;
+        }
+
+        QueryBuilder queryBuilder =
+            QueryBuilders.boolQuery()
+                .must(
+                    QueryBuilders.rangeQuery("@timestamp")
+                        .gte(windowStart.toString())
+                        .lt(windowEnd.toString()));
+
+        CompositeAggregationBuilder aggregationBuilder =
+            AggregationBuilders.composite("aggs", List.of(termsValuesSourceBuilder))
+                .size(batchSize);
+
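+        // A composite aggregation returns one bucket per distinct urn plus an
+        // "after key"; feeding that key back via aggregateAfter(...) below pages
+        // through every process instance in the window, batchSize urns at a time.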
+        while (true) {
+          SearchRequest searchRequest = new SearchRequest(runEventsIndexName);
+          searchRequest.source(
+              new SearchSourceBuilder()
+                  .size(0)
+                  .aggregation(aggregationBuilder)
+                  .query(queryBuilder));
+
+          SearchResponse response;
+
+          try {
+            response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
+          } catch (IOException e) {
+            log.error(Throwables.getStackTraceAsString(e));
+            log.error("Error querying index {}", runEventsIndexName);
+            upgradeState = DataHubUpgradeState.FAILED;
+            break;
+          }
+          List<Aggregation> aggregations = response.getAggregations().asList();
+          if (aggregations.isEmpty()) {
+            break;
+          }
+          CompositeAggregation aggregation = (CompositeAggregation) aggregations.get(0);
+          Set<Urn> urns = new HashSet<>();
+          for (CompositeAggregation.Bucket bucket : aggregation.getBuckets()) {
+            for (Object value : bucket.getKey().values()) {
+              try {
+                urns.add(Urn.createFromString(String.valueOf(value)));
+              } catch (URISyntaxException e) {
+                log.warn("Ignoring invalid urn {}", value);
+              }
+            }
+          }
+          if (!urns.isEmpty()) {
+            urns = entityService.exists(opContext, urns);
+            urns.forEach(
+                urn ->
+                    elasticSearchService.upsertDocument(
+                        opContext,
+                        DATA_PROCESS_INSTANCE_ENTITY_NAME,
+                        json.toString(),
+                        indexConvention.getEntityDocumentId(urn)));
+          }
+          if (aggregation.afterKey() == null) {
+            break;
+          }
+          aggregationBuilder.aggregateAfter(aggregation.afterKey());
+          if (batchDelayMs > 0) {
+            log.info("Sleeping for {} ms", batchDelayMs);
+            try {
+              Thread.sleep(batchDelayMs);
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+      }
+      BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService);
+      return new DefaultUpgradeStepResult(id(), upgradeState);
+    };
+  }
+
+  @Override
+  public String id() {
+    return UPGRADE_ID;
+  }
+
+  /**
+   * Returns whether the upgrade should proceed if the step fails after exceeding the maximum
+   * retries.
+   */
+  @Override
+  public boolean isOptional() {
+    return true;
+  }
+
+  /** Returns whether the upgrade should be skipped. */
+  @Override
+  public boolean skip(UpgradeContext context) {
+    if (reprocessEnabled) {
+      return false;
+    }
+
+    boolean previouslyRun =
+        entityService.exists(
+            context.opContext(), UPGRADE_ID_URN, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
+    if (previouslyRun) {
+      log.info("{} was already run. Skipping.", id());
+    }
+    return previouslyRun;
+  }
+}
diff --git a/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl b/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl
index d9850c82442bf6..c18a4168a2a76a 100644
--- a/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl
+++ b/metadata-models/src/main/pegasus/com/linkedin/dataprocess/DataProcessInstanceRunEvent.pdl
@@ -15,6 +15,9 @@ import com.linkedin.common.Urn
 record DataProcessInstanceRunEvent includes TimeseriesAspectBase, ExternalReference {
 
   @TimeseriesField = {}
+  @Searchable = {
+    "hasValuesFieldName": "hasRunEvents"
+  }
   status: enum DataProcessRunStatus {
     /**
      * The status where the Data processing run is in.
diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml
index 0977c64d0e8609..12d0175bac854a 100644
--- a/metadata-service/configuration/src/main/resources/application.yaml
+++ b/metadata-service/configuration/src/main/resources/application.yaml
@@ -387,7 +387,15 @@ systemUpdate:
     batchSize: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_BATCH_SIZE:500}
     delayMs: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_DELAY_MS:5000}
     limit: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_LIMIT:0}
-
+  processInstanceHasRunEvents:
+    enabled: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_ENABLED:true}
+    batchSize: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_BATCH_SIZE:100}
+    delayMs: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_DELAY_MS:1000}
+    totalDays: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_TOTAL_DAYS:90}
+    windowDays: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_WINDOW_DAYS:1}
+    reprocess:
+      enabled: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_REPROCESS:false}
+
 structuredProperties:
   enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings
   writeEnabled: ${ENABLE_STRUCTURED_PROPERTIES_WRITE:true} # write structured property values
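With the defaults above (totalDays: 90, windowDays: 1), the step scans ninety one-day windows, newest first. The @Searchable hasValuesFieldName annotation keeps hasRunEvents current for run events ingested after this change; the backfill covers instances that predate it. A standalone sketch of the window arithmetic, mirroring the loop in the step (class name and printout are illustrative):

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

class WindowSketch {
  public static void main(String[] args) {
    int totalDays = 90; // systemUpdate.processInstanceHasRunEvents.totalDays
    int windowDays = 1; // systemUpdate.processInstanceHasRunEvents.windowDays
    Instant now = Instant.now();
    Instant overallStart = now.minus(totalDays, ChronoUnit.DAYS);
    for (int i = 0; ; i++) {
      Instant windowEnd = now.minus((long) i * windowDays, ChronoUnit.DAYS);
      if (!windowEnd.isAfter(overallStart)) {
        break; // the whole lookback period has been covered
      }
      Instant windowStart = windowEnd.minus(windowDays, ChronoUnit.DAYS);
      if (windowStart.isBefore(overallStart)) {
        windowStart = overallStart; // cap the oldest window at the boundary
      }
      System.out.println("[" + windowStart + ", " + windowEnd + ")");
    }
  }
}
```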