Skip to content

Commit

Permalink
fix(ingestion/spark): Platform instance and column level lineage fix (#…
Browse files Browse the repository at this point in the history
…10843)

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
  • Loading branch information
treff7es and coderabbitai[bot] authored Jul 9, 2024
1 parent b6c7fe8 commit d204d56
Show file tree
Hide file tree
Showing 10 changed files with 556 additions and 25 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/docs/sources/databricks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The alternative way to integrate is via the Hive connector. The [Hive starter re

## Databricks Spark

To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/spark-lineage/README.md#configuration-instructions-databricks).
To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/spark-lineage-beta/README.md#configuration-instructions-databricks).

## Watch the DataHub Talk at the Data and AI Summit 2022

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class DatahubOpenlineageConfig {
@Builder.Default private String hivePlatformAlias = "hive";
@Builder.Default private Map<String, String> urnAliases = new HashMap<>();
@Builder.Default private final boolean disableSymlinkResolution = false;
@Builder.Default private final boolean lowerCaseDatasetUrns = false;

public List<PathSpec> getPathSpecsForPlatform(String platform) {
if ((pathSpecs == null) || (pathSpecs.isEmpty())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.Edge;
import com.linkedin.common.EdgeArray;
import com.linkedin.common.FabricType;
import com.linkedin.common.GlobalTags;
import com.linkedin.common.Owner;
import com.linkedin.common.OwnerArray;
Expand Down Expand Up @@ -57,6 +58,8 @@
import io.datahubproject.openlineage.dataset.DatahubJob;
import io.datahubproject.openlineage.dataset.HdfsPathDataset;
import io.datahubproject.openlineage.dataset.HdfsPlatform;
import io.datahubproject.openlineage.dataset.PathSpec;
import io.datahubproject.openlineage.utils.DatahubUtils;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import java.io.IOException;
Expand Down Expand Up @@ -151,6 +154,11 @@ public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
private static Optional<DatasetUrn> getDatasetUrnFromOlDataset(
String namespace, String datasetName, DatahubOpenlineageConfig mappingConfig) {
String platform;
if (mappingConfig.isLowerCaseDatasetUrns()) {
namespace = namespace.toLowerCase();
datasetName = datasetName.toLowerCase();
}

if (namespace.contains(SCHEME_SEPARATOR)) {
try {
URI datasetUri;
Expand Down Expand Up @@ -183,12 +191,45 @@ private static Optional<DatasetUrn> getDatasetUrnFromOlDataset(
platform = namespace;
}

if (mappingConfig.getCommonDatasetPlatformInstance() != null) {
datasetName = mappingConfig.getCommonDatasetPlatformInstance() + "." + datasetName;
String platformInstance = getPlatformInstance(mappingConfig, platform);
FabricType env = getEnv(mappingConfig, platform);
return Optional.of(DatahubUtils.createDatasetUrn(platform, platformInstance, datasetName, env));
}

private static FabricType getEnv(DatahubOpenlineageConfig mappingConfig, String platform) {
FabricType fabricType = mappingConfig.getFabricType();
if (mappingConfig.getPathSpecs() != null
&& mappingConfig.getPathSpecs().containsKey(platform)) {
List<PathSpec> path_specs = mappingConfig.getPathSpecs().get(platform);
for (PathSpec pathSpec : path_specs) {
if (pathSpec.getEnv().isPresent()) {
try {
fabricType = FabricType.valueOf(pathSpec.getEnv().get());
return fabricType;
} catch (IllegalArgumentException e) {
log.warn("Invalid environment value: {}", pathSpec.getEnv());
}
}
}
}
return fabricType;
}

return Optional.of(
new DatasetUrn(new DataPlatformUrn(platform), datasetName, mappingConfig.getFabricType()));
private static String getPlatformInstance(
DatahubOpenlineageConfig mappingConfig, String platform) {
// Use the platform instance from the path spec if it is present otherwise use the one from the
// commonDatasetPlatformInstance
String platformInstance = mappingConfig.getCommonDatasetPlatformInstance();
if (mappingConfig.getPathSpecs() != null
&& mappingConfig.getPathSpecs().containsKey(platform)) {
List<PathSpec> path_specs = mappingConfig.getPathSpecs().get(platform);
for (PathSpec pathSpec : path_specs) {
if (pathSpec.getPlatformInstance().isPresent()) {
return pathSpec.getPlatformInstance().get();
}
}
}
return platformInstance;
}

public static GlobalTags generateTags(List<String> tags) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,11 @@ private Pair<UrnArray, EdgeArray> processDownstreams(
for (Urn downstream :
Objects.requireNonNull(fineGrainedLineage.getDownstreams())) {
upstreamLineagePatchBuilder.addFineGrainedUpstreamField(
downstream,
upstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
upstream,
downstream,
null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public class PathSpec {
final String alias;
final String platform;
@Builder.Default final String env = "PROD";
@Builder.Default final Optional<String> env = Optional.empty();
final List<String> pathSpecList;
@Builder.Default final Optional<String> platformInstance = Optional.empty();
}
31 changes: 22 additions & 9 deletions metadata-integration/java/spark-lineage-beta/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ When running jobs using spark-submit, the agent needs to be configured in the co

```text
#Configuring DataHub spark agent jar
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.11
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.13
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```

## spark-submit command line

```sh
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.11 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.13 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
```

### Configuration Instructions: Amazon EMR
Expand All @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it
stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html)

```text
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.11
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.13
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
#If you have authentication set up then you also need to specify the Datahub access token
Expand All @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh
spark = SparkSession.builder
.master("spark://spark-master:7077")
.appName("test-application")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.11")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.13")
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.rest.server", "http://localhost:8080")
.enableHiveSupport()
Expand All @@ -79,7 +79,7 @@ appName("test-application")
config("spark.master","spark://spark-master:7077")
.

config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.11")
config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.13")
.

config("spark.extraListeners","datahub.spark.DatahubSparkListener")
Expand Down Expand Up @@ -159,7 +159,7 @@ information like tokens.

| Field | Required | Default | Description |
|---------------------------------------------------------------------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| spark.jars.packages || | Set with latest/required version io.acryl:datahub-spark-lineage:0.8.23 |
| spark.jars.packages || | Set with latest/required version io.acryl:acryl-spark-lineage:0.2.13 |
| spark.extraListeners || | datahub.spark.DatahubSparkListener |
| spark.datahub.rest.server || | Datahub server url eg:<http://localhost:8080> |
| spark.datahub.rest.token | | | Authentication token. |
Expand All @@ -181,9 +181,10 @@ information like tokens.
| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run . |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. |
|
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run. |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default, it is disabled. |
| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset urns. By default, it is disabled. |
| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the s3 location instead of the Hive table. By default, it is disabled. |

## What to Expect: The Metadata Model

Expand Down Expand Up @@ -343,3 +344,15 @@ Use Java 8 to build the project. The project uses Gradle as the build tool. To b
```
## Known limitations
+
## Changelog
### Version 0.2.12
- Silencing some chatty warnings in RddPathUtils
### Version 0.2.12
- Add option to lowercase dataset URNs
- Add option to set platform instance and/or env per platform with `spark.datahub.platform.<platform_name>.env` and `spark.datahub.platform.<platform_name>.platform_instance` config parameter
- Fixing platform instance setting for datasets when `spark.datahub.metadata.dataset.platformInstance` is set
- Fixing column level lineage support when patch is enabled
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class SparkConfigParser {
public static final String DATAHUB_FLOW_NAME = "flow_name";
public static final String DATASET_ENV_KEY = "metadata.dataset.env";
public static final String DATASET_HIVE_PLATFORM_ALIAS = "metadata.dataset.hivePlatformAlias";
public static final String DATASET_LOWERCASE_URNS = "metadata.dataset.lowerCaseUrns";

public static final String DATASET_MATERIALIZE_KEY = "metadata.dataset.materialize";
public static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
public static final String DATASET_INCLUDE_SCHEMA_METADATA =
Expand Down Expand Up @@ -152,6 +154,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf(
builder.hivePlatformAlias(SparkConfigParser.getHivePlatformAlias(sparkConfig));
builder.usePatch(SparkConfigParser.isPatchEnabled(sparkConfig));
builder.disableSymlinkResolution(SparkConfigParser.isDisableSymlinkResolution(sparkConfig));
builder.lowerCaseDatasetUrns(SparkConfigParser.isLowerCaseDatasetUrns(sparkConfig));
try {
String parentJob = SparkConfigParser.getParentJobKey(sparkConfig);
if (parentJob != null) {
Expand Down Expand Up @@ -246,15 +249,18 @@ public static Map<String, List<PathSpec>> getPathSpecListMap(Config datahubConfi
pathSpecBuilder.alias(pathSpecKey);
pathSpecBuilder.platform(key);
if (datahubConfig.hasPath(aliasKey + ".env")) {
pathSpecBuilder.env(datahubConfig.getString(aliasKey + ".env"));
pathSpecBuilder.env(Optional.ofNullable(datahubConfig.getString(aliasKey + ".env")));
}
if (datahubConfig.hasPath(aliasKey + ".platformInstance")) {
if (datahubConfig.hasPath(aliasKey + "." + PLATFORM_INSTANCE_KEY)) {
pathSpecBuilder.platformInstance(
Optional.ofNullable(datahubConfig.getString(aliasKey + ".platformInstance")));
Optional.ofNullable(
datahubConfig.getString(aliasKey + "." + PLATFORM_INSTANCE_KEY)));
}
if (datahubConfig.hasPath(aliasKey + "." + PATH_SPEC_LIST_KEY)) {
pathSpecBuilder.pathSpecList(
Arrays.asList(
datahubConfig.getString(aliasKey + "." + PATH_SPEC_LIST_KEY).split(",")));
}
pathSpecBuilder.pathSpecList(
Arrays.asList(datahubConfig.getString(aliasKey + "." + pathSpecKey).split(",")));

platformSpecs.add(pathSpecBuilder.build());
}
pathSpecMap.put(key, platformSpecs);
Expand All @@ -264,8 +270,8 @@ public static Map<String, List<PathSpec>> getPathSpecListMap(Config datahubConfi
}

public static String getPlatformInstance(Config pathSpecConfig) {
return pathSpecConfig.hasPath(PLATFORM_INSTANCE_KEY)
? pathSpecConfig.getString(PLATFORM_INSTANCE_KEY)
return pathSpecConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)
? pathSpecConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY)
: null;
}

Expand Down Expand Up @@ -341,4 +347,9 @@ public static boolean isEmitCoalescePeriodically(Config datahubConfig) {
return datahubConfig.hasPath(STAGE_METADATA_COALESCING)
&& datahubConfig.getBoolean(STAGE_METADATA_COALESCING);
}

public static boolean isLowerCaseDatasetUrns(Config datahubConfig) {
return datahubConfig.hasPath(DATASET_LOWERCASE_URNS)
&& datahubConfig.getBoolean(DATASET_LOWERCASE_URNS);
}
}
Loading

0 comments on commit d204d56

Please sign in to comment.