diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index d8939ddcff09c7..52aa3cb89d1da4 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -13,11 +13,17 @@ from requests.models import Response from requests.sessions import Session +import datahub from datahub.cli import config_utils from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.request_helper import make_curl_command from datahub.emitter.serialization_helper import post_json_transform -from datahub.metadata.schema_classes import _Aspect +from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( + MetadataChangeEvent, + MetadataChangeProposal, +) +from datahub.metadata.schema_classes import SystemMetadataClass, _Aspect from datahub.utilities.urns.urn import Urn, guess_entity_type log = logging.getLogger(__name__) @@ -689,3 +695,18 @@ def generate_access_token( return token_name, response.json().get("data", {}).get("createAccessToken", {}).get( "accessToken", None ) + + +def ensure_has_system_metadata( + event: Union[ + MetadataChangeProposal, MetadataChangeProposalWrapper, MetadataChangeEvent + ] +) -> None: + if event.systemMetadata is None: + event.systemMetadata = SystemMetadataClass() + metadata = event.systemMetadata + if metadata.properties is None: + metadata.properties = {} + props = metadata.properties + props["clientId"] = datahub.__package_name__ + props["clientVersion"] = datahub.__version__ diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index e6257796aa4c4e..424e818e223eab 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -10,7 +10,11 @@ from requests.adapters import HTTPAdapter, Retry from requests.exceptions import HTTPError, RequestException -from datahub.cli.cli_utils import fixup_gms_url, get_system_auth +from datahub.cli.cli_utils import ( + ensure_has_system_metadata, + fixup_gms_url, + get_system_auth, +) from datahub.configuration.common import ConfigurationError, OperationalError from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -228,12 +232,10 @@ def emit_mce(self, mce: MetadataChangeEvent) -> None: snapshot_fqn = ( f"com.linkedin.metadata.snapshot.{mce.proposedSnapshot.RECORD_SCHEMA.name}" ) - system_metadata_obj = {} - if mce.systemMetadata is not None: - system_metadata_obj = { - "lastObserved": mce.systemMetadata.lastObserved, - "runId": mce.systemMetadata.runId, - } + ensure_has_system_metadata(mce) + # To make lint happy + assert mce.systemMetadata is not None + system_metadata_obj = mce.systemMetadata.to_obj() snapshot = { "entity": {"value": {snapshot_fqn: mce_obj}}, "systemMetadata": system_metadata_obj, @@ -246,7 +248,7 @@ def emit_mcp( self, mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper] ) -> None: url = f"{self._gms_server}/aspects?action=ingestProposal" - + ensure_has_system_metadata(mcp) mcp_obj = pre_json_transform(mcp.to_obj()) payload = json.dumps({"proposal": mcp_obj}) @@ -256,6 +258,8 @@ def emit_mcps( self, mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]] ) -> None: url = f"{self._gms_server}/aspects?action=ingestProposalBatch" + for mcp in mcps: + ensure_has_system_metadata(mcp) mcp_objs = [pre_json_transform(mcp.to_obj()) for mcp in mcps] payload = json.dumps({"proposals": mcp_objs}) diff --git a/metadata-ingestion/tests/unit/test_rest_sink.py b/metadata-ingestion/tests/unit/test_rest_sink.py index 7bfa09a35951b4..efa6c6678a8c75 100644 --- a/metadata-ingestion/tests/unit/test_rest_sink.py +++ b/metadata-ingestion/tests/unit/test_rest_sink.py @@ -75,7 +75,15 @@ } } }, - "systemMetadata": {}, + "systemMetadata": { + "lastObserved": 0, + "lastRunId": "no-run-id-provided", + "properties": { + "clientId": "acryl-datahub", + "clientVersion": "1!0.0.0.dev0", + }, + "runId": "no-run-id-provided", + }, }, ), ( @@ -125,7 +133,15 @@ } } }, - "systemMetadata": {}, + "systemMetadata": { + "lastObserved": 0, + "lastRunId": "no-run-id-provided", + "properties": { + "clientId": "acryl-datahub", + "clientVersion": "1!0.0.0.dev0", + }, + "runId": "no-run-id-provided", + }, }, ), ( @@ -161,7 +177,15 @@ } } }, - "systemMetadata": {}, + "systemMetadata": { + "lastObserved": 0, + "lastRunId": "no-run-id-provided", + "properties": { + "clientId": "acryl-datahub", + "clientVersion": "1!0.0.0.dev0", + }, + "runId": "no-run-id-provided", + }, }, ), ( @@ -238,6 +262,15 @@ "value": '{"owners": [{"owner": "urn:li:corpuser:fbar", "type": "DATAOWNER"}], "ownerTypes": {}, "lastModified": {"time": 0, "actor": "urn:li:corpuser:fbar"}}', "contentType": "application/json", }, + "systemMetadata": { + "lastObserved": 0, + "lastRunId": "no-run-id-provided", + "properties": { + "clientId": "acryl-datahub", + "clientVersion": "1!0.0.0.dev0", + }, + "runId": "no-run-id-provided", + }, } }, ),